diff options
author | Ian Boros <ian.boros@mongodb.com> | 2020-07-13 13:39:26 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-31 16:56:59 +0000 |
commit | 388817e4b26f3c3a7820affec248da20677eab4d (patch) | |
tree | a52b03c6d0be711fd0c04d70d4847adbe22959cf | |
parent | 70a223895e7d3ffd182d93518e58ec2961c6895f (diff) | |
download | mongo-388817e4b26f3c3a7820affec248da20677eab4d.tar.gz |
SERVER-47731 Add FCV check for $v:2 delta oplog entries
-rw-r--r-- | jstests/multiVersion/v2_delta_oplog_entries_fcv.js | 364 | ||||
-rw-r--r-- | src/mongo/db/auth/authz_manager_external_state_mock.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/auth/role_graph_update.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/exec/update_stage.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/exec/upsert_stage.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/ops/update.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 42 | ||||
-rw-r--r-- | src/mongo/db/update/update_driver.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/update/update_driver.h | 3 | ||||
-rw-r--r-- | src/mongo/db/update/update_driver_test.cpp | 13 | ||||
-rw-r--r-- | src/mongo/embedded/stitch_support/stitch_support.cpp | 6 |
11 files changed, 455 insertions, 18 deletions
diff --git a/jstests/multiVersion/v2_delta_oplog_entries_fcv.js b/jstests/multiVersion/v2_delta_oplog_entries_fcv.js new file mode 100644 index 00000000000..e17899888e7 --- /dev/null +++ b/jstests/multiVersion/v2_delta_oplog_entries_fcv.js @@ -0,0 +1,364 @@ +/* + * This file tests the usage of $v: 2 oplog entries along with its associated server parameter and + * the FCV flag. + */ +(function() { +"use strict"; + +load("jstests/libs/curop_helpers.js"); // For waitForCurOpByFailPoint(). + +const kLatest = "latest"; +const kLastStable = "4.4"; +const kCollName = "v2_delta_oplog_entries_fcv"; + +// When attempting to apply a $v: 2 oplog entry on a node with bin version latest and FCV set to +// 4.4, this code is thrown. +const kErrorCodeThrownWhenApplyingV2OplogEntriesWithFCV44 = 4773100; + +const kGiantStr = "x".repeat(100); + +/** + * Given a two-node ReplSetTest, sets the feature flag which enables V2 oplog entries on the + * primary and secondary. + */ +function enableV2OplogEntries(rst) { + const cmd = {setParameter: 1, internalQueryEnableLoggingV2OplogEntries: true}; + assert.commandWorked(rst.getPrimary().adminCommand(cmd)); + assert.commandWorked(rst.getSecondary().adminCommand(cmd)); +} + +/** + * Helper function which runs an update on the test document that is eligible to be logged as a + * $v: 2 delta style oplog entry. + */ +const kTestDocId = 0; +function runV2EligibleUpdate(coll) { + assert.commandWorked(coll.update({_id: kTestDocId}, [{$set: {a: {$add: ["$a", 1]}}}])); +} + +/** + * Returns the current value of the test document. + */ +function getTestDoc(coll) { + const arr = coll.find({_id: kTestDocId}).toArray(); + assert.eq(arr.length, 1); + return arr[0]; +} + +/** + * Helper function which will find the latest oplog entry for the test document. + */ +function getLatestOplogEntryForDoc(primaryDB) { + const oplogRes = primaryDB.getSiblingDB("local")["oplog.rs"] + .find({"o2._id": kTestDocId}) + .hint({$natural: -1}) + .limit(1) + .toArray(); + assert.eq(oplogRes.length, 1); + return oplogRes[0]; +} + +/** + * This function tests behavior around applying $v: 2 oplog entries. If 'errorCode' is null + * then the function checks that $v: 2 entries can be applied. Otherwise, it checks that + * attempting to apply a $v: 2 entry via applyOps results in the given code. + */ +function checkApplyOpsOfV2Entries(coll, errorCode) { + let testDoc = getTestDoc(coll); + + // Check that applying a $v:2 entry directly using applyOps works as expected. + const applyOpsRes = coll.getDB().adminCommand({ + applyOps: [{ + "op": "u", + "ns": coll.getFullName(), + "o2": {_id: kTestDocId}, + "o": {$v: NumberInt(2), diff: {u: {a: 0}}} + }] + }); + + if (errorCode) { + assert.commandFailedWithCode(applyOpsRes, errorCode); + } else { + assert.commandWorked(applyOpsRes); + // Ensure that the diff was applied correctly and that 'a' now is equal to 0. + testDoc.a = 0; + assert.eq(getTestDoc(coll), testDoc); + } +} + +/** + * Helper function which runs an update that would be eligible to use a $v: 2 oplog entry, and + * then checks that the most recent oplog entry for the updated document is *not* $v: 2. + */ +function runUpdateAndCheckV2EntriesNotLogged(coll) { + let testDoc = getTestDoc(coll); + // Run an update which would be eligible to use a $v: 2 entry. + runV2EligibleUpdate(coll); + + // Check that the oplog entry we create is not $v: 2. + const oplogEntry = getLatestOplogEntryForDoc(coll.getDB()); + assert.neq(oplogEntry.o.$v, 2, oplogEntry); + assert(!oplogEntry.o.hasOwnProperty("diff")); + + // Check that the update was applied, which incremented 'a'. + testDoc.a += 1; + assert.eq(getTestDoc(coll), testDoc); +} + +/** + * Similar to runUpdateAndCheckV2EntriesNotLogged() but checks that a $v: 2 entry _is_ logged. + */ +function runUpdateAndCheckV2EntriesLogged(coll) { + let testDoc = getTestDoc(coll); + + // Run an update which would be eligible to use a $v: 2 entry. + runV2EligibleUpdate(coll); + + // Check that the oplog entry we create is $v: 2. + const oplogEntry = getLatestOplogEntryForDoc(coll.getDB()); + assert.eq(oplogEntry.o.$v, 2, oplogEntry); + assert.eq(typeof (oplogEntry.o.diff), "object"); + + // Check that the update was applied, which incremented 'a'. + testDoc.a += 1; + assert.eq(getTestDoc(coll), testDoc); +} + +const rst = new ReplSetTest({nodes: 2, nodeOpts: {noCleanData: true}}); + +// Start a last stable replica set and ensure that $v: 2 oplog entries are not logged. +(function runLastStable() { + rst.startSet({binVersion: kLastStable}); + // Initiate with high election timeout to prevent unplanned elections from happening. + rst.initiateWithHighElectionTimeout(); + + const primaryDB = rst.getPrimary().getDB("test"); + const coll = primaryDB[kCollName]; + + // Insert the test document. + assert.commandWorked(coll.insert({_id: kTestDocId, a: 1, padding: kGiantStr})); + + // We expect the primary not to log $v:2 oplog entries or to allow application of + // them via applyOps. + runUpdateAndCheckV2EntriesNotLogged(coll); + + // The error code used by 4.4 in this scenario is different from the one used in 4.6. + const k44ApplyOpsUnknownUpdateVersionErrorCode = 40682; + checkApplyOpsOfV2Entries(coll, k44ApplyOpsUnknownUpdateVersionErrorCode); + + rst.stopSet( + null, // signal + true // for restart + ); +})(); + +// Start a latest replica set using the same data files. The set should start in FCV 4.4 by +// default. +(function runLatest() { + const nodes = rst.startSet({restart: true, binVersion: kLatest}); + rst.awaitNodesAgreeOnAppliedOpTime(); + // Step up node 0. Since we started with a high election timeout this would otherwise + // take a while. + assert.commandWorked(nodes[0].adminCommand({replSetStepUp: 1})); + + const primaryAdminDB = rst.getPrimary().getDB("admin"); + checkFCV(primaryAdminDB, kLastStable); + const coll = rst.getPrimary().getDB("test")[kCollName]; + const oplog = primaryAdminDB.getSiblingDB("local")["oplog.rs"]; + + // We should not log, or allow application of $v: 2 entries while in FCV 4.4. + runUpdateAndCheckV2EntriesNotLogged(coll); + checkApplyOpsOfV2Entries(coll, kErrorCodeThrownWhenApplyingV2OplogEntriesWithFCV44); + + // Now set the feature flag which allows $v: 2 oplog entries. Even with the flag enabled, + // $v:2 oplog entries should not be used because the FCV is still 4.4. + enableV2OplogEntries(rst); + runUpdateAndCheckV2EntriesNotLogged(coll); + checkApplyOpsOfV2Entries(coll, kErrorCodeThrownWhenApplyingV2OplogEntriesWithFCV44); + + // Upgrade to the new FCV. Now $v: 2 oplog entries will be logged (and applied). + assert.commandWorked(primaryAdminDB.runCommand({setFeatureCompatibilityVersion: latestFCV})); + checkFCV(primaryAdminDB, latestFCV); + runUpdateAndCheckV2EntriesLogged(coll); + checkApplyOpsOfV2Entries(coll); + + rst.awaitReplication(); + + // Disable the flag which allows *logging* of $v: 2 entries, but only on the secondary. We + // check that nothing goes wrong when the primary logs a $v: 2 oplog entry. The secondary + // should still be able to apply it. + assert.commandWorked(rst.getSecondary().adminCommand( + {setParameter: 1, internalQueryEnableLoggingV2OplogEntries: false})); + runUpdateAndCheckV2EntriesLogged(coll); + checkApplyOpsOfV2Entries(coll); + rst.awaitReplication(); + + // Check that if we disable the flag which allows *logging* of $v: 2 entries on the primary + // but enable it on the secondary, a $v: 2 entry is not used. + assert.commandWorked(rst.getSecondary().adminCommand( + {setParameter: 1, internalQueryEnableLoggingV2OplogEntries: true})); + assert.commandWorked(rst.getPrimary().adminCommand( + {setParameter: 1, internalQueryEnableLoggingV2OplogEntries: false})); + runUpdateAndCheckV2EntriesNotLogged(coll); + + // Although we don't expect a $v: 2 entry to be logged, application should still be allowed. + checkApplyOpsOfV2Entries(coll); + rst.awaitReplication(); + + // Re-enable the parameter that controls logging of $v: 2 oplog entries on both the primary + // and second. Then, downgrade the FCV back to 4.4. $v: 2 oplog entries should not be used. + enableV2OplogEntries(rst); + + assert.commandWorked(primaryAdminDB.runCommand({setFeatureCompatibilityVersion: lastLTSFCV})); + checkFCV(primaryAdminDB, lastLTSFCV); + runUpdateAndCheckV2EntriesNotLogged(coll); + checkApplyOpsOfV2Entries(coll, kErrorCodeThrownWhenApplyingV2OplogEntriesWithFCV44); + rst.awaitReplication(); + + /** + * Function intended to be run by a parallel shell that will downgrade the FCV. + */ + function downgradeFCVParallelShellFn() { + assert.commandWorked( + db.getSiblingDB("admin").runCommand({setFeatureCompatibilityVersion: lastLTSFCV})); + } + + /** + * Waits until an oplog entry which puts the set into a "downgrading" state is logged at time + * greater than 'afterTs'. + */ + function waitUntilDowngradingOplogEntryLogged(afterTs) { + assert.soon(function() { + // Wait until a (recent) oplog entry changing the FCV document appears. + return oplog.find({ts: {$gt: afterTs}, "o.targetVersion": "4.4"}).itcount() > 0; + }); + } + + // We will now test the situation where the FCV is downgraded _while_ a $v:2 eligible pipeline + // update is running. In particular, we want to check that if a $v: 2 entry is logged + // while the RS is in the "downgrading" state (meaning the FCV target version is set to 4.4) + // nothing bad happens. + (function testFCVDowngradeWhileV2UpdateRuns() { + // First set the FCV back to latest. + assert.commandWorked( + primaryAdminDB.runCommand({setFeatureCompatibilityVersion: latestFCV})); + checkFCV(primaryAdminDB, latestFCV); + + // Determine the last timestamp in the oplog. We'll use this later when checking that the + // sequence of oplog entries recorded is expected. + const lastTs = oplog.find().hint({$natural: -1}).limit(1).toArray()[0].ts; + + // Insert another test document. + assert.commandWorked(coll.insert({_id: 1, padding: kGiantStr})); + + // First we are going to run a $v:2 eligible update and have it hang after it checks the + // FCV. It will read a value of 4.5/4.6, and based on that, decide to log a $v: 2 oplog + // entry. + + const kPipelineFCVCheckFPName = "hangAfterPipelineUpdateFCVCheck"; + assert.commandWorked(primaryAdminDB.runCommand( + {configureFailPoint: kPipelineFCVCheckFPName, mode: "alwaysOn"})); + function runUpdateParallelShellFn() { + assert.commandWorked( + db.v2_delta_oplog_entries_fcv.update({_id: 1}, [{$set: {"x": 0}}])); + } + const joinUpdateShell = startParallelShell(runUpdateParallelShellFn, rst.getPrimary().port); + + // Wait until the update is hanging on the fail point. This means that the update has read + // the FCV value and "decided" to take the $v: 2 code path. + waitForCurOpByFailPointNoNS(coll.getDB(), kPipelineFCVCheckFPName); + + // Now start another parallel shell to downgrade the FCV. + const joinDowngradeFCV = + startParallelShell(downgradeFCVParallelShellFn, rst.getPrimary().port); + + // This downgrade of the FCV will record an oplog entry which indicates that the set is in a + // "downgrading" state. After that, the primary will block attempting to acquire a MODE_S + // lock, which conflicts with the update's MODE_IX lock. + waitUntilDowngradingOplogEntryLogged(lastTs); + + // Unblock the update command currently running in a parallel shell. It will log a $v: 2 + // update oplog entry. + assert.commandWorked( + primaryAdminDB.runCommand({configureFailPoint: kPipelineFCVCheckFPName, mode: "off"})); + joinUpdateShell(); + + // Now that the update has run, the FCV change can complete. + joinDowngradeFCV(); + + // Check that the sequence of oplog entries is right. We expect to see the following + // sequence, in ascending order by timestamp: + // 1) Set target FCV to 4.4 + // 2) $v:2 update + // 3) Set FCV to 4.4 (removing the 'targetVersion') + // There may be other operations which happen in between these three, such as noop writes + // and so on, so we find the timestamps for (1), (2) and (3) and check that they are in the + // correct order. + + // Find the ts of (1). + const setTargetFCVTimestamp = + oplog.find({ts: {$gt: lastTs}, "o.targetVersion": "4.4"}).limit(1).toArray()[0].ts; + + // Find the ts of (2). + const v2UpdateTimestamp = + oplog.find({ts: {$gt: lastTs}, op: "u", "o.$v": 2}).limit(1).toArray()[0].ts; + + // Find the ts of (3). + const setFCVFinalTimestamp = + oplog + .find( + {ts: {$gt: setTargetFCVTimestamp}, "o.targetVersion": null, "o.version": "4.4"}) + .limit(1) + .toArray()[0] + .ts; + + function dumpOplog() { + oplog.find().hint({$natural: -1}).limit(10).toArray(); + } + assert.lt(setTargetFCVTimestamp, v2UpdateTimestamp, dumpOplog); + assert.lt(v2UpdateTimestamp, setFCVFinalTimestamp, dumpOplog); + + rst.awaitReplication(); + // Done. The sequence of oplog entries was expected, and the secondaries had no issues + // applying the $v: 2 update while in the downgrading state. + })(); + + // Now we check that manual application of $v: 2 entries via applyOps while in the downgrading + // state is banned. + (function checkCannotUseV2EntryWithApplyOpsWhileDowngrading() { + // First set the FCV back to latest. + assert.commandWorked( + primaryAdminDB.runCommand({setFeatureCompatibilityVersion: latestFCV})); + checkFCV(primaryAdminDB, latestFCV); + + // Determine the last timestamp in the oplog. + const lastTs = oplog.find().sort({ts: -1}).limit(1).toArray()[0].ts; + + const kHangWhileDowngradingFP = "hangWhileDowngrading"; + primaryAdminDB.runCommand({configureFailPoint: kHangWhileDowngradingFP, mode: "alwaysOn"}); + + // Begin a downgrade. It will hang while in the "downgrading" state. + const joinDowngradeFCV = + startParallelShell(downgradeFCVParallelShellFn, rst.getPrimary().port); + + // Wait until the node enters a "downgrading" state. + waitUntilDowngradingOplogEntryLogged(lastTs); + + // The primary is now in a "downgrading" state. Attempts to log $v:2 oplog entries with + // apply ops should fail. + checkApplyOpsOfV2Entries(coll, kErrorCodeThrownWhenApplyingV2OplogEntriesWithFCV44); + + // Disable the fail point and join with the parallel shell. + primaryAdminDB.runCommand({configureFailPoint: kHangWhileDowngradingFP, mode: "off"}); + joinDowngradeFCV(); + })(); + + // Finally, as a last sanity test, we add a new node to the replica set. This should not + // cause any issues. + const newSecondary = rst.add({binVersion: kLatest}); + rst.reInitiate(); + rst.awaitReplication(); + + rst.stopSet(); +})(); +})(); diff --git a/src/mongo/db/auth/authz_manager_external_state_mock.cpp b/src/mongo/db/auth/authz_manager_external_state_mock.cpp index ff292b3ab86..1e2c22934b3 100644 --- a/src/mongo/db/auth/authz_manager_external_state_mock.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_mock.cpp @@ -200,8 +200,13 @@ Status AuthzManagerExternalStateMock::updateOne(OperationContext* opCtx, const FieldRefSet emptyImmutablePaths; const bool isInsert = false; BSONObj logObj; - status = driver.update( - StringData(), &document, validateForStorage, emptyImmutablePaths, isInsert, &logObj); + status = driver.update(opCtx, + StringData(), + &document, + validateForStorage, + emptyImmutablePaths, + isInsert, + &logObj); if (!status.isOK()) return status; BSONObj newObj = document.getObject().copy(); @@ -229,7 +234,7 @@ Status AuthzManagerExternalStateMock::updateOne(OperationContext* opCtx, const FieldRefSet emptyImmutablePaths; const bool isInsert = false; status = driver.update( - StringData(), &document, validateForStorage, emptyImmutablePaths, isInsert); + opCtx, StringData(), &document, validateForStorage, emptyImmutablePaths, isInsert); if (!status.isOK()) { return status; } diff --git a/src/mongo/db/auth/role_graph_update.cpp b/src/mongo/db/auth/role_graph_update.cpp index 0307f4ef187..f8945340274 100644 --- a/src/mongo/db/auth/role_graph_update.cpp +++ b/src/mongo/db/auth/role_graph_update.cpp @@ -227,7 +227,7 @@ Status handleOplogUpdate(OperationContext* opCtx, const FieldRefSet emptyImmutablePaths; bool isInsert = false; status = driver.update( - StringData(), &roleDocument, validateForStorage, emptyImmutablePaths, isInsert); + opCtx, StringData(), &roleDocument, validateForStorage, emptyImmutablePaths, isInsert); if (!status.isOK()) return status; diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index 5534821ca6b..51e0abfae2c 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -184,7 +184,8 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco } if (!driver->needMatchDetails()) { // If we don't need match details, avoid doing the rematch - status = driver->update(StringData(), + status = driver->update(opCtx(), + StringData(), &_doc, _isUserInitiatedWrite, immutablePaths, @@ -203,7 +204,8 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco if (matchDetails.hasElemMatchKey()) matchedField = matchDetails.elemMatchKey(); - status = driver->update(matchedField, + status = driver->update(opCtx(), + matchedField, &_doc, _isUserInitiatedWrite, immutablePaths, diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp index cb1f112a8db..831946beed0 100644 --- a/src/mongo/db/exec/upsert_stage.cpp +++ b/src/mongo/db/exec/upsert_stage.cpp @@ -230,7 +230,7 @@ void UpsertStage::_generateNewDocumentFromUpdateOp(const FieldRefSet& immutableP const bool validateForStorage = false; const bool isInsert = true; uassertStatusOK( - _params.driver->update({}, &_doc, validateForStorage, immutablePaths, isInsert)); + _params.driver->update(opCtx(), {}, &_doc, validateForStorage, immutablePaths, isInsert)); }; void UpsertStage::_generateNewDocumentFromSuppliedDoc(const FieldRefSet& immutablePaths) { @@ -255,7 +255,7 @@ void UpsertStage::_generateNewDocumentFromSuppliedDoc(const FieldRefSet& immutab const bool validateForStorage = false; const bool isInsert = true; uassertStatusOK( - replacementDriver.update({}, &_doc, validateForStorage, immutablePaths, isInsert)); + replacementDriver.update(opCtx(), {}, &_doc, validateForStorage, immutablePaths, isInsert)); } void UpsertStage::_assertDocumentToBeInsertedIsValid(const mb::Document& document, diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp index cbf8200b340..e55608644b5 100644 --- a/src/mongo/db/ops/update.cpp +++ b/src/mongo/db/ops/update.cpp @@ -101,5 +101,4 @@ UpdateResult update(OperationContext* opCtx, Database* db, const UpdateRequest& return UpdateStage::makeUpdateResult(updateStats); } - } // namespace mongo diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index de480f38378..a5c8109de41 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -1225,7 +1225,47 @@ Status applyOperation_inlock(OperationContext* opCtx, auto request = UpdateRequest(); request.setNamespaceString(requestNss); request.setQuery(updateCriteria); - request.setUpdateModification(write_ops::UpdateModification::parseFromOplogEntry(o)); + auto updateMod = write_ops::UpdateModification::parseFromOplogEntry(o); + if (updateMod.type() == write_ops::UpdateModification::Type::kDelta) { + // We may only use delta oplog entries when in FCV 4.5 or in the "downgrading to + // 4.4" state. The latter case can happen when a $v:2 update is logged in the + // window between the oplog entry which sets the target FCV to 4.4 (putting the + // node in a "downgrading state") and the entry which removes the target FCV + // (putting the node in a "downgraded" state). + // + // However, we should not allow user-run applyOps to log $v:2 updates while the FCV + // is in the "downgrading to 4.4" state. If we did so, it would be possible for the + // following sequence of events to happen: + // + // (Thread A): Begins {setFCV: "4.4"} command, which sets the target version to + // 4.4. The server is now in a "downgrading" state. + // + // (Thread A) Takes MODE_S lock and then immediately drops it. It will proceed on + // the assumption that no new commands will perform 4.4-incompatible writes. + // + // (Thread B) Runs applyOps, checks the FCV, and sees that the server is in a + // downgrading state. It continues on the $v:2 oplog entry code path. + // + // (Thread A) setFCV command completes. The server is now in FCV 4.4. + // + // (Thread B) The applyOps completes, and a $v:2 oplog entry is logged. + // + // This would mean that a 4.4-incompatible write is performed after the FCV is set + // to 4.4, which is illegal. + const auto fcvVersion = serverGlobalParams.featureCompatibility.getVersion(); + const bool fromApplyOpsCmd = mode == OplogApplication::Mode::kApplyOpsCmd; + + uassert(4773100, + "Delta oplog entries may not be used in FCV below 4.5", + fcvVersion == + ServerGlobalParams::FeatureCompatibility::Version::kVersion451 || + (!fromApplyOpsCmd && + fcvVersion == + ServerGlobalParams::FeatureCompatibility::Version:: + kDowngradingFrom451To44)); + } + + request.setUpdateModification(std::move(updateMod)); request.setUpsert(upsert); request.setFromOplogApplication(true); diff --git a/src/mongo/db/update/update_driver.cpp b/src/mongo/db/update/update_driver.cpp index 92dca56582f..c66512526a8 100644 --- a/src/mongo/db/update/update_driver.cpp +++ b/src/mongo/db/update/update_driver.cpp @@ -35,6 +35,7 @@ #include "mongo/bson/mutable/algorithm.h" #include "mongo/bson/mutable/document.h" #include "mongo/db/bson/dotted_path_support.h" +#include "mongo/db/curop_failpoint_helpers.h" #include "mongo/db/field_ref.h" #include "mongo/db/matcher/expression_leaf.h" #include "mongo/db/matcher/extensions_callback_noop.h" @@ -51,6 +52,8 @@ namespace mongo { +MONGO_FAIL_POINT_DEFINE(hangAfterPipelineUpdateFCVCheck); + using pathsupport::EqualityMatches; namespace { @@ -147,6 +150,9 @@ void UpdateDriver::parse( "arrayFilters may not be specified for delta-syle updates", arrayFilters.empty()); + // Delta updates should only be applied as part of oplog application. + invariant(_fromOplogApplication); + _updateType = UpdateType::kDelta; _updateExecutor = std::make_unique<DeltaExecutor>(updateMod.getDiff()); return; @@ -240,7 +246,8 @@ Status UpdateDriver::populateDocumentWithQueryFields(const CanonicalQuery& query return status; } -Status UpdateDriver::update(StringData matchedField, +Status UpdateDriver::update(OperationContext* opCtx, + StringData matchedField, mutablebson::Document* doc, bool validateForStorage, const FieldRefSet& immutablePaths, @@ -265,9 +272,19 @@ Status UpdateDriver::update(StringData matchedField, invariant(!modifiedPaths || modifiedPaths->empty()); if (_logOp && logOpRec) { - applyParams.logMode = internalQueryEnableLoggingV2OplogEntries.load() + const auto& fcv = serverGlobalParams.featureCompatibility; + const bool fcvAllowsV2Entries = fcv.isVersionInitialized() && + fcv.getVersion() == ServerGlobalParams::FeatureCompatibility::Version::kVersion451; + + applyParams.logMode = fcvAllowsV2Entries && internalQueryEnableLoggingV2OplogEntries.load() ? ApplyParams::LogMode::kGenerateOplogEntry : ApplyParams::LogMode::kGenerateOnlyV1OplogEntry; + + if (MONGO_unlikely(hangAfterPipelineUpdateFCVCheck.shouldFail()) && + type() == UpdateType::kPipeline) { + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &hangAfterPipelineUpdateFCVCheck, opCtx, "hangAfterPipelineUpdateFCVCheck"); + } } invariant(_updateExecutor); diff --git a/src/mongo/db/update/update_driver.h b/src/mongo/db/update/update_driver.h index fe035195cfb..5df4d80293f 100644 --- a/src/mongo/db/update/update_driver.h +++ b/src/mongo/db/update/update_driver.h @@ -115,7 +115,8 @@ public: * The caller must either provide a null pointer, or a non-null pointer to an empty field ref * set. */ - Status update(StringData matchedField, + Status update(OperationContext* opCtx, + StringData matchedField, mutablebson::Document* doc, bool validateForStorage, const FieldRefSet& immutablePaths, diff --git a/src/mongo/db/update/update_driver_test.cpp b/src/mongo/db/update/update_driver_test.cpp index 18ac94213f7..5c736122c65 100644 --- a/src/mongo/db/update/update_driver_test.cpp +++ b/src/mongo/db/update/update_driver_test.cpp @@ -185,8 +185,14 @@ TEST(Collator, SetCollationUpdatesModifierInterfaces) { bool modified = false; mutablebson::Document doc(fromjson("{a: 'cba'}")); driver.setCollator(&reverseStringCollator); - ASSERT_OK(driver.update( - StringData(), &doc, validateForStorage, emptyImmutablePaths, isInsert, nullptr, &modified)); + ASSERT_OK(driver.update(expCtx->opCtx, + StringData(), + &doc, + validateForStorage, + emptyImmutablePaths, + isInsert, + nullptr, + &modified)); ASSERT_TRUE(modified); } @@ -579,7 +585,8 @@ public: const FieldRefSet emptyImmutablePaths; const bool isInsert = false; FieldRefSetWithStorage modifiedPaths; - ASSERT_OK(driver.update(matchedField, + ASSERT_OK(driver.update(expCtx->opCtx, + matchedField, doc, validateForStorage, emptyImmutablePaths, diff --git a/src/mongo/embedded/stitch_support/stitch_support.cpp b/src/mongo/embedded/stitch_support/stitch_support.cpp index c4711b5c719..41693e92cd8 100644 --- a/src/mongo/embedded/stitch_support/stitch_support.cpp +++ b/src/mongo/embedded/stitch_support/stitch_support.cpp @@ -591,7 +591,8 @@ stitch_support_v1_update_apply(stitch_support_v1_update* const update, mongo::FieldRefSetWithStorage modifiedPaths; - uassertStatusOK(update->updateDriver.update(matchedField, + uassertStatusOK(update->updateDriver.update(update->opCtx.get(), + matchedField, &mutableDoc, false /* validateForStorage */, immutablePaths, @@ -637,7 +638,8 @@ uint8_t* MONGO_API_CALL stitch_support_v1_update_upsert(stitch_support_v1_update mutableDoc)); } - uassertStatusOK(update->updateDriver.update(mongo::StringData() /* matchedField */, + uassertStatusOK(update->updateDriver.update(update->opCtx.get(), + mongo::StringData() /* matchedField */, &mutableDoc, false /* validateForStorage */, immutablePaths, |