diff options
author | William Schultz <william.schultz@mongodb.com> | 2019-11-12 15:39:40 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-11-12 15:39:40 +0000 |
commit | 754c07c70cf5cd3c6760683bc29c927010a5718a (patch) | |
tree | 364e222dccf240ae848e0bf0ce13a4678ed2eff2 | |
parent | c57a899b676a8b9fe35fdc2147c12602deda4274 (diff) | |
download | mongo-754c07c70cf5cd3c6760683bc29c927010a5718a.tar.gz |
SERVER-43978 Allow stable timestamp to advance after a transaction that reserved an oplog timestamp is aborted
-rw-r--r-- | jstests/replsets/stable_timestamp_can_advance_after_oplog_hole_abort.js | 194 | ||||
-rw-r--r-- | src/mongo/db/catalog/database_impl.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/local_oplog_info.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 7 |
6 files changed, 235 insertions, 16 deletions
diff --git a/jstests/replsets/stable_timestamp_can_advance_after_oplog_hole_abort.js b/jstests/replsets/stable_timestamp_can_advance_after_oplog_hole_abort.js new file mode 100644 index 00000000000..af361887639 --- /dev/null +++ b/jstests/replsets/stable_timestamp_can_advance_after_oplog_hole_abort.js @@ -0,0 +1,194 @@ +/** + * Test that the stable timestamp can advance after an oplog hole is released via an abort. + * + * @tags: [uses_transactions] + */ + +(function() { +"use strict"; +load("jstests/libs/fail_point_util.js"); + +const replTest = new ReplSetTest({nodes: 1}); +replTest.startSet(); +replTest.initiate(); + +const primary = replTest.getPrimary(); + +const dbName = "test"; +const collName = "stable_timestamp_coll"; +const majorityWriteCollName = "stable_timestamp_coll_majority_writes"; + +const testDB = primary.getDB(dbName); +const testColl = testDB.getCollection(collName); +const majorityWriteColl = testDB.getCollection(majorityWriteCollName); +let id = 0; + +// Create the necessary collections. +assert.commandWorked(testDB.createCollection(collName)); +assert.commandWorked(testDB.createCollection(majorityWriteCollName)); + +/** + * The general structure of each test below is as follows: + * + * 1. Start a command/operation C1. + * 2. C1 reserves an oplog timestamp T1 and then pauses. + * 3. Start a w:majority write operation C2 at T2, where T2 > T1. + * 4. C2 completes its write and starts waiting for write concern. + * 5. Abort operation C1. + * 6. Ensure C2 write concern waiting completes successfully. + * + * The first operation can be any operation that reserves an oplog timestamp and then later aborts. + * This could be any number of operations that write to the oplog, including DDL and/or CRUD ops. We + * test a few different varieties below. + * + */ + +// Run a write with {w: "majority"}. +function majorityWriteFn(host, dbName, collName, doc) { + const testDB = new Mongo(host).getDB(dbName); + const testColl = testDB.getCollection(collName); + + assert.commandWorked( + testColl.insert(doc, {writeConcern: {w: "majority", wtimeout: 10 * 1000}})); +} + +// +// Test createCollection abort. +// + +// Create a new collection. +function createCollFn(host, dbName, collName, expectedErrCode) { + const testDB = new Mongo(host).getDB(dbName); + jsTestLog("Creating a new collection."); + assert.commandFailedWithCode(testDB.createCollection(collName), expectedErrCode); +} + +function testCreateCollection() { + jsTestLog("Running createCollection test."); + + // Initialize the failpoint. + const hangCreatefailPoint = + configureFailPoint(primary, "hangAndFailAfterCreateCollectionReservesOpTime"); + + // Start operation T1. + jsTestLog("Starting the create collection operation."); + const createColl = new Thread(createCollFn, primary.host, dbName, "newColl", 51267); + createColl.start(); + hangCreatefailPoint.wait(); + + // Start operation T2, the majority write. + jsTestLog("Starting the majority write operation."); + const doc = {_id: id++}; + const majorityWrite = + new Thread(majorityWriteFn, primary.host, dbName, majorityWriteCollName, doc); + majorityWrite.start(); + + // Wait until the majority write operation has completed and is waiting for write concern. + assert.soon(() => majorityWriteColl.find(doc).itcount() === 1); + + jsTestLog("Releasing the failpoint."); + hangCreatefailPoint.off(); + + jsTestLog("Waiting for the operations to complete."); + createColl.join(); + majorityWrite.join(); +} + +// +// Test insert abort. +// + +// Insert a single document into a given collection. +function insertFn(host, dbName, collName, doc, expectedErrCode) { + const testDB = new Mongo(host).getDB(dbName); + const testColl = testDB.getCollection(collName); + + // Create the new collection. + jsTestLog("Inserting document: " + tojson(doc)); + assert.commandFailedWithCode(testColl.insert(doc), expectedErrCode); +} + +function testInsert() { + jsTestLog("Running insert test."); + + const failPoint = configureFailPoint(primary, + "hangAndFailAfterDocumentInsertsReserveOpTimes", + {collectionNS: testColl.getFullName()}); + + // Start operation T1. + jsTestLog("Starting the insert operation."); + const insert = new Thread(insertFn, primary.host, dbName, collName, {insert: 1}, 51269); + insert.start(); + failPoint.wait(); + + // Start operation T2, the majority write. + jsTestLog("Starting the majority write operation."); + const doc = {_id: id++}; + const majorityWrite = + new Thread(majorityWriteFn, primary.host, dbName, majorityWriteCollName, doc); + majorityWrite.start(); + + // Wait until the majority write operation has completed and is waiting for write concern. + jsTestLog("Waiting until the majority write is visible."); + assert.soon(() => majorityWriteColl.find(doc).itcount() === 1); + + jsTestLog("Releasing the failpoint."); + failPoint.off(); + + jsTestLog("Waiting for the operations to complete."); + insert.join(); + majorityWrite.join(); +} + +// +// Test unprepared transaction commit abort. +// + +// Run and commit a transaction that inserts a document. +function transactionFn(host, dbName, collName) { + const session = new Mongo(host).startSession(); + const sessionDB = session.getDatabase(dbName); + + session.startTransaction(); + sessionDB[collName].insert({}); + assert.commandFailedWithCode(session.commitTransaction_forTesting(), 51268); +} + +function testUnpreparedTransactionCommit() { + jsTestLog("Running unprepared transaction commit test."); + + const failPoint = + configureFailPoint(primary, "hangAndFailUnpreparedCommitAfterReservingOplogSlot"); + + // Start operation T1. + jsTestLog("Starting the transaction."); + const txn = new Thread(transactionFn, primary.host, dbName, collName); + txn.start(); + failPoint.wait(); + + // Start operation T2, the majority write. + jsTestLog("Starting the majority write operation."); + const doc = {_id: id++}; + const majorityWrite = + new Thread(majorityWriteFn, primary.host, dbName, majorityWriteCollName, doc); + majorityWrite.start(); + + // Wait until the majority write operation has completed and is waiting for write concern. + jsTestLog("Waiting until the majority write is visible."); + assert.soon(() => majorityWriteColl.find(doc).itcount() === 1); + + jsTestLog("Releasing the failpoint."); + failPoint.off(); + + jsTestLog("Waiting for the operations to complete."); + txn.join(); + majorityWrite.join(); +} + +// Execute all the tests. +testCreateCollection(); +testInsert(); +testUnpreparedTransactionCommit(); + +replTest.stopSet(); +}());
\ No newline at end of file diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp index bea3f169399..430b540b034 100644 --- a/src/mongo/db/catalog/database_impl.cpp +++ b/src/mongo/db/catalog/database_impl.cpp @@ -82,6 +82,7 @@ namespace mongo { namespace { MONGO_FAIL_POINT_DEFINE(hangBeforeLoggingCreateCollection); +MONGO_FAIL_POINT_DEFINE(hangAndFailAfterCreateCollectionReservesOpTime); Status validateDBNameForWindows(StringData dbname) { const std::vector<std::string> windowsReservedNames = { @@ -627,6 +628,11 @@ Collection* DatabaseImpl::createCollection(OperationContext* opCtx, createOplogSlot = repl::getNextOpTime(opCtx); } + if (MONGO_unlikely(hangAndFailAfterCreateCollectionReservesOpTime.shouldFail())) { + hangAndFailAfterCreateCollectionReservesOpTime.pauseWhileSet(opCtx); + uasserted(51267, "hangAndFailAfterCreateCollectionReservesOpTime fail point enabled"); + } + _checkCanCreateCollection(opCtx, nss, optionsWithUUID); audit::logCreateCollection(&cc(), nss.ns()); diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 28696a8a95f..bc004104381 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -70,6 +70,7 @@ using repl::OplogEntry; namespace { MONGO_FAIL_POINT_DEFINE(failCollectionUpdates); +MONGO_FAIL_POINT_DEFINE(hangAndFailUnpreparedCommitAfterReservingOplogSlot); const auto documentKeyDecoration = OperationContext::declareDecoration<BSONObj>(); @@ -1009,6 +1010,11 @@ void OpObserverImpl::onUnpreparedTransactionCommit( auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size()); invariant(oplogSlots.size() == statements.size()); + if (MONGO_unlikely(hangAndFailUnpreparedCommitAfterReservingOplogSlot.shouldFail())) { + hangAndFailUnpreparedCommitAfterReservingOplogSlot.pauseWhileSet(opCtx); + uasserted(51268, "hangAndFailUnpreparedCommitAfterReservingOplogSlot fail point enabled"); + } + // Log in-progress entries for the transaction along with the implicit commit. int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots, false); commitOpTime = oplogSlots[numOplogEntries - 1]; diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 0876a5598df..38da11a7d0e 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -97,6 +97,7 @@ MONGO_FAIL_POINT_DEFINE(hangAfterAllChildRemoveOpsArePopped); MONGO_FAIL_POINT_DEFINE(hangDuringBatchInsert); MONGO_FAIL_POINT_DEFINE(hangDuringBatchUpdate); MONGO_FAIL_POINT_DEFINE(hangDuringBatchRemove); +MONGO_FAIL_POINT_DEFINE(hangAndFailAfterDocumentInsertsReserveOpTimes); // The withLock fail points are for testing interruptability of these operations, so they will not // themselves check for interrupt. MONGO_FAIL_POINT_DEFINE(hangWithLockDuringBatchInsert); @@ -317,6 +318,17 @@ void insertDocuments(OperationContext* opCtx, } } + hangAndFailAfterDocumentInsertsReserveOpTimes.executeIf( + [&](const BSONObj& data) { + hangAndFailAfterDocumentInsertsReserveOpTimes.pauseWhileSet(opCtx); + uasserted(51269, "hangAndFailAfterDocumentInsertsReserveOpTimes fail point enabled"); + }, + [&](const BSONObj& data) { + // Check if the failpoint specifies no collection or matches the existing one. + const auto collElem = data["collectionNS"]; + return !collElem || collection->ns().ns() == collElem.str(); + }); + uassertStatusOK( collection->insertDocuments(opCtx, begin, end, &CurOp::get(opCtx)->debug(), fromMigrate)); wuow.commit(); diff --git a/src/mongo/db/repl/local_oplog_info.cpp b/src/mongo/db/repl/local_oplog_info.cpp index b17da6d88c5..7a2797bdb47 100644 --- a/src/mongo/db/repl/local_oplog_info.cpp +++ b/src/mongo/db/repl/local_oplog_info.cpp @@ -120,21 +120,29 @@ std::vector<OplogSlot> LocalOplogInfo::getNextOpTimes(OperationContext* opCtx, s // Allow the storage engine to start the transaction outside the critical section. opCtx->recoveryUnit()->preallocateSnapshot(); - stdx::lock_guard<Latch> lk(_newOpMutex); - - ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp(); - const bool orderedCommit = false; + { + stdx::lock_guard<Latch> lk(_newOpMutex); - // The local oplog collection pointer must already be established by this point. - // We can't establish it here because that would require locking the local database, which would - // be a lock order violation. - invariant(_oplog); - fassert(28560, _oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit)); + ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp(); + const bool orderedCommit = false; + // The local oplog collection pointer must already be established by this point. + // We can't establish it here because that would require locking the local database, which + // would be a lock order violation. + invariant(_oplog); + fassert(28560, _oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit)); + } std::vector<OplogSlot> oplogSlots(count); for (std::size_t i = 0; i < count; i++) { oplogSlots[i] = {Timestamp(ts.asULL() + i), term}; } + + // If we abort a transaction that has reserved an optime, we should make sure to update the + // stable timestamp if necessary, since this oplog hole may have been holding back the stable + // timestamp. + opCtx->recoveryUnit()->onRollback( + [replCoord]() { replCoord->attemptToAdvanceStableTimestamp(); }); + return oplogSlots; } diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 7ea3e302497..73c348c8d30 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -663,13 +663,6 @@ TransactionParticipant::OplogSlotReserver::~OplogSlotReserver() { // side transaction. _recoveryUnit->abortUnitOfWork(); } - - // After releasing the oplog hole, the all_durable timestamp can advance past this oplog hole, - // if there are no other open holes. Check if we can advance the stable timestamp any further - // since a majority write may be waiting on the stable timestamp to advance beyond this oplog - // hole to acknowledge the write to the user. - auto replCoord = repl::ReplicationCoordinator::get(_opCtx); - replCoord->attemptToAdvanceStableTimestamp(); } TransactionParticipant::TxnResources::TxnResources(WithLock wl, |