diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java | 589 |
1 files changed, 280 insertions, 309 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index e7b6adaf7a..6be5460d5f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -49,7 +49,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.plugin.MessageMetaDataType; -import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonParseException; @@ -221,19 +224,125 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } @Override - public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) { checkConfigurationStoreOpen(); try { - recoveryHandler.beginConfigurationRecovery(this, getConfigVersion()); - loadConfiguredObjects(recoveryHandler); - setConfigVersion(recoveryHandler.completeConfigurationRecovery()); + int configVersion = getConfigVersion(); + + handler.begin(configVersion); + doVisitAllConfiguredObjectRecords(handler); + + int newConfigVersion = handler.end(); + if(newConfigVersion != configVersion) + { + setConfigVersion(newConfigVersion); + } } catch (SQLException e) { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + throw new StoreException("Cannot visit configured object records", e); + } + + } + + private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException + { + Connection conn = newAutoCommitConnection(); + Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>(); + final ObjectMapper objectMapper = new ObjectMapper(); + try + { + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); + try + { + ResultSet rs = stmt.executeQuery(); + try + { + while (rs.next()) + { + String id = rs.getString(1); + String objectType = rs.getString(2); + String attributes = getBlobAsString(rs, 3); + final ConfiguredObjectRecordImpl configuredObjectRecord = + new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType, + objectMapper.readValue(attributes, Map.class)); + configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord); + + } + } + catch (JsonMappingException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (JsonParseException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY); + try + { + ResultSet rs = stmt.executeQuery(); + try + { + while (rs.next()) + { + UUID childId = UUID.fromString(rs.getString(1)); + String parentType = rs.getString(2); + UUID parentId = UUID.fromString(rs.getString(3)); + + ConfiguredObjectRecordImpl child = configuredObjects.get(childId); + ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId); + + if(child != null && parent != null) + { + child.addParent(parentType, parent); + } + else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange")) + { + // TODO - remove this hack for amq. exchanges + child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap())); + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + } + finally + { + conn.close(); + } + + for(ConfiguredObjectRecord record : configuredObjects.values()) + { + boolean shoudlContinue = handler.handle(record); + if (!shoudlContinue) + { + break; + } } } @@ -282,44 +391,25 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { createOrOpenMessageStoreDatabase(); upgradeIfNecessary(parent); - } - catch (SQLException e) - { - throw new StoreException("Unable to activate message store ", e); - } - } - } - @Override - public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) - { - checkMessageStoreOpen(); - - if(messageRecoveryHandler != null) - { - try - { - recoverMessages(messageRecoveryHandler); - } - catch (SQLException e) - { - throw new StoreException("Error encountered when restoring message data from " + - "persistent store ", e); - } - } - if(transactionLogRecoveryHandler != null) - { - try - { - TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(transactionLogRecoveryHandler); - recoverXids(dtxrh); + visitMessages(new MessageHandler() + { + @Override + public boolean handle(StoredMessage<?> storedMessage) + { + long id = storedMessage.getMessageNumber(); + if (_messageId.get() < id) + { + _messageId.set(id); + } + return true; + } + }); } catch (SQLException e) { - throw new StoreException("Error encountered when restoring distributed transaction " + - "data from persistent store ", e); + throw new StoreException("Unable to activate message store ", e); } - } } @@ -1043,11 +1133,9 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC getLogger().debug("Enqueuing message " + messageId + " on queue " - + (queue instanceof AMQQueue - ? ((AMQQueue) queue).getName() - : "") - + queue.getId() - + "[Connection" + + queue.getName() + + " with id " + queue.getId() + + " [Connection" + conn + "]"); } @@ -1068,7 +1156,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC catch (SQLException e) { getLogger().error("Failed to enqueue: " + e.getMessage(), e); - throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId() + throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId() + " to database", e); } @@ -1093,15 +1181,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC if(results != 1) { - throw new StoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName() + " with id " + queue.getId()); } if (getLogger().isDebugEnabled()) { - getLogger().debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue - ? ((AMQQueue) queue).getName() - : "") + getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName() + " with id " + queue.getId()); } } @@ -1114,7 +1200,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC catch (SQLException e) { getLogger().error("Failed to dequeue: " + e.getMessage(), e); - throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId() + " from database", e); } @@ -1363,131 +1449,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } - private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException - { - Connection conn = newAutoCommitConnection(); - try - { - MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin(); - - Statement stmt = conn.createStatement(); - try - { - ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA); - try - { - - long maxId = 0; - - while(rs.next()) - { - - long messageId = rs.getLong(1); - if(messageId > maxId) - { - maxId = messageId; - } - - byte[] dataAsBytes = getBlobAsBytes(rs, 2); - - ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); - buf.position(1); - buf = buf.slice(); - MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); - StorableMessageMetaData metaData = type.createMetaData(buf); - StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true); - messageHandler.message(message); - } - - _messageId.set(maxId); - - messageHandler.completeMessageRecovery(); - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - finally - { - conn.close(); - } - } - - - private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException - { - Connection conn = newAutoCommitConnection(); - try - { - TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this); - - Statement stmt = conn.createStatement(); - try - { - ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); - try - { - while(rs.next()) - { - - String id = rs.getString(1); - long messageId = rs.getLong(2); - queueEntryHandler.queueEntry(UUID.fromString(id), messageId); - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - - return queueEntryHandler.completeQueueEntryRecovery(); - } - finally - { - conn.close(); - } - } - - private static final class Xid - { - - private final long _format; - private final byte[] _globalId; - private final byte[] _branchId; - - public Xid(long format, byte[] globalId, byte[] branchId) - { - _format = format; - _globalId = globalId; - _branchId = branchId; - } - - public long getFormat() - { - return _format; - } - - public byte[] getGlobalId() - { - return _globalId; - } - - public byte[] getBranchId() - { - return _branchId; - } - } private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage { @@ -1550,93 +1511,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException - { - Connection conn = newAutoCommitConnection(); - try - { - List<Xid> xids = new ArrayList<Xid>(); - - Statement stmt = conn.createStatement(); - try - { - ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS); - try - { - while(rs.next()) - { - - long format = rs.getLong(1); - byte[] globalId = rs.getBytes(2); - byte[] branchId = rs.getBytes(3); - xids.add(new Xid(format, globalId, branchId)); - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - - - - for(Xid xid : xids) - { - List<RecordImpl> enqueues = new ArrayList<RecordImpl>(); - List<RecordImpl> dequeues = new ArrayList<RecordImpl>(); - - PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS); - - try - { - pstmt.setLong(1, xid.getFormat()); - pstmt.setBytes(2, xid.getGlobalId()); - pstmt.setBytes(3, xid.getBranchId()); - - ResultSet rs = pstmt.executeQuery(); - try - { - while(rs.next()) - { - - String actionType = rs.getString(1); - UUID queueId = UUID.fromString(rs.getString(2)); - long messageId = rs.getLong(3); - - RecordImpl record = new RecordImpl(queueId, messageId); - List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues; - records.add(record); - } - } - finally - { - rs.close(); - } - } - finally - { - pstmt.close(); - } - - dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), - enqueues.toArray(new RecordImpl[enqueues.size()]), - dequeues.toArray(new RecordImpl[dequeues.size()])); - } - - - dtxrh.completeDtxRecordRecovery(); - } - finally - { - conn.close(); - } - - } - private StorableMessageMetaData getMetaData(long messageId) throws SQLException { @@ -2357,43 +2231,81 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException, - StoreException + @Override + public void visitMessages(MessageHandler handler) throws StoreException { - Connection conn = newAutoCommitConnection(); - Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>(); - final ObjectMapper objectMapper = new ObjectMapper(); + checkMessageStoreOpen(); + + Connection conn = null; try { - PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); + conn = newAutoCommitConnection(); + Statement stmt = conn.createStatement(); try { - ResultSet rs = stmt.executeQuery(); + ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA); try { while (rs.next()) { - String id = rs.getString(1); - String objectType = rs.getString(2); - String attributes = getBlobAsString(rs, 3); - final ConfiguredObjectRecordImpl configuredObjectRecord = - new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType, - objectMapper.readValue(attributes, Map.class)); - configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord); - + long messageId = rs.getLong(1); + byte[] dataAsBytes = getBlobAsBytes(rs, 2); + ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); + buf.position(1); + buf = buf.slice(); + MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); + StorableMessageMetaData metaData = type.createMetaData(buf); + StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true); + if (!handler.handle(message)) + { + break; + } } } - catch (JsonMappingException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - catch (JsonParseException e) + finally { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + rs.close(); } - catch (IOException e) + } + finally + { + stmt.close(); + } + } + catch (SQLException e) + { + throw new StoreException("Error encountered when visiting messages", e); + } + finally + { + closeConnection(conn); + } + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Connection conn = null; + try + { + conn = newAutoCommitConnection(); + Statement stmt = conn.createStatement(); + try + { + ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); + try { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + while(rs.next()) + { + String id = rs.getString(1); + long messageId = rs.getLong(2); + if (!handler.handle(UUID.fromString(id), messageId)) + { + break; + } + } } finally { @@ -2404,31 +2316,41 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { stmt.close(); } - stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY); + } + catch(SQLException e) + { + throw new StoreException("Error encountered when visiting message instances", e); + } + finally + { + closeConnection(conn); + } + } + + @Override + public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Connection conn = null; + try + { + conn = newAutoCommitConnection(); + List<Xid> xids = new ArrayList<Xid>(); + + Statement stmt = conn.createStatement(); try { - ResultSet rs = stmt.executeQuery(); + ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS); try { - while (rs.next()) + while(rs.next()) { - UUID childId = UUID.fromString(rs.getString(1)); - String parentType = rs.getString(2); - UUID parentId = UUID.fromString(rs.getString(3)); - - ConfiguredObjectRecordImpl child = configuredObjects.get(childId); - ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId); - - if(child != null && parent != null) - { - child.addParent(parentType, parent); - } - else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange")) - { - // TODO - remove this hack for amq. exchanges - child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap())); - } + long format = rs.getLong(1); + byte[] globalId = rs.getBytes(2); + byte[] branchId = rs.getBytes(3); + xids.add(new Xid(format, globalId, branchId)); } } finally @@ -2441,18 +2363,67 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC stmt.close(); } + + + for(Xid xid : xids) + { + List<RecordImpl> enqueues = new ArrayList<RecordImpl>(); + List<RecordImpl> dequeues = new ArrayList<RecordImpl>(); + + PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS); + + try + { + pstmt.setLong(1, xid.getFormat()); + pstmt.setBytes(2, xid.getGlobalId()); + pstmt.setBytes(3, xid.getBranchId()); + + ResultSet rs = pstmt.executeQuery(); + try + { + while(rs.next()) + { + + String actionType = rs.getString(1); + UUID queueId = UUID.fromString(rs.getString(2)); + long messageId = rs.getLong(3); + + RecordImpl record = new RecordImpl(queueId, messageId); + List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues; + records.add(record); + } + } + finally + { + rs.close(); + } + } + finally + { + pstmt.close(); + } + + if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), + enqueues.toArray(new RecordImpl[enqueues.size()]), + dequeues.toArray(new RecordImpl[dequeues.size()]))) + { + break; + } + } + } - finally + catch (SQLException e) { - conn.close(); - } + throw new StoreException("Error encountered when visiting distributed transactions", e); - for(ConfiguredObjectRecord record : configuredObjects.values()) + } + finally { - recoveryHandler.configuredObject(record); + closeConnection(conn); } } + protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException; protected abstract void storedSizeChange(int storeSizeIncrease); |