summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java589
1 files changed, 280 insertions, 309 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index e7b6adaf7a..6be5460d5f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -49,7 +49,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParseException;
@@ -221,19 +224,125 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
@Override
- public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
{
checkConfigurationStoreOpen();
try
{
- recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
- loadConfiguredObjects(recoveryHandler);
- setConfigVersion(recoveryHandler.completeConfigurationRecovery());
+ int configVersion = getConfigVersion();
+
+ handler.begin(configVersion);
+ doVisitAllConfiguredObjectRecords(handler);
+
+ int newConfigVersion = handler.end();
+ if(newConfigVersion != configVersion)
+ {
+ setConfigVersion(newConfigVersion);
+ }
}
catch (SQLException e)
{
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ throw new StoreException("Cannot visit configured object records", e);
+ }
+
+ }
+
+ private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
+ final ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ String id = rs.getString(1);
+ String objectType = rs.getString(2);
+ String attributes = getBlobAsString(rs, 3);
+ final ConfiguredObjectRecordImpl configuredObjectRecord =
+ new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
+ objectMapper.readValue(attributes, Map.class));
+ configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
+
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (JsonParseException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ UUID childId = UUID.fromString(rs.getString(1));
+ String parentType = rs.getString(2);
+ UUID parentId = UUID.fromString(rs.getString(3));
+
+ ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
+ ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
+
+ if(child != null && parent != null)
+ {
+ child.addParent(parentType, parent);
+ }
+ else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange"))
+ {
+ // TODO - remove this hack for amq. exchanges
+ child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap()));
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ for(ConfiguredObjectRecord record : configuredObjects.values())
+ {
+ boolean shoudlContinue = handler.handle(record);
+ if (!shoudlContinue)
+ {
+ break;
+ }
}
}
@@ -282,44 +391,25 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
createOrOpenMessageStoreDatabase();
upgradeIfNecessary(parent);
- }
- catch (SQLException e)
- {
- throw new StoreException("Unable to activate message store ", e);
- }
- }
- }
- @Override
- public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
- {
- checkMessageStoreOpen();
-
- if(messageRecoveryHandler != null)
- {
- try
- {
- recoverMessages(messageRecoveryHandler);
- }
- catch (SQLException e)
- {
- throw new StoreException("Error encountered when restoring message data from " +
- "persistent store ", e);
- }
- }
- if(transactionLogRecoveryHandler != null)
- {
- try
- {
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(transactionLogRecoveryHandler);
- recoverXids(dtxrh);
+ visitMessages(new MessageHandler()
+ {
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ long id = storedMessage.getMessageNumber();
+ if (_messageId.get() < id)
+ {
+ _messageId.set(id);
+ }
+ return true;
+ }
+ });
}
catch (SQLException e)
{
- throw new StoreException("Error encountered when restoring distributed transaction " +
- "data from persistent store ", e);
+ throw new StoreException("Unable to activate message store ", e);
}
-
}
}
@@ -1043,11 +1133,9 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
getLogger().debug("Enqueuing message "
+ messageId
+ " on queue "
- + (queue instanceof AMQQueue
- ? ((AMQQueue) queue).getName()
- : "")
- + queue.getId()
- + "[Connection"
+ + queue.getName()
+ + " with id " + queue.getId()
+ + " [Connection"
+ conn
+ "]");
}
@@ -1068,7 +1156,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
catch (SQLException e)
{
getLogger().error("Failed to enqueue: " + e.getMessage(), e);
- throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId()
+ throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId()
+ " to database", e);
}
@@ -1093,15 +1181,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
if(results != 1)
{
- throw new StoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName()
+ " with id " + queue.getId());
}
if (getLogger().isDebugEnabled())
{
- getLogger().debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue
- ? ((AMQQueue) queue).getName()
- : "")
+ getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName()
+ " with id " + queue.getId());
}
}
@@ -1114,7 +1200,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
catch (SQLException e)
{
getLogger().error("Failed to dequeue: " + e.getMessage(), e);
- throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName()
+ " with id " + queue.getId() + " from database", e);
}
@@ -1363,131 +1449,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin();
-
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
- try
- {
-
- long maxId = 0;
-
- while(rs.next())
- {
-
- long messageId = rs.getLong(1);
- if(messageId > maxId)
- {
- maxId = messageId;
- }
-
- byte[] dataAsBytes = getBlobAsBytes(rs, 2);
-
- ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
- buf.position(1);
- buf = buf.slice();
- MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
- StorableMessageMetaData metaData = type.createMetaData(buf);
- StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
- messageHandler.message(message);
- }
-
- _messageId.set(maxId);
-
- messageHandler.completeMessageRecovery();
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
-
-
- private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this);
-
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
- try
- {
- while(rs.next())
- {
-
- String id = rs.getString(1);
- long messageId = rs.getLong(2);
- queueEntryHandler.queueEntry(UUID.fromString(id), messageId);
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- return queueEntryHandler.completeQueueEntryRecovery();
- }
- finally
- {
- conn.close();
- }
- }
-
- private static final class Xid
- {
-
- private final long _format;
- private final byte[] _globalId;
- private final byte[] _branchId;
-
- public Xid(long format, byte[] globalId, byte[] branchId)
- {
- _format = format;
- _globalId = globalId;
- _branchId = branchId;
- }
-
- public long getFormat()
- {
- return _format;
- }
-
- public byte[] getGlobalId()
- {
- return _globalId;
- }
-
- public byte[] getBranchId()
- {
- return _branchId;
- }
- }
private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage
{
@@ -1550,93 +1511,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- List<Xid> xids = new ArrayList<Xid>();
-
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
- try
- {
- while(rs.next())
- {
-
- long format = rs.getLong(1);
- byte[] globalId = rs.getBytes(2);
- byte[] branchId = rs.getBytes(3);
- xids.add(new Xid(format, globalId, branchId));
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
-
-
- for(Xid xid : xids)
- {
- List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
- List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
-
- PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
-
- try
- {
- pstmt.setLong(1, xid.getFormat());
- pstmt.setBytes(2, xid.getGlobalId());
- pstmt.setBytes(3, xid.getBranchId());
-
- ResultSet rs = pstmt.executeQuery();
- try
- {
- while(rs.next())
- {
-
- String actionType = rs.getString(1);
- UUID queueId = UUID.fromString(rs.getString(2));
- long messageId = rs.getLong(3);
-
- RecordImpl record = new RecordImpl(queueId, messageId);
- List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
- records.add(record);
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- pstmt.close();
- }
-
- dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
- enqueues.toArray(new RecordImpl[enqueues.size()]),
- dequeues.toArray(new RecordImpl[dequeues.size()]));
- }
-
-
- dtxrh.completeDtxRecordRecovery();
- }
- finally
- {
- conn.close();
- }
-
- }
-
private StorableMessageMetaData getMetaData(long messageId) throws SQLException
{
@@ -2357,43 +2231,81 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException,
- StoreException
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
{
- Connection conn = newAutoCommitConnection();
- Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
- final ObjectMapper objectMapper = new ObjectMapper();
+ checkMessageStoreOpen();
+
+ Connection conn = null;
try
{
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ conn = newAutoCommitConnection();
+ Statement stmt = conn.createStatement();
try
{
- ResultSet rs = stmt.executeQuery();
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
try
{
while (rs.next())
{
- String id = rs.getString(1);
- String objectType = rs.getString(2);
- String attributes = getBlobAsString(rs, 3);
- final ConfiguredObjectRecordImpl configuredObjectRecord =
- new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
- objectMapper.readValue(attributes, Map.class));
- configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
-
+ long messageId = rs.getLong(1);
+ byte[] dataAsBytes = getBlobAsBytes(rs, 2);
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+ StorableMessageMetaData metaData = type.createMetaData(buf);
+ StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
+ if (!handler.handle(message))
+ {
+ break;
+ }
}
}
- catch (JsonMappingException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (JsonParseException e)
+ finally
{
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ rs.close();
}
- catch (IOException e)
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error encountered when visiting messages", e);
+ }
+ finally
+ {
+ closeConnection(conn);
+ }
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+ try
{
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ while(rs.next())
+ {
+ String id = rs.getString(1);
+ long messageId = rs.getLong(2);
+ if (!handler.handle(UUID.fromString(id), messageId))
+ {
+ break;
+ }
+ }
}
finally
{
@@ -2404,31 +2316,41 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
stmt.close();
}
- stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
+ }
+ catch(SQLException e)
+ {
+ throw new StoreException("Error encountered when visiting message instances", e);
+ }
+ finally
+ {
+ closeConnection(conn);
+ }
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+ List<Xid> xids = new ArrayList<Xid>();
+
+ Statement stmt = conn.createStatement();
try
{
- ResultSet rs = stmt.executeQuery();
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
try
{
- while (rs.next())
+ while(rs.next())
{
- UUID childId = UUID.fromString(rs.getString(1));
- String parentType = rs.getString(2);
- UUID parentId = UUID.fromString(rs.getString(3));
-
- ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
- ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
-
- if(child != null && parent != null)
- {
- child.addParent(parentType, parent);
- }
- else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange"))
- {
- // TODO - remove this hack for amq. exchanges
- child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap()));
- }
+ long format = rs.getLong(1);
+ byte[] globalId = rs.getBytes(2);
+ byte[] branchId = rs.getBytes(3);
+ xids.add(new Xid(format, globalId, branchId));
}
}
finally
@@ -2441,18 +2363,67 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
stmt.close();
}
+
+
+ for(Xid xid : xids)
+ {
+ List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
+ List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
+
+ PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
+
+ try
+ {
+ pstmt.setLong(1, xid.getFormat());
+ pstmt.setBytes(2, xid.getGlobalId());
+ pstmt.setBytes(3, xid.getBranchId());
+
+ ResultSet rs = pstmt.executeQuery();
+ try
+ {
+ while(rs.next())
+ {
+
+ String actionType = rs.getString(1);
+ UUID queueId = UUID.fromString(rs.getString(2));
+ long messageId = rs.getLong(3);
+
+ RecordImpl record = new RecordImpl(queueId, messageId);
+ List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
+ records.add(record);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ pstmt.close();
+ }
+
+ if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
+ enqueues.toArray(new RecordImpl[enqueues.size()]),
+ dequeues.toArray(new RecordImpl[dequeues.size()])))
+ {
+ break;
+ }
+ }
+
}
- finally
+ catch (SQLException e)
{
- conn.close();
- }
+ throw new StoreException("Error encountered when visiting distributed transactions", e);
- for(ConfiguredObjectRecord record : configuredObjects.values())
+ }
+ finally
{
- recoveryHandler.configuredObject(record);
+ closeConnection(conn);
}
}
+
protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
protected abstract void storedSizeChange(int storeSizeIncrease);