summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-04-16 20:30:38 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-04-16 20:30:38 +0000
commitcf5c341c92b554b2b8d4491dc846618f038e0da8 (patch)
treed551a659a87c2acceff887e1debe44620b4f50b8
parent5104d7d22e9cec1501cf4b1fcce3526a8e4a5e3c (diff)
downloadqpid-python-cf5c341c92b554b2b8d4491dc846618f038e0da8.tar.gz
QPID-156 : Add an Apache licensed store - created an experimental Derby based store
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x@648834 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/pom.xml5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java1445
-rw-r--r--java/pom.xml5
3 files changed, 1455 insertions, 0 deletions
diff --git a/java/broker/pom.xml b/java/broker/pom.xml
index 1dd613e5f9..153e186a26 100644
--- a/java/broker/pom.xml
+++ b/java/broker/pom.xml
@@ -128,6 +128,11 @@
<artifactId>org.apache.felix.framework</artifactId>
<version>1.0.0</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>10.3.2.1</version>
+ </dependency>
</dependencies>
<build>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
new file mode 100644
index 0000000000..4664fd5e14
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -0,0 +1,1445 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.QueueRegistry;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+import java.io.File;
+import java.io.ByteArrayInputStream;
+import java.sql.DriverManager;
+import java.sql.Driver;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Blob;
+import java.sql.Types;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.TreeMap;
+
+
+public class DerbyMessageStore implements MessageStore
+{
+
+ private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
+
+ private static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
+
+
+ private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
+
+ private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
+
+ private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE";
+ private static final String QUEUE_TABLE_NAME = "QPID_QUEUE";
+ private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS";
+ private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY";
+ private static final String MESSAGE_META_DATA_TABLE_NAME = "QPID_MESSAGE_META_DATA";
+ private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
+
+ private static final int DB_VERSION = 1;
+
+
+
+ private VirtualHost _virtualHost;
+ private static Class<Driver> DRIVER_CLASS;
+
+ private final AtomicLong _messageId = new AtomicLong(1);
+ private AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private String _connectionURL;
+
+
+
+ private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
+ private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
+ private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
+ private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )";
+ private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
+ private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
+ private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )";
+ private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )";
+ private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME;
+ private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
+ private static final String SELECT_FROM_BINDINGS =
+ "SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?";
+ private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
+ private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
+ private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
+ private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
+ private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
+ private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
+ private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)";
+ private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
+ private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)";
+ private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?";
+ private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)";
+ private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)";
+ private static final String SELECT_FROM_MESSAGE_META_DATA =
+ "SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
+ private static final String SELECT_FROM_MESSAGE_CONTENT =
+ "SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?";
+ private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME;
+ private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
+
+
+ private enum State
+ {
+ INITIAL,
+ CONFIGURING,
+ RECOVERING,
+ STARTED,
+ CLOSING,
+ CLOSED
+ }
+
+ private State _state = State.INITIAL;
+
+
+ public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
+ {
+ stateTransition(State.INITIAL, State.CONFIGURING);
+
+ initialiseDriver();
+
+ _virtualHost = virtualHost;
+
+ _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName());
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ final String databasePath = config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB");
+
+ File environmentPath = new File(databasePath);
+ if (!environmentPath.exists())
+ {
+ if (!environmentPath.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
+ }
+
+ createOrOpenDatabase(databasePath);
+
+ // this recovers durable queues and persistent messages
+
+ recover();
+
+ stateTransition(State.RECOVERING, State.STARTED);
+
+ }
+
+ private static synchronized void initialiseDriver() throws ClassNotFoundException
+ {
+ if(DRIVER_CLASS == null)
+ {
+ DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
+ }
+ }
+
+ private void createOrOpenDatabase(final String environmentPath) throws SQLException
+ {
+ _connectionURL = "jdbc:derby:" + environmentPath + "/" + _virtualHost.getName() + ";create=true";
+
+ Connection conn = newConnection();
+
+ createVersionTable(conn);
+ createExchangeTable(conn);
+ createQueueTable(conn);
+ createBindingsTable(conn);
+ createQueueEntryTable(conn);
+ createMessageMetaDataTable(conn);
+ createMessageContentTable(conn);
+
+ conn.close();
+ }
+
+
+
+ private void createVersionTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(DB_VERSION_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+
+ stmt.execute(CREATE_DB_VERSION_TABLE);
+ stmt.close();
+
+ PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
+ pstmt.setInt(1, DB_VERSION);
+ pstmt.execute();
+ pstmt.close();
+ }
+
+ }
+
+
+ private void createExchangeTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(EXCHANGE_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+
+ stmt.execute(CREATE_EXCHANGE_TABLE);
+ stmt.close();
+ }
+ }
+
+ private void createQueueTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(QUEUE_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ stmt.execute(CREATE_QUEUE_TABLE);
+ stmt.close();
+ }
+ }
+
+ private void createBindingsTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(BINDINGS_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ stmt.execute(CREATE_BINDINGS_TABLE);
+
+ stmt.close();
+ }
+
+ }
+
+ private void createQueueEntryTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ stmt.execute(CREATE_QUEUE_ENTRY_TABLE);
+
+ stmt.close();
+ }
+
+ }
+
+ private void createMessageMetaDataTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(MESSAGE_META_DATA_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ stmt.execute(CREATE_MESSAGE_META_DATA_TABLE);
+
+ stmt.close();
+ }
+
+ }
+
+
+ private void createMessageContentTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ stmt.execute(CREATE_MESSAGE_CONTENT_TABLE);
+
+ stmt.close();
+ }
+
+ }
+
+
+
+ private boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
+ stmt.setString(1, tableName);
+ ResultSet rs = stmt.executeQuery();
+ boolean exists = rs.next();
+ rs.close();
+ stmt.close();
+ return exists;
+ }
+
+ public void recover() throws AMQException
+ {
+ stateTransition(State.CONFIGURING, State.RECOVERING);
+
+ _logger.info("Recovering persistent state...");
+ StoreContext context = new StoreContext();
+
+ try
+ {
+ Map<AMQShortString, AMQQueue> queues = loadQueues();
+
+ recoverExchanges();
+
+ try
+ {
+
+ beginTran(context);
+
+ deliverMessages(context, queues);
+ _logger.info("Persistent state recovered successfully");
+ commitTran(context);
+
+ }
+ finally
+ {
+ if(inTran(context))
+ {
+ abortTran(context);
+ }
+ }
+ }
+ catch (SQLException e)
+ {
+
+ throw new AMQException("Error recovering persistent state: " + e, e);
+ }
+
+ }
+
+ private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException
+ {
+ Connection conn = newConnection();
+
+
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
+ Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>();
+ while(rs.next())
+ {
+ String queueName = rs.getString(1);
+ String owner = rs.getString(2);
+ AMQShortString queueNameShortString = new AMQShortString(queueName);
+ AMQQueue q = new AMQQueue(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost);
+ _virtualHost.getQueueRegistry().registerQueue(q);
+ queueMap.put(queueNameShortString,q);
+
+ }
+ return queueMap;
+ }
+
+ private void recoverExchanges() throws AMQException, SQLException
+ {
+ for (Exchange exchange : loadExchanges())
+ {
+ recoverExchange(exchange);
+ }
+ }
+
+
+ private List<Exchange> loadExchanges() throws AMQException, SQLException
+ {
+
+ List<Exchange> exchanges = new ArrayList<Exchange>();
+ Connection conn = null;
+ try
+ {
+ conn = newConnection();
+
+
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
+
+ Exchange exchange;
+ while(rs.next())
+ {
+ String exchangeName = rs.getString(1);
+ String type = rs.getString(2);
+ boolean autoDelete = rs.getShort(3) != 0;
+
+ exchange = _virtualHost.getExchangeFactory().createExchange(new AMQShortString(exchangeName), new AMQShortString(type), true, autoDelete, 0);
+ _virtualHost.getExchangeRegistry().registerExchange(exchange);
+ exchanges.add(exchange);
+
+ }
+ return exchanges;
+
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+
+ private void recoverExchange(Exchange exchange) throws AMQException, SQLException
+ {
+ _logger.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "...");
+
+ QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+
+ Connection conn = null;
+ try
+ {
+ conn = newConnection();
+
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
+ stmt.setString(1, exchange.getName().toString());
+
+ ResultSet rs = stmt.executeQuery();
+
+
+ while(rs.next())
+ {
+ String queueName = rs.getString(1);
+ String bindingKey = rs.getString(2);
+ Blob arguments = rs.getBlob(3);
+
+
+ AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName));
+ if (queue == null)
+ {
+ _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: "
+ + exchange.getName());
+ }
+ else
+ {
+ _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName
+ + ", Routing Key: " + bindingKey + ", Arguments: " + arguments
+ + ")");
+
+ FieldTable argumentsFT = null;
+ if(arguments != null)
+ {
+ byte[] argumentBytes = arguments.getBytes(0, (int) arguments.length());
+ ByteBuffer buf = ByteBuffer.wrap(argumentBytes);
+ argumentsFT = new FieldTable(buf,arguments.length());
+ }
+
+ queue.bind(bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT, exchange);
+ }
+ }
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void close() throws Exception
+ {
+ _closed.getAndSet(true);
+ }
+
+ public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+ {
+
+ boolean localTx = getOrCreateTransaction(storeContext);
+
+ Connection conn = getConnection(storeContext);
+ ConnectionWrapper wrapper = (ConnectionWrapper) storeContext.getPayload();
+
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Message Id: " + messageId + " Removing");
+ }
+
+ // first we need to look up the header to get the chunk count
+ MessageMetaData mmd = getMessageMetaData(storeContext, messageId);
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
+ stmt.setLong(1,messageId);
+ wrapper.setRequiresCommit();
+ int results = stmt.executeUpdate();
+
+ if (results == 0)
+ {
+ if (localTx)
+ {
+ abortTran(storeContext);
+ }
+
+ throw new AMQException("Message metadata not found for message id " + messageId);
+ }
+ stmt.close();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Deleted metadata for message " + messageId);
+ }
+
+ stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ results = stmt.executeUpdate();
+
+ if(results != mmd.getContentChunkCount())
+ {
+ if (localTx)
+ {
+ abortTran(storeContext);
+ }
+ throw new AMQException("Unexpected number of content chunks when deleting message. Expected " + mmd.getContentChunkCount() + " but found " + results);
+
+ }
+
+ if (localTx)
+ {
+ commitTran(storeContext);
+ }
+ }
+ catch (SQLException e)
+ {
+ if ((conn != null) && localTx)
+ {
+ abortTran(storeContext);
+ }
+
+ throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+ }
+
+ }
+
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+ if (_state != State.RECOVERING)
+ {
+ try
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = newConnection();
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_EXCHANGE);
+ stmt.setString(1, exchange.getName().toString());
+ stmt.setString(2, exchange.getType().toString());
+ stmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0);
+ stmt.execute();
+ stmt.close();
+ conn.commit();
+
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error writing Exchange with name " + exchange.getName() + " to database: " + e, e);
+ }
+ }
+
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = newConnection();
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
+ stmt.setString(1, exchange.getName().toString());
+ int results = stmt.executeUpdate();
+ if(results == 0)
+ {
+ throw new AMQException("Exchange " + exchange.getName() + " not found");
+ }
+ else
+ {
+ conn.commit();
+ stmt.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error writing deleting with name " + exchange.getName() + " from database: " + e, e);
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ _logger.error(e);
+ }
+ }
+
+ }
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+ throws AMQException
+ {
+ if (_state != State.RECOVERING)
+ {
+ Connection conn = null;
+
+
+ try
+ {
+ conn = newConnection();
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
+ stmt.setString(1, exchange.getName().toString() );
+ stmt.setString(2, queue.getName().toString());
+ stmt.setString(3, routingKey == null ? null : routingKey.toString());
+ if(args != null)
+ {
+ /* This would be the Java 6 way of setting a Blob
+ Blob blobArgs = conn.createBlob();
+ blobArgs.setBytes(0, args.getDataAsBytes());
+ stmt.setBlob(4, blobArgs);
+ */
+ ByteArrayInputStream bis = new ByteArrayInputStream(args.getDataAsBytes());
+ stmt.setBinaryStream(4, bis);
+ }
+ else
+ {
+ stmt.setNull(4, Types.BLOB);
+ }
+
+ stmt.executeUpdate();
+ conn.commit();
+ stmt.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange "
+ + exchange.getName() + " to database: " + e, e);
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ _logger.error(e);
+ }
+ }
+
+ }
+
+ }
+
+
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+ throws AMQException
+ {
+ Connection conn = null;
+
+
+ try
+ {
+ conn = newConnection();
+ // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
+ stmt.setString(1, exchange.getName().toString() );
+ stmt.setString(2, queue.getName().toString());
+ stmt.setString(3, routingKey == null ? null : routingKey.toString());
+
+
+ if(stmt.executeUpdate() != 1)
+ {
+ throw new AMQException("Queue binding for queue with name " + queue.getName() + " to exchange "
+ + exchange.getName() + " not found");
+ }
+ conn.commit();
+ stmt.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error removing binding for AMQQueue with name " + queue.getName() + " to exchange "
+ + exchange.getName() + " in database: " + e, e);
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ _logger.error(e);
+ }
+ }
+
+ }
+
+
+ }
+
+ public void createQueue(AMQQueue queue) throws AMQException
+ {
+ _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
+
+ if (_state != State.RECOVERING)
+ {
+ try
+ {
+ Connection conn = newConnection();
+
+ PreparedStatement stmt =
+ conn.prepareStatement(INSERT_INTO_QUEUE);
+
+ stmt.setString(1, queue.getName().toString());
+ stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
+
+ stmt.execute();
+
+ stmt.close();
+
+ conn.commit();
+
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error writing AMQQueue with name " + queue.getName() + " to database: " + e, e);
+ }
+ }
+ }
+
+ private Connection newConnection() throws SQLException
+ {
+ final Connection connection = DriverManager.getConnection(_connectionURL);
+ return connection;
+ }
+
+ public void removeQueue(AMQShortString name) throws AMQException
+ {
+
+ _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
+ Connection conn = null;
+
+
+ try
+ {
+ conn = newConnection();
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE);
+ stmt.setString(1, name.toString());
+ int results = stmt.executeUpdate();
+
+
+ if (results == 0)
+ {
+ throw new AMQException("Queue " + name + " not found");
+ }
+
+ conn.commit();
+ stmt.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error writing deleting with name " + name + " from database: " + e, e);
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ _logger.error(e);
+ }
+ }
+
+ }
+
+
+ }
+
+ public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ {
+
+ boolean localTx = getOrCreateTransaction(context);
+ Connection conn = getConnection(context);
+ ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
+ stmt.setString(1,name.toString());
+ stmt.setLong(2,messageId);
+ stmt.executeUpdate();
+ connWrapper.requiresCommit();
+
+ if(localTx)
+ {
+ commitTran(context);
+ }
+
+
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
+ }
+ }
+ catch (SQLException e)
+ {
+ if(localTx)
+ {
+ abortTran(context);
+ }
+ _logger.error("Failed to enqueue: " + e, e);
+ throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name
+ + " to database", e);
+ }
+
+ }
+
+ public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ {
+
+ boolean localTx = getOrCreateTransaction(context);
+ Connection conn = getConnection(context);
+ ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
+ stmt.setString(1,name.toString());
+ stmt.setLong(2,messageId);
+ int results = stmt.executeUpdate();
+
+ connWrapper.requiresCommit();
+
+ if(results != 1)
+ {
+ throw new AMQException("Unable to find message with id " + messageId + " on queue " + name);
+ }
+
+ if(localTx)
+ {
+ commitTran(context);
+ }
+
+
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
+ }
+ }
+ catch (SQLException e)
+ {
+ if(localTx)
+ {
+ abortTran(context);
+ }
+ _logger.error("Failed to dequeue: " + e, e);
+ throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name
+ + " from database", e);
+ }
+
+ }
+
+ private static final class ConnectionWrapper
+ {
+ private final Connection _connection;
+ private boolean _requiresCommit;
+
+ public ConnectionWrapper(Connection conn)
+ {
+ _connection = conn;
+ }
+
+ public void setRequiresCommit()
+ {
+ _requiresCommit = true;
+ }
+
+ public boolean requiresCommit()
+ {
+ return _requiresCommit;
+ }
+
+ public Connection getConnection()
+ {
+ return _connection;
+ }
+ }
+
+ public void beginTran(StoreContext context) throws AMQException
+ {
+ if (context.getPayload() != null)
+ {
+ throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: "
+ + context.getPayload());
+ }
+ else
+ {
+ try
+ {
+ Connection conn = newConnection();
+
+
+ context.setPayload(new ConnectionWrapper(conn));
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error starting transaction: " + e, e);
+ }
+ }
+ }
+
+ public void commitTran(StoreContext context) throws AMQException
+ {
+ ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+
+ if (connWrapper == null)
+ {
+ throw new AMQException("Fatal internal error: transactional context is empty at commitTran");
+ }
+
+ try
+ {
+ Connection conn = connWrapper.getConnection();
+ if(connWrapper.requiresCommit())
+ {
+ conn.commit();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("commit tran completed");
+ }
+
+ }
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error commit tx: " + e, e);
+ }
+ finally
+ {
+ context.setPayload(null);
+ }
+ }
+
+ public void abortTran(StoreContext context) throws AMQException
+ {
+ ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+
+ if (connWrapper == null)
+ {
+ throw new AMQException("Fatal internal error: transactional context is empty at abortTran");
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("abort tran called: " + connWrapper.getConnection());
+ }
+
+ try
+ {
+ Connection conn = connWrapper.getConnection();
+ if(connWrapper.requiresCommit())
+ {
+ conn.rollback();
+ }
+
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error aborting transaction: " + e, e);
+ }
+ finally
+ {
+ context.setPayload(null);
+ }
+ }
+
+ public boolean inTran(StoreContext context)
+ {
+ return context.getPayload() != null;
+ }
+
+ public Long getNewMessageId()
+ {
+ return _messageId.getAndIncrement();
+ }
+
+ public void storeContentBodyChunk(StoreContext context,
+ Long messageId,
+ int index,
+ ContentChunk contentBody,
+ boolean lastContentBody) throws AMQException
+ {
+ boolean localTx = getOrCreateTransaction(context);
+ Connection conn = getConnection(context);
+ ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ stmt.setInt(2, index);
+ byte[] chunkData = new byte[contentBody.getSize()];
+ contentBody.getData().duplicate().get(chunkData);
+ /* this would be the Java 6 way of doing things
+ Blob dataAsBlob = conn.createBlob();
+ dataAsBlob.setBytes(1L, chunkData);
+ stmt.setBlob(3, dataAsBlob);
+ */
+ ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
+ stmt.setBinaryStream(3, bis);
+ stmt.executeUpdate();
+ connWrapper.requiresCommit();
+
+ if(localTx)
+ {
+ commitTran(context);
+ }
+ }
+ catch (SQLException e)
+ {
+ if(localTx)
+ {
+ abortTran(context);
+ }
+
+ throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+ }
+
+ }
+
+ public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData mmd)
+ throws AMQException
+ {
+
+ boolean localTx = getOrCreateTransaction(context);
+ Connection conn = getConnection(context);
+ ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+
+ try
+ {
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
+ stmt.setLong(1,messageId);
+ stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString());
+ stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString());
+ stmt.setShort(4, mmd.getMessagePublishInfo().isMandatory() ? (short) 1 : (short) 0);
+ stmt.setShort(5, mmd.getMessagePublishInfo().isImmediate() ? (short) 1 : (short) 0);
+
+ ContentHeaderBody headerBody = mmd.getContentHeaderBody();
+ final int bodySize = headerBody.getSize();
+ byte[] underlying = new byte[bodySize];
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
+ headerBody.writePayload(buf);
+/*
+ Blob dataAsBlob = conn.createBlob();
+ dataAsBlob.setBytes(1L, underlying);
+ stmt.setBlob(6, dataAsBlob);
+*/
+ ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
+ stmt.setBinaryStream(6,bis);
+
+ stmt.setInt(7, mmd.getContentChunkCount());
+
+ stmt.executeUpdate();
+ connWrapper.requiresCommit();
+
+ if(localTx)
+ {
+ commitTran(context);
+ }
+ }
+ catch (SQLException e)
+ {
+ if(localTx)
+ {
+ abortTran(context);
+ }
+
+ throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+ }
+
+
+ }
+
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+ {
+ boolean localTx = getOrCreateTransaction(context);
+ Connection conn = getConnection(context);
+
+
+ try
+ {
+
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
+ stmt.setLong(1,messageId);
+ ResultSet rs = stmt.executeQuery();
+
+ if(rs.next())
+ {
+ final AMQShortString exchange = new AMQShortString(rs.getString(1));
+ final AMQShortString routingKey = rs.getString(2) == null ? null : new AMQShortString(rs.getString(2));
+ final boolean mandatory = (rs.getShort(3) != (short)0);
+ final boolean immediate = (rs.getShort(4) != (short)0);
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return routingKey;
+ }
+ } ;
+
+ Blob dataAsBlob = rs.getBlob(5);
+
+ byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+
+ ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, dataAsBytes.length);
+
+ if(localTx)
+ {
+ commitTran(context);
+ }
+
+ return new MessageMetaData(info, chb, rs.getInt(6));
+
+ }
+ else
+ {
+ if(localTx)
+ {
+ abortTran(context);
+ }
+ throw new AMQException("Metadata not found for message with id " + messageId);
+ }
+ }
+ catch (SQLException e)
+ {
+ if(localTx)
+ {
+ abortTran(context);
+ }
+
+ throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
+ }
+
+
+ }
+
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+ {
+ boolean localTx = getOrCreateTransaction(context);
+ Connection conn = getConnection(context);
+
+
+ try
+ {
+
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ stmt.setInt(2, index);
+ ResultSet rs = stmt.executeQuery();
+
+ if(rs.next())
+ {
+ Blob dataAsBlob = rs.getBlob(1);
+
+ final int size = (int) dataAsBlob.length();
+ byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
+ final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+
+ ContentChunk cb = new ContentChunk()
+ {
+
+ public int getSize()
+ {
+ return size;
+ }
+
+ public ByteBuffer getData()
+ {
+ return buf;
+ }
+
+ public void reduceToFit()
+ {
+
+ }
+ };
+
+ if(localTx)
+ {
+ commitTran(context);
+ }
+
+ return cb;
+
+ }
+ else
+ {
+ if(localTx)
+ {
+ abortTran(context);
+ }
+ throw new AMQException("Message not found for message with id " + messageId);
+ }
+ }
+ catch (SQLException e)
+ {
+ if(localTx)
+ {
+ abortTran(context);
+ }
+
+ throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
+ }
+
+
+
+ }
+
+ private void checkNotClosed() throws MessageStoreClosedException
+ {
+ if (_closed.get())
+ {
+ throw new MessageStoreClosedException();
+ }
+ }
+
+
+ private static final class ProcessAction
+ {
+ private final AMQQueue _queue;
+ private final StoreContext _context;
+ private final AMQMessage _message;
+
+ public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message)
+ {
+ _queue = queue;
+ _context = context;
+ _message = message;
+ }
+
+ public void process() throws AMQException
+ {
+ _queue.process(_context, _queue.createEntry(_message), false);
+ }
+
+ }
+
+
+ private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
+ throws SQLException, AMQException
+ {
+ Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>();
+ List<ProcessAction> actions = new ArrayList<ProcessAction>();
+
+ Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
+
+ final boolean inLocaltran = inTran(context);
+ Connection conn = null;
+ try
+ {
+
+ if(inLocaltran)
+ {
+ conn = getConnection(context);
+ }
+ else
+ {
+ conn = newConnection();
+ }
+
+
+ MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
+ long maxId = 1;
+
+ TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
+
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+
+
+ while (rs.next())
+ {
+
+
+
+ AMQShortString queueName = new AMQShortString(rs.getString(1));
+
+
+ AMQQueue queue = queues.get(queueName);
+ if (queue == null)
+ {
+ queue = new AMQQueue(queueName, false, null, false, _virtualHost);
+ _virtualHost.getQueueRegistry().registerQueue(queue);
+ queues.put(queueName, queue);
+ }
+
+ long messageId = rs.getLong(2);
+ maxId = Math.max(maxId, messageId);
+ AMQMessage message = msgMap.get(messageId);
+
+ if(message != null)
+ {
+ message.incrementReference();
+ }
+ else
+ {
+ message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
+ msgMap.put(messageId,message);
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("On recovery, delivering " + message.getMessageId() + " to " + queue.getName());
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ Integer count = queueRecoveries.get(queueName);
+ if (count == null)
+ {
+ count = 0;
+ }
+
+ queueRecoveries.put(queueName, ++count);
+
+ }
+
+ actions.add(new ProcessAction(queue, context, message));
+
+ }
+
+ for(ProcessAction action : actions)
+ {
+ action.process();
+ }
+
+ _messageId.set(maxId + 1);
+ }
+ catch (SQLException e)
+ {
+ _logger.error("Error: " + e, e);
+ throw e;
+ }
+ finally
+ {
+ if (inLocaltran && conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Recovered message counts: " + queueRecoveries);
+ }
+ }
+
+ private Connection getConnection(final StoreContext context)
+ {
+ return ((ConnectionWrapper)context.getPayload()).getConnection();
+ }
+
+ private boolean getOrCreateTransaction(StoreContext context) throws AMQException
+ {
+
+ ConnectionWrapper tx = (ConnectionWrapper) context.getPayload();
+ if (tx == null)
+ {
+ beginTran(context);
+ return true;
+ }
+
+ return false;
+ }
+
+ private synchronized void stateTransition(State requiredState, State newState) throws AMQException
+ {
+ if (_state != requiredState)
+ {
+ throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
+ + "; currently in state: " + _state);
+ }
+
+ _state = newState;
+ }
+}
diff --git a/java/pom.xml b/java/pom.xml
index 06031ca01e..e2da86fed3 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -524,6 +524,11 @@ under the License.
<version>1.0.1</version>
</dependency>
<dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>10.3.2.1</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-java5</artifactId>
<version>1.0.1</version>