diff options
27 files changed, 519 insertions, 66 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml index 5ebf921108d..dba7fd9b567 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml @@ -77,6 +77,7 @@ selector: # TODO (SERVER-35865): Unblacklist when we correctly write and apply 'commitTransaction' oplog # entries. - jstests/core/txns/commit_prepared_transaction.js + - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js run_hook_interval: &run_hook_interval 20 executor: diff --git a/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml index 693bbc87c8f..4ececd471fd 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml @@ -14,6 +14,7 @@ selector: # TODO (SERVER-35865): Unblacklist when we correctly write and apply 'commitTransaction' oplog # entries. - jstests/core/txns/commit_prepared_transaction.js + - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js run_hook_interval: &run_hook_interval 20 executor: diff --git a/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml index 47d09c7e859..40a40434b62 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml @@ -26,6 +26,7 @@ selector: # TODO (SERVER-35865): Unblacklist when we correctly write and apply 'commitTransaction' oplog # entries. - jstests/core/txns/commit_prepared_transaction.js + - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js executor: archive: diff --git a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml index 90c3e9bc1ae..3e26a655d41 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml @@ -17,6 +17,7 @@ selector: # TODO (SERVER-35865): Unblacklist when we correctly write and apply 'commitTransaction' oplog # entries. - jstests/core/txns/commit_prepared_transaction.js + - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js executor: archive: diff --git a/buildscripts/resmokeconfig/suites/sharded_core_txns.yml b/buildscripts/resmokeconfig/suites/sharded_core_txns.yml index a9f77e85e4e..7f37c9c1c28 100644 --- a/buildscripts/resmokeconfig/suites/sharded_core_txns.yml +++ b/buildscripts/resmokeconfig/suites/sharded_core_txns.yml @@ -22,6 +22,7 @@ selector: - jstests/core/txns/no_new_transactions_when_prepared_transaction_in_progress.js - jstests/core/txns/prepare_committed_transaction.js - jstests/core/txns/prepare_conflict.js + - jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js # Mongos doesn't upconvert from local or majority level readConcern to snapshot. - jstests/core/txns/upconvert_read_concern.js diff --git a/jstests/core/txns/prepare_conflict.js b/jstests/core/txns/prepare_conflict.js index 5fc81d0cdff..870af78909a 100644 --- a/jstests/core/txns/prepare_conflict.js +++ b/jstests/core/txns/prepare_conflict.js @@ -32,9 +32,16 @@ assert(prepareConflicted); } - // Insert the document. - const doc1 = {_id: 1, x: 1}; - assert.commandWorked(testColl.insert(doc1)); + // Insert a document modified by the transaction. + const txnDoc = {_id: 1, x: 1}; + assert.commandWorked(testColl.insert(txnDoc)); + + // Insert a document unmodified by the transaction. + const otherDoc = {_id: 2, y: 2}; + assert.commandWorked(testColl.insert(otherDoc)); + + // Create an index on 'y' to avoid conflicts on the field. + assert.commandWorked(testColl.createIndex({y: 1})); // Enable the profiler to log slow queries. We expect a 'find' to hang until the prepare // conflict is resolved. @@ -45,19 +52,31 @@ session.startTransaction({readConcern: {level: "snapshot"}}); assert.commandWorked(sessionDB.runCommand({ update: collName, - updates: [{q: doc1, u: {$inc: {x: 1}}}], + updates: [{q: txnDoc, u: {$inc: {x: 1}}}], })); assert.commandWorked(sessionDB.adminCommand({prepareTransaction: 1})); - assertPrepareConflict({_id: 1}); + + // Conflict on _id of prepared document. + assertPrepareConflict({_id: txnDoc._id}); + + // Conflict on field that could be added to a prepared document. + assertPrepareConflict({randomField: "random"}); + + // No conflict on _id of a non-prepared document. + assert.commandWorked(testDB.runCommand({find: collName, filter: {_id: otherDoc._id}})); + + // No conflict on indexed field of a non-prepared document. + assert.commandWorked(testDB.runCommand({find: collName, filter: {y: otherDoc.y}})); // At this point, we can guarantee all subsequent reads will conflict. Do a read in a parallel // shell, abort the transaction, then ensure the read succeeded with the old document. TestData.collName = collName; TestData.dbName = dbName; + TestData.txnDoc = txnDoc; const findAwait = startParallelShell(function() { const it = db.getSiblingDB(TestData.dbName) - .runCommand({find: TestData.collName, filter: {_id: 1}}); + .runCommand({find: TestData.collName, filter: {_id: TestData.txnDoc._id}}); }, db.getMongo().port); session.abortTransaction(); @@ -66,5 +85,5 @@ findAwait({checkExitSuccess: true}); // The document should be unmodified, because we aborted. - assert.eq(doc1, testColl.findOne(doc1)); + assert.eq(txnDoc, testColl.findOne(txnDoc)); })(); diff --git a/jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js b/jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js new file mode 100644 index 00000000000..74671ee8439 --- /dev/null +++ b/jstests/core/txns/timestamped_reads_wait_for_prepare_oplog_visibility.js @@ -0,0 +1,231 @@ +/** + * Tests that timestamped reads, reads with snapshot and afterClusterTime, wait for the prepare + * transaction oplog entry to be visible before choosing a read timestamp. + * + * @tags: [uses_transactions] + */ +(function() { + 'use strict'; + load("jstests/libs/check_log.js"); + load('jstests/core/txns/libs/prepare_helpers.js'); + load('jstests/libs/parallel_shell_helpers.js'); + + TestData.dbName = 'test'; + const baseCollName = 'timestamped_reads_wait_for_prepare_oplog_visibility'; + const testDB = db.getSiblingDB(TestData.dbName); + TestData.failureTimeout = 1 * 1000; // 1 second. + TestData.successTimeout = 5 * 60 * 1000; // 5 minutes. + TestData.txnDoc = {_id: 1, x: 1}; + TestData.otherDoc = {_id: 2, y: 7}; + TestData.txnDocFilter = {_id: TestData.txnDoc._id}; + TestData.otherDocFilter = {_id: TestData.otherDoc._id}; + + /** + * A function that accepts a 'readFunc' and a collection name. 'readFunc' accepts a collection + * name and returns an object with an 'oplogVisibility' test field and a 'prepareConflict' test + * field. This function is run in a separate thread and tests that oplog visibility blocks + * certain reads and that prepare conflicts block other types of reads. + */ + const readThreadFunc = function(readFunc, _collName) { + load("jstests/libs/check_log.js"); + + // Do not start reads until we are blocked in 'prepareTransaction'. + checkLog.contains(db.getMongo(), "hangAfterReservingPrepareTimestamp fail point enabled"); + + // Create a 'readFuncObj' from the 'readFunc'. + const readFuncObj = readFunc(_collName); + readFuncObj.oplogVisibility(); + + // Let the transaction finish preparing and wait for 'prepareTransaction' to complete. + assert.commandWorked(db.adminCommand( + {configureFailPoint: 'hangAfterReservingPrepareTimestamp', mode: 'off'})); + checkLog.contains(db.getMongo(), "command: prepareTransaction"); + + readFuncObj.prepareConflict(); + }; + + function runTest(prefix, readFunc) { + // Reset the log history between tests. + assert.commandWorked(db.adminCommand({clearLog: 'global'})); + + jsTestLog('Testing oplog visibility for ' + prefix); + const collName = baseCollName + '_' + prefix; + const testColl = testDB.getCollection(collName); + + testColl.drop({writeConcern: {w: "majority"}}); + assert.commandWorked(testDB.runCommand({create: collName, writeConcern: {w: 'majority'}})); + + assert.commandWorked(testDB.adminCommand( + {configureFailPoint: 'hangAfterReservingPrepareTimestamp', mode: 'alwaysOn'})); + + // Insert a document for the transaction. + assert.commandWorked(testColl.insert(TestData.txnDoc)); + // Insert a document untouched by the transaction. + assert.commandWorked(testColl.insert(TestData.otherDoc)); + + // Start a transaction with a single update on the 'txnDoc'. + const session = db.getMongo().startSession({causalConsistency: false}); + const sessionDB = session.getDatabase(TestData.dbName); + session.startTransaction({readConcern: {level: 'snapshot'}}); + assert.commandWorked(sessionDB[collName].update(TestData.txnDoc, {$inc: {x: 1}})); + + // We set the log level up to know when 'prepareTransaction' completes. + db.setLogLevel(1); + + // Clear the log history to ensure we only see the most recent 'prepareTransaction' + // failpoint log message. + assert.commandWorked(db.adminCommand({clearLog: 'global'})); + const joinReadThread = startParallelShell(funWithArgs(readThreadFunc, readFunc, collName)); + + jsTestLog("Preparing the transaction for " + prefix); + const prepareTimestamp = PrepareHelpers.prepareTransaction(session); + + db.setLogLevel(0); + joinReadThread({checkExitSuccess: true}); + + PrepareHelpers.commitTransaction(session, prepareTimestamp); + } + + const snapshotRead = function(_collName) { + const _db = db.getSiblingDB(TestData.dbName); + + const session = db.getMongo().startSession({causalConsistency: false}); + const sessionDB = session.getDatabase(TestData.dbName); + session.startTransaction({readConcern: {level: 'snapshot'}}); + + const oplogVisibility = function() { + jsTestLog("Snapshot reads should block on oplog visibility."); + assert.commandFailedWithCode(sessionDB.runCommand({ + find: _collName, + filter: TestData.txnDocFilter, + maxTimeMS: TestData.failureTimeout + }), + ErrorCodes.MaxTimeMSExpired); + assert.commandFailedWithCode(sessionDB.runCommand({ + find: _collName, + filter: TestData.otherDocFilter, + maxTimeMS: TestData.failureTimeout + }), + ErrorCodes.MaxTimeMSExpired); + }; + + const prepareConflict = function() { + jsTestLog("Snapshot reads should block on prepared transactions for " + + "conflicting documents."); + assert.commandFailedWithCode(sessionDB.runCommand({ + find: _collName, + filter: TestData.txnDocFilter, + maxTimeMS: TestData.failureTimeout + }), + ErrorCodes.MaxTimeMSExpired); + + jsTestLog("Snapshot reads should succeed on non-conflicting documents while a " + + "transaction is in prepare."); + let cursor = assert.commandWorked(sessionDB.runCommand({ + find: _collName, + filter: TestData.otherDocFilter, + maxTimeMS: TestData.successTimeout + })); + assert.docEq(cursor.cursor.firstBatch, [TestData.otherDoc], tojson(cursor)); + }; + + return {oplogVisibility: oplogVisibility, prepareConflict: prepareConflict}; + }; + + const afterClusterTime = function(_collName) { + const _db = db.getSiblingDB(TestData.dbName); + + // Advance the cluster time with an arbitrary other insert. + let res = assert.commandWorked( + _db.runCommand({insert: _collName, documents: [{advanceClusterTime: 1}]})); + assert(res.hasOwnProperty("$clusterTime"), tojson(res)); + assert(res.$clusterTime.hasOwnProperty("clusterTime"), tojson(res)); + const clusterTime = res.$clusterTime.clusterTime; + jsTestLog("Using afterClusterTime: " + clusterTime); + + const oplogVisibility = function() { + jsTestLog("afterClusterTime reads should block on oplog visibility."); + assert.commandFailedWithCode(_db.runCommand({ + find: _collName, + filter: TestData.txnDocFilter, + readConcern: {afterClusterTime: clusterTime}, + maxTimeMS: TestData.failureTimeout + }), + ErrorCodes.MaxTimeMSExpired); + assert.commandFailedWithCode(_db.runCommand({ + find: _collName, + filter: TestData.otherDocFilter, + readConcern: {afterClusterTime: clusterTime}, + maxTimeMS: TestData.failureTimeout + }), + ErrorCodes.MaxTimeMSExpired); + }; + + const prepareConflict = function() { + jsTestLog("afterClusterTime reads should block on prepared transactions for " + + "conflicting documents."); + assert.commandFailedWithCode(_db.runCommand({ + find: _collName, + filter: TestData.txnDocFilter, + readConcern: {afterClusterTime: clusterTime}, + maxTimeMS: TestData.failureTimeout + }), + ErrorCodes.MaxTimeMSExpired); + + jsTestLog("afterClusterTime reads should succeed on non-conflicting documents " + + "while transaction is in prepare."); + let cursor = assert.commandWorked(_db.runCommand({ + find: _collName, + filter: TestData.otherDocFilter, + readConcern: {afterClusterTime: clusterTime}, + maxTimeMS: TestData.successTimeout + })); + assert.docEq(cursor.cursor.firstBatch, [TestData.otherDoc], tojson(cursor)); + }; + + return {oplogVisibility: oplogVisibility, prepareConflict: prepareConflict}; + }; + + const normalRead = function(_collName) { + const _db = db.getSiblingDB(TestData.dbName); + + const oplogVisibility = function() { + jsTestLog("Ordinary reads should not block on oplog visibility."); + let cursor = assert.commandWorked(_db.runCommand({ + find: _collName, + filter: TestData.txnDocFilter, + maxTimeMS: TestData.successTimeout + })); + assert.docEq(cursor.cursor.firstBatch, [TestData.txnDoc], tojson(cursor)); + cursor = assert.commandWorked(_db.runCommand({ + find: _collName, + filter: TestData.otherDocFilter, + maxTimeMS: TestData.successTimeout + })); + assert.docEq(cursor.cursor.firstBatch, [TestData.otherDoc], tojson(cursor)); + }; + + const prepareConflict = function() { + jsTestLog("Ordinary reads should not block on prepared transactions."); + // TODO (SERVER-36382): Uncomment this block when local reads don't cause prepare + // conflicts. + // cursor = assert.commandWorked(_db.runCommand( + // {find: _collName, filter: TestData.txnDocFilter, maxTimeMS: + // TestData.successTimeout})); + // assert.docEq(cursor.cursor.firstBatch, [TestData.txnDoc], tojson(cursor)); + let cursor = assert.commandWorked(_db.runCommand({ + find: _collName, + filter: TestData.otherDocFilter, + maxTimeMS: TestData.successTimeout + })); + assert.docEq(cursor.cursor.firstBatch, [TestData.otherDoc], tojson(cursor)); + }; + + return {oplogVisibility: oplogVisibility, prepareConflict: prepareConflict}; + }; + + runTest('normal_reads', normalRead); + // TODO (SERVER-35821): Unblacklist this snapshot reads test. + // runTest('snapshot_reads', snapshotRead); + runTest('afterClusterTime', afterClusterTime); +})(); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index c242abd8d26..e7a9d32fba5 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -645,6 +645,7 @@ env.Library( 'logical_session_id', 'namespace_string', 'repl/oplog_entry', + 'repl/oplog_shim', 's/sharding_api_d', 'views/views', '$BUILD_DIR/mongo/db/commands/test_commands_enabled', diff --git a/src/mongo/db/catalog/uuid_catalog.h b/src/mongo/db/catalog/uuid_catalog.h index 54e05d33177..b561e1d1a3d 100644 --- a/src/mongo/db/catalog/uuid_catalog.h +++ b/src/mongo/db/catalog/uuid_catalog.h @@ -115,7 +115,7 @@ public: const NamespaceString& collectionName, OptionalCollectionUUID uuid) override {} void onTransactionCommit(OperationContext* opCtx, bool wasPrepared) override {} - void onTransactionPrepare(OperationContext* opCtx) override {} + void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {} void onTransactionAbort(OperationContext* opCtx) override {} void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) override {} diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h index e70f5d77932..7b4d5861a2d 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.h +++ b/src/mongo/db/free_mon/free_mon_op_observer.h @@ -133,7 +133,7 @@ public: void onTransactionCommit(OperationContext* opCtx, bool wasPrepared) final {} - void onTransactionPrepare(OperationContext* opCtx) final {} + void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) final {} void onTransactionAbort(OperationContext* opCtx) final {} diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index f2d3c766045..ebc0590bad0 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -269,8 +269,10 @@ public: /** * The onTransactionPrepare method is called when an atomic transaction is prepared. It must be * called when a transaction is active. + * + * The 'prepareOpTime' is passed in to be used as the OpTime of the oplog entry. */ - virtual void onTransactionPrepare(OperationContext* opCtx) = 0; + virtual void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) = 0; /** * The onTransactionAbort method is called when an atomic transaction aborts, before the diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index e75618ed99e..16b9701cd47 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -294,7 +294,8 @@ OpTimeBundle replLogApplyOps(OperationContext* opCtx, const OperationSessionInfo& sessionInfo, StmtId stmtId, const repl::OplogLink& oplogLink, - bool prepare) { + bool prepare, + const OplogSlot& oplogSlot) { OpTimeBundle times; times.wallClockTime = getWallClockTimeForOpLog(opCtx); times.writeOpTime = logOperation(opCtx, @@ -309,7 +310,7 @@ OpTimeBundle replLogApplyOps(OperationContext* opCtx, stmtId, oplogLink, prepare, - OplogSlot()); + oplogSlot); return times; } @@ -885,7 +886,7 @@ void OpObserverImpl::onApplyOps(OperationContext* opCtx, // Only transactional 'applyOps' commands can be prepared. constexpr bool prepare = false; - replLogApplyOps(opCtx, cmdNss, applyOpCmd, {}, kUninitializedStmtId, {}, prepare); + replLogApplyOps(opCtx, cmdNss, applyOpCmd, {}, kUninitializedStmtId, {}, prepare, OplogSlot()); AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "c", cmdNss, applyOpCmd, nullptr); @@ -923,7 +924,7 @@ namespace { OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, Session* const session, std::vector<repl::ReplOperation> stmts, - bool prepare) { + const OplogSlot& prepareOplogSlot) { BSONObjBuilder applyOpsBuilder; BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd)); for (auto& stmt : stmts) { @@ -943,9 +944,11 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, invariant(oplogLink.prevOpTime.isNull()); try { + // We are only given an oplog slot for prepared transactions. + auto prepare = !prepareOplogSlot.opTime.isNull(); auto applyOpCmd = applyOpsBuilder.done(); - auto times = - replLogApplyOps(opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare); + auto times = replLogApplyOps( + opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, prepareOplogSlot); onWriteOpCompleted( opCtx, cmdNss, session, {stmtId}, times.writeOpTime, times.wallClockTime); @@ -996,32 +999,29 @@ void OpObserverImpl::onTransactionCommit(OperationContext* opCtx, bool wasPrepar return; const auto commitOpTime = - logApplyOpsForTransaction(opCtx, session, stmts, false /* prepare */).writeOpTime; + logApplyOpsForTransaction(opCtx, session, stmts, OplogSlot()).writeOpTime; invariant(!commitOpTime.isNull()); } } -void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx) { +void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) { invariant(opCtx->getTxnNumber()); Session* const session = OperationContextSession::get(opCtx); invariant(session); invariant(session->inMultiDocumentTransaction()); + invariant(!prepareOpTime.opTime.isNull()); auto stmts = session->endTransactionAndRetrieveOperations(opCtx); // We write the oplog entry in a side transaction so that we do not commit the now-prepared - // transaction. We then return to the main transaction and set its 'prepareTimestamp'. + // transaction. // We write an empty 'applyOps' entry if there were no writes to choose a prepare timestamp // and allow this transaction to be continued on failover. - repl::OpTime prepareOpTime; { Session::SideTransactionBlock sideTxn(opCtx); WriteUnitOfWork wuow(opCtx); - prepareOpTime = - logApplyOpsForTransaction(opCtx, session, stmts, true /* prepare */).writeOpTime; + logApplyOpsForTransaction(opCtx, session, stmts, prepareOpTime); wuow.commit(); } - invariant(!prepareOpTime.isNull()); - opCtx->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); } void OpObserverImpl::onTransactionAbort(OperationContext* opCtx) { diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index 4931629a64c..1fa7e076eb4 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -111,7 +111,7 @@ public: const NamespaceString& collectionName, OptionalCollectionUUID uuid); void onTransactionCommit(OperationContext* opCtx, bool wasPrepared) override; - void onTransactionPrepare(OperationContext* opCtx) override; + void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override; void onTransactionAbort(OperationContext* opCtx) override; void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) override; diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 0fd3455b930..048b4491dae 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -593,7 +593,12 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) { opObserver().onDelete(opCtx(), nss1, uuid1, 0, false, boost::none); session()->transitionToPreparedforTest(); - opObserver().onTransactionPrepare(opCtx()); + { + WriteUnitOfWork wuow(opCtx()); + OplogSlot slot = repl::getNextOpTime(opCtx()); + opObserver().onTransactionPrepare(opCtx(), slot); + opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); + } auto oplogEntryObj = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntryObj); @@ -654,7 +659,12 @@ TEST_F(OpObserverTransactionTest, PreparingEmptyTransactionLogsEmptyApplyOps) { session()->unstashTransactionResources(opCtx(), "prepareTransaction"); session()->transitionToPreparedforTest(); - opObserver().onTransactionPrepare(opCtx()); + { + WriteUnitOfWork wuow(opCtx()); + OplogSlot slot = repl::getNextOpTime(opCtx()); + opObserver().onTransactionPrepare(opCtx(), slot); + opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); + } auto oplogEntryObj = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntryObj); diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index b04fce35c50..98be250f0cf 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -110,7 +110,7 @@ public: const NamespaceString& collectionName, OptionalCollectionUUID uuid) override {} void onTransactionCommit(OperationContext* opCtx, bool wasPrepared) override{}; - void onTransactionPrepare(OperationContext* opCtx) override{}; + void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override{}; void onTransactionAbort(OperationContext* opCtx) override{}; void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) override {} diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h index d1a34bbef6e..a89950d81cd 100644 --- a/src/mongo/db/op_observer_registry.h +++ b/src/mongo/db/op_observer_registry.h @@ -222,10 +222,10 @@ public: o->onTransactionCommit(opCtx, wasPrepared); } - void onTransactionPrepare(OperationContext* opCtx) override { + void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override { ReservedTimes times{opCtx}; for (auto& observer : _observers) { - observer->onTransactionPrepare(opCtx); + observer->onTransactionPrepare(opCtx, prepareOpTime); } } diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index db50f0278c7..a5f66564955 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -31,6 +31,16 @@ env.Library( ) env.Library( + target="oplog_shim", + source=[ + "oplog_shim.cpp", + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/base', + ], +) + +env.Library( target='oplogreader', source=[ 'oplogreader.cpp', diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index db12da59795..93906fc21a5 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -678,7 +678,7 @@ void createOplog(OperationContext* opCtx) { createOplog(opCtx, localOplogInfo(opCtx->getServiceContext()).oplogName, isReplSet); } -OplogSlot getNextOpTime(OperationContext* opCtx) { +MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTime)(OperationContext* opCtx)->OplogSlot { // The local oplog collection pointer must already be established by this point. // We can't establish it here because that would require locking the local database, which would // be a lock order violation. diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index b209b695fa3..565ab7f31cb 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -31,6 +31,7 @@ #include <string> #include <vector> +#include "mongo/base/shim.h" #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/timestamp.h" @@ -264,11 +265,20 @@ void createIndexForApplyOps(OperationContext* opCtx, IncrementOpsAppliedStatsFn incrementOpsAppliedStats, OplogApplication::Mode mode); -/** - * Allocates optimes for new entries in the oplog. Returns an OplogSlot or a vector of OplogSlots, - * which contain the new optimes along with their terms and newly calculated hash fields. - */ -OplogSlot getNextOpTime(OperationContext* opCtx); +// Shims currently do not support free functions so we wrap getNextOpTime in a class as a +// workaround. +struct GetNextOpTimeClass { + /** + * Allocates optimes for new entries in the oplog. Returns an OplogSlot or a vector of + * OplogSlots, which contain the new optimes along with their terms and newly calculated hash + * fields. + */ + static MONGO_DECLARE_SHIM((OperationContext * opCtx)->OplogSlot) getNextOpTime; +}; + +inline OplogSlot getNextOpTime(OperationContext* opCtx) { + return GetNextOpTimeClass::getNextOpTime(opCtx); +} /** * Allocates an OpTime, but does not update the storage engine with the timestamp. This is used to diff --git a/src/mongo/db/repl/oplog_shim.cpp b/src/mongo/db/repl/oplog_shim.cpp new file mode 100644 index 00000000000..4f6ad99932f --- /dev/null +++ b/src/mongo/db/repl/oplog_shim.cpp @@ -0,0 +1,35 @@ +/** +* Copyright (C) 2018 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. +*/ + +#include "mongo/db/repl/oplog.h" + +namespace mongo { +namespace repl { +MONGO_DEFINE_SHIM(GetNextOpTimeClass::getNextOpTime); +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index 193cb29320c..ebed73cce59 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -133,7 +133,7 @@ public: void onTransactionCommit(OperationContext* opCtx, bool wasPrepared) override {} - void onTransactionPrepare(OperationContext* opCtx) override {} + void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {} void onTransactionAbort(OperationContext* opCtx) override {} diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index 21001318ac6..9edc8da25fa 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -134,7 +134,7 @@ public: void onTransactionCommit(OperationContext* opCtx, bool wasPrepared) override {} - void onTransactionPrepare(OperationContext* opCtx) override {} + void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {} void onTransactionAbort(OperationContext* opCtx) override {} diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 501895072e3..bb892794afa 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -286,6 +286,8 @@ MONGO_FAIL_POINT_DEFINE(onPrimaryTransactionalWrite); // Failpoint which will pause an operation just after allocating a point-in-time storage engine // transaction. MONGO_FAIL_POINT_DEFINE(hangAfterPreallocateSnapshot); + +MONGO_FAIL_POINT_DEFINE(hangAfterReservingPrepareTimestamp); } // namespace const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1)); @@ -636,6 +638,47 @@ void Session::_checkTxnValid(WithLock, TxnNumber txnNumber) const { txnNumber >= _activeTxnNumber); } +Session::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx) { + // Stash the transaction on the OperationContext on the stack. At the end of this function it + // will be unstashed onto the OperationContext. + Session::SideTransactionBlock sideTxn(opCtx); + + // Begin a new WUOW and reserve a slot in the oplog. + WriteUnitOfWork wuow(opCtx); + _oplogSlot = repl::getNextOpTime(opCtx); + + // Release the WUOW state since this WUOW is no longer in use. + wuow.release(); + + // The new transaction should have an empty locker, and thus we do not need to save it. + invariant(opCtx->lockState()->getClientState() == Locker::ClientState::kInactive); + _locker = opCtx->swapLockState(stdx::make_unique<LockerImpl>()); + _locker->unsetThreadId(); + + // This thread must still respect the transaction lock timeout, since it can prevent the + // transaction from making progress. + auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load(); + if (maxTransactionLockMillis >= 0) { + opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis)); + } + + // Save the RecoveryUnit from the new transaction and replace it with an empty one. + _recoveryUnit = std::unique_ptr<RecoveryUnit>(opCtx->releaseRecoveryUnit()); + opCtx->setRecoveryUnit(opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit(), + WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork); +} + +Session::OplogSlotReserver::~OplogSlotReserver() { + // If the constructor did not complete, we do not attempt to abort the units of work. + if (_recoveryUnit) { + // We should be at WUOW nesting level 1, only the top level WUOW for the oplog reservation + // side transaction. + _locker->endWriteUnitOfWork(); + invariant(!_locker->inAWriteUnitOfWork()); + _recoveryUnit->abortUnitOfWork(); + } +} + Session::TxnResources::TxnResources(OperationContext* opCtx) { _ruState = opCtx->getWriteUnitOfWork()->release(); opCtx->setWriteUnitOfWork(nullptr); @@ -683,8 +726,10 @@ void Session::TxnResources::release(OperationContext* opCtx) { opCtx->swapLockState(std::move(_locker)); opCtx->lockState()->updateThreadIdToCurrentThread(); - opCtx->setRecoveryUnit(_recoveryUnit.release(), - WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork); + auto oldState = opCtx->setRecoveryUnit(_recoveryUnit.release(), + WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork); + invariant(oldState == WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork, + str::stream() << "RecoveryUnit state was " << oldState); opCtx->setWriteUnitOfWork(WriteUnitOfWork::createForSnapshotResume(opCtx, _ruState)); @@ -870,24 +915,50 @@ Timestamp Session::prepareTransaction(OperationContext* opCtx) { _txnState.transitionTo(lk, TransactionState::kPrepared); + // Reserve an optime for the 'prepareTimestamp'. This will create a hole in the oplog and cause + // 'snapshot' and 'afterClusterTime' readers to block until this transaction is done being + // prepared. When the OplogSlotReserver goes out of scope and is destroyed, the + // storage-transaction it uses to keep the hole open will abort and the slot (and corresponding + // oplog hole) will vanish. + OplogSlotReserver oplogSlotReserver(opCtx); + const auto prepareOplogSlot = oplogSlotReserver.getReservedOplogSlot(); + const auto prepareTimestamp = prepareOplogSlot.opTime.getTimestamp(); + + if (MONGO_FAIL_POINT(hangAfterReservingPrepareTimestamp)) { + // This log output is used in js tests so please leave it. + log() << "transaction - hangAfterReservingPrepareTimestamp fail point " + "enabled. Blocking until fail point is disabled. Prepare OpTime: " + << prepareOplogSlot.opTime; + MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterReservingPrepareTimestamp); + } + + opCtx->recoveryUnit()->setPrepareTimestamp(prepareTimestamp); + opCtx->getWriteUnitOfWork()->prepare(); + // We need to unlock the session to run the opObserver onTransactionPrepare, which calls back // into the session. lk.unlock(); auto opObserver = opCtx->getServiceContext()->getOpObserver(); invariant(opObserver); - opObserver->onTransactionPrepare(opCtx); - lk.lock(); - _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); + opObserver->onTransactionPrepare(opCtx, prepareOplogSlot); - // Ensure that the transaction is still prepared. - invariant(_txnState.isPrepared(lk), str::stream() << "Current state: " << _txnState); + // After the oplog entry is written successfully, it is illegal to implicitly abort or fail. + try { + abortGuard.Dismiss(); - opCtx->getWriteUnitOfWork()->prepare(); + lk.lock(); - abortGuard.Dismiss(); + // Although we are not allowed to abort here, we check that we don't even try to. If we do + // try to, that is a bug and we will fassert below. + _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); - // Return the prepareTimestamp from the recovery unit. - return opCtx->recoveryUnit()->getPrepareTimestamp(); + // Ensure that the transaction is still prepared. + invariant(_txnState.isPrepared(lk), str::stream() << "Current state: " << _txnState); + } catch (...) { + severe() << "Illegal exception after transaction was prepared."; + fassertFailedWithStatus(50906, exceptionToStatus()); + } + return prepareTimestamp; } void Session::abortArbitraryTransaction() { diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index 6faa0f28b5f..21fa7b9deb5 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -36,6 +36,7 @@ #include "mongo/db/logical_session_id.h" #include "mongo/db/multi_key_path_tracker.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/session_txn_record_gen.h" @@ -483,6 +484,36 @@ private: const repl::OpTime& lastStmtIdWriteTs); /** + * Reserves a slot in the oplog with an open storage-transaction while it is alive. Reserves the + * slot at construction. Aborts the storage-transaction and releases the oplog slot at + * destruction. + */ + class OplogSlotReserver { + public: + OplogSlotReserver(OperationContext* opCtx); + + ~OplogSlotReserver(); + + // Rule of 5: because we have a class-defined destructor, we need to explictly specify + // the move operator and move assignment operator. + OplogSlotReserver(OplogSlotReserver&&) = default; + OplogSlotReserver& operator=(OplogSlotReserver&&) = default; + + /** + * Returns the oplog slot reserved at construction. + */ + OplogSlot getReservedOplogSlot() const { + invariant(!_oplogSlot.opTime.isNull()); + return _oplogSlot; + } + + private: + std::unique_ptr<Locker> _locker; + std::unique_ptr<RecoveryUnit> _recoveryUnit; + OplogSlot _oplogSlot; + }; + + /** * Indicates the state of the current multi-document transaction, if any. If the transaction is * in any state but kInProgress, no more operations can be collected. Once the transaction is in * kPrepared, the transaction is not allowed to abort outside of an 'abortTransaction' command. diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index d5618ac91af..588b10dbee6 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -54,7 +54,6 @@ namespace { const NamespaceString kNss("TestDB", "TestColl"); const OptionalCollectionUUID kUUID; -const Timestamp kPrepareTimestamp(Timestamp(1, 1)); /** * Creates an OplogEntry with given parameters and preset defaults for this test suite. @@ -87,7 +86,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, class OpObserverMock : public OpObserverNoop { public: - void onTransactionPrepare(OperationContext* opCtx) override; + void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override; bool onTransactionPrepareThrowsException = false; bool transactionPrepared = false; stdx::function<void()> onTransactionPrepareFn = [this]() { transactionPrepared = true; }; @@ -100,13 +99,9 @@ public: }; }; -void OpObserverMock::onTransactionPrepare(OperationContext* opCtx) { +void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) { ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionPrepare(opCtx); - - // Get the recovery unit and set the prepareTimestamp. - RecoveryUnit* recoveryUnit = opCtx->recoveryUnit(); - recoveryUnit->setPrepareTimestamp(kPrepareTimestamp); + OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime); uassert(ErrorCodes::OperationFailed, "onTransactionPrepare() failed", @@ -1525,10 +1520,13 @@ TEST_F(SessionTest, KillSessionsDuringPrepareDoesNotAbortTransaction) { session.unstashTransactionResources(opCtx(), "prepareTransaction"); + auto ruPrepareTimestamp = Timestamp(); auto originalFn = _opObserver->onTransactionPrepareFn; _opObserver->onTransactionPrepareFn = [&]() { originalFn(); + ruPrepareTimestamp = opCtx()->recoveryUnit()->getPrepareTimestamp(); + ASSERT_FALSE(ruPrepareTimestamp.isNull()); // The transaction may be aborted without checking out the session. session.abortArbitraryTransaction(); ASSERT_FALSE(session.transactionIsAborted()); @@ -1536,12 +1534,12 @@ TEST_F(SessionTest, KillSessionsDuringPrepareDoesNotAbortTransaction) { // Check that prepareTimestamp gets set. auto prepareTimestamp = session.prepareTransaction(opCtx()); - ASSERT_EQ(kPrepareTimestamp, prepareTimestamp); + ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); ASSERT(_opObserver->transactionPrepared); ASSERT_FALSE(session.transactionIsAborted()); } -TEST_F(SessionTest, AbortDuringPrepareAbortsTransaction) { +DEATH_TEST_F(SessionTest, AbortDuringPrepareIsFatal, "Fatal assertion 50906") { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -1562,10 +1560,7 @@ TEST_F(SessionTest, AbortDuringPrepareAbortsTransaction) { ASSERT(session.transactionIsAborted()); }; - // A prepareTransaction() after an abort should uassert. - ASSERT_THROWS_CODE( - session.prepareTransaction(opCtx()), AssertionException, ErrorCodes::NoSuchTransaction); - ASSERT(session.transactionIsAborted()); + session.prepareTransaction(opCtx()); } TEST_F(SessionTest, ThrowDuringOnTransactionPrepareAbortsTransaction) { @@ -1834,9 +1829,18 @@ TEST_F(SessionTest, KillSessionsDoesNotAbortPreparedTransactions) { session.unstashTransactionResources(opCtx(), "insert"); - // Check that prepareTimestamp is set. + auto ruPrepareTimestamp = Timestamp(); + auto originalFn = _opObserver->onTransactionPrepareFn; + _opObserver->onTransactionPrepareFn = [&]() { + originalFn(); + + ruPrepareTimestamp = opCtx()->recoveryUnit()->getPrepareTimestamp(); + ASSERT_FALSE(ruPrepareTimestamp.isNull()); + }; + + // Check that prepareTimestamp gets set. auto prepareTimestamp = session.prepareTransaction(opCtx()); - ASSERT_EQ(kPrepareTimestamp, prepareTimestamp); + ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); session.stashTransactionResources(opCtx()); session.abortArbitraryTransaction(); @@ -1856,9 +1860,19 @@ TEST_F(SessionTest, CannotStartNewTransactionWhilePreparedTransactionInProgress) session.unstashTransactionResources(opCtx(), "insert"); + auto ruPrepareTimestamp = Timestamp(); + auto originalFn = _opObserver->onTransactionPrepareFn; + _opObserver->onTransactionPrepareFn = [&]() { + originalFn(); + + ruPrepareTimestamp = opCtx()->recoveryUnit()->getPrepareTimestamp(); + ASSERT_FALSE(ruPrepareTimestamp.isNull()); + }; + // Check that prepareTimestamp gets set. auto prepareTimestamp = session.prepareTransaction(opCtx()); - ASSERT_EQ(kPrepareTimestamp, prepareTimestamp); + ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); + session.stashTransactionResources(opCtx()); // Try to start a new transaction while there is already a prepared transaction on the diff --git a/src/mongo/db/storage/write_unit_of_work.cpp b/src/mongo/db/storage/write_unit_of_work.cpp index 0a37830e0ad..e7ae368ada9 100644 --- a/src/mongo/db/storage/write_unit_of_work.cpp +++ b/src/mongo/db/storage/write_unit_of_work.cpp @@ -100,4 +100,16 @@ void WriteUnitOfWork::commit() { _committed = true; } +std::ostream& operator<<(std::ostream& os, WriteUnitOfWork::RecoveryUnitState state) { + switch (state) { + case WriteUnitOfWork::kNotInUnitOfWork: + return os << "NotInUnitOfWork"; + case WriteUnitOfWork::kActiveUnitOfWork: + return os << "ActiveUnitOfWork"; + case WriteUnitOfWork::kFailedUnitOfWork: + return os << "FailedUnitOfWork"; + } + MONGO_UNREACHABLE; +} + } // namespace mongo diff --git a/src/mongo/db/storage/write_unit_of_work.h b/src/mongo/db/storage/write_unit_of_work.h index 410dda73bfd..d5d7f08ecf9 100644 --- a/src/mongo/db/storage/write_unit_of_work.h +++ b/src/mongo/db/storage/write_unit_of_work.h @@ -107,4 +107,6 @@ private: bool _released = false; }; +std::ostream& operator<<(std::ostream& os, WriteUnitOfWork::RecoveryUnitState state); + } // namespace mongo |