diff options
Diffstat (limited to 'M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java')
-rw-r--r-- | M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java | 1463 |
1 files changed, 0 insertions, 1463 deletions
diff --git a/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java deleted file mode 100644 index 743a736884..0000000000 --- a/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ /dev/null @@ -1,1463 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.store; - -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.server.queue.QueueRegistry; - -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.commons.configuration.Configuration; -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; - -import java.io.File; -import java.io.ByteArrayInputStream; -import java.sql.DriverManager; -import java.sql.Driver; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.Blob; -import java.sql.Types; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; -import java.util.TreeMap; - - -public class DerbyMessageStore implements MessageStore -{ - - private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); - - private static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; - - - private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; - - private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; - - private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE"; - private static final String QUEUE_TABLE_NAME = "QPID_QUEUE"; - private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS"; - private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY"; - private static final String MESSAGE_META_DATA_TABLE_NAME = "QPID_MESSAGE_META_DATA"; - private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT"; - - private static final int DB_VERSION = 1; - - - - private VirtualHost _virtualHost; - private static Class<Driver> DRIVER_CLASS; - - private final AtomicLong _messageId = new AtomicLong(1); - private AtomicBoolean _closed = new AtomicBoolean(false); - - private String _connectionURL; - - - - private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )"; - private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )"; - private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )"; - private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )"; - private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )"; - private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )"; - private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )"; - private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )"; - private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME; - private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME; - private static final String SELECT_FROM_BINDINGS = - "SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?"; - private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?"; - private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?"; - private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )"; - private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?"; - private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )"; - private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?"; - private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)"; - private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; - private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)"; - private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?"; - private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)"; - private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)"; - private static final String SELECT_FROM_MESSAGE_META_DATA = - "SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?"; - private static final String SELECT_FROM_MESSAGE_CONTENT = - "SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?"; - private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME; - private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; - - - private enum State - { - INITIAL, - CONFIGURING, - RECOVERING, - STARTED, - CLOSING, - CLOSED - } - - private State _state = State.INITIAL; - - - public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception - { - stateTransition(State.INITIAL, State.CONFIGURING); - - initialiseDriver(); - - _virtualHost = virtualHost; - - _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName()); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - - final String databasePath = config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB"); - - File environmentPath = new File(databasePath); - if (!environmentPath.exists()) - { - if (!environmentPath.mkdirs()) - { - throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " - + "Ensure the path is correct and that the permissions are correct."); - } - } - - createOrOpenDatabase(databasePath); - - // this recovers durable queues and persistent messages - - recover(); - - stateTransition(State.RECOVERING, State.STARTED); - - } - - private static synchronized void initialiseDriver() throws ClassNotFoundException - { - if(DRIVER_CLASS == null) - { - DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME); - } - } - - private void createOrOpenDatabase(final String environmentPath) throws SQLException - { - _connectionURL = "jdbc:derby:" + environmentPath + "/" + _virtualHost.getName() + ";create=true"; - - Connection conn = newConnection(); - - createVersionTable(conn); - createExchangeTable(conn); - createQueueTable(conn); - createBindingsTable(conn); - createQueueEntryTable(conn); - createMessageMetaDataTable(conn); - createMessageContentTable(conn); - - conn.close(); - } - - - - private void createVersionTable(final Connection conn) throws SQLException - { - if(!tableExists(DB_VERSION_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - - stmt.execute(CREATE_DB_VERSION_TABLE); - stmt.close(); - - PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION); - pstmt.setInt(1, DB_VERSION); - pstmt.execute(); - pstmt.close(); - } - - } - - - private void createExchangeTable(final Connection conn) throws SQLException - { - if(!tableExists(EXCHANGE_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - - stmt.execute(CREATE_EXCHANGE_TABLE); - stmt.close(); - } - } - - private void createQueueTable(final Connection conn) throws SQLException - { - if(!tableExists(QUEUE_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - stmt.execute(CREATE_QUEUE_TABLE); - stmt.close(); - } - } - - private void createBindingsTable(final Connection conn) throws SQLException - { - if(!tableExists(BINDINGS_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - stmt.execute(CREATE_BINDINGS_TABLE); - - stmt.close(); - } - - } - - private void createQueueEntryTable(final Connection conn) throws SQLException - { - if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - stmt.execute(CREATE_QUEUE_ENTRY_TABLE); - - stmt.close(); - } - - } - - private void createMessageMetaDataTable(final Connection conn) throws SQLException - { - if(!tableExists(MESSAGE_META_DATA_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - stmt.execute(CREATE_MESSAGE_META_DATA_TABLE); - - stmt.close(); - } - - } - - - private void createMessageContentTable(final Connection conn) throws SQLException - { - if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - stmt.execute(CREATE_MESSAGE_CONTENT_TABLE); - - stmt.close(); - } - - } - - - - private boolean tableExists(final String tableName, final Connection conn) throws SQLException - { - PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY); - stmt.setString(1, tableName); - ResultSet rs = stmt.executeQuery(); - boolean exists = rs.next(); - rs.close(); - stmt.close(); - return exists; - } - - public void recover() throws AMQException - { - stateTransition(State.CONFIGURING, State.RECOVERING); - - _logger.info("Recovering persistent state..."); - StoreContext context = new StoreContext(); - - try - { - Map<AMQShortString, AMQQueue> queues = loadQueues(); - - recoverExchanges(); - - try - { - - beginTran(context); - - deliverMessages(context, queues); - _logger.info("Persistent state recovered successfully"); - commitTran(context); - - } - finally - { - if(inTran(context)) - { - abortTran(context); - } - } - } - catch (SQLException e) - { - - throw new AMQException("Error recovering persistent state: " + e, e); - } - - } - - private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException - { - Connection conn = newConnection(); - - - Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE); - Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>(); - while(rs.next()) - { - String queueName = rs.getString(1); - String owner = rs.getString(2); - AMQShortString queueNameShortString = new AMQShortString(queueName); - AMQQueue q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost, - null); - _virtualHost.getQueueRegistry().registerQueue(q); - queueMap.put(queueNameShortString,q); - - } - return queueMap; - } - - private void recoverExchanges() throws AMQException, SQLException - { - for (Exchange exchange : loadExchanges()) - { - recoverExchange(exchange); - } - } - - - private List<Exchange> loadExchanges() throws AMQException, SQLException - { - - List<Exchange> exchanges = new ArrayList<Exchange>(); - Connection conn = null; - try - { - conn = newConnection(); - - - Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE); - - Exchange exchange; - while(rs.next()) - { - String exchangeName = rs.getString(1); - String type = rs.getString(2); - boolean autoDelete = rs.getShort(3) != 0; - - exchange = _virtualHost.getExchangeFactory().createExchange(new AMQShortString(exchangeName), new AMQShortString(type), true, autoDelete, 0); - _virtualHost.getExchangeRegistry().registerExchange(exchange); - exchanges.add(exchange); - - } - return exchanges; - - } - finally - { - if(conn != null) - { - conn.close(); - } - } - - } - - private void recoverExchange(Exchange exchange) throws AMQException, SQLException - { - _logger.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "..."); - - QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); - - Connection conn = null; - try - { - conn = newConnection(); - - PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS); - stmt.setString(1, exchange.getName().toString()); - - ResultSet rs = stmt.executeQuery(); - - - while(rs.next()) - { - String queueName = rs.getString(1); - String bindingKey = rs.getString(2); - Blob arguments = rs.getBlob(3); - - - AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName)); - if (queue == null) - { - _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: " - + exchange.getName()); - } - else - { - _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName - + ", Routing Key: " + bindingKey + ", Arguments: " + arguments - + ")"); - - FieldTable argumentsFT = null; - if(arguments != null) - { - byte[] argumentBytes = arguments.getBytes(0, (int) arguments.length()); - ByteBuffer buf = ByteBuffer.wrap(argumentBytes); - argumentsFT = new FieldTable(buf,arguments.length()); - } - - queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT); - - } - } - } - finally - { - if(conn != null) - { - conn.close(); - } - } - } - - public void close() throws Exception - { - _closed.getAndSet(true); - } - - public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException - { - - boolean localTx = getOrCreateTransaction(storeContext); - - Connection conn = getConnection(storeContext); - ConnectionWrapper wrapper = (ConnectionWrapper) storeContext.getPayload(); - - - if (_logger.isDebugEnabled()) - { - _logger.debug("Message Id: " + messageId + " Removing"); - } - - // first we need to look up the header to get the chunk count - MessageMetaData mmd = getMessageMetaData(storeContext, messageId); - try - { - PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA); - stmt.setLong(1,messageId); - wrapper.setRequiresCommit(); - int results = stmt.executeUpdate(); - - if (results == 0) - { - if (localTx) - { - abortTran(storeContext); - } - - throw new AMQException("Message metadata not found for message id " + messageId); - } - stmt.close(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Deleted metadata for message " + messageId); - } - - stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT); - stmt.setLong(1,messageId); - results = stmt.executeUpdate(); - - if(results != mmd.getContentChunkCount()) - { - if (localTx) - { - abortTran(storeContext); - } - throw new AMQException("Unexpected number of content chunks when deleting message. Expected " + mmd.getContentChunkCount() + " but found " + results); - - } - - if (localTx) - { - commitTran(storeContext); - } - } - catch (SQLException e) - { - if ((conn != null) && localTx) - { - abortTran(storeContext); - } - - throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e); - } - - } - - public void createExchange(Exchange exchange) throws AMQException - { - if (_state != State.RECOVERING) - { - try - { - Connection conn = null; - - try - { - conn = newConnection(); - - PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_EXCHANGE); - stmt.setString(1, exchange.getName().toString()); - stmt.setString(2, exchange.getType().toString()); - stmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0); - stmt.execute(); - stmt.close(); - conn.commit(); - - } - finally - { - if(conn != null) - { - conn.close(); - } - } - } - catch (SQLException e) - { - throw new AMQException("Error writing Exchange with name " + exchange.getName() + " to database: " + e, e); - } - } - - } - - public void removeExchange(Exchange exchange) throws AMQException - { - Connection conn = null; - - try - { - conn = newConnection(); - PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE); - stmt.setString(1, exchange.getName().toString()); - int results = stmt.executeUpdate(); - if(results == 0) - { - throw new AMQException("Exchange " + exchange.getName() + " not found"); - } - else - { - conn.commit(); - stmt.close(); - } - } - catch (SQLException e) - { - throw new AMQException("Error writing deleting with name " + exchange.getName() + " from database: " + e, e); - } - finally - { - if(conn != null) - { - try - { - conn.close(); - } - catch (SQLException e) - { - _logger.error(e); - } - } - - } - } - - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) - throws AMQException - { - if (_state != State.RECOVERING) - { - Connection conn = null; - - - try - { - conn = newConnection(); - PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS); - stmt.setString(1, exchange.getName().toString() ); - stmt.setString(2, queue.getName().toString()); - stmt.setString(3, routingKey == null ? null : routingKey.toString()); - if(args != null) - { - /* This would be the Java 6 way of setting a Blob - Blob blobArgs = conn.createBlob(); - blobArgs.setBytes(0, args.getDataAsBytes()); - stmt.setBlob(4, blobArgs); - */ - byte[] bytes = args.getDataAsBytes(); - ByteArrayInputStream bis = new ByteArrayInputStream(bytes); - stmt.setBinaryStream(4, bis, bytes.length); - } - else - { - stmt.setNull(4, Types.BLOB); - } - - stmt.executeUpdate(); - conn.commit(); - stmt.close(); - } - catch (SQLException e) - { - throw new AMQException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange " - + exchange.getName() + " to database: " + e, e); - } - finally - { - if(conn != null) - { - try - { - conn.close(); - } - catch (SQLException e) - { - _logger.error(e); - } - } - - } - - } - - - } - - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) - throws AMQException - { - Connection conn = null; - - - try - { - conn = newConnection(); - // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob - PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS); - stmt.setString(1, exchange.getName().toString() ); - stmt.setString(2, queue.getName().toString()); - stmt.setString(3, routingKey == null ? null : routingKey.toString()); - - - if(stmt.executeUpdate() != 1) - { - throw new AMQException("Queue binding for queue with name " + queue.getName() + " to exchange " - + exchange.getName() + " not found"); - } - conn.commit(); - stmt.close(); - } - catch (SQLException e) - { - throw new AMQException("Error removing binding for AMQQueue with name " + queue.getName() + " to exchange " - + exchange.getName() + " in database: " + e, e); - } - finally - { - if(conn != null) - { - try - { - conn.close(); - } - catch (SQLException e) - { - _logger.error(e); - } - } - - } - - - } - - public void createQueue(AMQQueue queue) throws AMQException - { - createQueue(queue, null); - } - - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException - { - _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called"); - - if (_state != State.RECOVERING) - { - try - { - Connection conn = newConnection(); - - PreparedStatement stmt = - conn.prepareStatement(INSERT_INTO_QUEUE); - - stmt.setString(1, queue.getName().toString()); - stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString()); - - stmt.execute(); - - stmt.close(); - - conn.commit(); - - conn.close(); - } - catch (SQLException e) - { - throw new AMQException("Error writing AMQQueue with name " + queue.getName() + " to database: " + e, e); - } - } - } - - private Connection newConnection() throws SQLException - { - final Connection connection = DriverManager.getConnection(_connectionURL); - return connection; - } - - public void removeQueue(final AMQQueue queue) throws AMQException - { - AMQShortString name = queue.getName(); - _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called"); - Connection conn = null; - - - try - { - conn = newConnection(); - PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE); - stmt.setString(1, name.toString()); - int results = stmt.executeUpdate(); - - - if (results == 0) - { - throw new AMQException("Queue " + name + " not found"); - } - - conn.commit(); - stmt.close(); - } - catch (SQLException e) - { - throw new AMQException("Error writing deleting with name " + name + " from database: " + e, e); - } - finally - { - if(conn != null) - { - try - { - conn.close(); - } - catch (SQLException e) - { - _logger.error(e); - } - } - - } - - - } - - public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException - { - AMQShortString name = queue.getName(); - - boolean localTx = getOrCreateTransaction(context); - Connection conn = getConnection(context); - ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); - - try - { - PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY); - stmt.setString(1,name.toString()); - stmt.setLong(2,messageId); - stmt.executeUpdate(); - connWrapper.requiresCommit(); - - if(localTx) - { - commitTran(context); - } - - - - if (_logger.isDebugEnabled()) - { - _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]"); - } - } - catch (SQLException e) - { - if(localTx) - { - abortTran(context); - } - _logger.error("Failed to enqueue: " + e, e); - throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name - + " to database", e); - } - - } - - public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException - { - AMQShortString name = queue.getName(); - - boolean localTx = getOrCreateTransaction(context); - Connection conn = getConnection(context); - ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); - - try - { - PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY); - stmt.setString(1,name.toString()); - stmt.setLong(2,messageId); - int results = stmt.executeUpdate(); - - connWrapper.requiresCommit(); - - if(results != 1) - { - throw new AMQException("Unable to find message with id " + messageId + " on queue " + name); - } - - if(localTx) - { - commitTran(context); - } - - - - if (_logger.isDebugEnabled()) - { - _logger.debug("Dequeuing message " + messageId + " on queue " + name + "[Connection" + conn + "]"); - } - } - catch (SQLException e) - { - if(localTx) - { - abortTran(context); - } - _logger.error("Failed to dequeue: " + e, e); - throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name - + " from database", e); - } - - } - - private static final class ConnectionWrapper - { - private final Connection _connection; - private boolean _requiresCommit; - - public ConnectionWrapper(Connection conn) - { - _connection = conn; - } - - public void setRequiresCommit() - { - _requiresCommit = true; - } - - public boolean requiresCommit() - { - return _requiresCommit; - } - - public Connection getConnection() - { - return _connection; - } - } - - public void beginTran(StoreContext context) throws AMQException - { - if (context.getPayload() != null) - { - throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: " - + context.getPayload()); - } - else - { - try - { - Connection conn = newConnection(); - - - context.setPayload(new ConnectionWrapper(conn)); - } - catch (SQLException e) - { - throw new AMQException("Error starting transaction: " + e, e); - } - } - } - - public void commitTran(StoreContext context) throws AMQException - { - ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); - - if (connWrapper == null) - { - throw new AMQException("Fatal internal error: transactional context is empty at commitTran"); - } - - try - { - Connection conn = connWrapper.getConnection(); - if(connWrapper.requiresCommit()) - { - conn.commit(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("commit tran completed"); - } - - } - conn.close(); - } - catch (SQLException e) - { - throw new AMQException("Error commit tx: " + e, e); - } - finally - { - context.setPayload(null); - } - } - - public void abortTran(StoreContext context) throws AMQException - { - ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); - - if (connWrapper == null) - { - throw new AMQException("Fatal internal error: transactional context is empty at abortTran"); - } - - if (_logger.isDebugEnabled()) - { - _logger.debug("abort tran called: " + connWrapper.getConnection()); - } - - try - { - Connection conn = connWrapper.getConnection(); - if(connWrapper.requiresCommit()) - { - conn.rollback(); - } - - conn.close(); - } - catch (SQLException e) - { - throw new AMQException("Error aborting transaction: " + e, e); - } - finally - { - context.setPayload(null); - } - } - - public boolean inTran(StoreContext context) - { - return context.getPayload() != null; - } - - public Long getNewMessageId() - { - return _messageId.getAndIncrement(); - } - - public void storeContentBodyChunk(StoreContext context, - Long messageId, - int index, - ContentChunk contentBody, - boolean lastContentBody) throws AMQException - { - boolean localTx = getOrCreateTransaction(context); - Connection conn = getConnection(context); - ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); - - try - { - PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT); - stmt.setLong(1,messageId); - stmt.setInt(2, index); - byte[] chunkData = new byte[contentBody.getSize()]; - contentBody.getData().duplicate().get(chunkData); - /* this would be the Java 6 way of doing things - Blob dataAsBlob = conn.createBlob(); - dataAsBlob.setBytes(1L, chunkData); - stmt.setBlob(3, dataAsBlob); - */ - ByteArrayInputStream bis = new ByteArrayInputStream(chunkData); - stmt.setBinaryStream(3, bis, chunkData.length); - stmt.executeUpdate(); - connWrapper.requiresCommit(); - - if(localTx) - { - commitTran(context); - } - } - catch (SQLException e) - { - if(localTx) - { - abortTran(context); - } - - throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e); - } - - } - - public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData mmd) - throws AMQException - { - - boolean localTx = getOrCreateTransaction(context); - Connection conn = getConnection(context); - ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); - - try - { - - PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA); - stmt.setLong(1,messageId); - stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString()); - stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString()); - stmt.setShort(4, mmd.getMessagePublishInfo().isMandatory() ? (short) 1 : (short) 0); - stmt.setShort(5, mmd.getMessagePublishInfo().isImmediate() ? (short) 1 : (short) 0); - - ContentHeaderBody headerBody = mmd.getContentHeaderBody(); - final int bodySize = headerBody.getSize(); - byte[] underlying = new byte[bodySize]; - ByteBuffer buf = ByteBuffer.wrap(underlying); - headerBody.writePayload(buf); -/* - Blob dataAsBlob = conn.createBlob(); - dataAsBlob.setBytes(1L, underlying); - stmt.setBlob(6, dataAsBlob); -*/ - ByteArrayInputStream bis = new ByteArrayInputStream(underlying); - stmt.setBinaryStream(6,bis,underlying.length); - - stmt.setInt(7, mmd.getContentChunkCount()); - - stmt.executeUpdate(); - connWrapper.requiresCommit(); - - if(localTx) - { - commitTran(context); - } - } - catch (SQLException e) - { - if(localTx) - { - abortTran(context); - } - - throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e); - } - - - } - - public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException - { - boolean localTx = getOrCreateTransaction(context); - Connection conn = getConnection(context); - - - try - { - - PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA); - stmt.setLong(1,messageId); - ResultSet rs = stmt.executeQuery(); - - if(rs.next()) - { - final AMQShortString exchange = new AMQShortString(rs.getString(1)); - final AMQShortString routingKey = rs.getString(2) == null ? null : new AMQShortString(rs.getString(2)); - final boolean mandatory = (rs.getShort(3) != (short)0); - final boolean immediate = (rs.getShort(4) != (short)0); - MessagePublishInfo info = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return exchange; - } - - public void setExchange(AMQShortString exchange) - { - - } - - public boolean isImmediate() - { - return immediate; - } - - public boolean isMandatory() - { - return mandatory; - } - - public AMQShortString getRoutingKey() - { - return routingKey; - } - } ; - - Blob dataAsBlob = rs.getBlob(5); - - byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length()); - ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); - - ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, dataAsBytes.length); - - if(localTx) - { - commitTran(context); - } - - return new MessageMetaData(info, chb, rs.getInt(6)); - - } - else - { - if(localTx) - { - abortTran(context); - } - throw new AMQException("Metadata not found for message with id " + messageId); - } - } - catch (SQLException e) - { - if(localTx) - { - abortTran(context); - } - - throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e); - } - - - } - - public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException - { - boolean localTx = getOrCreateTransaction(context); - Connection conn = getConnection(context); - - - try - { - - PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT); - stmt.setLong(1,messageId); - stmt.setInt(2, index); - ResultSet rs = stmt.executeQuery(); - - if(rs.next()) - { - Blob dataAsBlob = rs.getBlob(1); - - final int size = (int) dataAsBlob.length(); - byte[] dataAsBytes = dataAsBlob.getBytes(1, size); - final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); - - ContentChunk cb = new ContentChunk() - { - - public int getSize() - { - return size; - } - - public ByteBuffer getData() - { - return buf; - } - - public void reduceToFit() - { - - } - }; - - if(localTx) - { - commitTran(context); - } - - return cb; - - } - else - { - if(localTx) - { - abortTran(context); - } - throw new AMQException("Message not found for message with id " + messageId); - } - } - catch (SQLException e) - { - if(localTx) - { - abortTran(context); - } - - throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e); - } - - - - } - - public boolean isPersistent() - { - return true; - } - - private void checkNotClosed() throws MessageStoreClosedException - { - if (_closed.get()) - { - throw new MessageStoreClosedException(); - } - } - - - private static final class ProcessAction - { - private final AMQQueue _queue; - private final StoreContext _context; - private final AMQMessage _message; - - public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message) - { - _queue = queue; - _context = context; - _message = message; - } - - public void process() throws AMQException - { - _queue.enqueue(_context, _message); - - } - - } - - - private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues) - throws SQLException, AMQException - { - Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>(); - List<ProcessAction> actions = new ArrayList<ProcessAction>(); - - Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>(); - - final boolean inLocaltran = inTran(context); - Connection conn = null; - try - { - - if(inLocaltran) - { - conn = getConnection(context); - } - else - { - conn = newConnection(); - } - - - MessageHandleFactory messageHandleFactory = new MessageHandleFactory(); - long maxId = 1; - - TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null); - - Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); - - - while (rs.next()) - { - - - - AMQShortString queueName = new AMQShortString(rs.getString(1)); - - - AMQQueue queue = queues.get(queueName); - if (queue == null) - { - queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null); - - _virtualHost.getQueueRegistry().registerQueue(queue); - queues.put(queueName, queue); - } - - long messageId = rs.getLong(2); - maxId = Math.max(maxId, messageId); - AMQMessage message = msgMap.get(messageId); - - if(message != null) - { - message.incrementReference(); - } - else - { - message = new AMQMessage(messageId, this, messageHandleFactory, txnContext); - msgMap.put(messageId,message); - } - - if (_logger.isDebugEnabled()) - { - _logger.debug("On recovery, delivering " + message.getMessageId() + " to " + queue.getName()); - } - - if (_logger.isInfoEnabled()) - { - Integer count = queueRecoveries.get(queueName); - if (count == null) - { - count = 0; - } - - queueRecoveries.put(queueName, ++count); - - } - - actions.add(new ProcessAction(queue, context, message)); - - } - - for(ProcessAction action : actions) - { - action.process(); - } - - _messageId.set(maxId + 1); - } - catch (SQLException e) - { - _logger.error("Error: " + e, e); - throw e; - } - finally - { - if (inLocaltran && conn != null) - { - conn.close(); - } - } - - if (_logger.isInfoEnabled()) - { - _logger.info("Recovered message counts: " + queueRecoveries); - } - } - - private Connection getConnection(final StoreContext context) - { - return ((ConnectionWrapper)context.getPayload()).getConnection(); - } - - private boolean getOrCreateTransaction(StoreContext context) throws AMQException - { - - ConnectionWrapper tx = (ConnectionWrapper) context.getPayload(); - if (tx == null) - { - beginTran(context); - return true; - } - - return false; - } - - private synchronized void stateTransition(State requiredState, State newState) throws AMQException - { - if (_state != requiredState) - { - throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState - + "; currently in state: " + _state); - } - - _state = newState; - } -} |