diff options
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java')
-rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java | 542 |
1 files changed, 247 insertions, 295 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index c64bc43066..652e4c135d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -27,8 +27,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; @@ -38,29 +36,23 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.EventManager; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMemoryMessage; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.Xid; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask; import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; -import org.apache.qpid.server.store.berkeleydb.entry.Xid; import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding; @@ -70,6 +62,10 @@ import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.util.FileUtils; @@ -129,7 +125,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private long _persistentSizeHighThreshold; private final EventManager _eventManager = new EventManager(); - private final String _type; private final EnvironmentFacadeFactory _environmentFacadeFactory; @@ -143,7 +138,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory) { - _type = environmentFacadeFactory.getType(); _environmentFacadeFactory = environmentFacadeFactory; } @@ -160,18 +154,19 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { if (_environmentFacade == null) { - String[] databaseNames = null; + EnvironmentFacadeTask[] initialisationTasks = null; if (MapValueConverter.getBooleanAttribute(IS_MESSAGE_STORE_TOO, storeSettings, false)) { - databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length]; + String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length]; System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length); System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length); + initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask() }; } else { - databaseNames = CONFIGURATION_STORE_DATABASE_NAMES; + initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)}; } - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, new UpgradeTask(parent), new OpenDatabasesTask(databaseNames)); + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks); } else { @@ -181,11 +176,88 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) { checkConfigurationStoreOpen(); - recoverConfig(recoveryHandler); + try + { + int configVersion = getConfigVersion(); + + handler.begin(configVersion); + doVisitAllConfiguredObjectRecords(handler); + + int newConfigVersion = handler.end(); + if(newConfigVersion != configVersion) + { + updateConfigVersion(newConfigVersion); + } + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e); + } + + } + + private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) + { + Map<UUID, BDBConfiguredObjectRecord> configuredObjects = new HashMap<UUID, BDBConfiguredObjectRecord>(); + Cursor objectsCursor = null; + Cursor hierarchyCursor = null; + try + { + objectsCursor = getConfiguredObjectsDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + + while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + UUID id = UUIDTupleBinding.getInstance().entryToObject(key); + + BDBConfiguredObjectRecord configuredObject = + (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value); + configuredObjects.put(configuredObject.getId(), configuredObject); + } + + // set parents + hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null); + while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key); + UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); + BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId()); + if(child != null) + { + ConfiguredObjectRecord parent = configuredObjects.get(parentId); + if(parent != null) + { + child.addParent(hk.getParentType(), parent); + } + else if(hk.getParentType().equals("Exchange")) + { + // TODO - remove this hack for the pre-defined exchanges + child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap())); + } + } + } + } + finally + { + closeCursorSafely(objectsCursor); + closeCursorSafely(hierarchyCursor); + } + + for (ConfiguredObjectRecord record : configuredObjects.values()) + { + boolean shoudlContinue = handler.handle(record); + if (!shoudlContinue) + { + break; + } + } + } @Override @@ -209,7 +281,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (_environmentFacade == null) { - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask()); + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, + new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask()); } _committer = _environmentFacade.createCommitter(parent.getName()); @@ -218,21 +291,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } @Override - public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException - { - checkMessageStoreOpen(); - - if(messageRecoveryHandler != null) - { - recoverMessages(messageRecoveryHandler); - } - if(transactionLogRecoveryHandler != null) - { - recoverQueueEntries(transactionLogRecoveryHandler); - } - } - - @Override public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException { checkMessageStoreOpen(); @@ -314,27 +372,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler) throws StoreException - { - try - { - final int configVersion = getConfigVersion(); - recoveryHandler.beginConfigurationRecovery(this, configVersion); - loadConfiguredObjects(recoveryHandler); - - final int newConfigVersion = recoveryHandler.completeConfigurationRecovery(); - if(newConfigVersion != configVersion) - { - updateConfigVersion(newConfigVersion); - } - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error recovering persistent state: " + e.getMessage(), e); - } - - } - @SuppressWarnings("resource") private void updateConfigVersion(int newConfigVersion) throws StoreException { @@ -399,62 +436,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException, StoreException - { - Cursor objectsCursor = null; - Cursor hierarchyCursor = null; - try - { - objectsCursor = getConfiguredObjectsDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - - Map<UUID, BDBConfiguredObjectRecord> configuredObjects = - new HashMap<UUID, BDBConfiguredObjectRecord>(); - - while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - - BDBConfiguredObjectRecord configuredObject = - (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value); - configuredObjects.put(configuredObject.getId(), configuredObject); - } - - // set parents - hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null); - while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key); - UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); - BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId()); - if(child != null) - { - ConfiguredObjectRecord parent = configuredObjects.get(parentId); - if(parent != null) - { - child.addParent(hk.getParentType(), parent); - } - else if(hk.getParentType().equals("Exchange")) - { - // TODO - remove this hack for the pre-defined exchanges - child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap())); - } - } - } - - for (ConfiguredObjectRecord record : configuredObjects.values()) - { - crh.configuredObject(record); - } - } - finally - { - closeCursorSafely(objectsCursor); - closeCursorSafely(hierarchyCursor); - } - } - private void closeCursorSafely(Cursor cursor) throws StoreException { if (cursor != null) @@ -470,124 +451,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - private void recoverMessages(MessageStoreRecoveryHandler msrh) throws StoreException - { - StoredMessageRecoveryHandler mrh = msrh.begin(); - - Cursor cursor = null; - try - { - cursor = getMessageMetaDataDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); - - long maxId = 0; - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - long messageId = LongBinding.entryToLong(key); - StorableMessageMetaData metaData = valueBinding.entryToObject(value); - - StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); - - mrh.message(message); - - maxId = Math.max(maxId, messageId); - } - - _messageId.set(maxId); - mrh.completeMessageRecovery(); - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot recover messages", e); - } - finally - { - closeCursorSafely(cursor); - } - } - - private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) - throws StoreException - { - QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this); - - ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); - - Cursor cursor = null; - try - { - cursor = getDeliveryDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - QueueEntryKey qek = keyBinding.entryToObject(key); - - entries.add(qek); - } - - try - { - cursor.close(); - } - finally - { - cursor = null; - } - - for(QueueEntryKey entry : entries) - { - UUID queueId = entry.getQueueId(); - long messageId = entry.getMessageId(); - qerh.queueEntry(queueId, messageId); - } - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot recover queue entries", e); - } - finally - { - closeCursorSafely(cursor); - } - - TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery(); - - cursor = null; - try - { - cursor = getXidDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - XidBinding keyBinding = XidBinding.getInstance(); - PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - Xid xid = keyBinding.entryToObject(key); - PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); - dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(), - preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()); - } - - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot recover transactions", e); - } - finally - { - closeCursorSafely(cursor); - } - - - dtxrh.completeDtxRecordRecovery(); - } void removeMessage(long messageId, boolean sync) throws StoreException { @@ -738,6 +601,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public void create(ConfiguredObjectRecord configuredObject) throws StoreException { checkConfigurationStoreOpen(); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Create " + configuredObject); + } + com.sleepycat.je.Transaction txn = null; try { @@ -831,7 +700,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Updating " + record.getType() + ", id: " + record.getId()); + LOGGER.debug("Updating, creating " + createIfNecessary + " : " + record); } DatabaseEntry key = new DatabaseEntry(); @@ -889,8 +758,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (LOGGER.isDebugEnabled()) { LOGGER.debug("Enqueuing message " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() - + " in transaction " + tx); + + queue.getName() + " with id " + queue.getId() + " in transaction " + tx); } getDeliveryDb().put(tx, key, value); } @@ -898,8 +766,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { LOGGER.error("Failed to enqueue: " + e.getMessage(), e); throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() - + " to database", e); + + queue.getName() + " with id " + queue.getId() + " to database", e); } } @@ -924,7 +791,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (LOGGER.isDebugEnabled()) { LOGGER.debug("Dequeue message id " + messageId + " from queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); + + queue.getName() + " with id " + id); } try @@ -934,19 +801,18 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (status == OperationStatus.NOTFOUND) { throw new StoreException("Unable to find message with id " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); + + queue.getName() + " with id " + id); } else if (status != OperationStatus.SUCCESS) { throw new StoreException("Unable to remove message with id " + messageId + " on queue" - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); + + queue.getName() + " with id " + id); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Removed message " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id - + " from delivery db"); + + queue.getName() + " with id " + id); } } @@ -1072,57 +938,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } /** - * Primarily for testing purposes. - * - * @param queueId - * - * @return a list of message ids for messages enqueued for a particular queue - */ - List<Long> getEnqueuedMessages(UUID queueId) throws StoreException - { - Cursor cursor = null; - try - { - cursor = getDeliveryDb().openCursor(null, null); - - DatabaseEntry key = new DatabaseEntry(); - - QueueEntryKey dd = new QueueEntryKey(queueId, 0); - - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - keyBinding.objectToEntry(dd, key); - - DatabaseEntry value = new DatabaseEntry(); - - LinkedList<Long> messageIds = new LinkedList<Long>(); - - OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT); - dd = keyBinding.entryToObject(key); - - while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId)) - { - - messageIds.add(dd.getMessageId()); - status = cursor.getNext(key, value, LockMode.DEFAULT); - if (status == OperationStatus.SUCCESS) - { - dd = keyBinding.entryToObject(key); - } - } - - return messageIds; - } - catch (DatabaseException e) - { - throw new StoreException("Database error: " + e.getMessage(), e); - } - finally - { - closeCursorSafely(cursor); - } - } - - /** * Return a valid, currently unused message id. * * @return A fresh message id. @@ -1792,12 +1607,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - @Override - public String getStoreType() - { - return _type; - } - private Database getConfiguredObjectsDb() { return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME); @@ -1901,4 +1710,147 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } + + public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler + { + private long _maxId; + + @Override + public void execute(EnvironmentFacade facade) + { + visitMessagesInternal(this, facade); + _messageId.set(_maxId); + } + + @Override + public boolean handle(StoredMessage<?> storedMessage) + { + long id = storedMessage.getMessageNumber(); + if (_maxId<id) + { + _maxId = id; + } + return true; + } + + } + + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + checkMessageStoreOpen(); + visitMessagesInternal(handler, _environmentFacade); + } + + private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade) + { + Cursor cursor = null; + try + { + cursor = environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME).openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + long messageId = LongBinding.entryToLong(key); + StorableMessageMetaData metaData = valueBinding.entryToObject(value); + StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); + + if (!handler.handle(message)) + { + break; + } + } + } + catch (DatabaseException e) + { + throw environmentFacade.handleDatabaseException("Cannot recover messages", e); + } + finally + { + if (cursor != null) + { + try + { + cursor.close(); + } + catch(DatabaseException e) + { + throw environmentFacade.handleDatabaseException("Cannot close cursor", e); + } + } + } + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + try + { + cursor = getDeliveryDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + QueueEntryKey entry = keyBinding.entryToObject(key); + UUID queueId = entry.getQueueId(); + long messageId = entry.getMessageId(); + if (!handler.handle(queueId, messageId)) + { + break; + } + } + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot visit message instances", e); + } + finally + { + closeCursorSafely(cursor); + } + } + + @Override + public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + try + { + cursor = getXidDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + XidBinding keyBinding = XidBinding.getInstance(); + PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + Xid xid = keyBinding.entryToObject(key); + PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); + if (!handler.handle(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(), + preparedTransaction.getEnqueues(),preparedTransaction.getDequeues())) + { + break; + } + } + + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot recover distributed transactions", e); + } + finally + { + closeCursorSafely(cursor); + } + } } |