-rw-r--r--  etc/backports_required_for_multiversion_tests.yml | 4
-rw-r--r--  jstests/sharding/merge_let_params_size_estimation.js | 155
-rw-r--r--  src/mongo/db/commands/write_commands.cpp | 32
-rw-r--r--  src/mongo/db/ops/write_ops.cpp | 211
-rw-r--r--  src/mongo/db/ops/write_ops.h | 11
-rw-r--r--  src/mongo/db/ops/write_ops.idl | 9
-rw-r--r--  src/mongo/db/ops/write_ops_exec_test.cpp | 194
-rw-r--r--  src/mongo/db/pipeline/document_source_merge.cpp | 237
-rw-r--r--  src/mongo/db/pipeline/document_source_merge.h | 15
-rw-r--r--  src/mongo/db/pipeline/document_source_out.cpp | 7
-rw-r--r--  src/mongo/db/pipeline/document_source_out.h | 11
-rw-r--r--  src/mongo/db/pipeline/document_source_writer.h | 68
-rw-r--r--  src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp | 53
-rw-r--r--  src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h | 17
-rw-r--r--  src/mongo/db/pipeline/process_interface/common_process_interface.cpp | 6
-rw-r--r--  src/mongo/db/pipeline/process_interface/common_process_interface.h | 27
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongo_process_interface.h | 48
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp | 2
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongos_process_interface.h | 6
-rw-r--r--  src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp | 24
-rw-r--r--  src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h | 6
-rw-r--r--  src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp | 34
-rw-r--r--  src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h | 7
-rw-r--r--  src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp | 32
-rw-r--r--  src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h | 19
-rw-r--r--  src/mongo/db/pipeline/process_interface/standalone_process_interface.h | 5
-rw-r--r--  src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h | 17
-rw-r--r--  src/mongo/s/write_ops/batched_command_request.h | 22
28 files changed, 999 insertions(+), 280 deletions(-)
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 8f969854d21..4eedccccb4f 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -379,6 +379,8 @@ last-continuous:
ticket: SERVER-76719
- test_file: jstests/sharding/shard_keys_with_dollar_sign.js
ticket: SERVER-76948
+ - test_file: jstests/sharding/merge_let_params_size_estimation.js
+ ticket: SERVER-74806
suites: null
last-lts:
all:
@@ -842,4 +844,6 @@ last-lts:
ticket: SERVER-76489
- test_file: jstests/sharding/shard_keys_with_dollar_sign.js
ticket: SERVER-76948
+ - test_file: jstests/sharding/merge_let_params_size_estimation.js
+ ticket: SERVER-74806
suites: null
diff --git a/jstests/sharding/merge_let_params_size_estimation.js b/jstests/sharding/merge_let_params_size_estimation.js
new file mode 100644
index 00000000000..66f30d38335
--- /dev/null
+++ b/jstests/sharding/merge_let_params_size_estimation.js
@@ -0,0 +1,155 @@
+/**
+ * Test which verifies that $merge accounts for the size of let parameters and runtime constants
+ * when it serializes writes to send to other nodes.
+ *
+ * @tags: [
+ * # The $merge in this test targets the '_id' field, and requires a unique index.
+ * expects_explicit_underscore_id_index,
+ * ]
+ */
+(function() {
+"use strict";
+
+load('jstests/libs/fixture_helpers.js'); // For isReplSet().
+
+// Function to run the test against a test fixture. Accepts an object that contains the following
+// fields:
+// - testFixture: The fixture to run the test against.
+// - conn: Connection to the test fixture specified above.
+// - shardLocal and shardOutput: Indicate whether the local/output collections should be sharded in
+// this test run (ignored when not running against a sharded cluster).
+function runTest({testFixture, conn, shardLocal, shardOutput}) {
+ const dbName = "db";
+ const collName = "merge_let_params";
+ const dbCollName = dbName + "." + collName;
+ const outCollName = "outcoll";
+ const dbOutCollName = dbName + "." + outCollName;
+ const admin = conn.getDB("admin");
+ const isReplSet = FixtureHelpers.isReplSet(admin);
+
+ function shardColls() {
+ // When running against a sharded cluster, configure the collections according to
+ // 'shardLocal' and 'shardOutput'.
+ if (!isReplSet) {
+ assert.commandWorked(admin.runCommand({enableSharding: dbName}));
+ testFixture.ensurePrimaryShard(dbName, testFixture.shard0.shardName);
+ if (shardLocal) {
+ testFixture.shardColl(collName, {_id: 1}, {_id: 0}, {_id: 0}, dbName);
+ }
+ if (shardOutput) {
+ testFixture.shardColl(outCollName, {_id: 1}, {_id: 0}, {_id: 0}, dbName);
+ }
+ }
+ }
+ const coll = conn.getCollection(dbCollName);
+ const outColl = conn.getCollection(dbOutCollName);
+ coll.drop();
+ outColl.drop();
+ shardColls();
+
+ // Insert two large documents in both collections. By inserting the documents with the same _id
+ // values in both collections and splitting these documents between chunks, this will guarantee
+ // that we need to serialize and send update command(s) across the wire when targeting the
+ // output collection.
+ const kOneMB = 1024 * 1024;
+ const kDataString = "a".repeat(4 * kOneMB);
+ const kDocs = [{_id: 2, data: kDataString}, {_id: -2, data: kDataString}];
+ assert.commandWorked(coll.insertMany(kDocs));
+ assert.commandWorked(outColl.insertMany(kDocs));
+
+ // The sizes of the different update command components are deliberately chosen to test the
+ // batching logic when the update is targeted to another node in the cluster. In particular, the
+ // update command will contain the 10MB 'outFieldValue' and we will be updating two 4MB
+ // documents. The 18MB total exceeds the 16MB size limit, so we expect the batching logic to
+ // split the two documents into separate batches of 14MB each.
+ const outFieldValue = "a".repeat(10 * kOneMB);
+ let aggCommand = {
+ pipeline: [{
+ $merge: {
+ into: {db: "db", coll: outCollName},
+ on: "_id",
+ whenMatched: [{$addFields: {out: "$$outField"}}],
+ whenNotMatched: "insert"
+ }
+ }],
+ cursor: {},
+ let : {"outField": outFieldValue}
+ };
+
+ // If this is a replica set, we need to target a secondary node to force writes to go over
+ // the wire.
+ const aggColl = isReplSet ? testFixture.getSecondary().getCollection(dbCollName) : coll;
+
+ if (isReplSet) {
+ aggCommand["$readPreference"] = {mode: "secondary"};
+ }
+
+ // The aggregate should not fail.
+ assert.commandWorked(aggColl.runCommand("aggregate", aggCommand));
+
+ // Verify that each document in the output collection contains the value of 'outField'.
+ let outContents = outColl.find().toArray();
+ for (const res of outContents) {
+ const out = res["out"];
+ assert.eq(out, outFieldValue, outContents);
+ }
+
+ assert(coll.drop());
+ assert(outColl.drop());
+ shardColls();
+
+ // Insert four large documents in both collections. As before, this will force updates to be
+ // sent across the wire, but this will generate double the batches.
+ const kMoreDocs = [
+ {_id: -2, data: kDataString},
+ {_id: -1, data: kDataString},
+ {_id: 1, data: kDataString},
+ {_id: 2, data: kDataString},
+ ];
+
+ assert.commandWorked(coll.insertMany(kMoreDocs));
+ assert.commandWorked(outColl.insertMany(kMoreDocs));
+
+ // The aggregate should not fail.
+ assert.commandWorked(aggColl.runCommand("aggregate", aggCommand));
+
+ // Verify that each document in the output collection contains the value of 'outField'.
+ outContents = outColl.find().toArray();
+ for (const res of outContents) {
+ const out = res["out"];
+ assert.eq(out, outFieldValue, outContents);
+ }
+
+ assert(coll.drop());
+ assert(outColl.drop());
+ shardColls();
+
+ // If the documents and the let parameters are large enough, the $merge is expected to fail.
+ const kVeryLargeDataString = "a".repeat(10 * kOneMB);
+ const kLargeDocs =
+ [{_id: 2, data: kVeryLargeDataString}, {_id: -2, data: kVeryLargeDataString}];
+ assert.commandWorked(coll.insertMany(kLargeDocs));
+ assert.commandWorked(outColl.insertMany(kLargeDocs));
+ assert.commandFailedWithCode(aggColl.runCommand("aggregate", aggCommand),
+ ErrorCodes.BSONObjectTooLarge);
+}
+
+// Test against a replica set.
+const rst = new ReplSetTest({nodes: 2});
+rst.startSet();
+rst.initiate();
+rst.awaitSecondaryNodes();
+
+runTest({testFixture: rst, conn: rst.getPrimary()});
+
+rst.stopSet();
+
+// Test against a sharded cluster.
+const st = new ShardingTest({shards: 2, mongos: 1});
+runTest({testFixture: st, conn: st.s0, shardLocal: false, shardOutput: false});
+runTest({testFixture: st, conn: st.s0, shardLocal: true, shardOutput: false});
+runTest({testFixture: st, conn: st.s0, shardLocal: false, shardOutput: true});
+runTest({testFixture: st, conn: st.s0, shardLocal: true, shardOutput: true});
+
+st.stop();
+})();
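
A rough size breakdown for the three scenarios exercised above (a back-of-the-envelope sketch that assumes the 16MB BSONObjMaxUserSize limit and ignores per-element and metadata overhead):

    two 4MB documents:   10MB 'let' + one 4MB update statement  = ~14MB per update command, so the
                         two statements are split across two batches and the $merge succeeds;
    four 4MB documents:  same per-batch budget, so the statements are split across four batches;
    two 10MB documents:  10MB 'let' + one 10MB update statement = ~20MB, which cannot fit in a
                         single write command, so the aggregate fails with BSONObjectTooLarge.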
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index 60e56da7180..ceb797e339c 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -258,6 +258,11 @@ public:
}
write_ops::InsertCommandReply typedRun(OperationContext* opCtx) final try {
+ // On debug builds, verify that the estimated size of the insert command is at least as
+ // large as the size of the actual, serialized insert command. This ensures that the
+ // logic which estimates the size of insert commands is correct.
+ dassert(write_ops::verifySizeEstimate(request()));
+
doTransactionValidationForWrites(opCtx, ns());
if (request().getEncryptionInformation().has_value()) {
// Flag set here and in fle_crud.cpp since this only executes on a mongod.
@@ -430,6 +435,11 @@ public:
}
write_ops::UpdateCommandReply typedRun(OperationContext* opCtx) final try {
+ // On debug builds, verify that the estimated size of the update command is at least as
+ // large as the size of the actual, serialized update command. This ensures that the
+ // logic which estimates the size of update commands is correct.
+ dassert(write_ops::verifySizeEstimate(request()));
+
doTransactionValidationForWrites(opCtx, ns());
write_ops::UpdateCommandReply updateReply;
OperationSource source = OperationSource::kStandard;
@@ -450,15 +460,6 @@ public:
source = OperationSource::kTimeseriesUpdate;
}
- // On debug builds, verify that the estimated size of the updates are at least as large
- // as the actual, serialized size. This ensures that the logic that estimates the size
- // of deletes for batch writes is correct.
- if constexpr (kDebugBuild) {
- for (auto&& updateOp : request().getUpdates()) {
- invariant(write_ops::verifySizeEstimate(updateOp));
- }
- }
-
long long nModified = 0;
// Tracks the upserted information. The memory of this variable gets moved in the
@@ -669,6 +670,11 @@ public:
}
write_ops::DeleteCommandReply typedRun(OperationContext* opCtx) final try {
+ // On debug builds, verify that the estimated size of the delete command is at least as
+ // large as the size of the actual, serialized delete command. This ensures that the
+ // logic which estimates the size of delete commands is correct.
+ dassert(write_ops::verifySizeEstimate(request()));
+
doTransactionValidationForWrites(opCtx, ns());
write_ops::DeleteCommandReply deleteReply;
OperationSource source = OperationSource::kStandard;
@@ -686,14 +692,6 @@ public:
source = OperationSource::kTimeseriesDelete;
}
- // On debug builds, verify that the estimated size of the deletes are at least as large
- // as the actual, serialized size. This ensures that the logic that estimates the size
- // of deletes for batch writes is correct.
- if constexpr (kDebugBuild) {
- for (auto&& deleteOp : request().getDeletes()) {
- invariant(write_ops::verifySizeEstimate(deleteOp));
- }
- }
auto reply = write_ops_exec::performDeletes(opCtx, request(), source);
populateReply(opCtx,
diff --git a/src/mongo/db/ops/write_ops.cpp b/src/mongo/db/ops/write_ops.cpp
index c57f65ccdf8..7bac799c5cf 100644
--- a/src/mongo/db/ops/write_ops.cpp
+++ b/src/mongo/db/ops/write_ops.cpp
@@ -57,6 +57,9 @@ namespace {
// each element.
static constexpr int kPerElementOverhead = 2;
+// This constant accounts for the size of a bool.
+static constexpr int kBoolSize = 1;
+
// This constant tracks the overhead for serializing UUIDs. It includes 1 byte for the
// 'BinDataType', 4 bytes for serializing the integer size of the UUID, and finally, 16 bytes
// for the UUID itself.
@@ -86,6 +89,60 @@ void checkOpCountForCommand(const T& op, size_t numOps) {
}
}
+// Utility which estimates the size of 'WriteCommandRequestBase' when serialized.
+int getWriteCommandRequestBaseSize(const WriteCommandRequestBase& base) {
+ static const int kSizeOfOrderedField =
+ write_ops::WriteCommandRequestBase::kOrderedFieldName.size() + kBoolSize +
+ kPerElementOverhead;
+ static const int kSizeOfBypassDocumentValidationField =
+ write_ops::WriteCommandRequestBase::kBypassDocumentValidationFieldName.size() + kBoolSize +
+ kPerElementOverhead;
+
+ auto estSize = static_cast<int>(BSONObj::kMinBSONLength) + kSizeOfOrderedField +
+ kSizeOfBypassDocumentValidationField;
+
+ if (auto stmtId = base.getStmtId(); stmtId) {
+ estSize += write_ops::WriteCommandRequestBase::kStmtIdFieldName.size() +
+ sizeof(std::int32_t) + kPerElementOverhead;
+ }
+
+ if (auto stmtIds = base.getStmtIds(); stmtIds) {
+ estSize += write_ops::WriteCommandRequestBase::kStmtIdsFieldName.size();
+ estSize += static_cast<int>(BSONObj::kMinBSONLength);
+ estSize +=
+ ((sizeof(std::int32_t) + write_ops::kWriteCommandBSONArrayPerElementOverheadBytes) *
+ stmtIds->size());
+ estSize += kPerElementOverhead;
+ }
+
+ if (auto isTimeseries = base.getIsTimeseriesNamespace(); isTimeseries.has_value()) {
+ estSize += write_ops::WriteCommandRequestBase::kIsTimeseriesNamespaceFieldName.size() +
+ kBoolSize + kPerElementOverhead;
+ }
+
+ if (auto collUUID = base.getCollectionUUID(); collUUID) {
+ estSize += write_ops::WriteCommandRequestBase::kCollectionUUIDFieldName.size() + kUUIDSize +
+ kPerElementOverhead;
+ }
+
+ if (auto encryptionInfo = base.getEncryptionInformation(); encryptionInfo) {
+ estSize += write_ops::WriteCommandRequestBase::kEncryptionInformationFieldName.size() +
+ encryptionInfo->toBSON().objsize() + kPerElementOverhead;
+ }
+
+ if (auto query = base.getOriginalQuery(); query) {
+ estSize += write_ops::WriteCommandRequestBase::kOriginalQueryFieldName.size() +
+ query->objsize() + kPerElementOverhead;
+ }
+
+ if (auto originalCollation = base.getOriginalCollation(); originalCollation) {
+ estSize += write_ops::WriteCommandRequestBase::kOriginalCollationFieldName.size() +
+ originalCollation->objsize() + kPerElementOverhead;
+ }
+
+ return estSize;
+}
+
} // namespace
namespace write_ops {
@@ -145,6 +202,38 @@ int32_t getStmtIdForWriteAt(const WriteCommandRequestBase& writeCommandBase, siz
return kFirstStmtId + writePos;
}
+int estimateRuntimeConstantsSize(const mongo::LegacyRuntimeConstants& constants) {
+ int size = write_ops::UpdateCommandRequest::kLegacyRuntimeConstantsFieldName.size() +
+ static_cast<int>(BSONObj::kMinBSONLength) + kPerElementOverhead;
+
+ // $$NOW
+ size +=
+ LegacyRuntimeConstants::kLocalNowFieldName.size() + sizeof(Date_t) + kPerElementOverhead;
+
+ // $$CLUSTER_TIME
+ size += LegacyRuntimeConstants::kClusterTimeFieldName.size() + sizeof(Timestamp) +
+ kPerElementOverhead;
+
+ // $$JS_SCOPE
+ if (const auto& scope = constants.getJsScope(); scope.has_value()) {
+ size += LegacyRuntimeConstants::kJsScopeFieldName.size() + scope->objsize() +
+ kPerElementOverhead;
+ }
+
+ // $$IS_MR
+ if (const auto& isMR = constants.getIsMapReduce(); isMR.has_value()) {
+ size +=
+ LegacyRuntimeConstants::kIsMapReduceFieldName.size() + kBoolSize + kPerElementOverhead;
+ }
+
+ // $$USER_ROLES
+ if (const auto& userRoles = constants.getUserRoles(); userRoles.has_value()) {
+ size += LegacyRuntimeConstants::kUserRolesFieldName.size() + userRoles->objsize() +
+ kPerElementOverhead;
+ }
+ return size;
+}
+
int getUpdateSizeEstimate(const BSONObj& q,
const write_ops::UpdateModification& u,
const boost::optional<mongo::BSONObj>& c,
@@ -155,11 +244,6 @@ int getUpdateSizeEstimate(const BSONObj& q,
const boost::optional<UUID>& sampleId,
const bool includeAllowShardKeyUpdatesWithoutFullShardKeyInQuery) {
using UpdateOpEntry = write_ops::UpdateOpEntry;
-
- // This constant accounts for the null terminator in each field name and the BSONType byte for
- // each element.
- static const int kPerElementOverhead = 2;
- static const int kBoolSize = 1;
int estSize = static_cast<int>(BSONObj::kMinBSONLength);
// Add the sizes of the 'multi' and 'upsert' fields.
@@ -268,6 +352,123 @@ bool verifySizeEstimate(const write_ops::UpdateOpEntry& update) {
update.toBSON().objsize();
}
+bool verifySizeEstimate(const InsertCommandRequest& insertReq) {
+ int size = getInsertHeaderSizeEstimate(insertReq);
+ for (auto&& docToInsert : insertReq.getDocuments()) {
+ size += docToInsert.objsize() + kWriteCommandBSONArrayPerElementOverheadBytes;
+ }
+ return size >= insertReq.toBSON({} /* commandPassthroughFields */).objsize();
+}
+
+bool verifySizeEstimate(const UpdateCommandRequest& updateReq) {
+ int size = getUpdateHeaderSizeEstimate(updateReq);
+
+ for (auto&& update : updateReq.getUpdates()) {
+ size += getUpdateSizeEstimate(
+ update.getQ(),
+ update.getU(),
+ update.getC(),
+ update.getUpsertSupplied().has_value(),
+ update.getCollation(),
+ update.getArrayFilters(),
+ update.getHint(),
+ update.getSampleId(),
+ update.getAllowShardKeyUpdatesWithoutFullShardKeyInQuery().has_value()) +
+ kWriteCommandBSONArrayPerElementOverheadBytes;
+ }
+ return size >= updateReq.toBSON({} /* commandPassthroughFields */).objsize();
+}
+
+bool verifySizeEstimate(const DeleteCommandRequest& deleteReq) {
+ int size = getDeleteHeaderSizeEstimate(deleteReq);
+
+ for (auto&& deleteOp : deleteReq.getDeletes()) {
+ size += write_ops::getDeleteSizeEstimate(deleteOp.getQ(),
+ deleteOp.getCollation(),
+ deleteOp.getHint(),
+ deleteOp.getSampleId()) +
+ kWriteCommandBSONArrayPerElementOverheadBytes;
+ }
+ return size >= deleteReq.toBSON({} /* commandPassthroughFields */).objsize();
+}
+
+int getInsertHeaderSizeEstimate(const InsertCommandRequest& insertReq) {
+ int size = getWriteCommandRequestBaseSize(insertReq.getWriteCommandRequestBase()) +
+ write_ops::InsertCommandRequest::kDocumentsFieldName.size() + kPerElementOverhead +
+ static_cast<int>(BSONObj::kMinBSONLength);
+
+ size += InsertCommandRequest::kCommandName.size() + kPerElementOverhead +
+ insertReq.getNamespace().size() + 1 /* ns string null terminator */;
+
+ // Handle $tenant. Note that $tenant is injected as a hidden field into all IDL commands, unlike
+ // other passthrough fields.
+ if (auto tenant = insertReq.getDollarTenant(); tenant.has_value()) {
+ size += InsertCommandRequest::kDollarTenantFieldName.size() + OID::kOIDSize +
+ kPerElementOverhead;
+ }
+ return size;
+}
+
+int getUpdateHeaderSizeEstimate(const UpdateCommandRequest& updateReq) {
+ int size = getWriteCommandRequestBaseSize(updateReq.getWriteCommandRequestBase());
+
+ size += UpdateCommandRequest::kCommandName.size() + kPerElementOverhead +
+ updateReq.getNamespace().size() + 1 /* ns string null terminator */;
+
+ size += write_ops::UpdateCommandRequest::kUpdatesFieldName.size() + kPerElementOverhead +
+ static_cast<int>(BSONObj::kMinBSONLength);
+
+ // Handle $tenant. Note that $tenant is injected as a hidden field into all IDL commands, unlike
+ // other passthrough fields.
+ if (auto tenant = updateReq.getDollarTenant(); tenant.has_value()) {
+ size += UpdateCommandRequest::kDollarTenantFieldName.size() + OID::kOIDSize +
+ kPerElementOverhead;
+ }
+
+ // Handle legacy runtime constants.
+ if (auto runtimeConstants = updateReq.getLegacyRuntimeConstants();
+ runtimeConstants.has_value()) {
+ size += estimateRuntimeConstantsSize(*runtimeConstants);
+ }
+
+ // Handle let parameters.
+ if (auto let = updateReq.getLet(); let.has_value()) {
+ size += write_ops::UpdateCommandRequest::kLetFieldName.size() + let->objsize() +
+ kPerElementOverhead;
+ }
+ return size;
+}
+
+int getDeleteHeaderSizeEstimate(const DeleteCommandRequest& deleteReq) {
+ int size = getWriteCommandRequestBaseSize(deleteReq.getWriteCommandRequestBase());
+
+ size += DeleteCommandRequest::kCommandName.size() + kPerElementOverhead +
+ deleteReq.getNamespace().size() + 1 /* ns string null terminator */;
+
+ size += write_ops::DeleteCommandRequest::kDeletesFieldName.size() + kPerElementOverhead +
+ static_cast<int>(BSONObj::kMinBSONLength);
+
+ // Handle $tenant. Note that $tenant is injected as a hidden field into all IDL commands, unlike
+ // other passthrough fields.
+ if (auto tenant = deleteReq.getDollarTenant(); tenant.has_value()) {
+ size += DeleteCommandRequest::kDollarTenantFieldName.size() + OID::kOIDSize +
+ kPerElementOverhead;
+ }
+
+ // Handle legacy runtime constants.
+ if (auto runtimeConstants = deleteReq.getLegacyRuntimeConstants();
+ runtimeConstants.has_value()) {
+ size += estimateRuntimeConstantsSize(*runtimeConstants);
+ }
+
+ // Handle let parameters.
+ if (auto let = deleteReq.getLet(); let.has_value()) {
+ size += write_ops::UpdateCommandRequest::kLetFieldName.size() + let->objsize() +
+ kPerElementOverhead;
+ }
+ return size;
+}
+
bool verifySizeEstimate(const write_ops::DeleteOpEntry& deleteOp) {
return write_ops::getDeleteSizeEstimate(deleteOp.getQ(),
deleteOp.getCollation(),
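
As a concrete illustration of how these constants line up with the wire format (a worked example, not taken from the source): a serialized "ordered: false" field occupies one BSONType byte, the field name "ordered" plus its null terminator, and one byte for the boolean value:

    actual BSON:  1 (type byte) + 7 ("ordered") + 1 (name terminator) + 1 (bool value) = 10 bytes
    estimate:     kOrderedFieldName.size() (7) + kBoolSize (1) + kPerElementOverhead (2) = 10 bytes

Fields such as 'ordered' and 'bypassDocumentValidation' are counted unconditionally, which keeps the estimate at or above the true serialized size; that invariant is what verifySizeEstimate() asserts.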
diff --git a/src/mongo/db/ops/write_ops.h b/src/mongo/db/ops/write_ops.h
index 2545c8eb78e..4f44b8e0c95 100644
--- a/src/mongo/db/ops/write_ops.h
+++ b/src/mongo/db/ops/write_ops.h
@@ -127,6 +127,17 @@ int getDeleteSizeEstimate(const BSONObj& q,
*/
bool verifySizeEstimate(const write_ops::UpdateOpEntry& update);
bool verifySizeEstimate(const write_ops::DeleteOpEntry& deleteOp);
+bool verifySizeEstimate(const InsertCommandRequest& insertReq);
+bool verifySizeEstimate(const UpdateCommandRequest& updateReq);
+bool verifySizeEstimate(const DeleteCommandRequest& deleteReq);
+
+/**
+ * Set of utilities which estimate the size of the headers (that is, all fields in a write command
+ * outside of the write statements themselves) of an insert/update/delete command, respectively.
+ */
+int getInsertHeaderSizeEstimate(const InsertCommandRequest& insertReq);
+int getUpdateHeaderSizeEstimate(const UpdateCommandRequest& updateReq);
+int getDeleteHeaderSizeEstimate(const DeleteCommandRequest& deleteReq);
/**
* If the response from a write command contains any write errors, it will throw the first one. All
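
A minimal usage sketch for these helpers (hypothetical caller code; it mirrors what verifySizeEstimate(const UpdateCommandRequest&) does internally): the full wire size of an update command can be bounded by the header estimate plus the per-statement estimates.

    // Hypothetical sketch: conservatively bound the serialized size of an update command.
    int estimateUpdateCommandSize(const write_ops::UpdateCommandRequest& req) {
        int size = write_ops::getUpdateHeaderSizeEstimate(req);
        for (auto&& u : req.getUpdates()) {
            size += write_ops::getUpdateSizeEstimate(
                        u.getQ(),
                        u.getU(),
                        u.getC(),
                        u.getUpsertSupplied().has_value(),
                        u.getCollation(),
                        u.getArrayFilters(),
                        u.getHint(),
                        u.getSampleId(),
                        u.getAllowShardKeyUpdatesWithoutFullShardKeyInQuery().has_value()) +
                write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
        }
        return size;  // By construction, size >= req.toBSON({}).objsize().
    }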
diff --git a/src/mongo/db/ops/write_ops.idl b/src/mongo/db/ops/write_ops.idl
index a2d80295d03..1991a286637 100644
--- a/src/mongo/db/ops/write_ops.idl
+++ b/src/mongo/db/ops/write_ops.idl
@@ -151,6 +151,8 @@ structs:
chained_structs:
WriteCommandReplyBase: writeCommandReplyBase
+ # IMPORTANT: If any changes are made to the fields here, please update the corresponding size
+ # estimation functions in 'write_ops.cpp'.
WriteCommandRequestBase:
description: "Contains basic information included by all write commands"
strict: false
@@ -363,7 +365,8 @@ structs:
stability: unstable
commands:
-
+ # IMPORTANT: If any changes are made to the fields here, please update the corresponding insert
+ # size estimation functions in 'write_ops.cpp'.
insert:
description: "Parser for the 'insert' command."
command_name: insert
@@ -386,6 +389,8 @@ commands:
supports_doc_sequence: true
stability: stable
+ # IMPORTANT: If any changes are made to the fields here, please update the corresponding update
+ # size estimation functions in 'write_ops.cpp'.
update:
description: "Parser for the 'update' command."
command_name: update
@@ -421,6 +426,8 @@ commands:
optional: true
stability: stable
+ # IMPORTANT: If any changes are made to the fields here, please update the corresponding delete
+ # size estimation functions in 'write_ops.cpp'.
delete:
description: "Parser for the 'delete' command."
command_name: delete
diff --git a/src/mongo/db/ops/write_ops_exec_test.cpp b/src/mongo/db/ops/write_ops_exec_test.cpp
index f977d7878dc..a0210671b7b 100644
--- a/src/mongo/db/ops/write_ops_exec_test.cpp
+++ b/src/mongo/db/ops/write_ops_exec_test.cpp
@@ -122,6 +122,200 @@ TEST_F(WriteOpsExecTest, TestDeleteSizeEstimationLogic) {
ASSERT(write_ops::verifySizeEstimate(deleteOpEntry));
}
+TEST_F(WriteOpsExecTest, TestInsertRequestSizeEstimationLogic) {
+ NamespaceString ns =
+ NamespaceString::createNamespaceString_forTest("db_write_ops_exec_test", "insert_test");
+ write_ops::InsertCommandRequest insert(ns);
+ BSONObj docToInsert(fromjson("{_id: 1, foo: 1}"));
+ insert.setDocuments({docToInsert});
+ ASSERT(write_ops::verifySizeEstimate(insert));
+
+ // Configure $tenant.
+ insert.setDollarTenant(mongo::TenantId(mongo::OID::gen()));
+ ASSERT(write_ops::verifySizeEstimate(insert));
+
+ // Configure different fields for 'wcb'.
+ write_ops::WriteCommandRequestBase wcb;
+
+ // stmtId
+ wcb.setStmtId(2);
+ insert.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(insert));
+
+ // stmtIds
+ wcb.setStmtIds(std::vector<int32_t>{2, 3});
+ insert.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(insert));
+
+ // isTimeseries
+ wcb.setIsTimeseriesNamespace(true);
+ insert.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(insert));
+
+ // collUUID
+ wcb.setCollectionUUID(UUID::gen());
+ insert.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(insert));
+
+ // encryptionInfo
+ wcb.setEncryptionInformation(
+ EncryptionInformation(fromjson("{schema: 'I love encrypting and protecting my data'}")));
+ insert.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(insert));
+
+ // originalQuery
+ wcb.setOriginalQuery(fromjson("{field: 'value'}"));
+ insert.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(insert));
+
+ // originalCollation
+ wcb.setOriginalCollation(fromjson("{locale: 'fr'}"));
+ insert.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(insert));
+}
+
+TEST_F(WriteOpsExecTest, TestUpdateRequestSizeEstimationLogic) {
+ NamespaceString ns =
+ NamespaceString::createNamespaceString_forTest("db_write_ops_exec_test", "update_test");
+ write_ops::UpdateCommandRequest update(ns);
+
+ const BSONObj updateStmt = fromjson("{$set: {a: 5}}");
+ auto mod = write_ops::UpdateModification::parseFromClassicUpdate(updateStmt);
+ write_ops::UpdateOpEntry updateOpEntry(BSON("_id" << 1), std::move(mod));
+ update.setUpdates({updateOpEntry});
+
+ ASSERT(write_ops::verifySizeEstimate(update));
+
+ // Configure $tenant.
+ update.setDollarTenant(mongo::TenantId(mongo::OID::gen()));
+ ASSERT(write_ops::verifySizeEstimate(update));
+
+ // Configure different fields for 'wcb'.
+ write_ops::WriteCommandRequestBase wcb;
+
+ // stmtId
+ wcb.setStmtId(2);
+ update.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(update));
+
+ // stmtIds
+ wcb.setStmtIds(std::vector<int32_t>{2, 3});
+ update.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(update));
+
+ // isTimeseries
+ wcb.setIsTimeseriesNamespace(true);
+ update.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(update));
+
+ // collUUID
+ wcb.setCollectionUUID(UUID::gen());
+ update.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(update));
+
+ // encryptionInfo
+ wcb.setEncryptionInformation(
+ EncryptionInformation(fromjson("{schema: 'I love encrypting and protecting my data'}")));
+ update.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(update));
+
+ // originalQuery
+ wcb.setOriginalQuery(fromjson("{field: 'value'}"));
+ update.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(update));
+
+ // originalCollation
+ wcb.setOriginalCollation(fromjson("{locale: 'fr'}"));
+ update.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(update));
+
+ // Configure different fields specific to 'UpdateStatementRequest'.
+ LegacyRuntimeConstants legacyRuntimeConstants;
+ const auto now = Date_t::now();
+
+ // At a minimum, $$NOW and $$CLUSTER_TIME must be set.
+ legacyRuntimeConstants.setLocalNow(now);
+ legacyRuntimeConstants.setClusterTime(Timestamp(now));
+ update.setLegacyRuntimeConstants(legacyRuntimeConstants);
+ ASSERT(write_ops::verifySizeEstimate(update));
+
+ // $$JS_SCOPE
+ BSONObj jsScope = fromjson("{constant: 'I love mapReduce and javascript :D'}");
+ legacyRuntimeConstants.setJsScope(jsScope);
+ update.setLegacyRuntimeConstants(legacyRuntimeConstants);
+ ASSERT(write_ops::verifySizeEstimate(update));
+
+ // $$IS_MR
+ legacyRuntimeConstants.setIsMapReduce(true);
+ update.setLegacyRuntimeConstants(legacyRuntimeConstants);
+ ASSERT(write_ops::verifySizeEstimate(update));
+
+ // $$USER_ROLES
+ BSONArray arr = BSON_ARRAY(fromjson("{role: 'readWriteAnyDatabase', db: 'admin'}"));
+ legacyRuntimeConstants.setUserRoles(arr);
+ update.setLegacyRuntimeConstants(legacyRuntimeConstants);
+ ASSERT(write_ops::verifySizeEstimate(update));
+
+ const std::string kLargeString(100 * 1024, 'b');
+ BSONObj letParams = BSON("largeStrParam" << kLargeString);
+ update.setLet(letParams);
+ ASSERT(write_ops::verifySizeEstimate(update));
+}
+
+TEST_F(WriteOpsExecTest, TestDeleteRequestSizeEstimationLogic) {
+ NamespaceString ns =
+ NamespaceString::createNamespaceString_forTest("db_write_ops_exec_test", "delete_test");
+ write_ops::DeleteCommandRequest deleteReq(ns);
+ // Basic test case.
+ write_ops::DeleteOpEntry deleteOpEntry(BSON("_id" << 1), false /* multi */);
+ deleteReq.setDeletes({deleteOpEntry});
+
+ ASSERT(write_ops::verifySizeEstimate(deleteReq));
+
+ // Configure $tenant.
+ deleteReq.setDollarTenant(mongo::TenantId(mongo::OID::gen()));
+ ASSERT(write_ops::verifySizeEstimate(deleteReq));
+
+ // Configure different fields for 'wcb'.
+ write_ops::WriteCommandRequestBase wcb;
+
+ // stmtId
+ wcb.setStmtId(2);
+ deleteReq.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(deleteReq));
+
+ // stmtIds
+ wcb.setStmtIds(std::vector<int32_t>{2, 3});
+ deleteReq.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(deleteReq));
+
+ // isTimeseries
+ wcb.setIsTimeseriesNamespace(true);
+ deleteReq.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(deleteReq));
+
+ // collUUID
+ wcb.setCollectionUUID(UUID::gen());
+ deleteReq.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(deleteReq));
+
+ // encryptionInfo
+ wcb.setEncryptionInformation(
+ EncryptionInformation(fromjson("{schema: 'I love encrypting and protecting my data'}")));
+ deleteReq.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(deleteReq));
+
+ // originalQuery
+ wcb.setOriginalQuery(fromjson("{field: 'value'}"));
+ deleteReq.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(deleteReq));
+
+ // originalCollation
+ wcb.setOriginalCollation(fromjson("{locale: 'fr'}"));
+ deleteReq.setWriteCommandRequestBase(wcb);
+ ASSERT(write_ops::verifySizeEstimate(deleteReq));
+}
+
TEST_F(WriteOpsExecTest, PerformAtomicTimeseriesWritesWithTransform) {
NamespaceString ns =
NamespaceString::createNamespaceString_forTest("db_write_ops_exec_test", "ts");
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index 722de5936f9..896a50ad0dc 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -60,6 +60,7 @@ namespace {
using MergeStrategyDescriptor = DocumentSourceMerge::MergeStrategyDescriptor;
using MergeMode = MergeStrategyDescriptor::MergeMode;
using MergeStrategy = MergeStrategyDescriptor::MergeStrategy;
+using BatchedCommandGenerator = MergeStrategyDescriptor::BatchedCommandGenerator;
using MergeStrategyDescriptorsMap = std::map<const MergeMode, const MergeStrategyDescriptor>;
using WhenMatched = MergeStrategyDescriptor::WhenMatched;
using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched;
@@ -86,6 +87,55 @@ constexpr auto kPipelineDiscardMode = MergeMode{WhenMatched::kPipeline, WhenNotM
const auto kDefaultPipelineLet = BSON("new"
<< "$$ROOT");
+BatchedCommandGenerator makeInsertCommandGenerator() {
+ return [](const auto& expCtx, const auto& ns) -> BatchedCommandRequest {
+ return DocumentSourceMerge::DocumentSourceWriter::makeInsertCommand(
+ ns, expCtx->bypassDocumentValidation);
+ };
+}
+
+BatchedCommandGenerator makeUpdateCommandGenerator() {
+ return [](const auto& expCtx, const auto& ns) -> BatchedCommandRequest {
+ write_ops::UpdateCommandRequest updateOp(ns);
+ updateOp.setWriteCommandRequestBase([&] {
+ write_ops::WriteCommandRequestBase wcb;
+ wcb.setOrdered(false);
+ wcb.setBypassDocumentValidation(expCtx->bypassDocumentValidation);
+ return wcb;
+ }());
+ auto [constants, letParams] =
+ expCtx->variablesParseState.transitionalCompatibilitySerialize(expCtx->variables);
+ updateOp.setLegacyRuntimeConstants(std::move(constants));
+ if (!letParams.isEmpty()) {
+ updateOp.setLet(std::move(letParams));
+ }
+ return BatchedCommandRequest(std::move(updateOp));
+ };
+}
+
+/**
+ * Converts 'batch' into a vector of UpdateOpEntries.
+ */
+std::vector<write_ops::UpdateOpEntry> constructUpdateEntries(
+ DocumentSourceMerge::DocumentSourceWriter::BatchedObjects&& batch,
+ UpsertType upsert,
+ bool multi) {
+ std::vector<write_ops::UpdateOpEntry> updateEntries;
+ for (auto&& obj : batch) {
+ write_ops::UpdateOpEntry entry;
+ auto&& [q, u, c] = obj;
+ entry.setQ(std::move(q));
+ entry.setU(std::move(u));
+ entry.setC(std::move(c));
+ entry.setUpsert(upsert != UpsertType::kNone);
+ entry.setUpsertSupplied({{entry.getUpsert(), upsert == UpsertType::kInsertSuppliedDoc}});
+ entry.setMulti(multi);
+
+ updateEntries.push_back(std::move(entry));
+ }
+ return updateEntries;
+}
+
/**
* Creates a merge strategy which uses update semantics to perform a merge operation.
*/
@@ -95,10 +145,13 @@ MergeStrategy makeUpdateStrategy() {
const auto& wc,
auto epoch,
auto&& batch,
+ auto&& bcr,
UpsertType upsert) {
constexpr auto multi = false;
+ auto updateCommand = bcr.extractUpdateRequest();
+ updateCommand->setUpdates(constructUpdateEntries(std::move(batch), upsert, multi));
uassertStatusOK(expCtx->mongoProcessInterface->update(
- expCtx, ns, std::move(batch), wc, upsert, multi, epoch));
+ expCtx, ns, std::move(updateCommand), wc, upsert, multi, epoch));
};
}
@@ -115,11 +168,14 @@ MergeStrategy makeStrictUpdateStrategy() {
const auto& wc,
auto epoch,
auto&& batch,
+ auto&& bcr,
UpsertType upsert) {
const int64_t batchSize = batch.size();
constexpr auto multi = false;
+ auto updateCommand = bcr.extractUpdateRequest();
+ updateCommand->setUpdates(constructUpdateEntries(std::move(batch), upsert, multi));
auto updateResult = uassertStatusOK(expCtx->mongoProcessInterface->update(
- expCtx, ns, std::move(batch), wc, upsert, multi, epoch));
+ expCtx, ns, std::move(updateCommand), wc, upsert, multi, epoch));
uassert(ErrorCodes::MergeStageNoMatchingDocument,
"{} could not find a matching document in the target collection "
"for at least one document in the source collection"_format(kStageName),
@@ -136,6 +192,7 @@ MergeStrategy makeInsertStrategy() {
const auto& wc,
auto epoch,
auto&& batch,
+ auto&& bcr,
UpsertType upsertType) {
std::vector<BSONObj> objectsToInsert(batch.size());
// The batch stores replacement style updates, but for this "insert" style of $merge we'd
@@ -143,8 +200,10 @@ MergeStrategy makeInsertStrategy() {
std::transform(batch.begin(), batch.end(), objectsToInsert.begin(), [](const auto& obj) {
return std::get<UpdateModification>(obj).getUpdateReplacement();
});
- uassertStatusOK(expCtx->mongoProcessInterface->insert(
- expCtx, ns, std::move(objectsToInsert), wc, epoch));
+ auto insertCommand = bcr.extractInsertRequest();
+ insertCommand->setDocuments(std::move(objectsToInsert));
+ uassertStatusOK(
+ expCtx->mongoProcessInterface->insert(expCtx, ns, std::move(insertCommand), wc, epoch));
};
}
@@ -174,72 +233,95 @@ const MergeStrategyDescriptorsMap& getDescriptors() {
// be initialized first. By wrapping the map into a function we can guarantee that it won't be
// initialized until the first use, which is when the program already started and all global
// variables had been initialized.
- static const auto mergeStrategyDescriptors = MergeStrategyDescriptorsMap{
- // whenMatched: replace, whenNotMatched: insert
- {kReplaceInsertMode,
- {kReplaceInsertMode,
- {ActionType::insert, ActionType::update},
- makeUpdateStrategy(),
- {},
- UpsertType::kGenerateNewDoc}},
- // whenMatched: replace, whenNotMatched: fail
- {kReplaceFailMode,
- {kReplaceFailMode,
- {ActionType::update},
- makeStrictUpdateStrategy(),
- {},
- UpsertType::kNone}},
- // whenMatched: replace, whenNotMatched: discard
- {kReplaceDiscardMode,
- {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}},
- // whenMatched: merge, whenNotMatched: insert
- {kMergeInsertMode,
- {kMergeInsertMode,
- {ActionType::insert, ActionType::update},
- makeUpdateStrategy(),
- makeUpdateTransform("$set"),
- UpsertType::kGenerateNewDoc}},
- // whenMatched: merge, whenNotMatched: fail
- {kMergeFailMode,
- {kMergeFailMode,
- {ActionType::update},
- makeStrictUpdateStrategy(),
- makeUpdateTransform("$set"),
- UpsertType::kNone}},
- // whenMatched: merge, whenNotMatched: discard
- {kMergeDiscardMode,
- {kMergeDiscardMode,
- {ActionType::update},
- makeUpdateStrategy(),
- makeUpdateTransform("$set"),
- UpsertType::kNone}},
- // whenMatched: keepExisting, whenNotMatched: insert
- {kKeepExistingInsertMode,
- {kKeepExistingInsertMode,
- {ActionType::insert, ActionType::update},
- makeUpdateStrategy(),
- makeUpdateTransform("$setOnInsert"),
- UpsertType::kGenerateNewDoc}},
- // whenMatched: [pipeline], whenNotMatched: insert
- {kPipelineInsertMode,
- {kPipelineInsertMode,
- {ActionType::insert, ActionType::update},
- makeUpdateStrategy(),
- {},
- UpsertType::kInsertSuppliedDoc}},
- // whenMatched: [pipeline], whenNotMatched: fail
- {kPipelineFailMode,
- {kPipelineFailMode,
- {ActionType::update},
- makeStrictUpdateStrategy(),
- {},
- UpsertType::kNone}},
- // whenMatched: [pipeline], whenNotMatched: discard
- {kPipelineDiscardMode,
- {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}},
- // whenMatched: fail, whenNotMatched: insert
- {kFailInsertMode,
- {kFailInsertMode, {ActionType::insert}, makeInsertStrategy(), {}, UpsertType::kNone}}};
+ static const auto mergeStrategyDescriptors =
+ MergeStrategyDescriptorsMap{// whenMatched: replace, whenNotMatched: insert
+ {kReplaceInsertMode,
+ {kReplaceInsertMode,
+ {ActionType::insert, ActionType::update},
+ makeUpdateStrategy(),
+ {},
+ UpsertType::kGenerateNewDoc,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: replace, whenNotMatched: fail
+ {kReplaceFailMode,
+ {kReplaceFailMode,
+ {ActionType::update},
+ makeStrictUpdateStrategy(),
+ {},
+ UpsertType::kNone,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: replace, whenNotMatched: discard
+ {kReplaceDiscardMode,
+ {kReplaceDiscardMode,
+ {ActionType::update},
+ makeUpdateStrategy(),
+ {},
+ UpsertType::kNone,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: merge, whenNotMatched: insert
+ {kMergeInsertMode,
+ {kMergeInsertMode,
+ {ActionType::insert, ActionType::update},
+ makeUpdateStrategy(),
+ makeUpdateTransform("$set"),
+ UpsertType::kGenerateNewDoc,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: merge, whenNotMatched: fail
+ {kMergeFailMode,
+ {kMergeFailMode,
+ {ActionType::update},
+ makeStrictUpdateStrategy(),
+ makeUpdateTransform("$set"),
+ UpsertType::kNone,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: merge, whenNotMatched: discard
+ {kMergeDiscardMode,
+ {kMergeDiscardMode,
+ {ActionType::update},
+ makeUpdateStrategy(),
+ makeUpdateTransform("$set"),
+ UpsertType::kNone,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: keepExisting, whenNotMatched: insert
+ {kKeepExistingInsertMode,
+ {kKeepExistingInsertMode,
+ {ActionType::insert, ActionType::update},
+ makeUpdateStrategy(),
+ makeUpdateTransform("$setOnInsert"),
+ UpsertType::kGenerateNewDoc,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: [pipeline], whenNotMatched: insert
+ {kPipelineInsertMode,
+ {kPipelineInsertMode,
+ {ActionType::insert, ActionType::update},
+ makeUpdateStrategy(),
+ {},
+ UpsertType::kInsertSuppliedDoc,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: [pipeline], whenNotMatched: fail
+ {kPipelineFailMode,
+ {kPipelineFailMode,
+ {ActionType::update},
+ makeStrictUpdateStrategy(),
+ {},
+ UpsertType::kNone,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: [pipeline], whenNotMatched: discard
+ {kPipelineDiscardMode,
+ {kPipelineDiscardMode,
+ {ActionType::update},
+ makeUpdateStrategy(),
+ {},
+ UpsertType::kNone,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: fail, whenNotMatched: insert
+ {kFailInsertMode,
+ {kFailInsertMode,
+ {ActionType::insert},
+ makeInsertStrategy(),
+ {},
+ UpsertType::kNone,
+ makeInsertCommandGenerator()}}};
return mergeStrategyDescriptors;
}
@@ -599,14 +681,19 @@ std::pair<DocumentSourceMerge::BatchObject, int> DocumentSourceMerge::makeBatchO
_writeSizeEstimator->estimateUpdateSizeBytes(batchObject, _descriptor.upsertType)};
}
-void DocumentSourceMerge::spill(BatchedObjects&& batch) try {
+void DocumentSourceMerge::spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) try {
DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx);
auto targetEpoch = _targetCollectionPlacementVersion
? boost::optional<OID>(_targetCollectionPlacementVersion->epoch())
: boost::none;
- _descriptor.strategy(
- pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch), _descriptor.upsertType);
+ _descriptor.strategy(pExpCtx,
+ _outputNs,
+ _writeConcern,
+ targetEpoch,
+ std::move(batch),
+ std::move(bcr),
+ _descriptor.upsertType);
} catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
uassertStatusOKWithContext(ex.toStatus(),
"$merge failed to update the matching document, did you "
@@ -630,6 +717,10 @@ void DocumentSourceMerge::spill(BatchedObjects&& batch) try {
}
}
+BatchedCommandRequest DocumentSourceMerge::initializeBatchedWriteRequest() const {
+ return _descriptor.batchedCommandGenerator(pExpCtx, _outputNs);
+}
+
void DocumentSourceMerge::waitWhileFailPointEnabled() {
CurOpFailpointHelpers::waitWhileFailPointEnabled(
&hangWhileBuildingDocumentSourceMergeBatch,
diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h
index b0eed7a6df9..0349374fadb 100644
--- a/src/mongo/db/pipeline/document_source_merge.h
+++ b/src/mongo/db/pipeline/document_source_merge.h
@@ -48,8 +48,9 @@ public:
// A descriptor for a merge strategy. Holds a merge strategy function and a set of actions the
// client should be authorized to perform in order to be able to execute a merge operation using
- // this merge strategy. If a 'BatchTransform' function is provided, it will be called when
- // constructing a batch object to transform updates.
+ // this merge strategy. Additionally holds a 'BatchedCommandGenerator' that will initialize a
+ // BatchedWriteRequest for executing the batch write. If a 'BatchTransform' function is
+ // provided, it will be called when constructing a batch object to transform updates.
struct MergeStrategyDescriptor {
using WhenMatched = MergeWhenMatchedModeEnum;
using WhenNotMatched = MergeWhenNotMatchedModeEnum;
@@ -62,13 +63,19 @@ public:
const WriteConcernOptions&,
boost::optional<OID>,
BatchedObjects&&,
+ BatchedCommandRequest&&,
UpsertType upsert)>;
+ // A function object that will be invoked to generate a BatchedCommandRequest.
+ using BatchedCommandGenerator = std::function<BatchedCommandRequest(
+ const boost::intrusive_ptr<ExpressionContext>&, const NamespaceString&)>;
+
MergeMode mode;
ActionSet actions;
MergeStrategy strategy;
BatchTransform transform;
UpsertType upsertType;
+ BatchedCommandGenerator batchedCommandGenerator;
};
/**
@@ -218,7 +225,9 @@ private:
return bob.obj();
}
- void spill(BatchedObjects&& batch) override;
+ void spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) override;
+
+ BatchedCommandRequest initializeBatchedWriteRequest() const override;
void waitWhileFailPointEnabled() override;
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index 8763cb0874c..e45933d4f73 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -284,6 +284,13 @@ void DocumentSourceOut::finalize() {
_timeseriesStateConsistent = true;
}
+BatchedCommandRequest DocumentSourceOut::initializeBatchedWriteRequest() const {
+ // Note that our insert targets '_tempNs' (or the associated timeseries view) since we will
+ // never write to 'outputNs' directly.
+ const auto& targetNss = _timeseries ? _tempNs.getTimeseriesViewNamespace() : _tempNs;
+ return DocumentSourceWriter::makeInsertCommand(targetNss, pExpCtx->bypassDocumentValidation);
+}
+
boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create(
NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 60d6a865c1d..becd7998bd0 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -127,20 +127,23 @@ private:
void finalize() override;
- void spill(BatchedObjects&& batch) override {
+ void spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) override {
DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx);
+ auto insertCommand = bcr.extractInsertRequest();
+ insertCommand->setDocuments(std::move(batch));
auto targetEpoch = boost::none;
+
if (_timeseries) {
uassertStatusOK(pExpCtx->mongoProcessInterface->insertTimeseries(
pExpCtx,
_tempNs.getTimeseriesViewNamespace(),
- std::move(batch),
+ std::move(insertCommand),
_writeConcern,
targetEpoch));
} else {
uassertStatusOK(pExpCtx->mongoProcessInterface->insert(
- pExpCtx, _tempNs, std::move(batch), _writeConcern, targetEpoch));
+ pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch));
}
}
@@ -150,6 +153,8 @@ private:
return {obj, _writeSizeEstimator->estimateInsertSizeBytes(obj)};
}
+ BatchedCommandRequest initializeBatchedWriteRequest() const override;
+
void waitWhileFailPointEnabled() override;
/**
diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h
index 124625d6f94..b2c5363d429 100644
--- a/src/mongo/db/pipeline/document_source_writer.h
+++ b/src/mongo/db/pipeline/document_source_writer.h
@@ -39,6 +39,7 @@
#include "mongo/db/read_concern.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/rpc/metadata/impersonated_user_metadata.h"
+#include "mongo/s/write_ops/batched_command_request.h"
namespace mongo {
using namespace fmt::literals;
@@ -84,11 +85,14 @@ public:
/**
* This is a base abstract class for all stages performing a write operation into an output
* collection. The writes are organized in batches in which elements are objects of the templated
- * type 'B'. A subclass must override two methods to be able to write into the output collection:
+ * type 'B'. A subclass must override the following methods to be able to write into the output
+ * collection:
*
- * 1. 'makeBatchObject()' - to create an object of type 'B' from the given 'Document', which is,
+ * - 'makeBatchObject()' - creates an object of type 'B' from the given 'Document', which is,
* essentially, a result of the input source's 'getNext()' .
- * 2. 'spill()' - to write the batch into the output collection.
+ * - 'spill()' - writes the batch into the output collection.
+ * - 'initializeBatchedWriteRequest()' - initializes the request object for writing a batch to
+ * the output collection.
*
* Two other virtual methods exist which a subclass may override: 'initialize()' and 'finalize()',
* which are called before the first element is read from the input source, and after the last one
@@ -100,6 +104,18 @@ public:
using BatchObject = B;
using BatchedObjects = std::vector<BatchObject>;
+ static BatchedCommandRequest makeInsertCommand(const NamespaceString& outputNs,
+ bool bypassDocumentValidation) {
+ write_ops::InsertCommandRequest insertOp(outputNs);
+ insertOp.setWriteCommandRequestBase([&] {
+ write_ops::WriteCommandRequestBase wcb;
+ wcb.setOrdered(false);
+ wcb.setBypassDocumentValidation(bypassDocumentValidation);
+ return wcb;
+ }());
+ return BatchedCommandRequest(std::move(insertOp));
+ }
+
DocumentSourceWriter(const char* stageName,
NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
@@ -146,9 +162,31 @@ protected:
virtual void finalize() {}
/**
- * Writes the documents in 'batch' to the output namespace.
+ * Writes the documents in 'batch' to the output namespace via 'bcr'.
+ */
+ virtual void spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) = 0;
+
+ /**
+ * Estimates the size of the header of a batch write (that is, the size of the write command
+ * minus the size of write statements themselves).
+ */
+ int estimateWriteHeaderSize(const BatchedCommandRequest& bcr) const {
+ using BatchType = BatchedCommandRequest::BatchType;
+ switch (bcr.getBatchType()) {
+ case BatchType::BatchType_Insert:
+ return _writeSizeEstimator->estimateInsertHeaderSize(bcr.getInsertRequest());
+ case BatchType::BatchType_Update:
+ return _writeSizeEstimator->estimateUpdateHeaderSize(bcr.getUpdateRequest());
+ case BatchType::BatchType_Delete:
+ break;
+ }
+ MONGO_UNREACHABLE;
+ }
+
+ /**
+ * Constructs and configures a BatchedCommandRequest for performing a batch write.
*/
- virtual void spill(BatchedObjects&& batch) = 0;
+ virtual BatchedCommandRequest initializeBatchedWriteRequest() const = 0;
/**
* Creates a batch object from the given document and returns it to the caller along with the
@@ -210,15 +248,20 @@ DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() {
// and assume the rest can fit in the 16KB already built into BSONObjMaxUserSize.
const auto estimatedMetadataSizeBytes =
rpc::estimateImpersonatedUserMetadataSize(pExpCtx->opCtx);
+
+ BatchedCommandRequest batchWrite = initializeBatchedWriteRequest();
+ const auto writeHeaderSize = estimateWriteHeaderSize(batchWrite);
+ const auto initialRequestSize = estimatedMetadataSizeBytes + writeHeaderSize;
+
uassert(7637800,
"Unable to proceed with write while metadata size ({}KB) exceeds {}KB"_format(
- estimatedMetadataSizeBytes / 1024, BSONObjMaxUserSize / 1024),
- estimatedMetadataSizeBytes <= BSONObjMaxUserSize);
+ initialRequestSize / 1024, BSONObjMaxUserSize / 1024),
+ initialRequestSize <= BSONObjMaxUserSize);
- const auto maxBatchSizeBytes = BSONObjMaxUserSize - estimatedMetadataSizeBytes;
- BatchedObjects batch;
- std::size_t bufferedBytes = 0;
+ const auto maxBatchSizeBytes = BSONObjMaxUserSize - initialRequestSize;
+ BatchedObjects batch;
+ size_t bufferedBytes = 0;
auto nextInput = pSource->getNext();
for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
waitWhileFailPointEnabled();
@@ -230,14 +273,15 @@ DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() {
if (!batch.empty() &&
(bufferedBytes > maxBatchSizeBytes ||
batch.size() >= write_ops::kMaxWriteBatchSize)) {
- spill(std::move(batch));
+ spill(std::move(batchWrite), std::move(batch));
batch.clear();
+ batchWrite = initializeBatchedWriteRequest();
bufferedBytes = objSize;
}
batch.push_back(obj);
}
if (!batch.empty()) {
- spill(std::move(batch));
+ spill(std::move(batchWrite), std::move(batch));
batch.clear();
}
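
With the numbers from the new jstest, this adjusted budget works out roughly as follows (a sketch assuming a 16MB BSONObjMaxUserSize and negligible impersonation metadata):

    maxBatchSizeBytes = BSONObjMaxUserSize - (estimatedMetadataSizeBytes + writeHeaderSize)
                      = ~16MB - (0 + ~10MB) = ~6MB

so each ~4MB replacement-style update statement exceeds the remaining budget as soon as a second document is buffered, and every serialized update command stays near 14MB instead of overflowing the 16MB limit.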
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
index 9c2dd64172f..c3db7716d48 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
@@ -788,59 +788,6 @@ CommonMongodProcessInterface::ensureFieldsUniqueOrResolveDocumentKey(
return {*fieldPaths, targetCollectionPlacementVersion};
}
-write_ops::InsertCommandRequest CommonMongodProcessInterface::buildInsertOp(
- const NamespaceString& nss, std::vector<BSONObj>&& objs, bool bypassDocValidation) {
- write_ops::InsertCommandRequest insertOp(nss);
- insertOp.setDocuments(std::move(objs));
- insertOp.setWriteCommandRequestBase([&] {
- write_ops::WriteCommandRequestBase wcb;
- wcb.setOrdered(false);
- wcb.setBypassDocumentValidation(bypassDocValidation);
- return wcb;
- }());
- return insertOp;
-}
-
-write_ops::UpdateCommandRequest CommonMongodProcessInterface::buildUpdateOp(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss,
- BatchedObjects&& batch,
- UpsertType upsert,
- bool multi) {
- write_ops::UpdateCommandRequest updateOp(nss);
- updateOp.setUpdates([&] {
- std::vector<write_ops::UpdateOpEntry> updateEntries;
- for (auto&& obj : batch) {
- updateEntries.push_back([&] {
- write_ops::UpdateOpEntry entry;
- auto&& [q, u, c] = obj;
- entry.setQ(std::move(q));
- entry.setU(std::move(u));
- entry.setC(std::move(c));
- entry.setUpsert(upsert != UpsertType::kNone);
- entry.setUpsertSupplied(
- {{entry.getUpsert(), upsert == UpsertType::kInsertSuppliedDoc}});
- entry.setMulti(multi);
- return entry;
- }());
- }
- return updateEntries;
- }());
- updateOp.setWriteCommandRequestBase([&] {
- write_ops::WriteCommandRequestBase wcb;
- wcb.setOrdered(false);
- wcb.setBypassDocumentValidation(expCtx->bypassDocumentValidation);
- return wcb;
- }());
- auto [constants, letParams] =
- expCtx->variablesParseState.transitionalCompatibilitySerialize(expCtx->variables);
- updateOp.setLegacyRuntimeConstants(std::move(constants));
- if (!letParams.isEmpty()) {
- updateOp.setLet(std::move(letParams));
- }
- return updateOp;
-}
-
BSONObj CommonMongodProcessInterface::_convertRenameToInternalRename(
OperationContext* opCtx,
const NamespaceString& sourceNs,
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
index 09b3ed8fa33..07e2d0370ce 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
@@ -145,23 +145,6 @@ protected:
const Document& documentKey,
MakePipelineOptions opts);
- /**
- * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'.
- */
- write_ops::InsertCommandRequest buildInsertOp(const NamespaceString& nss,
- std::vector<BSONObj>&& objs,
- bool bypassDocValidation);
-
- /**
- * Builds an ordered update op on namespace 'nss' with update entries contained in 'batch'.
- */
- write_ops::UpdateCommandRequest buildUpdateOp(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss,
- BatchedObjects&& batch,
- UpsertType upsert,
- bool multi);
-
BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
Client* client,
CurrentOpTruncateMode truncateOps,
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
index c3a978ca766..5cf57474365 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
@@ -168,12 +168,6 @@ std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsR
return {"_id"};
}
-std::unique_ptr<CommonProcessInterface::WriteSizeEstimator>
-CommonProcessInterface::getWriteSizeEstimator(OperationContext* opCtx,
- const NamespaceString& ns) const {
- return std::make_unique<LocalWriteSizeEstimator>();
-}
-
void CommonProcessInterface::updateClientOperationTime(OperationContext* opCtx) const {
// In order to support causal consistency in a replica set or a sharded cluster when reading
// with secondary read preference, the secondary must propagate the primary's operation time
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h
index b3b5f26468f..4a4f1d1e990 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h
@@ -32,6 +32,7 @@
#include <vector>
#include "mongo/bson/bsonobj.h"
+#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/process_interface/mongo_process_interface.h"
namespace mongo {
@@ -47,10 +48,22 @@ public:
/**
* Estimates the size of writes that will be executed on the current node. Note that this
- * does not account for the full size of an update statement.
+ * does not account for the full size of an update statement because in the case of local
+ * writes, we will not have to serialize to BSON and are therefore not subject to the 16MB
+ * BSONObj size limit.
*/
class LocalWriteSizeEstimator final : public WriteSizeEstimator {
public:
+ int estimateInsertHeaderSize(
+ const write_ops::InsertCommandRequest& insertReq) const override {
+ return 0;
+ }
+
+ int estimateUpdateHeaderSize(
+ const write_ops::UpdateCommandRequest& updateReq) const override {
+ return 0;
+ }
+
int estimateInsertSizeBytes(const BSONObj& insert) const override {
return insert.objsize();
}
@@ -70,6 +83,16 @@ public:
*/
class TargetPrimaryWriteSizeEstimator final : public WriteSizeEstimator {
public:
+ int estimateInsertHeaderSize(
+ const write_ops::InsertCommandRequest& insertReq) const override {
+ return write_ops::getInsertHeaderSizeEstimate(insertReq);
+ }
+
+ int estimateUpdateHeaderSize(
+ const write_ops::UpdateCommandRequest& updateReq) const override {
+ return write_ops::getUpdateHeaderSizeEstimate(updateReq);
+ }
+
int estimateInsertSizeBytes(const BSONObj& insert) const override {
return insert.objsize() + write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
}
@@ -109,8 +132,6 @@ public:
virtual std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
OperationContext*, const NamespaceString&) const override;
- std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
- OperationContext* opCtx, const NamespaceString& ns) const override;
virtual void updateClientOperationTime(OperationContext* opCtx) const final;
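
The two estimators above split a write command's size into a header portion and a per-statement portion. The following is a minimal sketch, not part of this change, of how a caller could combine them when sizing an insert batch; the helper name checkBatchFits is hypothetical, and the include paths and the BSONObjMaxUserSize constant are assumed from the server codebase.

#include <vector>

#include "mongo/bson/bsonobj.h"
#include "mongo/bson/util/builder.h"  // BSONObjMaxUserSize
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/pipeline/process_interface/mongo_process_interface.h"

namespace mongo {
// Hypothetical helper: returns true if 'nextDoc' still fits in the current insert batch.
bool checkBatchFits(const MongoProcessInterface::WriteSizeEstimator& estimator,
                    const write_ops::InsertCommandRequest& insertReq,
                    const std::vector<BSONObj>& pendingDocs,
                    const BSONObj& nextDoc) {
    // Everything in the command other than the 'documents' array. LocalWriteSizeEstimator
    // reports 0 here because local writes never serialize the command to BSON.
    int totalBytes = estimator.estimateInsertHeaderSize(insertReq);
    for (const auto& doc : pendingDocs) {
        totalBytes += estimator.estimateInsertSizeBytes(doc);
    }
    totalBytes += estimator.estimateInsertSizeBytes(nextDoc);
    return totalBytes <= BSONObjMaxUserSize;
}
}  // namespace mongo

With LocalWriteSizeEstimator only the raw document sizes count; with TargetPrimaryWriteSizeEstimator the header estimate and the per-element array overhead are included as well, keeping the serialized command within the BSON size limit.
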
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index 1aa7d6be8de..05ee7dd7ad7 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -112,8 +112,19 @@ public:
public:
virtual ~WriteSizeEstimator() = default;
+ /**
+ * Set of functions which estimate the entire size of a write command except for the array
+ * of write statements themselves.
+ */
+ virtual int estimateInsertHeaderSize(
+ const write_ops::InsertCommandRequest& insertReq) const = 0;
+ virtual int estimateUpdateHeaderSize(
+ const write_ops::UpdateCommandRequest& updateReq) const = 0;
+
+ /**
+ * Set of functions which estimate the size of a single write statement.
+ */
virtual int estimateInsertSizeBytes(const BSONObj& insert) const = 0;
-
virtual int estimateUpdateSizeBytes(const BatchObject& batchObject,
UpsertType type) const = 0;
};
@@ -168,34 +179,35 @@ public:
virtual void updateClientOperationTime(OperationContext* opCtx) const = 0;
/**
- * Inserts 'objs' into 'ns' and returns an error Status if the insert fails. If 'targetEpoch' is
- * set, throws ErrorCodes::StaleEpoch if the targeted collection does not have the same epoch or
- * the epoch changes during the course of the insert.
+ * Executes 'insertCommand' against 'ns' and returns an error Status if the insert fails. If
+ * 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the targeted collection does not have
+ * the same epoch or the epoch changes during the course of the insert.
*/
virtual Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) = 0;
virtual Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) = 0;
/**
- * Updates the documents matching 'queries' with the objects 'updates'. Returns an error Status
- * if any of the updates fail, otherwise returns an 'UpdateResult' objects with the details of
- * the update operation. If 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the targeted
- * collection does not have the same epoch, or if the epoch changes during the update.
- */
- virtual StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- BatchedObjects&& batch,
- const WriteConcernOptions& wc,
- UpsertType upsert,
- bool multi,
- boost::optional<OID> targetEpoch) = 0;
+ * Executes the updates described by 'updateCommand'. Returns an error Status if any of the
+ * updates fail, otherwise returns an 'UpdateResult' object with the details of the update
+ * operation. If 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the targeted collection
+ * does not have the same epoch, or if the epoch changes during the update.
+ */
+ virtual StatusWith<UpdateResult> update(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
+ const WriteConcernOptions& wc,
+ UpsertType upsert,
+ bool multi,
+ boost::optional<OID> targetEpoch) = 0;
/**
* Returns index usage statistics for each index on collection 'ns' along with additional
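
With the interface now taking a fully built request, the caller assembles the update command, including let parameters and legacy runtime constants, before handing it to update(). Below is a sketch of that construction, assuming the IDL-generated setters shown elsewhere in this diff; the helper name makeUpdateCommand is hypothetical.

#include <memory>
#include <vector>

#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/pipeline/expression_context.h"

namespace mongo {
// Hypothetical caller-side construction, mirroring what the removed buildUpdateOp() did.
std::unique_ptr<write_ops::UpdateCommandRequest> makeUpdateCommand(
    const boost::intrusive_ptr<ExpressionContext>& expCtx,
    const NamespaceString& ns,
    std::vector<write_ops::UpdateOpEntry> updateEntries) {
    auto updateCommand = std::make_unique<write_ops::UpdateCommandRequest>(ns);
    updateCommand->setUpdates(std::move(updateEntries));
    // Serialize the pipeline variables so that both the runtime constants and any
    // user-supplied let parameters ride on the command object itself.
    auto [constants, letParams] =
        expCtx->variablesParseState.transitionalCompatibilitySerialize(expCtx->variables);
    updateCommand->setLegacyRuntimeConstants(std::move(constants));
    if (!letParams.isEmpty()) {
        updateCommand->setLet(std::move(letParams));
    }
    return updateCommand;
}
}  // namespace mongo

Because the let parameters and runtime constants live on the request, estimateUpdateHeaderSize() can account for them before any update statements are appended.
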
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
index 9e4a6f7cce7..33e900c06a9 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
@@ -98,7 +98,7 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
-std::unique_ptr<CommonProcessInterface::WriteSizeEstimator>
+std::unique_ptr<MongoProcessInterface::WriteSizeEstimator>
MongosProcessInterface::getWriteSizeEstimator(OperationContext* opCtx,
const NamespaceString& ns) const {
return std::make_unique<TargetPrimaryWriteSizeEstimator>();
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index a7dea455057..0ea14f6dcc2 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -72,7 +72,7 @@ public:
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID>) final {
MONGO_UNREACHABLE;
@@ -80,7 +80,7 @@ public:
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final {
MONGO_UNREACHABLE;
@@ -88,7 +88,7 @@ public:
StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
index e5bceac9686..f153109e0e6 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
@@ -108,13 +108,13 @@ boost::optional<Document> NonShardServerProcessInterface::lookupSingleDocument(
return lookedUpDocument;
}
-Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc,
- boost::optional<OID> targetEpoch) {
- auto writeResults = write_ops_exec::performInserts(
- expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+Status NonShardServerProcessInterface::insert(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) {
+ auto writeResults = write_ops_exec::performInserts(expCtx->opCtx, *insertCommand);
// Need to check each result in the batch since the writes are unordered.
for (const auto& result : writeResults.results) {
@@ -128,12 +128,11 @@ Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<Express
Status NonShardServerProcessInterface::insertTimeseries(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) {
try {
- auto insertReply = write_ops_exec::performTimeseriesWrites(
- expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+ auto insertReply = write_ops_exec::performTimeseriesWrites(expCtx->opCtx, *insertCommand);
checkWriteErrors(insertReply.getWriteCommandReplyBase());
} catch (DBException& ex) {
@@ -146,13 +145,12 @@ Status NonShardServerProcessInterface::insertTimeseries(
StatusWith<MongoProcessInterface::UpdateResult> NonShardServerProcessInterface::update(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
boost::optional<OID> targetEpoch) {
- auto writeResults = write_ops_exec::performUpdates(
- expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
+ auto writeResults = write_ops_exec::performUpdates(expCtx->opCtx, *updateCommand);
// Need to check each result in the batch since the writes are unordered.
UpdateResult updateResult;
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
index 9bae8030aac..639cc22d044 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
@@ -91,19 +91,19 @@ public:
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) override;
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) override;
StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
index ad504860169..3195b504faf 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
@@ -69,26 +69,27 @@ void ReplicaSetNodeProcessInterface::setReplicaSetNodeExecutor(
replicaSetNodeExecutor(service) = std::move(executor);
}
-Status ReplicaSetNodeProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc,
- boost::optional<OID> targetEpoch) {
+Status ReplicaSetNodeProcessInterface::insert(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) {
auto&& opCtx = expCtx->opCtx;
if (_canWriteLocally(opCtx, ns)) {
- return NonShardServerProcessInterface::insert(expCtx, ns, std::move(objs), wc, targetEpoch);
+ return NonShardServerProcessInterface::insert(
+ expCtx, ns, std::move(insertCommand), wc, targetEpoch);
}
- BatchedCommandRequest insertCommand(
- buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+ BatchedCommandRequest batchInsertCommand(std::move(insertCommand));
- return _executeCommandOnPrimary(opCtx, ns, std::move(insertCommand.toBSON())).getStatus();
+ return _executeCommandOnPrimary(opCtx, ns, batchInsertCommand.toBSON()).getStatus();
}
StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::update(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
@@ -96,11 +97,11 @@ StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::
auto&& opCtx = expCtx->opCtx;
if (_canWriteLocally(opCtx, ns)) {
return NonShardServerProcessInterface::update(
- expCtx, ns, std::move(batch), wc, upsert, multi, targetEpoch);
+ expCtx, ns, std::move(updateCommand), wc, upsert, multi, targetEpoch);
}
+ BatchedCommandRequest batchUpdateCommand(std::move(updateCommand));
- BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
- auto result = _executeCommandOnPrimary(opCtx, ns, std::move(updateCommand.toBSON()));
+ auto result = _executeCommandOnPrimary(opCtx, ns, batchUpdateCommand.toBSON());
if (!result.isOK()) {
return result.getStatus();
}
@@ -142,14 +143,15 @@ void ReplicaSetNodeProcessInterface::createTimeseriesView(OperationContext* opCt
Status ReplicaSetNodeProcessInterface::insertTimeseries(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) {
if (_canWriteLocally(expCtx->opCtx, ns)) {
return NonShardServerProcessInterface::insertTimeseries(
- expCtx, ns, std::move(objs), wc, targetEpoch);
+ expCtx, ns, std::move(insertCommand), wc, targetEpoch);
} else {
- return ReplicaSetNodeProcessInterface::insert(expCtx, ns, std::move(objs), wc, targetEpoch);
+ return ReplicaSetNodeProcessInterface::insert(
+ expCtx, ns, std::move(insertCommand), wc, targetEpoch);
}
}
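
When the write cannot be performed locally, the request built by the caller is moved into a BatchedCommandRequest and serialized once for the primary. A compact sketch of that pattern, assuming the unique_ptr constructor added later in this diff; the helper name serializeForPrimary is hypothetical.

#include <memory>

#include "mongo/db/ops/write_ops.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/s/write_ops/batched_command_request.h"

namespace mongo {
// Hypothetical illustration: ownership of the request moves into the batched wrapper,
// which is then serialized exactly once before being forwarded to the primary.
BSONObj serializeForPrimary(std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
                            const WriteConcernOptions& wc) {
    BatchedCommandRequest batchInsertCommand(std::move(insertCommand));
    batchInsertCommand.setWriteConcern(wc.toBSON());
    return batchInsertCommand.toBSON();
}
}  // namespace mongo
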
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
index cf5bb5a90ef..94f45559384 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
@@ -68,12 +68,13 @@ public:
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final;
+
StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
@@ -100,7 +101,7 @@ public:
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch);
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index bfea6b001a6..dd9367b09e2 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -123,20 +123,20 @@ boost::optional<Document> ShardServerProcessInterface::lookupSingleDocument(
return doLookupSingleDocument(expCtx, nss, collectionUUID, documentKey, std::move(opts));
}
-Status ShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc,
- boost::optional<OID> targetEpoch) {
+Status ShardServerProcessInterface::insert(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- BatchedCommandRequest insertCommand(
- buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+ BatchedCommandRequest batchInsertCommand(std::move(insertCommand));
- insertCommand.setWriteConcern(wc.toBSON());
+ batchInsertCommand.setWriteConcern(wc.toBSON());
- cluster::write(expCtx->opCtx, insertCommand, &stats, &response, targetEpoch);
+ cluster::write(expCtx->opCtx, batchInsertCommand, &stats, &response, targetEpoch);
return response.toStatus();
}
@@ -144,7 +144,7 @@ Status ShardServerProcessInterface::insert(const boost::intrusive_ptr<Expression
StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::update(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
@@ -152,11 +152,10 @@ StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::upd
BatchedCommandResponse response;
BatchWriteExecStats stats;
- BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
-
- updateCommand.setWriteConcern(wc.toBSON());
+ BatchedCommandRequest batchUpdateCommand(std::move(updateCommand));
+ batchUpdateCommand.setWriteConcern(wc.toBSON());
- cluster::write(expCtx->opCtx, updateCommand, &stats, &response, targetEpoch);
+ cluster::write(expCtx->opCtx, batchUpdateCommand, &stats, &response, targetEpoch);
if (auto status = response.toStatus(); status != Status::OK()) {
return status;
@@ -402,10 +401,11 @@ void ShardServerProcessInterface::createTimeseriesView(OperationContext* opCtx,
Status ShardServerProcessInterface::insertTimeseries(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) {
- return ShardServerProcessInterface::insert(expCtx, ns, std::move(objs), wc, targetEpoch);
+ return ShardServerProcessInterface::insert(
+ expCtx, ns, std::move(insertCommand), wc, targetEpoch);
}
std::unique_ptr<Pipeline, PipelineDeleter>
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
index c81af91b6e0..aef9845f135 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
@@ -53,6 +53,11 @@ public:
const NamespaceString& nss,
ChunkVersion targetCollectionPlacementVersion) const final;
+ std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+ OperationContext* opCtx, const NamespaceString& ns) const final {
+ return std::make_unique<TargetPrimaryWriteSizeEstimator>();
+ }
+
std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
OperationContext*, const NamespaceString&) const final {
// We don't expect anyone to use this method on the shard itself (yet). This is currently
@@ -71,23 +76,15 @@ public:
const Document& documentKey,
boost::optional<BSONObj> readConcern) final;
- /**
- * Inserts the documents 'objs' into the namespace 'ns' using the ClusterWriter for locking,
- * routing, stale config handling, etc.
- */
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final;
- /**
- * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking,
- * routing, stale config handling, etc.
- */
StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
@@ -149,7 +146,7 @@ public:
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final;
};
diff --git a/src/mongo/db/pipeline/process_interface/standalone_process_interface.h b/src/mongo/db/pipeline/process_interface/standalone_process_interface.h
index aceff8e6928..dc562b9089e 100644
--- a/src/mongo/db/pipeline/process_interface/standalone_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/standalone_process_interface.h
@@ -41,6 +41,11 @@ public:
StandaloneProcessInterface(std::shared_ptr<executor::TaskExecutor> exec)
: NonShardServerProcessInterface(std::move(exec)) {}
+ std::unique_ptr<MongoProcessInterface::WriteSizeEstimator> getWriteSizeEstimator(
+ OperationContext* opCtx, const NamespaceString& ns) const final {
+ return std::make_unique<LocalWriteSizeEstimator>();
+ }
+
virtual ~StandaloneProcessInterface() = default;
};
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index 60783a7709b..93ed117a53d 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -55,6 +55,16 @@ public:
class StubWriteSizeEstimator final : public WriteSizeEstimator {
public:
+ int estimateInsertHeaderSize(
+ const write_ops::InsertCommandRequest& insertReq) const override {
+ return 0;
+ }
+
+ int estimateUpdateHeaderSize(
+ const write_ops::UpdateCommandRequest& updateReq) const override {
+ return 0;
+ }
+
int estimateInsertSizeBytes(const BSONObj& insert) const override {
MONGO_UNREACHABLE;
}
@@ -64,6 +74,7 @@ public:
MONGO_UNREACHABLE;
}
};
+
std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
OperationContext* opCtx, const NamespaceString& ns) const override {
return std::make_unique<StubWriteSizeEstimator>();
@@ -77,7 +88,7 @@ public:
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID>) override {
MONGO_UNREACHABLE;
@@ -85,7 +96,7 @@ public:
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) override {
MONGO_UNREACHABLE;
@@ -93,7 +104,7 @@ public:
StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h
index 8dc815053f1..e5c87d84167 100644
--- a/src/mongo/s/write_ops/batched_command_request.h
+++ b/src/mongo/s/write_ops/batched_command_request.h
@@ -54,15 +54,25 @@ public:
: _batchType(BatchType_Insert),
_insertReq(std::make_unique<write_ops::InsertCommandRequest>(std::move(insertOp))) {}
+ BatchedCommandRequest(std::unique_ptr<write_ops::InsertCommandRequest> insertOp)
+ : _batchType(BatchType_Insert), _insertReq(std::move(insertOp)) {}
+
BatchedCommandRequest(write_ops::UpdateCommandRequest updateOp)
: _batchType(BatchType_Update),
_updateReq(std::make_unique<write_ops::UpdateCommandRequest>(std::move(updateOp))) {}
+ BatchedCommandRequest(std::unique_ptr<write_ops::UpdateCommandRequest> updateOp)
+ : _batchType(BatchType_Update), _updateReq(std::move(updateOp)) {}
+
BatchedCommandRequest(write_ops::DeleteCommandRequest deleteOp)
: _batchType(BatchType_Delete),
_deleteReq(std::make_unique<write_ops::DeleteCommandRequest>(std::move(deleteOp))) {}
+ BatchedCommandRequest(std::unique_ptr<write_ops::DeleteCommandRequest> deleteOp)
+ : _batchType(BatchType_Delete), _deleteReq(std::move(deleteOp)) {}
+
BatchedCommandRequest(BatchedCommandRequest&&) = default;
+ BatchedCommandRequest& operator=(BatchedCommandRequest&&) = default;
static BatchedCommandRequest parseInsert(const OpMsgRequest& request);
static BatchedCommandRequest parseUpdate(const OpMsgRequest& request);
@@ -93,6 +103,18 @@ public:
return *_deleteReq;
}
+ std::unique_ptr<write_ops::InsertCommandRequest> extractInsertRequest() {
+ return std::move(_insertReq);
+ }
+
+ std::unique_ptr<write_ops::UpdateCommandRequest> extractUpdateRequest() {
+ return std::move(_updateReq);
+ }
+
+ std::unique_ptr<write_ops::DeleteCommandRequest> extractDeleteRequest() {
+ return std::move(_deleteReq);
+ }
+
std::size_t sizeWriteOps() const;
void setWriteConcern(const BSONObj& writeConcern) {