author    | Uladzimir Makouski <uladzimir.makouski@mongodb.com> | 2022-07-28 09:42:17 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com>    | 2022-07-28 10:48:12 +0000
commit    | 4e9dd74af2467b9c04e7014c0f754465321da8a9
tree      | 6af10dea03d18cee43ac130e5526438048504457
parent    | 042a60c450b5f48a27171cf4150a5bf850efc79a
download  | mongo-4e9dd74af2467b9c04e7014c0f754465321da8a9.tar.gz
Revert "SERVER-66289 Update write size estimation logic in DocumentSourceWriter"
This reverts commit 7b7fe658db948e6f5a4a6c30d4590d7866c59371.
17 files changed, 97 insertions(+), 395 deletions(-)
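For context on what is being rolled back: SERVER-66289 made the $out/$merge writer stages size their write batches through a WriteSizeEstimator obtained from the process interface, so that deployments which forward the writes to a primary (mongos, replica-set secondaries) account for the full serialized update entry rather than just the raw document sizes. The sketch below contrasts the two behaviours this revert switches between; the type and function names are hypothetical, not the actual mongod classes.

```cpp
// Illustrative sketch (hypothetical names, not MongoDB source) of the two
// batch-size estimates this commit toggles between for one $merge update.
#include <cstdint>

struct UpdateEntrySketch {
    int64_t filterBytes;   // serialized size of the 'q' filter document
    int64_t updateBytes;   // serialized size of the 'u' update/replacement document
    int64_t letVarsBytes;  // serialized size of the optional 'let' constants (0 if absent)
};

// Behaviour restored by this revert: count only the modification plus any
// let-variables, as in "mod.objsize() + (vars ? vars->objsize() : 0)".
int64_t estimateLocalWrite(const UpdateEntrySketch& e) {
    return e.updateBytes + e.letVarsBytes;
}

// Behaviour being reverted: when the batch is re-serialized as an update
// command and forwarded (e.g. secondary -> primary), the filter and some
// fixed per-entry overhead (flags, field names, array framing) also count
// toward the wire size.
int64_t estimateForwardedWrite(const UpdateEntrySketch& e,
                               int64_t perEntryOverheadBytes) {
    return e.filterBytes + e.updateBytes + e.letVarsBytes + perEntryOverheadBytes;
}
```

The deleted test jstests/noPassthrough/out_merge_on_secondary_batch_write.js exercised exactly that scenario: with secondary read preference the batch is re-serialized and sent to the primary, so an under-estimate can push a single write batch over the 16 MB limit.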
diff --git a/jstests/aggregation/sources/merge/mode_merge_fail.js b/jstests/aggregation/sources/merge/mode_merge_fail.js index fcc13f8e871..7235c8e1c7e 100644 --- a/jstests/aggregation/sources/merge/mode_merge_fail.js +++ b/jstests/aggregation/sources/merge/mode_merge_fail.js @@ -97,13 +97,9 @@ const pipeline = [mergeStage]; // and updated. (function testMergeUnorderedBatchUpdate() { const maxBatchSize = 16 * 1024 * 1024; // 16MB - - // Each document is just under 1MB in order to allow for some extra space for writes that need - // to be serialized over the wire in certain cluster configurations. Otherwise, the number of - // modified/unmodified documents would be off by one depending on how our cluster is configured. - const docSize = 1024 * 1023; + const docSize = 1024 * 1024; // 1MB const numDocs = 20; - const maxDocsInBatch = Math.floor(maxBatchSize / docSize); + const maxDocsInBatch = maxBatchSize / docSize; assert(source.drop()); dropWithoutImplicitRecreate(target.getName()); diff --git a/jstests/aggregation/sources/merge/mode_replace_fail.js b/jstests/aggregation/sources/merge/mode_replace_fail.js index cc3df1b1ce7..88582238c8f 100644 --- a/jstests/aggregation/sources/merge/mode_replace_fail.js +++ b/jstests/aggregation/sources/merge/mode_replace_fail.js @@ -90,13 +90,9 @@ const pipeline = [mergeStage]; // and updated. (function testMergeUnorderedBatchUpdate() { const maxBatchSize = 16 * 1024 * 1024; // 16MB - - // Each document is just under 1MB in order to allow for some extra space for writes that need - // to be serialized over the wire in certain cluster configurations. Otherwise, the number of - // modified/unmodified documents would be off by one depending on how our cluster is configured. - const docSize = 1024 * 1023; // 1MB + const docSize = 1024 * 1024; // 1MB const numDocs = 20; - const maxDocsInBatch = Math.floor(maxBatchSize / docSize); + const maxDocsInBatch = maxBatchSize / docSize; assert(source.drop()); dropWithoutImplicitRecreate(target.getName()); diff --git a/jstests/noPassthrough/out_merge_on_secondary_batch_write.js b/jstests/noPassthrough/out_merge_on_secondary_batch_write.js deleted file mode 100644 index 56dd050630c..00000000000 --- a/jstests/noPassthrough/out_merge_on_secondary_batch_write.js +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Test which verifies that $out/$merge aggregations with secondary read preference which write - * over 16 MB work as expected (especially with respect to producing correctly sized write batches). - * - * @tags: [uses_$out, assumes_read_preference_unchanged] - */ -(function() { -const dbName = "db"; -const collName = "movies"; -const targetCollName = "movies2"; - -function testFn(db) { - const coll = db[collName]; - coll.drop(); - db[targetCollName].drop(); - - // Insert 4 MB more than the maximum bytes allowed in a single write batch worth of data - // serialized as a single BSONObj. 
- const hello = db.hello(); - const maxBatchSize = hello.maxWriteBatchSize; - const totalDataSize = hello.maxBsonObjectSize + (4 * 1024 * 1024); - const sizePerDoc = totalDataSize / maxBatchSize; - const bigString = "a".repeat(sizePerDoc); - const bulk = coll.initializeUnorderedBulkOp(); - - for (let i = 0; i < maxBatchSize; ++i) { - bulk.insert({_id: NumberInt(i), foo: bigString}); - } - assert.commandWorked(bulk.execute({w: "majority"})); - - function defaultSetUpFn(db) { - db[targetCollName].drop({writeConcern: {w: "majority"}}); - } - - function cleanUpFn(db) { - db[targetCollName].drop({writeConcern: {w: "majority"}}); - } - - function testWriteAggSpec(aggWriteStageSpec, setUpFn, errorCodes = []) { - // Run 'aggWriteStageSpec' with both primary and secondary read preference. - for (const readPref of ["primary", "secondary"]) { - jsTestLog("Testing " + tojson(aggWriteStageSpec) + " with read preference " + readPref); - setUpFn(db); - - // If the caller provided some error codes, assert that the command failed with one - // of these codes. - const fn = () => - db[collName] - .aggregate([aggWriteStageSpec], {$readPreference: {mode: readPref}}) - .itcount(); - const errMsg = "Failed to run aggregate with read preference " + readPref; - if (errorCodes.length > 0) { - assert.throwsWithCode(fn, errorCodes, [] /* params */, errMsg); - } else { - assert.doesNotThrow(fn, [] /* params */, errMsg); - } - cleanUpFn(db); - } - } - - // Set up documents in the output collection so that $merge will perform updates. - function mergeUpdateSetupFn(db) { - defaultSetUpFn(db); - const bulk = db[targetCollName].initializeUnorderedBulkOp(); - for (let i = 0; i < maxBatchSize; ++i) { - bulk.insert({_id: NumberInt(i), extraField: i * 3}); - } - assert.commandWorked(bulk.execute({w: "majority"})); - } - - testWriteAggSpec({$out: targetCollName}, defaultSetUpFn); - testWriteAggSpec( - {$merge: {into: targetCollName, whenMatched: "replace", whenNotMatched: "insert"}}, - defaultSetUpFn); - testWriteAggSpec( - {$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "insert"}}, - defaultSetUpFn); - testWriteAggSpec( - {$merge: {into: targetCollName, whenMatched: "keepExisting", whenNotMatched: "insert"}}, - defaultSetUpFn); - testWriteAggSpec( - {$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "insert", on: "_id"}}, - mergeUpdateSetupFn); - - // Failure cases. 
- testWriteAggSpec( - {$merge: {into: targetCollName, whenMatched: "replace", whenNotMatched: "fail", on: "_id"}}, - defaultSetUpFn, - [ErrorCodes.MergeStageNoMatchingDocument]); - testWriteAggSpec( - {$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "fail", on: "_id"}}, - defaultSetUpFn, - [ErrorCodes.MergeStageNoMatchingDocument]); - testWriteAggSpec( - {$merge: {into: targetCollName, whenMatched: "fail", whenNotMatched: "insert", on: "_id"}}, - mergeUpdateSetupFn, - [ErrorCodes.DuplicateKey]); -} - -jsTestLog("Testing against a replica set"); -const rst = new ReplSetTest({nodes: 2}); -rst.startSet(); -rst.initiate(); -testFn(new Mongo(rst.getURL()).getDB(dbName)); -rst.stopSet(); - -jsTestLog("Testing against a sharded cluster"); -const st = new ShardingTest({shards: 1, rs: {nodes: 2}}); -testFn(st.s.getDB(dbName)); -st.stop(); -}()); diff --git a/src/mongo/db/ops/write_ops.cpp b/src/mongo/db/ops/write_ops.cpp index f0ee04348b2..0ef8f329558 100644 --- a/src/mongo/db/ops/write_ops.cpp +++ b/src/mongo/db/ops/write_ops.cpp @@ -136,65 +136,6 @@ int32_t getStmtIdForWriteAt(const WriteCommandRequestBase& writeCommandBase, siz return kFirstStmtId + writePos; } -int getUpdateSizeEstimate(const BSONObj& q, - const write_ops::UpdateModification& u, - const boost::optional<mongo::BSONObj>& c, - const bool includeUpsertSupplied, - const boost::optional<mongo::BSONObj>& collation, - const boost::optional<std::vector<mongo::BSONObj>>& arrayFilters, - const mongo::BSONObj& hint) { - using UpdateOpEntry = write_ops::UpdateOpEntry; - - // This constant accounts for the null terminator in each field name and the BSONType byte for - // each element. - static const int kPerElementOverhead = 2; - static const int kBoolSize = 1; - int estSize = static_cast<int>(BSONObj::kMinBSONLength); - - // Add the sizes of the 'multi' and 'upsert' fields. - estSize += UpdateOpEntry::kUpsertFieldName.size() + kBoolSize + kPerElementOverhead; - estSize += UpdateOpEntry::kMultiFieldName.size() + kBoolSize + kPerElementOverhead; - - // Add the size of 'upsertSupplied' field if present. - if (includeUpsertSupplied) { - estSize += UpdateOpEntry::kUpsertSuppliedFieldName.size() + kBoolSize + kPerElementOverhead; - } - - // Add the sizes of the 'q' and 'u' fields. - estSize += (UpdateOpEntry::kQFieldName.size() + q.objsize() + kPerElementOverhead + - UpdateOpEntry::kUFieldName.size() + u.objsize() + kPerElementOverhead); - - // Add the size of the 'c' field, if present. - if (c) { - estSize += (UpdateOpEntry::kCFieldName.size() + c->objsize() + kPerElementOverhead); - } - - // Add the size of the 'collation' field, if present. - if (collation) { - estSize += (UpdateOpEntry::kCollationFieldName.size() + collation->objsize() + - kPerElementOverhead); - } - - // Add the size of the 'arrayFilters' field, if present. - if (arrayFilters) { - estSize += ([&]() { - auto size = BSONObj::kMinBSONLength + UpdateOpEntry::kArrayFiltersFieldName.size() + - kPerElementOverhead; - for (auto&& filter : *arrayFilters) { - size += filter.objsize(); - } - return size; - })(); - } - - // Add the size of 'hint' field if present. - if (!hint.isEmpty()) { - estSize += UpdateOpEntry::kHintFieldName.size() + hint.objsize() + kPerElementOverhead; - } - - return estSize; -} - bool isClassicalUpdateReplacement(const BSONObj& update) { // An empty update object will be treated as replacement as firstElementFieldName() returns "". 
return update.firstElementFieldName()[0] != '$'; diff --git a/src/mongo/db/ops/write_ops.h b/src/mongo/db/ops/write_ops.h index a1201412a33..d78791fdfdc 100644 --- a/src/mongo/db/ops/write_ops.h +++ b/src/mongo/db/ops/write_ops.h @@ -104,18 +104,6 @@ const std::vector<BSONObj>& arrayFiltersOf(const T& opEntry) { } /** - * Utility which estimates the size in bytes of an update statement with the given parameters, when - * serialized in the format used for the update command. - */ -int getUpdateSizeEstimate(const BSONObj& q, - const write_ops::UpdateModification& u, - const boost::optional<mongo::BSONObj>& c, - bool includeUpsertSupplied, - const boost::optional<mongo::BSONObj>& collation, - const boost::optional<std::vector<mongo::BSONObj>>& arrayFilters, - const mongo::BSONObj& hint); - -/** * If the response from a write command contains any write errors, it will throw the first one. All * the remaining errors will be disregarded. * diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp index 28bb4e31ff3..ae413052ddf 100644 --- a/src/mongo/db/pipeline/document_source_merge.cpp +++ b/src/mongo/db/pipeline/document_source_merge.cpp @@ -62,7 +62,7 @@ using MergeStrategy = MergeStrategyDescriptor::MergeStrategy; using MergeStrategyDescriptorsMap = std::map<const MergeMode, const MergeStrategyDescriptor>; using WhenMatched = MergeStrategyDescriptor::WhenMatched; using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched; -using BatchTransform = DocumentSourceMerge::BatchTransform; +using BatchTransform = std::function<void(DocumentSourceMerge::BatchedObjects&)>; using UpdateModification = write_ops::UpdateModification; using UpsertType = MongoProcessInterface::UpsertType; @@ -86,15 +86,17 @@ const auto kDefaultPipelineLet = BSON("new" << "$$ROOT"); /** - * Creates a merge strategy which uses update semantics to perform a merge operation. + * Creates a merge strategy which uses update semantics to perform a merge operation. If + * 'BatchTransform' function is provided, it will be called to transform batched objects before + * passing them to the 'update'. */ -MergeStrategy makeUpdateStrategy() { - return [](const auto& expCtx, - const auto& ns, - const auto& wc, - auto epoch, - auto&& batch, - UpsertType upsert) { +MergeStrategy makeUpdateStrategy(UpsertType upsert, BatchTransform transform) { + return [upsert, transform]( + const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) { + if (transform) { + transform(batch); + } + constexpr auto multi = false; uassertStatusOK(expCtx->mongoProcessInterface->update( expCtx, ns, std::move(batch), wc, upsert, multi, epoch)); @@ -106,15 +108,16 @@ MergeStrategy makeUpdateStrategy() { * that each document in the batch has a matching document in the 'ns' collection (note that a * matching document may not be modified as a result of an update operation, yet it still will be * counted as matching). If at least one document doesn't have a match, this strategy returns an - * error. + * error. If 'BatchTransform' function is provided, it will be called to transform batched objects + * before passing them to the 'update'. 
*/ -MergeStrategy makeStrictUpdateStrategy() { - return [](const auto& expCtx, - const auto& ns, - const auto& wc, - auto epoch, - auto&& batch, - UpsertType upsert) { +MergeStrategy makeStrictUpdateStrategy(UpsertType upsert, BatchTransform transform) { + return [upsert, transform]( + const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) { + if (transform) { + transform(batch); + } + const int64_t batchSize = batch.size(); constexpr auto multi = false; auto updateResult = uassertStatusOK(expCtx->mongoProcessInterface->update( @@ -130,12 +133,7 @@ MergeStrategy makeStrictUpdateStrategy() { * Creates a merge strategy which uses insert semantics to perform a merge operation. */ MergeStrategy makeInsertStrategy() { - return [](const auto& expCtx, - const auto& ns, - const auto& wc, - auto epoch, - auto&& batch, - UpsertType upsertType) { + return [](const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) { std::vector<BSONObj> objectsToInsert(batch.size()); // The batch stores replacement style updates, but for this "insert" style of $merge we'd // like to just insert the new document without attempting any sort of replacement. @@ -148,13 +146,15 @@ MergeStrategy makeInsertStrategy() { } /** - * Creates a batched object transformation function which wraps 'obj' into the given 'updateOp' - * operator. + * Creates a batched objects transformation function which wraps each element of the + * 'batch.modifications' array into the given 'updateOp' operator. */ BatchTransform makeUpdateTransform(const std::string& updateOp) { - return [updateOp](auto& obj) { - std::get<UpdateModification>(obj) = UpdateModification::parseFromClassicUpdate( - BSON(updateOp << std::get<UpdateModification>(obj).getUpdateReplacement())); + return [updateOp](auto& batch) { + for (auto&& obj : batch) { + std::get<UpdateModification>(obj) = UpdateModification::parseFromClassicUpdate( + BSON(updateOp << std::get<UpdateModification>(obj).getUpdateReplacement())); + } }; } @@ -178,67 +178,48 @@ const MergeStrategyDescriptorsMap& getDescriptors() { {kReplaceInsertMode, {kReplaceInsertMode, {ActionType::insert, ActionType::update}, - makeUpdateStrategy(), - {}, - UpsertType::kGenerateNewDoc}}, + makeUpdateStrategy(UpsertType::kGenerateNewDoc, {})}}, // whenMatched: replace, whenNotMatched: fail {kReplaceFailMode, - {kReplaceFailMode, - {ActionType::update}, - makeStrictUpdateStrategy(), - {}, - UpsertType::kNone}}, + {kReplaceFailMode, {ActionType::update}, makeStrictUpdateStrategy(UpsertType::kNone, {})}}, // whenMatched: replace, whenNotMatched: discard {kReplaceDiscardMode, - {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}}, + {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}}, // whenMatched: merge, whenNotMatched: insert {kMergeInsertMode, {kMergeInsertMode, {ActionType::insert, ActionType::update}, - makeUpdateStrategy(), - makeUpdateTransform("$set"), - UpsertType::kGenerateNewDoc}}, + makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$set"))}}, // whenMatched: merge, whenNotMatched: fail {kMergeFailMode, {kMergeFailMode, {ActionType::update}, - makeStrictUpdateStrategy(), - makeUpdateTransform("$set"), - UpsertType::kNone}}, + makeStrictUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}}, // whenMatched: merge, whenNotMatched: discard {kMergeDiscardMode, {kMergeDiscardMode, {ActionType::update}, - makeUpdateStrategy(), - makeUpdateTransform("$set"), - 
UpsertType::kNone}}, + makeUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}}, // whenMatched: keepExisting, whenNotMatched: insert {kKeepExistingInsertMode, {kKeepExistingInsertMode, {ActionType::insert, ActionType::update}, - makeUpdateStrategy(), - makeUpdateTransform("$setOnInsert"), - UpsertType::kGenerateNewDoc}}, + makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$setOnInsert"))}}, // whenMatched: [pipeline], whenNotMatched: insert {kPipelineInsertMode, {kPipelineInsertMode, {ActionType::insert, ActionType::update}, - makeUpdateStrategy(), - {}, - UpsertType::kInsertSuppliedDoc}}, + makeUpdateStrategy(UpsertType::kInsertSuppliedDoc, {})}}, // whenMatched: [pipeline], whenNotMatched: fail {kPipelineFailMode, {kPipelineFailMode, {ActionType::update}, - makeStrictUpdateStrategy(), - {}, - UpsertType::kNone}}, + makeStrictUpdateStrategy(UpsertType::kNone, {})}}, // whenMatched: [pipeline], whenNotMatched: discard {kPipelineDiscardMode, - {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}}, + {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}}, // whenMatched: fail, whenNotMatched: insert - {kFailInsertMode, - {kFailInsertMode, {ActionType::insert}, makeInsertStrategy(), {}, UpsertType::kNone}}}; + {kFailInsertMode, {kFailInsertMode, {ActionType::insert}, makeInsertStrategy()}}}; return mergeStrategyDescriptors; } @@ -564,14 +545,8 @@ std::pair<DocumentSourceMerge::BatchObject, int> DocumentSourceMerge::makeBatchO auto mergeOnFields = extractMergeOnFieldsFromDoc(doc, _mergeOnFields); auto mod = makeBatchUpdateModification(doc); auto vars = resolveLetVariablesIfNeeded(doc); - BatchObject batchObject{std::move(mergeOnFields), std::move(mod), std::move(vars)}; - if (_descriptor.transform) { - _descriptor.transform(batchObject); - } - - tassert(6628901, "_writeSizeEstimator should be initialized", _writeSizeEstimator); - return {batchObject, - _writeSizeEstimator->estimateUpdateSizeBytes(batchObject, _descriptor.upsertType)}; + auto modSize = mod.objsize() + (vars ? vars->objsize() : 0); + return {{std::move(mergeOnFields), std::move(mod), std::move(vars)}, modSize}; } void DocumentSourceMerge::spill(BatchedObjects&& batch) try { @@ -580,8 +555,7 @@ void DocumentSourceMerge::spill(BatchedObjects&& batch) try { ? boost::optional<OID>(_targetCollectionVersion->epoch()) : boost::none; - _descriptor.strategy( - pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch), _descriptor.upsertType); + _descriptor.strategy(pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch)); } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) { uassertStatusOKWithContext(ex.toStatus(), "$merge failed to update the matching document, did you " diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h index e3784ae545c..05a87f1e340 100644 --- a/src/mongo/db/pipeline/document_source_merge.h +++ b/src/mongo/db/pipeline/document_source_merge.h @@ -44,31 +44,24 @@ class DocumentSourceMerge final : public DocumentSourceWriter<MongoProcessInterf public: static constexpr StringData kStageName = "$merge"_sd; - using BatchTransform = std::function<void(MongoProcessInterface::BatchObject&)>; - - // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions the - // client should be authorized to perform in order to be able to execute a merge operation using - // this merge strategy. 
If a 'BatchTransform' function is provided, it will be called when - // constructing a batch object to transform updates. + // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions + // the client should be authorized to perform in order to be able to execute a merge operation + // using this merge strategy. struct MergeStrategyDescriptor { using WhenMatched = MergeWhenMatchedModeEnum; using WhenNotMatched = MergeWhenNotMatchedModeEnum; using MergeMode = std::pair<WhenMatched, WhenNotMatched>; - using UpsertType = MongoProcessInterface::UpsertType; // A function encapsulating a merge strategy for the $merge stage based on the pair of // whenMatched/whenNotMatched modes. using MergeStrategy = std::function<void(const boost::intrusive_ptr<ExpressionContext>&, const NamespaceString&, const WriteConcernOptions&, boost::optional<OID>, - BatchedObjects&&, - UpsertType upsert)>; + BatchedObjects&&)>; MergeMode mode; ActionSet actions; MergeStrategy strategy; - BatchTransform transform; - UpsertType upsertType; }; /** diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 602db5815f7..88883cb6231 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -131,8 +131,7 @@ private: std::pair<BSONObj, int> makeBatchObject(Document&& doc) const override { auto obj = doc.toBson(); - tassert(6628900, "_writeSizeEstimator should be initialized", _writeSizeEstimator); - return {obj, _writeSizeEstimator->estimateInsertSizeBytes(obj)}; + return {obj, obj.objsize()}; } void waitWhileFailPointEnabled() override; diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h index 284699deb30..1965bd4dee6 100644 --- a/src/mongo/db/pipeline/document_source_writer.h +++ b/src/mongo/db/pipeline/document_source_writer.h @@ -104,9 +104,7 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx) : DocumentSource(stageName, expCtx), _outputNs(std::move(outputNs)), - _writeConcern(expCtx->opCtx->getWriteConcern()), - _writeSizeEstimator( - expCtx->mongoProcessInterface->getWriteSizeEstimator(expCtx->opCtx, outputNs)) {} + _writeConcern(expCtx->opCtx->getWriteConcern()) {} DepsTracker::State getDependencies(DepsTracker* deps) const override { deps->needWholeDocument = true; @@ -171,9 +169,6 @@ protected: // respect the writeConcern of the original command. WriteConcernOptions _writeConcern; - // An interface that is used to estimate the size of each write operation. 
- const std::unique_ptr<MongoProcessInterface::WriteSizeEstimator> _writeSizeEstimator; - private: bool _initialized{false}; bool _done{false}; diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp index 3e4897e9fc9..57e4c15f3cc 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp @@ -152,12 +152,6 @@ std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsR return {"_id"}; } -std::unique_ptr<CommonProcessInterface::WriteSizeEstimator> -CommonProcessInterface::getWriteSizeEstimator(OperationContext* opCtx, - const NamespaceString& ns) const { - return std::make_unique<LocalWriteSizeEstimator>(); -} - void CommonProcessInterface::updateClientOperationTime(OperationContext* opCtx) const { // In order to support causal consistency in a replica set or a sharded cluster when reading // with secondary read preference, the secondary must propagate the primary's operation time diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h index 3e8625a2562..513edd5a6f4 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h @@ -46,48 +46,6 @@ public: virtual ~CommonProcessInterface() = default; /** - * Estimates the size of writes that will be executed on the current node. Note that this - * does not account for the full size of an update statement. - */ - class LocalWriteSizeEstimator final : public WriteSizeEstimator { - public: - int estimateInsertSizeBytes(const BSONObj& insert) const override { - return insert.objsize(); - } - - int estimateUpdateSizeBytes(const BatchObject& batchObject, - UpsertType type) const override { - int size = std::get<write_ops::UpdateModification>(batchObject).objsize(); - if (auto vars = std::get<boost::optional<BSONObj>>(batchObject)) { - size += vars->objsize(); - } - return size; - } - }; - - /** - * Estimate the size of writes that will be sent to the replica set primary. - */ - class TargetPrimaryWriteSizeEstimator final : public WriteSizeEstimator { - public: - int estimateInsertSizeBytes(const BSONObj& insert) const override { - return insert.objsize() + write_ops::kWriteCommandBSONArrayPerElementOverheadBytes; - } - - int estimateUpdateSizeBytes(const BatchObject& batchObject, - UpsertType type) const override { - return getUpdateSizeEstimate(std::get<BSONObj>(batchObject), - std::get<write_ops::UpdateModification>(batchObject), - std::get<boost::optional<BSONObj>>(batchObject), - type != UpsertType::kNone /* includeUpsertSupplied */, - boost::none /* collation */, - boost::none /* arrayFilters */, - BSONObj() /* hint*/) + - write_ops::kWriteCommandBSONArrayPerElementOverheadBytes; - } - }; - - /** * Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and * each of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special * type of index. 
@@ -106,9 +64,6 @@ public: virtual std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter( OperationContext*, const NamespaceString&) const override; - std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( - OperationContext* opCtx, const NamespaceString& ns) const override; - virtual void updateClientOperationTime(OperationContext* opCtx) const final; boost::optional<ChunkVersion> refreshAndGetCollectionVersion( diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index a0183fd9d33..0e36e13613f 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -78,7 +78,7 @@ class MongoProcessInterface { public: /** * Storage for a batch of BSON Objects to be updated in the write namespace. For each element - * in the batch we store a tuple of the following elements: + * in the batch we store a tuple of the folliwng elements: * 1. BSONObj - specifies the query that identifies a document in the to collection to be * updated. * 2. write_ops::UpdateModification - either the new document we want to upsert or insert into @@ -106,19 +106,6 @@ public: enum class CurrentOpBacktraceMode { kIncludeBacktrace, kExcludeBacktrace }; /** - * Interface which estimates the size of a given write operation. - */ - class WriteSizeEstimator { - public: - virtual ~WriteSizeEstimator() = default; - - virtual int estimateInsertSizeBytes(const BSONObj& insert) const = 0; - - virtual int estimateUpdateSizeBytes(const BatchObject& batchObject, - UpsertType type) const = 0; - }; - - /** * Factory function to create MongoProcessInterface of the right type. The implementation will * be installed by a lib higher up in the link graph depending on the application type. */ @@ -140,12 +127,6 @@ public: virtual ~MongoProcessInterface(){}; /** - * Returns an instance of a 'WriteSizeEstimator' interface. - */ - virtual std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( - OperationContext* opCtx, const NamespaceString& ns) const = 0; - - /** * Creates a new TransactionHistoryIterator object. Only applicable in processes which support * locally traversing the oplog. 
*/ diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index 2ca328947ab..c3dcee83248 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -101,12 +101,6 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace -std::unique_ptr<CommonProcessInterface::WriteSizeEstimator> -MongosProcessInterface::getWriteSizeEstimator(OperationContext* opCtx, - const NamespaceString& ns) const { - return std::make_unique<TargetPrimaryWriteSizeEstimator>(); -} - std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline( Pipeline* ownedPipeline, ShardTargetingPolicy shardTargetingPolicy, diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h index 786440cb5af..3bb10f83f90 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h @@ -45,9 +45,6 @@ public: virtual ~MongosProcessInterface() = default; - std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( - OperationContext* opCtx, const NamespaceString& ns) const final; - boost::optional<Document> lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h index 02c183d43a0..7d72c064123 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h @@ -43,15 +43,6 @@ class ReplicaSetNodeProcessInterface final : public NonShardServerProcessInterfa public: using NonShardServerProcessInterface::NonShardServerProcessInterface; - std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( - OperationContext* opCtx, const NamespaceString& ns) const override { - if (_canWriteLocally(opCtx, ns)) { - return std::make_unique<LocalWriteSizeEstimator>(); - } else { - return std::make_unique<TargetPrimaryWriteSizeEstimator>(); - } - } - static std::shared_ptr<executor::TaskExecutor> getReplicaSetNodeExecutor( ServiceContext* service); diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index ab0d6f2a8ab..f406cdfb726 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -52,22 +52,6 @@ public: MONGO_UNREACHABLE; } - class StubWriteSizeEstimator final : public WriteSizeEstimator { - public: - int estimateInsertSizeBytes(const BSONObj& insert) const override { - MONGO_UNREACHABLE; - } - - int estimateUpdateSizeBytes(const BatchObject& batchObject, - UpsertType type) const override { - MONGO_UNREACHABLE; - } - }; - std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( - OperationContext* opCtx, const NamespaceString& ns) const override { - return std::make_unique<StubWriteSizeEstimator>(); - } - bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override { return false; } diff --git a/src/mongo/s/write_ops/batch_write_op.cpp 
b/src/mongo/s/write_ops/batch_write_op.cpp index 26f9a4d3488..4c2061dd07f 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -59,6 +59,7 @@ struct WriteErrorComp { // batches before serializing. // // TODO: Revisit when we revisit command limits in general +const int kEstUpdateOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100; const int kEstDeleteOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100; /** @@ -158,17 +159,51 @@ int getWriteSizeBytes(const WriteOp& writeOp) { return item.getDocument().objsize(); } else if (batchType == BatchedCommandRequest::BatchType_Update) { // Note: Be conservative here - it's okay if we send slightly too many batches. - const auto& update = item.getUpdate(); - auto estSize = write_ops::getUpdateSizeEstimate(update.getQ(), - update.getU(), - update.getC(), - update.getUpsertSupplied().has_value(), - update.getCollation(), - update.getArrayFilters(), - update.getHint()); + auto estSize = static_cast<int>(BSONObj::kMinBSONLength); + static const auto boolSize = 1; + + // Add the size of the 'collation' field, if present. + estSize += !item.getUpdate().getCollation() ? 0 + : (UpdateOpEntry::kCollationFieldName.size() + + item.getUpdate().getCollation()->objsize()); + + // Add the size of the 'arrayFilters' field, if present. + estSize += !item.getUpdate().getArrayFilters() ? 0 : ([&item]() { + auto size = BSONObj::kMinBSONLength + UpdateOpEntry::kArrayFiltersFieldName.size(); + for (auto&& filter : *item.getUpdate().getArrayFilters()) { + size += filter.objsize(); + } + return size; + })(); + + // Add the sizes of the 'multi' and 'upsert' fields. + estSize += UpdateOpEntry::kUpsertFieldName.size() + boolSize; + estSize += UpdateOpEntry::kMultiFieldName.size() + boolSize; + + // Add the size of 'upsertSupplied' field if present. + if (auto upsertSupplied = item.getUpdate().getUpsertSupplied()) { + estSize += UpdateOpEntry::kUpsertSuppliedFieldName.size() + boolSize; + } + + // Add the sizes of the 'q' and 'u' fields. + estSize += (UpdateOpEntry::kQFieldName.size() + item.getUpdate().getQ().objsize() + + UpdateOpEntry::kUFieldName.size() + item.getUpdate().getU().objsize()); + + // Add the size of the 'c' field if present. + if (auto constants = item.getUpdate().getC()) { + estSize += UpdateOpEntry::kCFieldName.size() + item.getUpdate().getC()->objsize(); + } + + // Add the size of 'hint' field if present. + if (auto hint = item.getUpdate().getHint(); !hint.isEmpty()) { + estSize += UpdateOpEntry::kHintFieldName.size() + hint.objsize(); + } + + // Finally, add the constant updateOp overhead size. + estSize += kEstUpdateOverheadBytes; // When running a debug build, verify that estSize is at least the BSON serialization size. - dassert(estSize >= update.toBSON().objsize()); + dassert(estSize >= item.getUpdate().toBSON().objsize()); return estSize; } else if (batchType == BatchedCommandRequest::BatchType_Delete) { // Note: Be conservative here - it's okay if we send slightly too many batches. |
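The batch_write_op.cpp hunk above restores the older mongos-side estimate in getWriteSizeBytes(): it re-adds the inline per-field arithmetic plus a constant kEstUpdateOverheadBytes pad instead of calling the now-deleted write_ops::getUpdateSizeEstimate(), and keeps the debug-build dassert that the estimate is at least the real serialized size. The snippet below only illustrates the BSON element overhead the deleted helper charged per field; the names are ours, not MongoDB's.

```cpp
// Rough illustration (not MongoDB source) of the 2-byte per-element overhead
// write_ops::getUpdateSizeEstimate() accounted for: a serialized BSON element
// is 1 type byte + the field name + 1 NUL terminator + the value, so summing
// name.size() + valueSize alone undercounts each field by two bytes.
#include <cstddef>
#include <string>

constexpr std::size_t kPerElementOverhead = 2;  // type byte + field-name NUL terminator

std::size_t estimateElementSize(const std::string& fieldName, std::size_t valueBytes) {
    return fieldName.size() + valueBytes + kPerElementOverhead;
}

std::size_t estimateObjectSize(std::size_t summedElementEstimates) {
    constexpr std::size_t kMinBSONLength = 5;  // 4-byte length prefix + trailing 0x00
    return kMinBSONLength + summedElementEstimates;
}
```

The jstest changes at the top of the diff follow from the same revert: with the estimate back to plain object sizes, the test documents can be exactly 1 MB again, maxBatchSize / docSize divides evenly, and the Math.floor() guard is dropped.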