summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/store
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/store')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java43
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java57
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java1846
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java131
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java196
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java41
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java80
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java36
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java33
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java36
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java73
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java80
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java38
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java91
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java33
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java26
16 files changed, 2840 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java
new file mode 100644
index 0000000000..b9adaeacdf
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.logging.LogSubject;
+
+public abstract class AbstractMessageStore implements MessageStore
+{
+ protected LogSubject _logSubject;
+
+ public void configure(VirtualHost virtualHost) throws Exception
+ {
+ _logSubject = new MessageStoreLogSubject(virtualHost, this);
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+ }
+
+ public void close() throws Exception
+ {
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
new file mode 100755
index 0000000000..a883f656be
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
@@ -0,0 +1,57 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+import java.nio.ByteBuffer;
+import org.apache.qpid.framing.FieldTable;
+
+public interface ConfigurationRecoveryHandler
+{
+ QueueRecoveryHandler begin(MessageStore store);
+
+ public static interface QueueRecoveryHandler
+ {
+ void queue(String queueName, String owner, boolean exclusive, FieldTable arguments);
+ ExchangeRecoveryHandler completeQueueRecovery();
+ }
+
+ public static interface ExchangeRecoveryHandler
+ {
+ void exchange(String exchangeName, String type, boolean autoDelete);
+ BindingRecoveryHandler completeExchangeRecovery();
+ }
+
+ public static interface BindingRecoveryHandler
+ {
+ void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf);
+ void completeBindingRecovery();
+ }
+
+ public static interface QueueEntryRecoveryHandler
+ {
+ void complete();
+
+ void queueEntry(String queueName, long messageId);
+ }
+
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
new file mode 100644
index 0000000000..2e694b24ea
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -0,0 +1,1846 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.queue.AMQQueue;
+
+/**
+ * An implementation of a {@link MessageStore} that uses Apache Derby as the persistance
+ * mechanism.
+ *
+ * TODO extract the SQL statements into a generic JDBC store
+ */
+public class DerbyMessageStore implements MessageStore
+{
+
+ private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
+
+ public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
+
+
+ private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
+
+ private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
+
+ private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE";
+ private static final String QUEUE_TABLE_NAME = "QPID_QUEUE";
+ private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS";
+ private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY";
+
+ private static final String META_DATA_TABLE_NAME = "QPID_META_DATA";
+ private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
+
+ private static final int DB_VERSION = 3;
+
+
+
+ private static Class<Driver> DRIVER_CLASS;
+
+ private final AtomicLong _messageId = new AtomicLong(0);
+ private AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private String _connectionURL;
+
+ private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
+
+ private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
+ private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
+
+ private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
+ private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), exclusive SMALLINT not null, arguments blob, PRIMARY KEY ( name ))";
+ private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
+ private static final String SELECT_FROM_QUEUE = "SELECT name, owner, exclusive, arguments FROM " + QUEUE_TABLE_NAME;
+ private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
+ private static final String UPDATE_QUEUE_EXCLUSIVITY = "UPDATE " + QUEUE_TABLE_NAME + " SET exclusive = ? WHERE name = ?";
+ private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
+ private static final String SELECT_FROM_BINDINGS =
+ "SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name";
+ private static final String FIND_BINDING =
+ "SELECT * FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? ";
+ private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
+ private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
+ private static final String FIND_EXCHANGE = "SELECT name FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
+ private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
+ private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
+ private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner, exclusive, arguments) VALUES (?, ?, ?, ?)";
+ private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
+
+ private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
+ private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)";
+ private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?";
+ private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_name, message_id";
+
+
+ private static final String CREATE_META_DATA_TABLE = "CREATE TABLE "+META_DATA_TABLE_NAME+" ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )";
+ private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, offset int not null, last_byte int not null, content blob , PRIMARY KEY (message_id, offset) )";
+
+ private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, offset, last_byte, content ) values (?, ?, ?, ?)";
+ private static final String SELECT_FROM_MESSAGE_CONTENT =
+ "SELECT offset, content FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? AND last_byte > ? AND offset < ? ORDER BY message_id, offset";
+ private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
+
+ private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";;
+ private static final String SELECT_FROM_META_DATA =
+ "SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
+ private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
+ private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
+
+ private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
+
+
+ private LogSubject _logSubject;
+ private boolean _configured;
+
+
+ private enum State
+ {
+ INITIAL,
+ CONFIGURING,
+ RECOVERING,
+ STARTED,
+ CLOSING,
+ CLOSED
+ }
+
+ private State _state = State.INITIAL;
+
+
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
+ {
+ stateTransition(State.INITIAL, State.CONFIGURING);
+ _logSubject = logSubject;
+ CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
+
+ if(!_configured)
+ {
+ commonConfiguration(name, storeConfiguration, logSubject);
+ _configured = true;
+ }
+
+ // this recovers durable exchanges, queues, and bindings
+ recover(recoveryHandler);
+
+
+ stateTransition(State.RECOVERING, State.STARTED);
+
+ }
+
+
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
+ {
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+
+ if(!_configured)
+ {
+
+ _logSubject = logSubject;
+
+ commonConfiguration(name, storeConfiguration, logSubject);
+ _configured = true;
+ }
+
+ recoverMessages(recoveryHandler);
+
+ }
+
+
+
+ public void configureTransactionLog(String name,
+ TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
+ {
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
+
+ if(!_configured)
+ {
+
+ _logSubject = logSubject;
+
+ commonConfiguration(name, storeConfiguration, logSubject);
+ _configured = true;
+ }
+
+ recoverQueueEntries(recoveryHandler);
+
+ }
+
+
+
+ private void commonConfiguration(String name, Configuration storeConfiguration, LogSubject logSubject)
+ throws ClassNotFoundException, SQLException
+ {
+ initialiseDriver();
+
+ //Update to pick up QPID_WORK and use that as the default location not just derbyDB
+
+ final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")
+ + File.separator + "derbyDB");
+
+ File environmentPath = new File(databasePath);
+ if (!environmentPath.exists())
+ {
+ if (!environmentPath.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
+ }
+
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath()));
+
+ createOrOpenDatabase(name, databasePath);
+ }
+
+ private static synchronized void initialiseDriver() throws ClassNotFoundException
+ {
+ if(DRIVER_CLASS == null)
+ {
+ DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
+ }
+ }
+
+ private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException
+ {
+ //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created.
+ _connectionURL = "jdbc:derby:" + environmentPath + "/" + name + ";create=true";
+
+ Connection conn = newAutoCommitConnection();
+
+ createVersionTable(conn);
+ createExchangeTable(conn);
+ createQueueTable(conn);
+ createBindingsTable(conn);
+ createQueueEntryTable(conn);
+ createMetaDataTable(conn);
+ createMessageContentTable(conn);
+
+ conn.close();
+ }
+
+
+
+ private void createVersionTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(DB_VERSION_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_DB_VERSION_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
+ try
+ {
+ pstmt.setInt(1, DB_VERSION);
+ pstmt.execute();
+ }
+ finally
+ {
+ pstmt.close();
+ }
+ }
+
+ }
+
+
+ private void createExchangeTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(EXCHANGE_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_EXCHANGE_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ private void createQueueTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(QUEUE_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_QUEUE_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ private void createBindingsTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(BINDINGS_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_BINDINGS_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+ }
+
+ private void createQueueEntryTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_QUEUE_ENTRY_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+ }
+
+ private void createMetaDataTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(META_DATA_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_META_DATA_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+ }
+
+
+ private void createMessageContentTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_MESSAGE_CONTENT_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+ }
+
+
+
+ private boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
+ try
+ {
+ stmt.setString(1, tableName);
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ return rs.next();
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+
+ public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
+ {
+ stateTransition(State.CONFIGURING, State.RECOVERING);
+
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.RECOVERY_START());
+
+ try
+ {
+ ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this);
+ loadQueues(qrh);
+
+ ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+ List<String> exchanges = loadExchanges(erh);
+ ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+ recoverBindings(brh, exchanges);
+ brh.completeBindingRecovery();
+ }
+ catch (SQLException e)
+ {
+
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+
+
+ }
+
+ private void loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
+ try
+ {
+
+ while(rs.next())
+ {
+ String queueName = rs.getString(1);
+ String owner = rs.getString(2);
+ boolean exclusive = rs.getBoolean(3);
+ Blob argumentsAsBlob = rs.getBlob(4);
+
+ byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
+ FieldTable arguments;
+ if(dataAsBytes.length > 0)
+ {
+ org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.wrap(dataAsBytes);
+
+ arguments = new FieldTable(buffer,buffer.limit());
+ }
+ else
+ {
+ arguments = null;
+ }
+
+ qrh.queue(queueName, owner, exclusive, arguments);
+
+ }
+
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+
+ private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws SQLException
+ {
+
+ List<String> exchanges = new ArrayList<String>();
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
+ try
+ {
+ while(rs.next())
+ {
+ String exchangeName = rs.getString(1);
+ String type = rs.getString(2);
+ boolean autoDelete = rs.getShort(3) != 0;
+
+ exchanges.add(exchangeName);
+
+ erh.exchange(exchangeName, type, autoDelete);
+
+ }
+ return exchanges;
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+
+ private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws SQLException
+ {
+ _logger.info("Recovering bindings...");
+
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
+
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+
+ try
+ {
+
+ while(rs.next())
+ {
+ String exchangeName = rs.getString(1);
+ String queueName = rs.getString(2);
+ String bindingKey = rs.getString(3);
+ Blob arguments = rs.getBlob(4);
+ java.nio.ByteBuffer buf;
+
+ if(arguments != null && arguments.length() != 0)
+ {
+ byte[] argumentBytes = arguments.getBytes(1, (int) arguments.length());
+ buf = java.nio.ByteBuffer.wrap(argumentBytes);
+ }
+ else
+ {
+ buf = null;
+ }
+
+ brh.binding(exchangeName, queueName, bindingKey, buf);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+
+
+ public void close() throws Exception
+ {
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+ _closed.getAndSet(true);
+
+ try
+ {
+ Connection conn = DriverManager.getConnection(_connectionURL + ";shutdown=true");
+ // Shouldn't reach this point - shutdown=true should throw SQLException
+ conn.close();
+ _logger.error("Unable to shut down the store");
+ }
+ catch (SQLException e)
+ {
+ if (e.getSQLState().equalsIgnoreCase(DERBY_SINGLE_DB_SHUTDOWN_CODE))
+ {
+ //expected and represents a clean shutdown of this database only, do nothing.
+ }
+ else
+ {
+ _logger.error("Exception whilst shutting down the store: " + e);
+ }
+ }
+ }
+
+ public StoredMessage addMessage(StorableMessageMetaData metaData)
+ {
+ if(metaData.isPersistent())
+ {
+ return new StoredDerbyMessage(_messageId.incrementAndGet(), metaData);
+ }
+ else
+ {
+ return new StoredMemoryMessage(_messageId.incrementAndGet(), metaData);
+ }
+ }
+
+ public StoredMessage getMessage(long messageNumber)
+ {
+ return null;
+ }
+
+ public void removeMessage(long messageId)
+ {
+ try
+ {
+ Connection conn = newConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_META_DATA);
+ try
+ {
+ stmt.setLong(1,messageId);
+ int results = stmt.executeUpdate();
+ stmt.close();
+
+ if (results == 0)
+ {
+ throw new RuntimeException("Message metadata not found for message id " + messageId);
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Deleted metadata for message " + messageId);
+ }
+
+ stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ results = stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+ conn.commit();
+ }
+ catch(SQLException e)
+ {
+ try
+ {
+ conn.rollback();
+ }
+ catch(SQLException t)
+ {
+ // ignore - we are re-throwing underlying exception
+ }
+
+ throw e;
+
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
+ }
+
+ }
+
+ public void createExchange(Exchange exchange) throws AMQStoreException
+ {
+ if (_state != State.RECOVERING)
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+
+ try
+ {
+
+
+ PreparedStatement stmt = conn.prepareStatement(FIND_EXCHANGE);
+ try
+ {
+ stmt.setString(1, exchange.getNameShortString().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+
+ // If we don't have any data in the result set then we can add this exchange
+ if (!rs.next())
+ {
+
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_EXCHANGE);
+ try
+ {
+ insertStmt.setString(1, exchange.getName().toString());
+ insertStmt.setString(2, exchange.getTypeShortString().asString());
+ insertStmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0);
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error writing Exchange with name " + exchange.getNameShortString() + " to database: " + e.getMessage(), e);
+ }
+ }
+
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQStoreException
+ {
+
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
+ try
+ {
+ stmt.setString(1, exchange.getNameShortString().toString());
+ int results = stmt.executeUpdate();
+ stmt.close();
+ if(results == 0)
+ {
+ throw new AMQStoreException("Exchange " + exchange.getNameShortString() + " not found");
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error deleting Exchange with name " + exchange.getNameShortString() + " from database: " + e.getMessage(), e);
+ }
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+ throws AMQStoreException
+ {
+ if (_state != State.RECOVERING)
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+
+ try
+ {
+
+ PreparedStatement stmt = conn.prepareStatement(FIND_BINDING);
+ try
+ {
+ stmt.setString(1, exchange.getNameShortString().toString() );
+ stmt.setString(2, queue.getNameShortString().toString());
+ stmt.setString(3, routingKey == null ? null : routingKey.toString());
+
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ // If this binding is not already in the store then create it.
+ if (!rs.next())
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
+ try
+ {
+ insertStmt.setString(1, exchange.getNameShortString().toString() );
+ insertStmt.setString(2, queue.getNameShortString().toString());
+ insertStmt.setString(3, routingKey == null ? null : routingKey.toString());
+ if(args != null)
+ {
+ /* This would be the Java 6 way of setting a Blob
+ Blob blobArgs = conn.createBlob();
+ blobArgs.setBytes(0, args.getDataAsBytes());
+ stmt.setBlob(4, blobArgs);
+ */
+ byte[] bytes = args.getDataAsBytes();
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ insertStmt.setBinaryStream(4, bis, bytes.length);
+ }
+ else
+ {
+ insertStmt.setNull(4, Types.BLOB);
+ }
+
+ insertStmt.executeUpdate();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
+ + exchange.getNameShortString() + " to database: " + e.getMessage(), e);
+ }
+
+ }
+
+
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+ throws AMQStoreException
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = newAutoCommitConnection();
+ // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
+ stmt.setString(1, exchange.getNameShortString().toString() );
+ stmt.setString(2, queue.getNameShortString().toString());
+ stmt.setString(3, routingKey == null ? null : routingKey.toString());
+
+ int result = stmt.executeUpdate();
+ stmt.close();
+
+ if(result != 1)
+ {
+ throw new AMQStoreException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange "
+ + exchange.getNameShortString() + " not found");
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
+ + exchange.getNameShortString() + " in database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ _logger.error(e);
+ }
+ }
+
+ }
+
+
+ }
+
+ public void createQueue(AMQQueue queue) throws AMQStoreException
+ {
+ createQueue(queue, null);
+ }
+
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
+ {
+ _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
+
+ if (_state != State.RECOVERING)
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+
+ PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
+ try
+ {
+ stmt.setString(1, queue.getNameShortString().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+
+ // If we don't have any data in the result set then we can add this queue
+ if (!rs.next())
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_QUEUE);
+
+ try
+ {
+ String owner = queue.getOwner() == null ? null : queue.getOwner().toString();
+
+ insertStmt.setString(1, queue.getNameShortString().toString());
+ insertStmt.setString(2, owner);
+ insertStmt.setBoolean(3,queue.isExclusive());
+
+ final byte[] underlying;
+ if(arguments != null)
+ {
+ underlying = arguments.getDataAsBytes();
+ }
+ else
+ {
+ underlying = new byte[0];
+ }
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
+ insertStmt.setBinaryStream(4,bis,underlying.length);
+
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ conn.close();
+
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * Updates the specified queue in the persistent store, IF it is already present. If the queue
+ * is not present in the store, it will not be added.
+ *
+ * NOTE: Currently only updates the exclusivity.
+ *
+ * @param queue The queue to update the entry for.
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ public void updateQueue(final AMQQueue queue) throws AMQStoreException
+ {
+ if (_state != State.RECOVERING)
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
+ try
+ {
+ stmt.setString(1, queue.getNameShortString().toString());
+
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ if (rs.next())
+ {
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_QUEUE_EXCLUSIVITY);
+ try
+ {
+ stmt2.setBoolean(1,queue.isExclusive());
+ stmt2.setString(2, queue.getNameShortString().toString());
+
+ stmt2.execute();
+ }
+ finally
+ {
+ stmt2.close();
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e);
+ }
+ }
+
+ }
+
+ /**
+ * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
+ * isolation and with auto-commit transactions enabled.
+ */
+ private Connection newAutoCommitConnection() throws SQLException
+ {
+ final Connection connection = newConnection();
+ try
+ {
+ connection.setAutoCommit(true);
+ }
+ catch (SQLException sqlEx)
+ {
+
+ try
+ {
+ connection.close();
+ }
+ finally
+ {
+ throw sqlEx;
+ }
+ }
+
+ return connection;
+ }
+
+ /**
+ * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
+ * isolation and with auto-commit transactions disabled.
+ */
+ private Connection newConnection() throws SQLException
+ {
+ final Connection connection = DriverManager.getConnection(_connectionURL);
+ try
+ {
+ connection.setAutoCommit(false);
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ }
+ catch (SQLException sqlEx)
+ {
+ try
+ {
+ connection.close();
+ }
+ finally
+ {
+ throw sqlEx;
+ }
+ }
+ return connection;
+ }
+
+ public void removeQueue(final AMQQueue queue) throws AMQStoreException
+ {
+ AMQShortString name = queue.getNameShortString();
+ _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
+ Connection conn = null;
+
+ try
+ {
+ conn = newAutoCommitConnection();
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE);
+ stmt.setString(1, name.toString());
+ int results = stmt.executeUpdate();
+ stmt.close();
+
+ if (results == 0)
+ {
+ throw new AMQStoreException("Queue " + name + " not found");
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error deleting AMQQueue with name " + name + " from database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ _logger.error(e);
+ }
+ }
+
+ }
+
+
+ }
+
+ public Transaction newTransaction()
+ {
+ return new DerbyTransaction();
+ }
+
+ public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ {
+ String name = queue.getResourceName();
+
+ Connection conn = connWrapper.getConnection();
+
+
+ try
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
+ }
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
+ try
+ {
+ stmt.setString(1,name);
+ stmt.setLong(2,messageId);
+ stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ _logger.error("Failed to enqueue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name
+ + " to database", e);
+ }
+
+ }
+
+ public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ {
+ String name = queue.getResourceName();
+
+
+ Connection conn = connWrapper.getConnection();
+
+
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
+ try
+ {
+ stmt.setString(1,name);
+ stmt.setLong(2,messageId);
+ int results = stmt.executeUpdate();
+
+
+
+ if(results != 1)
+ {
+ throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name);
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeuing message " + messageId + " on queue " + name );//+ "[Connection" + conn + "]");
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ _logger.error("Failed to dequeue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + name
+ + " from database", e);
+ }
+
+ }
+
+ private static final class ConnectionWrapper
+ {
+ private final Connection _connection;
+
+ public ConnectionWrapper(Connection conn)
+ {
+ _connection = conn;
+ }
+
+ public Connection getConnection()
+ {
+ return _connection;
+ }
+ }
+
+
+ public void commitTran(ConnectionWrapper connWrapper) throws AMQStoreException
+ {
+
+ try
+ {
+ Connection conn = connWrapper.getConnection();
+ conn.commit();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("commit tran completed");
+ }
+
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error commit tx: " + e.getMessage(), e);
+ }
+ finally
+ {
+
+ }
+ }
+
+ public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQStoreException
+ {
+ commitTran(connWrapper);
+ return new StoreFuture()
+ {
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public void waitForCompletion()
+ {
+
+ }
+ };
+
+ }
+
+ public void abortTran(ConnectionWrapper connWrapper) throws AMQStoreException
+ {
+ if (connWrapper == null)
+ {
+ throw new AMQStoreException("Fatal internal error: transactional context is empty at abortTran");
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("abort tran called: " + connWrapper.getConnection());
+ }
+
+ try
+ {
+ Connection conn = connWrapper.getConnection();
+ conn.rollback();
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e);
+ }
+
+ }
+
+ public Long getNewMessageId()
+ {
+ return _messageId.incrementAndGet();
+ }
+
+
+ private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
+ throws SQLException
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Adding metadata for message " +messageId);
+ }
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA);
+ try
+ {
+ stmt.setLong(1,messageId);
+
+ final int bodySize = 1 + metaData.getStorableSize();
+ byte[] underlying = new byte[bodySize];
+ underlying[0] = (byte) metaData.getType().ordinal();
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
+ buf.position(1);
+ buf = buf.slice();
+
+ metaData.writeToBuffer(0, buf);
+ ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
+ try
+ {
+ stmt.setBinaryStream(2,bis,underlying.length);
+ int result = stmt.executeUpdate();
+
+ if(result == 0)
+ {
+ throw new RuntimeException("Unable to add meta data for message " +messageId);
+ }
+ }
+ finally
+ {
+ try
+ {
+ bis.close();
+ }
+ catch (IOException e)
+ {
+
+ throw new SQLException(e);
+ }
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+
+
+
+
+ private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin();
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
+ try
+ {
+
+ long maxId = 0;
+
+ while(rs.next())
+ {
+
+ long messageId = rs.getLong(1);
+ Blob dataAsBlob = rs.getBlob(2);
+
+ if(messageId > maxId)
+ {
+ maxId = messageId;
+ }
+
+ byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
+ StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+ StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false);
+ messageHandler.message(message);
+ }
+
+ _messageId.set(maxId);
+
+ messageHandler.completeMessageRecovery();
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+
+
+ private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this);
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+ try
+ {
+ while(rs.next())
+ {
+
+ String queueName = rs.getString(1);
+ long messageId = rs.getLong(2);
+ queueEntryHandler.queueEntry(queueName,messageId);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ queueEntryHandler.completeQueueEntryRecovery();
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ StorableMessageMetaData getMetaData(long messageId) throws SQLException
+ {
+
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_META_DATA);
+ try
+ {
+ stmt.setLong(1,messageId);
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+
+ if(rs.next())
+ {
+ Blob dataAsBlob = rs.getBlob(1);
+
+ byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
+ StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+
+ return metaData;
+ }
+ else
+ {
+ throw new RuntimeException("Meta data not found for message with id " + messageId);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+
+ private void addContent(Connection conn, long messageId, int offset, ByteBuffer src)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Adding content chunk offset " + offset + " for message " +messageId);
+ }
+
+ try
+ {
+ src = src.slice();
+
+ byte[] chunkData = new byte[src.limit()];
+ src.duplicate().get(chunkData);
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ stmt.setInt(2, offset);
+ stmt.setInt(3, offset+chunkData.length);
+
+
+ /* this would be the Java 6 way of doing things
+ Blob dataAsBlob = conn.createBlob();
+ dataAsBlob.setBytes(1L, chunkData);
+ stmt.setBlob(3, dataAsBlob);
+ */
+ ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
+ stmt.setBinaryStream(4, bis, chunkData.length);
+ stmt.executeUpdate();
+ stmt.close();
+ }
+ catch (SQLException e)
+ {
+ if(conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (SQLException e1)
+ {
+
+ }
+ }
+
+ throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e.getMessage(), e);
+ }
+
+ }
+
+
+ public int getContent(long messageId, int offset, ByteBuffer dst)
+ {
+ Connection conn = null;
+
+
+ try
+ {
+ conn = newAutoCommitConnection();
+
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ stmt.setInt(2, offset);
+ stmt.setInt(3, offset+dst.remaining());
+ ResultSet rs = stmt.executeQuery();
+
+ int written = 0;
+
+ while(rs.next())
+ {
+ int offsetInMessage = rs.getInt(1);
+ Blob dataAsBlob = rs.getBlob(2);
+
+ final int size = (int) dataAsBlob.length();
+ byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
+
+ int posInArray = offset + written - offsetInMessage;
+ int count = size - posInArray;
+ if(count > dst.remaining())
+ {
+ count = dst.remaining();
+ }
+ dst.put(dataAsBytes,posInArray,count);
+ written+=count;
+
+ if(dst.remaining() == 0)
+ {
+ break;
+ }
+ }
+
+ stmt.close();
+ conn.close();
+ return written;
+
+ }
+ catch (SQLException e)
+ {
+ if(conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (SQLException e1)
+ {
+
+ }
+ }
+
+ throw new RuntimeException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e.getMessage(), e);
+ }
+
+
+
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+
+ private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException
+ {
+ if (_state != requiredState)
+ {
+ throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
+ + "; currently in state: " + _state);
+ }
+
+ _state = newState;
+ }
+
+
+ private class DerbyTransaction implements Transaction
+ {
+ private final ConnectionWrapper _connWrapper;
+
+
+ private DerbyTransaction()
+ {
+ try
+ {
+ _connWrapper = new ConnectionWrapper(newConnection());
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ {
+ DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, messageId);
+ }
+
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ {
+ DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, messageId);
+
+ }
+
+ public void commitTran() throws AMQStoreException
+ {
+ DerbyMessageStore.this.commitTran(_connWrapper);
+ }
+
+ public StoreFuture commitTranAsync() throws AMQStoreException
+ {
+ return DerbyMessageStore.this.commitTranAsync(_connWrapper);
+ }
+
+ public void abortTran() throws AMQStoreException
+ {
+ DerbyMessageStore.this.abortTran(_connWrapper);
+ }
+ }
+
+ private class StoredDerbyMessage implements StoredMessage
+ {
+
+ private final long _messageId;
+ private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+ private Connection _conn;
+
+ StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
+ {
+ this(messageId, metaData, true);
+ }
+
+
+ StoredDerbyMessage(long messageId,
+ StorableMessageMetaData metaData, boolean persist)
+ {
+ try
+ {
+ _messageId = messageId;
+
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ if(persist)
+ {
+ _conn = newConnection();
+ storeMetaData(_conn, messageId, metaData);
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public StorableMessageMetaData getMetaData()
+ {
+ StorableMessageMetaData metaData = _metaDataRef.get();
+ if(metaData == null)
+ {
+ try
+ {
+ metaData = DerbyMessageStore.this.getMetaData(_messageId);
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException(e);
+ }
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ }
+
+ return metaData;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
+ {
+ DerbyMessageStore.this.addContent(_conn, _messageId, offsetInMessage, src);
+ }
+
+ public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
+ {
+ return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+
+ public StoreFuture flushToStore()
+ {
+ try
+ {
+ if(_conn != null)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Flushing message " + _messageId + " to store");
+ }
+
+ _conn.commit();
+ _conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Error when trying to flush message " + _messageId + " to store: " + e);
+ }
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ _conn = null;
+ }
+ return IMMEDIATE_FUTURE;
+ }
+
+ public void remove()
+ {
+ flushToStore();
+ DerbyMessageStore.this.removeMessage(_messageId);
+ }
+ }
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
new file mode 100755
index 0000000000..5fb23653cb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public interface DurableConfigurationStore
+{
+
+ public static interface Source
+ {
+ DurableConfigurationStore getDurableConfigurationStore();
+ }
+
+ /**
+ * Called after instantiation in order to configure the message store. A particular implementation can define
+ * whatever parameters it wants.
+ *
+ * @param name The name to be used by this storem
+ * @param recoveryHandler Handler to be called as the store recovers on start up
+ * @param config The apache commons configuration object.
+ *
+ * @throws Exception If any error occurs that means the store is unable to configure itself.
+ */
+ void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception;
+ /**
+ * Makes the specified exchange persistent.
+ *
+ * @param exchange The exchange to persist.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void createExchange(Exchange exchange) throws AMQStoreException;
+
+ /**
+ * Removes the specified persistent exchange.
+ *
+ * @param exchange The exchange to remove.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void removeExchange(Exchange exchange) throws AMQStoreException;
+
+ /**
+ * Binds the specified queue to an exchange with a routing key.
+ *
+ * @param exchange The exchange to bind to.
+ * @param routingKey The routing key to bind by.
+ * @param queue The queue to bind.
+ * @param args Additional parameters.
+ *
+ * @throws AMQStoreException if the operation fails for any reason.
+ */
+ void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException;
+
+ /**
+ * Unbinds the specified from an exchange under a particular routing key.
+ *
+ * @param exchange The exchange to unbind from.
+ * @param routingKey The routing key to unbind.
+ * @param queue The queue to unbind.
+ * @param args Additonal parameters.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException;
+
+ /**
+ * Makes the specified queue persistent.
+ *
+ * @param queue The queue to store.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void createQueue(AMQQueue queue) throws AMQStoreException;
+
+ /**
+ * Makes the specified queue persistent.
+ *
+ * @param queue The queue to store.
+ * @param arguments The additional arguments to the binding
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException;
+
+ /**
+ * Removes the specified queue from the persistent store.
+ *
+ * @param queue The queue to remove.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void removeQueue(AMQQueue queue) throws AMQStoreException;
+
+ /**
+ * Updates the specified queue in the persistent store, IF it is already present. If the queue
+ * is not present in the store, it will not be added.
+ *
+ * @param queue The queue to update the entry for.
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void updateQueue(AMQQueue queue) throws AMQStoreException;
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
new file mode 100644
index 0000000000..d008d42fa0
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -0,0 +1,196 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.queue.AMQQueue;
+
+/** A simple message store that stores the messages in a threadsafe structure in memory. */
+public class MemoryMessageStore implements MessageStore
+{
+ private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
+
+ private static final int DEFAULT_HASHTABLE_CAPACITY = 50000;
+
+ private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
+
+
+ private final AtomicLong _messageId = new AtomicLong(1);
+ private AtomicBoolean _closed = new AtomicBoolean(false);
+ private LogSubject _logSubject;
+
+ private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
+ {
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ {
+ }
+
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ {
+ }
+
+ public void commitTran() throws AMQStoreException
+ {
+ }
+
+ public StoreFuture commitTranAsync() throws AMQStoreException
+ {
+ return IMMEDIATE_FUTURE;
+ }
+
+ public void abortTran() throws AMQStoreException
+ {
+ }
+
+ };
+
+ public void configureConfigStore(String name, ConfigurationRecoveryHandler handler, Configuration configuration, LogSubject logSubject) throws Exception
+ {
+ _logSubject = logSubject;
+ CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
+
+
+ }
+
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
+ {
+ if(_logSubject == null)
+ {
+ _logSubject = logSubject;
+ }
+ int hashtableCapacity = config.getInt(name + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
+ _log.info("Using capacity " + hashtableCapacity + " for hash tables");
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+ }
+
+ public void close() throws Exception
+ {
+ _closed.getAndSet(true);
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+
+ }
+
+ public StoredMessage addMessage(StorableMessageMetaData metaData)
+ {
+ final long id = _messageId.getAndIncrement();
+ StoredMemoryMessage message = new StoredMemoryMessage(id, metaData);
+
+ return message;
+ }
+
+
+ public void createExchange(Exchange exchange) throws AMQStoreException
+ {
+
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQStoreException
+ {
+
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+ {
+
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+ {
+
+ }
+
+
+ public void createQueue(AMQQueue queue) throws AMQStoreException
+ {
+ // Not requred to do anything
+ }
+
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
+ {
+ // Not required to do anything
+ }
+
+ public void removeQueue(final AMQQueue queue) throws AMQStoreException
+ {
+ // Not required to do anything
+ }
+
+ public void updateQueue(final AMQQueue queue) throws AMQStoreException
+ {
+ // Not required to do anything
+ }
+
+ public void configureTransactionLog(String name,
+ TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Transaction newTransaction()
+ {
+ return IN_MEMORY_TRANSACTION;
+ }
+
+
+ public List<AMQQueue> createQueues() throws AMQException
+ {
+ return null;
+ }
+
+ public Long getNewMessageId()
+ {
+ return _messageId.getAndIncrement();
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+
+ private void checkNotClosed() throws MessageStoreClosedException
+ {
+ if (_closed.get())
+ {
+ throw new MessageStoreClosedException();
+ }
+ }
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java
new file mode 100755
index 0000000000..428bb1e41b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
+
+import java.nio.ByteBuffer;
+
+public enum MessageMetaDataType
+{
+ META_DATA_0_8 { public Factory<MessageMetaData> getFactory() { return MessageMetaData.FACTORY; } },
+ META_DATA_0_10 { public Factory<MessageMetaData_0_10> getFactory() { return MessageMetaData_0_10.FACTORY; } };
+
+
+ public static interface Factory<M extends StorableMessageMetaData>
+ {
+ M createMetaData(ByteBuffer buf);
+ }
+
+ abstract public Factory<? extends StorableMessageMetaData> getFactory();
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
new file mode 100644
index 0000000000..e2fca2f9c7
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
+ *
+ */
+public interface MessageStore extends DurableConfigurationStore, TransactionLog
+{
+ StoreFuture IMMEDIATE_FUTURE = new StoreFuture()
+ {
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public void waitForCompletion()
+ {
+
+ }
+ };
+
+
+ /**
+ * Called after instantiation in order to configure the message store. A particular implementation can define
+ * whatever parameters it wants.
+ *
+ * @param name The name to be used by this storem
+ * @param recoveryHandler Handler to be called as the store recovers on start up
+ * @param config The apache commons configuration object.
+ *
+ * @throws Exception If any error occurs that means the store is unable to configure itself.
+ */
+ void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception;
+
+ /**
+ * Called to close and cleanup any resources used by the message store.
+ *
+ * @throws Exception If the close fails.
+ */
+ void close() throws Exception;
+
+
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
+
+
+ /**
+ * Is this store capable of persisting the data
+ *
+ * @return true if this store is capable of persisting data
+ */
+ boolean isPersistent();
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
new file mode 100644
index 0000000000..3d1538c7eb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
@@ -0,0 +1,36 @@
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.AMQException;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * NOTE: this class currently extends AMQException but
+ * we should be using AMQExceptions internally in the code base for Protocol errors hence
+ * the message store interface should throw a different super class which this should be
+ * moved to reflect
+ */
+public class MessageStoreClosedException extends AMQException
+{
+ public MessageStoreClosedException()
+ {
+ super("Message store closed");
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java
new file mode 100755
index 0000000000..ba65b8e1ec
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java
@@ -0,0 +1,33 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+public interface MessageStoreRecoveryHandler
+{
+ StoredMessageRecoveryHandler begin();
+
+ public static interface StoredMessageRecoveryHandler
+ {
+ void message(StoredMessage message);
+
+ void completeMessageRecovery();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
new file mode 100755
index 0000000000..12d2a6a6c7
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.nio.ByteBuffer;
+
+public interface StorableMessageMetaData
+{
+ MessageMetaDataType getType();
+
+ int getStorableSize();
+
+ int writeToBuffer(int offsetInMetaData, ByteBuffer dest);
+
+ int getContentSize();
+
+ boolean isPersistent();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
new file mode 100644
index 0000000000..88cc68bc71
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.log4j.Logger;
+
+/**
+ * A context that the store can use to associate with a transactional context. For example, it could store
+ * some kind of txn id.
+ *
+ * @author Apache Software Foundation
+ */
+public class StoreContext
+{
+ private static final Logger _logger = Logger.getLogger(StoreContext.class);
+
+ private String _name;
+ private Object _payload;
+
+
+ public StoreContext()
+ {
+ _name = "StoreContext";
+ }
+
+ public StoreContext(String name)
+ {
+ _name = name;
+ }
+
+ public Object getPayload()
+ {
+ return _payload;
+ }
+
+ public void setPayload(Object payload)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("public void setPayload(Object payload = " + payload + "): called");
+ }
+ _payload = payload;
+ }
+
+ /**
+ * Prints out the transactional context as a string, mainly for debugging purposes.
+ *
+ * @return The transactional context as a string.
+ */
+ public String toString()
+ {
+ return "<_name = " + _name + ", _payload = " + _payload + ">";
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
new file mode 100755
index 0000000000..1f5b027b80
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.store;
+
+import java.nio.ByteBuffer;
+
+public class StoredMemoryMessage implements StoredMessage
+{
+ private final long _messageNumber;
+ private final ByteBuffer _content;
+ private final StorableMessageMetaData _metaData;
+
+ public StoredMemoryMessage(long messageNumber, StorableMessageMetaData metaData)
+ {
+ _messageNumber = messageNumber;
+ _metaData = metaData;
+ _content = ByteBuffer.allocate(metaData.getContentSize());
+
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageNumber;
+ }
+
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ src = src.duplicate();
+ ByteBuffer dst = _content.duplicate();
+ dst.position(offsetInMessage);
+ dst.put(src);
+ }
+
+ public int getContent(int offset, ByteBuffer dst)
+ {
+ ByteBuffer src = _content.duplicate();
+ src.position(offset);
+ src = src.slice();
+ if(dst.remaining() < src.limit())
+ {
+ src.limit(dst.remaining());
+ }
+ dst.put(src);
+ return src.limit();
+ }
+
+ public TransactionLog.StoreFuture flushToStore()
+ {
+ return MessageStore.IMMEDIATE_FUTURE;
+ }
+
+
+ public StorableMessageMetaData getMetaData()
+ {
+ return _metaData;
+ }
+
+ public void remove()
+ {
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java
new file mode 100755
index 0000000000..0bc45c6718
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -0,0 +1,38 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+import java.nio.ByteBuffer;
+
+public interface StoredMessage<M extends StorableMessageMetaData>
+{
+ M getMetaData();
+
+ public long getMessageNumber();
+
+ void addContent(int offsetInMessage, ByteBuffer src);
+
+ int getContent(int offsetInMessage, ByteBuffer dst);
+
+ TransactionLog.StoreFuture flushToStore();
+
+ void remove();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
new file mode 100755
index 0000000000..d196a91930
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.AMQStoreException;
+import org.apache.commons.configuration.Configuration;
+
+public interface TransactionLog
+{
+
+ public static interface Transaction
+ {
+ /**
+ * Places a message onto a specified queue, in a given transactional context.
+ *
+ * @param queue The queue to place the message on.
+ * @param messageId The message to enqueue.
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException;
+
+ /**
+ * Extracts a message from a specified queue, in a given transactional context.
+ *
+ * @param queue The queue to place the message on.
+ * @param messageId The message to dequeue.
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException;
+
+
+ /**
+ * Commits all operations performed within a given transactional context.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void commitTran() throws AMQStoreException;
+
+ /**
+ * Commits all operations performed within a given transactional context.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ StoreFuture commitTranAsync() throws AMQStoreException;
+
+ /**
+ * Abandons all operations performed within a given transactional context.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void abortTran() throws AMQStoreException;
+
+
+
+ }
+
+ public void configureTransactionLog(String name,
+ TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception;
+
+ Transaction newTransaction();
+
+
+
+ public static interface StoreFuture
+ {
+ boolean isComplete();
+
+ void waitForCompletion();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
new file mode 100755
index 0000000000..7781c52df3
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
@@ -0,0 +1,33 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+public interface TransactionLogRecoveryHandler
+{
+ QueueEntryRecoveryHandler begin(TransactionLog log);
+
+ public static interface QueueEntryRecoveryHandler
+ {
+ void queueEntry(String queuename, long messageId);
+
+ void completeQueueEntryRecovery();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
new file mode 100755
index 0000000000..0d81dd151d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
@@ -0,0 +1,26 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+public interface TransactionLogResource
+{
+ public String getResourceName();
+}