diff options
Diffstat (limited to 'java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java')
-rw-r--r-- | java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java | 140 |
1 files changed, 139 insertions, 1 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 29f2a2f2fb..a91d8f359e 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -36,6 +36,7 @@ import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.LockConflictException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.Transaction; import com.sleepycat.je.TransactionConfig; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; @@ -68,14 +69,18 @@ import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5; +import org.apache.qpid.server.store.berkeleydb.keys.Xid; import org.apache.qpid.server.store.berkeleydb.records.BindingRecord; import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord; +import org.apache.qpid.server.store.berkeleydb.records.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory; import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5; import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory; +import org.apache.qpid.server.store.berkeleydb.tuples.PreparedTransactionTB; import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB; import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory; +import org.apache.qpid.server.store.berkeleydb.tuples.XidTB; import java.io.File; import java.lang.ref.SoftReference; @@ -120,6 +125,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private String QUEUEDB_NAME = "queueDb"; private String BRIDGEDB_NAME = "bridges"; private String LINKDB_NAME = "links"; + private String XIDDB_NAME = "xids"; private Database _messageMetaDataDb; private Database _messageContentDb; @@ -129,6 +135,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private Database _queueDb; private Database _bridgeDb; private Database _linkDb; + private Database _xidDb; /* ======= * Schema: @@ -217,6 +224,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore LINKDB_NAME += "_v" + version; BRIDGEDB_NAME += "_v" + version; + + XIDDB_NAME += "_v" + version; } } @@ -272,6 +281,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } recoverQueueEntries(recoveryHandler); + } @@ -487,6 +497,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig); _linkDb = openDatabase(LINKDB_NAME, dbConfig); _bridgeDb = openDatabase(BRIDGEDB_NAME, dbConfig); + _xidDb = openDatabase(XIDDB_NAME, dbConfig); } @@ -564,6 +575,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _linkDb.close(); } + + if (_xidDb != null) + { + _log.info("Close xid database"); + _xidDb.close(); + } + closeEnvironment(); _state = State.CLOSED; @@ -884,7 +902,52 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - qerh.completeQueueEntryRecovery(); + + + TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery(); + + cursor = null; + try + { + cursor = _xidDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + XidTB keyBinding = new XidTB(); + PreparedTransactionTB valueBinding = new PreparedTransactionTB(); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + Xid xid = keyBinding.entryToObject(key); + PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); + dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(), + preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()); + } + + try + { + cursor.close(); + } + finally + { + cursor = null; + } + + } + catch (DatabaseException e) + { + _log.error("Database Error: " + e.getMessage(), e); + throw e; + } + finally + { + if (cursor != null) + { + cursor.close(); + } + } + + + dtxrh.completeDtxRecordRecovery(); } /** @@ -1481,6 +1544,69 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } + + private void recordXid(com.sleepycat.je.Transaction txn, + long format, + byte[] globalId, + byte[] branchId, + Transaction.Record[] enqueues, + Transaction.Record[] dequeues) throws AMQStoreException + { + DatabaseEntry key = new DatabaseEntry(); + Xid xid = new Xid(format, globalId, branchId); + XidTB keyBinding = new XidTB(); + keyBinding.objectToEntry(xid,key); + + DatabaseEntry value = new DatabaseEntry(); + PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues); + PreparedTransactionTB valueBinding = new PreparedTransactionTB(); + valueBinding.objectToEntry(preparedTransaction, value); + + try + { + _xidDb.put(txn, key, value); + } + catch (DatabaseException e) + { + _log.error("Failed to write xid: " + e.getMessage(), e); + throw new AMQStoreException("Error writing xid to database", e); + } + } + + private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId) + throws AMQStoreException + { + DatabaseEntry key = new DatabaseEntry(); + Xid xid = new Xid(format, globalId, branchId); + XidTB keyBinding = new XidTB(); + + keyBinding.objectToEntry(xid, key); + + + try + { + + OperationStatus status = _xidDb.delete(txn, key); + if (status == OperationStatus.NOTFOUND) + { + throw new AMQStoreException("Unable to find xid"); + } + else if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Unable to remove xid"); + } + + } + catch (DatabaseException e) + { + + _log.error("Failed to remove xid ", e); + _log.error(txn); + + throw new AMQStoreException("Error accessing database while removing xid: " + e.getMessage(), e); + } + } + /** * Commits all operations performed within a given transaction. * @@ -2385,6 +2511,18 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { BDBMessageStore.this.abortTran(_txn); } + + public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException + { + BDBMessageStore.this.removeXid(_txn, format, globalId, branchId); + } + + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, + Record[] dequeues) throws AMQStoreException + { + BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues); + } } + } |