diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/store')
16 files changed, 2840 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java new file mode 100644 index 0000000000..b9adaeacdf --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.logging.LogSubject; + +public abstract class AbstractMessageStore implements MessageStore +{ + protected LogSubject _logSubject; + + public void configure(VirtualHost virtualHost) throws Exception + { + _logSubject = new MessageStoreLogSubject(virtualHost, this); + CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName())); + } + + public void close() throws Exception + { + CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java new file mode 100755 index 0000000000..a883f656be --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java @@ -0,0 +1,57 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store; + +import java.nio.ByteBuffer; +import org.apache.qpid.framing.FieldTable; + +public interface ConfigurationRecoveryHandler +{ + QueueRecoveryHandler begin(MessageStore store); + + public static interface QueueRecoveryHandler + { + void queue(String queueName, String owner, boolean exclusive, FieldTable arguments); + ExchangeRecoveryHandler completeQueueRecovery(); + } + + public static interface ExchangeRecoveryHandler + { + void exchange(String exchangeName, String type, boolean autoDelete); + BindingRecoveryHandler completeExchangeRecovery(); + } + + public static interface BindingRecoveryHandler + { + void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf); + void completeBindingRecovery(); + } + + public static interface QueueEntryRecoveryHandler + { + void complete(); + + void queueEntry(String queueName, long messageId); + } + + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java new file mode 100644 index 0000000000..2e694b24ea --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -0,0 +1,1846 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.lang.ref.SoftReference; +import java.nio.ByteBuffer; +import java.sql.Blob; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.messages.TransactionLogMessages; +import org.apache.qpid.server.queue.AMQQueue; + +/** + * An implementation of a {@link MessageStore} that uses Apache Derby as the persistance + * mechanism. + * + * TODO extract the SQL statements into a generic JDBC store + */ +public class DerbyMessageStore implements MessageStore +{ + + private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); + + public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; + + + private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; + + private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; + + private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE"; + private static final String QUEUE_TABLE_NAME = "QPID_QUEUE"; + private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS"; + private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY"; + + private static final String META_DATA_TABLE_NAME = "QPID_META_DATA"; + private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT"; + + private static final int DB_VERSION = 3; + + + + private static Class<Driver> DRIVER_CLASS; + + private final AtomicLong _messageId = new AtomicLong(0); + private AtomicBoolean _closed = new AtomicBoolean(false); + + private String _connectionURL; + + private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; + + private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )"; + private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )"; + + private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )"; + private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), exclusive SMALLINT not null, arguments blob, PRIMARY KEY ( name ))"; + private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )"; + private static final String SELECT_FROM_QUEUE = "SELECT name, owner, exclusive, arguments FROM " + QUEUE_TABLE_NAME; + private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; + private static final String UPDATE_QUEUE_EXCLUSIVITY = "UPDATE " + QUEUE_TABLE_NAME + " SET exclusive = ? WHERE name = ?"; + private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME; + private static final String SELECT_FROM_BINDINGS = + "SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name"; + private static final String FIND_BINDING = + "SELECT * FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? "; + private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )"; + private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?"; + private static final String FIND_EXCHANGE = "SELECT name FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?"; + private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )"; + private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?"; + private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner, exclusive, arguments) VALUES (?, ?, ?, ?)"; + private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; + + private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )"; + private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)"; + private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?"; + private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_name, message_id"; + + + private static final String CREATE_META_DATA_TABLE = "CREATE TABLE "+META_DATA_TABLE_NAME+" ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )"; + private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, offset int not null, last_byte int not null, content blob , PRIMARY KEY (message_id, offset) )"; + + private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, offset, last_byte, content ) values (?, ?, ?, ?)"; + private static final String SELECT_FROM_MESSAGE_CONTENT = + "SELECT offset, content FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? AND last_byte > ? AND offset < ? ORDER BY message_id, offset"; + private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?"; + + private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";; + private static final String SELECT_FROM_META_DATA = + "SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?"; + private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?"; + private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME; + + private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; + + + private LogSubject _logSubject; + private boolean _configured; + + + private enum State + { + INITIAL, + CONFIGURING, + RECOVERING, + STARTED, + CLOSING, + CLOSED + } + + private State _state = State.INITIAL; + + + public void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, + Configuration storeConfiguration, + LogSubject logSubject) throws Exception + { + stateTransition(State.INITIAL, State.CONFIGURING); + _logSubject = logSubject; + CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName())); + + if(!_configured) + { + commonConfiguration(name, storeConfiguration, logSubject); + _configured = true; + } + + // this recovers durable exchanges, queues, and bindings + recover(recoveryHandler); + + + stateTransition(State.RECOVERING, State.STARTED); + + } + + + public void configureMessageStore(String name, + MessageStoreRecoveryHandler recoveryHandler, + Configuration storeConfiguration, + LogSubject logSubject) throws Exception + { + CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName())); + + if(!_configured) + { + + _logSubject = logSubject; + + commonConfiguration(name, storeConfiguration, logSubject); + _configured = true; + } + + recoverMessages(recoveryHandler); + + } + + + + public void configureTransactionLog(String name, + TransactionLogRecoveryHandler recoveryHandler, + Configuration storeConfiguration, + LogSubject logSubject) throws Exception + { + CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName())); + + if(!_configured) + { + + _logSubject = logSubject; + + commonConfiguration(name, storeConfiguration, logSubject); + _configured = true; + } + + recoverQueueEntries(recoveryHandler); + + } + + + + private void commonConfiguration(String name, Configuration storeConfiguration, LogSubject logSubject) + throws ClassNotFoundException, SQLException + { + initialiseDriver(); + + //Update to pick up QPID_WORK and use that as the default location not just derbyDB + + final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK") + + File.separator + "derbyDB"); + + File environmentPath = new File(databasePath); + if (!environmentPath.exists()) + { + if (!environmentPath.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } + } + + CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath())); + + createOrOpenDatabase(name, databasePath); + } + + private static synchronized void initialiseDriver() throws ClassNotFoundException + { + if(DRIVER_CLASS == null) + { + DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME); + } + } + + private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException + { + //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created. + _connectionURL = "jdbc:derby:" + environmentPath + "/" + name + ";create=true"; + + Connection conn = newAutoCommitConnection(); + + createVersionTable(conn); + createExchangeTable(conn); + createQueueTable(conn); + createBindingsTable(conn); + createQueueEntryTable(conn); + createMetaDataTable(conn); + createMessageContentTable(conn); + + conn.close(); + } + + + + private void createVersionTable(final Connection conn) throws SQLException + { + if(!tableExists(DB_VERSION_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(CREATE_DB_VERSION_TABLE); + } + finally + { + stmt.close(); + } + + PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION); + try + { + pstmt.setInt(1, DB_VERSION); + pstmt.execute(); + } + finally + { + pstmt.close(); + } + } + + } + + + private void createExchangeTable(final Connection conn) throws SQLException + { + if(!tableExists(EXCHANGE_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(CREATE_EXCHANGE_TABLE); + } + finally + { + stmt.close(); + } + } + } + + private void createQueueTable(final Connection conn) throws SQLException + { + if(!tableExists(QUEUE_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(CREATE_QUEUE_TABLE); + } + finally + { + stmt.close(); + } + } + } + + private void createBindingsTable(final Connection conn) throws SQLException + { + if(!tableExists(BINDINGS_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(CREATE_BINDINGS_TABLE); + } + finally + { + stmt.close(); + } + } + + } + + private void createQueueEntryTable(final Connection conn) throws SQLException + { + if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(CREATE_QUEUE_ENTRY_TABLE); + } + finally + { + stmt.close(); + } + } + + } + + private void createMetaDataTable(final Connection conn) throws SQLException + { + if(!tableExists(META_DATA_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(CREATE_META_DATA_TABLE); + } + finally + { + stmt.close(); + } + } + + } + + + private void createMessageContentTable(final Connection conn) throws SQLException + { + if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(CREATE_MESSAGE_CONTENT_TABLE); + } + finally + { + stmt.close(); + } + } + + } + + + + private boolean tableExists(final String tableName, final Connection conn) throws SQLException + { + PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY); + try + { + stmt.setString(1, tableName); + ResultSet rs = stmt.executeQuery(); + try + { + return rs.next(); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + } + + public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQException + { + stateTransition(State.CONFIGURING, State.RECOVERING); + + CurrentActor.get().message(_logSubject,MessageStoreMessages.RECOVERY_START()); + + try + { + ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this); + loadQueues(qrh); + + ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery(); + List<String> exchanges = loadExchanges(erh); + ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery(); + recoverBindings(brh, exchanges); + brh.completeBindingRecovery(); + } + catch (SQLException e) + { + + throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e); + } + + + } + + private void loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException + { + Connection conn = newAutoCommitConnection(); + try + { + Statement stmt = conn.createStatement(); + try + { + ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE); + try + { + + while(rs.next()) + { + String queueName = rs.getString(1); + String owner = rs.getString(2); + boolean exclusive = rs.getBoolean(3); + Blob argumentsAsBlob = rs.getBlob(4); + + byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length()); + FieldTable arguments; + if(dataAsBytes.length > 0) + { + org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.wrap(dataAsBytes); + + arguments = new FieldTable(buffer,buffer.limit()); + } + else + { + arguments = null; + } + + qrh.queue(queueName, owner, exclusive, arguments); + + } + + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + + + private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws SQLException + { + + List<String> exchanges = new ArrayList<String>(); + Connection conn = null; + try + { + conn = newAutoCommitConnection(); + + Statement stmt = conn.createStatement(); + try + { + ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE); + try + { + while(rs.next()) + { + String exchangeName = rs.getString(1); + String type = rs.getString(2); + boolean autoDelete = rs.getShort(3) != 0; + + exchanges.add(exchangeName); + + erh.exchange(exchangeName, type, autoDelete); + + } + return exchanges; + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + finally + { + if(conn != null) + { + conn.close(); + } + } + + } + + private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws SQLException + { + _logger.info("Recovering bindings..."); + + Connection conn = null; + try + { + conn = newAutoCommitConnection(); + + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS); + + try + { + ResultSet rs = stmt.executeQuery(); + + try + { + + while(rs.next()) + { + String exchangeName = rs.getString(1); + String queueName = rs.getString(2); + String bindingKey = rs.getString(3); + Blob arguments = rs.getBlob(4); + java.nio.ByteBuffer buf; + + if(arguments != null && arguments.length() != 0) + { + byte[] argumentBytes = arguments.getBytes(1, (int) arguments.length()); + buf = java.nio.ByteBuffer.wrap(argumentBytes); + } + else + { + buf = null; + } + + brh.binding(exchangeName, queueName, bindingKey, buf); + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + } + finally + { + if(conn != null) + { + conn.close(); + } + } + } + + + + public void close() throws Exception + { + CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); + _closed.getAndSet(true); + + try + { + Connection conn = DriverManager.getConnection(_connectionURL + ";shutdown=true"); + // Shouldn't reach this point - shutdown=true should throw SQLException + conn.close(); + _logger.error("Unable to shut down the store"); + } + catch (SQLException e) + { + if (e.getSQLState().equalsIgnoreCase(DERBY_SINGLE_DB_SHUTDOWN_CODE)) + { + //expected and represents a clean shutdown of this database only, do nothing. + } + else + { + _logger.error("Exception whilst shutting down the store: " + e); + } + } + } + + public StoredMessage addMessage(StorableMessageMetaData metaData) + { + if(metaData.isPersistent()) + { + return new StoredDerbyMessage(_messageId.incrementAndGet(), metaData); + } + else + { + return new StoredMemoryMessage(_messageId.incrementAndGet(), metaData); + } + } + + public StoredMessage getMessage(long messageNumber) + { + return null; + } + + public void removeMessage(long messageId) + { + try + { + Connection conn = newConnection(); + try + { + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_META_DATA); + try + { + stmt.setLong(1,messageId); + int results = stmt.executeUpdate(); + stmt.close(); + + if (results == 0) + { + throw new RuntimeException("Message metadata not found for message id " + messageId); + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("Deleted metadata for message " + messageId); + } + + stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT); + stmt.setLong(1,messageId); + results = stmt.executeUpdate(); + } + finally + { + stmt.close(); + } + conn.commit(); + } + catch(SQLException e) + { + try + { + conn.rollback(); + } + catch(SQLException t) + { + // ignore - we are re-throwing underlying exception + } + + throw e; + + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new RuntimeException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e); + } + + } + + public void createExchange(Exchange exchange) throws AMQStoreException + { + if (_state != State.RECOVERING) + { + try + { + Connection conn = newAutoCommitConnection(); + + try + { + + + PreparedStatement stmt = conn.prepareStatement(FIND_EXCHANGE); + try + { + stmt.setString(1, exchange.getNameShortString().toString()); + ResultSet rs = stmt.executeQuery(); + try + { + + // If we don't have any data in the result set then we can add this exchange + if (!rs.next()) + { + + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_EXCHANGE); + try + { + insertStmt.setString(1, exchange.getName().toString()); + insertStmt.setString(2, exchange.getTypeShortString().asString()); + insertStmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0); + insertStmt.execute(); + } + finally + { + insertStmt.close(); + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error writing Exchange with name " + exchange.getNameShortString() + " to database: " + e.getMessage(), e); + } + } + + } + + public void removeExchange(Exchange exchange) throws AMQStoreException + { + + try + { + Connection conn = newAutoCommitConnection(); + try + { + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE); + try + { + stmt.setString(1, exchange.getNameShortString().toString()); + int results = stmt.executeUpdate(); + stmt.close(); + if(results == 0) + { + throw new AMQStoreException("Exchange " + exchange.getNameShortString() + " not found"); + } + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error deleting Exchange with name " + exchange.getNameShortString() + " from database: " + e.getMessage(), e); + } + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) + throws AMQStoreException + { + if (_state != State.RECOVERING) + { + try + { + Connection conn = newAutoCommitConnection(); + + try + { + + PreparedStatement stmt = conn.prepareStatement(FIND_BINDING); + try + { + stmt.setString(1, exchange.getNameShortString().toString() ); + stmt.setString(2, queue.getNameShortString().toString()); + stmt.setString(3, routingKey == null ? null : routingKey.toString()); + + ResultSet rs = stmt.executeQuery(); + try + { + // If this binding is not already in the store then create it. + if (!rs.next()) + { + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BINDINGS); + try + { + insertStmt.setString(1, exchange.getNameShortString().toString() ); + insertStmt.setString(2, queue.getNameShortString().toString()); + insertStmt.setString(3, routingKey == null ? null : routingKey.toString()); + if(args != null) + { + /* This would be the Java 6 way of setting a Blob + Blob blobArgs = conn.createBlob(); + blobArgs.setBytes(0, args.getDataAsBytes()); + stmt.setBlob(4, blobArgs); + */ + byte[] bytes = args.getDataAsBytes(); + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + insertStmt.setBinaryStream(4, bis, bytes.length); + } + else + { + insertStmt.setNull(4, Types.BLOB); + } + + insertStmt.executeUpdate(); + } + finally + { + insertStmt.close(); + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange " + + exchange.getNameShortString() + " to database: " + e.getMessage(), e); + } + + } + + + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) + throws AMQStoreException + { + Connection conn = null; + + try + { + conn = newAutoCommitConnection(); + // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS); + stmt.setString(1, exchange.getNameShortString().toString() ); + stmt.setString(2, queue.getNameShortString().toString()); + stmt.setString(3, routingKey == null ? null : routingKey.toString()); + + int result = stmt.executeUpdate(); + stmt.close(); + + if(result != 1) + { + throw new AMQStoreException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange " + + exchange.getNameShortString() + " not found"); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange " + + exchange.getNameShortString() + " in database: " + e.getMessage(), e); + } + finally + { + if(conn != null) + { + try + { + conn.close(); + } + catch (SQLException e) + { + _logger.error(e); + } + } + + } + + + } + + public void createQueue(AMQQueue queue) throws AMQStoreException + { + createQueue(queue, null); + } + + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException + { + _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called"); + + if (_state != State.RECOVERING) + { + try + { + Connection conn = newAutoCommitConnection(); + + PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE); + try + { + stmt.setString(1, queue.getNameShortString().toString()); + ResultSet rs = stmt.executeQuery(); + try + { + + // If we don't have any data in the result set then we can add this queue + if (!rs.next()) + { + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_QUEUE); + + try + { + String owner = queue.getOwner() == null ? null : queue.getOwner().toString(); + + insertStmt.setString(1, queue.getNameShortString().toString()); + insertStmt.setString(2, owner); + insertStmt.setBoolean(3,queue.isExclusive()); + + final byte[] underlying; + if(arguments != null) + { + underlying = arguments.getDataAsBytes(); + } + else + { + underlying = new byte[0]; + } + + ByteArrayInputStream bis = new ByteArrayInputStream(underlying); + insertStmt.setBinaryStream(4,bis,underlying.length); + + insertStmt.execute(); + } + finally + { + insertStmt.close(); + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + conn.close(); + + } + catch (SQLException e) + { + throw new AMQStoreException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e); + } + } + } + + /** + * Updates the specified queue in the persistent store, IF it is already present. If the queue + * is not present in the store, it will not be added. + * + * NOTE: Currently only updates the exclusivity. + * + * @param queue The queue to update the entry for. + * @throws AMQStoreException If the operation fails for any reason. + */ + public void updateQueue(final AMQQueue queue) throws AMQStoreException + { + if (_state != State.RECOVERING) + { + try + { + Connection conn = newAutoCommitConnection(); + + try + { + PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE); + try + { + stmt.setString(1, queue.getNameShortString().toString()); + + ResultSet rs = stmt.executeQuery(); + try + { + if (rs.next()) + { + PreparedStatement stmt2 = conn.prepareStatement(UPDATE_QUEUE_EXCLUSIVITY); + try + { + stmt2.setBoolean(1,queue.isExclusive()); + stmt2.setString(2, queue.getNameShortString().toString()); + + stmt2.execute(); + } + finally + { + stmt2.close(); + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e); + } + } + + } + + /** + * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED + * isolation and with auto-commit transactions enabled. + */ + private Connection newAutoCommitConnection() throws SQLException + { + final Connection connection = newConnection(); + try + { + connection.setAutoCommit(true); + } + catch (SQLException sqlEx) + { + + try + { + connection.close(); + } + finally + { + throw sqlEx; + } + } + + return connection; + } + + /** + * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED + * isolation and with auto-commit transactions disabled. + */ + private Connection newConnection() throws SQLException + { + final Connection connection = DriverManager.getConnection(_connectionURL); + try + { + connection.setAutoCommit(false); + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + } + catch (SQLException sqlEx) + { + try + { + connection.close(); + } + finally + { + throw sqlEx; + } + } + return connection; + } + + public void removeQueue(final AMQQueue queue) throws AMQStoreException + { + AMQShortString name = queue.getNameShortString(); + _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called"); + Connection conn = null; + + try + { + conn = newAutoCommitConnection(); + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE); + stmt.setString(1, name.toString()); + int results = stmt.executeUpdate(); + stmt.close(); + + if (results == 0) + { + throw new AMQStoreException("Queue " + name + " not found"); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error deleting AMQQueue with name " + name + " from database: " + e.getMessage(), e); + } + finally + { + if(conn != null) + { + try + { + conn.close(); + } + catch (SQLException e) + { + _logger.error(e); + } + } + + } + + + } + + public Transaction newTransaction() + { + return new DerbyTransaction(); + } + + public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException + { + String name = queue.getResourceName(); + + Connection conn = connWrapper.getConnection(); + + + try + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]"); + } + + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY); + try + { + stmt.setString(1,name); + stmt.setLong(2,messageId); + stmt.executeUpdate(); + } + finally + { + stmt.close(); + } + } + catch (SQLException e) + { + _logger.error("Failed to enqueue: " + e.getMessage(), e); + throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name + + " to database", e); + } + + } + + public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException + { + String name = queue.getResourceName(); + + + Connection conn = connWrapper.getConnection(); + + + try + { + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY); + try + { + stmt.setString(1,name); + stmt.setLong(2,messageId); + int results = stmt.executeUpdate(); + + + + if(results != 1) + { + throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name); + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeuing message " + messageId + " on queue " + name );//+ "[Connection" + conn + "]"); + } + } + finally + { + stmt.close(); + } + } + catch (SQLException e) + { + _logger.error("Failed to dequeue: " + e.getMessage(), e); + throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + name + + " from database", e); + } + + } + + private static final class ConnectionWrapper + { + private final Connection _connection; + + public ConnectionWrapper(Connection conn) + { + _connection = conn; + } + + public Connection getConnection() + { + return _connection; + } + } + + + public void commitTran(ConnectionWrapper connWrapper) throws AMQStoreException + { + + try + { + Connection conn = connWrapper.getConnection(); + conn.commit(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("commit tran completed"); + } + + conn.close(); + } + catch (SQLException e) + { + throw new AMQStoreException("Error commit tx: " + e.getMessage(), e); + } + finally + { + + } + } + + public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQStoreException + { + commitTran(connWrapper); + return new StoreFuture() + { + public boolean isComplete() + { + return true; + } + + public void waitForCompletion() + { + + } + }; + + } + + public void abortTran(ConnectionWrapper connWrapper) throws AMQStoreException + { + if (connWrapper == null) + { + throw new AMQStoreException("Fatal internal error: transactional context is empty at abortTran"); + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("abort tran called: " + connWrapper.getConnection()); + } + + try + { + Connection conn = connWrapper.getConnection(); + conn.rollback(); + conn.close(); + } + catch (SQLException e) + { + throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e); + } + + } + + public Long getNewMessageId() + { + return _messageId.incrementAndGet(); + } + + + private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData) + throws SQLException + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Adding metadata for message " +messageId); + } + + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA); + try + { + stmt.setLong(1,messageId); + + final int bodySize = 1 + metaData.getStorableSize(); + byte[] underlying = new byte[bodySize]; + underlying[0] = (byte) metaData.getType().ordinal(); + java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying); + buf.position(1); + buf = buf.slice(); + + metaData.writeToBuffer(0, buf); + ByteArrayInputStream bis = new ByteArrayInputStream(underlying); + try + { + stmt.setBinaryStream(2,bis,underlying.length); + int result = stmt.executeUpdate(); + + if(result == 0) + { + throw new RuntimeException("Unable to add meta data for message " +messageId); + } + } + finally + { + try + { + bis.close(); + } + catch (IOException e) + { + + throw new SQLException(e); + } + } + + } + finally + { + stmt.close(); + } + + } + + + + + private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException + { + Connection conn = newAutoCommitConnection(); + try + { + MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin(); + + Statement stmt = conn.createStatement(); + try + { + ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA); + try + { + + long maxId = 0; + + while(rs.next()) + { + + long messageId = rs.getLong(1); + Blob dataAsBlob = rs.getBlob(2); + + if(messageId > maxId) + { + maxId = messageId; + } + + byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes); + buf.position(1); + buf = buf.slice(); + MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; + StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); + StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false); + messageHandler.message(message); + } + + _messageId.set(maxId); + + messageHandler.completeMessageRecovery(); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + + + + private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException + { + Connection conn = newAutoCommitConnection(); + try + { + TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this); + + Statement stmt = conn.createStatement(); + try + { + ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); + try + { + while(rs.next()) + { + + String queueName = rs.getString(1); + long messageId = rs.getLong(2); + queueEntryHandler.queueEntry(queueName,messageId); + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + queueEntryHandler.completeQueueEntryRecovery(); + } + finally + { + conn.close(); + } + } + + StorableMessageMetaData getMetaData(long messageId) throws SQLException + { + + Connection conn = newAutoCommitConnection(); + try + { + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_META_DATA); + try + { + stmt.setLong(1,messageId); + ResultSet rs = stmt.executeQuery(); + try + { + + if(rs.next()) + { + Blob dataAsBlob = rs.getBlob(1); + + byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes); + buf.position(1); + buf = buf.slice(); + MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; + StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); + + return metaData; + } + else + { + throw new RuntimeException("Meta data not found for message with id " + messageId); + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + + + private void addContent(Connection conn, long messageId, int offset, ByteBuffer src) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Adding content chunk offset " + offset + " for message " +messageId); + } + + try + { + src = src.slice(); + + byte[] chunkData = new byte[src.limit()]; + src.duplicate().get(chunkData); + + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT); + stmt.setLong(1,messageId); + stmt.setInt(2, offset); + stmt.setInt(3, offset+chunkData.length); + + + /* this would be the Java 6 way of doing things + Blob dataAsBlob = conn.createBlob(); + dataAsBlob.setBytes(1L, chunkData); + stmt.setBlob(3, dataAsBlob); + */ + ByteArrayInputStream bis = new ByteArrayInputStream(chunkData); + stmt.setBinaryStream(4, bis, chunkData.length); + stmt.executeUpdate(); + stmt.close(); + } + catch (SQLException e) + { + if(conn != null) + { + try + { + conn.close(); + } + catch (SQLException e1) + { + + } + } + + throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e.getMessage(), e); + } + + } + + + public int getContent(long messageId, int offset, ByteBuffer dst) + { + Connection conn = null; + + + try + { + conn = newAutoCommitConnection(); + + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT); + stmt.setLong(1,messageId); + stmt.setInt(2, offset); + stmt.setInt(3, offset+dst.remaining()); + ResultSet rs = stmt.executeQuery(); + + int written = 0; + + while(rs.next()) + { + int offsetInMessage = rs.getInt(1); + Blob dataAsBlob = rs.getBlob(2); + + final int size = (int) dataAsBlob.length(); + byte[] dataAsBytes = dataAsBlob.getBytes(1, size); + + int posInArray = offset + written - offsetInMessage; + int count = size - posInArray; + if(count > dst.remaining()) + { + count = dst.remaining(); + } + dst.put(dataAsBytes,posInArray,count); + written+=count; + + if(dst.remaining() == 0) + { + break; + } + } + + stmt.close(); + conn.close(); + return written; + + } + catch (SQLException e) + { + if(conn != null) + { + try + { + conn.close(); + } + catch (SQLException e1) + { + + } + } + + throw new RuntimeException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e.getMessage(), e); + } + + + + } + + public boolean isPersistent() + { + return true; + } + + + private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException + { + if (_state != requiredState) + { + throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState + + "; currently in state: " + _state); + } + + _state = newState; + } + + + private class DerbyTransaction implements Transaction + { + private final ConnectionWrapper _connWrapper; + + + private DerbyTransaction() + { + try + { + _connWrapper = new ConnectionWrapper(newConnection()); + } + catch (SQLException e) + { + throw new RuntimeException(e); + } + } + + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + { + DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, messageId); + } + + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + { + DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, messageId); + + } + + public void commitTran() throws AMQStoreException + { + DerbyMessageStore.this.commitTran(_connWrapper); + } + + public StoreFuture commitTranAsync() throws AMQStoreException + { + return DerbyMessageStore.this.commitTranAsync(_connWrapper); + } + + public void abortTran() throws AMQStoreException + { + DerbyMessageStore.this.abortTran(_connWrapper); + } + } + + private class StoredDerbyMessage implements StoredMessage + { + + private final long _messageId; + private volatile SoftReference<StorableMessageMetaData> _metaDataRef; + private Connection _conn; + + StoredDerbyMessage(long messageId, StorableMessageMetaData metaData) + { + this(messageId, metaData, true); + } + + + StoredDerbyMessage(long messageId, + StorableMessageMetaData metaData, boolean persist) + { + try + { + _messageId = messageId; + + _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); + if(persist) + { + _conn = newConnection(); + storeMetaData(_conn, messageId, metaData); + } + } + catch (SQLException e) + { + throw new RuntimeException(e); + } + + } + + public StorableMessageMetaData getMetaData() + { + StorableMessageMetaData metaData = _metaDataRef.get(); + if(metaData == null) + { + try + { + metaData = DerbyMessageStore.this.getMetaData(_messageId); + } + catch (SQLException e) + { + throw new RuntimeException(e); + } + _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); + } + + return metaData; + } + + public long getMessageNumber() + { + return _messageId; + } + + public void addContent(int offsetInMessage, java.nio.ByteBuffer src) + { + DerbyMessageStore.this.addContent(_conn, _messageId, offsetInMessage, src); + } + + public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) + { + return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst); + } + + public StoreFuture flushToStore() + { + try + { + if(_conn != null) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Flushing message " + _messageId + " to store"); + } + + _conn.commit(); + _conn.close(); + } + } + catch (SQLException e) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Error when trying to flush message " + _messageId + " to store: " + e); + } + throw new RuntimeException(e); + } + finally + { + _conn = null; + } + return IMMEDIATE_FUTURE; + } + + public void remove() + { + flushToStore(); + DerbyMessageStore.this.removeMessage(_messageId); + } + } + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java new file mode 100755 index 0000000000..5fb23653cb --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -0,0 +1,131 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.queue.AMQQueue; + +public interface DurableConfigurationStore +{ + + public static interface Source + { + DurableConfigurationStore getDurableConfigurationStore(); + } + + /** + * Called after instantiation in order to configure the message store. A particular implementation can define + * whatever parameters it wants. + * + * @param name The name to be used by this storem + * @param recoveryHandler Handler to be called as the store recovers on start up + * @param config The apache commons configuration object. + * + * @throws Exception If any error occurs that means the store is unable to configure itself. + */ + void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, + Configuration config, + LogSubject logSubject) throws Exception; + /** + * Makes the specified exchange persistent. + * + * @param exchange The exchange to persist. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + void createExchange(Exchange exchange) throws AMQStoreException; + + /** + * Removes the specified persistent exchange. + * + * @param exchange The exchange to remove. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + void removeExchange(Exchange exchange) throws AMQStoreException; + + /** + * Binds the specified queue to an exchange with a routing key. + * + * @param exchange The exchange to bind to. + * @param routingKey The routing key to bind by. + * @param queue The queue to bind. + * @param args Additional parameters. + * + * @throws AMQStoreException if the operation fails for any reason. + */ + void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException; + + /** + * Unbinds the specified from an exchange under a particular routing key. + * + * @param exchange The exchange to unbind from. + * @param routingKey The routing key to unbind. + * @param queue The queue to unbind. + * @param args Additonal parameters. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException; + + /** + * Makes the specified queue persistent. + * + * @param queue The queue to store. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + void createQueue(AMQQueue queue) throws AMQStoreException; + + /** + * Makes the specified queue persistent. + * + * @param queue The queue to store. + * @param arguments The additional arguments to the binding + * + * @throws AMQStoreException If the operation fails for any reason. + */ + void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException; + + /** + * Removes the specified queue from the persistent store. + * + * @param queue The queue to remove. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + void removeQueue(AMQQueue queue) throws AMQStoreException; + + /** + * Updates the specified queue in the persistent store, IF it is already present. If the queue + * is not present in the store, it will not be added. + * + * @param queue The queue to update the entry for. + * @throws AMQStoreException If the operation fails for any reason. + */ + void updateQueue(AMQQueue queue) throws AMQStoreException; +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java new file mode 100644 index 0000000000..d008d42fa0 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -0,0 +1,196 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.queue.AMQQueue; + +/** A simple message store that stores the messages in a threadsafe structure in memory. */ +public class MemoryMessageStore implements MessageStore +{ + private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); + + private static final int DEFAULT_HASHTABLE_CAPACITY = 50000; + + private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity"; + + + private final AtomicLong _messageId = new AtomicLong(1); + private AtomicBoolean _closed = new AtomicBoolean(false); + private LogSubject _logSubject; + + private static final Transaction IN_MEMORY_TRANSACTION = new Transaction() + { + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + { + } + + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + { + } + + public void commitTran() throws AMQStoreException + { + } + + public StoreFuture commitTranAsync() throws AMQStoreException + { + return IMMEDIATE_FUTURE; + } + + public void abortTran() throws AMQStoreException + { + } + + }; + + public void configureConfigStore(String name, ConfigurationRecoveryHandler handler, Configuration configuration, LogSubject logSubject) throws Exception + { + _logSubject = logSubject; + CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName())); + + + } + + public void configureMessageStore(String name, + MessageStoreRecoveryHandler recoveryHandler, + Configuration config, + LogSubject logSubject) throws Exception + { + if(_logSubject == null) + { + _logSubject = logSubject; + } + int hashtableCapacity = config.getInt(name + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY); + _log.info("Using capacity " + hashtableCapacity + " for hash tables"); + CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName())); + } + + public void close() throws Exception + { + _closed.getAndSet(true); + CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); + + } + + public StoredMessage addMessage(StorableMessageMetaData metaData) + { + final long id = _messageId.getAndIncrement(); + StoredMemoryMessage message = new StoredMemoryMessage(id, metaData); + + return message; + } + + + public void createExchange(Exchange exchange) throws AMQStoreException + { + + } + + public void removeExchange(Exchange exchange) throws AMQStoreException + { + + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException + { + + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException + { + + } + + + public void createQueue(AMQQueue queue) throws AMQStoreException + { + // Not requred to do anything + } + + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException + { + // Not required to do anything + } + + public void removeQueue(final AMQQueue queue) throws AMQStoreException + { + // Not required to do anything + } + + public void updateQueue(final AMQQueue queue) throws AMQStoreException + { + // Not required to do anything + } + + public void configureTransactionLog(String name, + TransactionLogRecoveryHandler recoveryHandler, + Configuration storeConfiguration, + LogSubject logSubject) throws Exception + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public Transaction newTransaction() + { + return IN_MEMORY_TRANSACTION; + } + + + public List<AMQQueue> createQueues() throws AMQException + { + return null; + } + + public Long getNewMessageId() + { + return _messageId.getAndIncrement(); + } + + public boolean isPersistent() + { + return false; + } + + private void checkNotClosed() throws MessageStoreClosedException + { + if (_closed.get()) + { + throw new MessageStoreClosedException(); + } + } + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java new file mode 100755 index 0000000000..428bb1e41b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.message.MessageMetaData_0_10; + +import java.nio.ByteBuffer; + +public enum MessageMetaDataType +{ + META_DATA_0_8 { public Factory<MessageMetaData> getFactory() { return MessageMetaData.FACTORY; } }, + META_DATA_0_10 { public Factory<MessageMetaData_0_10> getFactory() { return MessageMetaData_0_10.FACTORY; } }; + + + public static interface Factory<M extends StorableMessageMetaData> + { + M createMetaData(ByteBuffer buf); + } + + abstract public Factory<? extends StorableMessageMetaData> getFactory(); + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java new file mode 100644 index 0000000000..e2fca2f9c7 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.logging.LogSubject; +import org.apache.commons.configuration.Configuration; + +/** + * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages. + * + */ +public interface MessageStore extends DurableConfigurationStore, TransactionLog +{ + StoreFuture IMMEDIATE_FUTURE = new StoreFuture() + { + public boolean isComplete() + { + return true; + } + + public void waitForCompletion() + { + + } + }; + + + /** + * Called after instantiation in order to configure the message store. A particular implementation can define + * whatever parameters it wants. + * + * @param name The name to be used by this storem + * @param recoveryHandler Handler to be called as the store recovers on start up + * @param config The apache commons configuration object. + * + * @throws Exception If any error occurs that means the store is unable to configure itself. + */ + void configureMessageStore(String name, + MessageStoreRecoveryHandler recoveryHandler, + Configuration config, + LogSubject logSubject) throws Exception; + + /** + * Called to close and cleanup any resources used by the message store. + * + * @throws Exception If the close fails. + */ + void close() throws Exception; + + + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData); + + + /** + * Is this store capable of persisting the data + * + * @return true if this store is capable of persisting data + */ + boolean isPersistent(); + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java new file mode 100644 index 0000000000..3d1538c7eb --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java @@ -0,0 +1,36 @@ +package org.apache.qpid.server.store; + +import org.apache.qpid.AMQException;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * NOTE: this class currently extends AMQException but + * we should be using AMQExceptions internally in the code base for Protocol errors hence + * the message store interface should throw a different super class which this should be + * moved to reflect + */ +public class MessageStoreClosedException extends AMQException +{ + public MessageStoreClosedException() + { + super("Message store closed"); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java new file mode 100755 index 0000000000..ba65b8e1ec --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java @@ -0,0 +1,33 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store; + +public interface MessageStoreRecoveryHandler +{ + StoredMessageRecoveryHandler begin(); + + public static interface StoredMessageRecoveryHandler + { + void message(StoredMessage message); + + void completeMessageRecovery(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java new file mode 100755 index 0000000000..12d2a6a6c7 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.nio.ByteBuffer; + +public interface StorableMessageMetaData +{ + MessageMetaDataType getType(); + + int getStorableSize(); + + int writeToBuffer(int offsetInMetaData, ByteBuffer dest); + + int getContentSize(); + + boolean isPersistent(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java new file mode 100644 index 0000000000..88cc68bc71 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.log4j.Logger; + +/** + * A context that the store can use to associate with a transactional context. For example, it could store + * some kind of txn id. + * + * @author Apache Software Foundation + */ +public class StoreContext +{ + private static final Logger _logger = Logger.getLogger(StoreContext.class); + + private String _name; + private Object _payload; + + + public StoreContext() + { + _name = "StoreContext"; + } + + public StoreContext(String name) + { + _name = name; + } + + public Object getPayload() + { + return _payload; + } + + public void setPayload(Object payload) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("public void setPayload(Object payload = " + payload + "): called"); + } + _payload = payload; + } + + /** + * Prints out the transactional context as a string, mainly for debugging purposes. + * + * @return The transactional context as a string. + */ + public String toString() + { + return "<_name = " + _name + ", _payload = " + _payload + ">"; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java new file mode 100755 index 0000000000..1f5b027b80 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.store; + +import java.nio.ByteBuffer; + +public class StoredMemoryMessage implements StoredMessage +{ + private final long _messageNumber; + private final ByteBuffer _content; + private final StorableMessageMetaData _metaData; + + public StoredMemoryMessage(long messageNumber, StorableMessageMetaData metaData) + { + _messageNumber = messageNumber; + _metaData = metaData; + _content = ByteBuffer.allocate(metaData.getContentSize()); + + } + + public long getMessageNumber() + { + return _messageNumber; + } + + public void addContent(int offsetInMessage, ByteBuffer src) + { + src = src.duplicate(); + ByteBuffer dst = _content.duplicate(); + dst.position(offsetInMessage); + dst.put(src); + } + + public int getContent(int offset, ByteBuffer dst) + { + ByteBuffer src = _content.duplicate(); + src.position(offset); + src = src.slice(); + if(dst.remaining() < src.limit()) + { + src.limit(dst.remaining()); + } + dst.put(src); + return src.limit(); + } + + public TransactionLog.StoreFuture flushToStore() + { + return MessageStore.IMMEDIATE_FUTURE; + } + + + public StorableMessageMetaData getMetaData() + { + return _metaData; + } + + public void remove() + { + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java new file mode 100755 index 0000000000..0bc45c6718 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java @@ -0,0 +1,38 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store; + +import java.nio.ByteBuffer; + +public interface StoredMessage<M extends StorableMessageMetaData> +{ + M getMetaData(); + + public long getMessageNumber(); + + void addContent(int offsetInMessage, ByteBuffer src); + + int getContent(int offsetInMessage, ByteBuffer dst); + + TransactionLog.StoreFuture flushToStore(); + + void remove(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java new file mode 100755 index 0000000000..d196a91930 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.AMQStoreException; +import org.apache.commons.configuration.Configuration; + +public interface TransactionLog +{ + + public static interface Transaction + { + /** + * Places a message onto a specified queue, in a given transactional context. + * + * @param queue The queue to place the message on. + * @param messageId The message to enqueue. + * @throws AMQStoreException If the operation fails for any reason. + */ + void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException; + + /** + * Extracts a message from a specified queue, in a given transactional context. + * + * @param queue The queue to place the message on. + * @param messageId The message to dequeue. + * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. + */ + void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException; + + + /** + * Commits all operations performed within a given transactional context. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + void commitTran() throws AMQStoreException; + + /** + * Commits all operations performed within a given transactional context. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + StoreFuture commitTranAsync() throws AMQStoreException; + + /** + * Abandons all operations performed within a given transactional context. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + void abortTran() throws AMQStoreException; + + + + } + + public void configureTransactionLog(String name, + TransactionLogRecoveryHandler recoveryHandler, + Configuration storeConfiguration, + LogSubject logSubject) throws Exception; + + Transaction newTransaction(); + + + + public static interface StoreFuture + { + boolean isComplete(); + + void waitForCompletion(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java new file mode 100755 index 0000000000..7781c52df3 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java @@ -0,0 +1,33 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store; + +public interface TransactionLogRecoveryHandler +{ + QueueEntryRecoveryHandler begin(TransactionLog log); + + public static interface QueueEntryRecoveryHandler + { + void queueEntry(String queuename, long messageId); + + void completeQueueEntryRecovery(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java new file mode 100755 index 0000000000..0d81dd151d --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java @@ -0,0 +1,26 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store; + +public interface TransactionLogResource +{ + public String getResourceName(); +} |