diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java | 1148 |
1 files changed, 575 insertions, 573 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 743a736884..1764e2324e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -20,45 +20,37 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.server.queue.QueueRegistry; - -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; +import org.apache.qpid.server.logging.messages.TransactionLogMessages; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.commons.configuration.Configuration; -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; -import java.io.File; + import java.io.ByteArrayInputStream; -import java.sql.DriverManager; -import java.sql.Driver; +import java.io.File; +import java.sql.Blob; import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.Driver; +import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.Blob; +import java.sql.SQLException; +import java.sql.Statement; import java.sql.Types; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.List; import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; -import java.util.TreeMap; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.lang.ref.WeakReference; +import java.nio.ByteBuffer; public class DerbyMessageStore implements MessageStore @@ -66,7 +58,7 @@ public class DerbyMessageStore implements MessageStore private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); - private static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; + public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; @@ -77,53 +69,66 @@ public class DerbyMessageStore implements MessageStore private static final String QUEUE_TABLE_NAME = "QPID_QUEUE"; private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS"; private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY"; - private static final String MESSAGE_META_DATA_TABLE_NAME = "QPID_MESSAGE_META_DATA"; + + private static final String META_DATA_TABLE_NAME = "QPID_META_DATA"; private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT"; private static final int DB_VERSION = 1; - private VirtualHost _virtualHost; private static Class<Driver> DRIVER_CLASS; - private final AtomicLong _messageId = new AtomicLong(1); + private final AtomicLong _messageId = new AtomicLong(0); private AtomicBoolean _closed = new AtomicBoolean(false); private String _connectionURL; - + private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )"; private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )"; + private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )"; private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )"; private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )"; - private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )"; - private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )"; - private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )"; private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME; + private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME; private static final String SELECT_FROM_BINDINGS = - "SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?"; - private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?"; - private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?"; + "SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name"; + private static final String FIND_BINDING = + "SELECT * FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? "; private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )"; private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?"; private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )"; private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?"; private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)"; private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; + + private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )"; private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)"; private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?"; - private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)"; - private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)"; - private static final String SELECT_FROM_MESSAGE_META_DATA = - "SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?"; + private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_name, message_id"; + + + private static final String CREATE_META_DATA_TABLE = "CREATE TABLE "+META_DATA_TABLE_NAME+" ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )"; + private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, offset int not null, last_byte int not null, content blob , PRIMARY KEY (message_id, offset) )"; + + private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, offset, last_byte, content ) values (?, ?, ?, ?)"; private static final String SELECT_FROM_MESSAGE_CONTENT = - "SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?"; - private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME; - private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; + "SELECT offset, content FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? AND last_byte > ? AND offset < ? ORDER BY message_id, offset"; + private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?"; + + private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";; + private static final String SELECT_FROM_META_DATA = + "SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?"; + private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?"; + private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME; + + + private LogSubject _logSubject; + private boolean _configured; private enum State @@ -139,18 +144,82 @@ public class DerbyMessageStore implements MessageStore private State _state = State.INITIAL; - public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception + public void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, + Configuration storeConfiguration, + LogSubject logSubject) throws Exception { stateTransition(State.INITIAL, State.CONFIGURING); + _logSubject = logSubject; + CurrentActor.get().message(_logSubject, ConfigStoreMessages.CFG_1001(this.getClass().getName())); - initialiseDriver(); + if(!_configured) + { + commonConfiguration(name, storeConfiguration, logSubject); + _configured = true; + } - _virtualHost = virtualHost; + // this recovers durable exchanges, queues, and bindings + recover(recoveryHandler); - _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName()); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - final String databasePath = config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB"); + stateTransition(State.RECOVERING, State.STARTED); + + } + + + public void configureMessageStore(String name, + MessageStoreRecoveryHandler recoveryHandler, + Configuration storeConfiguration, + LogSubject logSubject) throws Exception + { + CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_CREATED(this.getClass().getName())); + + if(!_configured) + { + + _logSubject = logSubject; + + commonConfiguration(name, storeConfiguration, logSubject); + _configured = true; + } + + recoverMessages(recoveryHandler); + + } + + + + public void configureTransactionLog(String name, + TransactionLogRecoveryHandler recoveryHandler, + Configuration storeConfiguration, + LogSubject logSubject) throws Exception + { + CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1001(this.getClass().getName())); + + if(!_configured) + { + + _logSubject = logSubject; + + commonConfiguration(name, storeConfiguration, logSubject); + _configured = true; + } + + recoverQueueEntries(recoveryHandler); + + } + + + + private void commonConfiguration(String name, Configuration storeConfiguration, LogSubject logSubject) + throws ClassNotFoundException, SQLException + { + initialiseDriver(); + + //Update to pick up QPID_WORK and use that as the default location not just derbyDB + + final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")+"/derbyDB"); File environmentPath = new File(databasePath); if (!environmentPath.exists()) @@ -162,14 +231,9 @@ public class DerbyMessageStore implements MessageStore } } - createOrOpenDatabase(databasePath); - - // this recovers durable queues and persistent messages - - recover(); - - stateTransition(State.RECOVERING, State.STARTED); + CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_STORE_LOCATION(environmentPath.getAbsolutePath())); + createOrOpenDatabase(name, databasePath); } private static synchronized void initialiseDriver() throws ClassNotFoundException @@ -180,9 +244,10 @@ public class DerbyMessageStore implements MessageStore } } - private void createOrOpenDatabase(final String environmentPath) throws SQLException + private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException { - _connectionURL = "jdbc:derby:" + environmentPath + "/" + _virtualHost.getName() + ";create=true"; + //fixme this the _vhost name should not be added here. + _connectionURL = "jdbc:derby:" + environmentPath + "/" + name + ";create=true"; Connection conn = newConnection(); @@ -191,7 +256,7 @@ public class DerbyMessageStore implements MessageStore createQueueTable(conn); createBindingsTable(conn); createQueueEntryTable(conn); - createMessageMetaDataTable(conn); + createMetaDataTable(conn); createMessageContentTable(conn); conn.close(); @@ -262,12 +327,12 @@ public class DerbyMessageStore implements MessageStore } - private void createMessageMetaDataTable(final Connection conn) throws SQLException + private void createMetaDataTable(final Connection conn) throws SQLException { - if(!tableExists(MESSAGE_META_DATA_TABLE_NAME, conn)) + if(!tableExists(META_DATA_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); - stmt.execute(CREATE_MESSAGE_META_DATA_TABLE); + stmt.execute(CREATE_META_DATA_TABLE); stmt.close(); } @@ -300,36 +365,22 @@ public class DerbyMessageStore implements MessageStore return exists; } - public void recover() throws AMQException + public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQException { stateTransition(State.CONFIGURING, State.RECOVERING); - _logger.info("Recovering persistent state..."); - StoreContext context = new StoreContext(); + CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START()); try { - Map<AMQShortString, AMQQueue> queues = loadQueues(); - - recoverExchanges(); - - try - { - - beginTran(context); - - deliverMessages(context, queues); - _logger.info("Persistent state recovered successfully"); - commitTran(context); + ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this); + List<String> queues = loadQueues(qrh); - } - finally - { - if(inTran(context)) - { - abortTran(context); - } - } + ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery(); + List<String> exchanges = loadExchanges(erh); + ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery(); + recoverBindings(brh, exchanges); + brh.completeBindingRecovery(); } catch (SQLException e) { @@ -337,43 +388,34 @@ public class DerbyMessageStore implements MessageStore throw new AMQException("Error recovering persistent state: " + e, e); } + } - private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException + private List<String> loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException, AMQException { Connection conn = newConnection(); Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE); - Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>(); + List<String> queues = new ArrayList<String>(); + while(rs.next()) { String queueName = rs.getString(1); String owner = rs.getString(2); - AMQShortString queueNameShortString = new AMQShortString(queueName); - AMQQueue q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost, - null); - _virtualHost.getQueueRegistry().registerQueue(q); - queueMap.put(queueNameShortString,q); - - } - return queueMap; - } + qrh.queue(queueName, owner, null); - private void recoverExchanges() throws AMQException, SQLException - { - for (Exchange exchange : loadExchanges()) - { - recoverExchange(exchange); + queues.add(queueName); } + return queues; } - private List<Exchange> loadExchanges() throws AMQException, SQLException + private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws AMQException, SQLException { - List<Exchange> exchanges = new ArrayList<Exchange>(); + List<String> exchanges = new ArrayList<String>(); Connection conn = null; try { @@ -383,16 +425,15 @@ public class DerbyMessageStore implements MessageStore Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE); - Exchange exchange; while(rs.next()) { String exchangeName = rs.getString(1); String type = rs.getString(2); boolean autoDelete = rs.getShort(3) != 0; - exchange = _virtualHost.getExchangeFactory().createExchange(new AMQShortString(exchangeName), new AMQShortString(type), true, autoDelete, 0); - _virtualHost.getExchangeRegistry().registerExchange(exchange); - exchanges.add(exchange); + exchanges.add(exchangeName); + + erh.exchange(exchangeName, type, autoDelete); } return exchanges; @@ -408,11 +449,13 @@ public class DerbyMessageStore implements MessageStore } - private void recoverExchange(Exchange exchange) throws AMQException, SQLException + private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws AMQException, SQLException { - _logger.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "..."); - QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); + + _logger.info("Recovering bindings..."); + + Connection conn = null; try @@ -420,41 +463,29 @@ public class DerbyMessageStore implements MessageStore conn = newConnection(); PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS); - stmt.setString(1, exchange.getName().toString()); ResultSet rs = stmt.executeQuery(); while(rs.next()) { - String queueName = rs.getString(1); - String bindingKey = rs.getString(2); - Blob arguments = rs.getBlob(3); - + String exchangeName = rs.getString(1); + String queueName = rs.getString(2); + String bindingKey = rs.getString(3); + Blob arguments = rs.getBlob(4); + java.nio.ByteBuffer buf; - AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName)); - if (queue == null) + if(arguments != null && arguments.length() != 0) { - _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: " - + exchange.getName()); + byte[] argumentBytes = arguments.getBytes(1, (int) arguments.length()); + buf = java.nio.ByteBuffer.wrap(argumentBytes); } else { - _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName - + ", Routing Key: " + bindingKey + ", Arguments: " + arguments - + ")"); - - FieldTable argumentsFT = null; - if(arguments != null) - { - byte[] argumentBytes = arguments.getBytes(0, (int) arguments.length()); - ByteBuffer buf = ByteBuffer.wrap(argumentBytes); - argumentsFT = new FieldTable(buf,arguments.length()); - } - - queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT); - + buf = null; } + + brh.binding(exchangeName, queueName, bindingKey, buf); } } finally @@ -466,42 +497,48 @@ public class DerbyMessageStore implements MessageStore } } + + public void close() throws Exception { + CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_CLOSED()); _closed.getAndSet(true); } - public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException + public StoredMessage addMessage(StorableMessageMetaData metaData) { - - boolean localTx = getOrCreateTransaction(storeContext); - - Connection conn = getConnection(storeContext); - ConnectionWrapper wrapper = (ConnectionWrapper) storeContext.getPayload(); - - - if (_logger.isDebugEnabled()) + if(metaData.isPersistent()) { - _logger.debug("Message Id: " + messageId + " Removing"); + return new StoredDerbyMessage(_messageId.incrementAndGet(), metaData); } + else + { + return new StoredMemoryMessage(_messageId.incrementAndGet(), metaData); + } + } + + public StoredMessage getMessage(long messageNumber) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } - // first we need to look up the header to get the chunk count - MessageMetaData mmd = getMessageMetaData(storeContext, messageId); + public void removeMessage(long messageId) + { + Connection conn = null; try { - PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA); + + + conn = newConnection(); + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_META_DATA); stmt.setLong(1,messageId); - wrapper.setRequiresCommit(); int results = stmt.executeUpdate(); if (results == 0) { - if (localTx) - { - abortTran(storeContext); - } - throw new AMQException("Message metadata not found for message id " + messageId); + + throw new RuntimeException("Message metadata not found for message id " + messageId); } stmt.close(); @@ -514,29 +551,27 @@ public class DerbyMessageStore implements MessageStore stmt.setLong(1,messageId); results = stmt.executeUpdate(); - if(results != mmd.getContentChunkCount()) - { - if (localTx) - { - abortTran(storeContext); - } - throw new AMQException("Unexpected number of content chunks when deleting message. Expected " + mmd.getContentChunkCount() + " but found " + results); - } - if (localTx) - { - commitTran(storeContext); - } + conn.commit(); + conn.close(); } catch (SQLException e) { - if ((conn != null) && localTx) + if ((conn != null)) { - abortTran(storeContext); + try + { + conn.rollback(); + conn.close(); + } + catch (SQLException e1) + { + + } } - throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e); + throw new RuntimeException("Error removing Message with id " + messageId + " to database: " + e, e); } } @@ -630,29 +665,41 @@ public class DerbyMessageStore implements MessageStore try { conn = newConnection(); - PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS); + + PreparedStatement stmt = conn.prepareStatement(FIND_BINDING); stmt.setString(1, exchange.getName().toString() ); stmt.setString(2, queue.getName().toString()); stmt.setString(3, routingKey == null ? null : routingKey.toString()); - if(args != null) - { - /* This would be the Java 6 way of setting a Blob - Blob blobArgs = conn.createBlob(); - blobArgs.setBytes(0, args.getDataAsBytes()); - stmt.setBlob(4, blobArgs); - */ - byte[] bytes = args.getDataAsBytes(); - ByteArrayInputStream bis = new ByteArrayInputStream(bytes); - stmt.setBinaryStream(4, bis, bytes.length); - } - else + + ResultSet rs = stmt.executeQuery(); + + // If this binding is not already in the store then create it. + if (!rs.next()) { - stmt.setNull(4, Types.BLOB); - } + stmt = conn.prepareStatement(INSERT_INTO_BINDINGS); + stmt.setString(1, exchange.getName().toString() ); + stmt.setString(2, queue.getName().toString()); + stmt.setString(3, routingKey == null ? null : routingKey.toString()); + if(args != null) + { + /* This would be the Java 6 way of setting a Blob + Blob blobArgs = conn.createBlob(); + blobArgs.setBytes(0, args.getDataAsBytes()); + stmt.setBlob(4, blobArgs); + */ + byte[] bytes = args.getDataAsBytes(); + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + stmt.setBinaryStream(4, bis, bytes.length); + } + else + { + stmt.setNull(4, Types.BLOB); + } - stmt.executeUpdate(); - conn.commit(); - stmt.close(); + stmt.executeUpdate(); + conn.commit(); + stmt.close(); + } } catch (SQLException e) { @@ -743,19 +790,33 @@ public class DerbyMessageStore implements MessageStore { Connection conn = newConnection(); - PreparedStatement stmt = - conn.prepareStatement(INSERT_INTO_QUEUE); - + PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE); stmt.setString(1, queue.getName().toString()); - stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString()); - stmt.execute(); + ResultSet rs = stmt.executeQuery(); - stmt.close(); + // If we don't have any data in the result set then we can add this queue + if (!rs.next()) + { + stmt = conn.prepareStatement(INSERT_INTO_QUEUE); - conn.commit(); + String owner = queue.getPrincipalHolder() == null + ? null + : queue.getPrincipalHolder().getPrincipal() == null + ? null + : queue.getPrincipalHolder().getPrincipal().getName(); - conn.close(); + stmt.setString(1, queue.getName().toString()); + stmt.setString(2, owner); + + stmt.execute(); + + stmt.close(); + + conn.commit(); + + conn.close(); + } } catch (SQLException e) { @@ -816,29 +877,26 @@ public class DerbyMessageStore implements MessageStore } - public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + public Transaction newTransaction() { - AMQShortString name = queue.getName(); + return new DerbyTransaction(); + } + + public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQException + { + String name = queue.getResourceName(); + + Connection conn = connWrapper.getConnection(); - boolean localTx = getOrCreateTransaction(context); - Connection conn = getConnection(context); - ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); try { PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY); - stmt.setString(1,name.toString()); + stmt.setString(1,name); stmt.setLong(2,messageId); stmt.executeUpdate(); connWrapper.requiresCommit(); - if(localTx) - { - commitTran(context); - } - - - if (_logger.isDebugEnabled()) { _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]"); @@ -846,10 +904,6 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - if(localTx) - { - abortTran(context); - } _logger.error("Failed to enqueue: " + e, e); throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name + " to database", e); @@ -857,18 +911,18 @@ public class DerbyMessageStore implements MessageStore } - public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQException { - AMQShortString name = queue.getName(); + String name = queue.getResourceName(); + + + Connection conn = connWrapper.getConnection(); - boolean localTx = getOrCreateTransaction(context); - Connection conn = getConnection(context); - ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); try { PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY); - stmt.setString(1,name.toString()); + stmt.setString(1,name); stmt.setLong(2,messageId); int results = stmt.executeUpdate(); @@ -879,24 +933,13 @@ public class DerbyMessageStore implements MessageStore throw new AMQException("Unable to find message with id " + messageId + " on queue " + name); } - if(localTx) - { - commitTran(context); - } - - - if (_logger.isDebugEnabled()) { - _logger.debug("Dequeuing message " + messageId + " on queue " + name + "[Connection" + conn + "]"); + _logger.debug("Dequeuing message " + messageId + " on queue " + name );//+ "[Connection" + conn + "]"); } } catch (SQLException e) { - if(localTx) - { - abortTran(context); - } _logger.error("Failed to dequeue: " + e, e); throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name + " from database", e); @@ -930,51 +973,20 @@ public class DerbyMessageStore implements MessageStore } } - public void beginTran(StoreContext context) throws AMQException - { - if (context.getPayload() != null) - { - throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: " - + context.getPayload()); - } - else - { - try - { - Connection conn = newConnection(); - - context.setPayload(new ConnectionWrapper(conn)); - } - catch (SQLException e) - { - throw new AMQException("Error starting transaction: " + e, e); - } - } - } - - public void commitTran(StoreContext context) throws AMQException + public void commitTran(ConnectionWrapper connWrapper) throws AMQException { - ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); - - if (connWrapper == null) - { - throw new AMQException("Fatal internal error: transactional context is empty at commitTran"); - } try { Connection conn = connWrapper.getConnection(); - if(connWrapper.requiresCommit()) - { - conn.commit(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("commit tran completed"); - } + conn.commit(); + if (_logger.isDebugEnabled()) + { + _logger.debug("commit tran completed"); } + conn.close(); } catch (SQLException e) @@ -983,14 +995,30 @@ public class DerbyMessageStore implements MessageStore } finally { - context.setPayload(null); + } } - public void abortTran(StoreContext context) throws AMQException + public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQException { - ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); + commitTran(connWrapper); + return new StoreFuture() + { + public boolean isComplete() + { + return true; + } + public void waitForCompletion() + { + + } + }; + + } + + public void abortTran(ConnectionWrapper connWrapper) throws AMQException + { if (connWrapper == null) { throw new AMQException("Fatal internal error: transactional context is empty at abortTran"); @@ -1015,272 +1043,261 @@ public class DerbyMessageStore implements MessageStore { throw new AMQException("Error aborting transaction: " + e, e); } - finally - { - context.setPayload(null); - } + } - public boolean inTran(StoreContext context) + public Long getNewMessageId() { - return context.getPayload() != null; + return _messageId.incrementAndGet(); } - public Long getNewMessageId() + + private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData) + throws SQLException { - return _messageId.getAndIncrement(); + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA); + stmt.setLong(1,messageId); + + final int bodySize = 1 + metaData.getStorableSize(); + byte[] underlying = new byte[bodySize]; + underlying[0] = (byte) metaData.getType().ordinal(); + java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying); + buf.position(1); + buf = buf.slice(); + + metaData.writeToBuffer(0, buf); + ByteArrayInputStream bis = new ByteArrayInputStream(underlying); + stmt.setBinaryStream(2,bis,underlying.length); + stmt.executeUpdate(); + } - public void storeContentBodyChunk(StoreContext context, - Long messageId, - int index, - ContentChunk contentBody, - boolean lastContentBody) throws AMQException + + + + private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException { - boolean localTx = getOrCreateTransaction(context); - Connection conn = getConnection(context); - ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); + Connection conn = newConnection(); - try - { - PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT); - stmt.setLong(1,messageId); - stmt.setInt(2, index); - byte[] chunkData = new byte[contentBody.getSize()]; - contentBody.getData().duplicate().get(chunkData); - /* this would be the Java 6 way of doing things - Blob dataAsBlob = conn.createBlob(); - dataAsBlob.setBytes(1L, chunkData); - stmt.setBlob(3, dataAsBlob); - */ - ByteArrayInputStream bis = new ByteArrayInputStream(chunkData); - stmt.setBinaryStream(3, bis, chunkData.length); - stmt.executeUpdate(); - connWrapper.requiresCommit(); + MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin(); - if(localTx) - { - commitTran(context); - } - } - catch (SQLException e) + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA); + + long maxId = 0; + + while(rs.next()) { - if(localTx) + + long messageId = rs.getLong(1); + Blob dataAsBlob = rs.getBlob(2); + + if(messageId > maxId) { - abortTran(context); + maxId = messageId; } - throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e); + byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes); + buf.position(1); + buf = buf.slice(); + MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; + StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); + StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false); + messageHandler.message(message); + + } + _messageId.set(maxId); + + messageHandler.completeMessageRecovery(); } - public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData mmd) - throws AMQException - { - boolean localTx = getOrCreateTransaction(context); - Connection conn = getConnection(context); - ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); - try - { + private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException + { + Connection conn = newConnection(); - PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA); - stmt.setLong(1,messageId); - stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString()); - stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString()); - stmt.setShort(4, mmd.getMessagePublishInfo().isMandatory() ? (short) 1 : (short) 0); - stmt.setShort(5, mmd.getMessagePublishInfo().isImmediate() ? (short) 1 : (short) 0); - - ContentHeaderBody headerBody = mmd.getContentHeaderBody(); - final int bodySize = headerBody.getSize(); - byte[] underlying = new byte[bodySize]; - ByteBuffer buf = ByteBuffer.wrap(underlying); - headerBody.writePayload(buf); -/* - Blob dataAsBlob = conn.createBlob(); - dataAsBlob.setBytes(1L, underlying); - stmt.setBlob(6, dataAsBlob); -*/ - ByteArrayInputStream bis = new ByteArrayInputStream(underlying); - stmt.setBinaryStream(6,bis,underlying.length); + TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this); - stmt.setInt(7, mmd.getContentChunkCount()); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); - stmt.executeUpdate(); - connWrapper.requiresCommit(); - if(localTx) - { - commitTran(context); - } - } - catch (SQLException e) + while(rs.next()) { - if(localTx) - { - abortTran(context); - } - throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e); + String queueName = rs.getString(1); + long messageId = rs.getLong(2); + queueEntryHandler.queueEntry(queueName,messageId); } + + queueEntryHandler.completeQueueEntryRecovery(); + } - public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException + StorableMessageMetaData getMetaData(long messageId) throws SQLException { - boolean localTx = getOrCreateTransaction(context); - Connection conn = getConnection(context); - + Connection conn = newConnection(); try { - - PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA); + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_META_DATA); stmt.setLong(1,messageId); ResultSet rs = stmt.executeQuery(); if(rs.next()) { - final AMQShortString exchange = new AMQShortString(rs.getString(1)); - final AMQShortString routingKey = rs.getString(2) == null ? null : new AMQShortString(rs.getString(2)); - final boolean mandatory = (rs.getShort(3) != (short)0); - final boolean immediate = (rs.getShort(4) != (short)0); - MessagePublishInfo info = new MessagePublishInfo() - { - public AMQShortString getExchange() - { - return exchange; - } + Blob dataAsBlob = rs.getBlob(1); - public void setExchange(AMQShortString exchange) - { + byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes); + buf.position(1); + buf = buf.slice(); + MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; + StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); - } + return metaData; + } + else + { + throw new RuntimeException("Meta data not found for message with id " + messageId); + } - public boolean isImmediate() - { - return immediate; - } + } + finally + { + conn.close(); + } + } - public boolean isMandatory() - { - return mandatory; - } - public AMQShortString getRoutingKey() - { - return routingKey; - } - } ; + private void addContent(Connection conn, long messageId, int offset, ByteBuffer src) + { - Blob dataAsBlob = rs.getBlob(5); - byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length()); - ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); - ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, dataAsBytes.length); + try + { + final boolean newConnection = conn == null; - if(localTx) - { - commitTran(context); - } + if(newConnection) + { + conn = newConnection(); + } - return new MessageMetaData(info, chb, rs.getInt(6)); + src = src.slice(); - } - else + byte[] chunkData = new byte[src.limit()]; + src.duplicate().get(chunkData); + + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT); + stmt.setLong(1,messageId); + stmt.setInt(2, offset); + stmt.setInt(3, offset+chunkData.length); + + + /* this would be the Java 6 way of doing things + Blob dataAsBlob = conn.createBlob(); + dataAsBlob.setBytes(1L, chunkData); + stmt.setBlob(3, dataAsBlob); + */ + ByteArrayInputStream bis = new ByteArrayInputStream(chunkData); + stmt.setBinaryStream(4, bis, chunkData.length); + stmt.executeUpdate(); + + if(newConnection) { - if(localTx) - { - abortTran(context); - } - throw new AMQException("Metadata not found for message with id " + messageId); + conn.commit(); + conn.close(); } } catch (SQLException e) { - if(localTx) + if(conn != null) { - abortTran(context); + try + { + conn.close(); + } + catch (SQLException e1) + { + + } } - throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e); + throw new RuntimeException("Error reading AMQMessage with id " + messageId + " from database: " + e, e); } } - public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException - { - boolean localTx = getOrCreateTransaction(context); - Connection conn = getConnection(context); - - try - { + public int getContent(long messageId, int offset, ByteBuffer dst) + { + Connection conn = null; - PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT); - stmt.setLong(1,messageId); - stmt.setInt(2, index); - ResultSet rs = stmt.executeQuery(); - if(rs.next()) - { - Blob dataAsBlob = rs.getBlob(1); - - final int size = (int) dataAsBlob.length(); - byte[] dataAsBytes = dataAsBlob.getBytes(1, size); - final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); + try + { + conn = newConnection(); - ContentChunk cb = new ContentChunk() - { + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT); + stmt.setLong(1,messageId); + stmt.setInt(2, offset); + stmt.setInt(3, offset+dst.remaining()); + ResultSet rs = stmt.executeQuery(); - public int getSize() - { - return size; - } + int written = 0; - public ByteBuffer getData() - { - return buf; - } + while(rs.next()) + { + int offsetInMessage = rs.getInt(1); + Blob dataAsBlob = rs.getBlob(2); - public void reduceToFit() - { + final int size = (int) dataAsBlob.length(); + byte[] dataAsBytes = dataAsBlob.getBytes(1, size); - } - }; + int posInArray = offset + written - offsetInMessage; + int count = size - posInArray; + if(count > dst.remaining()) + { + count = dst.remaining(); + } + dst.put(dataAsBytes,posInArray,count); + written+=count; - if(localTx) - { - commitTran(context); - } + if(dst.remaining() == 0) + { + break; + } + } - return cb; + conn.close(); + return written; - } - else - { - if(localTx) - { - abortTran(context); - } - throw new AMQException("Message not found for message with id " + messageId); - } + } + catch (SQLException e) + { + if(conn != null) + { + try + { + conn.close(); } - catch (SQLException e) + catch (SQLException e1) { - if(localTx) - { - abortTran(context); - } - throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e); } + } + + throw new RuntimeException("Error reading AMQMessage with id " + messageId + " from database: " + e, e); + } @@ -1291,173 +1308,158 @@ public class DerbyMessageStore implements MessageStore return true; } - private void checkNotClosed() throws MessageStoreClosedException - { - if (_closed.get()) - { - throw new MessageStoreClosedException(); - } - } - - private static final class ProcessAction + private synchronized void stateTransition(State requiredState, State newState) throws AMQException { - private final AMQQueue _queue; - private final StoreContext _context; - private final AMQMessage _message; - - public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message) - { - _queue = queue; - _context = context; - _message = message; - } - - public void process() throws AMQException + if (_state != requiredState) { - _queue.enqueue(_context, _message); - + throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState + + "; currently in state: " + _state); } + _state = newState; } - private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues) - throws SQLException, AMQException + private class DerbyTransaction implements Transaction { - Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>(); - List<ProcessAction> actions = new ArrayList<ProcessAction>(); + private final ConnectionWrapper _connWrapper; - Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>(); - final boolean inLocaltran = inTran(context); - Connection conn = null; - try + private DerbyTransaction() { - - if(inLocaltran) + try { - conn = getConnection(context); + _connWrapper = new ConnectionWrapper(newConnection()); } - else + catch (SQLException e) { - conn = newConnection(); + throw new RuntimeException(e); } + } + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException + { + DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, messageId); + } - MessageHandleFactory messageHandleFactory = new MessageHandleFactory(); - long maxId = 1; - - TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null); + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException + { + DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, messageId); - Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); + } + public void commitTran() throws AMQException + { + DerbyMessageStore.this.commitTran(_connWrapper); + } - while (rs.next()) - { + public StoreFuture commitTranAsync() throws AMQException + { + return DerbyMessageStore.this.commitTranAsync(_connWrapper); + } + public void abortTran() throws AMQException + { + DerbyMessageStore.this.abortTran(_connWrapper); + } + } + private class StoredDerbyMessage implements StoredMessage + { - AMQShortString queueName = new AMQShortString(rs.getString(1)); + private final long _messageId; + private volatile WeakReference<StorableMessageMetaData> _metaDataRef; + private Connection _conn; + StoredDerbyMessage(long messageId, StorableMessageMetaData metaData) + { + this(messageId, metaData, true); + } - AMQQueue queue = queues.get(queueName); - if (queue == null) - { - queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null); - _virtualHost.getQueueRegistry().registerQueue(queue); - queues.put(queueName, queue); - } - - long messageId = rs.getLong(2); - maxId = Math.max(maxId, messageId); - AMQMessage message = msgMap.get(messageId); + StoredDerbyMessage(long messageId, + StorableMessageMetaData metaData, boolean persist) + { + try + { + _messageId = messageId; - if(message != null) + _metaDataRef = new WeakReference(metaData); + if(persist) { - message.incrementReference(); - } - else - { - message = new AMQMessage(messageId, this, messageHandleFactory, txnContext); - msgMap.put(messageId,message); + _conn = newConnection(); + storeMetaData(_conn, messageId, metaData); } + } + catch (SQLException e) + { + throw new RuntimeException(e); + } + + } - if (_logger.isDebugEnabled()) + public StorableMessageMetaData getMetaData() + { + StorableMessageMetaData metaData = _metaDataRef.get(); + if(metaData == null) + { + try { - _logger.debug("On recovery, delivering " + message.getMessageId() + " to " + queue.getName()); + metaData = DerbyMessageStore.this.getMetaData(_messageId); } - - if (_logger.isInfoEnabled()) + catch (SQLException e) { - Integer count = queueRecoveries.get(queueName); - if (count == null) - { - count = 0; - } - - queueRecoveries.put(queueName, ++count); - + throw new RuntimeException(e); } - - actions.add(new ProcessAction(queue, context, message)); - + _metaDataRef = new WeakReference(metaData); } - for(ProcessAction action : actions) - { - action.process(); - } - - _messageId.set(maxId + 1); + return metaData; } - catch (SQLException e) + + public long getMessageNumber() { - _logger.error("Error: " + e, e); - throw e; + return _messageId; } - finally + + public void addContent(int offsetInMessage, java.nio.ByteBuffer src) { - if (inLocaltran && conn != null) - { - conn.close(); - } + DerbyMessageStore.this.addContent(_conn, _messageId, offsetInMessage, src); } - if (_logger.isInfoEnabled()) + public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) { - _logger.info("Recovered message counts: " + queueRecoveries); + return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst); } - } - - private Connection getConnection(final StoreContext context) - { - return ((ConnectionWrapper)context.getPayload()).getConnection(); - } - - private boolean getOrCreateTransaction(StoreContext context) throws AMQException - { - ConnectionWrapper tx = (ConnectionWrapper) context.getPayload(); - if (tx == null) + public StoreFuture flushToStore() { - beginTran(context); - return true; + try + { + if(_conn != null) + { + _conn.commit(); + _conn.close(); + } + } + catch (SQLException e) + { + throw new RuntimeException(e); + } + finally + { + _conn = null; + } + return IMMEDIATE_FUTURE; } - return false; - } - - private synchronized void stateTransition(State requiredState, State newState) throws AMQException - { - if (_state != requiredState) + public void remove() { - throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState - + "; currently in state: " + _state); + flushToStore(); + DerbyMessageStore.this.removeMessage(_messageId); } - - _state = newState; } + + } |