author     Mihai Andrei <mihai.andrei@10gen.com>             2022-08-16 21:13:23 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-08-16 22:51:32 +0000
commit     707ba0a0ade42c4540b9cabaaf5a257de944cc3e
tree       e9c856451624391d8c7216603889cb7f779c5759 /src/mongo
parent     e6e66274ce5e2740b5a8265a4ba0fb20856cecca
SERVER-66289 Update write size estimation logic in DocumentSourceWriter
Diffstat (limited to 'src/mongo')
14 files changed, 275 insertions, 93 deletions
diff --git a/src/mongo/db/ops/write_ops.cpp b/src/mongo/db/ops/write_ops.cpp
index 0ef8f329558..8e19bfd3117 100644
--- a/src/mongo/db/ops/write_ops.cpp
+++ b/src/mongo/db/ops/write_ops.cpp
@@ -136,6 +136,68 @@ int32_t getStmtIdForWriteAt(const WriteCommandRequestBase& writeCommandBase, siz
     return kFirstStmtId + writePos;
 }
 
+int getUpdateSizeEstimate(const BSONObj& q,
+                          const write_ops::UpdateModification& u,
+                          const boost::optional<mongo::BSONObj>& c,
+                          const bool includeUpsertSupplied,
+                          const boost::optional<mongo::BSONObj>& collation,
+                          const boost::optional<std::vector<mongo::BSONObj>>& arrayFilters,
+                          const mongo::BSONObj& hint) {
+    using UpdateOpEntry = write_ops::UpdateOpEntry;
+
+    // This constant accounts for the null terminator in each field name and the BSONType byte for
+    // each element.
+    static const int kPerElementOverhead = 2;
+    static const int kBoolSize = 1;
+    int estSize = static_cast<int>(BSONObj::kMinBSONLength);
+
+    // Add the sizes of the 'multi' and 'upsert' fields.
+    estSize += UpdateOpEntry::kUpsertFieldName.size() + kBoolSize + kPerElementOverhead;
+    estSize += UpdateOpEntry::kMultiFieldName.size() + kBoolSize + kPerElementOverhead;
+
+    // Add the size of 'upsertSupplied' field if present.
+    if (includeUpsertSupplied) {
+        estSize += UpdateOpEntry::kUpsertSuppliedFieldName.size() + kBoolSize + kPerElementOverhead;
+    }
+
+    // Add the sizes of the 'q' and 'u' fields.
+    estSize += (UpdateOpEntry::kQFieldName.size() + q.objsize() + kPerElementOverhead +
+                UpdateOpEntry::kUFieldName.size() + u.objsize() + kPerElementOverhead);
+
+    // Add the size of the 'c' field, if present.
+    if (c) {
+        estSize += (UpdateOpEntry::kCFieldName.size() + c->objsize() + kPerElementOverhead);
+    }
+
+    // Add the size of the 'collation' field, if present.
+    if (collation) {
+        estSize += (UpdateOpEntry::kCollationFieldName.size() + collation->objsize() +
+                    kPerElementOverhead);
+    }
+
+    // Add the size of the 'arrayFilters' field, if present.
+    if (arrayFilters) {
+        estSize += ([&]() {
+            auto size = BSONObj::kMinBSONLength + UpdateOpEntry::kArrayFiltersFieldName.size() +
+                kPerElementOverhead;
+            for (auto&& filter : *arrayFilters) {
+                // For each filter, we not only need to account for the size of the filter itself,
+                // but also for the per array element overhead.
+                size += filter.objsize();
+                size += write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
+            }
+            return size;
+        })();
+    }
+
+    // Add the size of 'hint' field if present.
+    if (!hint.isEmpty()) {
+        estSize += UpdateOpEntry::kHintFieldName.size() + hint.objsize() + kPerElementOverhead;
+    }
+
+    return estSize;
+}
+
 bool isClassicalUpdateReplacement(const BSONObj& update) {
     // An empty update object will be treated as replacement as firstElementFieldName() returns "".
     return update.firstElementFieldName()[0] != '$';
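The accounting above is easier to see outside the server tree. The following standalone sketch models the same arithmetic: BSON's 4-byte length prefix plus trailing NUL, and per element one type byte plus the field name's NUL terminator. The constants and sample sizes are illustrative stand-ins, not MongoDB's generated IDL names.

```cpp
#include <iostream>
#include <optional>
#include <string>

constexpr int kMinBSONLength = 5;       // 4-byte length prefix + trailing NUL byte
constexpr int kPerElementOverhead = 2;  // BSONType byte + field name's NUL terminator
constexpr int kBoolSize = 1;

// 'qSize' and 'uSize' are the serialized sizes (objsize()) of the query and
// update documents; 'collationSize' models one of the optional fields.
int estimateUpdateEntrySize(int qSize, int uSize, std::optional<int> collationSize) {
    int est = kMinBSONLength;
    est += static_cast<int>(std::string("upsert").size()) + kBoolSize + kPerElementOverhead;
    est += static_cast<int>(std::string("multi").size()) + kBoolSize + kPerElementOverhead;
    est += static_cast<int>(std::string("q").size()) + qSize + kPerElementOverhead;
    est += static_cast<int>(std::string("u").size()) + uSize + kPerElementOverhead;
    if (collationSize) {
        est += static_cast<int>(std::string("collation").size()) + *collationSize +
            kPerElementOverhead;
    }
    return est;
}

int main() {
    // e.g. q = {_id: 1} serializes to 14 bytes, u = {$set: {a: 1}} to 23 bytes.
    std::cout << estimateUpdateEntrySize(14, 23, std::nullopt) << "\n";  // prints 65
}
```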
diff --git a/src/mongo/db/ops/write_ops.h b/src/mongo/db/ops/write_ops.h
index d78791fdfdc..a1201412a33 100644
--- a/src/mongo/db/ops/write_ops.h
+++ b/src/mongo/db/ops/write_ops.h
@@ -104,6 +104,18 @@ const std::vector<BSONObj>& arrayFiltersOf(const T& opEntry) {
 }
 
 /**
+ * Utility which estimates the size in bytes of an update statement with the given parameters, when
+ * serialized in the format used for the update command.
+ */
+int getUpdateSizeEstimate(const BSONObj& q,
+                          const write_ops::UpdateModification& u,
+                          const boost::optional<mongo::BSONObj>& c,
+                          bool includeUpsertSupplied,
+                          const boost::optional<mongo::BSONObj>& collation,
+                          const boost::optional<std::vector<mongo::BSONObj>>& arrayFilters,
+                          const mongo::BSONObj& hint);
+
+/**
  * If the response from a write command contains any write errors, it will throw the first one. All
  * the remaining errors will be disregarded.
  *
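An estimate like this is consumed by batching logic that flushes before a batch would exceed the maximum command size. A minimal sketch of that consumption, assuming a 16 MB cap in the spirit of BSONObjMaxUserSize; the names here are hypothetical, not the server's batching code.

```cpp
#include <utility>
#include <vector>

constexpr int kMaxBatchSizeBytes = 16 * 1024 * 1024;  // assumed cap

struct PendingWrite {
    int estimatedBytes;
};

// Flush the batch before appending would exceed the cap. Over-estimating a
// write is safe (batches just get smaller); under-estimating risks building a
// command that is rejected as too large.
template <typename FlushFn>
void appendOrFlush(std::vector<PendingWrite>& batch, int& batchBytes, PendingWrite next,
                   FlushFn flush) {
    if (!batch.empty() && batchBytes + next.estimatedBytes > kMaxBatchSizeBytes) {
        flush(batch);
        batch.clear();
        batchBytes = 0;
    }
    batchBytes += next.estimatedBytes;
    batch.push_back(std::move(next));
}

int main() {
    std::vector<PendingWrite> batch;
    int batchBytes = 0;
    auto flush = [](std::vector<PendingWrite>&) { /* send as one write command */ };
    for (int i = 0; i < 1000; ++i) {
        appendOrFlush(batch, batchBytes, PendingWrite{64 * 1024}, flush);
    }
    if (!batch.empty()) {
        flush(batch);
    }
}
```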
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index 2e200076bb9..e881407df60 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -62,7 +62,7 @@ using MergeStrategy = MergeStrategyDescriptor::MergeStrategy;
 using MergeStrategyDescriptorsMap = std::map<const MergeMode, const MergeStrategyDescriptor>;
 using WhenMatched = MergeStrategyDescriptor::WhenMatched;
 using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched;
-using BatchTransform = std::function<void(DocumentSourceMerge::BatchedObjects&)>;
+using BatchTransform = DocumentSourceMerge::BatchTransform;
 using UpdateModification = write_ops::UpdateModification;
 using UpsertType = MongoProcessInterface::UpsertType;
 
@@ -86,17 +86,15 @@ const auto kDefaultPipelineLet = BSON("new"
                                       << "$$ROOT");
 
 /**
- * Creates a merge strategy which uses update semantics to perform a merge operation. If
- * 'BatchTransform' function is provided, it will be called to transform batched objects before
- * passing them to the 'update'.
+ * Creates a merge strategy which uses update semantics to perform a merge operation.
  */
-MergeStrategy makeUpdateStrategy(UpsertType upsert, BatchTransform transform) {
-    return [upsert, transform](
-               const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
-        if (transform) {
-            transform(batch);
-        }
-
+MergeStrategy makeUpdateStrategy() {
+    return [](const auto& expCtx,
+              const auto& ns,
+              const auto& wc,
+              auto epoch,
+              auto&& batch,
+              UpsertType upsert) {
         constexpr auto multi = false;
         uassertStatusOK(expCtx->mongoProcessInterface->update(
             expCtx, ns, std::move(batch), wc, upsert, multi, epoch));
@@ -108,16 +106,15 @@ MergeStrategy makeUpdateStrategy(UpsertType upsert, BatchTransform transform) {
  * that each document in the batch has a matching document in the 'ns' collection (note that a
  * matching document may not be modified as a result of an update operation, yet it still will be
  * counted as matching). If at least one document doesn't have a match, this strategy returns an
- * error. If 'BatchTransform' function is provided, it will be called to transform batched objects
- * before passing them to the 'update'.
+ * error.
  */
-MergeStrategy makeStrictUpdateStrategy(UpsertType upsert, BatchTransform transform) {
-    return [upsert, transform](
-               const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
-        if (transform) {
-            transform(batch);
-        }
-
+MergeStrategy makeStrictUpdateStrategy() {
+    return [](const auto& expCtx,
+              const auto& ns,
+              const auto& wc,
+              auto epoch,
+              auto&& batch,
+              UpsertType upsert) {
         const int64_t batchSize = batch.size();
         constexpr auto multi = false;
         auto updateResult = uassertStatusOK(expCtx->mongoProcessInterface->update(
@@ -133,7 +130,12 @@ MergeStrategy makeStrictUpdateStrategy(UpsertType upsert, BatchTransfo
  * Creates a merge strategy which uses insert semantics to perform a merge operation.
  */
 MergeStrategy makeInsertStrategy() {
-    return [](const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
+    return [](const auto& expCtx,
+              const auto& ns,
+              const auto& wc,
+              auto epoch,
+              auto&& batch,
+              UpsertType upsertType) {
         std::vector<BSONObj> objectsToInsert(batch.size());
         // The batch stores replacement style updates, but for this "insert" style of $merge we'd
         // like to just insert the new document without attempting any sort of replacement.
@@ -146,15 +148,13 @@ MergeStrategy makeInsertStrategy() {
 }
 
 /**
- * Creates a batched objects transformation function which wraps each element of the
- * 'batch.modifications' array into the given 'updateOp' operator.
+ * Creates a batched object transformation function which wraps 'obj' into the given 'updateOp'
+ * operator.
  */
 BatchTransform makeUpdateTransform(const std::string& updateOp) {
-    return [updateOp](auto& batch) {
-        for (auto&& obj : batch) {
-            std::get<UpdateModification>(obj) = UpdateModification::parseFromClassicUpdate(
-                BSON(updateOp << std::get<UpdateModification>(obj).getUpdateReplacement()));
-        }
+    return [updateOp](auto& obj) {
+        std::get<UpdateModification>(obj) = UpdateModification::parseFromClassicUpdate(
+            BSON(updateOp << std::get<UpdateModification>(obj).getUpdateReplacement()));
     };
 }
 
@@ -178,48 +178,67 @@ const MergeStrategyDescriptorsMap& getDescriptors() {
         {kReplaceInsertMode,
          {kReplaceInsertMode,
           {ActionType::insert, ActionType::update},
-          makeUpdateStrategy(UpsertType::kGenerateNewDoc, {})}},
+          makeUpdateStrategy(),
+          {},
+          UpsertType::kGenerateNewDoc}},
         // whenMatched: replace, whenNotMatched: fail
         {kReplaceFailMode,
-         {kReplaceFailMode, {ActionType::update}, makeStrictUpdateStrategy(UpsertType::kNone, {})}},
+         {kReplaceFailMode,
+          {ActionType::update},
+          makeStrictUpdateStrategy(),
+          {},
+          UpsertType::kNone}},
        // whenMatched: replace, whenNotMatched: discard
        {kReplaceDiscardMode,
-        {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}},
+        {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}},
        // whenMatched: merge, whenNotMatched: insert
        {kMergeInsertMode,
         {kMergeInsertMode,
          {ActionType::insert, ActionType::update},
-         makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$set"))}},
+         makeUpdateStrategy(),
+         makeUpdateTransform("$set"),
+         UpsertType::kGenerateNewDoc}},
        // whenMatched: merge, whenNotMatched: fail
        {kMergeFailMode,
         {kMergeFailMode,
          {ActionType::update},
-         makeStrictUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}},
+         makeStrictUpdateStrategy(),
+         makeUpdateTransform("$set"),
+         UpsertType::kNone}},
        // whenMatched: merge, whenNotMatched: discard
        {kMergeDiscardMode,
         {kMergeDiscardMode,
          {ActionType::update},
-         makeUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}},
+         makeUpdateStrategy(),
+         makeUpdateTransform("$set"),
+         UpsertType::kNone}},
        // whenMatched: keepExisting, whenNotMatched: insert
        {kKeepExistingInsertMode,
         {kKeepExistingInsertMode,
          {ActionType::insert, ActionType::update},
-         makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$setOnInsert"))}},
+         makeUpdateStrategy(),
+         makeUpdateTransform("$setOnInsert"),
+         UpsertType::kGenerateNewDoc}},
        // whenMatched: [pipeline], whenNotMatched: insert
        {kPipelineInsertMode,
         {kPipelineInsertMode,
          {ActionType::insert, ActionType::update},
-         makeUpdateStrategy(UpsertType::kInsertSuppliedDoc, {})}},
+         makeUpdateStrategy(),
+         {},
+         UpsertType::kInsertSuppliedDoc}},
        // whenMatched: [pipeline], whenNotMatched: fail
        {kPipelineFailMode,
         {kPipelineFailMode,
          {ActionType::update},
-         makeStrictUpdateStrategy(UpsertType::kNone, {})}},
+         makeStrictUpdateStrategy(),
+         {},
+         UpsertType::kNone}},
        // whenMatched: [pipeline], whenNotMatched: discard
        {kPipelineDiscardMode,
-        {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}},
+        {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}},
        // whenMatched: fail, whenNotMatched: insert
-       {kFailInsertMode, {kFailInsertMode, {ActionType::insert}, makeInsertStrategy()}}};
+       {kFailInsertMode,
+        {kFailInsertMode, {ActionType::insert}, makeInsertStrategy(), {}, UpsertType::kNone}}};
    return mergeStrategyDescriptors;
 }
@@ -552,8 +571,14 @@ std::pair<DocumentSourceMerge::BatchObject, int> DocumentSourceMerge::makeBatchO
     auto mergeOnFields = extractMergeOnFieldsFromDoc(doc, _mergeOnFields);
     auto mod = makeBatchUpdateModification(doc);
     auto vars = resolveLetVariablesIfNeeded(doc);
-    auto modSize = mod.objsize() + (vars ? vars->objsize() : 0);
-    return {{std::move(mergeOnFields), std::move(mod), std::move(vars)}, modSize};
+    BatchObject batchObject{std::move(mergeOnFields), std::move(mod), std::move(vars)};
+    if (_descriptor.transform) {
+        _descriptor.transform(batchObject);
+    }
+
+    tassert(6628901, "_writeSizeEstimator should be initialized", _writeSizeEstimator);
+    return {batchObject,
+            _writeSizeEstimator->estimateUpdateSizeBytes(batchObject, _descriptor.upsertType)};
 }
 
 void DocumentSourceMerge::spill(BatchedObjects&& batch) try {
@@ -562,7 +587,8 @@ void DocumentSourceMerge::spill(BatchedObjects&& batch) try {
         ? boost::optional<OID>(_targetCollectionVersion->epoch())
         : boost::none;
 
-    _descriptor.strategy(pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch));
+    _descriptor.strategy(
+        pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch), _descriptor.upsertType);
 } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
     uassertStatusOKWithContext(ex.toStatus(),
                                "$merge failed to update the matching document, did you "
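The core refactor in this file: 'upsert' and the transform are no longer baked into each strategy lambda at construction time; the descriptor stores them, and the upsert type is passed when the strategy is invoked. A toy model of that shape, with stand-in types rather than the actual $merge classes:

```cpp
#include <functional>
#include <iostream>
#include <vector>

enum class UpsertType { kNone, kGenerateNewDoc, kInsertSuppliedDoc };
using Batch = std::vector<int>;  // stand-in for BatchedObjects
using MergeStrategy = std::function<void(Batch&&, UpsertType)>;

MergeStrategy makeUpdateStrategy() {
    // One lambda serves every mode; the caller supplies the upsert behavior.
    return [](Batch&& batch, UpsertType upsert) {
        std::cout << "updating " << batch.size() << " docs, upsert="
                  << static_cast<int>(upsert) << "\n";
    };
}

struct Descriptor {
    MergeStrategy strategy;
    UpsertType upsertType;
};

int main() {
    Descriptor d{makeUpdateStrategy(), UpsertType::kGenerateNewDoc};
    d.strategy({1, 2, 3}, d.upsertType);  // mirrors spill()'s call site above
}
```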
diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h
index 05a87f1e340..e3784ae545c 100644
--- a/src/mongo/db/pipeline/document_source_merge.h
+++ b/src/mongo/db/pipeline/document_source_merge.h
@@ -44,24 +44,31 @@ class DocumentSourceMerge final : public DocumentSourceWriter<MongoProcessInterf
 public:
     static constexpr StringData kStageName = "$merge"_sd;
 
-    // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions
-    // the client should be authorized to perform in order to be able to execute a merge operation
-    // using this merge strategy.
+    using BatchTransform = std::function<void(MongoProcessInterface::BatchObject&)>;
+
+    // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions the
+    // client should be authorized to perform in order to be able to execute a merge operation using
+    // this merge strategy. If a 'BatchTransform' function is provided, it will be called when
+    // constructing a batch object to transform updates.
     struct MergeStrategyDescriptor {
         using WhenMatched = MergeWhenMatchedModeEnum;
         using WhenNotMatched = MergeWhenNotMatchedModeEnum;
         using MergeMode = std::pair<WhenMatched, WhenNotMatched>;
+        using UpsertType = MongoProcessInterface::UpsertType;
         // A function encapsulating a merge strategy for the $merge stage based on the pair of
        // whenMatched/whenNotMatched modes.
         using MergeStrategy = std::function<void(const boost::intrusive_ptr<ExpressionContext>&,
                                                  const NamespaceString&,
                                                  const WriteConcernOptions&,
                                                  boost::optional<OID>,
-                                                 BatchedObjects&&)>;
+                                                 BatchedObjects&&,
+                                                 UpsertType upsert)>;
 
         MergeMode mode;
         ActionSet actions;
         MergeStrategy strategy;
+        BatchTransform transform;
+        UpsertType upsertType;
     };
 
     /**
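Because the transform now runs per batch object (at makeBatchObject() time) rather than over a whole batch at spill time, the size estimator sees the final form of each update. A sketch of the same per-object transform pattern, with string concatenation standing in for BSON building:

```cpp
#include <functional>
#include <iostream>
#include <string>

struct BatchObject {
    std::string updateDoc;  // stand-in for the UpdateModification in the tuple
};
using BatchTransform = std::function<void(BatchObject&)>;

// Wraps a replacement-style document in the given operator, e.g.
// {a: 1} -> {$set: {a: 1}}.
BatchTransform makeUpdateTransform(const std::string& updateOp) {
    return [updateOp](BatchObject& obj) {
        obj.updateDoc = "{" + updateOp + ": " + obj.updateDoc + "}";
    };
}

int main() {
    BatchObject obj{"{a: 1}"};
    makeUpdateTransform("$set")(obj);
    std::cout << obj.updateDoc << "\n";  // {$set: {a: 1}}
}
```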
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 88883cb6231..602db5815f7 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -131,7 +131,8 @@ private:
     std::pair<BSONObj, int> makeBatchObject(Document&& doc) const override {
         auto obj = doc.toBson();
-        return {obj, obj.objsize()};
+        tassert(6628900, "_writeSizeEstimator should be initialized", _writeSizeEstimator);
+        return {obj, _writeSizeEstimator->estimateInsertSizeBytes(obj)};
     }
 
     void waitWhileFailPointEnabled() override;
diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h
index 1965bd4dee6..284699deb30 100644
--- a/src/mongo/db/pipeline/document_source_writer.h
+++ b/src/mongo/db/pipeline/document_source_writer.h
@@ -104,7 +104,9 @@ public:
                          const boost::intrusive_ptr<ExpressionContext>& expCtx)
         : DocumentSource(stageName, expCtx),
           _outputNs(std::move(outputNs)),
-          _writeConcern(expCtx->opCtx->getWriteConcern()) {}
+          _writeConcern(expCtx->opCtx->getWriteConcern()),
+          _writeSizeEstimator(
+              expCtx->mongoProcessInterface->getWriteSizeEstimator(expCtx->opCtx, outputNs)) {}
 
     DepsTracker::State getDependencies(DepsTracker* deps) const override {
         deps->needWholeDocument = true;
@@ -169,6 +171,9 @@ protected:
     // respect the writeConcern of the original command.
     WriteConcernOptions _writeConcern;
 
+    // An interface that is used to estimate the size of each write operation.
+    const std::unique_ptr<MongoProcessInterface::WriteSizeEstimator> _writeSizeEstimator;
+
 private:
     bool _initialized{false};
     bool _done{false};
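The writer asks the process interface for an estimator once, in its constructor, and costs every batch object through that cached instance. A minimal model of the injection, with plain ints standing in for documents and hypothetical class names:

```cpp
#include <memory>

struct WriteSizeEstimator {
    virtual ~WriteSizeEstimator() = default;
    virtual int estimateInsertSizeBytes(int docSize) const = 0;
};

struct LocalEstimator final : WriteSizeEstimator {
    int estimateInsertSizeBytes(int docSize) const override {
        return docSize;
    }
};

struct ProcessInterface {
    virtual ~ProcessInterface() = default;
    virtual std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator() const {
        return std::make_unique<LocalEstimator>();
    }
};

class Writer {
public:
    explicit Writer(const ProcessInterface& pi)
        : _writeSizeEstimator(pi.getWriteSizeEstimator()) {}  // obtained once, up front

    int makeBatchObject(int docSize) const {
        // Every batch object is costed through the same cached estimator.
        return _writeSizeEstimator->estimateInsertSizeBytes(docSize);
    }

private:
    const std::unique_ptr<WriteSizeEstimator> _writeSizeEstimator;
};

int main() {
    ProcessInterface pi;
    Writer writer(pi);
    return writer.makeBatchObject(128) == 128 ? 0 : 1;
}
```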
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
index 0987e83a6ee..8e56f390b13 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
@@ -152,6 +152,12 @@ std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsR
     return {"_id"};
 }
 
+std::unique_ptr<CommonProcessInterface::WriteSizeEstimator>
+CommonProcessInterface::getWriteSizeEstimator(OperationContext* opCtx,
+                                              const NamespaceString& ns) const {
+    return std::make_unique<LocalWriteSizeEstimator>();
+}
+
 void CommonProcessInterface::updateClientOperationTime(OperationContext* opCtx) const {
     // In order to support causal consistency in a replica set or a sharded cluster when reading
     // with secondary read preference, the secondary must propagate the primary's operation time
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h
index efaf292f796..ea18e45b50a 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h
@@ -46,6 +46,48 @@ public:
     virtual ~CommonProcessInterface() = default;
 
     /**
+     * Estimates the size of writes that will be executed on the current node. Note that this
+     * does not account for the full size of an update statement.
+     */
+    class LocalWriteSizeEstimator final : public WriteSizeEstimator {
+    public:
+        int estimateInsertSizeBytes(const BSONObj& insert) const override {
+            return insert.objsize();
+        }
+
+        int estimateUpdateSizeBytes(const BatchObject& batchObject,
+                                    UpsertType type) const override {
+            int size = std::get<write_ops::UpdateModification>(batchObject).objsize();
+            if (auto vars = std::get<boost::optional<BSONObj>>(batchObject)) {
+                size += vars->objsize();
+            }
+            return size;
+        }
+    };
+
+    /**
+     * Estimate the size of writes that will be sent to the replica set primary.
+     */
+    class TargetPrimaryWriteSizeEstimator final : public WriteSizeEstimator {
+    public:
+        int estimateInsertSizeBytes(const BSONObj& insert) const override {
+            return insert.objsize() + write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
+        }
+
+        int estimateUpdateSizeBytes(const BatchObject& batchObject,
+                                    UpsertType type) const override {
+            return getUpdateSizeEstimate(std::get<BSONObj>(batchObject),
+                                         std::get<write_ops::UpdateModification>(batchObject),
+                                         std::get<boost::optional<BSONObj>>(batchObject),
+                                         type != UpsertType::kNone /* includeUpsertSupplied */,
+                                         boost::none /* collation */,
+                                         boost::none /* arrayFilters */,
+                                         BSONObj() /* hint */) +
+                write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
+        }
+    };
+
+    /**
      * Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and
      * each of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special
      * type of index.
@@ -64,6 +106,9 @@ public:
     virtual std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
         OperationContext*, const NamespaceString&) const override;
 
+    std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+        OperationContext* opCtx, const NamespaceString& ns) const override;
+
     virtual void updateClientOperationTime(OperationContext* opCtx) const final;
 
     boost::optional<ShardVersion> refreshAndGetCollectionVersion(
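The two implementations make the trade-off explicit: a locally executed write only costs its documents, while a write that must be re-serialized into a command targeting the primary also pays per-entry overhead. A standalone analogue for inserts; the 7-byte overhead is a stand-in for kWriteCommandBSONArrayPerElementOverheadBytes, whose value is not shown in this diff:

```cpp
#include <iostream>

struct WriteSizeEstimator {
    virtual ~WriteSizeEstimator() = default;
    virtual int estimateInsertSizeBytes(int objSize) const = 0;
};

// Local execution: the document goes to the storage layer as-is.
struct LocalWriteSizeEstimator final : WriteSizeEstimator {
    int estimateInsertSizeBytes(int objSize) const override {
        return objSize;
    }
};

// Forwarded execution: the document is re-serialized into a write command's
// documents array, so charge per-element overhead on top.
struct TargetPrimaryWriteSizeEstimator final : WriteSizeEstimator {
    int estimateInsertSizeBytes(int objSize) const override {
        return objSize + 7;
    }
};

int main() {
    LocalWriteSizeEstimator local;
    TargetPrimaryWriteSizeEstimator primary;
    std::cout << local.estimateInsertSizeBytes(100) << " vs "
              << primary.estimateInsertSizeBytes(100) << "\n";  // 100 vs 107
}
```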
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index 3ca0029411f..1f2c73d9c6e 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -78,7 +78,7 @@ class MongoProcessInterface {
 public:
     /**
      * Storage for a batch of BSON Objects to be updated in the write namespace. For each element
-     * in the batch we store a tuple of the folliwng elements:
+     * in the batch we store a tuple of the following elements:
      * 1. BSONObj - specifies the query that identifies a document in the to collection to be
      *    updated.
      * 2. write_ops::UpdateModification - either the new document we want to upsert or insert into
@@ -106,6 +106,19 @@ public:
     enum class CurrentOpBacktraceMode { kIncludeBacktrace, kExcludeBacktrace };
 
     /**
+     * Interface which estimates the size of a given write operation.
+     */
+    class WriteSizeEstimator {
+    public:
+        virtual ~WriteSizeEstimator() = default;
+
+        virtual int estimateInsertSizeBytes(const BSONObj& insert) const = 0;
+
+        virtual int estimateUpdateSizeBytes(const BatchObject& batchObject,
+                                            UpsertType type) const = 0;
+    };
+
+    /**
      * Factory function to create MongoProcessInterface of the right type. The implementation will
      * be installed by a lib higher up in the link graph depending on the application type.
      */
@@ -127,6 +140,12 @@ public:
     virtual ~MongoProcessInterface(){};
 
     /**
+     * Returns an instance of a 'WriteSizeEstimator' interface.
+     */
+    virtual std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+        OperationContext* opCtx, const NamespaceString& ns) const = 0;
+
+    /**
      * Creates a new TransactionHistoryIterator object. Only applicable in processes which support
      * locally traversing the oplog.
      */
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
index 117fa156d71..1e46ea9117c 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
@@ -101,6 +101,12 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
 
 }  // namespace
 
+std::unique_ptr<CommonProcessInterface::WriteSizeEstimator>
+MongosProcessInterface::getWriteSizeEstimator(OperationContext* opCtx,
+                                              const NamespaceString& ns) const {
+    return std::make_unique<TargetPrimaryWriteSizeEstimator>();
+}
+
 std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline(
     Pipeline* ownedPipeline,
     ShardTargetingPolicy shardTargetingPolicy,
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index 3bb10f83f90..786440cb5af 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -45,6 +45,9 @@ public:
 
     virtual ~MongosProcessInterface() = default;
 
+    std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+        OperationContext* opCtx, const NamespaceString& ns) const final;
+
     boost::optional<Document> lookupSingleDocument(
         const boost::intrusive_ptr<ExpressionContext>& expCtx,
         const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
index 7d72c064123..02c183d43a0 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
@@ -43,6 +43,15 @@ class ReplicaSetNodeProcessInterface final : public NonShardServerProcessInterfa
 public:
     using NonShardServerProcessInterface::NonShardServerProcessInterface;
 
+    std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+        OperationContext* opCtx, const NamespaceString& ns) const override {
+        if (_canWriteLocally(opCtx, ns)) {
+            return std::make_unique<LocalWriteSizeEstimator>();
+        } else {
+            return std::make_unique<TargetPrimaryWriteSizeEstimator>();
+        }
+    }
+
     static std::shared_ptr<executor::TaskExecutor> getReplicaSetNodeExecutor(
         ServiceContext* service);
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index 7b96bb53ed0..813684b8f8e 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -52,6 +52,22 @@ public:
         MONGO_UNREACHABLE;
     }
 
+    class StubWriteSizeEstimator final : public WriteSizeEstimator {
+    public:
+        int estimateInsertSizeBytes(const BSONObj& insert) const override {
+            MONGO_UNREACHABLE;
+        }
+
+        int estimateUpdateSizeBytes(const BatchObject& batchObject,
+                                    UpsertType type) const override {
+            MONGO_UNREACHABLE;
+        }
+    };
+
+    std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+        OperationContext* opCtx, const NamespaceString& ns) const override {
+        return std::make_unique<StubWriteSizeEstimator>();
+    }
+
     bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override {
         return false;
     }
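For process interfaces that can never issue writes, the stub's estimator methods are deliberately unreachable, so any call surfaces a wiring bug immediately rather than returning a bogus size. The same idea in standalone form, with std::abort() standing in for MONGO_UNREACHABLE:

```cpp
#include <cstdlib>

struct WriteSizeEstimator {
    virtual ~WriteSizeEstimator() = default;
    virtual int estimateInsertSizeBytes(int objSize) const = 0;
};

// A stub for contexts wired up without real write support: reaching this
// method means a caller used the stub where a real interface was required.
struct StubWriteSizeEstimator final : WriteSizeEstimator {
    int estimateInsertSizeBytes(int) const override {
        std::abort();
    }
};

int main() {
    StubWriteSizeEstimator stub;  // constructing is fine; calling is the bug
    (void)stub;
}
```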
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 4c2061dd07f..26f9a4d3488 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -59,7 +59,6 @@ struct WriteErrorComp {
 // batches before serializing.
 //
 // TODO: Revisit when we revisit command limits in general
-const int kEstUpdateOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
 const int kEstDeleteOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
 
 /**
@@ -159,51 +158,17 @@ int getWriteSizeBytes(const WriteOp& writeOp) {
         return item.getDocument().objsize();
     } else if (batchType == BatchedCommandRequest::BatchType_Update) {
         // Note: Be conservative here - it's okay if we send slightly too many batches.
-        auto estSize = static_cast<int>(BSONObj::kMinBSONLength);
-        static const auto boolSize = 1;
-
-        // Add the size of the 'collation' field, if present.
-        estSize += !item.getUpdate().getCollation() ? 0
-                                                    : (UpdateOpEntry::kCollationFieldName.size() +
-                                                       item.getUpdate().getCollation()->objsize());
-
-        // Add the size of the 'arrayFilters' field, if present.
-        estSize += !item.getUpdate().getArrayFilters() ? 0 : ([&item]() {
-            auto size = BSONObj::kMinBSONLength + UpdateOpEntry::kArrayFiltersFieldName.size();
-            for (auto&& filter : *item.getUpdate().getArrayFilters()) {
-                size += filter.objsize();
-            }
-            return size;
-        })();
-
-        // Add the sizes of the 'multi' and 'upsert' fields.
-        estSize += UpdateOpEntry::kUpsertFieldName.size() + boolSize;
-        estSize += UpdateOpEntry::kMultiFieldName.size() + boolSize;
-
-        // Add the size of 'upsertSupplied' field if present.
-        if (auto upsertSupplied = item.getUpdate().getUpsertSupplied()) {
-            estSize += UpdateOpEntry::kUpsertSuppliedFieldName.size() + boolSize;
-        }
-
-        // Add the sizes of the 'q' and 'u' fields.
-        estSize += (UpdateOpEntry::kQFieldName.size() + item.getUpdate().getQ().objsize() +
-                    UpdateOpEntry::kUFieldName.size() + item.getUpdate().getU().objsize());
-
-        // Add the size of the 'c' field if present.
-        if (auto constants = item.getUpdate().getC()) {
-            estSize += UpdateOpEntry::kCFieldName.size() + item.getUpdate().getC()->objsize();
-        }
-
-        // Add the size of 'hint' field if present.
-        if (auto hint = item.getUpdate().getHint(); !hint.isEmpty()) {
-            estSize += UpdateOpEntry::kHintFieldName.size() + hint.objsize();
-        }
-
-        // Finally, add the constant updateOp overhead size.
-        estSize += kEstUpdateOverheadBytes;
+        const auto& update = item.getUpdate();
+        auto estSize = write_ops::getUpdateSizeEstimate(update.getQ(),
+                                                        update.getU(),
+                                                        update.getC(),
+                                                        update.getUpsertSupplied().has_value(),
+                                                        update.getCollation(),
+                                                        update.getArrayFilters(),
+                                                        update.getHint());
 
         // When running a debug build, verify that estSize is at least the BSON serialization size.
-        dassert(estSize >= item.getUpdate().toBSON().objsize());
+        dassert(estSize >= update.toBSON().objsize());
         return estSize;
     } else if (batchType == BatchedCommandRequest::BatchType_Delete) {
         // Note: Be conservative here - it's okay if we send slightly too many batches.
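The dassert above captures the invariant the whole change preserves: an estimate may exceed the true serialized size, costing only slightly smaller batches, but it must never undershoot it, or a "full" batch could still overflow the command size limit. A unit-test-style check of a deliberately conservative toy estimator (illustrative, not MongoDB's test code):

```cpp
#include <cassert>
#include <string>

int actualSizeBytes(const std::string& serializedOp) {
    return static_cast<int>(serializedOp.size());
}

// Padding may waste a little batch capacity, but the estimate can never be
// smaller than the real serialization.
int estimatedSizeBytes(const std::string& serializedOp) {
    return actualSizeBytes(serializedOp) + 7;
}

int main() {
    const std::string op = "{q: {_id: 1}, u: {$set: {a: 1}}}";
    assert(estimatedSizeBytes(op) >= actualSizeBytes(op));  // mirrors the dassert
}
```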