author     Mihai Andrei <mihai.andrei@10gen.com>             2022-08-16 21:13:23 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-08-16 22:51:32 +0000
commit     707ba0a0ade42c4540b9cabaaf5a257de944cc3e (patch)
tree       e9c856451624391d8c7216603889cb7f779c5759 /src/mongo
parent     e6e66274ce5e2740b5a8265a4ba0fb20856cecca (diff)
SERVER-66289 Update write size estimation logic in DocumentSourceWriter
Diffstat (limited to 'src/mongo')
 src/mongo/db/ops/write_ops.cpp                                               |  62
 src/mongo/db/ops/write_ops.h                                                 |  12
 src/mongo/db/pipeline/document_source_merge.cpp                              | 110
 src/mongo/db/pipeline/document_source_merge.h                                |  15
 src/mongo/db/pipeline/document_source_out.h                                  |   3
 src/mongo/db/pipeline/document_source_writer.h                               |   7
 src/mongo/db/pipeline/process_interface/common_process_interface.cpp         |   6
 src/mongo/db/pipeline/process_interface/common_process_interface.h           |  45
 src/mongo/db/pipeline/process_interface/mongo_process_interface.h            |  21
 src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp         |   6
 src/mongo/db/pipeline/process_interface/mongos_process_interface.h           |   3
 src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h |   9
 src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h       |  16
 src/mongo/s/write_ops/batch_write_op.cpp                                     |  53
 14 files changed, 275 insertions(+), 93 deletions(-)
diff --git a/src/mongo/db/ops/write_ops.cpp b/src/mongo/db/ops/write_ops.cpp
index 0ef8f329558..8e19bfd3117 100644
--- a/src/mongo/db/ops/write_ops.cpp
+++ b/src/mongo/db/ops/write_ops.cpp
@@ -136,6 +136,68 @@ int32_t getStmtIdForWriteAt(const WriteCommandRequestBase& writeCommandBase, siz
return kFirstStmtId + writePos;
}
+int getUpdateSizeEstimate(const BSONObj& q,
+ const write_ops::UpdateModification& u,
+ const boost::optional<mongo::BSONObj>& c,
+ const bool includeUpsertSupplied,
+ const boost::optional<mongo::BSONObj>& collation,
+ const boost::optional<std::vector<mongo::BSONObj>>& arrayFilters,
+ const mongo::BSONObj& hint) {
+ using UpdateOpEntry = write_ops::UpdateOpEntry;
+
+ // This constant accounts for the null terminator in each field name and the BSONType byte for
+ // each element.
+ static const int kPerElementOverhead = 2;
+ static const int kBoolSize = 1;
+ int estSize = static_cast<int>(BSONObj::kMinBSONLength);
+
+ // Add the sizes of the 'multi' and 'upsert' fields.
+ estSize += UpdateOpEntry::kUpsertFieldName.size() + kBoolSize + kPerElementOverhead;
+ estSize += UpdateOpEntry::kMultiFieldName.size() + kBoolSize + kPerElementOverhead;
+
+ // Add the size of 'upsertSupplied' field if present.
+ if (includeUpsertSupplied) {
+ estSize += UpdateOpEntry::kUpsertSuppliedFieldName.size() + kBoolSize + kPerElementOverhead;
+ }
+
+ // Add the sizes of the 'q' and 'u' fields.
+ estSize += (UpdateOpEntry::kQFieldName.size() + q.objsize() + kPerElementOverhead +
+ UpdateOpEntry::kUFieldName.size() + u.objsize() + kPerElementOverhead);
+
+ // Add the size of the 'c' field, if present.
+ if (c) {
+ estSize += (UpdateOpEntry::kCFieldName.size() + c->objsize() + kPerElementOverhead);
+ }
+
+ // Add the size of the 'collation' field, if present.
+ if (collation) {
+ estSize += (UpdateOpEntry::kCollationFieldName.size() + collation->objsize() +
+ kPerElementOverhead);
+ }
+
+ // Add the size of the 'arrayFilters' field, if present.
+ if (arrayFilters) {
+ estSize += ([&]() {
+ auto size = BSONObj::kMinBSONLength + UpdateOpEntry::kArrayFiltersFieldName.size() +
+ kPerElementOverhead;
+ for (auto&& filter : *arrayFilters) {
+ // For each filter, we not only need to account for the size of the filter itself,
+ // but also for the per array element overhead.
+ size += filter.objsize();
+ size += write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
+ }
+ return size;
+ })();
+ }
+
+ // Add the size of 'hint' field if present.
+ if (!hint.isEmpty()) {
+ estSize += UpdateOpEntry::kHintFieldName.size() + hint.objsize() + kPerElementOverhead;
+ }
+
+ return estSize;
+}
+
bool isClassicalUpdateReplacement(const BSONObj& update) {
// An empty update object will be treated as replacement as firstElementFieldName() returns "".
return update.firstElementFieldName()[0] != '$';
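
The constants above come straight from the BSON wire format: each element is serialized as one type byte, a null-terminated field name, and its payload, and every document carries a four-byte length prefix plus a trailing zero byte, which is what BSONObj::kMinBSONLength covers. A minimal sketch, not part of the commit, that checks this arithmetic against the BSON builder:

    // Hypothetical standalone check; assumes the mongo BSON headers are available.
    #include <cassert>
    #include <cstring>

    #include "mongo/bson/bsonobjbuilder.h"

    void checkOverheadArithmetic() {
        const mongo::BSONObj obj = BSON("upsert" << true << "multi" << false);

        const int kPerElementOverhead = 2;  // 1 type byte + 1 field-name null terminator
        const int kBoolSize = 1;            // payload of a serialized bool

        int est = static_cast<int>(mongo::BSONObj::kMinBSONLength);  // 4-byte length + EOO byte
        est += std::strlen("upsert") + kBoolSize + kPerElementOverhead;
        est += std::strlen("multi") + kBoolSize + kPerElementOverhead;

        assert(est == obj.objsize());  // both sides are 22 bytes
    }
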
diff --git a/src/mongo/db/ops/write_ops.h b/src/mongo/db/ops/write_ops.h
index d78791fdfdc..a1201412a33 100644
--- a/src/mongo/db/ops/write_ops.h
+++ b/src/mongo/db/ops/write_ops.h
@@ -104,6 +104,18 @@ const std::vector<BSONObj>& arrayFiltersOf(const T& opEntry) {
}
/**
+ * Utility which estimates the size in bytes of an update statement with the given parameters, when
+ * serialized in the format used for the update command.
+ */
+int getUpdateSizeEstimate(const BSONObj& q,
+ const write_ops::UpdateModification& u,
+ const boost::optional<mongo::BSONObj>& c,
+ bool includeUpsertSupplied,
+ const boost::optional<mongo::BSONObj>& collation,
+ const boost::optional<std::vector<mongo::BSONObj>>& arrayFilters,
+ const mongo::BSONObj& hint);
+
+/**
* If the response from a write command contains any write errors, it will throw the first one. All
* the remaining errors will be disregarded.
*
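
The declaration mirrors the definition above. A hedged usage sketch (the names and values are illustrative only), showing a caller estimating one update statement while defaulting the optional pieces, much as the $merge code path further down does:

    // Sketch: estimate the serialized size of {q: {_id: 1}, u: {$set: {a: 1}}}.
    #include "mongo/db/ops/write_ops.h"

    int estimateSimpleUpdate() {
        const mongo::BSONObj q = BSON("_id" << 1);
        const auto u = mongo::write_ops::UpdateModification::parseFromClassicUpdate(
            BSON("$set" << BSON("a" << 1)));

        return mongo::write_ops::getUpdateSizeEstimate(q,
                                                       u,
                                                       boost::none /* c */,
                                                       false /* includeUpsertSupplied */,
                                                       boost::none /* collation */,
                                                       boost::none /* arrayFilters */,
                                                       mongo::BSONObj() /* hint */);
    }
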
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index 2e200076bb9..e881407df60 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -62,7 +62,7 @@ using MergeStrategy = MergeStrategyDescriptor::MergeStrategy;
using MergeStrategyDescriptorsMap = std::map<const MergeMode, const MergeStrategyDescriptor>;
using WhenMatched = MergeStrategyDescriptor::WhenMatched;
using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched;
-using BatchTransform = std::function<void(DocumentSourceMerge::BatchedObjects&)>;
+using BatchTransform = DocumentSourceMerge::BatchTransform;
using UpdateModification = write_ops::UpdateModification;
using UpsertType = MongoProcessInterface::UpsertType;
@@ -86,17 +86,15 @@ const auto kDefaultPipelineLet = BSON("new"
<< "$$ROOT");
/**
- * Creates a merge strategy which uses update semantics to perform a merge operation. If
- * 'BatchTransform' function is provided, it will be called to transform batched objects before
- * passing them to the 'update'.
+ * Creates a merge strategy which uses update semantics to perform a merge operation.
*/
-MergeStrategy makeUpdateStrategy(UpsertType upsert, BatchTransform transform) {
- return [upsert, transform](
- const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
- if (transform) {
- transform(batch);
- }
-
+MergeStrategy makeUpdateStrategy() {
+ return [](const auto& expCtx,
+ const auto& ns,
+ const auto& wc,
+ auto epoch,
+ auto&& batch,
+ UpsertType upsert) {
constexpr auto multi = false;
uassertStatusOK(expCtx->mongoProcessInterface->update(
expCtx, ns, std::move(batch), wc, upsert, multi, epoch));
@@ -108,16 +106,15 @@ MergeStrategy makeUpdateStrategy(UpsertType upsert, BatchTransform transform) {
* that each document in the batch has a matching document in the 'ns' collection (note that a
* matching document may not be modified as a result of an update operation, yet it still will be
* counted as matching). If at least one document doesn't have a match, this strategy returns an
- * error. If 'BatchTransform' function is provided, it will be called to transform batched objects
- * before passing them to the 'update'.
+ * error.
*/
-MergeStrategy makeStrictUpdateStrategy(UpsertType upsert, BatchTransform transform) {
- return [upsert, transform](
- const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
- if (transform) {
- transform(batch);
- }
-
+MergeStrategy makeStrictUpdateStrategy() {
+ return [](const auto& expCtx,
+ const auto& ns,
+ const auto& wc,
+ auto epoch,
+ auto&& batch,
+ UpsertType upsert) {
const int64_t batchSize = batch.size();
constexpr auto multi = false;
auto updateResult = uassertStatusOK(expCtx->mongoProcessInterface->update(
@@ -133,7 +130,12 @@ MergeStrategy makeStrictUpdateStrategy(UpsertType upsert, BatchTransform transfo
* Creates a merge strategy which uses insert semantics to perform a merge operation.
*/
MergeStrategy makeInsertStrategy() {
- return [](const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
+ return [](const auto& expCtx,
+ const auto& ns,
+ const auto& wc,
+ auto epoch,
+ auto&& batch,
+ UpsertType upsertType) {
std::vector<BSONObj> objectsToInsert(batch.size());
// The batch stores replacement style updates, but for this "insert" style of $merge we'd
// like to just insert the new document without attempting any sort of replacement.
@@ -146,15 +148,13 @@ MergeStrategy makeInsertStrategy() {
}
/**
- * Creates a batched objects transformation function which wraps each element of the
- * 'batch.modifications' array into the given 'updateOp' operator.
+ * Creates a batched object transformation function which wraps 'obj' into the given 'updateOp'
+ * operator.
*/
BatchTransform makeUpdateTransform(const std::string& updateOp) {
- return [updateOp](auto& batch) {
- for (auto&& obj : batch) {
- std::get<UpdateModification>(obj) = UpdateModification::parseFromClassicUpdate(
- BSON(updateOp << std::get<UpdateModification>(obj).getUpdateReplacement()));
- }
+ return [updateOp](auto& obj) {
+ std::get<UpdateModification>(obj) = UpdateModification::parseFromClassicUpdate(
+ BSON(updateOp << std::get<UpdateModification>(obj).getUpdateReplacement()));
};
}
@@ -178,48 +178,67 @@ const MergeStrategyDescriptorsMap& getDescriptors() {
{kReplaceInsertMode,
{kReplaceInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(UpsertType::kGenerateNewDoc, {})}},
+ makeUpdateStrategy(),
+ {},
+ UpsertType::kGenerateNewDoc}},
// whenMatched: replace, whenNotMatched: fail
{kReplaceFailMode,
- {kReplaceFailMode, {ActionType::update}, makeStrictUpdateStrategy(UpsertType::kNone, {})}},
+ {kReplaceFailMode,
+ {ActionType::update},
+ makeStrictUpdateStrategy(),
+ {},
+ UpsertType::kNone}},
// whenMatched: replace, whenNotMatched: discard
{kReplaceDiscardMode,
- {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}},
+ {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}},
// whenMatched: merge, whenNotMatched: insert
{kMergeInsertMode,
{kMergeInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$set"))}},
+ makeUpdateStrategy(),
+ makeUpdateTransform("$set"),
+ UpsertType::kGenerateNewDoc}},
// whenMatched: merge, whenNotMatched: fail
{kMergeFailMode,
{kMergeFailMode,
{ActionType::update},
- makeStrictUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}},
+ makeStrictUpdateStrategy(),
+ makeUpdateTransform("$set"),
+ UpsertType::kNone}},
// whenMatched: merge, whenNotMatched: discard
{kMergeDiscardMode,
{kMergeDiscardMode,
{ActionType::update},
- makeUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}},
+ makeUpdateStrategy(),
+ makeUpdateTransform("$set"),
+ UpsertType::kNone}},
// whenMatched: keepExisting, whenNotMatched: insert
{kKeepExistingInsertMode,
{kKeepExistingInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$setOnInsert"))}},
+ makeUpdateStrategy(),
+ makeUpdateTransform("$setOnInsert"),
+ UpsertType::kGenerateNewDoc}},
// whenMatched: [pipeline], whenNotMatched: insert
{kPipelineInsertMode,
{kPipelineInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(UpsertType::kInsertSuppliedDoc, {})}},
+ makeUpdateStrategy(),
+ {},
+ UpsertType::kInsertSuppliedDoc}},
// whenMatched: [pipeline], whenNotMatched: fail
{kPipelineFailMode,
{kPipelineFailMode,
{ActionType::update},
- makeStrictUpdateStrategy(UpsertType::kNone, {})}},
+ makeStrictUpdateStrategy(),
+ {},
+ UpsertType::kNone}},
// whenMatched: [pipeline], whenNotMatched: discard
{kPipelineDiscardMode,
- {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}},
+ {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}},
// whenMatched: fail, whenNotMatched: insert
- {kFailInsertMode, {kFailInsertMode, {ActionType::insert}, makeInsertStrategy()}}};
+ {kFailInsertMode,
+ {kFailInsertMode, {ActionType::insert}, makeInsertStrategy(), {}, UpsertType::kNone}}};
return mergeStrategyDescriptors;
}
@@ -552,8 +571,14 @@ std::pair<DocumentSourceMerge::BatchObject, int> DocumentSourceMerge::makeBatchO
auto mergeOnFields = extractMergeOnFieldsFromDoc(doc, _mergeOnFields);
auto mod = makeBatchUpdateModification(doc);
auto vars = resolveLetVariablesIfNeeded(doc);
- auto modSize = mod.objsize() + (vars ? vars->objsize() : 0);
- return {{std::move(mergeOnFields), std::move(mod), std::move(vars)}, modSize};
+ BatchObject batchObject{std::move(mergeOnFields), std::move(mod), std::move(vars)};
+ if (_descriptor.transform) {
+ _descriptor.transform(batchObject);
+ }
+
+ tassert(6628901, "_writeSizeEstimator should be initialized", _writeSizeEstimator);
+ return {batchObject,
+ _writeSizeEstimator->estimateUpdateSizeBytes(batchObject, _descriptor.upsertType)};
}
void DocumentSourceMerge::spill(BatchedObjects&& batch) try {
@@ -562,7 +587,8 @@ void DocumentSourceMerge::spill(BatchedObjects&& batch) try {
? boost::optional<OID>(_targetCollectionVersion->epoch())
: boost::none;
- _descriptor.strategy(pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch));
+ _descriptor.strategy(
+ pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch), _descriptor.upsertType);
} catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
uassertStatusOKWithContext(ex.toStatus(),
"$merge failed to update the matching document, did you "
diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h
index 05a87f1e340..e3784ae545c 100644
--- a/src/mongo/db/pipeline/document_source_merge.h
+++ b/src/mongo/db/pipeline/document_source_merge.h
@@ -44,24 +44,31 @@ class DocumentSourceMerge final : public DocumentSourceWriter<MongoProcessInterf
public:
static constexpr StringData kStageName = "$merge"_sd;
- // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions
- // the client should be authorized to perform in order to be able to execute a merge operation
- // using this merge strategy.
+ using BatchTransform = std::function<void(MongoProcessInterface::BatchObject&)>;
+
+ // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions the
+ // client should be authorized to perform in order to be able to execute a merge operation using
+ // this merge strategy. If a 'BatchTransform' function is provided, it will be called when
+ // constructing a batch object to transform updates.
struct MergeStrategyDescriptor {
using WhenMatched = MergeWhenMatchedModeEnum;
using WhenNotMatched = MergeWhenNotMatchedModeEnum;
using MergeMode = std::pair<WhenMatched, WhenNotMatched>;
+ using UpsertType = MongoProcessInterface::UpsertType;
// A function encapsulating a merge strategy for the $merge stage based on the pair of
// whenMatched/whenNotMatched modes.
using MergeStrategy = std::function<void(const boost::intrusive_ptr<ExpressionContext>&,
const NamespaceString&,
const WriteConcernOptions&,
boost::optional<OID>,
- BatchedObjects&&)>;
+ BatchedObjects&&,
+ UpsertType upsert)>;
MergeMode mode;
ActionSet actions;
MergeStrategy strategy;
+ BatchTransform transform;
+ UpsertType upsertType;
};
/**
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 88883cb6231..602db5815f7 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -131,7 +131,8 @@ private:
std::pair<BSONObj, int> makeBatchObject(Document&& doc) const override {
auto obj = doc.toBson();
- return {obj, obj.objsize()};
+ tassert(6628900, "_writeSizeEstimator should be initialized", _writeSizeEstimator);
+ return {obj, _writeSizeEstimator->estimateInsertSizeBytes(obj)};
}
void waitWhileFailPointEnabled() override;
diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h
index 1965bd4dee6..284699deb30 100644
--- a/src/mongo/db/pipeline/document_source_writer.h
+++ b/src/mongo/db/pipeline/document_source_writer.h
@@ -104,7 +104,9 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(stageName, expCtx),
_outputNs(std::move(outputNs)),
- _writeConcern(expCtx->opCtx->getWriteConcern()) {}
+ _writeConcern(expCtx->opCtx->getWriteConcern()),
+ _writeSizeEstimator(
+ expCtx->mongoProcessInterface->getWriteSizeEstimator(expCtx->opCtx, outputNs)) {}
DepsTracker::State getDependencies(DepsTracker* deps) const override {
deps->needWholeDocument = true;
@@ -169,6 +171,9 @@ protected:
// respect the writeConcern of the original command.
WriteConcernOptions _writeConcern;
+ // An interface that is used to estimate the size of each write operation.
+ const std::unique_ptr<MongoProcessInterface::WriteSizeEstimator> _writeSizeEstimator;
+
private:
bool _initialized{false};
bool _done{false};
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
index 0987e83a6ee..8e56f390b13 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
@@ -152,6 +152,12 @@ std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsR
return {"_id"};
}
+std::unique_ptr<CommonProcessInterface::WriteSizeEstimator>
+CommonProcessInterface::getWriteSizeEstimator(OperationContext* opCtx,
+ const NamespaceString& ns) const {
+ return std::make_unique<LocalWriteSizeEstimator>();
+}
+
void CommonProcessInterface::updateClientOperationTime(OperationContext* opCtx) const {
// In order to support causal consistency in a replica set or a sharded cluster when reading
// with secondary read preference, the secondary must propagate the primary's operation time
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h
index efaf292f796..ea18e45b50a 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h
@@ -46,6 +46,48 @@ public:
virtual ~CommonProcessInterface() = default;
/**
+ * Estimates the size of writes that will be executed on the current node. Note that this
+ * does not account for the full size of an update statement.
+ */
+ class LocalWriteSizeEstimator final : public WriteSizeEstimator {
+ public:
+ int estimateInsertSizeBytes(const BSONObj& insert) const override {
+ return insert.objsize();
+ }
+
+ int estimateUpdateSizeBytes(const BatchObject& batchObject,
+ UpsertType type) const override {
+ int size = std::get<write_ops::UpdateModification>(batchObject).objsize();
+ if (auto vars = std::get<boost::optional<BSONObj>>(batchObject)) {
+ size += vars->objsize();
+ }
+ return size;
+ }
+ };
+
+ /**
+ * Estimate the size of writes that will be sent to the replica set primary.
+ */
+ class TargetPrimaryWriteSizeEstimator final : public WriteSizeEstimator {
+ public:
+ int estimateInsertSizeBytes(const BSONObj& insert) const override {
+ return insert.objsize() + write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
+ }
+
+ int estimateUpdateSizeBytes(const BatchObject& batchObject,
+ UpsertType type) const override {
+ return getUpdateSizeEstimate(std::get<BSONObj>(batchObject),
+ std::get<write_ops::UpdateModification>(batchObject),
+ std::get<boost::optional<BSONObj>>(batchObject),
+ type != UpsertType::kNone /* includeUpsertSupplied */,
+ boost::none /* collation */,
+ boost::none /* arrayFilters */,
+ BSONObj() /* hint*/) +
+ write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
+ }
+ };
+
+ /**
* Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and
* each of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special
* type of index.
@@ -64,6 +106,9 @@ public:
virtual std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
OperationContext*, const NamespaceString&) const override;
+ std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+ OperationContext* opCtx, const NamespaceString& ns) const override;
+
virtual void updateClientOperationTime(OperationContext* opCtx) const final;
boost::optional<ShardVersion> refreshAndGetCollectionVersion(
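
The two estimators are intentionally asymmetric: LocalWriteSizeEstimator prices only the modification and any let-variables, while TargetPrimaryWriteSizeEstimator prices the full update entry as it will be serialized into a wire-format update command, plus the per-array-element overhead, so it always returns at least as much. A hedged sketch comparing them on one batch object (assume this runs inside namespace mongo with the usual headers included; the tuple layout of BatchObject comes from MongoProcessInterface):

    // Sketch only, not part of the commit.
    MongoProcessInterface::BatchObject batchObject{
        BSON("_id" << 1),                                       // q: the query
        write_ops::UpdateModification::parseFromClassicUpdate(  // u: the modification
            BSON("$set" << BSON("a" << 1))),
        boost::none};                                           // c: no let-variables

    CommonProcessInterface::LocalWriteSizeEstimator local;
    CommonProcessInterface::TargetPrimaryWriteSizeEstimator targetPrimary;

    // Counts only 'u' (and 'c' if set): the write executes in-process.
    int localBytes = local.estimateUpdateSizeBytes(
        batchObject, MongoProcessInterface::UpsertType::kNone);

    // Counts the whole update entry ('q', 'u', 'multi', 'upsert', ...) plus the
    // per-array-element overhead: the write is forwarded as an update command.
    int remoteBytes = targetPrimary.estimateUpdateSizeBytes(
        batchObject, MongoProcessInterface::UpsertType::kNone);
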
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index 3ca0029411f..1f2c73d9c6e 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -78,7 +78,7 @@ class MongoProcessInterface {
public:
/**
* Storage for a batch of BSON Objects to be updated in the write namespace. For each element
- * in the batch we store a tuple of the folliwng elements:
+ * in the batch we store a tuple of the following elements:
* 1. BSONObj - specifies the query that identifies a document in the to collection to be
* updated.
* 2. write_ops::UpdateModification - either the new document we want to upsert or insert into
@@ -106,6 +106,19 @@ public:
enum class CurrentOpBacktraceMode { kIncludeBacktrace, kExcludeBacktrace };
/**
+ * Interface which estimates the size of a given write operation.
+ */
+ class WriteSizeEstimator {
+ public:
+ virtual ~WriteSizeEstimator() = default;
+
+ virtual int estimateInsertSizeBytes(const BSONObj& insert) const = 0;
+
+ virtual int estimateUpdateSizeBytes(const BatchObject& batchObject,
+ UpsertType type) const = 0;
+ };
+
+ /**
* Factory function to create MongoProcessInterface of the right type. The implementation will
* be installed by a lib higher up in the link graph depending on the application type.
*/
@@ -127,6 +140,12 @@ public:
virtual ~MongoProcessInterface(){};
/**
+ * Returns an instance of a 'WriteSizeEstimator' interface.
+ */
+ virtual std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+ OperationContext* opCtx, const NamespaceString& ns) const = 0;
+
+ /**
* Creates a new TransactionHistoryIterator object. Only applicable in processes which support
* locally traversing the oplog.
*/
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
index 117fa156d71..1e46ea9117c 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
@@ -101,6 +101,12 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
+std::unique_ptr<CommonProcessInterface::WriteSizeEstimator>
+MongosProcessInterface::getWriteSizeEstimator(OperationContext* opCtx,
+ const NamespaceString& ns) const {
+ return std::make_unique<TargetPrimaryWriteSizeEstimator>();
+}
+
std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline(
Pipeline* ownedPipeline,
ShardTargetingPolicy shardTargetingPolicy,
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index 3bb10f83f90..786440cb5af 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -45,6 +45,9 @@ public:
virtual ~MongosProcessInterface() = default;
+ std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+ OperationContext* opCtx, const NamespaceString& ns) const final;
+
boost::optional<Document> lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
index 7d72c064123..02c183d43a0 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
@@ -43,6 +43,15 @@ class ReplicaSetNodeProcessInterface final : public NonShardServerProcessInterfa
public:
using NonShardServerProcessInterface::NonShardServerProcessInterface;
+ std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+ OperationContext* opCtx, const NamespaceString& ns) const override {
+ if (_canWriteLocally(opCtx, ns)) {
+ return std::make_unique<LocalWriteSizeEstimator>();
+ } else {
+ return std::make_unique<TargetPrimaryWriteSizeEstimator>();
+ }
+ }
+
static std::shared_ptr<executor::TaskExecutor> getReplicaSetNodeExecutor(
ServiceContext* service);
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index 7b96bb53ed0..813684b8f8e 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -52,6 +52,22 @@ public:
MONGO_UNREACHABLE;
}
+ class StubWriteSizeEstimator final : public WriteSizeEstimator {
+ public:
+ int estimateInsertSizeBytes(const BSONObj& insert) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ int estimateUpdateSizeBytes(const BatchObject& batchObject,
+ UpsertType type) const override {
+ MONGO_UNREACHABLE;
+ }
+ };
+ std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+ OperationContext* opCtx, const NamespaceString& ns) const override {
+ return std::make_unique<StubWriteSizeEstimator>();
+ }
+
bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override {
return false;
}
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 4c2061dd07f..26f9a4d3488 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -59,7 +59,6 @@ struct WriteErrorComp {
// batches before serializing.
//
// TODO: Revisit when we revisit command limits in general
-const int kEstUpdateOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
const int kEstDeleteOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
/**
@@ -159,51 +158,17 @@ int getWriteSizeBytes(const WriteOp& writeOp) {
return item.getDocument().objsize();
} else if (batchType == BatchedCommandRequest::BatchType_Update) {
// Note: Be conservative here - it's okay if we send slightly too many batches.
- auto estSize = static_cast<int>(BSONObj::kMinBSONLength);
- static const auto boolSize = 1;
-
- // Add the size of the 'collation' field, if present.
- estSize += !item.getUpdate().getCollation() ? 0
- : (UpdateOpEntry::kCollationFieldName.size() +
- item.getUpdate().getCollation()->objsize());
-
- // Add the size of the 'arrayFilters' field, if present.
- estSize += !item.getUpdate().getArrayFilters() ? 0 : ([&item]() {
- auto size = BSONObj::kMinBSONLength + UpdateOpEntry::kArrayFiltersFieldName.size();
- for (auto&& filter : *item.getUpdate().getArrayFilters()) {
- size += filter.objsize();
- }
- return size;
- })();
-
- // Add the sizes of the 'multi' and 'upsert' fields.
- estSize += UpdateOpEntry::kUpsertFieldName.size() + boolSize;
- estSize += UpdateOpEntry::kMultiFieldName.size() + boolSize;
-
- // Add the size of 'upsertSupplied' field if present.
- if (auto upsertSupplied = item.getUpdate().getUpsertSupplied()) {
- estSize += UpdateOpEntry::kUpsertSuppliedFieldName.size() + boolSize;
- }
-
- // Add the sizes of the 'q' and 'u' fields.
- estSize += (UpdateOpEntry::kQFieldName.size() + item.getUpdate().getQ().objsize() +
- UpdateOpEntry::kUFieldName.size() + item.getUpdate().getU().objsize());
-
- // Add the size of the 'c' field if present.
- if (auto constants = item.getUpdate().getC()) {
- estSize += UpdateOpEntry::kCFieldName.size() + item.getUpdate().getC()->objsize();
- }
-
- // Add the size of 'hint' field if present.
- if (auto hint = item.getUpdate().getHint(); !hint.isEmpty()) {
- estSize += UpdateOpEntry::kHintFieldName.size() + hint.objsize();
- }
-
- // Finally, add the constant updateOp overhead size.
- estSize += kEstUpdateOverheadBytes;
+ const auto& update = item.getUpdate();
+ auto estSize = write_ops::getUpdateSizeEstimate(update.getQ(),
+ update.getU(),
+ update.getC(),
+ update.getUpsertSupplied().has_value(),
+ update.getCollation(),
+ update.getArrayFilters(),
+ update.getHint());
// When running a debug build, verify that estSize is at least the BSON serialization size.
- dassert(estSize >= item.getUpdate().toBSON().objsize());
+ dassert(estSize >= update.toBSON().objsize());
return estSize;
} else if (batchType == BatchedCommandRequest::BatchType_Delete) {
// Note: Be conservative here - it's okay if we send slightly too many batches.
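
The dassert retained at the end of the update branch captures the invariant this refactoring preserves: the shared helper must never undercount, so batches split against the size limits can only err toward being smaller than what the server would reject. A hedged sketch of that invariant in isolation (the free-function wrapper is hypothetical):

    // Sketch: for any update entry, the estimate must dominate the true
    // serialized size, otherwise a batch could exceed the server's limits.
    void verifyConservativeEstimate(const mongo::write_ops::UpdateOpEntry& update) {
        const int estSize =
            mongo::write_ops::getUpdateSizeEstimate(update.getQ(),
                                                    update.getU(),
                                                    update.getC(),
                                                    update.getUpsertSupplied().has_value(),
                                                    update.getCollation(),
                                                    update.getArrayFilters(),
                                                    update.getHint());
        invariant(estSize >= update.toBSON().objsize());
    }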