summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java420
1 files changed, 383 insertions, 37 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 45083c1595..4d63136a9d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -20,6 +20,24 @@
*/
package org.apache.qpid.server.store;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -45,23 +63,6 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-
/**
* An implementation of a {@link MessageStore} that uses Apache Derby as the persistance
* mechanism.
@@ -91,7 +92,9 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
private static final String LINKS_TABLE_NAME = "QPID_LINKS";
private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES";
-
+ private static final String XID_TABLE_NAME = "QPID_XIDS";
+ private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
+
private static final int DB_VERSION = 3;
@@ -189,6 +192,31 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
+ "arguments )"
+ " values (?, ?, ?, ?, ?, ?)";
+ private static final String CREATE_XIDS_TABLE =
+ "CREATE TABLE "+XID_TABLE_NAME+" ( format bigint not null,"
+ + " global_id varchar(64) for bit data, branch_id varchar(64) for bit data, PRIMARY KEY ( format, " +
+ "global_id, branch_id ))";
+ private static final String INSERT_INTO_XIDS =
+ "INSERT INTO "+XID_TABLE_NAME+" ( format, global_id, branch_id ) values (?, ?, ?)";
+ private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME
+ + " WHERE format = ? and global_id = ? and branch_id = ?";
+ private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME;
+
+
+ private static final String CREATE_XID_ACTIONS_TABLE =
+ "CREATE TABLE "+XID_ACTIONS_TABLE_NAME+" ( format bigint not null,"
+ + " global_id varchar(64) for bit data not null, branch_id varchar(64) for bit data not null, " +
+ "action_type char not null, queue_name varchar(255) not null, message_id bigint not null" +
+ ", PRIMARY KEY ( " +
+ "format, global_id, branch_id, action_type, queue_name, message_id))";
+ private static final String INSERT_INTO_XID_ACTIONS =
+ "INSERT INTO "+XID_ACTIONS_TABLE_NAME+" ( format, global_id, branch_id, action_type, " +
+ "queue_name, message_id ) values (?,?,?,?,?,?) ";
+ private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME
+ + " WHERE format = ? and global_id = ? and branch_id = ?";
+ private static final String SELECT_ALL_FROM_XID_ACTIONS =
+ "SELECT action_type, queue_name, message_id FROM " + XID_ACTIONS_TABLE_NAME +
+ " WHERE format = ? and global_id = ? and branch_id = ?";
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
@@ -294,7 +322,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
_configured = true;
}
- recoverQueueEntries(recoveryHandler);
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(recoveryHandler);
+ recoverXids(dtxrh);
}
@@ -349,7 +378,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
createMessageContentTable(conn);
createLinkTable(conn);
createBridgeTable(conn);
-
+ createXidTable(conn);
+ createXidActionTable(conn);
conn.close();
}
@@ -518,8 +548,38 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
}
}
+ private void createXidTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(XID_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_XIDS_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+ private void createXidActionTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(XID_ACTIONS_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_XID_ACTIONS_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
private boolean tableExists(final String tableName, final Connection conn) throws SQLException
{
@@ -650,12 +710,12 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
conn = newAutoCommitConnection();
PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_BRIDGES);
- stmt.setLong(1, linkId.getLeastSignificantBits());
- stmt.setLong(2, linkId.getMostSignificantBits());
-
try
{
+ stmt.setLong(1, linkId.getLeastSignificantBits());
+ stmt.setLong(2, linkId.getMostSignificantBits());
+
ResultSet rs = stmt.executeQuery();
try
@@ -1110,11 +1170,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
insertStmt.setString(3, routingKey == null ? null : routingKey.toString());
if(args != null)
{
- /* This would be the Java 6 way of setting a Blob
- Blob blobArgs = conn.createBlob();
- blobArgs.setBytes(0, args.getDataAsBytes());
- stmt.setBlob(4, blobArgs);
- */
+ // TODO - In Java 6 we could use create/set Blob
byte[] bytes = args.getDataAsBytes();
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
insertStmt.setBinaryStream(4, bis, bytes.length);
@@ -1712,7 +1768,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeuing message " + messageId + " on queue " + name );//+ "[Connection" + conn + "]");
+ _logger.debug("Dequeuing message " + messageId + " on queue " + name );
}
}
finally
@@ -1729,6 +1785,126 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
}
+
+ private void removeXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId)
+ throws AMQStoreException
+ {
+ Connection conn = connWrapper.getConnection();
+
+
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_XIDS);
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2,globalId);
+ stmt.setBytes(3,branchId);
+ int results = stmt.executeUpdate();
+
+
+
+ if(results != 1)
+ {
+ throw new AMQStoreException("Unable to find message with xid");
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ stmt = conn.prepareStatement(DELETE_FROM_XID_ACTIONS);
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2,globalId);
+ stmt.setBytes(3,branchId);
+ int results = stmt.executeUpdate();
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ catch (SQLException e)
+ {
+ _logger.error("Failed to dequeue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error deleting enqueued message with xid", e);
+ }
+
+ }
+
+
+ private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
+ Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws AMQStoreException
+ {
+ Connection conn = connWrapper.getConnection();
+
+
+ try
+ {
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_XIDS);
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2, globalId);
+ stmt.setBytes(3, branchId);
+ stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS);
+
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2, globalId);
+ stmt.setBytes(3, branchId);
+
+ if(enqueues != null)
+ {
+ stmt.setString(4, "E");
+ for(Transaction.Record record : enqueues)
+ {
+ stmt.setString(5, record.getQueue().getResourceName());
+ stmt.setLong(6, record.getMessage().getMessageNumber());
+ stmt.executeUpdate();
+ }
+ }
+
+ if(dequeues != null)
+ {
+ stmt.setString(4, "D");
+ for(Transaction.Record record : dequeues)
+ {
+ stmt.setString(5, record.getQueue().getResourceName());
+ stmt.setLong(6, record.getMessage().getMessageNumber());
+ stmt.executeUpdate();
+ }
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ catch (SQLException e)
+ {
+ _logger.error("Failed to enqueue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error writing xid ", e);
+ }
+
+ }
+
private static final class ConnectionWrapper
{
private final Connection _connection;
@@ -1922,7 +2098,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
- private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
+ private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -1953,7 +2129,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
stmt.close();
}
- queueEntryHandler.completeQueueEntryRecovery();
+ return queueEntryHandler.completeQueueEntryRecovery();
}
finally
{
@@ -1961,6 +2137,166 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
}
}
+ private static final class Xid
+ {
+
+ private final long _format;
+ private final byte[] _globalId;
+ private final byte[] _branchId;
+
+ public Xid(long format, byte[] globalId, byte[] branchId)
+ {
+ _format = format;
+ _globalId = globalId;
+ _branchId = branchId;
+ }
+
+ public long getFormat()
+ {
+ return _format;
+ }
+
+ public byte[] getGlobalId()
+ {
+ return _globalId;
+ }
+
+ public byte[] getBranchId()
+ {
+ return _branchId;
+ }
+ }
+
+ private static class RecordImpl implements MessageStore.Transaction.Record, TransactionLogResource, EnqueableMessage
+ {
+
+ private final String _queueName;
+ private long _messageNumber;
+
+ public RecordImpl(String queueName, long messageNumber)
+ {
+ _queueName = queueName;
+ _messageNumber = messageNumber;
+ }
+
+ public TransactionLogResource getQueue()
+ {
+ return this;
+ }
+
+ public EnqueableMessage getMessage()
+ {
+ return this;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageNumber;
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public StoredMessage getStoredMessage()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public String getResourceName()
+ {
+ return _queueName;
+ }
+ }
+
+ private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ List<Xid> xids = new ArrayList<Xid>();
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
+ try
+ {
+ while(rs.next())
+ {
+
+ long format = rs.getLong(1);
+ byte[] globalId = rs.getBytes(2);
+ byte[] branchId = rs.getBytes(3);
+ xids.add(new Xid(format, globalId, branchId));
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+
+
+ for(Xid xid : xids)
+ {
+ List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
+ List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
+
+ PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
+
+ try
+ {
+ pstmt.setLong(1, xid.getFormat());
+ pstmt.setBytes(2, xid.getGlobalId());
+ pstmt.setBytes(3, xid.getBranchId());
+
+ ResultSet rs = pstmt.executeQuery();
+ try
+ {
+ while(rs.next())
+ {
+
+ String actionType = rs.getString(1);
+ String queueName = rs.getString(2);
+ long messageId = rs.getLong(3);
+
+ RecordImpl record = new RecordImpl(queueName, messageId);
+ List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
+ records.add(record);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ pstmt.close();
+ }
+
+ dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
+ enqueues.toArray(new RecordImpl[enqueues.size()]),
+ dequeues.toArray(new RecordImpl[dequeues.size()]));
+ }
+
+
+ dtxrh.completeDtxRecordRecovery();
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ }
+
StorableMessageMetaData getMetaData(long messageId) throws SQLException
{
@@ -2031,11 +2367,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
stmt.setInt(3, offset+chunkData.length);
- /* this would be the Java 6 way of doing things
- Blob dataAsBlob = conn.createBlob();
- dataAsBlob.setBytes(1L, chunkData);
- stmt.setBlob(3, dataAsBlob);
- */
+ // TODO in Java 6 we could just use blobs
+
ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
stmt.setBinaryStream(4, bis, chunkData.length);
stmt.executeUpdate();
@@ -2181,8 +2514,21 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
{
DerbyMessageStore.this.abortTran(_connWrapper);
}
+
+ public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException
+ {
+ DerbyMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
+ }
+
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ throws AMQStoreException
+ {
+ DerbyMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
+ }
}
+
+
private class StoredDerbyMessage implements StoredMessage
{
@@ -2366,4 +2712,4 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
}
}
-}
+} \ No newline at end of file