| author | Mihai Andrei <mihai.andrei@10gen.com> | 2022-10-05 03:38:33 +0000 |
|---|---|---|
| committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-05 04:03:03 +0000 |
| commit | dad5d0a196ffb05b2c5c8b315c33ca46c8b65934 (patch) | |
| tree | 6e5af82babe479135c38e21d10bf40b4180d8d72 | |
| parent | 38f102485006e7071c0d8ee015ec2da160a0d014 (diff) | |
| download | mongo-dad5d0a196ffb05b2c5c8b315c33ca46c8b65934.tar.gz | |
SERVER-66289 Update write size estimation logic in DocumentSourceWriter
(cherry picked from commit 707ba0a0ade42c4540b9cabaaf5a257de944cc3e)
(cherry picked from commit c172ccd37516f3c2118f349817cdb1841a2486b9)
(cherry picked from commit 3f2cd9485a807eaeabc60bd99653cffd2942f662)
17 files changed, 434 insertions, 102 deletions
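The jstest changes below shrink each test document to just under 1 MB (1024 * 1023 bytes) and floor the documents-per-batch computation, so the per-write overhead added when writes are serialized over the wire does not change how many documents fit in a 16 MB batch. A minimal sketch of that arithmetic, using only the constants from the tests (plain C++, not MongoDB code):

```cpp
#include <cstdint>
#include <iostream>

int main() {
    // Constants mirrored from the jstests: 16 MB write-batch limit, documents just
    // under 1 MB each (1024 * 1023 bytes) to leave headroom for wire-format overhead.
    const std::int64_t maxBatchSize = 16 * 1024 * 1024;  // 16777216
    const std::int64_t docSize = 1024 * 1023;            // 1047552

    // Integer division floors in C++; the jstests need Math.floor() because JavaScript
    // division yields 16.016..., which would break the expected batch count.
    const std::int64_t maxDocsInBatch = maxBatchSize / docSize;  // 16

    // Headroom left in a 16-document batch for per-write serialization overhead.
    const std::int64_t headroom = maxBatchSize - maxDocsInBatch * docSize;  // 16384 bytes

    std::cout << maxDocsInBatch << " docs per batch, " << headroom << " bytes to spare\n";
    return 0;
}
```

With exactly 1 MB documents the batch is completely full at 16 documents, so any extra bytes added for a targeted write would push one document into the next batch; the slightly smaller size keeps the count stable regardless of cluster configuration.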
diff --git a/jstests/aggregation/sources/merge/mode_merge_fail.js b/jstests/aggregation/sources/merge/mode_merge_fail.js index 7235c8e1c7e..fcc13f8e871 100644 --- a/jstests/aggregation/sources/merge/mode_merge_fail.js +++ b/jstests/aggregation/sources/merge/mode_merge_fail.js @@ -97,9 +97,13 @@ const pipeline = [mergeStage]; // and updated. (function testMergeUnorderedBatchUpdate() { const maxBatchSize = 16 * 1024 * 1024; // 16MB - const docSize = 1024 * 1024; // 1MB + + // Each document is just under 1MB in order to allow for some extra space for writes that need + // to be serialized over the wire in certain cluster configurations. Otherwise, the number of + // modified/unmodified documents would be off by one depending on how our cluster is configured. + const docSize = 1024 * 1023; const numDocs = 20; - const maxDocsInBatch = maxBatchSize / docSize; + const maxDocsInBatch = Math.floor(maxBatchSize / docSize); assert(source.drop()); dropWithoutImplicitRecreate(target.getName()); diff --git a/jstests/aggregation/sources/merge/mode_replace_fail.js b/jstests/aggregation/sources/merge/mode_replace_fail.js index 88582238c8f..cc3df1b1ce7 100644 --- a/jstests/aggregation/sources/merge/mode_replace_fail.js +++ b/jstests/aggregation/sources/merge/mode_replace_fail.js @@ -90,9 +90,13 @@ const pipeline = [mergeStage]; // and updated. (function testMergeUnorderedBatchUpdate() { const maxBatchSize = 16 * 1024 * 1024; // 16MB - const docSize = 1024 * 1024; // 1MB + + // Each document is just under 1MB in order to allow for some extra space for writes that need + // to be serialized over the wire in certain cluster configurations. Otherwise, the number of + // modified/unmodified documents would be off by one depending on how our cluster is configured. + const docSize = 1024 * 1023; // 1MB const numDocs = 20; - const maxDocsInBatch = maxBatchSize / docSize; + const maxDocsInBatch = Math.floor(maxBatchSize / docSize); assert(source.drop()); dropWithoutImplicitRecreate(target.getName()); diff --git a/jstests/noPassthrough/out_merge_on_secondary_batch_write.js b/jstests/noPassthrough/out_merge_on_secondary_batch_write.js new file mode 100644 index 00000000000..c51ff1afcf2 --- /dev/null +++ b/jstests/noPassthrough/out_merge_on_secondary_batch_write.js @@ -0,0 +1,111 @@ +/** + * Test which verifies that $out/$merge aggregations with secondary read preference which write + * over 16 MB work as expected (especially with respect to producing correctly sized write batches). + * + * @tags: [uses_$out, assumes_read_preference_unchanged, requires_replication] + */ +(function() { +const dbName = "db"; +const collName = "movies"; +const targetCollName = "movies2"; + +function testFn(db) { + const coll = db[collName]; + coll.drop(); + db[targetCollName].drop(); + + // Insert 4 MB more than the maximum bytes allowed in a single write batch worth of data + // serialized as a single BSONObj. 
+ const hello = db.hello(); + const maxBatchSize = hello.maxWriteBatchSize; + const totalDataSize = hello.maxBsonObjectSize + (4 * 1024 * 1024); + const sizePerDoc = totalDataSize / maxBatchSize; + const bigString = "a".repeat(sizePerDoc); + const bulk = coll.initializeUnorderedBulkOp(); + + for (let i = 0; i < maxBatchSize; ++i) { + bulk.insert({_id: NumberInt(i), foo: bigString}); + } + assert.commandWorked(bulk.execute({w: "majority"})); + + function defaultSetUpFn(db) { + db[targetCollName].drop({writeConcern: {w: "majority"}}); + } + + function cleanUpFn(db) { + db[targetCollName].drop({writeConcern: {w: "majority"}}); + } + + function testWriteAggSpec(aggWriteStageSpec, setUpFn, errorCodes = []) { + // Run 'aggWriteStageSpec' with both primary and secondary read preference. + for (const readPref of ["primary", "secondary"]) { + jsTestLog("Testing " + tojson(aggWriteStageSpec) + " with read preference " + readPref); + setUpFn(db); + + // If the caller provided some error codes, assert that the command failed with one + // of these codes. + const fn = () => + db[collName] + .aggregate([aggWriteStageSpec], {$readPreference: {mode: readPref}}) + .itcount(); + const errMsg = "Failed to run aggregate with read preference " + readPref; + if (errorCodes.length > 0) { + assert.throwsWithCode(fn, errorCodes, [] /* params */, errMsg); + } else { + assert.doesNotThrow(fn, [] /* params */, errMsg); + } + cleanUpFn(db); + } + } + + // Set up documents in the output collection so that $merge will perform updates. + function mergeUpdateSetupFn(db) { + defaultSetUpFn(db); + const bulk = db[targetCollName].initializeUnorderedBulkOp(); + for (let i = 0; i < maxBatchSize; ++i) { + bulk.insert({_id: NumberInt(i), extraField: i * 3}); + } + assert.commandWorked(bulk.execute({w: "majority"})); + } + + testWriteAggSpec({$out: targetCollName}, defaultSetUpFn); + testWriteAggSpec( + {$merge: {into: targetCollName, whenMatched: "replace", whenNotMatched: "insert"}}, + defaultSetUpFn); + testWriteAggSpec( + {$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "insert"}}, + defaultSetUpFn); + testWriteAggSpec( + {$merge: {into: targetCollName, whenMatched: "keepExisting", whenNotMatched: "insert"}}, + defaultSetUpFn); + testWriteAggSpec( + {$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "insert", on: "_id"}}, + mergeUpdateSetupFn); + + // Failure cases. 
+ testWriteAggSpec( + {$merge: {into: targetCollName, whenMatched: "replace", whenNotMatched: "fail", on: "_id"}}, + defaultSetUpFn, + [ErrorCodes.MergeStageNoMatchingDocument]); + testWriteAggSpec( + {$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "fail", on: "_id"}}, + defaultSetUpFn, + [ErrorCodes.MergeStageNoMatchingDocument]); + testWriteAggSpec( + {$merge: {into: targetCollName, whenMatched: "fail", whenNotMatched: "insert", on: "_id"}}, + mergeUpdateSetupFn, + [ErrorCodes.DuplicateKey]); +} + +jsTestLog("Testing against a replica set"); +const rst = new ReplSetTest({nodes: 2}); +rst.startSet(); +rst.initiate(); +testFn(new Mongo(rst.getURL()).getDB(dbName)); +rst.stopSet(); + +jsTestLog("Testing against a sharded cluster"); +const st = new ShardingTest({shards: 1, rs: {nodes: 2}}); +testFn(st.s.getDB(dbName)); +st.stop(); +}()); diff --git a/src/mongo/db/ops/write_ops.h b/src/mongo/db/ops/write_ops.h index 8b741524480..aae387fec52 100644 --- a/src/mongo/db/ops/write_ops.h +++ b/src/mongo/db/ops/write_ops.h @@ -89,5 +89,32 @@ const std::vector<BSONObj>& arrayFiltersOf(const T& opEntry) { return opEntry.getArrayFilters().get_value_or(emptyBSONArray); } +/** + * Utility which estimates the size in bytes of an update statement with the given parameters, when + * serialized in the format used for the update command. + */ +int getUpdateSizeEstimate(const BSONObj& q, + const write_ops::UpdateModification& u, + const boost::optional<mongo::BSONObj>& c, + bool includeUpsertSupplied, + const boost::optional<mongo::BSONObj>& collation, + const boost::optional<std::vector<mongo::BSONObj>>& arrayFilters, + const mongo::BSONObj& hint); + +/** + * If the response from a write command contains any write errors, it will throw the first one. All + * the remaining errors will be disregarded. + * + * Usages of this utility for anything other than single-document writes would be suspicious due to + * the fact that it will swallow the remaining ones. + */ +void checkWriteErrors(const WriteCommandBase& reply); + +template <class T> +T checkWriteErrors(T op) { + checkWriteErrors(op.getWriteCommandBase()); + return std::move(op); +} + } // namespace write_ops } // namespace mongo diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp index 51b5e1b2cf7..fcb5fe5ca0f 100644 --- a/src/mongo/db/ops/write_ops_parsers.cpp +++ b/src/mongo/db/ops/write_ops_parsers.cpp @@ -100,6 +100,68 @@ int32_t getStmtIdForWriteAt(const WriteCommandBase& writeCommandBase, size_t wri return kFirstStmtId + writePos; } +int getUpdateSizeEstimate(const BSONObj& q, + const write_ops::UpdateModification& u, + const boost::optional<mongo::BSONObj>& c, + const bool includeUpsertSupplied, + const boost::optional<mongo::BSONObj>& collation, + const boost::optional<std::vector<mongo::BSONObj>>& arrayFilters, + const mongo::BSONObj& hint) { + using UpdateOpEntry = write_ops::UpdateOpEntry; + + // This constant accounts for the null terminator in each field name and the BSONType byte for + // each element. + static const int kPerElementOverhead = 2; + static const int kBoolSize = 1; + int estSize = static_cast<int>(BSONObj::kMinBSONLength); + + // Add the sizes of the 'multi' and 'upsert' fields. + estSize += UpdateOpEntry::kUpsertFieldName.size() + kBoolSize + kPerElementOverhead; + estSize += UpdateOpEntry::kMultiFieldName.size() + kBoolSize + kPerElementOverhead; + + // Add the size of 'upsertSupplied' field if present. 
+ if (includeUpsertSupplied) { + estSize += UpdateOpEntry::kUpsertSuppliedFieldName.size() + kBoolSize + kPerElementOverhead; + } + + // Add the sizes of the 'q' and 'u' fields. + estSize += (UpdateOpEntry::kQFieldName.size() + q.objsize() + kPerElementOverhead + + UpdateOpEntry::kUFieldName.size() + u.objsize() + kPerElementOverhead); + + // Add the size of the 'c' field, if present. + if (c) { + estSize += (UpdateOpEntry::kCFieldName.size() + c->objsize() + kPerElementOverhead); + } + + // Add the size of the 'collation' field, if present. + if (collation) { + estSize += (UpdateOpEntry::kCollationFieldName.size() + collation->objsize() + + kPerElementOverhead); + } + + // Add the size of the 'arrayFilters' field, if present. + if (arrayFilters) { + estSize += ([&]() { + auto size = BSONObj::kMinBSONLength + UpdateOpEntry::kArrayFiltersFieldName.size() + + kPerElementOverhead; + for (auto&& filter : *arrayFilters) { + // For each filter, we not only need to account for the size of the filter itself, + // but also for the per array element overhead. + size += filter.objsize(); + size += write_ops::kWriteCommandBSONArrayPerElementOverheadBytes; + } + return size; + })(); + } + + // Add the size of 'hint' field if present. + if (!hint.isEmpty()) { + estSize += UpdateOpEntry::kHintFieldName.size() + hint.objsize() + kPerElementOverhead; + } + + return estSize; +} + } // namespace write_ops write_ops::Insert InsertOp::parse(const OpMsgRequest& request) { diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp index 09da5fea8d1..ce2a78e2437 100644 --- a/src/mongo/db/pipeline/document_source_merge.cpp +++ b/src/mongo/db/pipeline/document_source_merge.cpp @@ -57,7 +57,7 @@ using MergeStrategy = MergeStrategyDescriptor::MergeStrategy; using MergeStrategyDescriptorsMap = std::map<const MergeMode, const MergeStrategyDescriptor>; using WhenMatched = MergeStrategyDescriptor::WhenMatched; using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched; -using BatchTransform = std::function<void(DocumentSourceMerge::BatchedObjects&)>; +using BatchTransform = DocumentSourceMerge::BatchTransform; using UpdateModification = write_ops::UpdateModification; using UpsertType = MongoProcessInterface::UpsertType; @@ -81,17 +81,15 @@ const auto kDefaultPipelineLet = BSON("new" << "$$ROOT"); /** - * Creates a merge strategy which uses update semantics to perform a merge operation. If - * 'BatchTransform' function is provided, it will be called to transform batched objects before - * passing them to the 'update'. + * Creates a merge strategy which uses update semantics to perform a merge operation. */ -MergeStrategy makeUpdateStrategy(UpsertType upsert, BatchTransform transform) { - return [upsert, transform]( - const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) { - if (transform) { - transform(batch); - } - +MergeStrategy makeUpdateStrategy() { + return [](const auto& expCtx, + const auto& ns, + const auto& wc, + auto epoch, + auto&& batch, + UpsertType upsert) { constexpr auto multi = false; uassertStatusOK(expCtx->mongoProcessInterface->update( expCtx, ns, std::move(batch), wc, upsert, multi, epoch)); @@ -103,16 +101,15 @@ MergeStrategy makeUpdateStrategy(UpsertType upsert, BatchTransform transform) { * that each document in the batch has a matching document in the 'ns' collection (note that a * matching document may not be modified as a result of an update operation, yet it still will be * counted as matching). 
If at least one document doesn't have a match, this strategy returns an - * error. If 'BatchTransform' function is provided, it will be called to transform batched objects - * before passing them to the 'update'. + * error. */ -MergeStrategy makeStrictUpdateStrategy(UpsertType upsert, BatchTransform transform) { - return [upsert, transform]( - const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) { - if (transform) { - transform(batch); - } - +MergeStrategy makeStrictUpdateStrategy() { + return [](const auto& expCtx, + const auto& ns, + const auto& wc, + auto epoch, + auto&& batch, + UpsertType upsert) { const int64_t batchSize = batch.size(); constexpr auto multi = false; auto updateResult = uassertStatusOK(expCtx->mongoProcessInterface->update( @@ -128,7 +125,12 @@ MergeStrategy makeStrictUpdateStrategy(UpsertType upsert, BatchTransform transfo * Creates a merge strategy which uses insert semantics to perform a merge operation. */ MergeStrategy makeInsertStrategy() { - return [](const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) { + return [](const auto& expCtx, + const auto& ns, + const auto& wc, + auto epoch, + auto&& batch, + UpsertType upsertType) { std::vector<BSONObj> objectsToInsert(batch.size()); // The batch stores replacement style updates, but for this "insert" style of $merge we'd // like to just insert the new document without attempting any sort of replacement. @@ -141,15 +143,13 @@ MergeStrategy makeInsertStrategy() { } /** - * Creates a batched objects transformation function which wraps each element of the - * 'batch.modifications' array into the given 'updateOp' operator. + * Creates a batched object transformation function which wraps 'obj' into the given 'updateOp' + * operator. 
*/ BatchTransform makeUpdateTransform(const std::string& updateOp) { - return [updateOp](auto& batch) { - for (auto&& obj : batch) { - std::get<UpdateModification>(obj) = - BSON(updateOp << std::get<UpdateModification>(obj).getUpdateClassic()); - } + return [updateOp](auto& obj) { + std::get<UpdateModification>(obj) = + BSON(updateOp << std::get<UpdateModification>(obj).getUpdateClassic()); }; } @@ -173,48 +173,67 @@ const MergeStrategyDescriptorsMap& getDescriptors() { {kReplaceInsertMode, {kReplaceInsertMode, {ActionType::insert, ActionType::update}, - makeUpdateStrategy(UpsertType::kGenerateNewDoc, {})}}, + makeUpdateStrategy(), + {}, + UpsertType::kGenerateNewDoc}}, // whenMatched: replace, whenNotMatched: fail {kReplaceFailMode, - {kReplaceFailMode, {ActionType::update}, makeStrictUpdateStrategy(UpsertType::kNone, {})}}, + {kReplaceFailMode, + {ActionType::update}, + makeStrictUpdateStrategy(), + {}, + UpsertType::kNone}}, // whenMatched: replace, whenNotMatched: discard {kReplaceDiscardMode, - {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}}, + {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}}, // whenMatched: merge, whenNotMatched: insert {kMergeInsertMode, {kMergeInsertMode, {ActionType::insert, ActionType::update}, - makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$set"))}}, + makeUpdateStrategy(), + makeUpdateTransform("$set"), + UpsertType::kGenerateNewDoc}}, // whenMatched: merge, whenNotMatched: fail {kMergeFailMode, {kMergeFailMode, {ActionType::update}, - makeStrictUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}}, + makeStrictUpdateStrategy(), + makeUpdateTransform("$set"), + UpsertType::kNone}}, // whenMatched: merge, whenNotMatched: discard {kMergeDiscardMode, {kMergeDiscardMode, {ActionType::update}, - makeUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}}, + makeUpdateStrategy(), + makeUpdateTransform("$set"), + UpsertType::kNone}}, // whenMatched: keepExisting, whenNotMatched: insert {kKeepExistingInsertMode, {kKeepExistingInsertMode, {ActionType::insert, ActionType::update}, - makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$setOnInsert"))}}, + makeUpdateStrategy(), + makeUpdateTransform("$setOnInsert"), + UpsertType::kGenerateNewDoc}}, // whenMatched: [pipeline], whenNotMatched: insert {kPipelineInsertMode, {kPipelineInsertMode, {ActionType::insert, ActionType::update}, - makeUpdateStrategy(UpsertType::kInsertSuppliedDoc, {})}}, + makeUpdateStrategy(), + {}, + UpsertType::kInsertSuppliedDoc}}, // whenMatched: [pipeline], whenNotMatched: fail {kPipelineFailMode, {kPipelineFailMode, {ActionType::update}, - makeStrictUpdateStrategy(UpsertType::kNone, {})}}, + makeStrictUpdateStrategy(), + {}, + UpsertType::kNone}}, // whenMatched: [pipeline], whenNotMatched: discard {kPipelineDiscardMode, - {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}}, + {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}}, // whenMatched: fail, whenNotMatched: insert - {kFailInsertMode, {kFailInsertMode, {ActionType::insert}, makeInsertStrategy()}}}; + {kFailInsertMode, + {kFailInsertMode, {ActionType::insert}, makeInsertStrategy(), {}, UpsertType::kNone}}}; return mergeStrategyDescriptors; } @@ -506,8 +525,35 @@ std::pair<DocumentSourceMerge::BatchObject, int> DocumentSourceMerge::makeBatchO auto mergeOnFields = extractMergeOnFieldsFromDoc(doc, _mergeOnFields); auto mod = 
makeBatchUpdateModification(doc); auto vars = resolveLetVariablesIfNeeded(doc); - auto modSize = mod.objsize() + (vars ? vars->objsize() : 0); - return {{std::move(mergeOnFields), std::move(mod), std::move(vars)}, modSize}; + BatchObject batchObject{std::move(mergeOnFields), std::move(mod), std::move(vars)}; + if (_descriptor.transform) { + _descriptor.transform(batchObject); + } + + invariant(_writeSizeEstimator); + return {batchObject, + _writeSizeEstimator->estimateUpdateSizeBytes(batchObject, _descriptor.upsertType)}; +} + +void DocumentSourceMerge::spill(BatchedObjects&& batch) { + DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); + + try { + auto targetEpoch = _targetCollectionVersion + ? boost::optional<OID>(_targetCollectionVersion->epoch()) + : boost::none; + + _descriptor.strategy(pExpCtx, + _outputNs, + _writeConcern, + targetEpoch, + std::move(batch), + _descriptor.upsertType); + } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) { + uassertStatusOKWithContext(ex.toStatus(), + "$merge failed to update the matching document, did you " + "attempt to modify the _id or the shard key?"); + } } void DocumentSourceMerge::waitWhileFailPointEnabled() { diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h index 464b4cfdaa0..e563fa0d2cf 100644 --- a/src/mongo/db/pipeline/document_source_merge.h +++ b/src/mongo/db/pipeline/document_source_merge.h @@ -44,24 +44,31 @@ class DocumentSourceMerge final : public DocumentSourceWriter<MongoProcessInterf public: static constexpr StringData kStageName = "$merge"_sd; - // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions - // the client should be authorized to perform in order to be able to execute a merge operation - // using this merge strategy. + using BatchTransform = std::function<void(MongoProcessInterface::BatchObject&)>; + + // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions the + // client should be authorized to perform in order to be able to execute a merge operation using + // this merge strategy. If a 'BatchTransform' function is provided, it will be called when + // constructing a batch object to transform updates. struct MergeStrategyDescriptor { using WhenMatched = MergeWhenMatchedModeEnum; using WhenNotMatched = MergeWhenNotMatchedModeEnum; using MergeMode = std::pair<WhenMatched, WhenNotMatched>; + using UpsertType = MongoProcessInterface::UpsertType; // A function encapsulating a merge strategy for the $merge stage based on the pair of // whenMatched/whenNotMatched modes. using MergeStrategy = std::function<void(const boost::intrusive_ptr<ExpressionContext>&, const NamespaceString&, const WriteConcernOptions&, boost::optional<OID>, - BatchedObjects&&)>; + BatchedObjects&&, + UpsertType upsert)>; MergeMode mode; ActionSet actions; MergeStrategy strategy; + BatchTransform transform; + UpsertType upsertType; }; /** @@ -214,21 +221,7 @@ private: return bob.obj(); } - void spill(BatchedObjects&& batch) override { - DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); - - try { - auto targetEpoch = _targetCollectionVersion - ? 
boost::optional<OID>(_targetCollectionVersion->epoch()) - : boost::none; - - _descriptor.strategy(pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch)); - } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) { - uassertStatusOKWithContext(ex.toStatus(), - "$merge failed to update the matching document, did you " - "attempt to modify the _id or the shard key?"); - } - } + void spill(BatchedObjects&& batch) override; void waitWhileFailPointEnabled() override; diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index b01182ffcbd..10795a2c135 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -129,7 +129,8 @@ private: std::pair<BSONObj, int> makeBatchObject(Document&& doc) const override { auto obj = doc.toBson(); - return {obj, obj.objsize()}; + invariant(_writeSizeEstimator); + return {obj, _writeSizeEstimator->estimateInsertSizeBytes(obj)}; } void waitWhileFailPointEnabled() override; diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h index 46bd0bf29eb..e0739382c7f 100644 --- a/src/mongo/db/pipeline/document_source_writer.h +++ b/src/mongo/db/pipeline/document_source_writer.h @@ -94,13 +94,14 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx) : DocumentSource(stageName, expCtx), _outputNs(std::move(outputNs)), - _writeConcern(expCtx->opCtx->getWriteConcern()) { + _writeConcern(expCtx->opCtx->getWriteConcern()), + _writeSizeEstimator( + expCtx->mongoProcessInterface->getWriteSizeEstimator(expCtx->opCtx, outputNs)) { uassert(31476, "Cluster must be fully upgraded to version 4.4 with featureCompatibilityVersion" " 4.4 before attempting to run $out/$merge with a non-primary read preference", pExpCtx->mongoProcessInterface->supportsReadPreferenceForWriteOp(pExpCtx)); } - DepsTracker::State getDependencies(DepsTracker* deps) const override { deps->needWholeDocument = true; return DepsTracker::State::EXHAUSTIVE_ALL; @@ -164,6 +165,9 @@ protected: // respect the writeConcern of the original command. WriteConcernOptions _writeConcern; + // An interface that is used to estimate the size of each write operation. 
+ const std::unique_ptr<MongoProcessInterface::WriteSizeEstimator> _writeSizeEstimator; + private: bool _initialized{false}; bool _done{false}; diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp index 5cd9327fafa..b43902c3f42 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp @@ -149,6 +149,12 @@ std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsR return {"_id"}; } +std::unique_ptr<CommonProcessInterface::WriteSizeEstimator> +CommonProcessInterface::getWriteSizeEstimator(OperationContext* opCtx, + const NamespaceString& ns) const { + return std::make_unique<LocalWriteSizeEstimator>(); +} + void CommonProcessInterface::updateClientOperationTime(OperationContext* opCtx) const { // In order to support causal consistency in a replica set or a sharded cluster when reading // with secondary read preference, the secondary must propagate the primary's operation time diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h index 75c3e01c513..7900054aa34 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h @@ -46,6 +46,49 @@ public: virtual ~CommonProcessInterface() = default; /** + * Estimates the size of writes that will be executed on the current node. Note that this + * does not account for the full size of an update statement. + */ + class LocalWriteSizeEstimator final : public WriteSizeEstimator { + public: + int estimateInsertSizeBytes(const BSONObj& insert) const override { + return insert.objsize(); + } + + int estimateUpdateSizeBytes(const BatchObject& batchObject, + UpsertType type) const override { + int size = std::get<write_ops::UpdateModification>(batchObject).objsize(); + if (auto vars = std::get<boost::optional<BSONObj>>(batchObject)) { + size += vars->objsize(); + } + return size; + } + }; + + /** + * Estimate the size of writes that will be sent to the replica set primary. + */ + class TargetPrimaryWriteSizeEstimator final : public WriteSizeEstimator { + public: + int estimateInsertSizeBytes(const BSONObj& insert) const override { + return insert.objsize() + write_ops::kWriteCommandBSONArrayPerElementOverheadBytes; + } + + int estimateUpdateSizeBytes(const BatchObject& batchObject, + UpsertType type) const override { + return write_ops::getUpdateSizeEstimate( + std::get<BSONObj>(batchObject), + std::get<write_ops::UpdateModification>(batchObject), + std::get<boost::optional<BSONObj>>(batchObject), + type != UpsertType::kNone /* includeUpsertSupplied */, + boost::none /* collation */, + boost::none /* arrayFilters */, + BSONObj() /* hint*/) + + write_ops::kWriteCommandBSONArrayPerElementOverheadBytes; + } + }; + + /** * Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and * each of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special * type of index. 
@@ -64,6 +107,9 @@ public: virtual std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter( OperationContext*, const NamespaceString&) const override; + std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( + OperationContext* opCtx, const NamespaceString& ns) const override; + virtual void updateClientOperationTime(OperationContext* opCtx) const final; boost::optional<ChunkVersion> refreshAndGetCollectionVersion( diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index e10d959c932..9a29d0b5df9 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -74,7 +74,7 @@ class MongoProcessInterface { public: /** * Storage for a batch of BSON Objects to be updated in the write namespace. For each element - * in the batch we store a tuple of the folliwng elements: + * in the batch we store a tuple of the following elements: * 1. BSONObj - specifies the query that identifies a document in the to collection to be * updated. * 2. write_ops::UpdateModification - either the new document we want to upsert or insert into @@ -102,6 +102,19 @@ public: enum class CurrentOpBacktraceMode { kIncludeBacktrace, kExcludeBacktrace }; /** + * Interface which estimates the size of a given write operation. + */ + class WriteSizeEstimator { + public: + virtual ~WriteSizeEstimator() = default; + + virtual int estimateInsertSizeBytes(const BSONObj& insert) const = 0; + + virtual int estimateUpdateSizeBytes(const BatchObject& batchObject, + UpsertType type) const = 0; + }; + + /** * Factory function to create MongoProcessInterface of the right type. The implementation will * be installed by a lib higher up in the link graph depending on the application type. */ @@ -123,6 +136,12 @@ public: virtual ~MongoProcessInterface(){}; /** + * Returns an instance of a 'WriteSizeEstimator' interface. + */ + virtual std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( + OperationContext* opCtx, const NamespaceString& ns) const = 0; + + /** * Creates a new TransactionHistoryIterator object. Only applicable in processes which support * locally traversing the oplog. */ diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index a9fd76aabb3..e5d497aa0a2 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -105,6 +105,12 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace +std::unique_ptr<CommonProcessInterface::WriteSizeEstimator> +MongosProcessInterface::getWriteSizeEstimator(OperationContext* opCtx, + const NamespaceString& ns) const { + return std::make_unique<TargetPrimaryWriteSizeEstimator>(); +} + std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline( Pipeline* ownedPipeline, bool allowTargetingShards) { // On mongos we can't have local cursors. 
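The hunks above and below wire a WriteSizeEstimator into each process interface: mongos always serializes writes into commands for the shards, a plain mongod applies them locally, and a replica-set node picks an estimator based on whether it can write locally. A simplified, self-contained sketch of that selection pattern follows; the class and constant names here are stand-ins, not the real MongoDB declarations, and the overhead value is assumed for illustration.

```cpp
#include <memory>

// Simplified stand-in for MongoProcessInterface::WriteSizeEstimator; the real interface
// also estimates update sizes via write_ops::getUpdateSizeEstimate().
struct WriteSizeEstimator {
    virtual ~WriteSizeEstimator() = default;
    virtual int estimateInsertSizeBytes(int documentSize) const = 0;
};

struct LocalWriteSizeEstimator final : WriteSizeEstimator {
    // Writes are applied on this node, so the document size alone is a good estimate.
    int estimateInsertSizeBytes(int documentSize) const override {
        return documentSize;
    }
};

struct TargetPrimaryWriteSizeEstimator final : WriteSizeEstimator {
    // Assumed stand-in for write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
    // the exact value in the server may differ.
    static constexpr int kPerElementOverhead = 7;

    // Writes are forwarded as a write command, so each batched element costs extra bytes.
    int estimateInsertSizeBytes(int documentSize) const override {
        return documentSize + kPerElementOverhead;
    }
};

// Mirrors the selection logic added to ReplicaSetNodeProcessInterface: estimate
// conservatively whenever the write has to be sent to the primary.
std::unique_ptr<WriteSizeEstimator> makeEstimator(bool canWriteLocally) {
    if (canWriteLocally) {
        return std::make_unique<LocalWriteSizeEstimator>();
    }
    return std::make_unique<TargetPrimaryWriteSizeEstimator>();
}
```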
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h index 639d17bf4f3..7141915b7d3 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h @@ -45,6 +45,9 @@ public: virtual ~MongosProcessInterface() = default; + std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( + OperationContext* opCtx, const NamespaceString& ns) const final; + boost::optional<Document> lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h index 0d954fd1035..b8e14a5a52e 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h @@ -44,6 +44,15 @@ class ReplicaSetNodeProcessInterface final : public NonShardServerProcessInterfa public: using NonShardServerProcessInterface::NonShardServerProcessInterface; + std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( + OperationContext* opCtx, const NamespaceString& ns) const override { + if (_canWriteLocally(opCtx, ns)) { + return std::make_unique<LocalWriteSizeEstimator>(); + } else { + return std::make_unique<TargetPrimaryWriteSizeEstimator>(); + } + } + static std::shared_ptr<executor::TaskExecutor> getReplicaSetNodeExecutor( ServiceContext* service); diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index 1b0b4863ef8..fcf0458ea3c 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -53,6 +53,22 @@ public: MONGO_UNREACHABLE; } + class StubWriteSizeEstimator final : public WriteSizeEstimator { + public: + int estimateInsertSizeBytes(const BSONObj& insert) const override { + MONGO_UNREACHABLE; + } + + int estimateUpdateSizeBytes(const BatchObject& batchObject, + UpsertType type) const override { + MONGO_UNREACHABLE; + } + }; + std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( + OperationContext* opCtx, const NamespaceString& ns) const override { + return std::make_unique<StubWriteSizeEstimator>(); + } + bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override { return false; } diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 338e208a99f..4cee9a41252 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -65,7 +65,6 @@ struct WriteErrorDetailComp { // batches before serializing. // // TODO: Revisit when we revisit command limits in general -const int kEstUpdateOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100; const int kEstDeleteOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100; /** @@ -171,41 +170,17 @@ int getWriteSizeBytes(const WriteOp& writeOp) { return item.getDocument().objsize(); } else if (batchType == BatchedCommandRequest::BatchType_Update) { // Note: Be conservative here - it's okay if we send slightly too many batches. 
- auto estSize = static_cast<int>(BSONObj::kMinBSONLength); - static const auto boolSize = 1; - - // Add the size of the 'collation' field, if present. - estSize += !item.getUpdate().getCollation() ? 0 - : (UpdateOpEntry::kCollationFieldName.size() + - item.getUpdate().getCollation()->objsize()); - - // Add the size of the 'arrayFilters' field, if present. - estSize += !item.getUpdate().getArrayFilters() ? 0 : ([&item]() { - auto size = BSONObj::kMinBSONLength + UpdateOpEntry::kArrayFiltersFieldName.size(); - for (auto&& filter : *item.getUpdate().getArrayFilters()) { - size += filter.objsize(); - } - return size; - })(); - - // Add the sizes of the 'multi' and 'upsert' fields. - estSize += UpdateOpEntry::kUpsertFieldName.size() + boolSize; - estSize += UpdateOpEntry::kMultiFieldName.size() + boolSize; - - // Add the sizes of the 'q' and 'u' fields. - estSize += (UpdateOpEntry::kQFieldName.size() + item.getUpdate().getQ().objsize() + - UpdateOpEntry::kUFieldName.size() + item.getUpdate().getU().objsize()); - - // Add the size of the 'c' field if present. - if (auto constants = item.getUpdate().getC()) { - estSize += UpdateOpEntry::kCFieldName.size() + item.getUpdate().getC()->objsize(); - } - - // Finally, add the constant updateOp overhead size. - estSize += kEstUpdateOverheadBytes; + const auto& update = item.getUpdate(); + auto estSize = write_ops::getUpdateSizeEstimate(update.getQ(), + update.getU(), + update.getC(), + update.getUpsertSupplied().has_value(), + update.getCollation(), + update.getArrayFilters(), + update.getHint()); // When running a debug build, verify that estSize is at least the BSON serialization size. - dassert(estSize >= item.getUpdate().toBSON().objsize()); + dassert(estSize >= update.toBSON().objsize()); return estSize; } else if (batchType == BatchedCommandRequest::BatchType_Delete) { // Note: Be conservative here - it's okay if we send slightly too many batches. |
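With this change, the mongos batching code in batch_write_op.cpp and the aggregation writers share write_ops::getUpdateSizeEstimate(), so both paths count field-name lengths, per-element overhead, and optional fields the same way. The sketch below is a simplified model of how that estimate accumulates, assuming only 'q', 'u', and an optional 'collation' are present; it is a hypothetical helper for illustration, not the real function.

```cpp
#include <iostream>

// Simplified model of write_ops::getUpdateSizeEstimate(): start from an empty BSON object
// and add, for each field that will be serialized, its name length plus value size plus a
// fixed per-element overhead (the field name's NUL terminator and the BSON type byte).
int estimateUpdateSize(int qSize, int uSize, bool hasCollation, int collationSize) {
    constexpr int kMinBSONLength = 5;      // empty BSONObj: 4-byte length + trailing NUL
    constexpr int kPerElementOverhead = 2; // NUL terminator + BSON type byte
    constexpr int kBoolSize = 1;

    int est = kMinBSONLength;
    est += 6 /* "upsert" */ + kBoolSize + kPerElementOverhead;
    est += 5 /* "multi" */ + kBoolSize + kPerElementOverhead;
    est += 1 /* "q" */ + qSize + kPerElementOverhead;
    est += 1 /* "u" */ + uSize + kPerElementOverhead;
    if (hasCollation) {
        est += 9 /* "collation" */ + collationSize + kPerElementOverhead;
    }
    return est;
}

int main() {
    // Example: a 120-byte filter and a 64-byte update document, no collation.
    std::cout << estimateUpdateSize(120, 64, false, 0) << " bytes (estimated)\n";
    return 0;
}
```

Overestimating slightly is acceptable here; the debug-build dassert in getWriteSizeBytes() verifies that the estimate is never smaller than the actual serialized update, so a batch is never built larger than the command size limit allows.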