summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSanika Phanse <sanika.phanse@mongodb.com>2023-02-24 20:47:38 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-02-25 03:38:11 +0000
commit8a53ac6194cbd298e517fc4102263facf9f66ede (patch)
tree47aad707e7571dccea43f4b5b2c888691e360603
parent7820a2ce672fffb91b9f03bd72c1cf827efc391e (diff)
downloadmongo-8a53ac6194cbd298e517fc4102263facf9f66ede.tar.gz
SERVER-69918 Produce upsert document in _clusterQueryWithoutShardKey
-rw-r--r--jstests/sharding/query/collation_targeting.js2
-rw-r--r--jstests/sharding/query/collation_targeting_inherited.js2
-rw-r--r--jstests/sharding/regex_targeting.js2
-rw-r--r--jstests/sharding/updateOne_without_shard_key/cluster_query_without_shard_key_produces_upsert_document.js236
-rw-r--r--jstests/sharding/update_compound_shard_key.js6
-rw-r--r--jstests/sharding/update_sharded.js4
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp36
-rw-r--r--src/mongo/db/exec/update_stage.cpp2
-rw-r--r--src/mongo/db/exec/upsert_stage.cpp2
-rw-r--r--src/mongo/db/update/SConscript2
-rw-r--r--src/mongo/db/update/update_util.cpp (renamed from src/mongo/db/update/produce_document_for_upsert.cpp)28
-rw-r--r--src/mongo/db/update/update_util.h (renamed from src/mongo/db/update/produce_document_for_upsert.h)7
-rw-r--r--src/mongo/s/SConscript2
-rw-r--r--src/mongo/s/commands/SConscript1
-rw-r--r--src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp143
-rw-r--r--src/mongo/s/request_types/cluster_commands_without_shard_key.idl4
-rw-r--r--src/mongo/s/write_ops/write_without_shard_key_util.cpp21
-rw-r--r--src/mongo/s/write_ops/write_without_shard_key_util.h6
-rw-r--r--src/mongo/s/write_ops/write_without_shard_key_util_test.cpp96
19 files changed, 495 insertions, 107 deletions
diff --git a/jstests/sharding/query/collation_targeting.js b/jstests/sharding/query/collation_targeting.js
index fa1a4dda73e..c5795ac2328 100644
--- a/jstests/sharding/query/collation_targeting.js
+++ b/jstests/sharding/query/collation_targeting.js
@@ -491,7 +491,7 @@ assert.eq(1, writeRes.nMatched);
// Sharded upsert that does not target a single shard can now be executed with a two phase
// write protocol that will target at most 1 matching document.
if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(testDB)) {
- // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey
+ // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey
} else {
// Upsert must always be single-shard.
diff --git a/jstests/sharding/query/collation_targeting_inherited.js b/jstests/sharding/query/collation_targeting_inherited.js
index a315fcd1968..9924425e4c1 100644
--- a/jstests/sharding/query/collation_targeting_inherited.js
+++ b/jstests/sharding/query/collation_targeting_inherited.js
@@ -517,7 +517,7 @@ assert.eq(1, writeRes.nMatched);
// Sharded upsert that does not target a single shard can now be executed with a two phase
// write protocol that will target at most 1 matching document.
if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(testDB)) {
- // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey
+ // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey
} else {
// Upsert must always be single-shard.
diff --git a/jstests/sharding/regex_targeting.js b/jstests/sharding/regex_targeting.js
index d559aa53657..f07eaf95bfa 100644
--- a/jstests/sharding/regex_targeting.js
+++ b/jstests/sharding/regex_targeting.js
@@ -195,7 +195,7 @@ assert.commandWorked(collNested.update({'a.b': /abcde-1/}, {a: {b: /abcde.*/}},
// Sharded updateOnes that do not directly target a shard can now use the two phase write
// protocol to execute.
if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(st.s)) {
- // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey
+ // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey
} else {
//
//
diff --git a/jstests/sharding/updateOne_without_shard_key/cluster_query_without_shard_key_produces_upsert_document.js b/jstests/sharding/updateOne_without_shard_key/cluster_query_without_shard_key_produces_upsert_document.js
new file mode 100644
index 00000000000..d82cad8424f
--- /dev/null
+++ b/jstests/sharding/updateOne_without_shard_key/cluster_query_without_shard_key_produces_upsert_document.js
@@ -0,0 +1,236 @@
+/**
+ * Test success of findAndModify and update commands without shard key, no document matching on the
+ * filter and {upsert: true}.
+ *
+ * @tags: [
+ * requires_sharding,
+ * uses_transactions,
+ * uses_multi_shard_transaction,
+ * featureFlagUpdateOneWithoutShardKey,
+ * ]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/sharding/updateOne_without_shard_key/libs/write_without_shard_key_test_util.js");
+
+// 2 shards single node, 1 mongos, 1 config server 3-node
+const st = new ShardingTest({});
+const dbName = "testDb";
+const collectionName = "testColl";
+const nss = dbName + "." + collectionName;
+const splitPoint = 0;
+
+// Sets up a 2 shard cluster using 'x' as a shard key where Shard 0 owns x <
+// the splitpoint and Shard 1 >= splitpoint.
+WriteWithoutShardKeyTestUtil.setupShardedCollection(
+ st, nss, {x: 1}, [{x: splitPoint}], [{query: {x: splitPoint}, shard: st.shard1.shardName}]);
+
+const testCases = [
+ {
+ logMessage: "FindAndModify, replacement style update, should upsert.",
+ cmdObj: {
+ _clusterQueryWithoutShardKey: 1,
+ writeCmd: {
+ findAndModify: collectionName,
+ query: {a: 0},
+ update: {x: -1, y: 7},
+ upsert: true,
+ },
+ stmtId: NumberInt(0),
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()},
+ startTransaction: true,
+ autocommit: false
+ },
+ expectedMods: [{'x': -1}, {'y': 7}],
+ upsertRequired: true,
+ },
+ {
+ logMessage: "FindAndModify, pipeline style update, should upsert.",
+ cmdObj: {
+ _clusterQueryWithoutShardKey: 1,
+ writeCmd: {
+ findAndModify: collectionName,
+ query: {a: 0},
+ update: [{$set: {y: 3}}, {$set: {x: 5}}],
+ upsert: true,
+ },
+ stmtId: NumberInt(0),
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()},
+ startTransaction: true,
+ autocommit: false
+ },
+ expectedMods: [{'a': 0}, {'x': 5}, {'y': 3}],
+ upsertRequired: true,
+ },
+ {
+ logMessage: "FindAndModify, modification style update, should upsert.",
+ cmdObj: {
+ _clusterQueryWithoutShardKey: 1,
+ writeCmd: {
+ findAndModify: collectionName,
+ query: {a: -1},
+ update: {$inc: {a: 1}},
+ upsert: true,
+ },
+ stmtId: NumberInt(0),
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()},
+ startTransaction: true,
+ autocommit: false
+ },
+ expectedMods: [{'a': 0}],
+ upsertRequired: true,
+ },
+ {
+ logMessage: "Update, replacement style update, should upsert.",
+ cmdObj: {
+ _clusterQueryWithoutShardKey: 1,
+ writeCmd: {
+ update: collectionName,
+ updates: [{q: {a: 0}, u: {x: -1, y: 7}, upsert: true}],
+ },
+ stmtId: NumberInt(0),
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()},
+ startTransaction: true,
+ autocommit: false
+ },
+ expectedMods: [{'x': -1}, {'y': 7}],
+ upsertRequired: true,
+ },
+ {
+ logMessage: "Update, pipeline style update, should upsert.",
+ cmdObj: {
+ _clusterQueryWithoutShardKey: 1,
+ writeCmd: {
+ update: collectionName,
+ updates: [{q: {a: 0}, u: [{$set: {y: 3}}, {$set: {x: 5}}], upsert: true}],
+ },
+ stmtId: NumberInt(0),
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()},
+ startTransaction: true,
+ autocommit: false
+ },
+ expectedMods: [{'a': 0}, {'x': 5}, {'y': 3}],
+ upsertRequired: true,
+ },
+ {
+ logMessage: "Update, modification style update, should upsert.",
+ cmdObj: {
+ _clusterQueryWithoutShardKey: 1,
+ writeCmd: {
+ update: collectionName,
+ updates: [{q: {a: 0}, u: {$inc: {a: 1}}, upsert: true}],
+ },
+ stmtId: NumberInt(0),
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()},
+ startTransaction: true,
+ autocommit: false
+ },
+ expectedMods: [{'a': 1}],
+ upsertRequired: true,
+ },
+ {
+ logMessage: "Update, arrayFilters, case-insensitive collation, should upsert.",
+ cmdObj: {
+ _clusterQueryWithoutShardKey: 1,
+ writeCmd: {
+ update: collectionName,
+ updates: [{
+ q: {x: ["bar", "BAR", "foo"]},
+ u: {$set: {'x.$[b]': 'FOO'}},
+ upsert: true,
+ arrayFilters: [{'b': {$eq: 'bar'}}],
+ collation: {locale: 'en_US', strength: 2},
+ }],
+ },
+ stmtId: NumberInt(0),
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()},
+ startTransaction: true,
+ autocommit: false
+ },
+ expectedMods: [{x: ["FOO", "FOO", "foo"]}],
+ upsertRequired: true,
+ },
+ {
+ logMessage: "No document matches query, {upsert: false}, no modifications expected.",
+ cmdObj: {
+ _clusterQueryWithoutShardKey: 1,
+ writeCmd: {
+ update: collectionName,
+ updates: [{q: {a: 0}, u: {$inc: {a: 1}}, upsert: false}],
+ },
+ stmtId: NumberInt(0),
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()},
+ startTransaction: true,
+ autocommit: false
+ },
+ expectedMods: [],
+ upsertRequired: false,
+ },
+ {
+ logMessage: "Update, incorrect modification style update, {upsert: true}, should error.",
+ cmdObj: {
+ _clusterQueryWithoutShardKey: 1,
+ writeCmd: {
+ update: collectionName,
+ updates: [{q: {a: 0}, u: {$match: {a: 1}}, upsert: true}],
+ },
+ stmtId: NumberInt(0),
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()},
+ startTransaction: true,
+ autocommit: false
+ },
+ errorCode: ErrorCodes.FailedToParse,
+ },
+ {
+ logMessage: "Update immutable _id field, errors.",
+ cmdObj: {
+ _clusterQueryWithoutShardKey: 1,
+ writeCmd: {
+ findAndModify: collectionName,
+ query: {_id: 0},
+ update: {_id: -1, y: 7},
+ upsert: true,
+ },
+ stmtId: NumberInt(0),
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()},
+ startTransaction: true,
+ autocommit: false
+ },
+ errorCode: ErrorCodes.ImmutableField,
+ },
+];
+
+testCases.forEach(testCase => {
+ jsTest.log(testCase.logMessage + '\n' + tojson(testCase.cmdObj));
+
+ if (testCase.errorCode) {
+ assert.commandFailedWithCode(st.getDB(dbName).runCommand(testCase.cmdObj),
+ testCase.errorCode);
+ } else {
+ const res = assert.commandWorked(st.getDB(dbName).runCommand(testCase.cmdObj));
+ assert.eq(res.upsertRequired, testCase.upsertRequired, res);
+ testCase.expectedMods.forEach(mod => {
+ let field = Object.keys(mod)[0];
+ assert.eq(res.targetDoc[field], mod[field]);
+ });
+
+ if (!testCase.upsertRequired) {
+ assert.eq(null, res.targetDoc, res.targetDoc);
+ }
+ }
+});
+
+st.stop();
+})();
diff --git a/jstests/sharding/update_compound_shard_key.js b/jstests/sharding/update_compound_shard_key.js
index 0a04ec716bc..567e2be9987 100644
--- a/jstests/sharding/update_compound_shard_key.js
+++ b/jstests/sharding/update_compound_shard_key.js
@@ -225,7 +225,7 @@ assert.eq(1, sessionDB.coll.find(updateDocTxn).itcount());
// Shard key field modifications do not have to specify full shard key.
if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(st.s)) {
- // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey
+ // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey
} else {
// Full shard key not specified in query.
@@ -348,7 +348,7 @@ assert.eq(1, sessionDB.coll.find(upsertDocTxn["$set"]).itcount());
// Shard key field modifications do not have to specify full shard key.
if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(st.s)) {
- // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey
+ // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey
} else {
// Full shard key not specified in query.
@@ -501,7 +501,7 @@ assert.commandWorked(session.commitTransaction_forTesting());
assert.eq(1, sessionDB.coll.find(upsertProjectTxnDoc).itcount());
if (WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(st.s)) {
- // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey
+ // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey
} else {
// Full shard key not specified in query.
assert.commandFailedWithCode(
diff --git a/jstests/sharding/update_sharded.js b/jstests/sharding/update_sharded.js
index 9c5d51a8785..af9176a91a4 100644
--- a/jstests/sharding/update_sharded.js
+++ b/jstests/sharding/update_sharded.js
@@ -58,7 +58,7 @@ for (let i = 0; i < 2; i++) {
assert.commandWorked(coll.update({_id: 2}, {key: 2, other: 2}));
assert.commandWorked(coll.update({_id: 3}, {key: 3, other: 3}));
- // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey
+ // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey
if (!WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(sessionDb)) {
// do a replacement-style update which queries the shard key and keeps it constant
assert.commandWorked(coll.update({key: 4}, {_id: 4, key: 4}, {upsert: true}));
@@ -80,7 +80,7 @@ for (let i = 0; i < 2; i++) {
assert.commandWorked(coll.update({_id: 1, key: 1}, {$set: {foo: 2}}));
- // TODO: SERVER-69918 Implement upsert behavior for _clusterQueryWithoutShardKey
+ // TODO: SERVER-73057 Implement upsert behavior for _clusterQueryWithoutShardKey
if (!WriteWithoutShardKeyTestUtil.isWriteWithoutShardKeyFeatureEnabled(sessionDb)) {
coll.update({key: 17}, {$inc: {x: 5}}, true);
assert.eq(5, coll.findOne({key: 17}).x, "up1");
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index e4d4a533534..cdc7965ddfa 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -44,16 +44,11 @@
#include "mongo/db/fle_crud.h"
#include "mongo/db/matcher/extensions_callback_real.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/ops/delete_request_gen.h"
#include "mongo/db/ops/insert.h"
#include "mongo/db/ops/parsed_delete.h"
#include "mongo/db/ops/parsed_update.h"
-#include "mongo/db/ops/update_request.h"
-#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_retryability.h"
#include "mongo/db/query/collection_query_info.h"
-#include "mongo/db/query/explain.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
@@ -69,6 +64,7 @@
#include "mongo/db/transaction/retryable_writes_stats.h"
#include "mongo/db/transaction/transaction_participant.h"
#include "mongo/db/transaction_validation.h"
+#include "mongo/db/update/update_util.h"
#include "mongo/db/write_concern.h"
#include "mongo/logv2/log.h"
#include "mongo/s/would_change_owning_shard_exception.h"
@@ -147,32 +143,6 @@ void validate(const write_ops::FindAndModifyCommandRequest& request) {
}
}
-void makeUpdateRequest(OperationContext* opCtx,
- const write_ops::FindAndModifyCommandRequest& request,
- boost::optional<ExplainOptions::Verbosity> explain,
- UpdateRequest* requestOut) {
- requestOut->setQuery(request.getQuery());
- requestOut->setProj(request.getFields().value_or(BSONObj()));
- invariant(request.getUpdate());
- requestOut->setUpdateModification(*request.getUpdate());
- requestOut->setLegacyRuntimeConstants(
- request.getLegacyRuntimeConstants().value_or(Variables::generateRuntimeConstants(opCtx)));
- requestOut->setLetParameters(request.getLet());
- requestOut->setSort(request.getSort().value_or(BSONObj()));
- requestOut->setHint(request.getHint());
- requestOut->setCollation(request.getCollation().value_or(BSONObj()));
- requestOut->setArrayFilters(request.getArrayFilters().value_or(std::vector<BSONObj>()));
- requestOut->setUpsert(request.getUpsert().value_or(false));
- requestOut->setReturnDocs((request.getNew().value_or(false)) ? UpdateRequest::RETURN_NEW
- : UpdateRequest::RETURN_OLD);
- requestOut->setMulti(false);
- requestOut->setExplain(explain);
-
- requestOut->setYieldPolicy(opCtx->inMultiDocumentTransaction()
- ? PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY
- : PlanYieldPolicy::YieldPolicy::YIELD_AUTO);
-}
-
void makeDeleteRequest(OperationContext* opCtx,
const write_ops::FindAndModifyCommandRequest& request,
bool explain,
@@ -602,7 +572,7 @@ void CmdFindAndModify::Invocation::explain(OperationContext* opCtx,
} else {
auto updateRequest = UpdateRequest();
updateRequest.setNamespaceString(nss);
- makeUpdateRequest(opCtx, request, verbosity, &updateRequest);
+ update::makeUpdateRequest(opCtx, request, verbosity, &updateRequest);
const ExtensionsCallbackReal extensionsCallback(opCtx, &updateRequest.getNamespaceString());
ParsedUpdate parsedUpdate(opCtx, &updateRequest, extensionsCallback);
@@ -726,7 +696,7 @@ write_ops::FindAndModifyCommandReply CmdFindAndModify::Invocation::typedRun(
auto updateRequest = UpdateRequest();
updateRequest.setNamespaceString(nsString);
const auto verbosity = boost::none;
- makeUpdateRequest(opCtx, req, verbosity, &updateRequest);
+ update::makeUpdateRequest(opCtx, req, verbosity, &updateRequest);
if (opCtx->getTxnNumber()) {
updateRequest.setStmtIds({stmtId});
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp
index 3abd591d463..99c7c28c96b 100644
--- a/src/mongo/db/exec/update_stage.cpp
+++ b/src/mongo/db/exec/update_stage.cpp
@@ -41,8 +41,8 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/update/path_support.h"
-#include "mongo/db/update/produce_document_for_upsert.h"
#include "mongo/db/update/update_oplog_entry_serialization.h"
+#include "mongo/db/update/update_util.h"
#include "mongo/logv2/log.h"
#include "mongo/s/grid.h"
#include "mongo/s/would_change_owning_shard_exception.h"
diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp
index c4917b191ab..502b68b07a4 100644
--- a/src/mongo/db/exec/upsert_stage.cpp
+++ b/src/mongo/db/exec/upsert_stage.cpp
@@ -38,8 +38,8 @@
#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/update/produce_document_for_upsert.h"
#include "mongo/db/update/storage_validation.h"
+#include "mongo/db/update/update_util.h"
#include "mongo/s/would_change_owning_shard_exception.h"
namespace mongo {
diff --git a/src/mongo/db/update/SConscript b/src/mongo/db/update/SConscript
index e4806030657..2b3d9973aeb 100644
--- a/src/mongo/db/update/SConscript
+++ b/src/mongo/db/update/SConscript
@@ -90,7 +90,7 @@ env.Library(
target='update_driver',
source=[
'update_driver.cpp',
- 'produce_document_for_upsert.cpp',
+ 'update_util.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/common',
diff --git a/src/mongo/db/update/produce_document_for_upsert.cpp b/src/mongo/db/update/update_util.cpp
index b541d2b3aa1..90f41da7dd1 100644
--- a/src/mongo/db/update/produce_document_for_upsert.cpp
+++ b/src/mongo/db/update/update_util.cpp
@@ -27,7 +27,7 @@
* it in the license file.
*/
-#include "mongo/db/update/produce_document_for_upsert.h"
+#include "mongo/db/update/update_util.h"
#include "mongo/bson/mutable/algorithm.h"
#include "mongo/db/s/operation_sharding_state.h"
@@ -144,5 +144,31 @@ void ensureIdFieldIsFirst(mutablebson::Document* doc, bool generateOIDIfMissing)
}
}
+void makeUpdateRequest(OperationContext* opCtx,
+ const write_ops::FindAndModifyCommandRequest& request,
+ boost::optional<ExplainOptions::Verbosity> explain,
+ UpdateRequest* requestOut) {
+ requestOut->setQuery(request.getQuery());
+ requestOut->setProj(request.getFields().value_or(BSONObj()));
+ invariant(request.getUpdate());
+ requestOut->setUpdateModification(*request.getUpdate());
+ requestOut->setLegacyRuntimeConstants(
+ request.getLegacyRuntimeConstants().value_or(Variables::generateRuntimeConstants(opCtx)));
+ requestOut->setLetParameters(request.getLet());
+ requestOut->setSort(request.getSort().value_or(BSONObj()));
+ requestOut->setHint(request.getHint());
+ requestOut->setCollation(request.getCollation().value_or(BSONObj()));
+ requestOut->setArrayFilters(request.getArrayFilters().value_or(std::vector<BSONObj>()));
+ requestOut->setUpsert(request.getUpsert().value_or(false));
+ requestOut->setReturnDocs((request.getNew().value_or(false)) ? UpdateRequest::RETURN_NEW
+ : UpdateRequest::RETURN_OLD);
+ requestOut->setMulti(false);
+ requestOut->setExplain(explain);
+
+ requestOut->setYieldPolicy(opCtx->inMultiDocumentTransaction()
+ ? PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY
+ : PlanYieldPolicy::YieldPolicy::YIELD_AUTO);
+}
+
} // namespace update
} // namespace mongo
diff --git a/src/mongo/db/update/produce_document_for_upsert.h b/src/mongo/db/update/update_util.h
index 3056d8222a0..edec49f6f8a 100644
--- a/src/mongo/db/update/produce_document_for_upsert.h
+++ b/src/mongo/db/update/update_util.h
@@ -48,5 +48,12 @@ void produceDocumentForUpsert(OperationContext* opCtx,
void ensureIdFieldIsFirst(mutablebson::Document* doc, bool generateOIDIfMissing);
void assertPathsNotArray(const mutablebson::Document& document, const FieldRefSet& paths);
+/**
+ * Parse FindAndModify update command request into an updateRequest.
+ */
+void makeUpdateRequest(OperationContext* opCtx,
+ const write_ops::FindAndModifyCommandRequest& request,
+ boost::optional<ExplainOptions::Verbosity> explain,
+ UpdateRequest* requestOut);
} // namespace update
} // namespace mongo
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index a243129bbd6..45591a29d5f 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -42,6 +42,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/server_status_core',
'$BUILD_DIR/mongo/db/fle_crud',
'$BUILD_DIR/mongo/db/not_primary_error_tracker',
+ '$BUILD_DIR/mongo/db/ops/parsed_update',
'$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/db/pipeline/process_interface/mongos_process_interface',
'$BUILD_DIR/mongo/db/timeseries/timeseries_conversion_util',
@@ -698,6 +699,7 @@ env.CppUnitTest(
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/mongohasher',
+ '$BUILD_DIR/mongo/db/ops/parsed_update',
'$BUILD_DIR/mongo/db/ops/write_ops_parsers_test_helpers',
'$BUILD_DIR/mongo/db/pipeline/process_interface/mongos_process_interface_factory',
'$BUILD_DIR/mongo/db/query/query_test_service_context',
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index f6c5813816c..f96e38a15ce 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -139,6 +139,7 @@ env.Library(
'$BUILD_DIR/mongo/db/index_commands_idl',
'$BUILD_DIR/mongo/db/initialize_api_parameters',
'$BUILD_DIR/mongo/db/internal_transactions_feature_flag',
+ '$BUILD_DIR/mongo/db/ops/parsed_update',
'$BUILD_DIR/mongo/db/query/command_request_response',
'$BUILD_DIR/mongo/db/query/cursor_response_idl',
'$BUILD_DIR/mongo/db/query/map_reduce_output_format',
diff --git a/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp b/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp
index 2cf70715f31..1ec30066770 100644
--- a/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp
+++ b/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp
@@ -26,11 +26,12 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
#include "mongo/db/internal_transactions_feature_flag_gen.h"
+#include "mongo/db/matcher/extensions_callback_noop.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
+#include "mongo/db/update/update_util.h"
#include "mongo/logv2/log.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
@@ -39,6 +40,7 @@
#include "mongo/s/request_types/cluster_commands_without_shard_key_gen.h"
#include "mongo/s/shard_key_pattern_query_util.h"
#include "mongo/s/write_ops/batch_write_op.h"
+#include "mongo/s/write_ops/write_without_shard_key_util.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
@@ -48,10 +50,9 @@ namespace {
struct ParsedCommandInfo {
BSONObj query;
BSONObj collation;
- int stmtId;
-
- ParsedCommandInfo(BSONObj query, BSONObj collation, int stmtId)
- : query(query), collation(collation), stmtId(stmtId) {}
+ bool upsert = false;
+ int stmtId = kUninitializedStmtId;
+ boost::optional<UpdateRequest> updateRequest;
};
struct AsyncRequestSenderResponseData {
@@ -111,6 +112,65 @@ BSONObj createAggregateCmdObj(OperationContext* opCtx,
return aggregate.toBSON({});
}
+ParsedCommandInfo parseWriteCommand(OperationContext* opCtx,
+ StringData commandName,
+ const BSONObj& writeCmdObj) {
+ ParsedCommandInfo parsedInfo;
+ if (commandName == write_ops::UpdateCommandRequest::kCommandName) {
+ auto updateRequest = write_ops::UpdateCommandRequest::parse(
+ IDLParserContext("_clusterQueryWithoutShardKeyForUpdate"), writeCmdObj);
+ parsedInfo.query = updateRequest.getUpdates().front().getQ();
+
+ // In the batch write path, when the request is reconstructed to be passed to
+ // the two phase write protocol, only the stmtIds field is used.
+ if (auto stmtIds = updateRequest.getStmtIds()) {
+ parsedInfo.stmtId = stmtIds->front();
+ }
+
+ if ((parsedInfo.upsert = updateRequest.getUpdates().front().getUpsert())) {
+ parsedInfo.updateRequest = updateRequest.getUpdates().front();
+ }
+
+ if (auto parsedCollation = updateRequest.getUpdates().front().getCollation()) {
+ parsedInfo.collation = *parsedCollation;
+ }
+ } else if (commandName == write_ops::DeleteCommandRequest::kCommandName) {
+ auto deleteRequest = write_ops::DeleteCommandRequest::parse(
+ IDLParserContext("_clusterQueryWithoutShardKeyForDelete"), writeCmdObj);
+ parsedInfo.query = deleteRequest.getDeletes().front().getQ();
+
+ // In the batch write path, when the request is reconstructed to be passed to
+ // the two phase write protocol, only the stmtIds field is used.
+ if (auto stmtIds = deleteRequest.getStmtIds()) {
+ parsedInfo.stmtId = stmtIds->front();
+ }
+
+ if (auto parsedCollation = deleteRequest.getDeletes().front().getCollation()) {
+ parsedInfo.collation = *parsedCollation;
+ }
+ } else if (commandName == write_ops::FindAndModifyCommandRequest::kCommandName ||
+ commandName == write_ops::FindAndModifyCommandRequest::kCommandAlias) {
+ auto findAndModifyRequest = write_ops::FindAndModifyCommandRequest::parse(
+ IDLParserContext("_clusterQueryWithoutShardKeyFindAndModify"), writeCmdObj);
+ parsedInfo.query = findAndModifyRequest.getQuery();
+ parsedInfo.stmtId = findAndModifyRequest.getStmtId().value_or(kUninitializedStmtId);
+
+ if ((parsedInfo.upsert = findAndModifyRequest.getUpsert().get_value_or(false))) {
+ parsedInfo.updateRequest = UpdateRequest{};
+ parsedInfo.updateRequest->setNamespaceString(findAndModifyRequest.getNamespace());
+ update::makeUpdateRequest(
+ opCtx, findAndModifyRequest, boost::none, parsedInfo.updateRequest.get_ptr());
+ }
+
+ if (auto parsedCollation = findAndModifyRequest.getCollation()) {
+ parsedInfo.collation = *parsedCollation;
+ }
+ } else {
+ uasserted(ErrorCodes::InvalidOptions, "Not a supported batch write command");
+ }
+ return parsedInfo;
+}
+
class ClusterQueryWithoutShardKeyCmd : public TypedCommand<ClusterQueryWithoutShardKeyCmd> {
public:
using Request = ClusterQueryWithoutShardKey;
@@ -134,63 +194,15 @@ public:
CommandHelpers::parseNsCollectionRequired(ns().dbName(), request().getWriteCmd()));
const auto cri = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
- auto parsedInfoFromRequest = [&] {
- const auto commandName = request().getWriteCmd().firstElementFieldNameStringData();
-
- // Parse into OpMsgRequest to append the $db field, which is required for command
- // parsing.
- const auto opMsgRequest =
- OpMsgRequest::fromDBAndBody(nss.db(), request().getWriteCmd());
- BSONObj query;
- BSONObj collation;
- int stmtId = kUninitializedStmtId;
-
- if (commandName == write_ops::UpdateCommandRequest::kCommandName) {
- auto updateRequest = write_ops::UpdateCommandRequest::parse(
- IDLParserContext("_clusterQueryWithoutShardKeyForUpdate"),
- opMsgRequest.body);
- query = updateRequest.getUpdates().front().getQ();
-
- // In the batch write path, when the request is reconstructed to be passed to
- // the two phase write protocol, only the stmtIds field is used.
- if (auto stmtIds = updateRequest.getStmtIds()) {
- stmtId = stmtIds->front();
- }
-
- if (auto parsedCollation = updateRequest.getUpdates().front().getCollation()) {
- collation = *parsedCollation;
- }
- } else if (commandName == write_ops::DeleteCommandRequest::kCommandName) {
- auto deleteRequest = write_ops::DeleteCommandRequest::parse(
- IDLParserContext("_clusterQueryWithoutShardKeyForDelete"),
- opMsgRequest.body);
- query = deleteRequest.getDeletes().front().getQ();
-
- // In the batch write path, when the request is reconstructed to be passed to
- // the two phase write protocol, only the stmtIds field is used.
- if (auto stmtIds = deleteRequest.getStmtIds()) {
- stmtId = stmtIds->front();
- }
-
- if (auto parsedCollation = deleteRequest.getDeletes().front().getCollation()) {
- collation = *parsedCollation;
- }
- } else if (commandName == write_ops::FindAndModifyCommandRequest::kCommandName ||
- commandName == write_ops::FindAndModifyCommandRequest::kCommandAlias) {
- auto findAndModifyRequest = write_ops::FindAndModifyCommandRequest::parse(
- IDLParserContext("_clusterQueryWithoutShardKeyFindAndModify"),
- opMsgRequest.body);
- query = findAndModifyRequest.getQuery();
- stmtId = findAndModifyRequest.getStmtId().value_or(kUninitializedStmtId);
-
- if (auto parsedCollation = findAndModifyRequest.getCollation()) {
- collation = *parsedCollation;
- }
- } else {
- uasserted(ErrorCodes::InvalidOptions, "Not a supported batch write command");
- }
- return ParsedCommandInfo(query.getOwned(), collation.getOwned(), stmtId);
- }();
+ // Parse into OpMsgRequest to append the $db field, which is required for command
+ // parsing.
+ const auto opMsgRequest =
+ OpMsgRequest::fromDBAndBody(nss.db(), request().getWriteCmd());
+
+ auto parsedInfoFromRequest =
+ parseWriteCommand(opCtx,
+ request().getWriteCmd().firstElementFieldNameStringData(),
+ opMsgRequest.body);
auto allShardsContainingChunksForNs =
getShardsToTarget(opCtx, cri.cm, nss, parsedInfoFromRequest);
@@ -241,6 +253,13 @@ public:
res.setShardId(boost::optional<mongo::StringData>(shardId));
}
}
+
+ if (!res.getTargetDoc() && parsedInfoFromRequest.upsert) {
+ res.setTargetDoc(write_without_shard_key::generateUpsertDocument(
+ opCtx, parsedInfoFromRequest.updateRequest.get()));
+ res.setUpsertRequired(true);
+ }
+
return res;
}
diff --git a/src/mongo/s/request_types/cluster_commands_without_shard_key.idl b/src/mongo/s/request_types/cluster_commands_without_shard_key.idl
index d2618e3d3e6..4546fe19b02 100644
--- a/src/mongo/s/request_types/cluster_commands_without_shard_key.idl
+++ b/src/mongo/s/request_types/cluster_commands_without_shard_key.idl
@@ -48,6 +48,10 @@ structs:
description: "The shard id of the target shard."
type: string
optional: true
+ upsertRequired:
+ description: "True if the targetDoc should be upserted."
+ type: bool
+ default: false
clusterWriteWithoutShardKeyResponse:
description: "The response for the '_clusterWriteWithoutShardKeyFind' command."
is_command_reply: true
diff --git a/src/mongo/s/write_ops/write_without_shard_key_util.cpp b/src/mongo/s/write_ops/write_without_shard_key_util.cpp
index b71a9f8406a..2dda88a47d3 100644
--- a/src/mongo/s/write_ops/write_without_shard_key_util.cpp
+++ b/src/mongo/s/write_ops/write_without_shard_key_util.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/query/collation/collation_index_key.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/transaction/transaction_api.h"
+#include "mongo/db/update/update_util.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/grid.h"
@@ -48,6 +49,7 @@ namespace write_without_shard_key {
namespace {
constexpr auto kIdFieldName = "_id"_sd;
+const FieldRef idFieldRef(kIdFieldName);
// Used to do query validation for the _id field.
const ShardKeyPattern kVirtualIdShardKey(BSON(kIdFieldName << 1));
@@ -104,6 +106,25 @@ bool shardKeyHasCollatableType(const BSONObj& shardKey) {
}
} // namespace
+BSONObj generateUpsertDocument(OperationContext* opCtx, const UpdateRequest& updateRequest) {
+ ExtensionsCallbackNoop extensionsCallback = ExtensionsCallbackNoop();
+ ParsedUpdate parsedUpdate(opCtx, &updateRequest, extensionsCallback);
+ uassertStatusOK(parsedUpdate.parseRequest());
+
+ const CanonicalQuery* canonicalQuery =
+ parsedUpdate.hasParsedQuery() ? parsedUpdate.getParsedQuery() : nullptr;
+ FieldRefSet immutablePaths;
+ immutablePaths.insert(&idFieldRef);
+ update::produceDocumentForUpsert(opCtx,
+ &updateRequest,
+ parsedUpdate.getDriver(),
+ canonicalQuery,
+ immutablePaths,
+ parsedUpdate.getDriver()->getDocument());
+
+ return parsedUpdate.getDriver()->getDocument().getObject();
+}
+
bool useTwoPhaseProtocol(OperationContext* opCtx,
NamespaceString nss,
bool isUpdateOrDelete,
diff --git a/src/mongo/s/write_ops/write_without_shard_key_util.h b/src/mongo/s/write_ops/write_without_shard_key_util.h
index da9955777b8..fe567b87fa1 100644
--- a/src/mongo/s/write_ops/write_without_shard_key_util.h
+++ b/src/mongo/s/write_ops/write_without_shard_key_util.h
@@ -32,12 +32,18 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/ops/parsed_update.h"
#include "mongo/s/request_types/cluster_commands_without_shard_key_gen.h"
namespace mongo {
namespace write_without_shard_key {
/**
+ * Uses updateDriver to produce the document to insert. Only use when {upsert: true}.
+ */
+BSONObj generateUpsertDocument(OperationContext* opCtx, const UpdateRequest& updateRequest);
+
+/**
* Returns true if we can use the two phase protocol to complete a single write without shard
* key.
**/
diff --git a/src/mongo/s/write_ops/write_without_shard_key_util_test.cpp b/src/mongo/s/write_ops/write_without_shard_key_util_test.cpp
index 5fb78f3927a..d4a47885ca4 100644
--- a/src/mongo/s/write_ops/write_without_shard_key_util_test.cpp
+++ b/src/mongo/s/write_ops/write_without_shard_key_util_test.cpp
@@ -27,9 +27,11 @@
* it in the license file.
*/
+#include "mongo/db/ops/update_request.h"
#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog_cache_test_fixture.h"
+#include "mongo/s/concurrency/locker_mongos_client_observer.h"
#include "mongo/s/sharding_feature_flags_gen.h"
#include "mongo/s/write_ops/write_without_shard_key_util.h"
#include "mongo/unittest/unittest.h"
@@ -83,6 +85,23 @@ protected:
}
};
+class ProduceUpsertDocumentTest : public ServiceContextTest {
+public:
+ void setUp() override {
+ ServiceContextTest::setUp();
+ auto service = getServiceContext();
+ service->registerClientObserver(std::make_unique<LockerMongosClientObserver>());
+ _opCtx = makeOperationContext();
+ }
+
+ OperationContext* getOpCtx() const {
+ return _opCtx.get();
+ }
+
+protected:
+ ServiceContext::UniqueOperationContext _opCtx;
+};
+
TEST_F(WriteWithoutShardKeyUtilTest, WriteQueryContainingFullShardKeyCanTargetSingleDocument) {
RAIIServerParameterControllerForTest featureFlagController(
"featureFlagUpdateOneWithoutShardKey", true);
@@ -262,6 +281,83 @@ TEST_F(WriteWithoutShardKeyUtilTest,
ASSERT_EQ(useTwoPhaseProtocol, true);
}
+TEST_F(ProduceUpsertDocumentTest, produceUpsertDocumentUsingReplacementUpdate) {
+ write_ops::UpdateOpEntry entry;
+ entry.setQ(BSON("_id" << 3));
+ entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(BSON("x" << 2)));
+
+ write_ops::UpdateCommandRequest updateCommandRequest(kNss);
+ updateCommandRequest.setUpdates({entry});
+ UpdateRequest updateRequest(updateCommandRequest.getUpdates().front());
+
+ auto doc = write_without_shard_key::generateUpsertDocument(getOpCtx(), updateRequest);
+ ASSERT_BSONOBJ_EQ(doc, fromjson("{ _id: 3, x: 2 }"));
+}
+
+TEST_F(ProduceUpsertDocumentTest, produceUpsertDocumentUsingLetConstantAndPipelineUpdate) {
+ write_ops::UpdateOpEntry entry;
+ entry.setQ(BSON("_id" << 4 << "x" << 3));
+
+ std::vector<BSONObj> pipelineUpdate;
+ pipelineUpdate.push_back(fromjson("{$set: {'x': '$$constOne'}}"));
+ pipelineUpdate.push_back(fromjson("{$set: {'y': 3}}"));
+ entry.setU(pipelineUpdate);
+
+ BSONObj constants = fromjson("{constOne: 'foo'}");
+ entry.setC(constants);
+
+ write_ops::UpdateCommandRequest updateCommandRequest(kNss);
+ updateCommandRequest.setUpdates({entry});
+ UpdateRequest updateRequest(updateCommandRequest.getUpdates().front());
+
+ auto doc = write_without_shard_key::generateUpsertDocument(getOpCtx(), updateRequest);
+ ASSERT_BSONOBJ_EQ(doc, fromjson("{ _id: 4, x: 'foo', y: 3 }"));
+}
+
+TEST_F(ProduceUpsertDocumentTest, produceUpsertDocumentUsingArrayFilterAndModificationUpdate) {
+ write_ops::UpdateOpEntry entry;
+ BSONArrayBuilder arrayBuilder;
+ arrayBuilder.append(BSON("a" << 90));
+ entry.setQ(BSON("_id" << 4 << "x" << arrayBuilder.arr()));
+ entry.setU(
+ write_ops::UpdateModification::parseFromClassicUpdate(fromjson("{$inc: {'x.$[b].a': 3}}")));
+
+ auto arrayFilter = std::vector<BSONObj>{fromjson("{'b.a': {$gt: 85}}")};
+ entry.setArrayFilters(arrayFilter);
+
+ write_ops::UpdateCommandRequest updateCommandRequest(kNss);
+ updateCommandRequest.setUpdates({entry});
+ UpdateRequest updateRequest(updateCommandRequest.getUpdates().front());
+
+ auto doc = write_without_shard_key::generateUpsertDocument(getOpCtx(), updateRequest);
+ ASSERT_BSONOBJ_EQ(doc, fromjson("{ _id: 4, x: [ { a: 93 } ] }"));
+}
+
+TEST_F(ProduceUpsertDocumentTest, produceUpsertDocumentUsingCollation) {
+ write_ops::UpdateOpEntry entry;
+ BSONArrayBuilder arrayBuilder;
+ arrayBuilder.append(BSON("a"
+ << "BAR"));
+ arrayBuilder.append(BSON("a"
+ << "bar"));
+ arrayBuilder.append(BSON("a"
+ << "foo"));
+ entry.setQ(BSON("_id" << 4 << "x" << arrayBuilder.arr()));
+ entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(
+ fromjson("{$set: {'x.$[b].a': 'FOO'}}")));
+
+ auto arrayFilter = std::vector<BSONObj>{fromjson("{'b.a': {$eq: 'bar'}}")};
+ entry.setArrayFilters(arrayFilter);
+ entry.setCollation(fromjson("{locale: 'en_US', strength: 2}"));
+
+ write_ops::UpdateCommandRequest updateCommandRequest(kNss);
+ updateCommandRequest.setUpdates({entry});
+ UpdateRequest updateRequest(updateCommandRequest.getUpdates().front());
+
+ auto doc = write_without_shard_key::generateUpsertDocument(getOpCtx(), updateRequest);
+ ASSERT_BSONOBJ_EQ(doc, fromjson("{ _id: 4, x: [ { a: 'FOO' }, { a: 'FOO' }, { a: 'foo' } ] }"));
+}
+
} // namespace
} // namespace write_without_shard_key
} // namespace mongo