summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java')
-rw-r--r--M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java1463
1 files changed, 0 insertions, 1463 deletions
diff --git a/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
deleted file mode 100644
index 743a736884..0000000000
--- a/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ /dev/null
@@ -1,1463 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.QueueRegistry;
-
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-
-import java.io.File;
-import java.io.ByteArrayInputStream;
-import java.sql.DriverManager;
-import java.sql.Driver;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.Blob;
-import java.sql.Types;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.TreeMap;
-
-
-public class DerbyMessageStore implements MessageStore
-{
-
- private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
-
- private static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
-
-
- private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
-
- private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
-
- private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE";
- private static final String QUEUE_TABLE_NAME = "QPID_QUEUE";
- private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS";
- private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY";
- private static final String MESSAGE_META_DATA_TABLE_NAME = "QPID_MESSAGE_META_DATA";
- private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
-
- private static final int DB_VERSION = 1;
-
-
-
- private VirtualHost _virtualHost;
- private static Class<Driver> DRIVER_CLASS;
-
- private final AtomicLong _messageId = new AtomicLong(1);
- private AtomicBoolean _closed = new AtomicBoolean(false);
-
- private String _connectionURL;
-
-
-
- private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
- private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
- private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
- private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )";
- private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
- private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
- private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )";
- private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )";
- private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME;
- private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
- private static final String SELECT_FROM_BINDINGS =
- "SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?";
- private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
- private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
- private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
- private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
- private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
- private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
- private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)";
- private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
- private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)";
- private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?";
- private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)";
- private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)";
- private static final String SELECT_FROM_MESSAGE_META_DATA =
- "SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
- private static final String SELECT_FROM_MESSAGE_CONTENT =
- "SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?";
- private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME;
- private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
-
-
- private enum State
- {
- INITIAL,
- CONFIGURING,
- RECOVERING,
- STARTED,
- CLOSING,
- CLOSED
- }
-
- private State _state = State.INITIAL;
-
-
- public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
- {
- stateTransition(State.INITIAL, State.CONFIGURING);
-
- initialiseDriver();
-
- _virtualHost = virtualHost;
-
- _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName());
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
- final String databasePath = config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB");
-
- File environmentPath = new File(databasePath);
- if (!environmentPath.exists())
- {
- if (!environmentPath.mkdirs())
- {
- throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
- }
- }
-
- createOrOpenDatabase(databasePath);
-
- // this recovers durable queues and persistent messages
-
- recover();
-
- stateTransition(State.RECOVERING, State.STARTED);
-
- }
-
- private static synchronized void initialiseDriver() throws ClassNotFoundException
- {
- if(DRIVER_CLASS == null)
- {
- DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
- }
- }
-
- private void createOrOpenDatabase(final String environmentPath) throws SQLException
- {
- _connectionURL = "jdbc:derby:" + environmentPath + "/" + _virtualHost.getName() + ";create=true";
-
- Connection conn = newConnection();
-
- createVersionTable(conn);
- createExchangeTable(conn);
- createQueueTable(conn);
- createBindingsTable(conn);
- createQueueEntryTable(conn);
- createMessageMetaDataTable(conn);
- createMessageContentTable(conn);
-
- conn.close();
- }
-
-
-
- private void createVersionTable(final Connection conn) throws SQLException
- {
- if(!tableExists(DB_VERSION_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
-
- stmt.execute(CREATE_DB_VERSION_TABLE);
- stmt.close();
-
- PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
- pstmt.setInt(1, DB_VERSION);
- pstmt.execute();
- pstmt.close();
- }
-
- }
-
-
- private void createExchangeTable(final Connection conn) throws SQLException
- {
- if(!tableExists(EXCHANGE_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
-
- stmt.execute(CREATE_EXCHANGE_TABLE);
- stmt.close();
- }
- }
-
- private void createQueueTable(final Connection conn) throws SQLException
- {
- if(!tableExists(QUEUE_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- stmt.execute(CREATE_QUEUE_TABLE);
- stmt.close();
- }
- }
-
- private void createBindingsTable(final Connection conn) throws SQLException
- {
- if(!tableExists(BINDINGS_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- stmt.execute(CREATE_BINDINGS_TABLE);
-
- stmt.close();
- }
-
- }
-
- private void createQueueEntryTable(final Connection conn) throws SQLException
- {
- if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- stmt.execute(CREATE_QUEUE_ENTRY_TABLE);
-
- stmt.close();
- }
-
- }
-
- private void createMessageMetaDataTable(final Connection conn) throws SQLException
- {
- if(!tableExists(MESSAGE_META_DATA_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- stmt.execute(CREATE_MESSAGE_META_DATA_TABLE);
-
- stmt.close();
- }
-
- }
-
-
- private void createMessageContentTable(final Connection conn) throws SQLException
- {
- if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- stmt.execute(CREATE_MESSAGE_CONTENT_TABLE);
-
- stmt.close();
- }
-
- }
-
-
-
- private boolean tableExists(final String tableName, final Connection conn) throws SQLException
- {
- PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
- stmt.setString(1, tableName);
- ResultSet rs = stmt.executeQuery();
- boolean exists = rs.next();
- rs.close();
- stmt.close();
- return exists;
- }
-
- public void recover() throws AMQException
- {
- stateTransition(State.CONFIGURING, State.RECOVERING);
-
- _logger.info("Recovering persistent state...");
- StoreContext context = new StoreContext();
-
- try
- {
- Map<AMQShortString, AMQQueue> queues = loadQueues();
-
- recoverExchanges();
-
- try
- {
-
- beginTran(context);
-
- deliverMessages(context, queues);
- _logger.info("Persistent state recovered successfully");
- commitTran(context);
-
- }
- finally
- {
- if(inTran(context))
- {
- abortTran(context);
- }
- }
- }
- catch (SQLException e)
- {
-
- throw new AMQException("Error recovering persistent state: " + e, e);
- }
-
- }
-
- private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException
- {
- Connection conn = newConnection();
-
-
- Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
- Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>();
- while(rs.next())
- {
- String queueName = rs.getString(1);
- String owner = rs.getString(2);
- AMQShortString queueNameShortString = new AMQShortString(queueName);
- AMQQueue q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost,
- null);
- _virtualHost.getQueueRegistry().registerQueue(q);
- queueMap.put(queueNameShortString,q);
-
- }
- return queueMap;
- }
-
- private void recoverExchanges() throws AMQException, SQLException
- {
- for (Exchange exchange : loadExchanges())
- {
- recoverExchange(exchange);
- }
- }
-
-
- private List<Exchange> loadExchanges() throws AMQException, SQLException
- {
-
- List<Exchange> exchanges = new ArrayList<Exchange>();
- Connection conn = null;
- try
- {
- conn = newConnection();
-
-
- Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
-
- Exchange exchange;
- while(rs.next())
- {
- String exchangeName = rs.getString(1);
- String type = rs.getString(2);
- boolean autoDelete = rs.getShort(3) != 0;
-
- exchange = _virtualHost.getExchangeFactory().createExchange(new AMQShortString(exchangeName), new AMQShortString(type), true, autoDelete, 0);
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
- exchanges.add(exchange);
-
- }
- return exchanges;
-
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
- }
- }
-
- }
-
- private void recoverExchange(Exchange exchange) throws AMQException, SQLException
- {
- _logger.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "...");
-
- QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
-
- Connection conn = null;
- try
- {
- conn = newConnection();
-
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
- stmt.setString(1, exchange.getName().toString());
-
- ResultSet rs = stmt.executeQuery();
-
-
- while(rs.next())
- {
- String queueName = rs.getString(1);
- String bindingKey = rs.getString(2);
- Blob arguments = rs.getBlob(3);
-
-
- AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName));
- if (queue == null)
- {
- _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: "
- + exchange.getName());
- }
- else
- {
- _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName
- + ", Routing Key: " + bindingKey + ", Arguments: " + arguments
- + ")");
-
- FieldTable argumentsFT = null;
- if(arguments != null)
- {
- byte[] argumentBytes = arguments.getBytes(0, (int) arguments.length());
- ByteBuffer buf = ByteBuffer.wrap(argumentBytes);
- argumentsFT = new FieldTable(buf,arguments.length());
- }
-
- queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT);
-
- }
- }
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
- }
- }
- }
-
- public void close() throws Exception
- {
- _closed.getAndSet(true);
- }
-
- public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
- {
-
- boolean localTx = getOrCreateTransaction(storeContext);
-
- Connection conn = getConnection(storeContext);
- ConnectionWrapper wrapper = (ConnectionWrapper) storeContext.getPayload();
-
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Message Id: " + messageId + " Removing");
- }
-
- // first we need to look up the header to get the chunk count
- MessageMetaData mmd = getMessageMetaData(storeContext, messageId);
- try
- {
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
- stmt.setLong(1,messageId);
- wrapper.setRequiresCommit();
- int results = stmt.executeUpdate();
-
- if (results == 0)
- {
- if (localTx)
- {
- abortTran(storeContext);
- }
-
- throw new AMQException("Message metadata not found for message id " + messageId);
- }
- stmt.close();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Deleted metadata for message " + messageId);
- }
-
- stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
- stmt.setLong(1,messageId);
- results = stmt.executeUpdate();
-
- if(results != mmd.getContentChunkCount())
- {
- if (localTx)
- {
- abortTran(storeContext);
- }
- throw new AMQException("Unexpected number of content chunks when deleting message. Expected " + mmd.getContentChunkCount() + " but found " + results);
-
- }
-
- if (localTx)
- {
- commitTran(storeContext);
- }
- }
- catch (SQLException e)
- {
- if ((conn != null) && localTx)
- {
- abortTran(storeContext);
- }
-
- throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
- }
-
- }
-
- public void createExchange(Exchange exchange) throws AMQException
- {
- if (_state != State.RECOVERING)
- {
- try
- {
- Connection conn = null;
-
- try
- {
- conn = newConnection();
-
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_EXCHANGE);
- stmt.setString(1, exchange.getName().toString());
- stmt.setString(2, exchange.getType().toString());
- stmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0);
- stmt.execute();
- stmt.close();
- conn.commit();
-
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
- }
- }
- }
- catch (SQLException e)
- {
- throw new AMQException("Error writing Exchange with name " + exchange.getName() + " to database: " + e, e);
- }
- }
-
- }
-
- public void removeExchange(Exchange exchange) throws AMQException
- {
- Connection conn = null;
-
- try
- {
- conn = newConnection();
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
- stmt.setString(1, exchange.getName().toString());
- int results = stmt.executeUpdate();
- if(results == 0)
- {
- throw new AMQException("Exchange " + exchange.getName() + " not found");
- }
- else
- {
- conn.commit();
- stmt.close();
- }
- }
- catch (SQLException e)
- {
- throw new AMQException("Error writing deleting with name " + exchange.getName() + " from database: " + e, e);
- }
- finally
- {
- if(conn != null)
- {
- try
- {
- conn.close();
- }
- catch (SQLException e)
- {
- _logger.error(e);
- }
- }
-
- }
- }
-
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
- throws AMQException
- {
- if (_state != State.RECOVERING)
- {
- Connection conn = null;
-
-
- try
- {
- conn = newConnection();
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
- stmt.setString(1, exchange.getName().toString() );
- stmt.setString(2, queue.getName().toString());
- stmt.setString(3, routingKey == null ? null : routingKey.toString());
- if(args != null)
- {
- /* This would be the Java 6 way of setting a Blob
- Blob blobArgs = conn.createBlob();
- blobArgs.setBytes(0, args.getDataAsBytes());
- stmt.setBlob(4, blobArgs);
- */
- byte[] bytes = args.getDataAsBytes();
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
- stmt.setBinaryStream(4, bis, bytes.length);
- }
- else
- {
- stmt.setNull(4, Types.BLOB);
- }
-
- stmt.executeUpdate();
- conn.commit();
- stmt.close();
- }
- catch (SQLException e)
- {
- throw new AMQException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " to database: " + e, e);
- }
- finally
- {
- if(conn != null)
- {
- try
- {
- conn.close();
- }
- catch (SQLException e)
- {
- _logger.error(e);
- }
- }
-
- }
-
- }
-
-
- }
-
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
- throws AMQException
- {
- Connection conn = null;
-
-
- try
- {
- conn = newConnection();
- // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
- stmt.setString(1, exchange.getName().toString() );
- stmt.setString(2, queue.getName().toString());
- stmt.setString(3, routingKey == null ? null : routingKey.toString());
-
-
- if(stmt.executeUpdate() != 1)
- {
- throw new AMQException("Queue binding for queue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " not found");
- }
- conn.commit();
- stmt.close();
- }
- catch (SQLException e)
- {
- throw new AMQException("Error removing binding for AMQQueue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " in database: " + e, e);
- }
- finally
- {
- if(conn != null)
- {
- try
- {
- conn.close();
- }
- catch (SQLException e)
- {
- _logger.error(e);
- }
- }
-
- }
-
-
- }
-
- public void createQueue(AMQQueue queue) throws AMQException
- {
- createQueue(queue, null);
- }
-
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
- {
- _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
-
- if (_state != State.RECOVERING)
- {
- try
- {
- Connection conn = newConnection();
-
- PreparedStatement stmt =
- conn.prepareStatement(INSERT_INTO_QUEUE);
-
- stmt.setString(1, queue.getName().toString());
- stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
-
- stmt.execute();
-
- stmt.close();
-
- conn.commit();
-
- conn.close();
- }
- catch (SQLException e)
- {
- throw new AMQException("Error writing AMQQueue with name " + queue.getName() + " to database: " + e, e);
- }
- }
- }
-
- private Connection newConnection() throws SQLException
- {
- final Connection connection = DriverManager.getConnection(_connectionURL);
- return connection;
- }
-
- public void removeQueue(final AMQQueue queue) throws AMQException
- {
- AMQShortString name = queue.getName();
- _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
- Connection conn = null;
-
-
- try
- {
- conn = newConnection();
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE);
- stmt.setString(1, name.toString());
- int results = stmt.executeUpdate();
-
-
- if (results == 0)
- {
- throw new AMQException("Queue " + name + " not found");
- }
-
- conn.commit();
- stmt.close();
- }
- catch (SQLException e)
- {
- throw new AMQException("Error writing deleting with name " + name + " from database: " + e, e);
- }
- finally
- {
- if(conn != null)
- {
- try
- {
- conn.close();
- }
- catch (SQLException e)
- {
- _logger.error(e);
- }
- }
-
- }
-
-
- }
-
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
- {
- AMQShortString name = queue.getName();
-
- boolean localTx = getOrCreateTransaction(context);
- Connection conn = getConnection(context);
- ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
-
- try
- {
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
- stmt.setString(1,name.toString());
- stmt.setLong(2,messageId);
- stmt.executeUpdate();
- connWrapper.requiresCommit();
-
- if(localTx)
- {
- commitTran(context);
- }
-
-
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
- }
- }
- catch (SQLException e)
- {
- if(localTx)
- {
- abortTran(context);
- }
- _logger.error("Failed to enqueue: " + e, e);
- throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name
- + " to database", e);
- }
-
- }
-
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
- {
- AMQShortString name = queue.getName();
-
- boolean localTx = getOrCreateTransaction(context);
- Connection conn = getConnection(context);
- ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
-
- try
- {
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
- stmt.setString(1,name.toString());
- stmt.setLong(2,messageId);
- int results = stmt.executeUpdate();
-
- connWrapper.requiresCommit();
-
- if(results != 1)
- {
- throw new AMQException("Unable to find message with id " + messageId + " on queue " + name);
- }
-
- if(localTx)
- {
- commitTran(context);
- }
-
-
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Dequeuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
- }
- }
- catch (SQLException e)
- {
- if(localTx)
- {
- abortTran(context);
- }
- _logger.error("Failed to dequeue: " + e, e);
- throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name
- + " from database", e);
- }
-
- }
-
- private static final class ConnectionWrapper
- {
- private final Connection _connection;
- private boolean _requiresCommit;
-
- public ConnectionWrapper(Connection conn)
- {
- _connection = conn;
- }
-
- public void setRequiresCommit()
- {
- _requiresCommit = true;
- }
-
- public boolean requiresCommit()
- {
- return _requiresCommit;
- }
-
- public Connection getConnection()
- {
- return _connection;
- }
- }
-
- public void beginTran(StoreContext context) throws AMQException
- {
- if (context.getPayload() != null)
- {
- throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: "
- + context.getPayload());
- }
- else
- {
- try
- {
- Connection conn = newConnection();
-
-
- context.setPayload(new ConnectionWrapper(conn));
- }
- catch (SQLException e)
- {
- throw new AMQException("Error starting transaction: " + e, e);
- }
- }
- }
-
- public void commitTran(StoreContext context) throws AMQException
- {
- ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
-
- if (connWrapper == null)
- {
- throw new AMQException("Fatal internal error: transactional context is empty at commitTran");
- }
-
- try
- {
- Connection conn = connWrapper.getConnection();
- if(connWrapper.requiresCommit())
- {
- conn.commit();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("commit tran completed");
- }
-
- }
- conn.close();
- }
- catch (SQLException e)
- {
- throw new AMQException("Error commit tx: " + e, e);
- }
- finally
- {
- context.setPayload(null);
- }
- }
-
- public void abortTran(StoreContext context) throws AMQException
- {
- ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
-
- if (connWrapper == null)
- {
- throw new AMQException("Fatal internal error: transactional context is empty at abortTran");
- }
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("abort tran called: " + connWrapper.getConnection());
- }
-
- try
- {
- Connection conn = connWrapper.getConnection();
- if(connWrapper.requiresCommit())
- {
- conn.rollback();
- }
-
- conn.close();
- }
- catch (SQLException e)
- {
- throw new AMQException("Error aborting transaction: " + e, e);
- }
- finally
- {
- context.setPayload(null);
- }
- }
-
- public boolean inTran(StoreContext context)
- {
- return context.getPayload() != null;
- }
-
- public Long getNewMessageId()
- {
- return _messageId.getAndIncrement();
- }
-
- public void storeContentBodyChunk(StoreContext context,
- Long messageId,
- int index,
- ContentChunk contentBody,
- boolean lastContentBody) throws AMQException
- {
- boolean localTx = getOrCreateTransaction(context);
- Connection conn = getConnection(context);
- ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
-
- try
- {
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
- stmt.setLong(1,messageId);
- stmt.setInt(2, index);
- byte[] chunkData = new byte[contentBody.getSize()];
- contentBody.getData().duplicate().get(chunkData);
- /* this would be the Java 6 way of doing things
- Blob dataAsBlob = conn.createBlob();
- dataAsBlob.setBytes(1L, chunkData);
- stmt.setBlob(3, dataAsBlob);
- */
- ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
- stmt.setBinaryStream(3, bis, chunkData.length);
- stmt.executeUpdate();
- connWrapper.requiresCommit();
-
- if(localTx)
- {
- commitTran(context);
- }
- }
- catch (SQLException e)
- {
- if(localTx)
- {
- abortTran(context);
- }
-
- throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
- }
-
- }
-
- public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData mmd)
- throws AMQException
- {
-
- boolean localTx = getOrCreateTransaction(context);
- Connection conn = getConnection(context);
- ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
-
- try
- {
-
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
- stmt.setLong(1,messageId);
- stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString());
- stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString());
- stmt.setShort(4, mmd.getMessagePublishInfo().isMandatory() ? (short) 1 : (short) 0);
- stmt.setShort(5, mmd.getMessagePublishInfo().isImmediate() ? (short) 1 : (short) 0);
-
- ContentHeaderBody headerBody = mmd.getContentHeaderBody();
- final int bodySize = headerBody.getSize();
- byte[] underlying = new byte[bodySize];
- ByteBuffer buf = ByteBuffer.wrap(underlying);
- headerBody.writePayload(buf);
-/*
- Blob dataAsBlob = conn.createBlob();
- dataAsBlob.setBytes(1L, underlying);
- stmt.setBlob(6, dataAsBlob);
-*/
- ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
- stmt.setBinaryStream(6,bis,underlying.length);
-
- stmt.setInt(7, mmd.getContentChunkCount());
-
- stmt.executeUpdate();
- connWrapper.requiresCommit();
-
- if(localTx)
- {
- commitTran(context);
- }
- }
- catch (SQLException e)
- {
- if(localTx)
- {
- abortTran(context);
- }
-
- throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
- }
-
-
- }
-
- public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
- {
- boolean localTx = getOrCreateTransaction(context);
- Connection conn = getConnection(context);
-
-
- try
- {
-
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
- stmt.setLong(1,messageId);
- ResultSet rs = stmt.executeQuery();
-
- if(rs.next())
- {
- final AMQShortString exchange = new AMQShortString(rs.getString(1));
- final AMQShortString routingKey = rs.getString(2) == null ? null : new AMQShortString(rs.getString(2));
- final boolean mandatory = (rs.getShort(3) != (short)0);
- final boolean immediate = (rs.getShort(4) != (short)0);
- MessagePublishInfo info = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- } ;
-
- Blob dataAsBlob = rs.getBlob(5);
-
- byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
- ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
-
- ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, dataAsBytes.length);
-
- if(localTx)
- {
- commitTran(context);
- }
-
- return new MessageMetaData(info, chb, rs.getInt(6));
-
- }
- else
- {
- if(localTx)
- {
- abortTran(context);
- }
- throw new AMQException("Metadata not found for message with id " + messageId);
- }
- }
- catch (SQLException e)
- {
- if(localTx)
- {
- abortTran(context);
- }
-
- throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
- }
-
-
- }
-
- public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
- {
- boolean localTx = getOrCreateTransaction(context);
- Connection conn = getConnection(context);
-
-
- try
- {
-
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
- stmt.setLong(1,messageId);
- stmt.setInt(2, index);
- ResultSet rs = stmt.executeQuery();
-
- if(rs.next())
- {
- Blob dataAsBlob = rs.getBlob(1);
-
- final int size = (int) dataAsBlob.length();
- byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
- final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
-
- ContentChunk cb = new ContentChunk()
- {
-
- public int getSize()
- {
- return size;
- }
-
- public ByteBuffer getData()
- {
- return buf;
- }
-
- public void reduceToFit()
- {
-
- }
- };
-
- if(localTx)
- {
- commitTran(context);
- }
-
- return cb;
-
- }
- else
- {
- if(localTx)
- {
- abortTran(context);
- }
- throw new AMQException("Message not found for message with id " + messageId);
- }
- }
- catch (SQLException e)
- {
- if(localTx)
- {
- abortTran(context);
- }
-
- throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
- }
-
-
-
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- private void checkNotClosed() throws MessageStoreClosedException
- {
- if (_closed.get())
- {
- throw new MessageStoreClosedException();
- }
- }
-
-
- private static final class ProcessAction
- {
- private final AMQQueue _queue;
- private final StoreContext _context;
- private final AMQMessage _message;
-
- public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message)
- {
- _queue = queue;
- _context = context;
- _message = message;
- }
-
- public void process() throws AMQException
- {
- _queue.enqueue(_context, _message);
-
- }
-
- }
-
-
- private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
- throws SQLException, AMQException
- {
- Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>();
- List<ProcessAction> actions = new ArrayList<ProcessAction>();
-
- Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
-
- final boolean inLocaltran = inTran(context);
- Connection conn = null;
- try
- {
-
- if(inLocaltran)
- {
- conn = getConnection(context);
- }
- else
- {
- conn = newConnection();
- }
-
-
- MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
- long maxId = 1;
-
- TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
-
- Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
-
-
- while (rs.next())
- {
-
-
-
- AMQShortString queueName = new AMQShortString(rs.getString(1));
-
-
- AMQQueue queue = queues.get(queueName);
- if (queue == null)
- {
- queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
-
- _virtualHost.getQueueRegistry().registerQueue(queue);
- queues.put(queueName, queue);
- }
-
- long messageId = rs.getLong(2);
- maxId = Math.max(maxId, messageId);
- AMQMessage message = msgMap.get(messageId);
-
- if(message != null)
- {
- message.incrementReference();
- }
- else
- {
- message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
- msgMap.put(messageId,message);
- }
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("On recovery, delivering " + message.getMessageId() + " to " + queue.getName());
- }
-
- if (_logger.isInfoEnabled())
- {
- Integer count = queueRecoveries.get(queueName);
- if (count == null)
- {
- count = 0;
- }
-
- queueRecoveries.put(queueName, ++count);
-
- }
-
- actions.add(new ProcessAction(queue, context, message));
-
- }
-
- for(ProcessAction action : actions)
- {
- action.process();
- }
-
- _messageId.set(maxId + 1);
- }
- catch (SQLException e)
- {
- _logger.error("Error: " + e, e);
- throw e;
- }
- finally
- {
- if (inLocaltran && conn != null)
- {
- conn.close();
- }
- }
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("Recovered message counts: " + queueRecoveries);
- }
- }
-
- private Connection getConnection(final StoreContext context)
- {
- return ((ConnectionWrapper)context.getPayload()).getConnection();
- }
-
- private boolean getOrCreateTransaction(StoreContext context) throws AMQException
- {
-
- ConnectionWrapper tx = (ConnectionWrapper) context.getPayload();
- if (tx == null)
- {
- beginTran(context);
- return true;
- }
-
- return false;
- }
-
- private synchronized void stateTransition(State requiredState, State newState) throws AMQException
- {
- if (_state != requiredState)
- {
- throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
- + "; currently in state: " + _state);
- }
-
- _state = newState;
- }
-}