diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java | 1464 |
1 files changed, 1464 insertions, 0 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java new file mode 100644 index 0000000000..e7f9c777c9 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -0,0 +1,1464 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.queue.QueueRegistry; + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +import java.io.File; +import java.io.ByteArrayInputStream; +import java.sql.DriverManager; +import java.sql.Driver; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Blob; +import java.sql.Types; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.TreeMap; + + +public class DerbyMessageStore implements MessageStore +{ + + private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); + + private static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; + + + private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; + + private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; + + private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE"; + private static final String QUEUE_TABLE_NAME = "QPID_QUEUE"; + private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS"; + private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY"; + private static final String MESSAGE_META_DATA_TABLE_NAME = "QPID_MESSAGE_META_DATA"; + private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT"; + + private static final int DB_VERSION = 1; + + + + private VirtualHost _virtualHost; + private static Class<Driver> DRIVER_CLASS; + + private final AtomicLong _messageId = new AtomicLong(1); + private AtomicBoolean _closed = new AtomicBoolean(false); + + private String _connectionURL; + + + + private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )"; + private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )"; + private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )"; + private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )"; + private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )"; + private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )"; + private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )"; + private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )"; + private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME; + private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME; + private static final String SELECT_FROM_BINDINGS = + "SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?"; + private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?"; + private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?"; + private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )"; + private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?"; + private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )"; + private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?"; + private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)"; + private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; + private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)"; + private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?"; + private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)"; + private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)"; + private static final String SELECT_FROM_MESSAGE_META_DATA = + "SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?"; + private static final String SELECT_FROM_MESSAGE_CONTENT = + "SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?"; + private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME; + private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; + + + private enum State + { + INITIAL, + CONFIGURING, + RECOVERING, + STARTED, + CLOSING, + CLOSED + } + + private State _state = State.INITIAL; + + + public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + { + stateTransition(State.INITIAL, State.CONFIGURING); + + initialiseDriver(); + + _virtualHost = virtualHost; + + _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName()); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + final String databasePath = config.getStoreConfiguration().getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB"); + + File environmentPath = new File(databasePath); + if (!environmentPath.exists()) + { + if (!environmentPath.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } + } + + createOrOpenDatabase(databasePath); + + // this recovers durable queues and persistent messages + + recover(); + + stateTransition(State.RECOVERING, State.STARTED); + + } + + private static synchronized void initialiseDriver() throws ClassNotFoundException + { + if(DRIVER_CLASS == null) + { + DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME); + } + } + + private void createOrOpenDatabase(final String environmentPath) throws SQLException + { + _connectionURL = "jdbc:derby:" + environmentPath + "/" + _virtualHost.getName() + ";create=true"; + + Connection conn = newConnection(); + + createVersionTable(conn); + createExchangeTable(conn); + createQueueTable(conn); + createBindingsTable(conn); + createQueueEntryTable(conn); + createMessageMetaDataTable(conn); + createMessageContentTable(conn); + + conn.close(); + } + + + + private void createVersionTable(final Connection conn) throws SQLException + { + if(!tableExists(DB_VERSION_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + + stmt.execute(CREATE_DB_VERSION_TABLE); + stmt.close(); + + PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION); + pstmt.setInt(1, DB_VERSION); + pstmt.execute(); + pstmt.close(); + } + + } + + + private void createExchangeTable(final Connection conn) throws SQLException + { + if(!tableExists(EXCHANGE_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + + stmt.execute(CREATE_EXCHANGE_TABLE); + stmt.close(); + } + } + + private void createQueueTable(final Connection conn) throws SQLException + { + if(!tableExists(QUEUE_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + stmt.execute(CREATE_QUEUE_TABLE); + stmt.close(); + } + } + + private void createBindingsTable(final Connection conn) throws SQLException + { + if(!tableExists(BINDINGS_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + stmt.execute(CREATE_BINDINGS_TABLE); + + stmt.close(); + } + + } + + private void createQueueEntryTable(final Connection conn) throws SQLException + { + if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + stmt.execute(CREATE_QUEUE_ENTRY_TABLE); + + stmt.close(); + } + + } + + private void createMessageMetaDataTable(final Connection conn) throws SQLException + { + if(!tableExists(MESSAGE_META_DATA_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + stmt.execute(CREATE_MESSAGE_META_DATA_TABLE); + + stmt.close(); + } + + } + + + private void createMessageContentTable(final Connection conn) throws SQLException + { + if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + stmt.execute(CREATE_MESSAGE_CONTENT_TABLE); + + stmt.close(); + } + + } + + + + private boolean tableExists(final String tableName, final Connection conn) throws SQLException + { + PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY); + stmt.setString(1, tableName); + ResultSet rs = stmt.executeQuery(); + boolean exists = rs.next(); + rs.close(); + stmt.close(); + return exists; + } + + public void recover() throws AMQException + { + stateTransition(State.CONFIGURING, State.RECOVERING); + + _logger.info("Recovering persistent state..."); + StoreContext context = new StoreContext(); + + try + { + Map<AMQShortString, AMQQueue> queues = loadQueues(); + + recoverExchanges(); + + try + { + + beginTran(context); + + deliverMessages(context, queues); + _logger.info("Persistent state recovered successfully"); + commitTran(context); + + } + finally + { + if(inTran(context)) + { + abortTran(context); + } + } + } + catch (SQLException e) + { + + throw new AMQException("Error recovering persistent state: " + e, e); + } + + } + + private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException + { + Connection conn = newConnection(); + + + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE); + Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>(); + while(rs.next()) + { + String queueName = rs.getString(1); + String owner = rs.getString(2); + AMQShortString queueNameShortString = new AMQShortString(queueName); + AMQQueue q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost, + null); + _virtualHost.getQueueRegistry().registerQueue(q); + queueMap.put(queueNameShortString,q); + + } + return queueMap; + } + + private void recoverExchanges() throws AMQException, SQLException + { + for (Exchange exchange : loadExchanges()) + { + recoverExchange(exchange); + } + } + + + private List<Exchange> loadExchanges() throws AMQException, SQLException + { + + List<Exchange> exchanges = new ArrayList<Exchange>(); + Connection conn = null; + try + { + conn = newConnection(); + + + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE); + + Exchange exchange; + while(rs.next()) + { + String exchangeName = rs.getString(1); + String type = rs.getString(2); + boolean autoDelete = rs.getShort(3) != 0; + + exchange = _virtualHost.getExchangeFactory().createExchange(new AMQShortString(exchangeName), new AMQShortString(type), true, autoDelete, 0); + _virtualHost.getExchangeRegistry().registerExchange(exchange); + exchanges.add(exchange); + + } + return exchanges; + + } + finally + { + if(conn != null) + { + conn.close(); + } + } + + } + + private void recoverExchange(Exchange exchange) throws AMQException, SQLException + { + _logger.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "..."); + + QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); + + Connection conn = null; + try + { + conn = newConnection(); + + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS); + stmt.setString(1, exchange.getName().toString()); + + ResultSet rs = stmt.executeQuery(); + + + while(rs.next()) + { + String queueName = rs.getString(1); + String bindingKey = rs.getString(2); + Blob arguments = rs.getBlob(3); + + + AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName)); + if (queue == null) + { + _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: " + + exchange.getName()); + } + else + { + _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName + + ", Routing Key: " + bindingKey + ", Arguments: " + arguments + + ")"); + + FieldTable argumentsFT = null; + if(arguments != null) + { + byte[] argumentBytes = arguments.getBytes(0, (int) arguments.length()); + ByteBuffer buf = ByteBuffer.wrap(argumentBytes); + argumentsFT = new FieldTable(buf,arguments.length()); + } + + queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT); + + } + } + } + finally + { + if(conn != null) + { + conn.close(); + } + } + } + + public void close() throws Exception + { + _closed.getAndSet(true); + } + + public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException + { + + boolean localTx = getOrCreateTransaction(storeContext); + + Connection conn = getConnection(storeContext); + ConnectionWrapper wrapper = (ConnectionWrapper) storeContext.getPayload(); + + + if (_logger.isDebugEnabled()) + { + _logger.debug("Message Id: " + messageId + " Removing"); + } + + // first we need to look up the header to get the chunk count + MessageMetaData mmd = getMessageMetaData(storeContext, messageId); + try + { + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA); + stmt.setLong(1,messageId); + wrapper.setRequiresCommit(); + int results = stmt.executeUpdate(); + + if (results == 0) + { + if (localTx) + { + abortTran(storeContext); + } + + throw new AMQException("Message metadata not found for message id " + messageId); + } + stmt.close(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Deleted metadata for message " + messageId); + } + + stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT); + stmt.setLong(1,messageId); + results = stmt.executeUpdate(); + + if(results != mmd.getContentChunkCount()) + { + if (localTx) + { + abortTran(storeContext); + } + throw new AMQException("Unexpected number of content chunks when deleting message. Expected " + mmd.getContentChunkCount() + " but found " + results); + + } + + if (localTx) + { + commitTran(storeContext); + } + } + catch (SQLException e) + { + if ((conn != null) && localTx) + { + abortTran(storeContext); + } + + throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e); + } + + } + + public void createExchange(Exchange exchange) throws AMQException + { + if (_state != State.RECOVERING) + { + try + { + Connection conn = null; + + try + { + conn = newConnection(); + + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_EXCHANGE); + stmt.setString(1, exchange.getName().toString()); + stmt.setString(2, exchange.getType().toString()); + stmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0); + stmt.execute(); + stmt.close(); + conn.commit(); + + } + finally + { + if(conn != null) + { + conn.close(); + } + } + } + catch (SQLException e) + { + throw new AMQException("Error writing Exchange with name " + exchange.getName() + " to database: " + e, e); + } + } + + } + + public void removeExchange(Exchange exchange) throws AMQException + { + Connection conn = null; + + try + { + conn = newConnection(); + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE); + stmt.setString(1, exchange.getName().toString()); + int results = stmt.executeUpdate(); + if(results == 0) + { + throw new AMQException("Exchange " + exchange.getName() + " not found"); + } + else + { + conn.commit(); + stmt.close(); + } + } + catch (SQLException e) + { + throw new AMQException("Error writing deleting with name " + exchange.getName() + " from database: " + e, e); + } + finally + { + if(conn != null) + { + try + { + conn.close(); + } + catch (SQLException e) + { + _logger.error(e); + } + } + + } + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) + throws AMQException + { + if (_state != State.RECOVERING) + { + Connection conn = null; + + + try + { + conn = newConnection(); + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS); + stmt.setString(1, exchange.getName().toString() ); + stmt.setString(2, queue.getName().toString()); + stmt.setString(3, routingKey == null ? null : routingKey.toString()); + if(args != null) + { + /* This would be the Java 6 way of setting a Blob + Blob blobArgs = conn.createBlob(); + blobArgs.setBytes(0, args.getDataAsBytes()); + stmt.setBlob(4, blobArgs); + */ + byte[] bytes = args.getDataAsBytes(); + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + stmt.setBinaryStream(4, bis, bytes.length); + } + else + { + stmt.setNull(4, Types.BLOB); + } + + stmt.executeUpdate(); + conn.commit(); + stmt.close(); + } + catch (SQLException e) + { + throw new AMQException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange " + + exchange.getName() + " to database: " + e, e); + } + finally + { + if(conn != null) + { + try + { + conn.close(); + } + catch (SQLException e) + { + _logger.error(e); + } + } + + } + + } + + + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) + throws AMQException + { + Connection conn = null; + + + try + { + conn = newConnection(); + // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS); + stmt.setString(1, exchange.getName().toString() ); + stmt.setString(2, queue.getName().toString()); + stmt.setString(3, routingKey == null ? null : routingKey.toString()); + + + if(stmt.executeUpdate() != 1) + { + throw new AMQException("Queue binding for queue with name " + queue.getName() + " to exchange " + + exchange.getName() + " not found"); + } + conn.commit(); + stmt.close(); + } + catch (SQLException e) + { + throw new AMQException("Error removing binding for AMQQueue with name " + queue.getName() + " to exchange " + + exchange.getName() + " in database: " + e, e); + } + finally + { + if(conn != null) + { + try + { + conn.close(); + } + catch (SQLException e) + { + _logger.error(e); + } + } + + } + + + } + + public void createQueue(AMQQueue queue) throws AMQException + { + createQueue(queue, null); + } + + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + { + _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called"); + + if (_state != State.RECOVERING) + { + try + { + Connection conn = newConnection(); + + PreparedStatement stmt = + conn.prepareStatement(INSERT_INTO_QUEUE); + + stmt.setString(1, queue.getName().toString()); + stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString()); + + stmt.execute(); + + stmt.close(); + + conn.commit(); + + conn.close(); + } + catch (SQLException e) + { + throw new AMQException("Error writing AMQQueue with name " + queue.getName() + " to database: " + e, e); + } + } + } + + private Connection newConnection() throws SQLException + { + final Connection connection = DriverManager.getConnection(_connectionURL); + return connection; + } + + public void removeQueue(final AMQQueue queue) throws AMQException + { + AMQShortString name = queue.getName(); + _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called"); + Connection conn = null; + + + try + { + conn = newConnection(); + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE); + stmt.setString(1, name.toString()); + int results = stmt.executeUpdate(); + + + if (results == 0) + { + throw new AMQException("Queue " + name + " not found"); + } + + conn.commit(); + stmt.close(); + } + catch (SQLException e) + { + throw new AMQException("Error writing deleting with name " + name + " from database: " + e, e); + } + finally + { + if(conn != null) + { + try + { + conn.close(); + } + catch (SQLException e) + { + _logger.error(e); + } + } + + } + + + } + + public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + { + AMQShortString name = queue.getName(); + + boolean localTx = getOrCreateTransaction(context); + Connection conn = getConnection(context); + ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); + + try + { + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY); + stmt.setString(1,name.toString()); + stmt.setLong(2,messageId); + stmt.executeUpdate(); + connWrapper.requiresCommit(); + + if(localTx) + { + commitTran(context); + } + + + + if (_logger.isDebugEnabled()) + { + _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]"); + } + } + catch (SQLException e) + { + if(localTx) + { + abortTran(context); + } + _logger.error("Failed to enqueue: " + e, e); + throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name + + " to database", e); + } + + } + + public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + { + AMQShortString name = queue.getName(); + + boolean localTx = getOrCreateTransaction(context); + Connection conn = getConnection(context); + ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); + + try + { + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY); + stmt.setString(1,name.toString()); + stmt.setLong(2,messageId); + int results = stmt.executeUpdate(); + + connWrapper.requiresCommit(); + + if(results != 1) + { + throw new AMQException("Unable to find message with id " + messageId + " on queue " + name); + } + + if(localTx) + { + commitTran(context); + } + + + + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeuing message " + messageId + " on queue " + name + "[Connection" + conn + "]"); + } + } + catch (SQLException e) + { + if(localTx) + { + abortTran(context); + } + _logger.error("Failed to dequeue: " + e, e); + throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name + + " from database", e); + } + + } + + private static final class ConnectionWrapper + { + private final Connection _connection; + private boolean _requiresCommit; + + public ConnectionWrapper(Connection conn) + { + _connection = conn; + } + + public void setRequiresCommit() + { + _requiresCommit = true; + } + + public boolean requiresCommit() + { + return _requiresCommit; + } + + public Connection getConnection() + { + return _connection; + } + } + + public void beginTran(StoreContext context) throws AMQException + { + if (context.getPayload() != null) + { + throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: " + + context.getPayload()); + } + else + { + try + { + Connection conn = newConnection(); + + + context.setPayload(new ConnectionWrapper(conn)); + } + catch (SQLException e) + { + throw new AMQException("Error starting transaction: " + e, e); + } + } + } + + public void commitTran(StoreContext context) throws AMQException + { + ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); + + if (connWrapper == null) + { + throw new AMQException("Fatal internal error: transactional context is empty at commitTran"); + } + + try + { + Connection conn = connWrapper.getConnection(); + if(connWrapper.requiresCommit()) + { + conn.commit(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("commit tran completed"); + } + + } + conn.close(); + } + catch (SQLException e) + { + throw new AMQException("Error commit tx: " + e, e); + } + finally + { + context.setPayload(null); + } + } + + public void abortTran(StoreContext context) throws AMQException + { + ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); + + if (connWrapper == null) + { + throw new AMQException("Fatal internal error: transactional context is empty at abortTran"); + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("abort tran called: " + connWrapper.getConnection()); + } + + try + { + Connection conn = connWrapper.getConnection(); + if(connWrapper.requiresCommit()) + { + conn.rollback(); + } + + conn.close(); + } + catch (SQLException e) + { + throw new AMQException("Error aborting transaction: " + e, e); + } + finally + { + context.setPayload(null); + } + } + + public boolean inTran(StoreContext context) + { + return context.getPayload() != null; + } + + public Long getNewMessageId() + { + return _messageId.getAndIncrement(); + } + + public void storeContentBodyChunk(StoreContext context, + Long messageId, + int index, + ContentChunk contentBody, + boolean lastContentBody) throws AMQException + { + boolean localTx = getOrCreateTransaction(context); + Connection conn = getConnection(context); + ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); + + try + { + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT); + stmt.setLong(1,messageId); + stmt.setInt(2, index); + byte[] chunkData = new byte[contentBody.getSize()]; + contentBody.getData().duplicate().get(chunkData); + /* this would be the Java 6 way of doing things + Blob dataAsBlob = conn.createBlob(); + dataAsBlob.setBytes(1L, chunkData); + stmt.setBlob(3, dataAsBlob); + */ + ByteArrayInputStream bis = new ByteArrayInputStream(chunkData); + stmt.setBinaryStream(3, bis, chunkData.length); + stmt.executeUpdate(); + connWrapper.requiresCommit(); + + if(localTx) + { + commitTran(context); + } + } + catch (SQLException e) + { + if(localTx) + { + abortTran(context); + } + + throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e); + } + + } + + public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData mmd) + throws AMQException + { + + boolean localTx = getOrCreateTransaction(context); + Connection conn = getConnection(context); + ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); + + try + { + + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA); + stmt.setLong(1,messageId); + stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString()); + stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString()); + stmt.setShort(4, mmd.getMessagePublishInfo().isMandatory() ? (short) 1 : (short) 0); + stmt.setShort(5, mmd.getMessagePublishInfo().isImmediate() ? (short) 1 : (short) 0); + + ContentHeaderBody headerBody = mmd.getContentHeaderBody(); + final int bodySize = headerBody.getSize(); + byte[] underlying = new byte[bodySize]; + ByteBuffer buf = ByteBuffer.wrap(underlying); + headerBody.writePayload(buf); +/* + Blob dataAsBlob = conn.createBlob(); + dataAsBlob.setBytes(1L, underlying); + stmt.setBlob(6, dataAsBlob); +*/ + ByteArrayInputStream bis = new ByteArrayInputStream(underlying); + stmt.setBinaryStream(6,bis,underlying.length); + + stmt.setInt(7, mmd.getContentChunkCount()); + + stmt.executeUpdate(); + connWrapper.requiresCommit(); + + if(localTx) + { + commitTran(context); + } + } + catch (SQLException e) + { + if(localTx) + { + abortTran(context); + } + + throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e); + } + + + } + + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException + { + boolean localTx = getOrCreateTransaction(context); + Connection conn = getConnection(context); + + + try + { + + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA); + stmt.setLong(1,messageId); + ResultSet rs = stmt.executeQuery(); + + if(rs.next()) + { + final AMQShortString exchange = new AMQShortString(rs.getString(1)); + final AMQShortString routingKey = rs.getString(2) == null ? null : new AMQShortString(rs.getString(2)); + final boolean mandatory = (rs.getShort(3) != (short)0); + final boolean immediate = (rs.getShort(4) != (short)0); + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return exchange; + } + + public void setExchange(AMQShortString exchange) + { + + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return mandatory; + } + + public AMQShortString getRoutingKey() + { + return routingKey; + } + } ; + + Blob dataAsBlob = rs.getBlob(5); + + byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); + + ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, dataAsBytes.length); + + if(localTx) + { + commitTran(context); + } + + return new MessageMetaData(info, chb, rs.getInt(6)); + + } + else + { + if(localTx) + { + abortTran(context); + } + throw new AMQException("Metadata not found for message with id " + messageId); + } + } + catch (SQLException e) + { + if(localTx) + { + abortTran(context); + } + + throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e); + } + + + } + + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException + { + boolean localTx = getOrCreateTransaction(context); + Connection conn = getConnection(context); + + + try + { + + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT); + stmt.setLong(1,messageId); + stmt.setInt(2, index); + ResultSet rs = stmt.executeQuery(); + + if(rs.next()) + { + Blob dataAsBlob = rs.getBlob(1); + + final int size = (int) dataAsBlob.length(); + byte[] dataAsBytes = dataAsBlob.getBytes(1, size); + final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); + + ContentChunk cb = new ContentChunk() + { + + public int getSize() + { + return size; + } + + public ByteBuffer getData() + { + return buf; + } + + public void reduceToFit() + { + + } + }; + + if(localTx) + { + commitTran(context); + } + + return cb; + + } + else + { + if(localTx) + { + abortTran(context); + } + throw new AMQException("Message not found for message with id " + messageId); + } + } + catch (SQLException e) + { + if(localTx) + { + abortTran(context); + } + + throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e); + } + + + + } + + public boolean isPersistent() + { + return true; + } + + private void checkNotClosed() throws MessageStoreClosedException + { + if (_closed.get()) + { + throw new MessageStoreClosedException(); + } + } + + + private static final class ProcessAction + { + private final AMQQueue _queue; + private final StoreContext _context; + private final AMQMessage _message; + + public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message) + { + _queue = queue; + _context = context; + _message = message; + } + + public void process() throws AMQException + { + _queue.enqueue(_context, _message); + + } + + } + + + private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues) + throws SQLException, AMQException + { + Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>(); + List<ProcessAction> actions = new ArrayList<ProcessAction>(); + + Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>(); + + final boolean inLocaltran = inTran(context); + Connection conn = null; + try + { + + if(inLocaltran) + { + conn = getConnection(context); + } + else + { + conn = newConnection(); + } + + + MessageHandleFactory messageHandleFactory = new MessageHandleFactory(); + long maxId = 1; + + TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null); + + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); + + + while (rs.next()) + { + + + + AMQShortString queueName = new AMQShortString(rs.getString(1)); + + + AMQQueue queue = queues.get(queueName); + if (queue == null) + { + queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null); + + _virtualHost.getQueueRegistry().registerQueue(queue); + queues.put(queueName, queue); + } + + long messageId = rs.getLong(2); + maxId = Math.max(maxId, messageId); + AMQMessage message = msgMap.get(messageId); + + if(message != null) + { + message.incrementReference(); + } + else + { + message = new AMQMessage(messageId, this, messageHandleFactory, txnContext); + msgMap.put(messageId,message); + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("On recovery, delivering " + message.getMessageId() + " to " + queue.getName()); + } + + if (_logger.isInfoEnabled()) + { + Integer count = queueRecoveries.get(queueName); + if (count == null) + { + count = 0; + } + + queueRecoveries.put(queueName, ++count); + + } + + actions.add(new ProcessAction(queue, context, message)); + + } + + for(ProcessAction action : actions) + { + action.process(); + } + + _messageId.set(maxId + 1); + } + catch (SQLException e) + { + _logger.error("Error: " + e, e); + throw e; + } + finally + { + if (inLocaltran && conn != null) + { + conn.close(); + } + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Recovered message counts: " + queueRecoveries); + } + } + + private Connection getConnection(final StoreContext context) + { + return ((ConnectionWrapper)context.getPayload()).getConnection(); + } + + private boolean getOrCreateTransaction(StoreContext context) throws AMQException + { + + ConnectionWrapper tx = (ConnectionWrapper) context.getPayload(); + if (tx == null) + { + beginTran(context); + return true; + } + + return false; + } + + private synchronized void stateTransition(State requiredState, State newState) throws AMQException + { + if (_state != requiredState) + { + throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState + + "; currently in state: " + _state); + } + + _state = newState; + } +} |