28 files changed, 999 insertions, 280 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 8f969854d21..4eedccccb4f 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -379,6 +379,8 @@ last-continuous: ticket: SERVER-76719 - test_file: jstests/sharding/shard_keys_with_dollar_sign.js ticket: SERVER-76948 + - test_file: jstests/sharding/merge_let_params_size_estimation.js + ticket: SERVER-74806 suites: null last-lts: all: @@ -842,4 +844,6 @@ last-lts: ticket: SERVER-76489 - test_file: jstests/sharding/shard_keys_with_dollar_sign.js ticket: SERVER-76948 + - test_file: jstests/sharding/merge_let_params_size_estimation.js + ticket: SERVER-74806 suites: null diff --git a/jstests/sharding/merge_let_params_size_estimation.js b/jstests/sharding/merge_let_params_size_estimation.js new file mode 100644 index 00000000000..66f30d38335 --- /dev/null +++ b/jstests/sharding/merge_let_params_size_estimation.js @@ -0,0 +1,155 @@ +/** + * Test which verifies that $merge accounts for the size of let parameters and runtime constants + * when it serializes writes to send to other nodes. + * + * @tags: [ + * # The $merge in this test targets the '_id' field, and requires a unique index. + * expects_explicit_underscore_id_index, + * ] + */ +(function() { +"use strict"; + +load('jstests/libs/fixture_helpers.js'); // For isReplSet(). + +// Function to run the test against a test fixture. Accepts an object that contains the following +// fields: +// - testFixture: The fixture to run the test against. +// - conn: Connection to the test fixture specified above. +// - shardLocal and shardOutput: Indicates whether the local/output collection should be sharded in +// this test run (ignored when not running against a sharded cluster). +function runTest({testFixture, conn, shardLocal, shardOutput}) { + const dbName = "db"; + const collName = "merge_let_params"; + const dbCollName = dbName + "." + collName; + const outCollName = "outcoll"; + const dbOutCollName = dbName + "." + outCollName; + const admin = conn.getDB("admin"); + const isReplSet = FixtureHelpers.isReplSet(admin); + + function shardColls() { + // When running against a sharded cluster, configure the collections according to + // 'shardLocal' and 'shardOutput'. + if (!isReplSet) { + assert.commandWorked(admin.runCommand({enableSharding: dbName})); + testFixture.ensurePrimaryShard(dbName, testFixture.shard0.shardName); + if (shardLocal) { + testFixture.shardColl(collName, {_id: 1}, {_id: 0}, {_id: 0}, dbName); + } + if (shardOutput) { + testFixture.shardColl(outCollName, {_id: 1}, {_id: 0}, {_id: 0}, dbName); + } + } + } + const coll = conn.getCollection(dbCollName); + const outColl = conn.getCollection(dbOutCollName); + coll.drop(); + outColl.drop(); + shardColls(); + + // Insert two large documents in both collections. By inserting the documents with the same _id + // values in both collections and splitting these documents between chunks, this will guarantee + // that we need to serialize and send update command(s) across the wire when targeting the + // output collection. 
+ const kOneMB = 1024 * 1024; + const kDataString = "a".repeat(4 * kOneMB); + const kDocs = [{_id: 2, data: kDataString}, {_id: -2, data: kDataString}]; + assert.commandWorked(coll.insertMany(kDocs)); + assert.commandWorked(outColl.insertMany(kDocs)); + + // The sizes of the different update command components are deliberately chosen to test the + // batching logic when the update is targeted to another node in the cluster. In particular, the + // update command will contain the 10MB 'outFieldValue' and we will be updating two 4MB + // documents. The 18MB total exceeds the 16MB size limit, so we expect the batching logic to + // split the two documents into separate batches of 14MB each. + const outFieldValue = "a".repeat(10 * kOneMB); + let aggCommand = { + pipeline: [{ + $merge: { + into: {db: "db", coll: outCollName}, + on: "_id", + whenMatched: [{$addFields: {out: "$$outField"}}], + whenNotMatched: "insert" + } + }], + cursor: {}, + let : {"outField": outFieldValue} + }; + + // If this is a replica set, we need to target a secondary node to force writes to go over + // the wire. + const aggColl = isReplSet ? testFixture.getSecondary().getCollection(dbCollName) : coll; + + if (isReplSet) { + aggCommand["$readPreference"] = {mode: "secondary"}; + } + + // The aggregate should not fail. + assert.commandWorked(aggColl.runCommand("aggregate", aggCommand)); + + // Verify that each document in the output collection contains the value of 'outField'. + let outContents = outColl.find().toArray(); + for (const res of outContents) { + const out = res["out"]; + assert.eq(out, outFieldValue, outContents); + } + + assert(coll.drop()); + assert(outColl.drop()); + shardColls(); + + // Insert four large documents in both collections. As before, this will force updates to be + // sent across the wire, but this will generate double the batches. + const kMoreDocs = [ + {_id: -2, data: kDataString}, + {_id: -1, data: kDataString}, + {_id: 1, data: kDataString}, + {_id: 2, data: kDataString}, + ]; + + assert.commandWorked(coll.insertMany(kMoreDocs)); + assert.commandWorked(outColl.insertMany(kMoreDocs)); + + // The aggregate should not fail. + assert.commandWorked(aggColl.runCommand("aggregate", aggCommand)); + + // Verify that each document in the output collection contains the value of 'outField'. + outContents = outColl.find().toArray(); + for (const res of outContents) { + const out = res["out"]; + assert.eq(out, outFieldValue, outContents); + } + + assert(coll.drop()); + assert(outColl.drop()); + shardColls(); + + // If the documents and the let parameters are large enough, the $merge is expected to fail. + const kVeryLargeDataString = "a".repeat(10 * kOneMB); + const kLargeDocs = + [{_id: 2, data: kVeryLargeDataString}, {_id: -2, data: kVeryLargeDataString}]; + assert.commandWorked(coll.insertMany(kLargeDocs)); + assert.commandWorked(outColl.insertMany(kLargeDocs)); + assert.commandFailedWithCode(aggColl.runCommand("aggregate", aggCommand), + ErrorCodes.BSONObjectTooLarge); +} + +// Test against a replica set. +const rst = new ReplSetTest({nodes: 2}); +rst.startSet(); +rst.initiate(); +rst.awaitSecondaryNodes(); + +runTest({testFixture: rst, conn: rst.getPrimary()}); + +rst.stopSet(); + +// Test against a sharded cluster. 
+const st = new ShardingTest({shards: 2, mongos: 1}); +runTest({testFixture: st, conn: st.s0, shardLocal: false, shardOutput: false}); +runTest({testFixture: st, conn: st.s0, shardLocal: true, shardOutput: false}); +runTest({testFixture: st, conn: st.s0, shardLocal: false, shardOutput: true}); +runTest({testFixture: st, conn: st.s0, shardLocal: true, shardOutput: true}); + +st.stop(); +})(); diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index 60e56da7180..ceb797e339c 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -258,6 +258,11 @@ public: } write_ops::InsertCommandReply typedRun(OperationContext* opCtx) final try { + // On debug builds, verify that the estimated size of the insert command is at least as + // large as the size of the actual, serialized insert command. This ensures that the + // logic which estimates the size of insert commands is correct. + dassert(write_ops::verifySizeEstimate(request())); + doTransactionValidationForWrites(opCtx, ns()); if (request().getEncryptionInformation().has_value()) { // Flag set here and in fle_crud.cpp since this only executes on a mongod. @@ -430,6 +435,11 @@ public: } write_ops::UpdateCommandReply typedRun(OperationContext* opCtx) final try { + // On debug builds, verify that the estimated size of the update command is at least as + // large as the size of the actual, serialized update command. This ensures that the + // logic which estimates the size of update commands is correct. + dassert(write_ops::verifySizeEstimate(request())); + doTransactionValidationForWrites(opCtx, ns()); write_ops::UpdateCommandReply updateReply; OperationSource source = OperationSource::kStandard; @@ -450,15 +460,6 @@ public: source = OperationSource::kTimeseriesUpdate; } - // On debug builds, verify that the estimated size of the updates are at least as large - // as the actual, serialized size. This ensures that the logic that estimates the size - // of deletes for batch writes is correct. - if constexpr (kDebugBuild) { - for (auto&& updateOp : request().getUpdates()) { - invariant(write_ops::verifySizeEstimate(updateOp)); - } - } - long long nModified = 0; // Tracks the upserted information. The memory of this variable gets moved in the @@ -669,6 +670,11 @@ public: } write_ops::DeleteCommandReply typedRun(OperationContext* opCtx) final try { + // On debug builds, verify that the estimated size of the deletes are at least as large + // as the actual, serialized size. This ensures that the logic that estimates the size + // of deletes for batch writes is correct. + dassert(write_ops::verifySizeEstimate(request())); + doTransactionValidationForWrites(opCtx, ns()); write_ops::DeleteCommandReply deleteReply; OperationSource source = OperationSource::kStandard; @@ -686,14 +692,6 @@ public: source = OperationSource::kTimeseriesDelete; } - // On debug builds, verify that the estimated size of the deletes are at least as large - // as the actual, serialized size. This ensures that the logic that estimates the size - // of deletes for batch writes is correct. 
- if constexpr (kDebugBuild) { - for (auto&& deleteOp : request().getDeletes()) { - invariant(write_ops::verifySizeEstimate(deleteOp)); - } - } auto reply = write_ops_exec::performDeletes(opCtx, request(), source); populateReply(opCtx, diff --git a/src/mongo/db/ops/write_ops.cpp b/src/mongo/db/ops/write_ops.cpp index c57f65ccdf8..7bac799c5cf 100644 --- a/src/mongo/db/ops/write_ops.cpp +++ b/src/mongo/db/ops/write_ops.cpp @@ -57,6 +57,9 @@ namespace { // each element. static constexpr int kPerElementOverhead = 2; +// This constant accounts for the size of a bool. +static constexpr int kBoolSize = 1; + // This constant tracks the overhead for serializing UUIDs. It includes 1 byte for the // 'BinDataType', 4 bytes for serializing the integer size of the UUID, and finally, 16 bytes // for the UUID itself. @@ -86,6 +89,60 @@ void checkOpCountForCommand(const T& op, size_t numOps) { } } +// Utility which estimates the size of 'WriteCommandRequestBase' when serialized. +int getWriteCommandRequestBaseSize(const WriteCommandRequestBase& base) { + static const int kSizeOfOrderedField = + write_ops::WriteCommandRequestBase::kOrderedFieldName.size() + kBoolSize + + kPerElementOverhead; + static const int kSizeOfBypassDocumentValidationField = + write_ops::WriteCommandRequestBase::kBypassDocumentValidationFieldName.size() + kBoolSize + + kPerElementOverhead; + + auto estSize = static_cast<int>(BSONObj::kMinBSONLength) + kSizeOfOrderedField + + kSizeOfBypassDocumentValidationField; + + if (auto stmtId = base.getStmtId(); stmtId) { + estSize += write_ops::WriteCommandRequestBase::kStmtIdFieldName.size() + + sizeof(std::int32_t) + kPerElementOverhead; + } + + if (auto stmtIds = base.getStmtIds(); stmtIds) { + estSize += write_ops::WriteCommandRequestBase::kStmtIdsFieldName.size(); + estSize += static_cast<int>(BSONObj::kMinBSONLength); + estSize += + ((sizeof(std::int32_t) + write_ops::kWriteCommandBSONArrayPerElementOverheadBytes) * + stmtIds->size()); + estSize += kPerElementOverhead; + } + + if (auto isTimeseries = base.getIsTimeseriesNamespace(); isTimeseries.has_value()) { + estSize += write_ops::WriteCommandRequestBase::kIsTimeseriesNamespaceFieldName.size() + + kBoolSize + kPerElementOverhead; + } + + if (auto collUUID = base.getCollectionUUID(); collUUID) { + estSize += write_ops::WriteCommandRequestBase::kCollectionUUIDFieldName.size() + kUUIDSize + + kPerElementOverhead; + } + + if (auto encryptionInfo = base.getEncryptionInformation(); encryptionInfo) { + estSize += write_ops::WriteCommandRequestBase::kEncryptionInformationFieldName.size() + + encryptionInfo->toBSON().objsize() + kPerElementOverhead; + } + + if (auto query = base.getOriginalQuery(); query) { + estSize += write_ops::WriteCommandRequestBase::kOriginalQueryFieldName.size() + + query->objsize() + kPerElementOverhead; + } + + if (auto originalCollation = base.getOriginalCollation(); originalCollation) { + estSize += write_ops::WriteCommandRequestBase::kOriginalCollationFieldName.size() + + originalCollation->objsize() + kPerElementOverhead; + } + + return estSize; +} + } // namespace namespace write_ops { @@ -145,6 +202,38 @@ int32_t getStmtIdForWriteAt(const WriteCommandRequestBase& writeCommandBase, siz return kFirstStmtId + writePos; } +int estimateRuntimeConstantsSize(const mongo::LegacyRuntimeConstants& constants) { + int size = write_ops::UpdateCommandRequest::kLegacyRuntimeConstantsFieldName.size() + + static_cast<int>(BSONObj::kMinBSONLength) + kPerElementOverhead; + + // $$NOW + size += + 
LegacyRuntimeConstants::kLocalNowFieldName.size() + sizeof(Date_t) + kPerElementOverhead; + + // $$CLUSTER_TIME + size += LegacyRuntimeConstants::kClusterTimeFieldName.size() + sizeof(Timestamp) + + kPerElementOverhead; + + // $$JS_SCOPE + if (const auto& scope = constants.getJsScope(); scope.has_value()) { + size += LegacyRuntimeConstants::kJsScopeFieldName.size() + scope->objsize() + + kPerElementOverhead; + } + + // $$IS_MR + if (const auto& isMR = constants.getIsMapReduce(); isMR.has_value()) { + size += + LegacyRuntimeConstants::kIsMapReduceFieldName.size() + kBoolSize + kPerElementOverhead; + } + + // $$USER_ROLES + if (const auto& userRoles = constants.getUserRoles(); userRoles.has_value()) { + size += LegacyRuntimeConstants::kUserRolesFieldName.size() + userRoles->objsize() + + kPerElementOverhead; + } + return size; +} + int getUpdateSizeEstimate(const BSONObj& q, const write_ops::UpdateModification& u, const boost::optional<mongo::BSONObj>& c, @@ -155,11 +244,6 @@ int getUpdateSizeEstimate(const BSONObj& q, const boost::optional<UUID>& sampleId, const bool includeAllowShardKeyUpdatesWithoutFullShardKeyInQuery) { using UpdateOpEntry = write_ops::UpdateOpEntry; - - // This constant accounts for the null terminator in each field name and the BSONType byte for - // each element. - static const int kPerElementOverhead = 2; - static const int kBoolSize = 1; int estSize = static_cast<int>(BSONObj::kMinBSONLength); // Add the sizes of the 'multi' and 'upsert' fields. @@ -268,6 +352,123 @@ bool verifySizeEstimate(const write_ops::UpdateOpEntry& update) { update.toBSON().objsize(); } +bool verifySizeEstimate(const InsertCommandRequest& insertReq) { + int size = getInsertHeaderSizeEstimate(insertReq); + for (auto&& docToInsert : insertReq.getDocuments()) { + size += docToInsert.objsize() + kWriteCommandBSONArrayPerElementOverheadBytes; + } + return size >= insertReq.toBSON({} /* commandPassthroughFields */).objsize(); +} + +bool verifySizeEstimate(const UpdateCommandRequest& updateReq) { + int size = getUpdateHeaderSizeEstimate(updateReq); + + for (auto&& update : updateReq.getUpdates()) { + size += getUpdateSizeEstimate( + update.getQ(), + update.getU(), + update.getC(), + update.getUpsertSupplied().has_value(), + update.getCollation(), + update.getArrayFilters(), + update.getHint(), + update.getSampleId(), + update.getAllowShardKeyUpdatesWithoutFullShardKeyInQuery().has_value()) + + kWriteCommandBSONArrayPerElementOverheadBytes; + } + return size >= updateReq.toBSON({} /* commandPassthroughFields */).objsize(); +} + +bool verifySizeEstimate(const DeleteCommandRequest& deleteReq) { + int size = getDeleteHeaderSizeEstimate(deleteReq); + + for (auto&& deleteOp : deleteReq.getDeletes()) { + size += write_ops::getDeleteSizeEstimate(deleteOp.getQ(), + deleteOp.getCollation(), + deleteOp.getHint(), + deleteOp.getSampleId()) + + kWriteCommandBSONArrayPerElementOverheadBytes; + } + return size >= deleteReq.toBSON({} /* commandPassthroughFields */).objsize(); +} + +int getInsertHeaderSizeEstimate(const InsertCommandRequest& insertReq) { + int size = getWriteCommandRequestBaseSize(insertReq.getWriteCommandRequestBase()) + + write_ops::InsertCommandRequest::kDocumentsFieldName.size() + kPerElementOverhead + + static_cast<int>(BSONObj::kMinBSONLength); + + size += InsertCommandRequest::kCommandName.size() + kPerElementOverhead + + insertReq.getNamespace().size() + 1 /* ns string null terminator */; + + // Handle $tenant. 
Note that $tenant is injected as a hidden field into all IDL commands, unlike + // other passthrough fields. + if (auto tenant = insertReq.getDollarTenant(); tenant.has_value()) { + size += InsertCommandRequest::kDollarTenantFieldName.size() + OID::kOIDSize + + kPerElementOverhead; + } + return size; +} + +int getUpdateHeaderSizeEstimate(const UpdateCommandRequest& updateReq) { + int size = getWriteCommandRequestBaseSize(updateReq.getWriteCommandRequestBase()); + + size += UpdateCommandRequest::kCommandName.size() + kPerElementOverhead + + updateReq.getNamespace().size() + 1 /* ns string null terminator */; + + size += write_ops::UpdateCommandRequest::kUpdatesFieldName.size() + kPerElementOverhead + + static_cast<int>(BSONObj::kMinBSONLength); + + // Handle $tenant. Note that $tenant is injected as a hidden field into all IDL commands, unlike + // other passthrough fields. + if (auto tenant = updateReq.getDollarTenant(); tenant.has_value()) { + size += UpdateCommandRequest::kDollarTenantFieldName.size() + OID::kOIDSize + + kPerElementOverhead; + } + + // Handle legacy runtime constants. + if (auto runtimeConstants = updateReq.getLegacyRuntimeConstants(); + runtimeConstants.has_value()) { + size += estimateRuntimeConstantsSize(*runtimeConstants); + } + + // Handle let parameters. + if (auto let = updateReq.getLet(); let.has_value()) { + size += write_ops::UpdateCommandRequest::kLetFieldName.size() + let->objsize() + + kPerElementOverhead; + } + return size; +} + +int getDeleteHeaderSizeEstimate(const DeleteCommandRequest& deleteReq) { + int size = getWriteCommandRequestBaseSize(deleteReq.getWriteCommandRequestBase()); + + size += DeleteCommandRequest::kCommandName.size() + kPerElementOverhead + + deleteReq.getNamespace().size() + 1 /* ns string null terminator */; + + size += write_ops::DeleteCommandRequest::kDeletesFieldName.size() + kPerElementOverhead + + static_cast<int>(BSONObj::kMinBSONLength); + + // Handle $tenant. Note that $tenant is injected as a hidden field into all IDL commands, unlike + // other passthrough fields. + if (auto tenant = deleteReq.getDollarTenant(); tenant.has_value()) { + size += DeleteCommandRequest::kDollarTenantFieldName.size() + OID::kOIDSize + + kPerElementOverhead; + } + + // Handle legacy runtime constants. + if (auto runtimeConstants = deleteReq.getLegacyRuntimeConstants(); + runtimeConstants.has_value()) { + size += estimateRuntimeConstantsSize(*runtimeConstants); + } + + // Handle let parameters. 
+ if (auto let = deleteReq.getLet(); let.has_value()) { + size += write_ops::UpdateCommandRequest::kLetFieldName.size() + let->objsize() + + kPerElementOverhead; + } + return size; +} + bool verifySizeEstimate(const write_ops::DeleteOpEntry& deleteOp) { return write_ops::getDeleteSizeEstimate(deleteOp.getQ(), deleteOp.getCollation(), diff --git a/src/mongo/db/ops/write_ops.h b/src/mongo/db/ops/write_ops.h index 2545c8eb78e..4f44b8e0c95 100644 --- a/src/mongo/db/ops/write_ops.h +++ b/src/mongo/db/ops/write_ops.h @@ -127,6 +127,17 @@ int getDeleteSizeEstimate(const BSONObj& q, */ bool verifySizeEstimate(const write_ops::UpdateOpEntry& update); bool verifySizeEstimate(const write_ops::DeleteOpEntry& deleteOp); +bool verifySizeEstimate(const InsertCommandRequest& insertReq); +bool verifySizeEstimate(const UpdateCommandRequest& updateReq); +bool verifySizeEstimate(const DeleteCommandRequest& deleteReq); + +/** + * Set of utilities which estimate the size of the headers (that is, all fields in a write command + * outside of the write statements themselves) of an insert/update/delete command, respectively. + */ +int getInsertHeaderSizeEstimate(const InsertCommandRequest& insertReq); +int getUpdateHeaderSizeEstimate(const UpdateCommandRequest& updateReq); +int getDeleteHeaderSizeEstimate(const DeleteCommandRequest& deleteReq); /** * If the response from a write command contains any write errors, it will throw the first one. All diff --git a/src/mongo/db/ops/write_ops.idl b/src/mongo/db/ops/write_ops.idl index a2d80295d03..1991a286637 100644 --- a/src/mongo/db/ops/write_ops.idl +++ b/src/mongo/db/ops/write_ops.idl @@ -151,6 +151,8 @@ structs: chained_structs: WriteCommandReplyBase: writeCommandReplyBase + # IMPORTANT: If any changes are made to the fields here, please update the corresponding size + # estimation functions in 'write_ops.cpp'. WriteCommandRequestBase: description: "Contains basic information included by all write commands" strict: false @@ -363,7 +365,8 @@ structs: stability: unstable commands: - + # IMPORTANT: If any changes are made to the fields here, please update the corresponding insert + # size estimation functions in 'write_ops.cpp'. insert: description: "Parser for the 'insert' command." command_name: insert @@ -386,6 +389,8 @@ commands: supports_doc_sequence: true stability: stable + # IMPORTANT: If any changes are made to the fields here, please update the corresponding update + # size estimation functions in 'write_ops.cpp'. update: description: "Parser for the 'update' command." command_name: update @@ -421,6 +426,8 @@ commands: optional: true stability: stable + # IMPORTANT: If any changes are made to the fields here, please update the corresponding delete + # size estimation functions in 'write_ops.cpp'. delete: description: "Parser for the 'delete' command." 
command_name: delete diff --git a/src/mongo/db/ops/write_ops_exec_test.cpp b/src/mongo/db/ops/write_ops_exec_test.cpp index f977d7878dc..a0210671b7b 100644 --- a/src/mongo/db/ops/write_ops_exec_test.cpp +++ b/src/mongo/db/ops/write_ops_exec_test.cpp @@ -122,6 +122,200 @@ TEST_F(WriteOpsExecTest, TestDeleteSizeEstimationLogic) { ASSERT(write_ops::verifySizeEstimate(deleteOpEntry)); } +TEST_F(WriteOpsExecTest, TestInsertRequestSizeEstimationLogic) { + NamespaceString ns = + NamespaceString::createNamespaceString_forTest("db_write_ops_exec_test", "insert_test"); + write_ops::InsertCommandRequest insert(ns); + BSONObj docToInsert(fromjson("{_id: 1, foo: 1}")); + insert.setDocuments({docToInsert}); + ASSERT(write_ops::verifySizeEstimate(insert)); + + // Configure $tenant. + insert.setDollarTenant(mongo::TenantId(mongo::OID::gen())); + ASSERT(write_ops::verifySizeEstimate(insert)); + + // Configure different fields for 'wcb'. + write_ops::WriteCommandRequestBase wcb; + + // stmtId + wcb.setStmtId(2); + insert.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(insert)); + + // stmtIds + wcb.setStmtIds(std::vector<int32_t>{2, 3}); + insert.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(insert)); + + // isTimeseries + wcb.setIsTimeseriesNamespace(true); + insert.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(insert)); + + // collUUID + wcb.setCollectionUUID(UUID::gen()); + insert.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(insert)); + + // encryptionInfo + wcb.setEncryptionInformation( + EncryptionInformation(fromjson("{schema: 'I love encrypting and protecting my data'}"))); + insert.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(insert)); + + // originalQuery + wcb.setOriginalQuery(fromjson("{field: 'value'}")); + insert.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(insert)); + + // originalCollation + wcb.setOriginalCollation(fromjson("{locale: 'fr'}")); + insert.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(insert)); +} + +TEST_F(WriteOpsExecTest, TestUpdateRequestSizeEstimationLogic) { + NamespaceString ns = + NamespaceString::createNamespaceString_forTest("db_write_ops_exec_test", "update_test"); + write_ops::UpdateCommandRequest update(ns); + + const BSONObj updateStmt = fromjson("{$set: {a: 5}}"); + auto mod = write_ops::UpdateModification::parseFromClassicUpdate(updateStmt); + write_ops::UpdateOpEntry updateOpEntry(BSON("_id" << 1), std::move(mod)); + update.setUpdates({updateOpEntry}); + + ASSERT(write_ops::verifySizeEstimate(update)); + + // Configure $tenant. + update.setDollarTenant(mongo::TenantId(mongo::OID::gen())); + ASSERT(write_ops::verifySizeEstimate(update)); + + // Configure different fields for 'wcb'. 
+ write_ops::WriteCommandRequestBase wcb; + + // stmtId + wcb.setStmtId(2); + update.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(update)); + + // stmtIds + wcb.setStmtIds(std::vector<int32_t>{2, 3}); + update.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(update)); + + // isTimeseries + wcb.setIsTimeseriesNamespace(true); + update.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(update)); + + // collUUID + wcb.setCollectionUUID(UUID::gen()); + update.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(update)); + + // encryptionInfo + wcb.setEncryptionInformation( + EncryptionInformation(fromjson("{schema: 'I love encrypting and protecting my data'}"))); + update.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(update)); + + // originalQuery + wcb.setOriginalQuery(fromjson("{field: 'value'}")); + update.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(update)); + + // originalCollation + wcb.setOriginalCollation(fromjson("{locale: 'fr'}")); + update.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(update)); + + // Configure different fields specific to 'UpdateStatementRequest'. + LegacyRuntimeConstants legacyRuntimeConstants; + const auto now = Date_t::now(); + + // At a minimum, $$NOW and $$CLUSTER_TIME must be set. + legacyRuntimeConstants.setLocalNow(now); + legacyRuntimeConstants.setClusterTime(Timestamp(now)); + update.setLegacyRuntimeConstants(legacyRuntimeConstants); + ASSERT(write_ops::verifySizeEstimate(update)); + + // $$JS_SCOPE + BSONObj jsScope = fromjson("{constant: 'I love mapReduce and javascript :D'}"); + legacyRuntimeConstants.setJsScope(jsScope); + update.setLegacyRuntimeConstants(legacyRuntimeConstants); + ASSERT(write_ops::verifySizeEstimate(update)); + + // $$IS_MR + legacyRuntimeConstants.setIsMapReduce(true); + update.setLegacyRuntimeConstants(legacyRuntimeConstants); + ASSERT(write_ops::verifySizeEstimate(update)); + + // $$USER_ROLES + BSONArray arr = BSON_ARRAY(fromjson("{role: 'readWriteAnyDatabase', db: 'admin'}")); + legacyRuntimeConstants.setUserRoles(arr); + update.setLegacyRuntimeConstants(legacyRuntimeConstants); + ASSERT(write_ops::verifySizeEstimate(update)); + + const std::string kLargeString(100 * 1024, 'b'); + BSONObj letParams = BSON("largeStrParam" << kLargeString); + update.setLet(letParams); + ASSERT(write_ops::verifySizeEstimate(update)); +} + +TEST_F(WriteOpsExecTest, TestDeleteRequestSizeEstimationLogic) { + NamespaceString ns = + NamespaceString::createNamespaceString_forTest("db_write_ops_exec_test", "delete_test"); + write_ops::DeleteCommandRequest deleteReq(ns); + // Basic test case. + write_ops::DeleteOpEntry deleteOpEntry(BSON("_id" << 1), false /* multi */); + deleteReq.setDeletes({deleteOpEntry}); + + ASSERT(write_ops::verifySizeEstimate(deleteReq)); + + // Configure $tenant. + deleteReq.setDollarTenant(mongo::TenantId(mongo::OID::gen())); + ASSERT(write_ops::verifySizeEstimate(deleteReq)); + + // Configure different fields for 'wcb'. 
+ write_ops::WriteCommandRequestBase wcb; + + // stmtId + wcb.setStmtId(2); + deleteReq.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(deleteReq)); + + // stmtIds + wcb.setStmtIds(std::vector<int32_t>{2, 3}); + deleteReq.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(deleteReq)); + + // isTimeseries + wcb.setIsTimeseriesNamespace(true); + deleteReq.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(deleteReq)); + + // collUUID + wcb.setCollectionUUID(UUID::gen()); + deleteReq.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(deleteReq)); + + // encryptionInfo + wcb.setEncryptionInformation( + EncryptionInformation(fromjson("{schema: 'I love encrypting and protecting my data'}"))); + deleteReq.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(deleteReq)); + + // originalQuery + wcb.setOriginalQuery(fromjson("{field: 'value'}")); + deleteReq.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(deleteReq)); + + // originalCollation + wcb.setOriginalCollation(fromjson("{locale: 'fr'}")); + deleteReq.setWriteCommandRequestBase(wcb); + ASSERT(write_ops::verifySizeEstimate(deleteReq)); +} + TEST_F(WriteOpsExecTest, PerformAtomicTimeseriesWritesWithTransform) { NamespaceString ns = NamespaceString::createNamespaceString_forTest("db_write_ops_exec_test", "ts"); diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp index 722de5936f9..896a50ad0dc 100644 --- a/src/mongo/db/pipeline/document_source_merge.cpp +++ b/src/mongo/db/pipeline/document_source_merge.cpp @@ -60,6 +60,7 @@ namespace { using MergeStrategyDescriptor = DocumentSourceMerge::MergeStrategyDescriptor; using MergeMode = MergeStrategyDescriptor::MergeMode; using MergeStrategy = MergeStrategyDescriptor::MergeStrategy; +using BatchedCommandGenerator = MergeStrategyDescriptor::BatchedCommandGenerator; using MergeStrategyDescriptorsMap = std::map<const MergeMode, const MergeStrategyDescriptor>; using WhenMatched = MergeStrategyDescriptor::WhenMatched; using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched; @@ -86,6 +87,55 @@ constexpr auto kPipelineDiscardMode = MergeMode{WhenMatched::kPipeline, WhenNotM const auto kDefaultPipelineLet = BSON("new" << "$$ROOT"); +BatchedCommandGenerator makeInsertCommandGenerator() { + return [](const auto& expCtx, const auto& ns) -> BatchedCommandRequest { + return DocumentSourceMerge::DocumentSourceWriter::makeInsertCommand( + ns, expCtx->bypassDocumentValidation); + }; +} + +BatchedCommandGenerator makeUpdateCommandGenerator() { + return [](const auto& expCtx, const auto& ns) -> BatchedCommandRequest { + write_ops::UpdateCommandRequest updateOp(ns); + updateOp.setWriteCommandRequestBase([&] { + write_ops::WriteCommandRequestBase wcb; + wcb.setOrdered(false); + wcb.setBypassDocumentValidation(expCtx->bypassDocumentValidation); + return wcb; + }()); + auto [constants, letParams] = + expCtx->variablesParseState.transitionalCompatibilitySerialize(expCtx->variables); + updateOp.setLegacyRuntimeConstants(std::move(constants)); + if (!letParams.isEmpty()) { + updateOp.setLet(std::move(letParams)); + } + return BatchedCommandRequest(std::move(updateOp)); + }; +} + +/** + * Converts 'batch' into a vector of UpdateOpEntries. 
+ */ +std::vector<write_ops::UpdateOpEntry> constructUpdateEntries( + DocumentSourceMerge::DocumentSourceWriter::BatchedObjects&& batch, + UpsertType upsert, + bool multi) { + std::vector<write_ops::UpdateOpEntry> updateEntries; + for (auto&& obj : batch) { + write_ops::UpdateOpEntry entry; + auto&& [q, u, c] = obj; + entry.setQ(std::move(q)); + entry.setU(std::move(u)); + entry.setC(std::move(c)); + entry.setUpsert(upsert != UpsertType::kNone); + entry.setUpsertSupplied({{entry.getUpsert(), upsert == UpsertType::kInsertSuppliedDoc}}); + entry.setMulti(multi); + + updateEntries.push_back(std::move(entry)); + } + return updateEntries; +} + /** * Creates a merge strategy which uses update semantics to perform a merge operation. */ @@ -95,10 +145,13 @@ MergeStrategy makeUpdateStrategy() { const auto& wc, auto epoch, auto&& batch, + auto&& bcr, UpsertType upsert) { constexpr auto multi = false; + auto updateCommand = bcr.extractUpdateRequest(); + updateCommand->setUpdates(constructUpdateEntries(std::move(batch), upsert, multi)); uassertStatusOK(expCtx->mongoProcessInterface->update( - expCtx, ns, std::move(batch), wc, upsert, multi, epoch)); + expCtx, ns, std::move(updateCommand), wc, upsert, multi, epoch)); }; } @@ -115,11 +168,14 @@ MergeStrategy makeStrictUpdateStrategy() { const auto& wc, auto epoch, auto&& batch, + auto&& bcr, UpsertType upsert) { const int64_t batchSize = batch.size(); constexpr auto multi = false; + auto updateCommand = bcr.extractUpdateRequest(); + updateCommand->setUpdates(constructUpdateEntries(std::move(batch), upsert, multi)); auto updateResult = uassertStatusOK(expCtx->mongoProcessInterface->update( - expCtx, ns, std::move(batch), wc, upsert, multi, epoch)); + expCtx, ns, std::move(updateCommand), wc, upsert, multi, epoch)); uassert(ErrorCodes::MergeStageNoMatchingDocument, "{} could not find a matching document in the target collection " "for at least one document in the source collection"_format(kStageName), @@ -136,6 +192,7 @@ MergeStrategy makeInsertStrategy() { const auto& wc, auto epoch, auto&& batch, + auto&& bcr, UpsertType upsertType) { std::vector<BSONObj> objectsToInsert(batch.size()); // The batch stores replacement style updates, but for this "insert" style of $merge we'd @@ -143,8 +200,10 @@ MergeStrategy makeInsertStrategy() { std::transform(batch.begin(), batch.end(), objectsToInsert.begin(), [](const auto& obj) { return std::get<UpdateModification>(obj).getUpdateReplacement(); }); - uassertStatusOK(expCtx->mongoProcessInterface->insert( - expCtx, ns, std::move(objectsToInsert), wc, epoch)); + auto insertCommand = bcr.extractInsertRequest(); + insertCommand->setDocuments(std::move(objectsToInsert)); + uassertStatusOK( + expCtx->mongoProcessInterface->insert(expCtx, ns, std::move(insertCommand), wc, epoch)); }; } @@ -174,72 +233,95 @@ const MergeStrategyDescriptorsMap& getDescriptors() { // be initialized first. By wrapping the map into a function we can guarantee that it won't be // initialized until the first use, which is when the program already started and all global // variables had been initialized. 
- static const auto mergeStrategyDescriptors = MergeStrategyDescriptorsMap{ - // whenMatched: replace, whenNotMatched: insert - {kReplaceInsertMode, - {kReplaceInsertMode, - {ActionType::insert, ActionType::update}, - makeUpdateStrategy(), - {}, - UpsertType::kGenerateNewDoc}}, - // whenMatched: replace, whenNotMatched: fail - {kReplaceFailMode, - {kReplaceFailMode, - {ActionType::update}, - makeStrictUpdateStrategy(), - {}, - UpsertType::kNone}}, - // whenMatched: replace, whenNotMatched: discard - {kReplaceDiscardMode, - {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}}, - // whenMatched: merge, whenNotMatched: insert - {kMergeInsertMode, - {kMergeInsertMode, - {ActionType::insert, ActionType::update}, - makeUpdateStrategy(), - makeUpdateTransform("$set"), - UpsertType::kGenerateNewDoc}}, - // whenMatched: merge, whenNotMatched: fail - {kMergeFailMode, - {kMergeFailMode, - {ActionType::update}, - makeStrictUpdateStrategy(), - makeUpdateTransform("$set"), - UpsertType::kNone}}, - // whenMatched: merge, whenNotMatched: discard - {kMergeDiscardMode, - {kMergeDiscardMode, - {ActionType::update}, - makeUpdateStrategy(), - makeUpdateTransform("$set"), - UpsertType::kNone}}, - // whenMatched: keepExisting, whenNotMatched: insert - {kKeepExistingInsertMode, - {kKeepExistingInsertMode, - {ActionType::insert, ActionType::update}, - makeUpdateStrategy(), - makeUpdateTransform("$setOnInsert"), - UpsertType::kGenerateNewDoc}}, - // whenMatched: [pipeline], whenNotMatched: insert - {kPipelineInsertMode, - {kPipelineInsertMode, - {ActionType::insert, ActionType::update}, - makeUpdateStrategy(), - {}, - UpsertType::kInsertSuppliedDoc}}, - // whenMatched: [pipeline], whenNotMatched: fail - {kPipelineFailMode, - {kPipelineFailMode, - {ActionType::update}, - makeStrictUpdateStrategy(), - {}, - UpsertType::kNone}}, - // whenMatched: [pipeline], whenNotMatched: discard - {kPipelineDiscardMode, - {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}}, - // whenMatched: fail, whenNotMatched: insert - {kFailInsertMode, - {kFailInsertMode, {ActionType::insert}, makeInsertStrategy(), {}, UpsertType::kNone}}}; + static const auto mergeStrategyDescriptors = + MergeStrategyDescriptorsMap{// whenMatched: replace, whenNotMatched: insert + {kReplaceInsertMode, + {kReplaceInsertMode, + {ActionType::insert, ActionType::update}, + makeUpdateStrategy(), + {}, + UpsertType::kGenerateNewDoc, + makeUpdateCommandGenerator()}}, + // whenMatched: replace, whenNotMatched: fail + {kReplaceFailMode, + {kReplaceFailMode, + {ActionType::update}, + makeStrictUpdateStrategy(), + {}, + UpsertType::kNone, + makeUpdateCommandGenerator()}}, + // whenMatched: replace, whenNotMatched: discard + {kReplaceDiscardMode, + {kReplaceDiscardMode, + {ActionType::update}, + makeUpdateStrategy(), + {}, + UpsertType::kNone, + makeUpdateCommandGenerator()}}, + // whenMatched: merge, whenNotMatched: insert + {kMergeInsertMode, + {kMergeInsertMode, + {ActionType::insert, ActionType::update}, + makeUpdateStrategy(), + makeUpdateTransform("$set"), + UpsertType::kGenerateNewDoc, + makeUpdateCommandGenerator()}}, + // whenMatched: merge, whenNotMatched: fail + {kMergeFailMode, + {kMergeFailMode, + {ActionType::update}, + makeStrictUpdateStrategy(), + makeUpdateTransform("$set"), + UpsertType::kNone, + makeUpdateCommandGenerator()}}, + // whenMatched: merge, whenNotMatched: discard + {kMergeDiscardMode, + {kMergeDiscardMode, + {ActionType::update}, + makeUpdateStrategy(), + 
makeUpdateTransform("$set"), + UpsertType::kNone, + makeUpdateCommandGenerator()}}, + // whenMatched: keepExisting, whenNotMatched: insert + {kKeepExistingInsertMode, + {kKeepExistingInsertMode, + {ActionType::insert, ActionType::update}, + makeUpdateStrategy(), + makeUpdateTransform("$setOnInsert"), + UpsertType::kGenerateNewDoc, + makeUpdateCommandGenerator()}}, + // whenMatched: [pipeline], whenNotMatched: insert + {kPipelineInsertMode, + {kPipelineInsertMode, + {ActionType::insert, ActionType::update}, + makeUpdateStrategy(), + {}, + UpsertType::kInsertSuppliedDoc, + makeUpdateCommandGenerator()}}, + // whenMatched: [pipeline], whenNotMatched: fail + {kPipelineFailMode, + {kPipelineFailMode, + {ActionType::update}, + makeStrictUpdateStrategy(), + {}, + UpsertType::kNone, + makeUpdateCommandGenerator()}}, + // whenMatched: [pipeline], whenNotMatched: discard + {kPipelineDiscardMode, + {kPipelineDiscardMode, + {ActionType::update}, + makeUpdateStrategy(), + {}, + UpsertType::kNone, + makeUpdateCommandGenerator()}}, + // whenMatched: fail, whenNotMatched: insert + {kFailInsertMode, + {kFailInsertMode, + {ActionType::insert}, + makeInsertStrategy(), + {}, + UpsertType::kNone, + makeInsertCommandGenerator()}}}; return mergeStrategyDescriptors; } @@ -599,14 +681,19 @@ std::pair<DocumentSourceMerge::BatchObject, int> DocumentSourceMerge::makeBatchO _writeSizeEstimator->estimateUpdateSizeBytes(batchObject, _descriptor.upsertType)}; } -void DocumentSourceMerge::spill(BatchedObjects&& batch) try { +void DocumentSourceMerge::spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) try { DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); auto targetEpoch = _targetCollectionPlacementVersion ? boost::optional<OID>(_targetCollectionPlacementVersion->epoch()) : boost::none; - _descriptor.strategy( - pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch), _descriptor.upsertType); + _descriptor.strategy(pExpCtx, + _outputNs, + _writeConcern, + targetEpoch, + std::move(batch), + std::move(bcr), + _descriptor.upsertType); } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) { uassertStatusOKWithContext(ex.toStatus(), "$merge failed to update the matching document, did you " @@ -630,6 +717,10 @@ void DocumentSourceMerge::spill(BatchedObjects&& batch) try { } } +BatchedCommandRequest DocumentSourceMerge::initializeBatchedWriteRequest() const { + return _descriptor.batchedCommandGenerator(pExpCtx, _outputNs); +} + void DocumentSourceMerge::waitWhileFailPointEnabled() { CurOpFailpointHelpers::waitWhileFailPointEnabled( &hangWhileBuildingDocumentSourceMergeBatch, diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h index b0eed7a6df9..0349374fadb 100644 --- a/src/mongo/db/pipeline/document_source_merge.h +++ b/src/mongo/db/pipeline/document_source_merge.h @@ -48,8 +48,9 @@ public: // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions the // client should be authorized to perform in order to be able to execute a merge operation using - // this merge strategy. If a 'BatchTransform' function is provided, it will be called when - // constructing a batch object to transform updates. + // this merge strategy. Additionally holds a 'BatchedCommandGenerator' that will initialize a + // BatchedWriteRequest for executing the batch write. If a 'BatchTransform' function is + // provided, it will be called when constructing a batch object to transform updates. 
struct MergeStrategyDescriptor { using WhenMatched = MergeWhenMatchedModeEnum; using WhenNotMatched = MergeWhenNotMatchedModeEnum; @@ -62,13 +63,19 @@ public: const WriteConcernOptions&, boost::optional<OID>, BatchedObjects&&, + BatchedCommandRequest&&, UpsertType upsert)>; + // A function object that will be invoked to generate a BatchedCommandRequest. + using BatchedCommandGenerator = std::function<BatchedCommandRequest( + const boost::intrusive_ptr<ExpressionContext>&, const NamespaceString&)>; + MergeMode mode; ActionSet actions; MergeStrategy strategy; BatchTransform transform; UpsertType upsertType; + BatchedCommandGenerator batchedCommandGenerator; }; /** @@ -218,7 +225,9 @@ private: return bob.obj(); } - void spill(BatchedObjects&& batch) override; + void spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) override; + + BatchedCommandRequest initializeBatchedWriteRequest() const override; void waitWhileFailPointEnabled() override; diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index 8763cb0874c..e45933d4f73 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -284,6 +284,13 @@ void DocumentSourceOut::finalize() { _timeseriesStateConsistent = true; } +BatchedCommandRequest DocumentSourceOut::initializeBatchedWriteRequest() const { + // Note that our insert targets '_tempNs' (or the associated timeseries view) since we will + // never write to 'outputNs' directly. + const auto& targetNss = _timeseries ? _tempNs.getTimeseriesViewNamespace() : _tempNs; + return DocumentSourceWriter::makeInsertCommand(targetNss, pExpCtx->bypassDocumentValidation); +} + boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create( NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx, diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 60d6a865c1d..becd7998bd0 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -127,20 +127,23 @@ private: void finalize() override; - void spill(BatchedObjects&& batch) override { + void spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) override { DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); + auto insertCommand = bcr.extractInsertRequest(); + insertCommand->setDocuments(std::move(batch)); auto targetEpoch = boost::none; + if (_timeseries) { uassertStatusOK(pExpCtx->mongoProcessInterface->insertTimeseries( pExpCtx, _tempNs.getTimeseriesViewNamespace(), - std::move(batch), + std::move(insertCommand), _writeConcern, targetEpoch)); } else { uassertStatusOK(pExpCtx->mongoProcessInterface->insert( - pExpCtx, _tempNs, std::move(batch), _writeConcern, targetEpoch)); + pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch)); } } @@ -150,6 +153,8 @@ private: return {obj, _writeSizeEstimator->estimateInsertSizeBytes(obj)}; } + BatchedCommandRequest initializeBatchedWriteRequest() const override; + void waitWhileFailPointEnabled() override; /** diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h index 124625d6f94..b2c5363d429 100644 --- a/src/mongo/db/pipeline/document_source_writer.h +++ b/src/mongo/db/pipeline/document_source_writer.h @@ -39,6 +39,7 @@ #include "mongo/db/read_concern.h" #include "mongo/db/storage/recovery_unit.h" #include "mongo/rpc/metadata/impersonated_user_metadata.h" +#include 
"mongo/s/write_ops/batched_command_request.h" namespace mongo { using namespace fmt::literals; @@ -84,11 +85,14 @@ public: /** * This is a base abstract class for all stages performing a write operation into an output * collection. The writes are organized in batches in which elements are objects of the templated - * type 'B'. A subclass must override two methods to be able to write into the output collection: + * type 'B'. A subclass must override the following methods to be able to write into the output + * collection: * - * 1. 'makeBatchObject()' - to create an object of type 'B' from the given 'Document', which is, + * - 'makeBatchObject()' - creates an object of type 'B' from the given 'Document', which is, * essentially, a result of the input source's 'getNext()' . - * 2. 'spill()' - to write the batch into the output collection. + * - 'spill()' - writes the batch into the output collection. + * - 'initializeBatchedWriteRequest()' - initializes the request object for writing a batch to + * the output collection. * * Two other virtual methods exist which a subclass may override: 'initialize()' and 'finalize()', * which are called before the first element is read from the input source, and after the last one @@ -100,6 +104,18 @@ public: using BatchObject = B; using BatchedObjects = std::vector<BatchObject>; + static BatchedCommandRequest makeInsertCommand(const NamespaceString& outputNs, + bool bypassDocumentValidation) { + write_ops::InsertCommandRequest insertOp(outputNs); + insertOp.setWriteCommandRequestBase([&] { + write_ops::WriteCommandRequestBase wcb; + wcb.setOrdered(false); + wcb.setBypassDocumentValidation(bypassDocumentValidation); + return wcb; + }()); + return BatchedCommandRequest(std::move(insertOp)); + } + DocumentSourceWriter(const char* stageName, NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx) @@ -146,9 +162,31 @@ protected: virtual void finalize() {} /** - * Writes the documents in 'batch' to the output namespace. + * Writes the documents in 'batch' to the output namespace via 'bcr'. + */ + virtual void spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) = 0; + + /** + * Estimates the size of the header of a batch write (that is, the size of the write command + * minus the size of write statements themselves). + */ + int estimateWriteHeaderSize(const BatchedCommandRequest& bcr) const { + using BatchType = BatchedCommandRequest::BatchType; + switch (bcr.getBatchType()) { + case BatchType::BatchType_Insert: + return _writeSizeEstimator->estimateInsertHeaderSize(bcr.getInsertRequest()); + case BatchType::BatchType_Update: + return _writeSizeEstimator->estimateUpdateHeaderSize(bcr.getUpdateRequest()); + case BatchType::BatchType_Delete: + break; + } + MONGO_UNREACHABLE; + } + + /** + * Constructs and configures a BatchedCommandRequest for performing a batch write. */ - virtual void spill(BatchedObjects&& batch) = 0; + virtual BatchedCommandRequest initializeBatchedWriteRequest() const = 0; /** * Creates a batch object from the given document and returns it to the caller along with the @@ -210,15 +248,20 @@ DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() { // and assume the rest can fit in the 16KB already built into BSONObjMaxUserSize. 
const auto estimatedMetadataSizeBytes = rpc::estimateImpersonatedUserMetadataSize(pExpCtx->opCtx); + + BatchedCommandRequest batchWrite = initializeBatchedWriteRequest(); + const auto writeHeaderSize = estimateWriteHeaderSize(batchWrite); + const auto initialRequestSize = estimatedMetadataSizeBytes + writeHeaderSize; + uassert(7637800, "Unable to proceed with write while metadata size ({}KB) exceeds {}KB"_format( - estimatedMetadataSizeBytes / 1024, BSONObjMaxUserSize / 1024), - estimatedMetadataSizeBytes <= BSONObjMaxUserSize); + initialRequestSize / 1024, BSONObjMaxUserSize / 1024), + initialRequestSize <= BSONObjMaxUserSize); - const auto maxBatchSizeBytes = BSONObjMaxUserSize - estimatedMetadataSizeBytes; - BatchedObjects batch; - std::size_t bufferedBytes = 0; + const auto maxBatchSizeBytes = BSONObjMaxUserSize - initialRequestSize; + BatchedObjects batch; + size_t bufferedBytes = 0; auto nextInput = pSource->getNext(); for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { waitWhileFailPointEnabled(); @@ -230,14 +273,15 @@ DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() { if (!batch.empty() && (bufferedBytes > maxBatchSizeBytes || batch.size() >= write_ops::kMaxWriteBatchSize)) { - spill(std::move(batch)); + spill(std::move(batchWrite), std::move(batch)); batch.clear(); + batchWrite = initializeBatchedWriteRequest(); bufferedBytes = objSize; } batch.push_back(obj); } if (!batch.empty()) { - spill(std::move(batch)); + spill(std::move(batchWrite), std::move(batch)); batch.clear(); } diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index 9c2dd64172f..c3db7716d48 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -788,59 +788,6 @@ CommonMongodProcessInterface::ensureFieldsUniqueOrResolveDocumentKey( return {*fieldPaths, targetCollectionPlacementVersion}; } -write_ops::InsertCommandRequest CommonMongodProcessInterface::buildInsertOp( - const NamespaceString& nss, std::vector<BSONObj>&& objs, bool bypassDocValidation) { - write_ops::InsertCommandRequest insertOp(nss); - insertOp.setDocuments(std::move(objs)); - insertOp.setWriteCommandRequestBase([&] { - write_ops::WriteCommandRequestBase wcb; - wcb.setOrdered(false); - wcb.setBypassDocumentValidation(bypassDocValidation); - return wcb; - }()); - return insertOp; -} - -write_ops::UpdateCommandRequest CommonMongodProcessInterface::buildUpdateOp( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - BatchedObjects&& batch, - UpsertType upsert, - bool multi) { - write_ops::UpdateCommandRequest updateOp(nss); - updateOp.setUpdates([&] { - std::vector<write_ops::UpdateOpEntry> updateEntries; - for (auto&& obj : batch) { - updateEntries.push_back([&] { - write_ops::UpdateOpEntry entry; - auto&& [q, u, c] = obj; - entry.setQ(std::move(q)); - entry.setU(std::move(u)); - entry.setC(std::move(c)); - entry.setUpsert(upsert != UpsertType::kNone); - entry.setUpsertSupplied( - {{entry.getUpsert(), upsert == UpsertType::kInsertSuppliedDoc}}); - entry.setMulti(multi); - return entry; - }()); - } - return updateEntries; - }()); - updateOp.setWriteCommandRequestBase([&] { - write_ops::WriteCommandRequestBase wcb; - wcb.setOrdered(false); - wcb.setBypassDocumentValidation(expCtx->bypassDocumentValidation); - return wcb; - }()); - auto [constants, letParams] = 
- expCtx->variablesParseState.transitionalCompatibilitySerialize(expCtx->variables); - updateOp.setLegacyRuntimeConstants(std::move(constants)); - if (!letParams.isEmpty()) { - updateOp.setLet(std::move(letParams)); - } - return updateOp; -} - BSONObj CommonMongodProcessInterface::_convertRenameToInternalRename( OperationContext* opCtx, const NamespaceString& sourceNs, diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h index 09b3ed8fa33..07e2d0370ce 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h @@ -145,23 +145,6 @@ protected: const Document& documentKey, MakePipelineOptions opts); - /** - * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'. - */ - write_ops::InsertCommandRequest buildInsertOp(const NamespaceString& nss, - std::vector<BSONObj>&& objs, - bool bypassDocValidation); - - /** - * Builds an ordered update op on namespace 'nss' with update entries contained in 'batch'. - */ - write_ops::UpdateCommandRequest buildUpdateOp( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - BatchedObjects&& batch, - UpsertType upsert, - bool multi); - BSONObj _reportCurrentOpForClient(OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps, diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp index c3a978ca766..5cf57474365 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp @@ -168,12 +168,6 @@ std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsR return {"_id"}; } -std::unique_ptr<CommonProcessInterface::WriteSizeEstimator> -CommonProcessInterface::getWriteSizeEstimator(OperationContext* opCtx, - const NamespaceString& ns) const { - return std::make_unique<LocalWriteSizeEstimator>(); -} - void CommonProcessInterface::updateClientOperationTime(OperationContext* opCtx) const { // In order to support causal consistency in a replica set or a sharded cluster when reading // with secondary read preference, the secondary must propagate the primary's operation time diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h index b3b5f26468f..4a4f1d1e990 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h @@ -32,6 +32,7 @@ #include <vector> #include "mongo/bson/bsonobj.h" +#include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/process_interface/mongo_process_interface.h" namespace mongo { @@ -47,10 +48,22 @@ public: /** * Estimates the size of writes that will be executed on the current node. Note that this - * does not account for the full size of an update statement. + * does not account for the full size of an update statement because in the case of local + * writes, we will not have to serialize to BSON and are therefore not subject to the 16MB + * BSONObj size limit. 
*/ class LocalWriteSizeEstimator final : public WriteSizeEstimator { public: + int estimateInsertHeaderSize( + const write_ops::InsertCommandRequest& insertReq) const override { + return 0; + } + + int estimateUpdateHeaderSize( + const write_ops::UpdateCommandRequest& insertReq) const override { + return 0; + } + int estimateInsertSizeBytes(const BSONObj& insert) const override { return insert.objsize(); } @@ -70,6 +83,16 @@ public: */ class TargetPrimaryWriteSizeEstimator final : public WriteSizeEstimator { public: + int estimateInsertHeaderSize( + const write_ops::InsertCommandRequest& insertReq) const override { + return write_ops::getInsertHeaderSizeEstimate(insertReq); + } + + int estimateUpdateHeaderSize( + const write_ops::UpdateCommandRequest& updateReq) const override { + return write_ops::getUpdateHeaderSizeEstimate(updateReq); + } + int estimateInsertSizeBytes(const BSONObj& insert) const override { return insert.objsize() + write_ops::kWriteCommandBSONArrayPerElementOverheadBytes; } @@ -109,8 +132,6 @@ public: virtual std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter( OperationContext*, const NamespaceString&) const override; - std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( - OperationContext* opCtx, const NamespaceString& ns) const override; virtual void updateClientOperationTime(OperationContext* opCtx) const final; diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index 1aa7d6be8de..05ee7dd7ad7 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -112,8 +112,19 @@ public: public: virtual ~WriteSizeEstimator() = default; + /** + * Set of functions which estimate the entire size of a write command except for the array + * of write statements themselves. + */ + virtual int estimateInsertHeaderSize( + const write_ops::InsertCommandRequest& insertReq) const = 0; + virtual int estimateUpdateHeaderSize( + const write_ops::UpdateCommandRequest& updateReq) const = 0; + + /** + * Set of functions which estimate the size of a single write statement. + */ virtual int estimateInsertSizeBytes(const BSONObj& insert) const = 0; - virtual int estimateUpdateSizeBytes(const BatchObject& batchObject, UpsertType type) const = 0; }; @@ -168,34 +179,35 @@ public: virtual void updateClientOperationTime(OperationContext* opCtx) const = 0; /** - * Inserts 'objs' into 'ns' and returns an error Status if the insert fails. If 'targetEpoch' is - * set, throws ErrorCodes::StaleEpoch if the targeted collection does not have the same epoch or - * the epoch changes during the course of the insert. + * Executes 'insertCommand' against 'ns' and returns an error Status if the insert fails. If + * 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the targeted collection does not have + * the same epoch or the epoch changes during the course of the insert. 
*/ virtual Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) = 0; virtual Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) = 0; /** - * Updates the documents matching 'queries' with the objects 'updates'. Returns an error Status - * if any of the updates fail, otherwise returns an 'UpdateResult' objects with the details of - * the update operation. If 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the targeted - * collection does not have the same epoch, or if the epoch changes during the update. - */ - virtual StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - BatchedObjects&& batch, - const WriteConcernOptions& wc, - UpsertType upsert, - bool multi, - boost::optional<OID> targetEpoch) = 0; + * Executes the updates described by 'updateCommand'. Returns an error Status if any of the + * updates fail, otherwise returns an 'UpdateResult' object with the details of the update + * operation. If 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the targeted collection + * does not have the same epoch, or if the epoch changes during the update. + */ + virtual StatusWith<UpdateResult> update( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, + const WriteConcernOptions& wc, + UpsertType upsert, + bool multi, + boost::optional<OID> targetEpoch) = 0; /** * Returns index usage statistics for each index on collection 'ns' along with additional diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index 9e4a6f7cce7..33e900c06a9 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -98,7 +98,7 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace -std::unique_ptr<CommonProcessInterface::WriteSizeEstimator> +std::unique_ptr<MongoProcessInterface::WriteSizeEstimator> MongosProcessInterface::getWriteSizeEstimator(OperationContext* opCtx, const NamespaceString& ns) const { return std::make_unique<TargetPrimaryWriteSizeEstimator>(); } diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h index a7dea455057..0ea14f6dcc2 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h @@ -72,7 +72,7 @@ public: Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID>) final { MONGO_UNREACHABLE; @@ -80,7 +80,7 @@ public: Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest>
insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) final { MONGO_UNREACHABLE; @@ -88,7 +88,7 @@ public: StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp index e5bceac9686..f153109e0e6 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp @@ -108,13 +108,13 @@ boost::optional<Document> NonShardServerProcessInterface::lookupSingleDocument( return lookedUpDocument; } -Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) { - auto writeResults = write_ops_exec::performInserts( - expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); +Status NonShardServerProcessInterface::insert( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, + const WriteConcernOptions& wc, + boost::optional<OID> targetEpoch) { + auto writeResults = write_ops_exec::performInserts(expCtx->opCtx, *insertCommand); // Need to check each result in the batch since the writes are unordered. for (const auto& result : writeResults.results) { @@ -128,12 +128,11 @@ Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<Express Status NonShardServerProcessInterface::insertTimeseries( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) { try { - auto insertReply = write_ops_exec::performTimeseriesWrites( - expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + auto insertReply = write_ops_exec::performTimeseriesWrites(expCtx->opCtx, *insertCommand); checkWriteErrors(insertReply.getWriteCommandReplyBase()); } catch (DBException& ex) { @@ -146,13 +145,12 @@ Status NonShardServerProcessInterface::insertTimeseries( StatusWith<MongoProcessInterface::UpdateResult> NonShardServerProcessInterface::update( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, boost::optional<OID> targetEpoch) { - auto writeResults = write_ops_exec::performUpdates( - expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); + auto writeResults = write_ops_exec::performUpdates(expCtx->opCtx, *updateCommand); // Need to check each result in the batch since the writes are unordered. 
UpdateResult updateResult; diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h index 9bae8030aac..639cc22d044 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h @@ -91,19 +91,19 @@ public: Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) override; Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) override; StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp index ad504860169..3195b504faf 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp @@ -69,26 +69,27 @@ void ReplicaSetNodeProcessInterface::setReplicaSetNodeExecutor( replicaSetNodeExecutor(service) = std::move(executor); } -Status ReplicaSetNodeProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) { +Status ReplicaSetNodeProcessInterface::insert( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, + const WriteConcernOptions& wc, + boost::optional<OID> targetEpoch) { auto&& opCtx = expCtx->opCtx; if (_canWriteLocally(opCtx, ns)) { - return NonShardServerProcessInterface::insert(expCtx, ns, std::move(objs), wc, targetEpoch); + return NonShardServerProcessInterface::insert( + expCtx, ns, std::move(insertCommand), wc, targetEpoch); } - BatchedCommandRequest insertCommand( - buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + BatchedCommandRequest batchInsertCommand(std::move(insertCommand)); - return _executeCommandOnPrimary(opCtx, ns, std::move(insertCommand.toBSON())).getStatus(); + return _executeCommandOnPrimary(opCtx, ns, batchInsertCommand.toBSON()).getStatus(); } StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::update( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, @@ -96,11 +97,11 @@ StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface:: auto&& opCtx = expCtx->opCtx; if (_canWriteLocally(opCtx, ns)) { return NonShardServerProcessInterface::update( - expCtx, ns, std::move(batch), wc, upsert, multi, targetEpoch); + expCtx, ns, std::move(updateCommand), wc, upsert, multi, 
targetEpoch); } + BatchedCommandRequest batchUpdateCommand(std::move(updateCommand)); - BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); - auto result = _executeCommandOnPrimary(opCtx, ns, std::move(updateCommand.toBSON())); + auto result = _executeCommandOnPrimary(opCtx, ns, batchUpdateCommand.toBSON()); if (!result.isOK()) { return result.getStatus(); } @@ -142,14 +143,15 @@ void ReplicaSetNodeProcessInterface::createTimeseriesView(OperationContext* opCt Status ReplicaSetNodeProcessInterface::insertTimeseries( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) { if (_canWriteLocally(expCtx->opCtx, ns)) { return NonShardServerProcessInterface::insertTimeseries( - expCtx, ns, std::move(objs), wc, targetEpoch); + expCtx, ns, std::move(insertCommand), wc, targetEpoch); } else { - return ReplicaSetNodeProcessInterface::insert(expCtx, ns, std::move(objs), wc, targetEpoch); + return ReplicaSetNodeProcessInterface::insert( + expCtx, ns, std::move(insertCommand), wc, targetEpoch); } } diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h index cf5bb5a90ef..94f45559384 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h @@ -68,12 +68,13 @@ public: Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) final; + StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, @@ -100,7 +101,7 @@ public: Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch); diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index bfea6b001a6..dd9367b09e2 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -123,20 +123,20 @@ boost::optional<Document> ShardServerProcessInterface::lookupSingleDocument( return doLookupSingleDocument(expCtx, nss, collectionUUID, documentKey, std::move(opts)); } -Status ShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) { +Status ShardServerProcessInterface::insert( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, + const WriteConcernOptions& wc, + boost::optional<OID> targetEpoch) { BatchedCommandResponse response; 
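For orientation, a rough caller-side sketch of the new ownership flow follows. The variable names are hypothetical, and the setters other than setLet and setLegacyRuntimeConstants (which appear in the code removed above) are assumed from the IDL-generated accessors, so treat the exact spellings as illustrative rather than authoritative:

// Sketch only: the caller builds the complete update command, including 'let' parameters
// and runtime constants, before handing ownership to the process interface. 'expCtx', 'ns',
// 'updateEntries', 'letParams', 'constants', 'wc', 'upsert', 'multi' and 'targetEpoch' are
// assumed to be in scope.
auto updateCommand = std::make_unique<write_ops::UpdateCommandRequest>(ns);
updateCommand->setUpdates(std::move(updateEntries));
updateCommand->setLet(std::move(letParams));
updateCommand->setLegacyRuntimeConstants(std::move(constants));

// The process interface either executes this locally or wraps it in a BatchedCommandRequest
// and serializes it for another node, as in the hunks above and below.
auto result = processInterface->update(
    expCtx, ns, std::move(updateCommand), wc, upsert, multi, targetEpoch);

Because the command is fully formed before any statements are batched, the write size estimator can be asked for its header cost up front, which is what lets the let parameters and runtime constants count against the 16MB limit.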
BatchWriteExecStats stats; - BatchedCommandRequest insertCommand( - buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + BatchedCommandRequest batchInsertCommand(std::move(insertCommand)); - insertCommand.setWriteConcern(wc.toBSON()); + batchInsertCommand.setWriteConcern(wc.toBSON()); - cluster::write(expCtx->opCtx, insertCommand, &stats, &response, targetEpoch); + cluster::write(expCtx->opCtx, batchInsertCommand, &stats, &response, targetEpoch); return response.toStatus(); } @@ -144,7 +144,7 @@ Status ShardServerProcessInterface::insert(const boost::intrusive_ptr<Expression StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::update( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, @@ -152,11 +152,10 @@ StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::upd BatchedCommandResponse response; BatchWriteExecStats stats; - BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); - - updateCommand.setWriteConcern(wc.toBSON()); + BatchedCommandRequest batchUpdateCommand(std::move(updateCommand)); + batchUpdateCommand.setWriteConcern(wc.toBSON()); - cluster::write(expCtx->opCtx, updateCommand, &stats, &response, targetEpoch); + cluster::write(expCtx->opCtx, batchUpdateCommand, &stats, &response, targetEpoch); if (auto status = response.toStatus(); status != Status::OK()) { return status; @@ -402,10 +401,11 @@ void ShardServerProcessInterface::createTimeseriesView(OperationContext* opCtx, Status ShardServerProcessInterface::insertTimeseries( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) { - return ShardServerProcessInterface::insert(expCtx, ns, std::move(objs), wc, targetEpoch); + return ShardServerProcessInterface::insert( + expCtx, ns, std::move(insertCommand), wc, targetEpoch); } std::unique_ptr<Pipeline, PipelineDeleter> diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h index c81af91b6e0..aef9845f135 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h @@ -53,6 +53,11 @@ public: const NamespaceString& nss, ChunkVersion targetCollectionPlacementVersion) const final; + std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( + OperationContext* opCtx, const NamespaceString& ns) const final { + return std::make_unique<TargetPrimaryWriteSizeEstimator>(); + } + std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter( OperationContext*, const NamespaceString&) const final { // We don't expect anyone to use this method on the shard itself (yet). This is currently @@ -71,23 +76,15 @@ public: const Document& documentKey, boost::optional<BSONObj> readConcern) final; - /** - * Inserts the documents 'objs' into the namespace 'ns' using the ClusterWriter for locking, - * routing, stale config handling, etc. 
- */ Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) final; - /** - * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking, - * routing, stale config handling, etc. - */ StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, @@ -149,7 +146,7 @@ public: Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) final; }; diff --git a/src/mongo/db/pipeline/process_interface/standalone_process_interface.h b/src/mongo/db/pipeline/process_interface/standalone_process_interface.h index aceff8e6928..dc562b9089e 100644 --- a/src/mongo/db/pipeline/process_interface/standalone_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/standalone_process_interface.h @@ -41,6 +41,11 @@ public: StandaloneProcessInterface(std::shared_ptr<executor::TaskExecutor> exec) : NonShardServerProcessInterface(std::move(exec)) {} + std::unique_ptr<MongoProcessInterface::WriteSizeEstimator> getWriteSizeEstimator( + OperationContext* opCtx, const NamespaceString& ns) const final { + return std::make_unique<LocalWriteSizeEstimator>(); + } + virtual ~StandaloneProcessInterface() = default; }; diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index 60783a7709b..93ed117a53d 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -55,6 +55,16 @@ public: class StubWriteSizeEstimator final : public WriteSizeEstimator { public: + int estimateInsertHeaderSize( + const write_ops::InsertCommandRequest& insertReq) const override { + return 0; + } + + int estimateUpdateHeaderSize( + const write_ops::UpdateCommandRequest& updateReq) const override { + return 0; + } + int estimateInsertSizeBytes(const BSONObj& insert) const override { MONGO_UNREACHABLE; } @@ -64,6 +74,7 @@ public: MONGO_UNREACHABLE; } }; + std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( OperationContext* opCtx, const NamespaceString& ns) const override { return std::make_unique<StubWriteSizeEstimator>(); @@ -77,7 +88,7 @@ public: Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID>) override { MONGO_UNREACHABLE; @@ -85,7 +96,7 @@ public: Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) override { MONGO_UNREACHABLE; @@ -93,7 +104,7 @@ public: StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&&
batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h index 8dc815053f1..e5c87d84167 100644 --- a/src/mongo/s/write_ops/batched_command_request.h +++ b/src/mongo/s/write_ops/batched_command_request.h @@ -54,15 +54,25 @@ public: : _batchType(BatchType_Insert), _insertReq(std::make_unique<write_ops::InsertCommandRequest>(std::move(insertOp))) {} + BatchedCommandRequest(std::unique_ptr<write_ops::InsertCommandRequest> insertOp) + : _batchType(BatchType_Insert), _insertReq(std::move(insertOp)) {} + BatchedCommandRequest(write_ops::UpdateCommandRequest updateOp) : _batchType(BatchType_Update), _updateReq(std::make_unique<write_ops::UpdateCommandRequest>(std::move(updateOp))) {} + BatchedCommandRequest(std::unique_ptr<write_ops::UpdateCommandRequest> updateOp) + : _batchType(BatchType_Update), _updateReq(std::move(updateOp)) {} + BatchedCommandRequest(write_ops::DeleteCommandRequest deleteOp) : _batchType(BatchType_Delete), _deleteReq(std::make_unique<write_ops::DeleteCommandRequest>(std::move(deleteOp))) {} + BatchedCommandRequest(std::unique_ptr<write_ops::DeleteCommandRequest> deleteOp) + : _batchType(BatchType_Delete), _deleteReq(std::move(deleteOp)) {} + BatchedCommandRequest(BatchedCommandRequest&&) = default; + BatchedCommandRequest& operator=(BatchedCommandRequest&&) = default; static BatchedCommandRequest parseInsert(const OpMsgRequest& request); static BatchedCommandRequest parseUpdate(const OpMsgRequest& request); @@ -93,6 +103,18 @@ public: return *_deleteReq; } + std::unique_ptr<write_ops::InsertCommandRequest> extractInsertRequest() { + return std::move(_insertReq); + } + + std::unique_ptr<write_ops::UpdateCommandRequest> extractUpdateRequest() { + return std::move(_updateReq); + } + + std::unique_ptr<write_ops::DeleteCommandRequest> extractDeleteRequest() { + return std::move(_deleteReq); + } + std::size_t sizeWriteOps() const; void setWriteConcern(const BSONObj& writeConcern) { |
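As a usage note on the unique_ptr constructors and extract*Request accessors added above: the intent is to move an already-built request into a BatchedCommandRequest, and to move it back out when the typed request is needed again, without copying a potentially multi-megabyte command. A small sketch, assuming 'ns', 'docs', and 'wc' are in scope and that setDocuments is the insert payload accessor:

// Wrap a fully built insert command without copying it.
auto insertCommand = std::make_unique<write_ops::InsertCommandRequest>(ns);
insertCommand->setDocuments(std::move(docs));

BatchedCommandRequest batched(std::move(insertCommand));
batched.setWriteConcern(wc.toBSON());

// Ownership of the typed request can later be taken back out of the wrapper.
std::unique_ptr<write_ops::InsertCommandRequest> reclaimed = batched.extractInsertRequest();

This mirrors how the shard-server and replica-set paths in this patch wrap the unique_ptr handed to insert() and update() before dispatching the write.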