diff options
author | Sanika Phanse <sanika.phanse@mongodb.com> | 2023-02-24 20:47:38 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-02-25 03:38:11 +0000 |
commit | 8a53ac6194cbd298e517fc4102263facf9f66ede (patch) | |
tree | 47aad707e7571dccea43f4b5b2c888691e360603 | |
parent | 7820a2ce672fffb91b9f03bd72c1cf827efc391e (diff) | |
download | mongo-8a53ac6194cbd298e517fc4102263facf9f66ede.tar.gz |
SERVER-69918 Produce upsert document in _clusterQueryWithoutShardKey
19 files changed, 495 insertions, 107 deletions
diff --git a/jstests/sharding/query/collation_targeting.js b/jstests/sharding/query/collation_targeting.js index fa1a4dda73e..c5795ac2328 100644 --- a/jstests/sharding/query/collation_targeting.js +++ b/jstests/sharding/query/collation_targeting.js @@ -491,7 +491,7 @@ assert.eq(1, writeRes.nMatched); // Sharded upsert that does not target a single shard can now be executed with a two phase // write protocol that will target at most 1 matching document. if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(testDB)) { - // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey + // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey } else { // Upsert must always be single-shard. diff --git a/jstests/sharding/query/collation_targeting_inherited.js b/jstests/sharding/query/collation_targeting_inherited.js index a315fcd1968..9924425e4c1 100644 --- a/jstests/sharding/query/collation_targeting_inherited.js +++ b/jstests/sharding/query/collation_targeting_inherited.js @@ -517,7 +517,7 @@ assert.eq(1, writeRes.nMatched); // Sharded upsert that does not target a single shard can now be executed with a two phase // write protocol that will target at most 1 matching document. if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(testDB)) { - // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey + // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey } else { // Upsert must always be single-shard. diff --git a/jstests/sharding/regex_targeting.js b/jstests/sharding/regex_targeting.js index d559aa53657..f07eaf95bfa 100644 --- a/jstests/sharding/regex_targeting.js +++ b/jstests/sharding/regex_targeting.js @@ -195,7 +195,7 @@ assert.commandWorked(collNested.update({'a.b': /abcde-1/}, {a: {b: /abcde.*/}}, // Sharded updateOnes that do not directly target a shard can now use the two phase write // protocol to execute. if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(st.s)) { - // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey + // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey } else { // // diff --git a/jstests/sharding/updateOne_without_shard_key/cluster_query_without_shard_key_produces_upsert_document.js b/jstests/sharding/updateOne_without_shard_key/cluster_query_without_shard_key_produces_upsert_document.js new file mode 100644 index 00000000000..d82cad8424f --- /dev/null +++ b/jstests/sharding/updateOne_without_shard_key/cluster_query_without_shard_key_produces_upsert_document.js @@ -0,0 +1,236 @@ +/** + * Test success of findAndModify and update commands without shard key, no document matching on the + * filter and {upsert: true}. + * + * @tags: [ + * requires_sharding, + * uses_transactions, + * uses_multi_shard_transaction, + * featureFlagUpdateOneWithoutShardKey, + * ] + */ + +(function() { +"use strict"; + +load("jstests/sharding/updateOne_without_shard_key/libs/write_without_shard_key_test_util.js"); + +// 2 shards single node, 1 mongos, 1 config server 3-node +const st = new ShardingTest({}); +const dbName = "testDb"; +const collectionName = "testColl"; +const nss = dbName + "." + collectionName; +const splitPoint = 0; + +// Sets up a 2 shard cluster using 'x' as a shard key where Shard 0 owns x < +// the splitpoint and Shard 1 >= splitpoint. +WriteWithoutShardKeyTestUtil.setupShardedCollection( + st, nss, {x: 1}, [{x: splitPoint}], [{query: {x: splitPoint}, shard: st.shard1.shardName}]); + +const testCases = [ + { + logMessage: "FindAndModify, replacement style update, should upsert.", + cmdObj: { + _clusterQueryWithoutShardKey: 1, + writeCmd: { + findAndModify: collectionName, + query: {a: 0}, + update: {x: -1, y: 7}, + upsert: true, + }, + stmtId: NumberInt(0), + txnNumber: NumberLong(0), + lsid: {id: UUID()}, + startTransaction: true, + autocommit: false + }, + expectedMods: [{'x': -1}, {'y': 7}], + upsertRequired: true, + }, + { + logMessage: "FindAndModify, pipeline style update, should upsert.", + cmdObj: { + _clusterQueryWithoutShardKey: 1, + writeCmd: { + findAndModify: collectionName, + query: {a: 0}, + update: [{$set: {y: 3}}, {$set: {x: 5}}], + upsert: true, + }, + stmtId: NumberInt(0), + txnNumber: NumberLong(0), + lsid: {id: UUID()}, + startTransaction: true, + autocommit: false + }, + expectedMods: [{'a': 0}, {'x': 5}, {'y': 3}], + upsertRequired: true, + }, + { + logMessage: "FindAndModify, modification style update, should upsert.", + cmdObj: { + _clusterQueryWithoutShardKey: 1, + writeCmd: { + findAndModify: collectionName, + query: {a: -1}, + update: {$inc: {a: 1}}, + upsert: true, + }, + stmtId: NumberInt(0), + txnNumber: NumberLong(0), + lsid: {id: UUID()}, + startTransaction: true, + autocommit: false + }, + expectedMods: [{'a': 0}], + upsertRequired: true, + }, + { + logMessage: "Update, replacement style update, should upsert.", + cmdObj: { + _clusterQueryWithoutShardKey: 1, + writeCmd: { + update: collectionName, + updates: [{q: {a: 0}, u: {x: -1, y: 7}, upsert: true}], + }, + stmtId: NumberInt(0), + txnNumber: NumberLong(0), + lsid: {id: UUID()}, + startTransaction: true, + autocommit: false + }, + expectedMods: [{'x': -1}, {'y': 7}], + upsertRequired: true, + }, + { + logMessage: "Update, pipeline style update, should upsert.", + cmdObj: { + _clusterQueryWithoutShardKey: 1, + writeCmd: { + update: collectionName, + updates: [{q: {a: 0}, u: [{$set: {y: 3}}, {$set: {x: 5}}], upsert: true}], + }, + stmtId: NumberInt(0), + txnNumber: NumberLong(0), + lsid: {id: UUID()}, + startTransaction: true, + autocommit: false + }, + expectedMods: [{'a': 0}, {'x': 5}, {'y': 3}], + upsertRequired: true, + }, + { + logMessage: "Update, modification style update, should upsert.", + cmdObj: { + _clusterQueryWithoutShardKey: 1, + writeCmd: { + update: collectionName, + updates: [{q: {a: 0}, u: {$inc: {a: 1}}, upsert: true}], + }, + stmtId: NumberInt(0), + txnNumber: NumberLong(0), + lsid: {id: UUID()}, + startTransaction: true, + autocommit: false + }, + expectedMods: [{'a': 1}], + upsertRequired: true, + }, + { + logMessage: "Update, arrayFilters, case-insensitive collation, should upsert.", + cmdObj: { + _clusterQueryWithoutShardKey: 1, + writeCmd: { + update: collectionName, + updates: [{ + q: {x: ["bar", "BAR", "foo"]}, + u: {$set: {'x.$[b]': 'FOO'}}, + upsert: true, + arrayFilters: [{'b': {$eq: 'bar'}}], + collation: {locale: 'en_US', strength: 2}, + }], + }, + stmtId: NumberInt(0), + txnNumber: NumberLong(0), + lsid: {id: UUID()}, + startTransaction: true, + autocommit: false + }, + expectedMods: [{x: ["FOO", "FOO", "foo"]}], + upsertRequired: true, + }, + { + logMessage: "No document matches query, {upsert: false}, no modifications expected.", + cmdObj: { + _clusterQueryWithoutShardKey: 1, + writeCmd: { + update: collectionName, + updates: [{q: {a: 0}, u: {$inc: {a: 1}}, upsert: false}], + }, + stmtId: NumberInt(0), + txnNumber: NumberLong(0), + lsid: {id: UUID()}, + startTransaction: true, + autocommit: false + }, + expectedMods: [], + upsertRequired: false, + }, + { + logMessage: "Update, incorrect modification style update, {upsert: true}, should error.", + cmdObj: { + _clusterQueryWithoutShardKey: 1, + writeCmd: { + update: collectionName, + updates: [{q: {a: 0}, u: {$match: {a: 1}}, upsert: true}], + }, + stmtId: NumberInt(0), + txnNumber: NumberLong(0), + lsid: {id: UUID()}, + startTransaction: true, + autocommit: false + }, + errorCode: ErrorCodes.FailedToParse, + }, + { + logMessage: "Update immutable _id field, errors.", + cmdObj: { + _clusterQueryWithoutShardKey: 1, + writeCmd: { + findAndModify: collectionName, + query: {_id: 0}, + update: {_id: -1, y: 7}, + upsert: true, + }, + stmtId: NumberInt(0), + txnNumber: NumberLong(0), + lsid: {id: UUID()}, + startTransaction: true, + autocommit: false + }, + errorCode: ErrorCodes.ImmutableField, + }, +]; + +testCases.forEach(testCase => { + jsTest.log(testCase.logMessage + '\n' + tojson(testCase.cmdObj)); + + if (testCase.errorCode) { + assert.commandFailedWithCode(st.getDB(dbName).runCommand(testCase.cmdObj), + testCase.errorCode); + } else { + const res = assert.commandWorked(st.getDB(dbName).runCommand(testCase.cmdObj)); + assert.eq(res.upsertRequired, testCase.upsertRequired, res); + testCase.expectedMods.forEach(mod => { + let field = Object.keys(mod)[0]; + assert.eq(res.targetDoc[field], mod[field]); + }); + + if (!testCase.upsertRequired) { + assert.eq(null, res.targetDoc, res.targetDoc); + } + } +}); + +st.stop(); +})(); diff --git a/jstests/sharding/update_compound_shard_key.js b/jstests/sharding/update_compound_shard_key.js index 0a04ec716bc..567e2be9987 100644 --- a/jstests/sharding/update_compound_shard_key.js +++ b/jstests/sharding/update_compound_shard_key.js @@ -225,7 +225,7 @@ assert.eq(1, sessionDB.coll.find(updateDocTxn).itcount()); // Shard key field modifications do not have to specify full shard key. if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(st.s)) { - // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey + // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey } else { // Full shard key not specified in query. @@ -348,7 +348,7 @@ assert.eq(1, sessionDB.coll.find(upsertDocTxn["$set"]).itcount()); // Shard key field modifications do not have to specify full shard key. if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(st.s)) { - // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey + // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey } else { // Full shard key not specified in query. @@ -501,7 +501,7 @@ assert.commandWorked(session.commitTransaction_forTesting()); assert.eq(1, sessionDB.coll.find(upsertProjectTxnDoc).itcount()); if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(st.s)) { - // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey + // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey } else { // Full shard key not specified in query. assert.commandFailedWithCode( diff --git a/jstests/sharding/update_sharded.js b/jstests/sharding/update_sharded.js index 9c5d51a8785..af9176a91a4 100644 --- a/jstests/sharding/update_sharded.js +++ b/jstests/sharding/update_sharded.js @@ -58,7 +58,7 @@ for (let i = 0; i < 2; i++) { assert.commandWorked(coll.update({_id: 2}, {key: 2, other: 2})); assert.commandWorked(coll.update({_id: 3}, {key: 3, other: 3})); - // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey + // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey if (!WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(sessionDb)) { // do a replacement-style update which queries the shard key and keeps it constant assert.commandWorked(coll.update({key: 4}, {_id: 4, key: 4}, {upsert: true})); @@ -80,7 +80,7 @@ for (let i = 0; i < 2; i++) { assert.commandWorked(coll.update({_id: 1, key: 1}, {$set: {foo: 2}})); - // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey + // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey if (!WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(sessionDb)) { coll.update({key: 17}, {$inc: {x: 5}}, true); assert.eq(5, coll.findOne({key: 17}).x, "up1"); diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index e4d4a533534..cdc7965ddfa 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -44,16 +44,11 @@ #include "mongo/db/fle_crud.h" #include "mongo/db/matcher/extensions_callback_real.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/ops/delete_request_gen.h" #include "mongo/db/ops/insert.h" #include "mongo/db/ops/parsed_delete.h" #include "mongo/db/ops/parsed_update.h" -#include "mongo/db/ops/update_request.h" -#include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_retryability.h" #include "mongo/db/query/collection_query_info.h" -#include "mongo/db/query/explain.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/plan_summary_stats.h" @@ -69,6 +64,7 @@ #include "mongo/db/transaction/retryable_writes_stats.h" #include "mongo/db/transaction/transaction_participant.h" #include "mongo/db/transaction_validation.h" +#include "mongo/db/update/update_util.h" #include "mongo/db/write_concern.h" #include "mongo/logv2/log.h" #include "mongo/s/would_change_owning_shard_exception.h" @@ -147,32 +143,6 @@ void validate(const write_ops::FindAndModifyCommandRequest& request) { } } -void makeUpdateRequest(OperationContext* opCtx, - const write_ops::FindAndModifyCommandRequest& request, - boost::optional<ExplainOptions::Verbosity> explain, - UpdateRequest* requestOut) { - requestOut->setQuery(request.getQuery()); - requestOut->setProj(request.getFields().value_or(BSONObj())); - invariant(request.getUpdate()); - requestOut->setUpdateModification(*request.getUpdate()); - requestOut->setLegacyRuntimeConstants( - request.getLegacyRuntimeConstants().value_or(Variables::generateRuntimeConstants(opCtx))); - requestOut->setLetParameters(request.getLet()); - requestOut->setSort(request.getSort().value_or(BSONObj())); - requestOut->setHint(request.getHint()); - requestOut->setCollation(request.getCollation().value_or(BSONObj())); - requestOut->setArrayFilters(request.getArrayFilters().value_or(std::vector<BSONObj>())); - requestOut->setUpsert(request.getUpsert().value_or(false)); - requestOut->setReturnDocs((request.getNew().value_or(false)) ? UpdateRequest::RETURN_NEW - : UpdateRequest::RETURN_OLD); - requestOut->setMulti(false); - requestOut->setExplain(explain); - - requestOut->setYieldPolicy(opCtx->inMultiDocumentTransaction() - ? PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY - : PlanYieldPolicy::YieldPolicy::YIELD_AUTO); -} - void makeDeleteRequest(OperationContext* opCtx, const write_ops::FindAndModifyCommandRequest& request, bool explain, @@ -602,7 +572,7 @@ void CmdFindAndModify::Invocation::explain(OperationContext* opCtx, } else { auto updateRequest = UpdateRequest(); updateRequest.setNamespaceString(nss); - makeUpdateRequest(opCtx, request, verbosity, &updateRequest); + update::makeUpdateRequest(opCtx, request, verbosity, &updateRequest); const ExtensionsCallbackReal extensionsCallback(opCtx, &updateRequest.getNamespaceString()); ParsedUpdate parsedUpdate(opCtx, &updateRequest, extensionsCallback); @@ -726,7 +696,7 @@ write_ops::FindAndModifyCommandReply CmdFindAndModify::Invocation::typedRun( auto updateRequest = UpdateRequest(); updateRequest.setNamespaceString(nsString); const auto verbosity = boost::none; - makeUpdateRequest(opCtx, req, verbosity, &updateRequest); + update::makeUpdateRequest(opCtx, req, verbosity, &updateRequest); if (opCtx->getTxnNumber()) { updateRequest.setStmtIds({stmtId}); diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index 3abd591d463..99c7c28c96b 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -41,8 +41,8 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/update/path_support.h" -#include "mongo/db/update/produce_document_for_upsert.h" #include "mongo/db/update/update_oplog_entry_serialization.h" +#include "mongo/db/update/update_util.h" #include "mongo/logv2/log.h" #include "mongo/s/grid.h" #include "mongo/s/would_change_owning_shard_exception.h" diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp index c4917b191ab..502b68b07a4 100644 --- a/src/mongo/db/exec/upsert_stage.cpp +++ b/src/mongo/db/exec/upsert_stage.cpp @@ -38,8 +38,8 @@ #include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/update/produce_document_for_upsert.h" #include "mongo/db/update/storage_validation.h" +#include "mongo/db/update/update_util.h" #include "mongo/s/would_change_owning_shard_exception.h" namespace mongo { diff --git a/src/mongo/db/update/SConscript b/src/mongo/db/update/SConscript index e4806030657..2b3d9973aeb 100644 --- a/src/mongo/db/update/SConscript +++ b/src/mongo/db/update/SConscript @@ -90,7 +90,7 @@ env.Library( target='update_driver', source=[ 'update_driver.cpp', - 'produce_document_for_upsert.cpp', + 'update_util.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/common', diff --git a/src/mongo/db/update/produce_document_for_upsert.cpp b/src/mongo/db/update/update_util.cpp index b541d2b3aa1..90f41da7dd1 100644 --- a/src/mongo/db/update/produce_document_for_upsert.cpp +++ b/src/mongo/db/update/update_util.cpp @@ -27,7 +27,7 @@ * it in the license file. */ -#include "mongo/db/update/produce_document_for_upsert.h" +#include "mongo/db/update/update_util.h" #include "mongo/bson/mutable/algorithm.h" #include "mongo/db/s/operation_sharding_state.h" @@ -144,5 +144,31 @@ void ensureIdFieldIsFirst(mutablebson::Document* doc, bool generateOIDIfMissing) } } +void makeUpdateRequest(OperationContext* opCtx, + const write_ops::FindAndModifyCommandRequest& request, + boost::optional<ExplainOptions::Verbosity> explain, + UpdateRequest* requestOut) { + requestOut->setQuery(request.getQuery()); + requestOut->setProj(request.getFields().value_or(BSONObj())); + invariant(request.getUpdate()); + requestOut->setUpdateModification(*request.getUpdate()); + requestOut->setLegacyRuntimeConstants( + request.getLegacyRuntimeConstants().value_or(Variables::generateRuntimeConstants(opCtx))); + requestOut->setLetParameters(request.getLet()); + requestOut->setSort(request.getSort().value_or(BSONObj())); + requestOut->setHint(request.getHint()); + requestOut->setCollation(request.getCollation().value_or(BSONObj())); + requestOut->setArrayFilters(request.getArrayFilters().value_or(std::vector<BSONObj>())); + requestOut->setUpsert(request.getUpsert().value_or(false)); + requestOut->setReturnDocs((request.getNew().value_or(false)) ? UpdateRequest::RETURN_NEW + : UpdateRequest::RETURN_OLD); + requestOut->setMulti(false); + requestOut->setExplain(explain); + + requestOut->setYieldPolicy(opCtx->inMultiDocumentTransaction() + ? PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY + : PlanYieldPolicy::YieldPolicy::YIELD_AUTO); +} + } // namespace update } // namespace mongo diff --git a/src/mongo/db/update/produce_document_for_upsert.h b/src/mongo/db/update/update_util.h index 3056d8222a0..edec49f6f8a 100644 --- a/src/mongo/db/update/produce_document_for_upsert.h +++ b/src/mongo/db/update/update_util.h @@ -48,5 +48,12 @@ void produceDocumentForUpsert(OperationContext* opCtx, void ensureIdFieldIsFirst(mutablebson::Document* doc, bool generateOIDIfMissing); void assertPathsNotArray(const mutablebson::Document& document, const FieldRefSet& paths); +/** + * Parse FindAndModify update command request into an updateRequest. + */ +void makeUpdateRequest(OperationContext* opCtx, + const write_ops::FindAndModifyCommandRequest& request, + boost::optional<ExplainOptions::Verbosity> explain, + UpdateRequest* requestOut); } // namespace update } // namespace mongo diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index a243129bbd6..45591a29d5f 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -42,6 +42,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands/server_status_core', '$BUILD_DIR/mongo/db/fle_crud', '$BUILD_DIR/mongo/db/not_primary_error_tracker', + '$BUILD_DIR/mongo/db/ops/parsed_update', '$BUILD_DIR/mongo/db/pipeline/pipeline', '$BUILD_DIR/mongo/db/pipeline/process_interface/mongos_process_interface', '$BUILD_DIR/mongo/db/timeseries/timeseries_conversion_util', @@ -698,6 +699,7 @@ env.CppUnitTest( LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/mongohasher', + '$BUILD_DIR/mongo/db/ops/parsed_update', '$BUILD_DIR/mongo/db/ops/write_ops_parsers_test_helpers', '$BUILD_DIR/mongo/db/pipeline/process_interface/mongos_process_interface_factory', '$BUILD_DIR/mongo/db/query/query_test_service_context', diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index f6c5813816c..f96e38a15ce 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -139,6 +139,7 @@ env.Library( '$BUILD_DIR/mongo/db/index_commands_idl', '$BUILD_DIR/mongo/db/initialize_api_parameters', '$BUILD_DIR/mongo/db/internal_transactions_feature_flag', + '$BUILD_DIR/mongo/db/ops/parsed_update', '$BUILD_DIR/mongo/db/query/command_request_response', '$BUILD_DIR/mongo/db/query/cursor_response_idl', '$BUILD_DIR/mongo/db/query/map_reduce_output_format', diff --git a/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp b/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp index 2cf70715f31..1ec30066770 100644 --- a/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp +++ b/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp @@ -26,11 +26,12 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ - #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" #include "mongo/db/internal_transactions_feature_flag_gen.h" +#include "mongo/db/matcher/extensions_callback_noop.h" #include "mongo/db/query/collation/collator_factory_interface.h" +#include "mongo/db/update/update_util.h" #include "mongo/logv2/log.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" @@ -39,6 +40,7 @@ #include "mongo/s/request_types/cluster_commands_without_shard_key_gen.h" #include "mongo/s/shard_key_pattern_query_util.h" #include "mongo/s/write_ops/batch_write_op.h" +#include "mongo/s/write_ops/write_without_shard_key_util.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand @@ -48,10 +50,9 @@ namespace { struct ParsedCommandInfo { BSONObj query; BSONObj collation; - int stmtId; - - ParsedCommandInfo(BSONObj query, BSONObj collation, int stmtId) - : query(query), collation(collation), stmtId(stmtId) {} + bool upsert = false; + int stmtId = kUninitializedStmtId; + boost::optional<UpdateRequest> updateRequest; }; struct AsyncRequestSenderResponseData { @@ -111,6 +112,65 @@ BSONObj createAggregateCmdObj(OperationContext* opCtx, return aggregate.toBSON({}); } +ParsedCommandInfo parseWriteCommand(OperationContext* opCtx, + StringData commandName, + const BSONObj& writeCmdObj) { + ParsedCommandInfo parsedInfo; + if (commandName == write_ops::UpdateCommandRequest::kCommandName) { + auto updateRequest = write_ops::UpdateCommandRequest::parse( + IDLParserContext("_clusterQueryWithoutShardKeyForUpdate"), writeCmdObj); + parsedInfo.query = updateRequest.getUpdates().front().getQ(); + + // In the batch write path, when the request is reconstructed to be passed to + // the two phase write protocol, only the stmtIds field is used. + if (auto stmtIds = updateRequest.getStmtIds()) { + parsedInfo.stmtId = stmtIds->front(); + } + + if ((parsedInfo.upsert = updateRequest.getUpdates().front().getUpsert())) { + parsedInfo.updateRequest = updateRequest.getUpdates().front(); + } + + if (auto parsedCollation = updateRequest.getUpdates().front().getCollation()) { + parsedInfo.collation = *parsedCollation; + } + } else if (commandName == write_ops::DeleteCommandRequest::kCommandName) { + auto deleteRequest = write_ops::DeleteCommandRequest::parse( + IDLParserContext("_clusterQueryWithoutShardKeyForDelete"), writeCmdObj); + parsedInfo.query = deleteRequest.getDeletes().front().getQ(); + + // In the batch write path, when the request is reconstructed to be passed to + // the two phase write protocol, only the stmtIds field is used. + if (auto stmtIds = deleteRequest.getStmtIds()) { + parsedInfo.stmtId = stmtIds->front(); + } + + if (auto parsedCollation = deleteRequest.getDeletes().front().getCollation()) { + parsedInfo.collation = *parsedCollation; + } + } else if (commandName == write_ops::FindAndModifyCommandRequest::kCommandName || + commandName == write_ops::FindAndModifyCommandRequest::kCommandAlias) { + auto findAndModifyRequest = write_ops::FindAndModifyCommandRequest::parse( + IDLParserContext("_clusterQueryWithoutShardKeyFindAndModify"), writeCmdObj); + parsedInfo.query = findAndModifyRequest.getQuery(); + parsedInfo.stmtId = findAndModifyRequest.getStmtId().value_or(kUninitializedStmtId); + + if ((parsedInfo.upsert = findAndModifyRequest.getUpsert().get_value_or(false))) { + parsedInfo.updateRequest = UpdateRequest{}; + parsedInfo.updateRequest->setNamespaceString(findAndModifyRequest.getNamespace()); + update::makeUpdateRequest( + opCtx, findAndModifyRequest, boost::none, parsedInfo.updateRequest.get_ptr()); + } + + if (auto parsedCollation = findAndModifyRequest.getCollation()) { + parsedInfo.collation = *parsedCollation; + } + } else { + uasserted(ErrorCodes::InvalidOptions, "Not a supported batch write command"); + } + return parsedInfo; +} + class ClusterQueryWithoutShardKeyCmd : public TypedCommand<ClusterQueryWithoutShardKeyCmd> { public: using Request = ClusterQueryWithoutShardKey; @@ -134,63 +194,15 @@ public: CommandHelpers::parseNsCollectionRequired(ns().dbName(), request().getWriteCmd())); const auto cri = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); - auto parsedInfoFromRequest = [&] { - const auto commandName = request().getWriteCmd().firstElementFieldNameStringData(); - - // Parse into OpMsgRequest to append the $db field, which is required for command - // parsing. - const auto opMsgRequest = - OpMsgRequest::fromDBAndBody(nss.db(), request().getWriteCmd()); - BSONObj query; - BSONObj collation; - int stmtId = kUninitializedStmtId; - - if (commandName == write_ops::UpdateCommandRequest::kCommandName) { - auto updateRequest = write_ops::UpdateCommandRequest::parse( - IDLParserContext("_clusterQueryWithoutShardKeyForUpdate"), - opMsgRequest.body); - query = updateRequest.getUpdates().front().getQ(); - - // In the batch write path, when the request is reconstructed to be passed to - // the two phase write protocol, only the stmtIds field is used. - if (auto stmtIds = updateRequest.getStmtIds()) { - stmtId = stmtIds->front(); - } - - if (auto parsedCollation = updateRequest.getUpdates().front().getCollation()) { - collation = *parsedCollation; - } - } else if (commandName == write_ops::DeleteCommandRequest::kCommandName) { - auto deleteRequest = write_ops::DeleteCommandRequest::parse( - IDLParserContext("_clusterQueryWithoutShardKeyForDelete"), - opMsgRequest.body); - query = deleteRequest.getDeletes().front().getQ(); - - // In the batch write path, when the request is reconstructed to be passed to - // the two phase write protocol, only the stmtIds field is used. - if (auto stmtIds = deleteRequest.getStmtIds()) { - stmtId = stmtIds->front(); - } - - if (auto parsedCollation = deleteRequest.getDeletes().front().getCollation()) { - collation = *parsedCollation; - } - } else if (commandName == write_ops::FindAndModifyCommandRequest::kCommandName || - commandName == write_ops::FindAndModifyCommandRequest::kCommandAlias) { - auto findAndModifyRequest = write_ops::FindAndModifyCommandRequest::parse( - IDLParserContext("_clusterQueryWithoutShardKeyFindAndModify"), - opMsgRequest.body); - query = findAndModifyRequest.getQuery(); - stmtId = findAndModifyRequest.getStmtId().value_or(kUninitializedStmtId); - - if (auto parsedCollation = findAndModifyRequest.getCollation()) { - collation = *parsedCollation; - } - } else { - uasserted(ErrorCodes::InvalidOptions, "Not a supported batch write command"); - } - return ParsedCommandInfo(query.getOwned(), collation.getOwned(), stmtId); - }(); + // Parse into OpMsgRequest to append the $db field, which is required for command + // parsing. + const auto opMsgRequest = + OpMsgRequest::fromDBAndBody(nss.db(), request().getWriteCmd()); + + auto parsedInfoFromRequest = + parseWriteCommand(opCtx, + request().getWriteCmd().firstElementFieldNameStringData(), + opMsgRequest.body); auto allShardsContainingChunksForNs = getShardsToTarget(opCtx, cri.cm, nss, parsedInfoFromRequest); @@ -241,6 +253,13 @@ public: res.setShardId(boost::optional<mongo::StringData>(shardId)); } } + + if (!res.getTargetDoc() && parsedInfoFromRequest.upsert) { + res.setTargetDoc(write_without_shard_key::generateUpsertDocument( + opCtx, parsedInfoFromRequest.updateRequest.get())); + res.setUpsertRequired(true); + } + return res; } diff --git a/src/mongo/s/request_types/cluster_commands_without_shard_key.idl b/src/mongo/s/request_types/cluster_commands_without_shard_key.idl index d2618e3d3e6..4546fe19b02 100644 --- a/src/mongo/s/request_types/cluster_commands_without_shard_key.idl +++ b/src/mongo/s/request_types/cluster_commands_without_shard_key.idl @@ -48,6 +48,10 @@ structs: description: "The shard id of the target shard." type: string optional: true + upsertRequired: + description: "True if the targetDoc should be upserted." + type: bool + default: false clusterWriteWithoutShardKeyResponse: description: "The response for the '_clusterWriteWithoutShardKeyFind' command." is_command_reply: true diff --git a/src/mongo/s/write_ops/write_without_shard_key_util.cpp b/src/mongo/s/write_ops/write_without_shard_key_util.cpp index b71a9f8406a..2dda88a47d3 100644 --- a/src/mongo/s/write_ops/write_without_shard_key_util.cpp +++ b/src/mongo/s/write_ops/write_without_shard_key_util.cpp @@ -34,6 +34,7 @@ #include "mongo/db/query/collation/collation_index_key.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/transaction/transaction_api.h" +#include "mongo/db/update/update_util.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/grid.h" @@ -48,6 +49,7 @@ namespace write_without_shard_key { namespace { constexpr auto kIdFieldName = "_id"_sd; +const FieldRef idFieldRef(kIdFieldName); // Used to do query validation for the _id field. const ShardKeyPattern kVirtualIdShardKey(BSON(kIdFieldName << 1)); @@ -104,6 +106,25 @@ bool shardKeyHasCollatableType(const BSONObj& shardKey) { } } // namespace +BSONObj generateUpsertDocument(OperationContext* opCtx, const UpdateRequest& updateRequest) { + ExtensionsCallbackNoop extensionsCallback = ExtensionsCallbackNoop(); + ParsedUpdate parsedUpdate(opCtx, &updateRequest, extensionsCallback); + uassertStatusOK(parsedUpdate.parseRequest()); + + const CanonicalQuery* canonicalQuery = + parsedUpdate.hasParsedQuery() ? parsedUpdate.getParsedQuery() : nullptr; + FieldRefSet immutablePaths; + immutablePaths.insert(&idFieldRef); + update::produceDocumentForUpsert(opCtx, + &updateRequest, + parsedUpdate.getDriver(), + canonicalQuery, + immutablePaths, + parsedUpdate.getDriver()->getDocument()); + + return parsedUpdate.getDriver()->getDocument().getObject(); +} + bool useTwoPhaseProtocol(OperationContext* opCtx, NamespaceString nss, bool isUpdateOrDelete, diff --git a/src/mongo/s/write_ops/write_without_shard_key_util.h b/src/mongo/s/write_ops/write_without_shard_key_util.h index da9955777b8..fe567b87fa1 100644 --- a/src/mongo/s/write_ops/write_without_shard_key_util.h +++ b/src/mongo/s/write_ops/write_without_shard_key_util.h @@ -32,12 +32,18 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/ops/parsed_update.h" #include "mongo/s/request_types/cluster_commands_without_shard_key_gen.h" namespace mongo { namespace write_without_shard_key { /** + * Uses updateDriver to produce the document to insert. Only use when {upsert: true}. + */ +BSONObj generateUpsertDocument(OperationContext* opCtx, const UpdateRequest& updateRequest); + +/** * Returns true if we can use the two phase protocol to complete a single write without shard * key. **/ diff --git a/src/mongo/s/write_ops/write_without_shard_key_util_test.cpp b/src/mongo/s/write_ops/write_without_shard_key_util_test.cpp index 5fb78f3927a..d4a47885ca4 100644 --- a/src/mongo/s/write_ops/write_without_shard_key_util_test.cpp +++ b/src/mongo/s/write_ops/write_without_shard_key_util_test.cpp @@ -27,9 +27,11 @@ * it in the license file. */ +#include "mongo/db/ops/update_request.h" #include "mongo/idl/server_parameter_test_util.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog_cache_test_fixture.h" +#include "mongo/s/concurrency/locker_mongos_client_observer.h" #include "mongo/s/sharding_feature_flags_gen.h" #include "mongo/s/write_ops/write_without_shard_key_util.h" #include "mongo/unittest/unittest.h" @@ -83,6 +85,23 @@ protected: } }; +class ProduceUpsertDocumentTest : public ServiceContextTest { +public: + void setUp() override { + ServiceContextTest::setUp(); + auto service = getServiceContext(); + service->registerClientObserver(std::make_unique<LockerMongosClientObserver>()); + _opCtx = makeOperationContext(); + } + + OperationContext* getOpCtx() const { + return _opCtx.get(); + } + +protected: + ServiceContext::UniqueOperationContext _opCtx; +}; + TEST_F(WriteWithoutShardKeyUtilTest, WriteQueryContainingFullShardKeyCanTargetSingleDocument) { RAIIServerParameterControllerForTest featureFlagController( "featureFlagUpdateOneWithoutShardKey", true); @@ -262,6 +281,83 @@ TEST_F(WriteWithoutShardKeyUtilTest, ASSERT_EQ(useTwoPhaseProtocol, true); } +TEST_F(ProduceUpsertDocumentTest, produceUpsertDocumentUsingReplacementUpdate) { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON("_id" << 3)); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(BSON("x" << 2))); + + write_ops::UpdateCommandRequest updateCommandRequest(kNss); + updateCommandRequest.setUpdates({entry}); + UpdateRequest updateRequest(updateCommandRequest.getUpdates().front()); + + auto doc = write_without_shard_key::generateUpsertDocument(getOpCtx(), updateRequest); + ASSERT_BSONOBJ_EQ(doc, fromjson("{ _id: 3, x: 2 }")); +} + +TEST_F(ProduceUpsertDocumentTest, produceUpsertDocumentUsingLetConstantAndPipelineUpdate) { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON("_id" << 4 << "x" << 3)); + + std::vector<BSONObj> pipelineUpdate; + pipelineUpdate.push_back(fromjson("{$set: {'x': '$$constOne'}}")); + pipelineUpdate.push_back(fromjson("{$set: {'y': 3}}")); + entry.setU(pipelineUpdate); + + BSONObj constants = fromjson("{constOne: 'foo'}"); + entry.setC(constants); + + write_ops::UpdateCommandRequest updateCommandRequest(kNss); + updateCommandRequest.setUpdates({entry}); + UpdateRequest updateRequest(updateCommandRequest.getUpdates().front()); + + auto doc = write_without_shard_key::generateUpsertDocument(getOpCtx(), updateRequest); + ASSERT_BSONOBJ_EQ(doc, fromjson("{ _id: 4, x: 'foo', y: 3 }")); +} + +TEST_F(ProduceUpsertDocumentTest, produceUpsertDocumentUsingArrayFilterAndModificationUpdate) { + write_ops::UpdateOpEntry entry; + BSONArrayBuilder arrayBuilder; + arrayBuilder.append(BSON("a" << 90)); + entry.setQ(BSON("_id" << 4 << "x" << arrayBuilder.arr())); + entry.setU( + write_ops::UpdateModification::parseFromClassicUpdate(fromjson("{$inc: {'x.$[b].a': 3}}"))); + + auto arrayFilter = std::vector<BSONObj>{fromjson("{'b.a': {$gt: 85}}")}; + entry.setArrayFilters(arrayFilter); + + write_ops::UpdateCommandRequest updateCommandRequest(kNss); + updateCommandRequest.setUpdates({entry}); + UpdateRequest updateRequest(updateCommandRequest.getUpdates().front()); + + auto doc = write_without_shard_key::generateUpsertDocument(getOpCtx(), updateRequest); + ASSERT_BSONOBJ_EQ(doc, fromjson("{ _id: 4, x: [ { a: 93 } ] }")); +} + +TEST_F(ProduceUpsertDocumentTest, produceUpsertDocumentUsingCollation) { + write_ops::UpdateOpEntry entry; + BSONArrayBuilder arrayBuilder; + arrayBuilder.append(BSON("a" + << "BAR")); + arrayBuilder.append(BSON("a" + << "bar")); + arrayBuilder.append(BSON("a" + << "foo")); + entry.setQ(BSON("_id" << 4 << "x" << arrayBuilder.arr())); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( + fromjson("{$set: {'x.$[b].a': 'FOO'}}"))); + + auto arrayFilter = std::vector<BSONObj>{fromjson("{'b.a': {$eq: 'bar'}}")}; + entry.setArrayFilters(arrayFilter); + entry.setCollation(fromjson("{locale: 'en_US', strength: 2}")); + + write_ops::UpdateCommandRequest updateCommandRequest(kNss); + updateCommandRequest.setUpdates({entry}); + UpdateRequest updateRequest(updateCommandRequest.getUpdates().front()); + + auto doc = write_without_shard_key::generateUpsertDocument(getOpCtx(), updateRequest); + ASSERT_BSONOBJ_EQ(doc, fromjson("{ _id: 4, x: [ { a: 'FOO' }, { a: 'FOO' }, { a: 'foo' } ] }")); +} + } // namespace } // namespace write_without_shard_key } // namespace mongo |