diff options
author | William Schultz <william.schultz@mongodb.com> | 2020-01-06 17:48:35 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2020-01-06 17:48:35 +0000 |
commit | e62de66b40d490267506ef89b081dcca3c33b508 (patch) | |
tree | 58d32ed3a10c4a1bfdc2ba69d008394400424a7b | |
parent | 6474ab0ffa9bd0fcfe526bbb07a84100b79d34fb (diff) | |
download | mongo-e62de66b40d490267506ef89b081dcca3c33b508.tar.gz |
SERVER-43978 Allow stable timestamp to advance after a transaction that reserved an oplog timestamp is aborted
(cherry picked from commit 754c07c70cf5cd3c6760683bc29c927010a5718a)
-rw-r--r-- | jstests/replsets/stable_timestamp_can_advance_after_oplog_hole_abort.js | 195 | ||||
-rw-r--r-- | src/mongo/db/catalog/database_impl.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/local_oplog_info.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 7 |
6 files changed, 238 insertions, 16 deletions
diff --git a/jstests/replsets/stable_timestamp_can_advance_after_oplog_hole_abort.js b/jstests/replsets/stable_timestamp_can_advance_after_oplog_hole_abort.js new file mode 100644 index 00000000000..590e8e6d1e6 --- /dev/null +++ b/jstests/replsets/stable_timestamp_can_advance_after_oplog_hole_abort.js @@ -0,0 +1,195 @@ +/** + * Test that the stable timestamp can advance after an oplog hole is released via an abort. + * + * @tags: [uses_transactions] + */ + +(function() { +"use strict"; +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/parallelTester.js"); + +const replTest = new ReplSetTest({nodes: 1}); +replTest.startSet(); +replTest.initiate(); + +const primary = replTest.getPrimary(); + +const dbName = "test"; +const collName = "stable_timestamp_coll"; +const majorityWriteCollName = "stable_timestamp_coll_majority_writes"; + +const testDB = primary.getDB(dbName); +const testColl = testDB.getCollection(collName); +const majorityWriteColl = testDB.getCollection(majorityWriteCollName); +let id = 0; + +// Create the necessary collections. +assert.commandWorked(testDB.createCollection(collName)); +assert.commandWorked(testDB.createCollection(majorityWriteCollName)); + +/** + * The general structure of each test below is as follows: + * + * 1. Start a command/operation C1. + * 2. C1 reserves an oplog timestamp T1 and then pauses. + * 3. Start a w:majority write operation C2 at T2, where T2 > T1. + * 4. C2 completes its write and starts waiting for write concern. + * 5. Abort operation C1. + * 6. Ensure C2 write concern waiting completes successfully. + * + * The first operation can be any operation that reserves an oplog timestamp and then later aborts. + * This could be any number of operations that write to the oplog, including DDL and/or CRUD ops. We + * test a few different varieties below. + * + */ + +// Run a write with {w: "majority"}. +function majorityWriteFn(host, dbName, collName, doc) { + const testDB = new Mongo(host).getDB(dbName); + const testColl = testDB.getCollection(collName); + + assert.commandWorked( + testColl.insert(doc, {writeConcern: {w: "majority", wtimeout: 10 * 1000}})); +} + +// +// Test createCollection abort. +// + +// Create a new collection. +function createCollFn(host, dbName, collName, expectedErrCode) { + const testDB = new Mongo(host).getDB(dbName); + jsTestLog("Creating a new collection."); + assert.commandFailedWithCode(testDB.createCollection(collName), expectedErrCode); +} + +function testCreateCollection() { + jsTestLog("Running createCollection test."); + + // Initialize the failpoint. + const hangCreatefailPoint = + configureFailPoint(primary, "hangAndFailAfterCreateCollectionReservesOpTime"); + + // Start operation T1. + jsTestLog("Starting the create collection operation."); + const createColl = new Thread(createCollFn, primary.host, dbName, "newColl", 51267); + createColl.start(); + hangCreatefailPoint.wait(); + + // Start operation T2, the majority write. + jsTestLog("Starting the majority write operation."); + const doc = {_id: id++}; + const majorityWrite = + new Thread(majorityWriteFn, primary.host, dbName, majorityWriteCollName, doc); + majorityWrite.start(); + + // Wait until the majority write operation has completed and is waiting for write concern. + assert.soon(() => majorityWriteColl.find(doc).itcount() === 1); + + jsTestLog("Releasing the failpoint."); + hangCreatefailPoint.off(); + + jsTestLog("Waiting for the operations to complete."); + createColl.join(); + majorityWrite.join(); +} + +// +// Test insert abort. +// + +// Insert a single document into a given collection. +function insertFn(host, dbName, collName, doc, expectedErrCode) { + const testDB = new Mongo(host).getDB(dbName); + const testColl = testDB.getCollection(collName); + + // Create the new collection. + jsTestLog("Inserting document: " + tojson(doc)); + assert.commandFailedWithCode(testColl.insert(doc), expectedErrCode); +} + +function testInsert() { + jsTestLog("Running insert test."); + + const failPoint = configureFailPoint(primary, + "hangAndFailAfterDocumentInsertsReserveOpTimes", + {collectionNS: testColl.getFullName()}); + + // Start operation T1. + jsTestLog("Starting the insert operation."); + const insert = new Thread(insertFn, primary.host, dbName, collName, {insert: 1}, 51269); + insert.start(); + failPoint.wait(); + + // Start operation T2, the majority write. + jsTestLog("Starting the majority write operation."); + const doc = {_id: id++}; + const majorityWrite = + new Thread(majorityWriteFn, primary.host, dbName, majorityWriteCollName, doc); + majorityWrite.start(); + + // Wait until the majority write operation has completed and is waiting for write concern. + jsTestLog("Waiting until the majority write is visible."); + assert.soon(() => majorityWriteColl.find(doc).itcount() === 1); + + jsTestLog("Releasing the failpoint."); + failPoint.off(); + + jsTestLog("Waiting for the operations to complete."); + insert.join(); + majorityWrite.join(); +} + +// +// Test unprepared transaction commit abort. +// + +// Run and commit a transaction that inserts a document. +function transactionFn(host, dbName, collName) { + const session = new Mongo(host).startSession(); + const sessionDB = session.getDatabase(dbName); + + session.startTransaction(); + sessionDB[collName].insert({}); + assert.commandFailedWithCode(session.commitTransaction_forTesting(), 51268); +} + +function testUnpreparedTransactionCommit() { + jsTestLog("Running unprepared transaction commit test."); + + const failPoint = + configureFailPoint(primary, "hangAndFailUnpreparedCommitAfterReservingOplogSlot"); + + // Start operation T1. + jsTestLog("Starting the transaction."); + const txn = new Thread(transactionFn, primary.host, dbName, collName); + txn.start(); + failPoint.wait(); + + // Start operation T2, the majority write. + jsTestLog("Starting the majority write operation."); + const doc = {_id: id++}; + const majorityWrite = + new Thread(majorityWriteFn, primary.host, dbName, majorityWriteCollName, doc); + majorityWrite.start(); + + // Wait until the majority write operation has completed and is waiting for write concern. + jsTestLog("Waiting until the majority write is visible."); + assert.soon(() => majorityWriteColl.find(doc).itcount() === 1); + + jsTestLog("Releasing the failpoint."); + failPoint.off(); + + jsTestLog("Waiting for the operations to complete."); + txn.join(); + majorityWrite.join(); +} + +// Execute all the tests. +testCreateCollection(); +testInsert(); +testUnpreparedTransactionCommit(); + +replTest.stopSet(); +}());
\ No newline at end of file diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp index 4fb3849f254..8642fd8068b 100644 --- a/src/mongo/db/catalog/database_impl.cpp +++ b/src/mongo/db/catalog/database_impl.cpp @@ -82,6 +82,7 @@ namespace mongo { namespace { MONGO_FAIL_POINT_DEFINE(hangBeforeLoggingCreateCollection); +MONGO_FAIL_POINT_DEFINE(hangAndFailAfterCreateCollectionReservesOpTime); Status validateDBNameForWindows(StringData dbname) { const std::vector<std::string> windowsReservedNames = { @@ -642,6 +643,12 @@ Collection* DatabaseImpl::createCollection(OperationContext* opCtx, createOplogSlot = repl::getNextOpTime(opCtx); } + if (MONGO_unlikely(hangAndFailAfterCreateCollectionReservesOpTime.shouldFail())) { + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED( + opCtx, hangAndFailAfterCreateCollectionReservesOpTime); + uasserted(51267, "hangAndFailAfterCreateCollectionReservesOpTime fail point enabled"); + } + _checkCanCreateCollection(opCtx, nss, optionsWithUUID); audit::logCreateCollection(&cc(), nss.ns()); diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 2fe31d10d98..73b60b7d904 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -69,6 +69,7 @@ using repl::OplogEntry; namespace { MONGO_FAIL_POINT_DEFINE(failCollectionUpdates); +MONGO_FAIL_POINT_DEFINE(hangAndFailUnpreparedCommitAfterReservingOplogSlot); const auto documentKeyDecoration = OperationContext::declareDecoration<BSONObj>(); @@ -1297,6 +1298,13 @@ void OpObserverImpl::onUnpreparedTransactionCommit( auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size()); invariant(oplogSlots.size() == statements.size()); + if (MONGO_unlikely(hangAndFailUnpreparedCommitAfterReservingOplogSlot.shouldFail())) { + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED( + opCtx, hangAndFailUnpreparedCommitAfterReservingOplogSlot); + uasserted(51268, + "hangAndFailUnpreparedCommitAfterReservingOplogSlot fail point enabled"); + } + // Log in-progress entries for the transaction along with the implicit commit. int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots, false); commitOpTime = oplogSlots[numOplogEntries - 1]; diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 3ae46dc6146..324caf762ab 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -97,6 +97,7 @@ MONGO_FAIL_POINT_DEFINE(hangAfterAllChildRemoveOpsArePopped); MONGO_FAIL_POINT_DEFINE(hangDuringBatchInsert); MONGO_FAIL_POINT_DEFINE(hangDuringBatchUpdate); MONGO_FAIL_POINT_DEFINE(hangDuringBatchRemove); +MONGO_FAIL_POINT_DEFINE(hangAndFailAfterDocumentInsertsReserveOpTimes); // The withLock fail points are for testing interruptability of these operations, so they will not // themselves check for interrupt. MONGO_FAIL_POINT_DEFINE(hangWithLockDuringBatchInsert); @@ -316,6 +317,16 @@ void insertDocuments(OperationContext* opCtx, } } + MONGO_FAIL_POINT_BLOCK(hangAndFailAfterDocumentInsertsReserveOpTimes, nssData) { + const BSONObj& data = nssData.getData(); + const auto collElem = data["collectionNS"]; + if (!collElem || collection->ns().ns() == collElem.str()) { + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED( + opCtx, hangAndFailAfterDocumentInsertsReserveOpTimes); + uasserted(51269, "hangAndFailAfterDocumentInsertsReserveOpTimes fail point enabled"); + } + } + uassertStatusOK( collection->insertDocuments(opCtx, begin, end, &CurOp::get(opCtx)->debug(), fromMigrate)); wuow.commit(); diff --git a/src/mongo/db/repl/local_oplog_info.cpp b/src/mongo/db/repl/local_oplog_info.cpp index b17da6d88c5..7a2797bdb47 100644 --- a/src/mongo/db/repl/local_oplog_info.cpp +++ b/src/mongo/db/repl/local_oplog_info.cpp @@ -120,21 +120,29 @@ std::vector<OplogSlot> LocalOplogInfo::getNextOpTimes(OperationContext* opCtx, s // Allow the storage engine to start the transaction outside the critical section. opCtx->recoveryUnit()->preallocateSnapshot(); - stdx::lock_guard<Latch> lk(_newOpMutex); - - ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp(); - const bool orderedCommit = false; + { + stdx::lock_guard<Latch> lk(_newOpMutex); - // The local oplog collection pointer must already be established by this point. - // We can't establish it here because that would require locking the local database, which would - // be a lock order violation. - invariant(_oplog); - fassert(28560, _oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit)); + ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp(); + const bool orderedCommit = false; + // The local oplog collection pointer must already be established by this point. + // We can't establish it here because that would require locking the local database, which + // would be a lock order violation. + invariant(_oplog); + fassert(28560, _oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit)); + } std::vector<OplogSlot> oplogSlots(count); for (std::size_t i = 0; i < count; i++) { oplogSlots[i] = {Timestamp(ts.asULL() + i), term}; } + + // If we abort a transaction that has reserved an optime, we should make sure to update the + // stable timestamp if necessary, since this oplog hole may have been holding back the stable + // timestamp. + opCtx->recoveryUnit()->onRollback( + [replCoord]() { replCoord->attemptToAdvanceStableTimestamp(); }); + return oplogSlots; } diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index b627d2fd637..dde6317acb4 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -676,13 +676,6 @@ TransactionParticipant::OplogSlotReserver::~OplogSlotReserver() { // side transaction. _recoveryUnit->abortUnitOfWork(); } - - // After releasing the oplog hole, the all_durable timestamp can advance past this oplog hole, - // if there are no other open holes. Check if we can advance the stable timestamp any further - // since a majority write may be waiting on the stable timestamp to advance beyond this oplog - // hole to acknowledge the write to the user. - auto replCoord = repl::ReplicationCoordinator::get(_opCtx); - replCoord->attemptToAdvanceStableTimestamp(); } TransactionParticipant::TxnResources::TxnResources(WithLock wl, |