summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java2363
1 files changed, 2363 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
new file mode 100644
index 0000000000..4a1452d86c
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -0,0 +1,2363 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
+{
+ private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
+ private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION";
+
+ private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
+
+ private static final String META_DATA_TABLE_NAME = "QPID_MESSAGE_METADATA";
+ private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
+
+ private static final String LINKS_TABLE_NAME = "QPID_LINKS";
+ private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES";
+
+ private static final String XID_TABLE_NAME = "QPID_XIDS";
+ private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
+
+ private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
+ private static final int DEFAULT_CONFIG_VERSION = 0;
+
+ public static String[] ALL_TABLES = new String[] { DB_VERSION_TABLE_NAME, LINKS_TABLE_NAME, BRIDGES_TABLE_NAME, XID_ACTIONS_TABLE_NAME,
+ XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME };
+
+ private static final int DB_VERSION = 7;
+
+ private final AtomicLong _messageId = new AtomicLong(0);
+ private AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
+ private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
+ private static final String SELECT_FROM_DB_VERSION = "SELECT version FROM " + DB_VERSION_TABLE_NAME;
+ private static final String UPDATE_DB_VERSION = "UPDATE " + DB_VERSION_TABLE_NAME + " SET version = ?";
+
+
+ private static final String CREATE_CONFIG_VERSION_TABLE = "CREATE TABLE "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version int not null )";
+ private static final String INSERT_INTO_CONFIG_VERSION = "INSERT INTO "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
+ private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME;
+ private static final String UPDATE_CONFIG_VERSION = "UPDATE " + CONFIGURATION_VERSION_TABLE_NAME + " SET version = ?";
+
+
+ private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
+ private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
+ private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id";
+ private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME
+ + "( message_id, content ) values (?, ?)";
+ private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME
+ + " WHERE message_id = ?";
+ private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME
+ + " WHERE message_id = ?";
+
+ private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";
+ private static final String SELECT_FROM_META_DATA =
+ "SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
+ private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
+ private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
+
+ private static final String SELECT_FROM_LINKS =
+ "SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb";
+ private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME
+ + " WHERE id_lsb = ? and id_msb = ?";
+ private static final String SELECT_ALL_FROM_LINKS = "SELECT id_lsb, id_msb, create_time, "
+ + "arguments FROM " + LINKS_TABLE_NAME;
+ private static final String FIND_LINK = "SELECT id_lsb, id_msb FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and"
+ + " id_msb = ?";
+ private static final String INSERT_INTO_LINKS = "INSERT INTO " + LINKS_TABLE_NAME + "( id_lsb, "
+ + "id_msb, create_time, arguments ) values (?, ?, ?, ?)";
+ private static final String SELECT_FROM_BRIDGES =
+ "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM "
+ + BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
+ private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME
+ + " WHERE id_lsb = ? and id_msb = ?";
+ private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, "
+ + " create_time,"
+ + " link_id_lsb, link_id_msb, "
+ + "arguments FROM " + BRIDGES_TABLE_NAME
+ + " WHERE link_id_lsb = ? and link_id_msb = ?";
+ private static final String FIND_BRIDGE = "SELECT id_lsb, id_msb FROM " + BRIDGES_TABLE_NAME +
+ " WHERE id_lsb = ? and id_msb = ?";
+ private static final String INSERT_INTO_BRIDGES = "INSERT INTO " + BRIDGES_TABLE_NAME + "( id_lsb, id_msb, "
+ + "create_time, "
+ + "link_id_lsb, link_id_msb, "
+ + "arguments )"
+ + " values (?, ?, ?, ?, ?, ?)";
+
+ private static final String INSERT_INTO_XIDS =
+ "INSERT INTO "+ XID_TABLE_NAME +" ( format, global_id, branch_id ) values (?, ?, ?)";
+ private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME
+ + " WHERE format = ? and global_id = ? and branch_id = ?";
+ private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME;
+ private static final String INSERT_INTO_XID_ACTIONS =
+ "INSERT INTO "+ XID_ACTIONS_TABLE_NAME +" ( format, global_id, branch_id, action_type, " +
+ "queue_id, message_id ) values (?,?,?,?,?,?) ";
+ private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME
+ + " WHERE format = ? and global_id = ? and branch_id = ?";
+ private static final String SELECT_ALL_FROM_XID_ACTIONS =
+ "SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME +
+ " WHERE format = ? and global_id = ? and branch_id = ?";
+ private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " ( id, object_type, attributes) VALUES (?,?,?)";
+ private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " set object_type =?, attributes = ? where id = ?";
+ private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " where id = ?";
+ private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " where id = ?";
+ private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME;
+
+ protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+ protected final EventManager _eventManager = new EventManager();
+
+ protected final StateManager _stateManager;
+
+ private MessageStoreRecoveryHandler _messageRecoveryHandler;
+ private TransactionLogRecoveryHandler _tlogRecoveryHandler;
+ private ConfigurationRecoveryHandler _configRecoveryHandler;
+ private VirtualHost _virtualHost;
+
+ public AbstractJDBCMessageStore()
+ {
+ _stateManager = new StateManager(_eventManager);
+ }
+
+ @Override
+ public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler configRecoveryHandler) throws Exception
+ {
+ _stateManager.attainState(State.INITIALISING);
+ _configRecoveryHandler = configRecoveryHandler;
+ _virtualHost = virtualHost;
+
+ }
+
+ @Override
+ public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
+ {
+ if(_stateManager.isInState(State.INITIAL))
+ {
+ _stateManager.attainState(State.INITIALISING);
+ }
+
+ _virtualHost = virtualHost;
+ _tlogRecoveryHandler = tlogRecoveryHandler;
+ _messageRecoveryHandler = recoveryHandler;
+
+ completeInitialisation();
+ }
+
+ private void completeInitialisation() throws ClassNotFoundException, SQLException, AMQStoreException
+ {
+ commonConfiguration();
+
+ _stateManager.attainState(State.INITIALISED);
+ }
+
+ @Override
+ public void activate() throws Exception
+ {
+ if(_stateManager.isInState(State.INITIALISING))
+ {
+ completeInitialisation();
+ }
+ _stateManager.attainState(State.ACTIVATING);
+
+ // this recovers durable exchanges, queues, and bindings
+ if(_configRecoveryHandler != null)
+ {
+ recoverConfiguration(_configRecoveryHandler);
+ }
+ if(_messageRecoveryHandler != null)
+ {
+ recoverMessages(_messageRecoveryHandler);
+ }
+ if(_tlogRecoveryHandler != null)
+ {
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler);
+ recoverXids(dtxrh);
+
+ }
+
+ _stateManager.attainState(State.ACTIVE);
+ }
+
+ private void commonConfiguration()
+ throws ClassNotFoundException, SQLException, AMQStoreException
+ {
+ implementationSpecificConfiguration(_virtualHost.getName(), _virtualHost);
+ createOrOpenDatabase();
+ upgradeIfNecessary();
+ }
+
+ protected void upgradeIfNecessary() throws SQLException, AMQStoreException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+
+ PreparedStatement statement = conn.prepareStatement(SELECT_FROM_DB_VERSION);
+ try
+ {
+ ResultSet rs = statement.executeQuery();
+ try
+ {
+ if(!rs.next())
+ {
+ throw new AMQStoreException(DB_VERSION_TABLE_NAME + " does not contain the database version");
+ }
+ int version = rs.getInt(1);
+ switch (version)
+ {
+ case 6:
+ upgradeFromV6();
+ case DB_VERSION:
+ return;
+ default:
+ throw new AMQStoreException("Unknown database version: " + version);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ statement.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ }
+
+ private void upgradeFromV6() throws SQLException
+ {
+ updateDbVersion(7);
+ }
+
+ private void updateDbVersion(int newVersion) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+
+ PreparedStatement statement = conn.prepareStatement(UPDATE_DB_VERSION);
+ try
+ {
+ statement.setInt(1,newVersion);
+ statement.execute();
+ }
+ finally
+ {
+ statement.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ protected abstract void implementationSpecificConfiguration(String name,
+ VirtualHost virtualHost) throws ClassNotFoundException, SQLException;
+
+ abstract protected Logger getLogger();
+
+ abstract protected String getSqlBlobType();
+
+ abstract protected String getSqlVarBinaryType(int size);
+
+ abstract protected String getSqlBigIntType();
+
+ protected void createOrOpenDatabase() throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+
+ createVersionTable(conn);
+ createConfigVersionTable(conn);
+ createConfiguredObjectsTable(conn);
+ createQueueEntryTable(conn);
+ createMetaDataTable(conn);
+ createMessageContentTable(conn);
+ createLinkTable(conn);
+ createBridgeTable(conn);
+ createXidTable(conn);
+ createXidActionTable(conn);
+ conn.close();
+ }
+
+ private void createVersionTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(DB_VERSION_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_DB_VERSION_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
+ try
+ {
+ pstmt.setInt(1, DB_VERSION);
+ pstmt.execute();
+ }
+ finally
+ {
+ pstmt.close();
+ }
+ }
+ }
+
+ private void createConfigVersionTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_CONFIG_VERSION_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_CONFIG_VERSION);
+ try
+ {
+ pstmt.setInt(1, DEFAULT_CONFIG_VERSION);
+ pstmt.execute();
+ }
+ finally
+ {
+ pstmt.close();
+ }
+ }
+ }
+
+ private void createConfiguredObjectsTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " ( id VARCHAR(36) not null, object_type varchar(255), attributes "+getSqlBlobType()+", PRIMARY KEY (id))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+
+
+ private void createQueueEntryTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE "+ QUEUE_ENTRY_TABLE_NAME +" ( queue_id varchar(36) not null, message_id "
+ + getSqlBigIntType() + " not null, PRIMARY KEY (queue_id, message_id) )");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+ }
+
+ private void createMetaDataTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(META_DATA_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE "
+ + META_DATA_TABLE_NAME
+ + " ( message_id "
+ + getSqlBigIntType()
+ + " not null, meta_data "
+ + getSqlBlobType()
+ + ", PRIMARY KEY ( message_id ) )");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+ }
+
+ private void createMessageContentTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE "
+ + MESSAGE_CONTENT_TABLE_NAME
+ + " ( message_id "
+ + getSqlBigIntType()
+ + " not null, content "
+ + getSqlBlobType()
+ + ", PRIMARY KEY (message_id) )");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+ }
+
+ private void createLinkTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(LINKS_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE "+ LINKS_TABLE_NAME +" ( id_lsb " + getSqlBigIntType() + " not null,"
+ + " id_msb " + getSqlBigIntType() + " not null,"
+ + " create_time " + getSqlBigIntType() + " not null,"
+ + " arguments "+getSqlBlobType()+", PRIMARY KEY ( id_lsb, id_msb ))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ private void createBridgeTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(BRIDGES_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE "+ BRIDGES_TABLE_NAME +" ( id_lsb " + getSqlBigIntType() + " not null,"
+ + " id_msb " + getSqlBigIntType() + " not null,"
+ + " create_time " + getSqlBigIntType() + " not null,"
+ + " link_id_lsb " + getSqlBigIntType() + " not null,"
+ + " link_id_msb " + getSqlBigIntType() + " not null,"
+ + " arguments "+getSqlBlobType()+", PRIMARY KEY ( id_lsb, id_msb ))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ private void createXidTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(XID_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE "
+ + XID_TABLE_NAME
+ + " ( format " + getSqlBigIntType() + " not null,"
+ + " global_id "
+ + getSqlVarBinaryType(64)
+ + ", branch_id "
+ + getSqlVarBinaryType(64)
+ + " , PRIMARY KEY ( format, "
+ +
+ "global_id, branch_id ))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ private void createXidActionTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(XID_ACTIONS_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE " + XID_ACTIONS_TABLE_NAME + " ( format " + getSqlBigIntType() + " not null,"
+ + " global_id " + getSqlVarBinaryType(64) + " not null, branch_id " + getSqlVarBinaryType(
+ 64) + " not null, " +
+ "action_type char not null, queue_id varchar(36) not null, message_id " + getSqlBigIntType() + " not null" +
+ ", PRIMARY KEY ( " +
+ "format, global_id, branch_id, action_type, queue_id, message_id))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ DatabaseMetaData metaData = conn.getMetaData();
+ ResultSet rs = metaData.getTables(null, null, "%", null);
+
+ try
+ {
+
+ while(rs.next())
+ {
+ final String table = rs.getString(3);
+ if(tableName.equalsIgnoreCase(table))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+
+ protected void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
+ {
+ try
+ {
+ recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
+ loadConfiguredObjects(recoveryHandler);
+
+ setConfigVersion(recoveryHandler.completeConfigurationRecovery());
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ }
+
+ private void setConfigVersion(int version) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+
+ PreparedStatement stmt = conn.prepareStatement(UPDATE_CONFIG_VERSION);
+ try
+ {
+ stmt.setInt(1, version);
+ stmt.execute();
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ private int getConfigVersion() throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION);
+ try
+ {
+
+ if(rs.next())
+ {
+ return rs.getInt(1);
+ }
+ return DEFAULT_CONFIG_VERSION;
+ }
+ finally
+ {
+ rs.close();
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ _closed.getAndSet(true);
+ _stateManager.attainState(State.CLOSING);
+
+ doClose();
+
+ _stateManager.attainState(State.CLOSED);
+ }
+
+
+ protected abstract void doClose() throws Exception;
+
+ @Override
+ public StoredMessage addMessage(StorableMessageMetaData metaData)
+ {
+ if(metaData.isPersistent())
+ {
+ return new StoredJDBCMessage(_messageId.incrementAndGet(), metaData);
+ }
+ else
+ {
+ return new StoredMemoryMessage(_messageId.incrementAndGet(), metaData);
+ }
+ }
+
+ public StoredMessage getMessage(long messageNumber)
+ {
+ return null;
+ }
+
+ public void removeMessage(long messageId)
+ {
+ try
+ {
+ Connection conn = newConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_META_DATA);
+ try
+ {
+ stmt.setLong(1,messageId);
+ int results = stmt.executeUpdate();
+ stmt.close();
+
+ if (results == 0)
+ {
+ getLogger().warn("Message metadata not found for message id " + messageId);
+ }
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Deleted metadata for message " + messageId);
+ }
+
+ stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ results = stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+ conn.commit();
+ }
+ catch(SQLException e)
+ {
+ try
+ {
+ conn.rollback();
+ }
+ catch(SQLException t)
+ {
+ // ignore - we are re-throwing underlying exception
+ }
+
+ throw e;
+
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
+ }
+
+ }
+
+
+ @Override
+ public void create(UUID id, String type, Map<String,Object> attributes) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ insertConfiguredObject(new ConfiguredObjectRecord(id, type, attributes));
+ }
+
+ }
+
+ @Override
+ public void remove(UUID id, String type) throws AMQStoreException
+ {
+ int results = removeConfiguredObject(id);
+ if (results == 0)
+ {
+ throw new AMQStoreException(type + " with id " + id + " not found");
+ }
+ }
+
+ @Override
+ public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(id);
+ if (queueConfiguredObject != null)
+ {
+ ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
+ updateConfiguredObject(newQueueRecord);
+ }
+ }
+
+ }
+
+ /**
+ * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
+ * isolation and with auto-commit transactions enabled.
+ */
+ protected Connection newAutoCommitConnection() throws SQLException
+ {
+ final Connection connection = newConnection();
+ try
+ {
+ connection.setAutoCommit(true);
+ }
+ catch (SQLException sqlEx)
+ {
+
+ try
+ {
+ connection.close();
+ }
+ finally
+ {
+ throw sqlEx;
+ }
+ }
+
+ return connection;
+ }
+
+ /**
+ * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
+ * isolation and with auto-commit transactions disabled.
+ */
+ protected Connection newConnection() throws SQLException
+ {
+ final Connection connection = getConnection();
+ try
+ {
+ connection.setAutoCommit(false);
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ }
+ catch (SQLException sqlEx)
+ {
+ try
+ {
+ connection.close();
+ }
+ finally
+ {
+ throw sqlEx;
+ }
+ }
+ return connection;
+ }
+
+ protected abstract Connection getConnection() throws SQLException;
+
+ private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
+ {
+ byte[] argumentBytes;
+ if(arguments == null)
+ {
+ argumentBytes = new byte[0];
+ }
+ else
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+
+
+ try
+ {
+ dos.writeInt(arguments.size());
+ for(Map.Entry<String,String> arg : arguments.entrySet())
+ {
+ dos.writeUTF(arg.getKey());
+ dos.writeUTF(arg.getValue());
+ }
+ }
+ catch (IOException e)
+ {
+ // This should never happen
+ throw new AMQStoreException(e.getMessage(), e);
+ }
+ argumentBytes = bos.toByteArray();
+ }
+ return argumentBytes;
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return new JDBCTransaction();
+ }
+
+ public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ {
+ Connection conn = connWrapper.getConnection();
+
+
+ try
+ {
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Enqueuing message "
+ + messageId
+ + " on queue "
+ + (queue instanceof AMQQueue
+ ? ((AMQQueue) queue).getName()
+ : "")
+ + queue.getId()
+ + "[Connection"
+ + conn
+ + "]");
+ }
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
+ try
+ {
+ stmt.setString(1, queue.getId().toString());
+ stmt.setLong(2,messageId);
+ stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ catch (SQLException e)
+ {
+ getLogger().error("Failed to enqueue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId()
+ + " to database", e);
+ }
+
+ }
+
+ public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ {
+
+ Connection conn = connWrapper.getConnection();
+
+
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
+ try
+ {
+ stmt.setString(1, queue.getId().toString());
+ stmt.setLong(2, messageId);
+ int results = stmt.executeUpdate();
+
+
+
+ if(results != 1)
+ {
+ throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ + " with id " + queue.getId());
+ }
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue
+ ? ((AMQQueue) queue).getName()
+ : "")
+ + " with id " + queue.getId());
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ catch (SQLException e)
+ {
+ getLogger().error("Failed to dequeue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ + " with id " + queue.getId() + " from database", e);
+ }
+
+ }
+
+ private void removeXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId)
+ throws AMQStoreException
+ {
+ Connection conn = connWrapper.getConnection();
+
+
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_XIDS);
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2,globalId);
+ stmt.setBytes(3,branchId);
+ int results = stmt.executeUpdate();
+
+
+
+ if(results != 1)
+ {
+ throw new AMQStoreException("Unable to find message with xid");
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ stmt = conn.prepareStatement(DELETE_FROM_XID_ACTIONS);
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2,globalId);
+ stmt.setBytes(3,branchId);
+ int results = stmt.executeUpdate();
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ catch (SQLException e)
+ {
+ getLogger().error("Failed to dequeue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error deleting enqueued message with xid", e);
+ }
+
+ }
+
+ private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
+ Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws AMQStoreException
+ {
+ Connection conn = connWrapper.getConnection();
+
+
+ try
+ {
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_XIDS);
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2, globalId);
+ stmt.setBytes(3, branchId);
+ stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS);
+
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2, globalId);
+ stmt.setBytes(3, branchId);
+
+ if(enqueues != null)
+ {
+ stmt.setString(4, "E");
+ for(Transaction.Record record : enqueues)
+ {
+ stmt.setString(5, record.getQueue().getId().toString());
+ stmt.setLong(6, record.getMessage().getMessageNumber());
+ stmt.executeUpdate();
+ }
+ }
+
+ if(dequeues != null)
+ {
+ stmt.setString(4, "D");
+ for(Transaction.Record record : dequeues)
+ {
+ stmt.setString(5, record.getQueue().getId().toString());
+ stmt.setLong(6, record.getMessage().getMessageNumber());
+ stmt.executeUpdate();
+ }
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ catch (SQLException e)
+ {
+ getLogger().error("Failed to enqueue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error writing xid ", e);
+ }
+
+ }
+
+ protected boolean isConfigStoreOnly()
+ {
+ return _messageRecoveryHandler == null;
+ }
+
+ private static final class ConnectionWrapper
+ {
+ private final Connection _connection;
+
+ public ConnectionWrapper(Connection conn)
+ {
+ _connection = conn;
+ }
+
+ public Connection getConnection()
+ {
+ return _connection;
+ }
+ }
+
+
+ public void commitTran(ConnectionWrapper connWrapper) throws AMQStoreException
+ {
+
+ try
+ {
+ Connection conn = connWrapper.getConnection();
+ conn.commit();
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("commit tran completed");
+ }
+
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error commit tx: " + e.getMessage(), e);
+ }
+ finally
+ {
+
+ }
+ }
+
+ public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQStoreException
+ {
+ commitTran(connWrapper);
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ public void abortTran(ConnectionWrapper connWrapper) throws AMQStoreException
+ {
+ if (connWrapper == null)
+ {
+ throw new AMQStoreException("Fatal internal error: transactional context is empty at abortTran");
+ }
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("abort tran called: " + connWrapper.getConnection());
+ }
+
+ try
+ {
+ Connection conn = connWrapper.getConnection();
+ conn.rollback();
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e);
+ }
+
+ }
+
+ public Long getNewMessageId()
+ {
+ return _messageId.incrementAndGet();
+ }
+
+ private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
+ throws SQLException
+ {
+ if(getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Adding metadata for message " + messageId);
+ }
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA);
+ try
+ {
+ stmt.setLong(1,messageId);
+
+ final int bodySize = 1 + metaData.getStorableSize();
+ byte[] underlying = new byte[bodySize];
+ underlying[0] = (byte) metaData.getType().ordinal();
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
+ buf.position(1);
+ buf = buf.slice();
+
+ metaData.writeToBuffer(0, buf);
+ ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
+ try
+ {
+ stmt.setBinaryStream(2,bis,underlying.length);
+ int result = stmt.executeUpdate();
+
+ if(result == 0)
+ {
+ throw new RuntimeException("Unable to add meta data for message " +messageId);
+ }
+ }
+ finally
+ {
+ try
+ {
+ bis.close();
+ }
+ catch (IOException e)
+ {
+
+ throw new SQLException(e);
+ }
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+
+ protected void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin();
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
+ try
+ {
+
+ long maxId = 0;
+
+ while(rs.next())
+ {
+
+ long messageId = rs.getLong(1);
+ if(messageId > maxId)
+ {
+ maxId = messageId;
+ }
+
+ byte[] dataAsBytes = getBlobAsBytes(rs, 2);
+
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+ StorableMessageMetaData metaData = type.createMetaData(buf);
+ StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
+ messageHandler.message(message);
+ }
+
+ _messageId.set(maxId);
+
+ messageHandler.completeMessageRecovery();
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+
+ protected TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this);
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+ try
+ {
+ while(rs.next())
+ {
+
+ String id = rs.getString(1);
+ long messageId = rs.getLong(2);
+ queueEntryHandler.queueEntry(UUID.fromString(id), messageId);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ return queueEntryHandler.completeQueueEntryRecovery();
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ private static final class Xid
+ {
+
+ private final long _format;
+ private final byte[] _globalId;
+ private final byte[] _branchId;
+
+ public Xid(long format, byte[] globalId, byte[] branchId)
+ {
+ _format = format;
+ _globalId = globalId;
+ _branchId = branchId;
+ }
+
+ public long getFormat()
+ {
+ return _format;
+ }
+
+ public byte[] getGlobalId()
+ {
+ return _globalId;
+ }
+
+ public byte[] getBranchId()
+ {
+ return _branchId;
+ }
+ }
+
+ private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueableMessage
+ {
+
+ private long _messageNumber;
+ private UUID _queueId;
+
+ public RecordImpl(UUID queueId, long messageNumber)
+ {
+ _messageNumber = messageNumber;
+ _queueId = queueId;
+ }
+
+ @Override
+ public TransactionLogResource getQueue()
+ {
+ return this;
+ }
+
+ @Override
+ public EnqueableMessage getMessage()
+ {
+ return this;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return _messageNumber;
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ @Override
+ public StoredMessage getStoredMessage()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return _queueId;
+ }
+ }
+
+ protected void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ List<Xid> xids = new ArrayList<Xid>();
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
+ try
+ {
+ while(rs.next())
+ {
+
+ long format = rs.getLong(1);
+ byte[] globalId = rs.getBytes(2);
+ byte[] branchId = rs.getBytes(3);
+ xids.add(new Xid(format, globalId, branchId));
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+
+
+ for(Xid xid : xids)
+ {
+ List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
+ List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
+
+ PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
+
+ try
+ {
+ pstmt.setLong(1, xid.getFormat());
+ pstmt.setBytes(2, xid.getGlobalId());
+ pstmt.setBytes(3, xid.getBranchId());
+
+ ResultSet rs = pstmt.executeQuery();
+ try
+ {
+ while(rs.next())
+ {
+
+ String actionType = rs.getString(1);
+ UUID queueId = UUID.fromString(rs.getString(2));
+ long messageId = rs.getLong(3);
+
+ RecordImpl record = new RecordImpl(queueId, messageId);
+ List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
+ records.add(record);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ pstmt.close();
+ }
+
+ dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
+ enqueues.toArray(new RecordImpl[enqueues.size()]),
+ dequeues.toArray(new RecordImpl[dequeues.size()]));
+ }
+
+
+ dtxrh.completeDtxRecordRecovery();
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ }
+
+ StorableMessageMetaData getMetaData(long messageId) throws SQLException
+ {
+
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_META_DATA);
+ try
+ {
+ stmt.setLong(1,messageId);
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+
+ if(rs.next())
+ {
+ byte[] dataAsBytes = getBlobAsBytes(rs, 1);
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+ StorableMessageMetaData metaData = type.createMetaData(buf);
+
+ return metaData;
+ }
+ else
+ {
+ throw new RuntimeException("Meta data not found for message with id " + messageId);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ protected abstract byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException;
+
+ private void addContent(Connection conn, long messageId, ByteBuffer src)
+ {
+ if(getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Adding content for message " + messageId);
+ }
+ PreparedStatement stmt = null;
+
+ try
+ {
+ src = src.slice();
+
+ byte[] chunkData = new byte[src.limit()];
+ src.duplicate().get(chunkData);
+
+ stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
+ stmt.setBinaryStream(2, bis, chunkData.length);
+ stmt.executeUpdate();
+ }
+ catch (SQLException e)
+ {
+ closeConnection(conn);
+ throw new RuntimeException("Error adding content for message " + messageId + ": " + e.getMessage(), e);
+ }
+ finally
+ {
+ closePreparedStatement(stmt);
+ }
+
+ }
+
+ public int getContent(long messageId, int offset, ByteBuffer dst)
+ {
+ Connection conn = null;
+ PreparedStatement stmt = null;
+
+ try
+ {
+ conn = newAutoCommitConnection();
+
+ stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ ResultSet rs = stmt.executeQuery();
+
+ int written = 0;
+
+ if (rs.next())
+ {
+
+ byte[] dataAsBytes = getBlobAsBytes(rs, 1);
+ int size = dataAsBytes.length;
+
+ if (offset > size)
+ {
+ throw new RuntimeException("Offset " + offset + " is greater than message size " + size
+ + " for message id " + messageId + "!");
+
+ }
+
+ written = size - offset;
+ if(written > dst.remaining())
+ {
+ written = dst.remaining();
+ }
+
+ dst.put(dataAsBytes, offset, written);
+ }
+
+ return written;
+
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e.getMessage(), e);
+ }
+ finally
+ {
+ closePreparedStatement(stmt);
+ closeConnection(conn);
+ }
+
+
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+
+ protected class JDBCTransaction implements Transaction
+ {
+ private final ConnectionWrapper _connWrapper;
+ private int _storeSizeIncrease;
+
+
+ protected JDBCTransaction()
+ {
+ try
+ {
+ _connWrapper = new ConnectionWrapper(newConnection());
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ {
+ final StoredMessage storedMessage = message.getStoredMessage();
+ if(storedMessage instanceof StoredJDBCMessage)
+ {
+ try
+ {
+ ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Exception on enqueuing message " + _messageId, e);
+ }
+ }
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
+ }
+
+ @Override
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ {
+ AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
+
+ }
+
+ @Override
+ public void commitTran() throws AMQStoreException
+ {
+ AbstractJDBCMessageStore.this.commitTran(_connWrapper);
+ storedSizeChange(_storeSizeIncrease);
+ }
+
+ @Override
+ public StoreFuture commitTranAsync() throws AMQStoreException
+ {
+ final StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
+ storedSizeChange(_storeSizeIncrease);
+ return storeFuture;
+ }
+
+ @Override
+ public void abortTran() throws AMQStoreException
+ {
+ AbstractJDBCMessageStore.this.abortTran(_connWrapper);
+ }
+
+ @Override
+ public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException
+ {
+ AbstractJDBCMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
+ }
+
+ @Override
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ throws AMQStoreException
+ {
+ AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
+ }
+ }
+
+ private class StoredJDBCMessage implements StoredMessage
+ {
+
+ private final long _messageId;
+ private final boolean _isRecovered;
+
+ private StorableMessageMetaData _metaData;
+ private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+ private byte[] _data;
+ private volatile SoftReference<byte[]> _dataRef;
+
+
+ StoredJDBCMessage(long messageId, StorableMessageMetaData metaData)
+ {
+ this(messageId, metaData, false);
+ }
+
+
+ StoredJDBCMessage(long messageId,
+ StorableMessageMetaData metaData, boolean isRecovered)
+ {
+ _messageId = messageId;
+ _isRecovered = isRecovered;
+
+ if(!_isRecovered)
+ {
+ _metaData = metaData;
+ }
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ }
+
+ @Override
+ public StorableMessageMetaData getMetaData()
+ {
+ StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
+ if(metaData == null)
+ {
+ try
+ {
+ metaData = AbstractJDBCMessageStore.this.getMetaData(_messageId);
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException(e);
+ }
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ }
+
+ return metaData;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ src = src.slice();
+
+ if(_data == null)
+ {
+ _data = new byte[src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+ src.duplicate().get(_data);
+ }
+ else
+ {
+ byte[] oldData = _data;
+ _data = new byte[oldData.length + src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+
+ System.arraycopy(oldData,0,_data,0,oldData.length);
+ src.duplicate().get(_data, oldData.length, src.remaining());
+ }
+
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
+ {
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
+ }
+ else
+ {
+ return AbstractJDBCMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+ }
+
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ int length = getContent(offsetInMessage, buf);
+ buf.position(0);
+ buf.limit(length);
+ return buf;
+ }
+
+ @Override
+ public synchronized StoreFuture flushToStore()
+ {
+ Connection conn = null;
+ try
+ {
+ if(!stored())
+ {
+ conn = newConnection();
+
+ store(conn);
+
+ conn.commit();
+ storedSizeChange(getMetaData().getContentSize());
+ }
+ }
+ catch (SQLException e)
+ {
+ if(getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Error when trying to flush message " + _messageId + " to store: " + e);
+ }
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ closeConnection(conn);
+ }
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void remove()
+ {
+ int delta = getMetaData().getContentSize();
+ AbstractJDBCMessageStore.this.removeMessage(_messageId);
+ storedSizeChange(-delta);
+ }
+
+ private synchronized void store(final Connection conn) throws SQLException
+ {
+ if (!stored())
+ {
+ try
+ {
+ storeMetaData(conn, _messageId, _metaData);
+ AbstractJDBCMessageStore.this.addContent(conn, _messageId,
+ _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ }
+ finally
+ {
+ _metaData = null;
+ _data = null;
+ }
+
+ if(getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Storing message " + _messageId + " to store");
+ }
+ }
+ }
+
+ private boolean stored()
+ {
+ return _metaData == null || _isRecovered;
+ }
+ }
+
+ protected void closeConnection(final Connection conn)
+ {
+ if(conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ getLogger().error("Problem closing connection", e);
+ }
+ }
+ }
+
+ protected void closePreparedStatement(final PreparedStatement stmt)
+ {
+ if (stmt != null)
+ {
+ try
+ {
+ stmt.close();
+ }
+ catch(SQLException e)
+ {
+ getLogger().error("Problem closing prepared statement", e);
+ }
+ }
+ }
+
+ @Override
+ public void addEventListener(EventListener eventListener, Event... events)
+ {
+ _eventManager.addEventListener(eventListener, events);
+ }
+
+ private void insertConfiguredObject(ConfiguredObjectRecord configuredObject) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ // If we don't have any data in the result set then we can add this configured object
+ if (!rs.next())
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+ try
+ {
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
+ {
+ insertStmt.setNull(3, Types.BLOB);
+ }
+ else
+ {
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ byte[] attributesAsBytes = new ObjectMapper().writeValueAsBytes(attributes);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+ }
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private int removeConfiguredObject(UUID id) throws AMQStoreException
+ {
+ int results = 0;
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ results = removeConfiguredObject(id, conn);
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e);
+ }
+ return results;
+ }
+
+ public UUID[] removeConfiguredObjects(UUID... objects) throws AMQStoreException
+ {
+ Collection<UUID> removed = new ArrayList<UUID>(objects.length);
+ try
+ {
+
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ for(UUID id : objects)
+ {
+ if(removeConfiguredObject(id, conn) != 0)
+ {
+ removed.add(id);
+ }
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e);
+ }
+ return removed.toArray(new UUID[removed.size()]);
+ }
+
+ private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException
+ {
+ final int results;PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ stmt.setString(1, id.toString());
+ results = stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+ return results;
+ }
+
+ private void updateConfiguredObject(final ConfiguredObjectRecord configuredObject) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ updateConfiguredObject(configuredObject, false, conn);
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ update(false, records);
+ }
+
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.ACTIVATING))
+ {
+ try
+ {
+ Connection conn = newConnection();
+ try
+ {
+ for(ConfiguredObjectRecord record : records)
+ {
+ updateConfiguredObject(record, createIfNecessary, conn);
+ }
+ conn.commit();
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error updating configured objects in database: " + e.getMessage(), e);
+ }
+
+ }
+
+ }
+
+ private void updateConfiguredObject(ConfiguredObjectRecord configuredObject,
+ boolean createIfNecessary,
+ Connection conn)
+ throws SQLException, AMQStoreException
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ if (rs.next())
+ {
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
+ try
+ {
+ stmt2.setString(1, configuredObject.getType());
+ if (configuredObject.getAttributes() != null)
+ {
+ byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
+ configuredObject.getAttributes());
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
+ }
+ else
+ {
+ stmt2.setNull(2, Types.BLOB);
+ }
+ stmt2.setString(3, configuredObject.getId().toString());
+ stmt2.execute();
+ }
+ finally
+ {
+ stmt2.close();
+ }
+ }
+ else if(createIfNecessary)
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+ try
+ {
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
+ {
+ insertStmt.setNull(3, Types.BLOB);
+ }
+ else
+ {
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ byte[] attributesAsBytes = new ObjectMapper().writeValueAsBytes(attributes);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+ }
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+
+ private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws AMQStoreException
+ {
+ ConfiguredObjectRecord result = null;
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, id.toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ if (rs.next())
+ {
+ String type = rs.getString(1);
+ String attributes = getBlobAsString(rs, 2);
+ result = new ConfiguredObjectRecord(id, type,
+ (new ObjectMapper()).readValue(attributes,Map.class));
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+ + e.getMessage(), e);
+ }
+ catch (JsonParseException e)
+ {
+ throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+ + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+ + e.getMessage(), e);
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+ + e.getMessage(), e);
+ }
+ return result;
+ }
+
+ private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException, AMQStoreException
+ {
+ Connection conn = newAutoCommitConnection();
+
+ final ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ String id = rs.getString(1);
+ String objectType = rs.getString(2);
+ String attributes = getBlobAsString(rs, 3);
+ recoveryHandler.configuredObject(UUID.fromString(id), objectType,
+ objectMapper.readValue(attributes,Map.class));
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (JsonParseException e)
+ {
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
+
+ protected abstract void storedSizeChange(int storeSizeIncrease);
+
+
+ @Override
+ public void onDelete()
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ for (String tableName : ALL_TABLES)
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("DROP TABLE " + tableName);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch(SQLException e)
+ {
+ getLogger().error("Exception while deleting store tables", e);
+ }
+ }
+
+}