/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.store; import java.io.ByteArrayInputStream; import java.io.IOException; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.queue.AMQQueue; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.JsonProcessingException; import org.codehaus.jackson.Version; import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.JsonSerializer; import org.codehaus.jackson.map.Module; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializerProvider; import org.codehaus.jackson.map.module.SimpleModule; abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore { private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION"; private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES"; private static final String META_DATA_TABLE_NAME = "QPID_MESSAGE_METADATA"; private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT"; private static final String XID_TABLE_NAME = "QPID_XIDS"; private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS"; private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS"; private static final String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME = "QPID_CONFIGURED_OBJECT_HIERARCHY"; private static final int DEFAULT_CONFIG_VERSION = 0; public static final Set CONFIGURATION_STORE_TABLE_NAMES = new HashSet(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME)); public static final Set MESSAGE_STORE_TABLE_NAMES = new HashSet(Arrays.asList(DB_VERSION_TABLE_NAME, META_DATA_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME)); private static final int DB_VERSION = 8; private final AtomicLong _messageId = new AtomicLong(0); private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )"; private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )"; private static final String SELECT_FROM_DB_VERSION = "SELECT version FROM " + DB_VERSION_TABLE_NAME; private static final String UPDATE_DB_VERSION = "UPDATE " + DB_VERSION_TABLE_NAME + " SET version = ?"; private static final String CREATE_CONFIG_VERSION_TABLE = "CREATE TABLE "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version int not null )"; private static final String INSERT_INTO_CONFIG_VERSION = "INSERT INTO "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )"; private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME; private static final String UPDATE_CONFIG_VERSION = "UPDATE " + CONFIGURATION_VERSION_TABLE_NAME + " SET version = ?"; private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)"; private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?"; private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id"; private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, content ) values (?, ?)"; private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?"; private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?"; private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)"; private static final String SELECT_FROM_META_DATA = "SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?"; private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?"; private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME; private static final String INSERT_INTO_XIDS = "INSERT INTO "+ XID_TABLE_NAME +" ( format, global_id, branch_id ) values (?, ?, ?)"; private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME + " WHERE format = ? and global_id = ? and branch_id = ?"; private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME; private static final String INSERT_INTO_XID_ACTIONS = "INSERT INTO "+ XID_ACTIONS_TABLE_NAME +" ( format, global_id, branch_id, action_type, " + "queue_id, message_id ) values (?,?,?,?,?,?) "; private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME + " WHERE format = ? and global_id = ? and branch_id = ?"; private static final String SELECT_ALL_FROM_XID_ACTIONS = "SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME + " WHERE format = ? and global_id = ? and branch_id = ?"; private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME + " ( id, object_type, attributes) VALUES (?,?,?)"; private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME + " set object_type =?, attributes = ? where id = ?"; private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME + " where id = ?"; private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME + " where id = ?"; private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME; private static final String INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY = "INSERT INTO " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME + " ( child_id, parent_type, parent_id) VALUES (?,?,?)"; private static final String DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY = "DELETE FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME + " where child_id = ?"; private static final String SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY = "SELECT child_id, parent_type, parent_id FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME; protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); private static final Module _module; static { SimpleModule module= new SimpleModule("ConfiguredObjectSerializer", new Version(1,0,0,null)); final JsonSerializer serializer = new JsonSerializer() { @Override public void serialize(final ConfiguredObject value, final JsonGenerator jgen, final SerializerProvider provider) throws IOException, JsonProcessingException { jgen.writeString(value.getId().toString()); } }; module.addSerializer(ConfiguredObject.class, serializer); _module = module; } protected final EventManager _eventManager = new EventManager(); private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(); private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean(); private boolean _initialized; @Override public void openConfigurationStore(ConfiguredObject parent, Map storeSettings) { if (_configurationStoreOpen.compareAndSet(false, true)) { initialiseIfNecessary(parent.getName(), storeSettings); try { createOrOpenConfigurationStoreDatabase(); upgradeIfVersionTableExists(parent); } catch(SQLException e) { throw new StoreException("Cannot create databases or upgrade", e); } } } private void initialiseIfNecessary(String virtualHostName, Map storeSettings) { if (!_initialized) { try { implementationSpecificConfiguration(virtualHostName, storeSettings); } catch (ClassNotFoundException e) { throw new StoreException("Cannot find driver class", e); } catch (SQLException e) { throw new StoreException("Unexpected exception occured", e); } _initialized = true; } } @Override public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) { checkConfigurationStoreOpen(); try { recoveryHandler.beginConfigurationRecovery(this, getConfigVersion()); loadConfiguredObjects(recoveryHandler); setConfigVersion(recoveryHandler.completeConfigurationRecovery()); } catch (SQLException e) { throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); } } private void checkConfigurationStoreOpen() { if (!_configurationStoreOpen.get()) { throw new IllegalStateException("Configuration store is not open"); } } private void checkMessageStoreOpen() { if (!_messageStoreOpen.get()) { throw new IllegalStateException("Message store is not open"); } } private void upgradeIfVersionTableExists(ConfiguredObject parent) throws SQLException { Connection conn = newAutoCommitConnection(); try { if (tableExists(DB_VERSION_TABLE_NAME, conn)) { upgradeIfNecessary(parent); } } finally { if (conn != null) { conn.close(); } } } @Override public void openMessageStore(ConfiguredObject parent, Map messageStoreSettings) { if (_messageStoreOpen.compareAndSet(false, true)) { initialiseIfNecessary(parent.getName(), messageStoreSettings); try { createOrOpenMessageStoreDatabase(); upgradeIfNecessary(parent); } catch (SQLException e) { throw new StoreException("Unable to activate message store ", e); } } } @Override public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) { checkMessageStoreOpen(); if(messageRecoveryHandler != null) { try { recoverMessages(messageRecoveryHandler); } catch (SQLException e) { throw new StoreException("Error encountered when restoring message data from " + "persistent store ", e); } } if(transactionLogRecoveryHandler != null) { try { TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(transactionLogRecoveryHandler); recoverXids(dtxrh); } catch (SQLException e) { throw new StoreException("Error encountered when restoring distributed transaction " + "data from persistent store ", e); } } } protected void upgradeIfNecessary(ConfiguredObject parent) throws SQLException { Connection conn = newAutoCommitConnection(); try { PreparedStatement statement = conn.prepareStatement(SELECT_FROM_DB_VERSION); try { ResultSet rs = statement.executeQuery(); try { if(!rs.next()) { throw new StoreException(DB_VERSION_TABLE_NAME + " does not contain the database version"); } int version = rs.getInt(1); switch (version) { case 6: upgradeFromV6(); case 7: upgradeFromV7(parent); case DB_VERSION: return; default: throw new StoreException("Unknown database version: " + version); } } finally { rs.close(); } } finally { statement.close(); } } finally { conn.close(); } } private void upgradeFromV6() throws SQLException { updateDbVersion(7); } private void upgradeFromV7(ConfiguredObject parent) throws SQLException { Connection connection = newConnection(); try { Map> bindingsToUpdate = new HashMap>(); List others = new ArrayList(); final ObjectMapper objectMapper = new ObjectMapper(); objectMapper.registerModule(_module); PreparedStatement stmt = connection.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); try { ResultSet rs = stmt.executeQuery(); try { while (rs.next()) { UUID id = UUID.fromString(rs.getString(1)); String objectType = rs.getString(2); Map attributes = objectMapper.readValue(getBlobAsString(rs, 3),Map.class); if(objectType.endsWith("Binding")) { bindingsToUpdate.put(id,attributes); } else { others.add(id); } } } catch (JsonMappingException e) { throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); } catch (JsonParseException e) { throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); } catch (IOException e) { throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); } finally { rs.close(); } } finally { stmt.close(); } stmt = connection.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY); try { for (UUID id : others) { stmt.setString(1, id.toString()); stmt.setString(2, "VirtualHost"); stmt.setString(3, parent.getId().toString()); stmt.execute(); } for(Map.Entry> bindingEntry : bindingsToUpdate.entrySet()) { stmt.setString(1, bindingEntry.getKey().toString()); stmt.setString(2,"Queue"); stmt.setString(3, bindingEntry.getValue().remove("queue").toString()); stmt.execute(); stmt.setString(1, bindingEntry.getKey().toString()); stmt.setString(2,"Exchange"); stmt.setString(3, bindingEntry.getValue().remove("exchange").toString()); stmt.execute(); } } finally { stmt.close(); } stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS); try { for(Map.Entry> bindingEntry : bindingsToUpdate.entrySet()) { stmt.setString(1, "Binding"); byte[] attributesAsBytes = objectMapper.writeValueAsBytes(bindingEntry.getValue()); ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); stmt.setBinaryStream(2, bis, attributesAsBytes.length); stmt.setString(3, bindingEntry.getKey().toString()); stmt.execute(); } } catch (JsonMappingException e) { throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); } catch (JsonGenerationException e) { throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); } catch (IOException e) { throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); } finally { stmt.close(); } stmt = connection.prepareStatement(UPDATE_DB_VERSION); try { stmt.setInt(1, 8); stmt.execute(); } finally { stmt.close(); } connection.commit(); } finally { connection.close(); } } private void updateDbVersion(int newVersion) throws SQLException { Connection conn = newAutoCommitConnection(); try { PreparedStatement statement = conn.prepareStatement(UPDATE_DB_VERSION); try { statement.setInt(1,newVersion); statement.execute(); } finally { statement.close(); } } finally { conn.close(); } } protected abstract void implementationSpecificConfiguration(String name, Map messageStoreSettings) throws ClassNotFoundException, SQLException; abstract protected Logger getLogger(); abstract protected String getSqlBlobType(); abstract protected String getSqlVarBinaryType(int size); abstract protected String getSqlBigIntType(); protected void createOrOpenMessageStoreDatabase() throws SQLException { Connection conn = newAutoCommitConnection(); createVersionTable(conn); createQueueEntryTable(conn); createMetaDataTable(conn); createMessageContentTable(conn); createXidTable(conn); createXidActionTable(conn); conn.close(); } protected void createOrOpenConfigurationStoreDatabase() throws SQLException { Connection conn = newAutoCommitConnection(); createConfigVersionTable(conn); createConfiguredObjectsTable(conn); createConfiguredObjectHierarchyTable(conn); conn.close(); } private void createVersionTable(final Connection conn) throws SQLException { if(!tableExists(DB_VERSION_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); try { stmt.execute(CREATE_DB_VERSION_TABLE); } finally { stmt.close(); } PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION); try { pstmt.setInt(1, DB_VERSION); pstmt.execute(); } finally { pstmt.close(); } } } private void createConfigVersionTable(final Connection conn) throws SQLException { if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); try { stmt.execute(CREATE_CONFIG_VERSION_TABLE); } finally { stmt.close(); } PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_CONFIG_VERSION); try { pstmt.setInt(1, DEFAULT_CONFIG_VERSION); pstmt.execute(); } finally { pstmt.close(); } } } private void createConfiguredObjectsTable(final Connection conn) throws SQLException { if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); try { stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME + " ( id VARCHAR(36) not null, object_type varchar(255), attributes "+getSqlBlobType()+", PRIMARY KEY (id))"); } finally { stmt.close(); } } } private void createConfiguredObjectHierarchyTable(final Connection conn) throws SQLException { if(!tableExists(CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); try { stmt.execute("CREATE TABLE " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME + " ( child_id VARCHAR(36) not null, parent_type varchar(255), parent_id VARCHAR(36), PRIMARY KEY (child_id, parent_type))"); } finally { stmt.close(); } } } private void createQueueEntryTable(final Connection conn) throws SQLException { if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); try { stmt.execute("CREATE TABLE "+ QUEUE_ENTRY_TABLE_NAME +" ( queue_id varchar(36) not null, message_id " + getSqlBigIntType() + " not null, PRIMARY KEY (queue_id, message_id) )"); } finally { stmt.close(); } } } private void createMetaDataTable(final Connection conn) throws SQLException { if(!tableExists(META_DATA_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); try { stmt.execute("CREATE TABLE " + META_DATA_TABLE_NAME + " ( message_id " + getSqlBigIntType() + " not null, meta_data " + getSqlBlobType() + ", PRIMARY KEY ( message_id ) )"); } finally { stmt.close(); } } } private void createMessageContentTable(final Connection conn) throws SQLException { if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); try { stmt.execute("CREATE TABLE " + MESSAGE_CONTENT_TABLE_NAME + " ( message_id " + getSqlBigIntType() + " not null, content " + getSqlBlobType() + ", PRIMARY KEY (message_id) )"); } finally { stmt.close(); } } } private void createXidTable(final Connection conn) throws SQLException { if(!tableExists(XID_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); try { stmt.execute("CREATE TABLE " + XID_TABLE_NAME + " ( format " + getSqlBigIntType() + " not null," + " global_id " + getSqlVarBinaryType(64) + ", branch_id " + getSqlVarBinaryType(64) + " , PRIMARY KEY ( format, " + "global_id, branch_id ))"); } finally { stmt.close(); } } } private void createXidActionTable(final Connection conn) throws SQLException { if(!tableExists(XID_ACTIONS_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); try { stmt.execute("CREATE TABLE " + XID_ACTIONS_TABLE_NAME + " ( format " + getSqlBigIntType() + " not null," + " global_id " + getSqlVarBinaryType(64) + " not null, branch_id " + getSqlVarBinaryType( 64) + " not null, " + "action_type char not null, queue_id varchar(36) not null, message_id " + getSqlBigIntType() + " not null" + ", PRIMARY KEY ( " + "format, global_id, branch_id, action_type, queue_id, message_id))"); } finally { stmt.close(); } } } protected boolean tableExists(final String tableName, final Connection conn) throws SQLException { DatabaseMetaData metaData = conn.getMetaData(); ResultSet rs = metaData.getTables(null, null, "%", null); try { while(rs.next()) { final String table = rs.getString(3); if(tableName.equalsIgnoreCase(table)) { return true; } } return false; } finally { rs.close(); } } private void setConfigVersion(int version) throws SQLException { Connection conn = newAutoCommitConnection(); try { PreparedStatement stmt = conn.prepareStatement(UPDATE_CONFIG_VERSION); try { stmt.setInt(1, version); stmt.execute(); } finally { stmt.close(); } } finally { conn.close(); } } private int getConfigVersion() throws SQLException { Connection conn = newAutoCommitConnection(); try { Statement stmt = conn.createStatement(); try { ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION); try { if(rs.next()) { return rs.getInt(1); } return DEFAULT_CONFIG_VERSION; } finally { rs.close(); } } finally { stmt.close(); } } finally { conn.close(); } } @Override public void closeMessageStore() { if (_messageStoreOpen.compareAndSet(true, false)) { if (!_configurationStoreOpen.get()) { doClose(); } } } @Override public void closeConfigurationStore() { if (_configurationStoreOpen.compareAndSet(true, false)) { if (!_messageStoreOpen.get()) { doClose(); } } } protected abstract void doClose(); @Override public StoredMessage addMessage(StorableMessageMetaData metaData) { checkMessageStoreOpen(); if(metaData.isPersistent()) { return new StoredJDBCMessage(_messageId.incrementAndGet(), metaData); } else { return new StoredMemoryMessage(_messageId.incrementAndGet(), metaData); } } private void removeMessage(long messageId) { try { Connection conn = newConnection(); try { PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_META_DATA); try { stmt.setLong(1,messageId); int results = stmt.executeUpdate(); stmt.close(); if (results == 0) { getLogger().warn("Message metadata not found for message id " + messageId); } if (getLogger().isDebugEnabled()) { getLogger().debug("Deleted metadata for message " + messageId); } stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT); stmt.setLong(1,messageId); results = stmt.executeUpdate(); } finally { stmt.close(); } conn.commit(); } catch(SQLException e) { try { conn.rollback(); } catch(SQLException t) { // ignore - we are re-throwing underlying exception } throw e; } finally { conn.close(); } } catch (SQLException e) { throw new StoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e); } } @Override public void create(ConfiguredObjectRecord object) throws StoreException { checkConfigurationStoreOpen(); try { Connection conn = newConnection(); try { insertConfiguredObject(object, conn); conn.commit(); } finally { conn.close(); } } catch (SQLException e) { throw new StoreException("Error creating ConfiguredObject " + object); } } /** * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED * isolation and with auto-commit transactions enabled. */ protected Connection newAutoCommitConnection() throws SQLException { final Connection connection = newConnection(); try { connection.setAutoCommit(true); } catch (SQLException sqlEx) { try { connection.close(); } finally { throw sqlEx; } } return connection; } /** * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED * isolation and with auto-commit transactions disabled. */ protected Connection newConnection() throws SQLException { final Connection connection = getConnection(); try { connection.setAutoCommit(false); connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); } catch (SQLException sqlEx) { try { connection.close(); } finally { throw sqlEx; } } return connection; } protected abstract Connection getConnection() throws SQLException; @Override public Transaction newTransaction() { checkMessageStoreOpen(); return new JDBCTransaction(); } private void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException { Connection conn = connWrapper.getConnection(); try { if (getLogger().isDebugEnabled()) { getLogger().debug("Enqueuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() : "") + queue.getId() + "[Connection" + conn + "]"); } PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY); try { stmt.setString(1, queue.getId().toString()); stmt.setLong(2,messageId); stmt.executeUpdate(); } finally { stmt.close(); } } catch (SQLException e) { getLogger().error("Failed to enqueue: " + e.getMessage(), e); throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId() + " to database", e); } } private void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException { Connection conn = connWrapper.getConnection(); try { PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY); try { stmt.setString(1, queue.getId().toString()); stmt.setLong(2, messageId); int results = stmt.executeUpdate(); if(results != 1) { throw new StoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId()); } if (getLogger().isDebugEnabled()) { getLogger().debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() : "") + " with id " + queue.getId()); } } finally { stmt.close(); } } catch (SQLException e) { getLogger().error("Failed to dequeue: " + e.getMessage(), e); throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId() + " from database", e); } } private void removeXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId) throws StoreException { Connection conn = connWrapper.getConnection(); try { PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_XIDS); try { stmt.setLong(1,format); stmt.setBytes(2,globalId); stmt.setBytes(3,branchId); int results = stmt.executeUpdate(); if(results != 1) { throw new StoreException("Unable to find message with xid"); } } finally { stmt.close(); } stmt = conn.prepareStatement(DELETE_FROM_XID_ACTIONS); try { stmt.setLong(1,format); stmt.setBytes(2,globalId); stmt.setBytes(3,branchId); int results = stmt.executeUpdate(); } finally { stmt.close(); } } catch (SQLException e) { getLogger().error("Failed to dequeue: " + e.getMessage(), e); throw new StoreException("Error deleting enqueued message with xid", e); } } private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException { Connection conn = connWrapper.getConnection(); try { PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_XIDS); try { stmt.setLong(1,format); stmt.setBytes(2, globalId); stmt.setBytes(3, branchId); stmt.executeUpdate(); } finally { stmt.close(); } stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS); try { stmt.setLong(1,format); stmt.setBytes(2, globalId); stmt.setBytes(3, branchId); if(enqueues != null) { stmt.setString(4, "E"); for(Transaction.Record record : enqueues) { stmt.setString(5, record.getResource().getId().toString()); stmt.setLong(6, record.getMessage().getMessageNumber()); stmt.executeUpdate(); } } if(dequeues != null) { stmt.setString(4, "D"); for(Transaction.Record record : dequeues) { stmt.setString(5, record.getResource().getId().toString()); stmt.setLong(6, record.getMessage().getMessageNumber()); stmt.executeUpdate(); } } } finally { stmt.close(); } } catch (SQLException e) { getLogger().error("Failed to enqueue: " + e.getMessage(), e); throw new StoreException("Error writing xid ", e); } } private static final class ConnectionWrapper { private final Connection _connection; public ConnectionWrapper(Connection conn) { _connection = conn; } public Connection getConnection() { return _connection; } } private void commitTran(ConnectionWrapper connWrapper) throws StoreException { try { Connection conn = connWrapper.getConnection(); conn.commit(); if (getLogger().isDebugEnabled()) { getLogger().debug("commit tran completed"); } conn.close(); } catch (SQLException e) { throw new StoreException("Error commit tx: " + e.getMessage(), e); } finally { } } private StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException { commitTran(connWrapper); return StoreFuture.IMMEDIATE_FUTURE; } private void abortTran(ConnectionWrapper connWrapper) throws StoreException { if (connWrapper == null) { throw new StoreException("Fatal internal error: transactional context is empty at abortTran"); } if (getLogger().isDebugEnabled()) { getLogger().debug("abort tran called: " + connWrapper.getConnection()); } try { Connection conn = connWrapper.getConnection(); conn.rollback(); conn.close(); } catch (SQLException e) { throw new StoreException("Error aborting transaction: " + e.getMessage(), e); } } private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData) throws SQLException { if(getLogger().isDebugEnabled()) { getLogger().debug("Adding metadata for message " + messageId); } PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA); try { stmt.setLong(1,messageId); final int bodySize = 1 + metaData.getStorableSize(); byte[] underlying = new byte[bodySize]; underlying[0] = (byte) metaData.getType().ordinal(); ByteBuffer buf = ByteBuffer.wrap(underlying); buf.position(1); buf = buf.slice(); metaData.writeToBuffer(buf); ByteArrayInputStream bis = new ByteArrayInputStream(underlying); try { stmt.setBinaryStream(2,bis,underlying.length); int result = stmt.executeUpdate(); if(result == 0) { throw new StoreException("Unable to add meta data for message " +messageId); } } finally { try { bis.close(); } catch (IOException e) { throw new SQLException(e); } } } finally { stmt.close(); } } private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException { Connection conn = newAutoCommitConnection(); try { MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin(); Statement stmt = conn.createStatement(); try { ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA); try { long maxId = 0; while(rs.next()) { long messageId = rs.getLong(1); if(messageId > maxId) { maxId = messageId; } byte[] dataAsBytes = getBlobAsBytes(rs, 2); ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); buf.position(1); buf = buf.slice(); MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); StorableMessageMetaData metaData = type.createMetaData(buf); StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true); messageHandler.message(message); } _messageId.set(maxId); messageHandler.completeMessageRecovery(); } finally { rs.close(); } } finally { stmt.close(); } } finally { conn.close(); } } private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException { Connection conn = newAutoCommitConnection(); try { TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this); Statement stmt = conn.createStatement(); try { ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); try { while(rs.next()) { String id = rs.getString(1); long messageId = rs.getLong(2); queueEntryHandler.queueEntry(UUID.fromString(id), messageId); } } finally { rs.close(); } } finally { stmt.close(); } return queueEntryHandler.completeQueueEntryRecovery(); } finally { conn.close(); } } private static final class Xid { private final long _format; private final byte[] _globalId; private final byte[] _branchId; public Xid(long format, byte[] globalId, byte[] branchId) { _format = format; _globalId = globalId; _branchId = branchId; } public long getFormat() { return _format; } public byte[] getGlobalId() { return _globalId; } public byte[] getBranchId() { return _branchId; } } private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage { private long _messageNumber; private UUID _queueId; public RecordImpl(UUID queueId, long messageNumber) { _messageNumber = messageNumber; _queueId = queueId; } @Override public TransactionLogResource getResource() { return this; } @Override public EnqueueableMessage getMessage() { return this; } @Override public long getMessageNumber() { return _messageNumber; } @Override public boolean isPersistent() { return true; } @Override public StoredMessage getStoredMessage() { throw new UnsupportedOperationException(); } @Override public String getName() { return _queueId.toString(); } @Override public UUID getId() { return _queueId; } @Override public boolean isDurable() { return true; } } private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException { Connection conn = newAutoCommitConnection(); try { List xids = new ArrayList(); Statement stmt = conn.createStatement(); try { ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS); try { while(rs.next()) { long format = rs.getLong(1); byte[] globalId = rs.getBytes(2); byte[] branchId = rs.getBytes(3); xids.add(new Xid(format, globalId, branchId)); } } finally { rs.close(); } } finally { stmt.close(); } for(Xid xid : xids) { List enqueues = new ArrayList(); List dequeues = new ArrayList(); PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS); try { pstmt.setLong(1, xid.getFormat()); pstmt.setBytes(2, xid.getGlobalId()); pstmt.setBytes(3, xid.getBranchId()); ResultSet rs = pstmt.executeQuery(); try { while(rs.next()) { String actionType = rs.getString(1); UUID queueId = UUID.fromString(rs.getString(2)); long messageId = rs.getLong(3); RecordImpl record = new RecordImpl(queueId, messageId); List records = "E".equals(actionType) ? enqueues : dequeues; records.add(record); } } finally { rs.close(); } } finally { pstmt.close(); } dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), enqueues.toArray(new RecordImpl[enqueues.size()]), dequeues.toArray(new RecordImpl[dequeues.size()])); } dtxrh.completeDtxRecordRecovery(); } finally { conn.close(); } } private StorableMessageMetaData getMetaData(long messageId) throws SQLException { Connection conn = newAutoCommitConnection(); try { PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_META_DATA); try { stmt.setLong(1,messageId); ResultSet rs = stmt.executeQuery(); try { if(rs.next()) { byte[] dataAsBytes = getBlobAsBytes(rs, 1); ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); buf.position(1); buf = buf.slice(); MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); StorableMessageMetaData metaData = type.createMetaData(buf); return metaData; } else { throw new StoreException("Meta data not found for message with id " + messageId); } } finally { rs.close(); } } finally { stmt.close(); } } finally { conn.close(); } } protected abstract byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException; private void addContent(Connection conn, long messageId, ByteBuffer src) { if(getLogger().isDebugEnabled()) { getLogger().debug("Adding content for message " + messageId); } PreparedStatement stmt = null; try { src = src.slice(); byte[] chunkData = new byte[src.limit()]; src.duplicate().get(chunkData); stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT); stmt.setLong(1,messageId); ByteArrayInputStream bis = new ByteArrayInputStream(chunkData); stmt.setBinaryStream(2, bis, chunkData.length); stmt.executeUpdate(); } catch (SQLException e) { closeConnection(conn); throw new StoreException("Error adding content for message " + messageId + ": " + e.getMessage(), e); } finally { closePreparedStatement(stmt); } } private int getContent(long messageId, int offset, ByteBuffer dst) { Connection conn = null; PreparedStatement stmt = null; try { conn = newAutoCommitConnection(); stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT); stmt.setLong(1,messageId); ResultSet rs = stmt.executeQuery(); int written = 0; if (rs.next()) { byte[] dataAsBytes = getBlobAsBytes(rs, 1); int size = dataAsBytes.length; if (offset > size) { throw new StoreException("Offset " + offset + " is greater than message size " + size + " for message id " + messageId + "!"); } written = size - offset; if(written > dst.remaining()) { written = dst.remaining(); } dst.put(dataAsBytes, offset, written); } return written; } catch (SQLException e) { throw new StoreException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e.getMessage(), e); } finally { closePreparedStatement(stmt); closeConnection(conn); } } @Override public boolean isPersistent() { return true; } protected class JDBCTransaction implements Transaction { private final ConnectionWrapper _connWrapper; private int _storeSizeIncrease; protected JDBCTransaction() { try { _connWrapper = new ConnectionWrapper(newConnection()); } catch (SQLException e) { throw new StoreException(e); } } @Override public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) { checkMessageStoreOpen(); final StoredMessage storedMessage = message.getStoredMessage(); if(storedMessage instanceof StoredJDBCMessage) { try { ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection()); } catch (SQLException e) { throw new StoreException("Exception on enqueuing message into message store" + _messageId, e); } } _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber()); } @Override public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) { checkMessageStoreOpen(); AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber()); } @Override public void commitTran() { checkMessageStoreOpen(); AbstractJDBCMessageStore.this.commitTran(_connWrapper); storedSizeChange(_storeSizeIncrease); } @Override public StoreFuture commitTranAsync() { checkMessageStoreOpen(); StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper); storedSizeChange(_storeSizeIncrease); return storeFuture; } @Override public void abortTran() { checkMessageStoreOpen(); AbstractJDBCMessageStore.this.abortTran(_connWrapper); } @Override public void removeXid(long format, byte[] globalId, byte[] branchId) { checkMessageStoreOpen(); AbstractJDBCMessageStore.this.removeXid(_connWrapper, format, globalId, branchId); } @Override public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) { checkMessageStoreOpen(); AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues); } } private class StoredJDBCMessage implements StoredMessage { private final long _messageId; private final boolean _isRecovered; private StorableMessageMetaData _metaData; private volatile SoftReference _metaDataRef; private byte[] _data; private volatile SoftReference _dataRef; StoredJDBCMessage(long messageId, StorableMessageMetaData metaData) { this(messageId, metaData, false); } StoredJDBCMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered) { _messageId = messageId; _isRecovered = isRecovered; if(!_isRecovered) { _metaData = metaData; } _metaDataRef = new SoftReference(metaData); } @Override public StorableMessageMetaData getMetaData() { StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData; if(metaData == null) { checkMessageStoreOpen(); try { metaData = AbstractJDBCMessageStore.this.getMetaData(_messageId); } catch (SQLException e) { throw new StoreException(e); } _metaDataRef = new SoftReference(metaData); } return metaData; } @Override public long getMessageNumber() { return _messageId; } @Override public void addContent(int offsetInMessage, ByteBuffer src) { src = src.slice(); if(_data == null) { _data = new byte[src.remaining()]; _dataRef = new SoftReference(_data); src.duplicate().get(_data); } else { byte[] oldData = _data; _data = new byte[oldData.length + src.remaining()]; _dataRef = new SoftReference(_data); System.arraycopy(oldData,0,_data,0,oldData.length); src.duplicate().get(_data, oldData.length, src.remaining()); } } @Override public int getContent(int offsetInMessage, ByteBuffer dst) { byte[] data = _dataRef == null ? null : _dataRef.get(); if(data != null) { int length = Math.min(dst.remaining(), data.length - offsetInMessage); dst.put(data, offsetInMessage, length); return length; } else { checkMessageStoreOpen(); return AbstractJDBCMessageStore.this.getContent(_messageId, offsetInMessage, dst); } } @Override public ByteBuffer getContent(int offsetInMessage, int size) { ByteBuffer buf = ByteBuffer.allocate(size); int length = getContent(offsetInMessage, buf); buf.position(0); buf.limit(length); return buf; } @Override public synchronized StoreFuture flushToStore() { checkMessageStoreOpen(); Connection conn = null; try { if(!stored()) { conn = newConnection(); store(conn); conn.commit(); storedSizeChange(getMetaData().getContentSize()); } } catch (SQLException e) { if(getLogger().isDebugEnabled()) { getLogger().debug("Error when trying to flush message " + _messageId + " to store: " + e); } throw new StoreException(e); } finally { closeConnection(conn); } return StoreFuture.IMMEDIATE_FUTURE; } @Override public void remove() { checkMessageStoreOpen(); int delta = getMetaData().getContentSize(); AbstractJDBCMessageStore.this.removeMessage(_messageId); storedSizeChange(-delta); } private synchronized void store(final Connection conn) throws SQLException { if (!stored()) { try { storeMetaData(conn, _messageId, _metaData); AbstractJDBCMessageStore.this.addContent(conn, _messageId, _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); } finally { _metaData = null; _data = null; } if(getLogger().isDebugEnabled()) { getLogger().debug("Storing message " + _messageId + " to store"); } } } private boolean stored() { return _metaData == null || _isRecovered; } } protected void closeConnection(final Connection conn) { if(conn != null) { try { conn.close(); } catch (SQLException e) { getLogger().error("Problem closing connection", e); } } } protected void closePreparedStatement(final PreparedStatement stmt) { if (stmt != null) { try { stmt.close(); } catch(SQLException e) { getLogger().error("Problem closing prepared statement", e); } } } @Override public void addEventListener(EventListener eventListener, Event... events) { _eventManager.addEventListener(eventListener, events); } private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection conn) throws StoreException { try { PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); try { stmt.setString(1, configuredObject.getId().toString()); ResultSet rs = stmt.executeQuery(); boolean exists; try { exists = rs.next(); } finally { rs.close(); } // If we don't have any data in the result set then we can add this configured object if (!exists) { PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); try { insertStmt.setString(1, configuredObject.getId().toString()); insertStmt.setString(2, configuredObject.getType()); if(configuredObject.getAttributes() == null) { insertStmt.setNull(3, Types.BLOB); } else { final Map attributes = configuredObject.getAttributes(); final ObjectMapper objectMapper = new ObjectMapper(); objectMapper.registerModule(_module); byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes); ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); } insertStmt.execute(); } finally { insertStmt.close(); } writeHierarchy(configuredObject, conn); } } finally { stmt.close(); } } catch (JsonMappingException e) { throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); } catch (JsonGenerationException e) { throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); } catch (IOException e) { throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); } catch (SQLException e) { throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); } } @Override public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException { checkConfigurationStoreOpen(); Collection removed = new ArrayList(objects.length); try { Connection conn = newAutoCommitConnection(); try { for(ConfiguredObjectRecord record : objects) { if(removeConfiguredObject(record.getId(), conn) != 0) { removed.add(record.getId()); } } } finally { conn.close(); } } catch (SQLException e) { throw new StoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e); } return removed.toArray(new UUID[removed.size()]); } private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException { final int results; PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS); try { stmt.setString(1, id.toString()); results = stmt.executeUpdate(); } finally { stmt.close(); } stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY); try { stmt.setString(1, id.toString()); stmt.executeUpdate(); } finally { stmt.close(); } return results; } public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException { checkConfigurationStoreOpen(); try { Connection conn = newConnection(); try { for(ConfiguredObjectRecord record : records) { updateConfiguredObject(record, createIfNecessary, conn); } conn.commit(); } finally { conn.close(); } } catch (SQLException e) { throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e); } } private void updateConfiguredObject(ConfiguredObjectRecord configuredObject, boolean createIfNecessary, Connection conn) throws SQLException, StoreException { PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); try { stmt.setString(1, configuredObject.getId().toString()); ResultSet rs = stmt.executeQuery(); try { final ObjectMapper objectMapper = new ObjectMapper(); objectMapper.registerModule(_module); if (rs.next()) { PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS); try { stmt2.setString(1, configuredObject.getType()); if (configuredObject.getAttributes() != null) { byte[] attributesAsBytes = objectMapper.writeValueAsBytes( configuredObject.getAttributes()); ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); stmt2.setBinaryStream(2, bis, attributesAsBytes.length); } else { stmt2.setNull(2, Types.BLOB); } stmt2.setString(3, configuredObject.getId().toString()); stmt2.execute(); } finally { stmt2.close(); } } else if(createIfNecessary) { PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); try { insertStmt.setString(1, configuredObject.getId().toString()); insertStmt.setString(2, configuredObject.getType()); if(configuredObject.getAttributes() == null) { insertStmt.setNull(3, Types.BLOB); } else { final Map attributes = configuredObject.getAttributes(); byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes); ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); } insertStmt.execute(); } finally { insertStmt.close(); } writeHierarchy(configuredObject, conn); } } finally { rs.close(); } } catch (JsonMappingException e) { throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); } catch (JsonGenerationException e) { throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); } catch (IOException e) { throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); } finally { stmt.close(); } } private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException { PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY); try { for(Map.Entry parentEntry : configuredObject.getParents().entrySet()) { insertStmt.setString(1, configuredObject.getId().toString()); insertStmt.setString(2, parentEntry.getKey()); insertStmt.setString(3, parentEntry.getValue().getId().toString()); insertStmt.execute(); } } finally { insertStmt.close(); } } private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException, StoreException { Connection conn = newAutoCommitConnection(); Map configuredObjects = new HashMap(); final ObjectMapper objectMapper = new ObjectMapper(); try { PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); try { ResultSet rs = stmt.executeQuery(); try { while (rs.next()) { String id = rs.getString(1); String objectType = rs.getString(2); String attributes = getBlobAsString(rs, 3); final ConfiguredObjectRecordImpl configuredObjectRecord = new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType, objectMapper.readValue(attributes, Map.class)); configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord); } } catch (JsonMappingException e) { throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); } catch (JsonParseException e) { throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); } catch (IOException e) { throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); } finally { rs.close(); } } finally { stmt.close(); } stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY); try { ResultSet rs = stmt.executeQuery(); try { while (rs.next()) { UUID childId = UUID.fromString(rs.getString(1)); String parentType = rs.getString(2); UUID parentId = UUID.fromString(rs.getString(3)); ConfiguredObjectRecordImpl child = configuredObjects.get(childId); ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId); if(child != null && parent != null) { child.addParent(parentType, parent); } else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange")) { // TODO - remove this hack for amq. exchanges child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.emptyMap())); } } } finally { rs.close(); } } finally { stmt.close(); } } finally { conn.close(); } for(ConfiguredObjectRecord record : configuredObjects.values()) { recoveryHandler.configuredObject(record); } } protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException; protected abstract void storedSizeChange(int storeSizeIncrease); @Override public void onDelete() { // TODO should probably check we are closed try { Connection conn = newAutoCommitConnection(); try { List tables = new ArrayList(); tables.addAll(CONFIGURATION_STORE_TABLE_NAMES); tables.addAll(MESSAGE_STORE_TABLE_NAMES); for (String tableName : tables) { Statement stmt = conn.createStatement(); try { stmt.execute("DROP TABLE " + tableName); } catch(SQLException e) { getLogger().warn("Failed to drop table '" + tableName + "' :" + e); } finally { stmt.close(); } } } finally { conn.close(); } } catch(SQLException e) { getLogger().error("Exception while deleting store tables", e); } } private static final class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord { private final UUID _id; private final String _type; private final Map _attributes; private final Map _parents = new HashMap(); private ConfiguredObjectRecordImpl(final UUID id, final String type, final Map attributes) { _id = id; _type = type; _attributes = Collections.unmodifiableMap(attributes); } @Override public UUID getId() { return _id; } @Override public String getType() { return _type; } private void addParent(String parentType, ConfiguredObjectRecord parent) { _parents.put(parentType, parent); } @Override public Map getAttributes() { return _attributes; } @Override public Map getParents() { return Collections.unmodifiableMap(_parents); } } }