diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java | 420 |
1 files changed, 383 insertions, 37 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 45083c1595..4d63136a9d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -20,6 +20,24 @@ */ package org.apache.qpid.server.store; +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.federation.Bridge; +import org.apache.qpid.server.federation.BrokerLink; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.messages.TransactionLogMessages; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.queue.AMQQueue; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -45,23 +63,6 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.configuration.Configuration; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.federation.Bridge; -import org.apache.qpid.server.federation.BrokerLink; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.ConfigStoreMessages; -import org.apache.qpid.server.logging.messages.MessageStoreMessages; -import org.apache.qpid.server.logging.messages.TransactionLogMessages; -import org.apache.qpid.server.message.EnqueableMessage; -import org.apache.qpid.server.queue.AMQQueue; - /** * An implementation of a {@link MessageStore} that uses Apache Derby as the persistance * mechanism. @@ -91,7 +92,9 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor private static final String LINKS_TABLE_NAME = "QPID_LINKS"; private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES"; - + private static final String XID_TABLE_NAME = "QPID_XIDS"; + private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS"; + private static final int DB_VERSION = 3; @@ -189,6 +192,31 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor + "arguments )" + " values (?, ?, ?, ?, ?, ?)"; + private static final String CREATE_XIDS_TABLE = + "CREATE TABLE "+XID_TABLE_NAME+" ( format bigint not null," + + " global_id varchar(64) for bit data, branch_id varchar(64) for bit data, PRIMARY KEY ( format, " + + "global_id, branch_id ))"; + private static final String INSERT_INTO_XIDS = + "INSERT INTO "+XID_TABLE_NAME+" ( format, global_id, branch_id ) values (?, ?, ?)"; + private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME + + " WHERE format = ? and global_id = ? and branch_id = ?"; + private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME; + + + private static final String CREATE_XID_ACTIONS_TABLE = + "CREATE TABLE "+XID_ACTIONS_TABLE_NAME+" ( format bigint not null," + + " global_id varchar(64) for bit data not null, branch_id varchar(64) for bit data not null, " + + "action_type char not null, queue_name varchar(255) not null, message_id bigint not null" + + ", PRIMARY KEY ( " + + "format, global_id, branch_id, action_type, queue_name, message_id))"; + private static final String INSERT_INTO_XID_ACTIONS = + "INSERT INTO "+XID_ACTIONS_TABLE_NAME+" ( format, global_id, branch_id, action_type, " + + "queue_name, message_id ) values (?,?,?,?,?,?) "; + private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME + + " WHERE format = ? and global_id = ? and branch_id = ?"; + private static final String SELECT_ALL_FROM_XID_ACTIONS = + "SELECT action_type, queue_name, message_id FROM " + XID_ACTIONS_TABLE_NAME + + " WHERE format = ? and global_id = ? and branch_id = ?"; private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; @@ -294,7 +322,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor _configured = true; } - recoverQueueEntries(recoveryHandler); + TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(recoveryHandler); + recoverXids(dtxrh); } @@ -349,7 +378,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor createMessageContentTable(conn); createLinkTable(conn); createBridgeTable(conn); - + createXidTable(conn); + createXidActionTable(conn); conn.close(); } @@ -518,8 +548,38 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor } } + private void createXidTable(final Connection conn) throws SQLException + { + if(!tableExists(XID_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(CREATE_XIDS_TABLE); + } + finally + { + stmt.close(); + } + } + } + private void createXidActionTable(final Connection conn) throws SQLException + { + if(!tableExists(XID_ACTIONS_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(CREATE_XID_ACTIONS_TABLE); + } + finally + { + stmt.close(); + } + } + } private boolean tableExists(final String tableName, final Connection conn) throws SQLException { @@ -650,12 +710,12 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor conn = newAutoCommitConnection(); PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_BRIDGES); - stmt.setLong(1, linkId.getLeastSignificantBits()); - stmt.setLong(2, linkId.getMostSignificantBits()); - try { + stmt.setLong(1, linkId.getLeastSignificantBits()); + stmt.setLong(2, linkId.getMostSignificantBits()); + ResultSet rs = stmt.executeQuery(); try @@ -1110,11 +1170,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor insertStmt.setString(3, routingKey == null ? null : routingKey.toString()); if(args != null) { - /* This would be the Java 6 way of setting a Blob - Blob blobArgs = conn.createBlob(); - blobArgs.setBytes(0, args.getDataAsBytes()); - stmt.setBlob(4, blobArgs); - */ + // TODO - In Java 6 we could use create/set Blob byte[] bytes = args.getDataAsBytes(); ByteArrayInputStream bis = new ByteArrayInputStream(bytes); insertStmt.setBinaryStream(4, bis, bytes.length); @@ -1712,7 +1768,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor if (_logger.isDebugEnabled()) { - _logger.debug("Dequeuing message " + messageId + " on queue " + name );//+ "[Connection" + conn + "]"); + _logger.debug("Dequeuing message " + messageId + " on queue " + name ); } } finally @@ -1729,6 +1785,126 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor } + + private void removeXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId) + throws AMQStoreException + { + Connection conn = connWrapper.getConnection(); + + + try + { + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_XIDS); + try + { + stmt.setLong(1,format); + stmt.setBytes(2,globalId); + stmt.setBytes(3,branchId); + int results = stmt.executeUpdate(); + + + + if(results != 1) + { + throw new AMQStoreException("Unable to find message with xid"); + } + } + finally + { + stmt.close(); + } + + stmt = conn.prepareStatement(DELETE_FROM_XID_ACTIONS); + try + { + stmt.setLong(1,format); + stmt.setBytes(2,globalId); + stmt.setBytes(3,branchId); + int results = stmt.executeUpdate(); + + } + finally + { + stmt.close(); + } + + } + catch (SQLException e) + { + _logger.error("Failed to dequeue: " + e.getMessage(), e); + throw new AMQStoreException("Error deleting enqueued message with xid", e); + } + + } + + + private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId, + Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws AMQStoreException + { + Connection conn = connWrapper.getConnection(); + + + try + { + + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_XIDS); + try + { + stmt.setLong(1,format); + stmt.setBytes(2, globalId); + stmt.setBytes(3, branchId); + stmt.executeUpdate(); + } + finally + { + stmt.close(); + } + + stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS); + + try + { + stmt.setLong(1,format); + stmt.setBytes(2, globalId); + stmt.setBytes(3, branchId); + + if(enqueues != null) + { + stmt.setString(4, "E"); + for(Transaction.Record record : enqueues) + { + stmt.setString(5, record.getQueue().getResourceName()); + stmt.setLong(6, record.getMessage().getMessageNumber()); + stmt.executeUpdate(); + } + } + + if(dequeues != null) + { + stmt.setString(4, "D"); + for(Transaction.Record record : dequeues) + { + stmt.setString(5, record.getQueue().getResourceName()); + stmt.setLong(6, record.getMessage().getMessageNumber()); + stmt.executeUpdate(); + } + } + + } + finally + { + stmt.close(); + } + + } + catch (SQLException e) + { + _logger.error("Failed to enqueue: " + e.getMessage(), e); + throw new AMQStoreException("Error writing xid ", e); + } + + } + private static final class ConnectionWrapper { private final Connection _connection; @@ -1922,7 +2098,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor - private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException + private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException { Connection conn = newAutoCommitConnection(); try @@ -1953,7 +2129,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor stmt.close(); } - queueEntryHandler.completeQueueEntryRecovery(); + return queueEntryHandler.completeQueueEntryRecovery(); } finally { @@ -1961,6 +2137,166 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor } } + private static final class Xid + { + + private final long _format; + private final byte[] _globalId; + private final byte[] _branchId; + + public Xid(long format, byte[] globalId, byte[] branchId) + { + _format = format; + _globalId = globalId; + _branchId = branchId; + } + + public long getFormat() + { + return _format; + } + + public byte[] getGlobalId() + { + return _globalId; + } + + public byte[] getBranchId() + { + return _branchId; + } + } + + private static class RecordImpl implements MessageStore.Transaction.Record, TransactionLogResource, EnqueableMessage + { + + private final String _queueName; + private long _messageNumber; + + public RecordImpl(String queueName, long messageNumber) + { + _queueName = queueName; + _messageNumber = messageNumber; + } + + public TransactionLogResource getQueue() + { + return this; + } + + public EnqueableMessage getMessage() + { + return this; + } + + public long getMessageNumber() + { + return _messageNumber; + } + + public boolean isPersistent() + { + return true; + } + + public StoredMessage getStoredMessage() + { + throw new UnsupportedOperationException(); + } + + public String getResourceName() + { + return _queueName; + } + } + + private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException + { + Connection conn = newAutoCommitConnection(); + try + { + List<Xid> xids = new ArrayList<Xid>(); + + Statement stmt = conn.createStatement(); + try + { + ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS); + try + { + while(rs.next()) + { + + long format = rs.getLong(1); + byte[] globalId = rs.getBytes(2); + byte[] branchId = rs.getBytes(3); + xids.add(new Xid(format, globalId, branchId)); + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + + + for(Xid xid : xids) + { + List<RecordImpl> enqueues = new ArrayList<RecordImpl>(); + List<RecordImpl> dequeues = new ArrayList<RecordImpl>(); + + PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS); + + try + { + pstmt.setLong(1, xid.getFormat()); + pstmt.setBytes(2, xid.getGlobalId()); + pstmt.setBytes(3, xid.getBranchId()); + + ResultSet rs = pstmt.executeQuery(); + try + { + while(rs.next()) + { + + String actionType = rs.getString(1); + String queueName = rs.getString(2); + long messageId = rs.getLong(3); + + RecordImpl record = new RecordImpl(queueName, messageId); + List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues; + records.add(record); + } + } + finally + { + rs.close(); + } + } + finally + { + pstmt.close(); + } + + dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), + enqueues.toArray(new RecordImpl[enqueues.size()]), + dequeues.toArray(new RecordImpl[dequeues.size()])); + } + + + dtxrh.completeDtxRecordRecovery(); + } + finally + { + conn.close(); + } + + } + StorableMessageMetaData getMetaData(long messageId) throws SQLException { @@ -2031,11 +2367,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor stmt.setInt(3, offset+chunkData.length); - /* this would be the Java 6 way of doing things - Blob dataAsBlob = conn.createBlob(); - dataAsBlob.setBytes(1L, chunkData); - stmt.setBlob(3, dataAsBlob); - */ + // TODO in Java 6 we could just use blobs + ByteArrayInputStream bis = new ByteArrayInputStream(chunkData); stmt.setBinaryStream(4, bis, chunkData.length); stmt.executeUpdate(); @@ -2181,8 +2514,21 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor { DerbyMessageStore.this.abortTran(_connWrapper); } + + public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException + { + DerbyMessageStore.this.removeXid(_connWrapper, format, globalId, branchId); + } + + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + throws AMQStoreException + { + DerbyMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues); + } } + + private class StoredDerbyMessage implements StoredMessage { @@ -2366,4 +2712,4 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor } } -} +}
\ No newline at end of file |