diff options
author | Jason Zhang <jason.zhang@mongodb.com> | 2023-02-14 21:06:25 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-02-15 01:59:50 +0000 |
commit | 77c03f9e26931596059122c251bf875c190123e3 (patch) | |
tree | aca2fe4ffbafb1add38f557f24e4e8e8c0d71e80 | |
parent | d1fbeed2f867f9c51e6f4e0c3739fcd771567fb2 (diff) | |
download | mongo-77c03f9e26931596059122c251bf875c190123e3.tar.gz |
SERVER-70581 Handle WCOS for update and findAndModify if replacement document changes data placement
14 files changed, 855 insertions, 501 deletions
diff --git a/jstests/sharding/resharding_replicate_updates_as_insert_delete.js b/jstests/sharding/resharding_replicate_updates_as_insert_delete.js index d3d26575e7c..16f831d7cd1 100644 --- a/jstests/sharding/resharding_replicate_updates_as_insert_delete.js +++ b/jstests/sharding/resharding_replicate_updates_as_insert_delete.js @@ -12,7 +12,6 @@ load('jstests/libs/discover_topology.js'); load('jstests/sharding/libs/resharding_test_fixture.js'); load('jstests/sharding/libs/sharded_transactions_helpers.js'); -load("jstests/sharding/updateOne_without_shard_key/libs/write_without_shard_key_test_util.js"); const reshardingTest = new ReshardingTest({numDonors: 2, numRecipients: 2, reshardInPlace: true}); reshardingTest.setup(); @@ -75,19 +74,11 @@ reshardingTest.withReshardingInBackground( // ErrorCodes.InvalidOptions, 'was able to update value under new shard key when {multi: true} specified'); - // Sharded updateOnes that do not directly target a shard can now use the two phase write - // protocol to execute. - if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(mongos)) { - // TODO: SERVER-70581 Handle WCOS for update and findAndModify if replacement document - // changes data placement - // assert.commandWorked(sessionColl.update({_id: 0}, {$set: {y: 10}})); - } else { - assert.commandFailedWithCode( - sessionColl.update({_id: 0}, {$set: {y: 10}}), - 31025, - 'was able to update value under new shard key without specifying the full shard ' + - 'key in the query'); - } + assert.commandFailedWithCode( + sessionColl.update({_id: 0}, {$set: {y: 10}}), + 31025, + 'was able to update value under new shard key without specifying the full shard ' + + 'key in the query'); let res; assert.soon( diff --git a/jstests/sharding/server_status_crud_metrics.js b/jstests/sharding/server_status_crud_metrics.js index 4764a308f1f..5d1f5943fff 100644 --- a/jstests/sharding/server_status_crud_metrics.js +++ b/jstests/sharding/server_status_crud_metrics.js @@ -57,15 +57,21 @@ assert.commandWorked(testColl.update({x: 1}, {x: 1, a: 1})); // Sharded deleteOnes that do not directly target a shard can now use the two phase write // protocol to execute. if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(st.s)) { - // TODO: SERVER-70581 Handle WCOS for update and findAndModify if replacement document changes - // data placement - - // Could match a different document on retry. - // assert.commandWorked(testColl.update({}, {$set: {x: 2}}, {multi: false})); - - // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey - // assert.commandWorked(testColl.update({_id: 1}, {$set: {x: 2}}, {upsert: true}), - // ErrorCodes.ShardKeyNotFound); + const testColl2 = testDB.testColl2; + + // Shard testColl2 on {x:1}, split it at {x:0}, and move chunk {x:1} to shard1. This collection + // is used to for the update below which would use the write without shard key protocol, but + // since the query is unspecified, any 1 random document could be modified. In order to not + // break the state of the original test 'testColl', 'testColl2' is used specifically for the + // single update below. + st.shardColl(testColl2, {x: 1}, {x: 0}, {x: 1}); + + assert.commandWorked(testColl2.insert({x: 1, _id: 1})); + assert.commandWorked(testColl2.insert({x: -1, _id: 0})); + let updateRes = assert.commandWorked(testColl2.update({}, {$set: {x: 2}}, {multi: false})); + assert.eq(1, updateRes.nMatched); + assert.eq(1, updateRes.nModified); + assert.eq(testColl2.find({x: 2}).itcount(), 1); // Shouldn't increment the metrics for unsharded collection. assert.commandWorked(unshardedColl.update({_id: "missing"}, {$set: {a: 1}}, {multi: false})); @@ -78,8 +84,9 @@ if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(st.s)) { mongosServerStatus = testDB.adminCommand({serverStatus: 1}); - // Verify that only the first four upserts incremented the metric counter. - assert.eq(4, mongosServerStatus.metrics.query.updateOneOpStyleBroadcastWithExactIDCount); + // TODO: SERVER-69810 ServerStatus metrics for tracking number of + // updateOnes/deleteOnes/findAndModifies + assert.eq(5, mongosServerStatus.metrics.query.updateOneOpStyleBroadcastWithExactIDCount); } else { // Shouldn't increment the metric when routing fails. assert.commandFailedWithCode(testColl.update({}, {$set: {x: 2}}, {multi: false}), diff --git a/jstests/sharding/updateOne_without_shard_key/libs/write_without_shard_key_test_util.js b/jstests/sharding/updateOne_without_shard_key/libs/write_without_shard_key_test_util.js index 2024d0757aa..68af2705fe2 100644 --- a/jstests/sharding/updateOne_without_shard_key/libs/write_without_shard_key_test_util.js +++ b/jstests/sharding/updateOne_without_shard_key/libs/write_without_shard_key_test_util.js @@ -92,7 +92,7 @@ var WriteWithoutShardKeyTestUtil = (function() { cmdObj, operationType, expectedResponse, - expectedRetryResponse) { + expectedRetryResponse = {}) { assert.commandWorked(conn.getCollection(collName).insert(docsToInsert)); let res = assert.commandWorked(conn.runCommand(cmdObj)); if (operationType === OperationType.updateOne) { diff --git a/jstests/sharding/updateOne_without_shard_key/updateOne_without_shard_key_basic.js b/jstests/sharding/updateOne_without_shard_key/updateOne_without_shard_key_basic.js index e430c64e5e5..f86d015647c 100644 --- a/jstests/sharding/updateOne_without_shard_key/updateOne_without_shard_key_basic.js +++ b/jstests/sharding/updateOne_without_shard_key/updateOne_without_shard_key_basic.js @@ -319,16 +319,15 @@ const testCases = [ {_id: 1, x: xFieldValShard0_2, y: yFieldVal} ], - replacementDocTest: true, + replacementDocTest: true, // Replacement tests validate that the final replacement + // operation was only applied once. cmdObj: { update: collName, updates: [{q: {y: yFieldVal}, u: {x: xFieldValShard0_2 - 1, y: yFieldVal, a: setFieldVal}}] }, options: [{ordered: true}, {ordered: false}], - expectedMods: [ - {x: xFieldValShard0_2 - 1, y: yFieldVal, a: setFieldVal} - ], // Expect only one document to have been replaced. + expectedMods: [{x: xFieldValShard0_2 - 1, y: yFieldVal, a: setFieldVal}], expectedResponse: {n: 1, nModified: 1}, dbName: dbName, collName: collName @@ -341,7 +340,8 @@ const testCases = [ {_id: 1, x: xFieldValShard0_2, y: yFieldVal} ], - replacementDocTest: true, + replacementDocTest: true, // Replacement tests validate that the final replacement + // operation was only applied once. cmdObj: { update: collName, updates: [ @@ -350,9 +350,7 @@ const testCases = [ ] }, options: [{ordered: true}, {ordered: false}], - expectedMods: [ - {x: xFieldValShard0_2 - 1, y: yFieldVal, z: setFieldVal} - ], // Expect only one document to have been replaced. + expectedMods: [{x: xFieldValShard0_2 - 1, y: yFieldVal, z: setFieldVal}], expectedResponse: {n: 2, nModified: 2}, dbName: dbName, collName: collName @@ -365,7 +363,8 @@ const testCases = [ {_id: 1, x: xFieldValShard0_2, y: yFieldVal} ], - replacementDocTest: true, + replacementDocTest: true, // Replacement tests validate that the final replacement + // operation was only applied once. cmdObj: { update: collName, updates: [ @@ -377,15 +376,80 @@ const testCases = [ ], }, options: [{ordered: true}, {ordered: false}], - expectedMods: [ - {x: xFieldValShard0_2 - 1, y: yFieldVal, b: setFieldVal} - ], // Expect only one document to have been replaced. + expectedMods: [{x: xFieldValShard0_2 - 1, y: yFieldVal, b: setFieldVal}], + expectedResponse: {n: 2, nModified: 2}, + dbName: dbName, + collName: collName + }, + { + logMessage: "Running single replacement style update with shard key and updateOne " + + "without shard key on different shards.", + docsToInsert: [ + {_id: 0, x: xFieldValShard0_1, y: yFieldVal}, + {_id: 1, x: xFieldValShard1_1, y: yFieldVal} + ], + + replacementDocTest: true, // Replacement tests validate that the final replacement + // operation was only applied once. + cmdObj: { + update: collName, + updates: [{q: {y: yFieldVal}, u: {x: xFieldValShard0_2, y: yFieldVal, a: setFieldVal}}] + }, + options: [{ordered: true}, {ordered: false}], + expectedMods: [{x: xFieldValShard0_2, y: yFieldVal, a: setFieldVal}], + expectedResponse: {n: 1, nModified: 1}, + dbName: dbName, + collName: collName + }, + { + logMessage: "Running multiple replacement style update with shard key and updateOne " + + "without shard key on different shards.", + docsToInsert: [ + {_id: 0, x: xFieldValShard0_1, y: yFieldVal}, + {_id: 1, x: xFieldValShard1_1, y: yFieldVal} + ], + + replacementDocTest: true, // Replacement tests validate that the final replacement + // operation was only applied once. + cmdObj: { + update: collName, + updates: [ + {q: {y: yFieldVal}, u: {x: xFieldValShard0_2, y: yFieldVal, a: setFieldVal}}, + {q: {y: yFieldVal}, u: {x: xFieldValShard0_2 - 1, y: yFieldVal, z: setFieldVal}} + ] + }, + options: [{ordered: true}, {ordered: false}], + expectedMods: [{x: xFieldValShard0_2 - 1, y: yFieldVal, z: setFieldVal}], + expectedResponse: {n: 2, nModified: 2}, + dbName: dbName, + collName: collName + }, + { + logMessage: "Running mixed replacement style update with shard key and updateOne " + + "without shard key for documents on different shards.", + docsToInsert: [ + {_id: 0, x: xFieldValShard0_1, y: yFieldVal}, + {_id: 1, x: xFieldValShard1_1, y: yFieldVal} + ], + + replacementDocTest: true, // Replacement tests validate that the final replacement + // operation was only applied once. + cmdObj: { + update: collName, + updates: [ + { + q: {x: xFieldValShard0_1}, + u: {x: xFieldValShard0_2, y: yFieldVal, a: setFieldVal} + }, + {q: {y: yFieldVal}, u: {x: xFieldValShard0_2 - 1, y: yFieldVal, b: setFieldVal}}, + ], + }, + options: [{ordered: true}, {ordered: false}], + expectedMods: [{x: xFieldValShard0_2 - 1, y: yFieldVal, b: setFieldVal}], expectedResponse: {n: 2, nModified: 2}, dbName: dbName, collName: collName }, - // TODO SERVER-70581: Handle WCOS for update and findAndModify if replacement document changes - // data placement ]; const configurations = [ diff --git a/jstests/sharding/updateOne_without_shard_key/would_change_owning_shard_test.js b/jstests/sharding/updateOne_without_shard_key/would_change_owning_shard_test.js new file mode 100644 index 00000000000..0b278644cfd --- /dev/null +++ b/jstests/sharding/updateOne_without_shard_key/would_change_owning_shard_test.js @@ -0,0 +1,84 @@ +/** + * Tests the behavior of updates and findAndModifys that would change the owning shard of a + * document. + * + * @tags: [ + * requires_sharding, + * requires_fcv_63, + * featureFlagUpdateOneWithoutShardKey, + * featureFlagUpdateDocumentShardKeyUsingTransactionApi + * ] + */ + +(function() { +"use strict"; + +load("jstests/sharding/updateOne_without_shard_key/libs/write_without_shard_key_test_util.js"); + +// Make sure we're testing with no implicit session. +TestData.disableImplicitSessions = true; + +// 2 shards single node, 1 mongos, 1 config server 3-node. +const st = new ShardingTest({}); +const dbName = "testDb"; +const collName = "testColl"; +const nss = dbName + "." + collName; +const splitPoint = 0; +const shardKey1 = -2; +const shardKey2 = 2; +const docsToInsert = [{_id: 0, x: shardKey1, y: 1}]; + +// Sets up a 2 shard cluster using 'x' as a shard key where Shard 0 owns x < +// splitPoint and Shard 1 splitPoint >= 0. +WriteWithoutShardKeyTestUtil.setupShardedCollection( + st, nss, {x: 1}, [{x: splitPoint}], [{query: {x: splitPoint}, shard: st.shard1.shardName}]); + +let testCases = [ + { + logMessage: "Running WouldChangeOwningShard update without shard key", + docsToInsert: docsToInsert, + cmdObj: { + update: collName, + updates: [{q: {y: 1}, u: {x: shardKey2, y: 1}}], + }, + replacementDocTest: true, + options: [{ordered: true}, {ordered: false}], + expectedMods: [{x: shardKey2, y: 1}], + expectedResponse: {n: 1, nModified: 1}, + dbName: dbName, + collName: collName, + opType: WriteWithoutShardKeyTestUtil.OperationType.updateOne, + }, + { + logMessage: "Running WouldChangeOwningShard findAndModify without shard key", + docsToInsert: docsToInsert, + cmdObj: { + findAndModify: collName, + query: {y: 1}, + update: {x: shardKey2, y: 1}, + }, + replacementDocTest: true, + expectedMods: [{x: shardKey2, y: 1}], + expectedResponse: {lastErrorObject: {n: 1, updatedExisting: true}}, + dbName: dbName, + collName: collName, + opType: WriteWithoutShardKeyTestUtil.OperationType.findAndModifyUpdate, + }, +]; + +const configurations = [ + WriteWithoutShardKeyTestUtil.Configurations.noSession, + WriteWithoutShardKeyTestUtil.Configurations.sessionNotRetryableWrite, + WriteWithoutShardKeyTestUtil.Configurations.sessionRetryableWrite, + WriteWithoutShardKeyTestUtil.Configurations.transaction +]; + +configurations.forEach(config => { + let conn = WriteWithoutShardKeyTestUtil.getClusterConnection(st, config); + testCases.forEach(testCase => { + WriteWithoutShardKeyTestUtil.runTestWithConfig(conn, testCase, config, testCase.opType); + }); +}); + +st.stop(); +})(); diff --git a/jstests/sharding/update_compound_shard_key.js b/jstests/sharding/update_compound_shard_key.js index 9e2ee7f5a9e..b659bd9f2e0 100644 --- a/jstests/sharding/update_compound_shard_key.js +++ b/jstests/sharding/update_compound_shard_key.js @@ -168,10 +168,24 @@ assertUpdateWorked({_id: 0}, {z: 3, x: 4, y: 3, replStyle: 2}, false, 0); // Shard key field modifications do not have to specify full shard key. if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(st.s)) { - // TODO: SERVER-70581 Handle WCOS for update and findAndModify if replacement document changes - // data placement - // assert.commandWorked(st.s.getDB(kDbName).coll.update({}, {x: 110, y: 55, z: 3, a: 110}, - // false)); + const testDB = st.s.getDB("test"); + const testColl = testDB.coll; + + // Shard testColl on {x:1}, split it at {x:0}, and move chunk {x:1} to shard1. This collection + // is used to for the update below which would use the write without shard key protocol, but + // since the query is unspecified, any 1 random document could be modified. In order to not + // break the state of the original test collection, 'testColl' is used specifically for the + // single update below. + st.shardColl(testColl, {x: 1}, {x: 0}, {x: 1}); + + assert.commandWorked(testColl.insert({x: 1, _id: 1})); + assert.commandWorked(testColl.insert({x: -1, _id: 0})); + let updateRes = assert.commandWorked(testColl.update({}, {x: 110, y: 55, z: 3, a: 110}, false)); + assert.eq(1, updateRes.nMatched); + assert.eq(1, updateRes.nModified); + assert.eq(testColl.find({x: 110, y: 55, z: 3, a: 110}).itcount(), 1); + + // TODO: SERVER-73689 Fix shard key update check in update_stage.cpp to exclude _id queries. assert.commandWorked( st.s.getDB(kDbName).coll.update({_id: 2}, {x: 110, y: 55, z: 3, a: 110}, false)); } else { diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 9b5e718643c..b708a595391 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -677,10 +677,14 @@ Status runAggregate(OperationContext* opCtx, // If we are running a retryable write without shard key, check if the write was applied on this // shard, and if so, return early with an empty cursor with $_wasStatementExecuted - // set to true. + // set to true. The isRetryableWrite() check here is to check that the client executed write was + // a retryable write (which would've spawned an internal session for a retryable write to + // execute the two phase write without shard key protocol), otherwise we skip the retryable + // write check. auto isClusterQueryWithoutShardKeyCmd = request.getIsClusterQueryWithoutShardKeyCmd(); - auto stmtId = request.getStmtId(); - if (isClusterQueryWithoutShardKeyCmd && stmtId) { + if (opCtx->isRetryableWrite() && isClusterQueryWithoutShardKeyCmd) { + auto stmtId = request.getStmtId(); + tassert(7058100, "StmtId must be set for a retryable write without shard key", stmtId); if (TransactionParticipant::get(opCtx).checkStatementExecuted(opCtx, *stmtId)) { CursorResponseBuilder::Options options; options.isInitialResponse = true; diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index e265f0cdda1..95dd0a4d8eb 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -646,6 +646,7 @@ void UpdateStage::_checkRestrictionsOnUpdatingShardKeyAreNotViolated( serverGlobalParams.featureCompatibility) && sentShardVersion && !ShardVersion::isIgnoredVersion(*sentShardVersion); + // TODO: SERVER-73689 Fix shard key update check in update_stage.cpp to exclude _id queries. uassert(31025, "Shard key update is not allowed without specifying the full shard key in the " "query", diff --git a/src/mongo/s/collection_routing_info_targeter.cpp b/src/mongo/s/collection_routing_info_targeter.cpp index 444e7666c95..d56281a9d4e 100644 --- a/src/mongo/s/collection_routing_info_targeter.cpp +++ b/src/mongo/s/collection_routing_info_targeter.cpp @@ -478,14 +478,22 @@ std::vector<ShardEndpoint> CollectionRoutingInfoTargeter::targetUpdate( return endPoints; } - // Replacement-style updates must always target a single shard. If we were unable to do so using - // the query, we attempt to extract the shard key from the replacement and target based on it. - if (updateOp.getU().type() == write_ops::UpdateModification::Type::kReplacement) { - if (chunkRanges) { - chunkRanges->clear(); + // Targeting by replacement document is no longer necessary when an updateOne without shard key + // is allowed, since we're able to decisively select a document to modify with the two phase + // write without shard key protocol. + if (!feature_flags::gFeatureFlagUpdateOneWithoutShardKey.isEnabled( + serverGlobalParams.featureCompatibility) || + isExactIdQuery(opCtx, _nss, query, collation, _cri.cm)) { + // Replacement-style updates must always target a single shard. If we were unable to do so + // using the query, we attempt to extract the shard key from the replacement and target + // based on it. + if (updateOp.getU().type() == write_ops::UpdateModification::Type::kReplacement) { + if (chunkRanges) { + chunkRanges->clear(); + } + return targetByShardKey(shardKeyPattern.extractShardKeyFromDoc(updateExpr), + "Failed to target update by replacement document"); } - return targetByShardKey(shardKeyPattern.extractShardKeyFromDoc(updateExpr), - "Failed to target update by replacement document"); } // If we are here then this is an op-style update and we were not able to target a single shard. diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index f47f2d5d987..1353e6f5d15 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#include "mongo/s/commands/cluster_find_and_modify_cmd.h" + #include "mongo/base/status_with.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/db/auth/action_set.h" @@ -346,467 +348,410 @@ BSONObj prepareCmdObjForPassthrough(OperationContext* opCtx, return newCmdObj; } -class FindAndModifyCmd : public BasicCommand { -public: - FindAndModifyCmd() - : BasicCommand("findAndModify", "findandmodify"), _updateMetrics{"findAndModify"} {} - - const std::set<std::string>& apiVersions() const override { - return kApiVersions1; - } +FindAndModifyCmd findAndModifyCmd; - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { - return AllowedOnSecondary::kAlways; - } +} // namespace - ReadWriteType getReadWriteType() const override { - return ReadWriteType::kWrite; +Status FindAndModifyCmd::checkAuthForOperation(OperationContext* opCtx, + const DatabaseName& dbName, + const BSONObj& cmdObj) const { + const bool update = cmdObj["update"].trueValue(); + const bool upsert = cmdObj["upsert"].trueValue(); + const bool remove = cmdObj["remove"].trueValue(); + + ActionSet actions; + actions.addAction(ActionType::find); + if (update) { + actions.addAction(ActionType::update); } - - bool adminOnly() const override { - return false; + if (upsert) { + actions.addAction(ActionType::insert); } - - bool supportsWriteConcern(const BSONObj& cmd) const override { - return true; + if (remove) { + actions.addAction(ActionType::remove); } - - ReadConcernSupportResult supportsReadConcern(const BSONObj& cmdObj, - repl::ReadConcernLevel level, - bool isImplicitDefault) const override { - return {{level != repl::ReadConcernLevel::kLocalReadConcern && - level != repl::ReadConcernLevel::kSnapshotReadConcern, - {ErrorCodes::InvalidOptions, "read concern not supported"}}, - {{ErrorCodes::InvalidOptions, "default read concern not permitted"}}}; + if (shouldBypassDocumentValidationForCommand(cmdObj)) { + actions.addAction(ActionType::bypassDocumentValidation); } - Status checkAuthForOperation(OperationContext* opCtx, - const DatabaseName& dbName, - const BSONObj& cmdObj) const override { - const bool update = cmdObj["update"].trueValue(); - const bool upsert = cmdObj["upsert"].trueValue(); - const bool remove = cmdObj["remove"].trueValue(); - - ActionSet actions; - actions.addAction(ActionType::find); - if (update) { - actions.addAction(ActionType::update); - } - if (upsert) { - actions.addAction(ActionType::insert); - } - if (remove) { - actions.addAction(ActionType::remove); - } - if (shouldBypassDocumentValidationForCommand(cmdObj)) { - actions.addAction(ActionType::bypassDocumentValidation); - } - - auto nss = CommandHelpers::parseNsFromCommand(dbName, cmdObj); - ResourcePattern resource(CommandHelpers::resourcePatternForNamespace(nss.ns())); - uassert(17137, - "Invalid target namespace " + resource.toString(), - resource.isExactNamespacePattern()); - - auto* as = AuthorizationSession::get(opCtx->getClient()); - if (!as->isAuthorizedForActionsOnResource(resource, actions)) { - return {ErrorCodes::Unauthorized, "unauthorized"}; - } + auto nss = CommandHelpers::parseNsFromCommand(dbName, cmdObj); + ResourcePattern resource(CommandHelpers::resourcePatternForNamespace(nss.ns())); + uassert(17137, + "Invalid target namespace " + resource.toString(), + resource.isExactNamespacePattern()); - return Status::OK(); + auto* as = AuthorizationSession::get(opCtx->getClient()); + if (!as->isAuthorizedForActionsOnResource(resource, actions)) { + return {ErrorCodes::Unauthorized, "unauthorized"}; } - Status explain(OperationContext* opCtx, - const OpMsgRequest& request, - ExplainOptions::Verbosity verbosity, - rpc::ReplyBuilderInterface* result) const override { - const DatabaseName dbName(request.getValidatedTenantId(), request.getDatabase()); - const BSONObj& cmdObj = [&]() { - // Check whether the query portion needs to be rewritten for FLE. - auto findAndModifyRequest = write_ops::FindAndModifyCommandRequest::parse( - IDLParserContext("ClusterFindAndModify"), request.body); - if (shouldDoFLERewrite(findAndModifyRequest)) { - auto newRequest = processFLEFindAndModifyExplainMongos(opCtx, findAndModifyRequest); - return newRequest.first.toBSON(request.body); - } else { - return request.body; - } - }(); - const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbName, cmdObj)); - - const auto cri = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - const auto& cm = cri.cm; - - std::shared_ptr<Shard> shard; - if (cm.isSharded()) { - const BSONObj query = cmdObj.getObjectField("query"); - const BSONObj collation = getCollation(cmdObj); - const auto let = getLet(cmdObj); - const auto rc = getLegacyRuntimeConstants(cmdObj); - const BSONObj shardKey = - getShardKey(opCtx, cm, nss, query, collation, verbosity, let, rc); - const auto chunk = cm.findIntersectingChunk(shardKey, collation); + return Status::OK(); +} - shard = uassertStatusOK( - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, chunk.getShardId())); +Status FindAndModifyCmd::explain(OperationContext* opCtx, + const OpMsgRequest& request, + ExplainOptions::Verbosity verbosity, + rpc::ReplyBuilderInterface* result) const { + const DatabaseName dbName(request.getValidatedTenantId(), request.getDatabase()); + const BSONObj& cmdObj = [&]() { + // Check whether the query portion needs to be rewritten for FLE. + auto findAndModifyRequest = write_ops::FindAndModifyCommandRequest::parse( + IDLParserContext("ClusterFindAndModify"), request.body); + if (shouldDoFLERewrite(findAndModifyRequest)) { + auto newRequest = processFLEFindAndModifyExplainMongos(opCtx, findAndModifyRequest); + return newRequest.first.toBSON(request.body); } else { - shard = - uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, cm.dbPrimary())); + return request.body; } + }(); + const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbName, cmdObj)); + + const auto cri = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + const auto& cm = cri.cm; + + std::shared_ptr<Shard> shard; + if (cm.isSharded()) { + const BSONObj query = cmdObj.getObjectField("query"); + const BSONObj collation = getCollation(cmdObj); + const auto let = getLet(cmdObj); + const auto rc = getLegacyRuntimeConstants(cmdObj); + const BSONObj shardKey = getShardKey(opCtx, cm, nss, query, collation, verbosity, let, rc); + const auto chunk = cm.findIntersectingChunk(shardKey, collation); + + shard = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, chunk.getShardId())); + } else { + shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, cm.dbPrimary())); + } - const auto explainCmd = ClusterExplain::wrapAsExplain( - appendLegacyRuntimeConstantsToCommandObject(opCtx, cmdObj), verbosity); - - // Time how long it takes to run the explain command on the shard. - Timer timer; - BSONObjBuilder bob; - - if (cm.isSharded()) { - _runCommand(opCtx, - shard->getId(), - cri.getShardVersion(shard->getId()), - boost::none, - nss, - applyReadWriteConcern(opCtx, false, false, explainCmd), - true /* isExplain */, - &bob); - } else { - _runCommand(opCtx, - shard->getId(), - boost::make_optional(!cm.dbVersion().isFixed(), ShardVersion::UNSHARDED()), - cm.dbVersion(), - nss, - applyReadWriteConcern(opCtx, false, false, explainCmd), - true /* isExplain */, - &bob); - } + const auto explainCmd = ClusterExplain::wrapAsExplain( + appendLegacyRuntimeConstantsToCommandObject(opCtx, cmdObj), verbosity); + + // Time how long it takes to run the explain command on the shard. + Timer timer; + BSONObjBuilder bob; + + if (cm.isSharded()) { + _runCommand(opCtx, + shard->getId(), + cri.getShardVersion(shard->getId()), + boost::none, + nss, + applyReadWriteConcern(opCtx, false, false, explainCmd), + true /* isExplain */, + &bob); + } else { + _runCommand(opCtx, + shard->getId(), + boost::make_optional(!cm.dbVersion().isFixed(), ShardVersion::UNSHARDED()), + cm.dbVersion(), + nss, + applyReadWriteConcern(opCtx, false, false, explainCmd), + true /* isExplain */, + &bob); + } - const auto millisElapsed = timer.millis(); + const auto millisElapsed = timer.millis(); - executor::RemoteCommandResponse response(bob.obj(), Milliseconds(millisElapsed)); + executor::RemoteCommandResponse response(bob.obj(), Milliseconds(millisElapsed)); - // We fetch an arbitrary host from the ConnectionString, since - // ClusterExplain::buildExplainResult() doesn't use the given HostAndPort. - AsyncRequestsSender::Response arsResponse{ - shard->getId(), response, shard->getConnString().getServers().front()}; + // We fetch an arbitrary host from the ConnectionString, since + // ClusterExplain::buildExplainResult() doesn't use the given HostAndPort. + AsyncRequestsSender::Response arsResponse{ + shard->getId(), response, shard->getConnString().getServers().front()}; - auto bodyBuilder = result->getBodyBuilder(); - return ClusterExplain::buildExplainResult(opCtx, - {arsResponse}, - ClusterExplain::kSingleShard, - millisElapsed, - cmdObj, - &bodyBuilder); - } + auto bodyBuilder = result->getBodyBuilder(); + return ClusterExplain::buildExplainResult( + opCtx, {arsResponse}, ClusterExplain::kSingleShard, millisElapsed, cmdObj, &bodyBuilder); +} - bool allowedInTransactions() const final { - return true; - } +bool FindAndModifyCmd::run(OperationContext* opCtx, + const DatabaseName& dbName, + const BSONObj& cmdObj, + BSONObjBuilder& result) { + const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbName, cmdObj)); - bool supportsRetryableWrite() const final { + if (processFLEFindAndModify(opCtx, cmdObj, result) == FLEBatchResult::kProcessed) { return true; } - bool run(OperationContext* opCtx, - const DatabaseName& dbName, - const BSONObj& cmdObj, - BSONObjBuilder& result) override { - const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbName, cmdObj)); - - if (processFLEFindAndModify(opCtx, cmdObj, result) == FLEBatchResult::kProcessed) { - return true; - } - - // Collect metrics. - _updateMetrics.collectMetrics(cmdObj); - - // Technically, findAndModify should only be creating database if upsert is true, but this - // would require that the parsing be pulled into this function. - cluster::createDatabase(opCtx, nss.db()); - - // Append mongoS' runtime constants to the command object before forwarding it to the shard. - auto cmdObjForShard = appendLegacyRuntimeConstantsToCommandObject(opCtx, cmdObj); - - const auto cri = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); - const auto& cm = cri.cm; - if (cm.isSharded()) { - const BSONObj query = cmdObjForShard.getObjectField("query"); - if (write_without_shard_key::useTwoPhaseProtocol(opCtx, - nss, - false /* isUpdateOrDelete */, - query, - getCollation(cmdObjForShard))) { - _runCommandWithoutShardKey(opCtx, - nss, - applyReadWriteConcern(opCtx, this, cmdObjForShard), - false /* isExplain */, - &result); - } else { - const BSONObj collation = getCollation(cmdObjForShard); - const auto let = getLet(cmdObjForShard); - const auto rc = getLegacyRuntimeConstants(cmdObjForShard); - const BSONObj shardKey = - getShardKey(opCtx, cm, nss, query, collation, boost::none, let, rc); - - // For now, set bypassIsFieldHashedCheck to be true in order to skip the - // isFieldHashedCheck in the special case where _id is hashed and used as the shard - // key. This means that we always assume that a findAndModify request using _id is - // targetable to a single shard. - auto chunk = cm.findIntersectingChunk(shardKey, collation, true); - _runCommand(opCtx, - chunk.getShardId(), - cri.getShardVersion(chunk.getShardId()), - boost::none, - nss, - applyReadWriteConcern(opCtx, this, cmdObjForShard), - false /* isExplain */, - &result); - } + // Collect metrics. + _updateMetrics.collectMetrics(cmdObj); + + // Technically, findAndModify should only be creating database if upsert is true, but this + // would require that the parsing be pulled into this function. + cluster::createDatabase(opCtx, nss.db()); + + // Append mongoS' runtime constants to the command object before forwarding it to the shard. + auto cmdObjForShard = appendLegacyRuntimeConstantsToCommandObject(opCtx, cmdObj); + + const auto cri = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); + const auto& cm = cri.cm; + if (cm.isSharded()) { + const BSONObj query = cmdObjForShard.getObjectField("query"); + if (write_without_shard_key::useTwoPhaseProtocol( + opCtx, nss, false /* isUpdateOrDelete */, query, getCollation(cmdObjForShard))) { + _runCommandWithoutShardKey(opCtx, + nss, + applyReadWriteConcern(opCtx, this, cmdObjForShard), + false /* isExplain */, + &result); } else { + const BSONObj collation = getCollation(cmdObjForShard); + const auto let = getLet(cmdObjForShard); + const auto rc = getLegacyRuntimeConstants(cmdObjForShard); + const BSONObj shardKey = + getShardKey(opCtx, cm, nss, query, collation, boost::none, let, rc); + + // For now, set bypassIsFieldHashedCheck to be true in order to skip the + // isFieldHashedCheck in the special case where _id is hashed and used as the shard + // key. This means that we always assume that a findAndModify request using _id is + // targetable to a single shard. + auto chunk = cm.findIntersectingChunk(shardKey, collation, true); _runCommand(opCtx, - cm.dbPrimary(), - boost::make_optional(!cm.dbVersion().isFixed(), ShardVersion::UNSHARDED()), - cm.dbVersion(), + chunk.getShardId(), + cri.getShardVersion(chunk.getShardId()), + boost::none, nss, applyReadWriteConcern(opCtx, this, cmdObjForShard), false /* isExplain */, &result); } - - return true; + } else { + _runCommand(opCtx, + cm.dbPrimary(), + boost::make_optional(!cm.dbVersion().isFixed(), ShardVersion::UNSHARDED()), + cm.dbVersion(), + nss, + applyReadWriteConcern(opCtx, this, cmdObjForShard), + false /* isExplain */, + &result); } -private: - static bool getCrudProcessedFromCmd(const BSONObj& cmdObj) { - // We could have wrapped the FindAndModify command in an explain object - const BSONObj& realCmdObj = - cmdObj.getField("explain").ok() ? cmdObj.getObjectField("explain") : cmdObj; - auto req = write_ops::FindAndModifyCommandRequest::parse( - IDLParserContext("ClusterFindAndModify"), realCmdObj); + return true; +} - return req.getEncryptionInformation().has_value() && - req.getEncryptionInformation()->getCrudProcessed().get_value_or(false); - } +bool FindAndModifyCmd::getCrudProcessedFromCmd(const BSONObj& cmdObj) { + // We could have wrapped the FindAndModify command in an explain object + const BSONObj& realCmdObj = + cmdObj.getField("explain").ok() ? cmdObj.getObjectField("explain") : cmdObj; + auto req = write_ops::FindAndModifyCommandRequest::parse( + IDLParserContext("ClusterFindAndModify"), realCmdObj); - // Catches errors in the given response, and reruns the command if necessary. Uses the given - // response to construct the findAndModify command result passed to the client. - static void _constructResult(OperationContext* opCtx, - const ShardId& shardId, - const boost::optional<ShardVersion>& shardVersion, - const boost::optional<DatabaseVersion>& dbVersion, - const NamespaceString& nss, - const BSONObj& cmdObj, - const BSONObj& response, - BSONObjBuilder* result) { - auto txnRouter = TransactionRouter::get(opCtx); - bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter; - - const auto responseStatus = getStatusFromCommandResult(response); - if (ErrorCodes::isNeedRetargettingError(responseStatus.code()) || - ErrorCodes::isSnapshotError(responseStatus.code()) || - responseStatus.code() == ErrorCodes::StaleDbVersion) { - // Command code traps this exception and re-runs - uassertStatusOK(responseStatus.withContext("findAndModify")); - } + return req.getEncryptionInformation().has_value() && + req.getEncryptionInformation()->getCrudProcessed().get_value_or(false); +} - if (responseStatus.code() == ErrorCodes::TenantMigrationAborted) { - uassertStatusOK(responseStatus.withContext("findAndModify")); - } +// Catches errors in the given response, and reruns the command if necessary. Uses the given +// response to construct the findAndModify command result passed to the client. +void FindAndModifyCmd::_constructResult(OperationContext* opCtx, + const ShardId& shardId, + const boost::optional<ShardVersion>& shardVersion, + const boost::optional<DatabaseVersion>& dbVersion, + const NamespaceString& nss, + const BSONObj& cmdObj, + const BSONObj& response, + BSONObjBuilder* result) { + auto txnRouter = TransactionRouter::get(opCtx); + bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter; + + const auto responseStatus = getStatusFromCommandResult(response); + if (ErrorCodes::isNeedRetargettingError(responseStatus.code()) || + ErrorCodes::isSnapshotError(responseStatus.code()) || + responseStatus.code() == ErrorCodes::StaleDbVersion) { + // Command code traps this exception and re-runs + uassertStatusOK(responseStatus.withContext("findAndModify")); + } - if (responseStatus.code() == ErrorCodes::WouldChangeOwningShard) { - if (feature_flags::gFeatureFlagUpdateDocumentShardKeyUsingTransactionApi.isEnabled( - serverGlobalParams.featureCompatibility)) { - - auto parsedRequest = write_ops::FindAndModifyCommandRequest::parse( - IDLParserContext("ClusterFindAndModify"), cmdObj); - // Strip write concern because this command will be sent as part of a - // transaction and the write concern has already been loaded onto the opCtx and - // will be picked up by the transaction API. - parsedRequest.setWriteConcern(boost::none); - - // Strip runtime constants because they will be added again when this command is - // recursively sent through the service entry point. - parsedRequest.setLegacyRuntimeConstants(boost::none); - if (txnRouter) { - handleWouldChangeOwningShardErrorTransaction(opCtx, - nss, - responseStatus, - parsedRequest, - result, - getCrudProcessedFromCmd(cmdObj)); - } else { - if (isRetryableWrite) { - parsedRequest.setStmtId(0); - } - handleWouldChangeOwningShardErrorNonTransaction( - opCtx, shardId, nss, parsedRequest, result); - } + if (responseStatus.code() == ErrorCodes::TenantMigrationAborted) { + uassertStatusOK(responseStatus.withContext("findAndModify")); + } + + if (responseStatus.code() == ErrorCodes::WouldChangeOwningShard) { + if (feature_flags::gFeatureFlagUpdateDocumentShardKeyUsingTransactionApi.isEnabled( + serverGlobalParams.featureCompatibility)) { + handleWouldChangeOwningShardError(opCtx, shardId, nss, cmdObj, responseStatus, result); + } else { + // TODO SERVER-67429: Remove this branch. + if (isRetryableWrite) { + _handleWouldChangeOwningShardErrorRetryableWriteLegacy( + opCtx, shardId, shardVersion, dbVersion, nss, cmdObj, result); } else { - // TODO SERVER-67429: Remove this branch. - if (isRetryableWrite) { - _handleWouldChangeOwningShardErrorRetryableWriteLegacy( - opCtx, shardId, shardVersion, dbVersion, nss, cmdObj, result); - } else { - handleWouldChangeOwningShardErrorTransactionLegacy( - opCtx, - nss, - responseStatus, - cmdObj, - result, - getCrudProcessedFromCmd(cmdObj)); - } + handleWouldChangeOwningShardErrorTransactionLegacy( + opCtx, nss, responseStatus, cmdObj, result, getCrudProcessedFromCmd(cmdObj)); } - - return; } - // First append the properly constructed writeConcernError. It will then be skipped in - // appendElementsUnique. - if (auto wcErrorElem = response["writeConcernError"]) { - appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result); - } + return; + } - result->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(response)); + // First append the properly constructed writeConcernError. It will then be skipped in + // appendElementsUnique. + if (auto wcErrorElem = response["writeConcernError"]) { + appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result); } - // Two-phase protocol to run a findAndModify command without a shard key or _id. - static void _runCommandWithoutShardKey(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& cmdObj, - bool isExplain, - BSONObjBuilder* result) { - - auto cmdObjForPassthrough = prepareCmdObjForPassthrough(opCtx, - cmdObj, - nss, - isExplain, - boost::none /* dbVersion */, - boost::none /* shardVersion */); - - auto swRes = - write_without_shard_key::runTwoPhaseWriteProtocol(opCtx, nss, cmdObjForPassthrough); - uassertStatusOK(swRes.getStatus()); - - // runTwoPhaseWriteProtocol returns an empty response when there are not matching documents - // and {upsert: false}. - BSONObj response; - if (swRes.getValue().getResponse().isEmpty()) { - write_ops::FindAndModifyLastError lastError(0 /* n */); - lastError.setUpdatedExisting(false); - - write_ops::FindAndModifyCommandReply findAndModifyResponse; - findAndModifyResponse.setLastErrorObject(std::move(lastError)); - findAndModifyResponse.setValue(boost::none); - response = findAndModifyResponse.toBSON(); - } else { - response = swRes.getValue().getResponse(); - } + result->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(response)); +} - // Extract findAndModify command result from the result of the two phase write protocol. - _constructResult(opCtx, - ShardId(swRes.getValue().getShardId().toString()), - boost::none /* shardVersion */, - boost::none /* dbVersion */, - nss, - cmdObj, - response, - result); +// Two-phase protocol to run a findAndModify command without a shard key or _id. +void FindAndModifyCmd::_runCommandWithoutShardKey(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& cmdObj, + bool isExplain, + BSONObjBuilder* result) { + + auto cmdObjForPassthrough = prepareCmdObjForPassthrough( + opCtx, cmdObj, nss, isExplain, boost::none /* dbVersion */, boost::none /* shardVersion */); + + auto swRes = + write_without_shard_key::runTwoPhaseWriteProtocol(opCtx, nss, cmdObjForPassthrough); + uassertStatusOK(swRes.getStatus()); + + // runTwoPhaseWriteProtocol returns an empty response when there are not matching documents + // and {upsert: false}. + BSONObj response; + if (swRes.getValue().getResponse().isEmpty()) { + write_ops::FindAndModifyLastError lastError(0 /* n */); + lastError.setUpdatedExisting(false); + + write_ops::FindAndModifyCommandReply findAndModifyResponse; + findAndModifyResponse.setLastErrorObject(std::move(lastError)); + findAndModifyResponse.setValue(boost::none); + response = findAndModifyResponse.toBSON(); + } else { + response = swRes.getValue().getResponse(); } - // Command invocation to be used if a shard key is specified or the collection is unsharded. - static void _runCommand(OperationContext* opCtx, - const ShardId& shardId, - const boost::optional<ShardVersion>& shardVersion, - const boost::optional<DatabaseVersion>& dbVersion, - const NamespaceString& nss, - const BSONObj& cmdObj, - bool isExplain, - BSONObjBuilder* result) { - auto txnRouter = TransactionRouter::get(opCtx); - bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter; - - const auto response = [&] { - std::vector<AsyncRequestsSender::Request> requests; - auto cmdObjForPassthrough = - prepareCmdObjForPassthrough(opCtx, cmdObj, nss, isExplain, dbVersion, shardVersion); - requests.emplace_back(shardId, cmdObjForPassthrough); - - MultiStatementTransactionRequestsSender ars( - opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - nss.db().toString(), - requests, - kPrimaryOnlyReadPreference, - isRetryableWrite ? Shard::RetryPolicy::kIdempotent : Shard::RetryPolicy::kNoRetry); - - auto response = ars.next(); - invariant(ars.done()); - - return uassertStatusOK(std::move(response.swResponse)); - }(); - - uassertStatusOK(response.status); - _constructResult( - opCtx, shardId, shardVersion, dbVersion, nss, cmdObj, response.data, result); - } + // Extract findAndModify command result from the result of the two phase write protocol. + _constructResult(opCtx, + ShardId(swRes.getValue().getShardId().toString()), + boost::none /* shardVersion */, + boost::none /* dbVersion */, + nss, + cmdObj, + response, + result); +} - // TODO SERVER-67429: Remove this function. - static void _handleWouldChangeOwningShardErrorRetryableWriteLegacy( - OperationContext* opCtx, - const ShardId& shardId, - const boost::optional<ShardVersion>& shardVersion, - const boost::optional<DatabaseVersion>& dbVersion, - const NamespaceString& nss, - const BSONObj& cmdObj, - BSONObjBuilder* result) { - RouterOperationContextSession routerSession(opCtx); - try { - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); - - // Re-run the findAndModify command that will change the shard key value in a - // transaction. We call _runCommand recursively, and this second time through - // since it will be run as a transaction it will take the other code path to - // handleWouldChangeOwningShardErrorTransactionLegacy. We ensure the retried - // operation does not include WC inside the transaction by stripping it from the - // cmdObj. The transaction commit will still use the WC, because it uses the WC - // from the opCtx (which has been set previously in Strategy). - documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx); - _runCommand(opCtx, - shardId, - shardVersion, - dbVersion, - nss, - stripWriteConcern(cmdObj), - false /* isExplain */, - result); - uassertStatusOK(getStatusFromCommandResult(result->asTempObj())); - auto commitResponse = - documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx); - - uassertStatusOK(getStatusFromCommandResult(commitResponse)); - if (auto wcErrorElem = commitResponse["writeConcernError"]) { - appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result); - } - } catch (DBException& e) { - if (e.code() != ErrorCodes::DuplicateKey || - (e.code() == ErrorCodes::DuplicateKey && - !e.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) { - e.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext); - } +// Command invocation to be used if a shard key is specified or the collection is unsharded. +void FindAndModifyCmd::_runCommand(OperationContext* opCtx, + const ShardId& shardId, + const boost::optional<ShardVersion>& shardVersion, + const boost::optional<DatabaseVersion>& dbVersion, + const NamespaceString& nss, + const BSONObj& cmdObj, + bool isExplain, + BSONObjBuilder* result) { + auto txnRouter = TransactionRouter::get(opCtx); + bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter; + + const auto response = [&] { + std::vector<AsyncRequestsSender::Request> requests; + auto cmdObjForPassthrough = + prepareCmdObjForPassthrough(opCtx, cmdObj, nss, isExplain, dbVersion, shardVersion); + requests.emplace_back(shardId, cmdObjForPassthrough); + + MultiStatementTransactionRequestsSender ars( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + nss.db().toString(), + requests, + kPrimaryOnlyReadPreference, + isRetryableWrite ? Shard::RetryPolicy::kIdempotent : Shard::RetryPolicy::kNoRetry); - auto txnRouterForAbort = TransactionRouter::get(opCtx); - if (txnRouterForAbort) - txnRouterForAbort.implicitlyAbortTransaction(opCtx, e.toStatus()); + auto response = ars.next(); + invariant(ars.done()); + + return uassertStatusOK(std::move(response.swResponse)); + }(); + + uassertStatusOK(response.status); + _constructResult(opCtx, shardId, shardVersion, dbVersion, nss, cmdObj, response.data, result); +} - throw; +// TODO SERVER-67429: Remove this function. +void FindAndModifyCmd::_handleWouldChangeOwningShardErrorRetryableWriteLegacy( + OperationContext* opCtx, + const ShardId& shardId, + const boost::optional<ShardVersion>& shardVersion, + const boost::optional<DatabaseVersion>& dbVersion, + const NamespaceString& nss, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + RouterOperationContextSession routerSession(opCtx); + try { + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + // Re-run the findAndModify command that will change the shard key value in a + // transaction. We call _runCommand recursively, and this second time through + // since it will be run as a transaction it will take the other code path to + // handleWouldChangeOwningShardErrorTransactionLegacy. We ensure the retried + // operation does not include WC inside the transaction by stripping it from the + // cmdObj. The transaction commit will still use the WC, because it uses the WC + // from the opCtx (which has been set previously in Strategy). + documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx); + _runCommand(opCtx, + shardId, + shardVersion, + dbVersion, + nss, + stripWriteConcern(cmdObj), + false /* isExplain */, + result); + uassertStatusOK(getStatusFromCommandResult(result->asTempObj())); + auto commitResponse = documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx); + + uassertStatusOK(getStatusFromCommandResult(commitResponse)); + if (auto wcErrorElem = commitResponse["writeConcernError"]) { + appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result); } + } catch (DBException& e) { + if (e.code() != ErrorCodes::DuplicateKey || + (e.code() == ErrorCodes::DuplicateKey && + !e.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) { + e.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext); + } + + auto txnRouterForAbort = TransactionRouter::get(opCtx); + if (txnRouterForAbort) + txnRouterForAbort.implicitlyAbortTransaction(opCtx, e.toStatus()); + + throw; } +} - // Update related command execution metrics. - UpdateMetrics _updateMetrics; -} findAndModifyCmd; +void FindAndModifyCmd::handleWouldChangeOwningShardError(OperationContext* opCtx, + const ShardId& shardId, + const NamespaceString& nss, + const BSONObj& cmdObj, + Status responseStatus, + BSONObjBuilder* result) { + auto txnRouter = TransactionRouter::get(opCtx); + bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter; + + auto parsedRequest = write_ops::FindAndModifyCommandRequest::parse( + IDLParserContext("ClusterFindAndModify"), cmdObj); + + // Strip write concern because this command will be sent as part of a + // transaction and the write concern has already been loaded onto the opCtx and + // will be picked up by the transaction API. + parsedRequest.setWriteConcern(boost::none); + + // Strip runtime constants because they will be added again when this command is + // recursively sent through the service entry point. + parsedRequest.setLegacyRuntimeConstants(boost::none); + if (txnRouter) { + handleWouldChangeOwningShardErrorTransaction( + opCtx, nss, responseStatus, parsedRequest, result, getCrudProcessedFromCmd(cmdObj)); + } else { + if (isRetryableWrite) { + parsedRequest.setStmtId(0); + } + handleWouldChangeOwningShardErrorNonTransaction(opCtx, shardId, nss, parsedRequest, result); + } +} -} // namespace } // namespace mongo diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.h b/src/mongo/s/commands/cluster_find_and_modify_cmd.h new file mode 100644 index 00000000000..5b6144467b4 --- /dev/null +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.h @@ -0,0 +1,188 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status_with.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/db/auth/action_set.h" +#include "mongo/db/auth/action_type.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/auth/resource_pattern.h" +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/commands.h" +#include "mongo/db/commands/update_metrics.h" +#include "mongo/db/fle_crud.h" +#include "mongo/db/internal_transactions_feature_flag_gen.h" +#include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/query/collation/collator_factory_interface.h" +#include "mongo/db/storage/duplicate_key_error_info.h" +#include "mongo/db/transaction/transaction_api.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/logv2/log.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/balancer_configuration.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_commands_helpers.h" +#include "mongo/s/cluster_ddl.h" +#include "mongo/s/commands/cluster_explain.h" +#include "mongo/s/commands/document_shard_key_update_util.h" +#include "mongo/s/commands/strategy.h" +#include "mongo/s/grid.h" +#include "mongo/s/multi_statement_transaction_requests_sender.h" +#include "mongo/s/query_analysis_sampler_util.h" +#include "mongo/s/request_types/cluster_commands_without_shard_key_gen.h" +#include "mongo/s/session_catalog_router.h" +#include "mongo/s/shard_key_pattern_query_util.h" +#include "mongo/s/stale_exception.h" +#include "mongo/s/transaction_router.h" +#include "mongo/s/transaction_router_resource_yielder.h" +#include "mongo/s/would_change_owning_shard_exception.h" +#include "mongo/s/write_ops/write_without_shard_key_util.h" +#include "mongo/util/timer.h" + +namespace mongo { + +class FindAndModifyCmd : public BasicCommand { +public: + FindAndModifyCmd() + : BasicCommand("findAndModify", "findandmodify"), _updateMetrics{"findAndModify"} {} + + const std::set<std::string>& apiVersions() const override { + return kApiVersions1; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kAlways; + } + + ReadWriteType getReadWriteType() const override { + return ReadWriteType::kWrite; + } + + bool adminOnly() const override { + return false; + } + + bool supportsWriteConcern(const BSONObj& cmd) const override { + return true; + } + + ReadConcernSupportResult supportsReadConcern(const BSONObj& cmdObj, + repl::ReadConcernLevel level, + bool isImplicitDefault) const override { + return {{level != repl::ReadConcernLevel::kLocalReadConcern && + level != repl::ReadConcernLevel::kSnapshotReadConcern, + {ErrorCodes::InvalidOptions, "read concern not supported"}}, + {{ErrorCodes::InvalidOptions, "default read concern not permitted"}}}; + } + + Status checkAuthForOperation(OperationContext* opCtx, + const DatabaseName& dbName, + const BSONObj& cmdObj) const override; + + Status explain(OperationContext* opCtx, + const OpMsgRequest& request, + ExplainOptions::Verbosity verbosity, + rpc::ReplyBuilderInterface* result) const override; + + bool allowedInTransactions() const final { + return true; + } + + bool supportsRetryableWrite() const final { + return true; + } + + bool run(OperationContext* opCtx, + const DatabaseName& dbName, + const BSONObj& cmdObj, + BSONObjBuilder& result) override; + + /** + * Changes the shard key for the document if the response object contains a + * WouldChangeOwningShard error. If the original command was sent as a retryable write, starts a + * transaction on the same session and txnNum, deletes the original document, inserts the new + * one, and commits the transaction. If the original command is part of a transaction, deletes + * the original document and inserts the new one. + */ + static void handleWouldChangeOwningShardError(OperationContext* opCtx, + const ShardId& shardId, + const NamespaceString& nss, + const BSONObj& cmdObj, + Status responseStatus, + BSONObjBuilder* result); + +private: + static bool getCrudProcessedFromCmd(const BSONObj& cmdObj); + + // Catches errors in the given response, and reruns the command if necessary. Uses the given + // response to construct the findAndModify command result passed to the client. + static void _constructResult(OperationContext* opCtx, + const ShardId& shardId, + const boost::optional<ShardVersion>& shardVersion, + const boost::optional<DatabaseVersion>& dbVersion, + const NamespaceString& nss, + const BSONObj& cmdObj, + const BSONObj& response, + BSONObjBuilder* result); + + // Two-phase protocol to run a findAndModify command without a shard key or _id. + static void _runCommandWithoutShardKey(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& cmdObj, + bool isExplain, + BSONObjBuilder* result); + + // Command invocation to be used if a shard key is specified or the collection is unsharded. + static void _runCommand(OperationContext* opCtx, + const ShardId& shardId, + const boost::optional<ShardVersion>& shardVersion, + const boost::optional<DatabaseVersion>& dbVersion, + const NamespaceString& nss, + const BSONObj& cmdObj, + bool isExplain, + BSONObjBuilder* result); + + // TODO SERVER-67429: Remove this function. + static void _handleWouldChangeOwningShardErrorRetryableWriteLegacy( + OperationContext* opCtx, + const ShardId& shardId, + const boost::optional<ShardVersion>& shardVersion, + const boost::optional<DatabaseVersion>& dbVersion, + const NamespaceString& nss, + const BSONObj& cmdObj, + BSONObjBuilder* result); + + // Update related command execution metrics. + UpdateMetrics _updateMetrics; +}; + +} // namespace mongo diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 98c4daef9a6..39defdab465 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -270,17 +270,36 @@ UpdateShardKeyResult handleWouldChangeOwningShardErrorTransaction( return UpdateShardKeyResult{sharedBlock->updatedShardKey, std::move(upsertedId)}; } -/** - * Changes the shard key for the document if the response object contains a WouldChangeOwningShard - * error. If the original command was sent as a retryable write, starts a transaction on the same - * session and txnNum, deletes the original document, inserts the new one, and commits the - * transaction. If the original command is part of a transaction, deletes the original document and - * inserts the new one. Returns whether or not we actually complete the delete and insert. - */ -bool handleWouldChangeOwningShardError(OperationContext* opCtx, - BatchedCommandRequest* request, - BatchedCommandResponse* response, - BatchWriteExecStats stats) { +void updateHostsTargetedMetrics(OperationContext* opCtx, + BatchedCommandRequest::BatchType batchType, + int nShardsOwningChunks, + int nShardsTargeted) { + NumHostsTargetedMetrics::QueryType writeType; + switch (batchType) { + case BatchedCommandRequest::BatchType_Insert: + writeType = NumHostsTargetedMetrics::QueryType::kInsertCmd; + break; + case BatchedCommandRequest::BatchType_Update: + writeType = NumHostsTargetedMetrics::QueryType::kUpdateCmd; + break; + case BatchedCommandRequest::BatchType_Delete: + writeType = NumHostsTargetedMetrics::QueryType::kDeleteCmd; + break; + + MONGO_UNREACHABLE; + } + + auto targetType = NumHostsTargetedMetrics::get(opCtx).parseTargetType( + opCtx, nShardsTargeted, nShardsOwningChunks); + NumHostsTargetedMetrics::get(opCtx).addNumHostsTargeted(writeType, targetType); +} + +} // namespace + +bool ClusterWriteCmd::handleWouldChangeOwningShardError(OperationContext* opCtx, + BatchedCommandRequest* request, + BatchedCommandResponse* response, + BatchWriteExecStats stats) { auto txnRouter = TransactionRouter::get(opCtx); bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter; @@ -426,32 +445,6 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx, return updatedShardKey; } -void updateHostsTargetedMetrics(OperationContext* opCtx, - BatchedCommandRequest::BatchType batchType, - int nShardsOwningChunks, - int nShardsTargeted) { - NumHostsTargetedMetrics::QueryType writeType; - switch (batchType) { - case BatchedCommandRequest::BatchType_Insert: - writeType = NumHostsTargetedMetrics::QueryType::kInsertCmd; - break; - case BatchedCommandRequest::BatchType_Update: - writeType = NumHostsTargetedMetrics::QueryType::kUpdateCmd; - break; - case BatchedCommandRequest::BatchType_Delete: - writeType = NumHostsTargetedMetrics::QueryType::kDeleteCmd; - break; - - MONGO_UNREACHABLE; - } - - auto targetType = NumHostsTargetedMetrics::get(opCtx).parseTargetType( - opCtx, nShardsTargeted, nShardsOwningChunks); - NumHostsTargetedMetrics::get(opCtx).addNumHostsTargeted(writeType, targetType); -} - -} // namespace - void ClusterWriteCmd::_commandOpWrite(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& command, diff --git a/src/mongo/s/commands/cluster_write_cmd.h b/src/mongo/s/commands/cluster_write_cmd.h index 9151de7cd7b..f29f4182808 100644 --- a/src/mongo/s/commands/cluster_write_cmd.h +++ b/src/mongo/s/commands/cluster_write_cmd.h @@ -35,6 +35,7 @@ #include "mongo/db/commands/write_commands_common.h" #include "mongo/db/not_primary_error_tracker.h" #include "mongo/s/multi_statement_transaction_requests_sender.h" +#include "mongo/s/write_ops/batch_write_exec.h" #include "mongo/s/write_ops/batched_command_request.h" namespace mongo { @@ -66,6 +67,19 @@ public: return Command::ReadWriteType::kWrite; } + /** + * Changes the shard key for the document if the response object contains a + * WouldChangeOwningShard error. If the original command was sent as a retryable write, starts a + * transaction on the same session and txnNum, deletes the original document, inserts the new + * one, and commits the transaction. If the original command is part of a transaction, deletes + * the original document and inserts the new one. Returns whether or not we actually complete + * the delete and insert. + */ + static bool handleWouldChangeOwningShardError(OperationContext* opCtx, + BatchedCommandRequest* request, + BatchedCommandResponse* response, + BatchWriteExecStats stats); + protected: class InvocationBase; diff --git a/src/mongo/s/commands/cluster_write_without_shard_key_cmd.cpp b/src/mongo/s/commands/cluster_write_without_shard_key_cmd.cpp index 7d9a470dc4a..de5297e798f 100644 --- a/src/mongo/s/commands/cluster_write_without_shard_key_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_without_shard_key_cmd.cpp @@ -34,6 +34,8 @@ #include "mongo/db/shard_id.h" #include "mongo/logv2/log.h" #include "mongo/s/cluster_commands_helpers.h" +#include "mongo/s/commands/cluster_find_and_modify_cmd.h" +#include "mongo/s/commands/cluster_write_cmd.h" #include "mongo/s/grid.h" #include "mongo/s/is_mongos.h" #include "mongo/s/multi_statement_transaction_requests_sender.h" @@ -173,6 +175,45 @@ public: Shard::RetryPolicy::kNoRetry); auto response = uassertStatusOK(ars.next().swResponse); + if (getStatusFromCommandResult(response.data) == ErrorCodes::WouldChangeOwningShard) { + if (commandName == "update") { + auto request = BatchedCommandRequest::parseUpdate( + OpMsgRequest::fromDBAndBody(ns().db(), cmdObj)); + + write_ops::WriteError error(0, getStatusFromCommandResult(response.data)); + error.setIndex(0); + BatchedCommandResponse emulatedResponse; + emulatedResponse.setStatus(Status::OK()); + emulatedResponse.setN(0); + emulatedResponse.addToErrDetails(std::move(error)); + + auto wouldChangeOwningShardSucceeded = + ClusterWriteCmd::handleWouldChangeOwningShardError( + opCtx, &request, &emulatedResponse, {}); + + if (wouldChangeOwningShardSucceeded) { + BSONObjBuilder bob(emulatedResponse.toBSON()); + bob.append("ok", 1); + auto res = bob.obj(); + return Response(res, shardId.toString()); + } + } else { + // Append the $db field to satisfy findAndModify command object parser. + BSONObjBuilder bob(cmdObj); + bob.append("$db", nss.dbName().toString()); + auto writeCmdObjWithDb = bob.obj(); + + BSONObjBuilder res; + FindAndModifyCmd::handleWouldChangeOwningShardError( + opCtx, + shardId, + nss, + writeCmdObjWithDb, + getStatusFromCommandResult(response.data), + &res); + return Response(res.obj(), shardId.toString()); + } + } return Response(response.data, shardId.toString()); } |