author     Uladzimir Makouski <uladzimir.makouski@mongodb.com>   2022-07-28 09:42:17 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>      2022-07-28 10:48:12 +0000
commit     4e9dd74af2467b9c04e7014c0f754465321da8a9 (patch)
tree       6af10dea03d18cee43ac130e5526438048504457
parent     042a60c450b5f48a27171cf4150a5bf850efc79a (diff)
download   mongo-4e9dd74af2467b9c04e7014c0f754465321da8a9.tar.gz
Revert "SERVER-66289 Update write size estimation logic in DocumentSourceWriter"
This reverts commit 7b7fe658db948e6f5a4a6c30d4590d7866c59371.
-rw-r--r--  jstests/aggregation/sources/merge/mode_merge_fail.js                          |   8
-rw-r--r--  jstests/aggregation/sources/merge/mode_replace_fail.js                        |   8
-rw-r--r--  jstests/noPassthrough/out_merge_on_secondary_batch_write.js                   | 111
-rw-r--r--  src/mongo/db/ops/write_ops.cpp                                                 |  59
-rw-r--r--  src/mongo/db/ops/write_ops.h                                                   |  12
-rw-r--r--  src/mongo/db/pipeline/document_source_merge.cpp                                | 110
-rw-r--r--  src/mongo/db/pipeline/document_source_merge.h                                  |  15
-rw-r--r--  src/mongo/db/pipeline/document_source_out.h                                    |   3
-rw-r--r--  src/mongo/db/pipeline/document_source_writer.h                                 |   7
-rw-r--r--  src/mongo/db/pipeline/process_interface/common_process_interface.cpp          |   6
-rw-r--r--  src/mongo/db/pipeline/process_interface/common_process_interface.h            |  45
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongo_process_interface.h             |  21
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp          |   6
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongos_process_interface.h            |   3
-rw-r--r--  src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h  |   9
-rw-r--r--  src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h        |  16
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.cpp                                       |  53
17 files changed, 97 insertions(+), 395 deletions(-)
diff --git a/jstests/aggregation/sources/merge/mode_merge_fail.js b/jstests/aggregation/sources/merge/mode_merge_fail.js
index fcc13f8e871..7235c8e1c7e 100644
--- a/jstests/aggregation/sources/merge/mode_merge_fail.js
+++ b/jstests/aggregation/sources/merge/mode_merge_fail.js
@@ -97,13 +97,9 @@ const pipeline = [mergeStage];
// and updated.
(function testMergeUnorderedBatchUpdate() {
const maxBatchSize = 16 * 1024 * 1024; // 16MB
-
- // Each document is just under 1MB in order to allow for some extra space for writes that need
- // to be serialized over the wire in certain cluster configurations. Otherwise, the number of
- // modified/unmodified documents would be off by one depending on how our cluster is configured.
- const docSize = 1024 * 1023;
+ const docSize = 1024 * 1024; // 1MB
const numDocs = 20;
- const maxDocsInBatch = Math.floor(maxBatchSize / docSize);
+ const maxDocsInBatch = maxBatchSize / docSize;
assert(source.drop());
dropWithoutImplicitRecreate(target.getName());
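
The hunk above restores a full 1 MB document size, so maxBatchSize / docSize is already an integer and the Math.floor guard becomes unnecessary; with the pre-revert just-under-1MB documents the quotient is fractional. A minimal standalone C++ sketch of that arithmetic (variable names are illustrative, not taken from the test):

#include <cmath>
#include <iostream>

int main() {
    const double maxBatchSize = 16.0 * 1024 * 1024;  // 16MB, mirrors the test constant

    // Reverted-to variant: exactly 1MB per document, the division is already integral.
    const double fullDoc = 1024.0 * 1024;
    std::cout << maxBatchSize / fullDoc << "\n";               // 16

    // Pre-revert variant: documents just under 1MB, floor keeps the count integral.
    const double slimDoc = 1024.0 * 1023;
    std::cout << maxBatchSize / slimDoc << "\n";               // ~16.0156
    std::cout << std::floor(maxBatchSize / slimDoc) << "\n";   // 16
    return 0;
}
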
diff --git a/jstests/aggregation/sources/merge/mode_replace_fail.js b/jstests/aggregation/sources/merge/mode_replace_fail.js
index cc3df1b1ce7..88582238c8f 100644
--- a/jstests/aggregation/sources/merge/mode_replace_fail.js
+++ b/jstests/aggregation/sources/merge/mode_replace_fail.js
@@ -90,13 +90,9 @@ const pipeline = [mergeStage];
// and updated.
(function testMergeUnorderedBatchUpdate() {
const maxBatchSize = 16 * 1024 * 1024; // 16MB
-
- // Each document is just under 1MB in order to allow for some extra space for writes that need
- // to be serialized over the wire in certain cluster configurations. Otherwise, the number of
- // modified/unmodified documents would be off by one depending on how our cluster is configured.
- const docSize = 1024 * 1023; // 1MB
+ const docSize = 1024 * 1024; // 1MB
const numDocs = 20;
- const maxDocsInBatch = Math.floor(maxBatchSize / docSize);
+ const maxDocsInBatch = maxBatchSize / docSize;
assert(source.drop());
dropWithoutImplicitRecreate(target.getName());
diff --git a/jstests/noPassthrough/out_merge_on_secondary_batch_write.js b/jstests/noPassthrough/out_merge_on_secondary_batch_write.js
deleted file mode 100644
index 56dd050630c..00000000000
--- a/jstests/noPassthrough/out_merge_on_secondary_batch_write.js
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Test which verifies that $out/$merge aggregations with secondary read preference which write
- * over 16 MB work as expected (especially with respect to producing correctly sized write batches).
- *
- * @tags: [uses_$out, assumes_read_preference_unchanged]
- */
-(function() {
-const dbName = "db";
-const collName = "movies";
-const targetCollName = "movies2";
-
-function testFn(db) {
- const coll = db[collName];
- coll.drop();
- db[targetCollName].drop();
-
- // Insert 4 MB more than the maximum bytes allowed in a single write batch worth of data
- // serialized as a single BSONObj.
- const hello = db.hello();
- const maxBatchSize = hello.maxWriteBatchSize;
- const totalDataSize = hello.maxBsonObjectSize + (4 * 1024 * 1024);
- const sizePerDoc = totalDataSize / maxBatchSize;
- const bigString = "a".repeat(sizePerDoc);
- const bulk = coll.initializeUnorderedBulkOp();
-
- for (let i = 0; i < maxBatchSize; ++i) {
- bulk.insert({_id: NumberInt(i), foo: bigString});
- }
- assert.commandWorked(bulk.execute({w: "majority"}));
-
- function defaultSetUpFn(db) {
- db[targetCollName].drop({writeConcern: {w: "majority"}});
- }
-
- function cleanUpFn(db) {
- db[targetCollName].drop({writeConcern: {w: "majority"}});
- }
-
- function testWriteAggSpec(aggWriteStageSpec, setUpFn, errorCodes = []) {
- // Run 'aggWriteStageSpec' with both primary and secondary read preference.
- for (const readPref of ["primary", "secondary"]) {
- jsTestLog("Testing " + tojson(aggWriteStageSpec) + " with read preference " + readPref);
- setUpFn(db);
-
- // If the caller provided some error codes, assert that the command failed with one
- // of these codes.
- const fn = () =>
- db[collName]
- .aggregate([aggWriteStageSpec], {$readPreference: {mode: readPref}})
- .itcount();
- const errMsg = "Failed to run aggregate with read preference " + readPref;
- if (errorCodes.length > 0) {
- assert.throwsWithCode(fn, errorCodes, [] /* params */, errMsg);
- } else {
- assert.doesNotThrow(fn, [] /* params */, errMsg);
- }
- cleanUpFn(db);
- }
- }
-
- // Set up documents in the output collection so that $merge will perform updates.
- function mergeUpdateSetupFn(db) {
- defaultSetUpFn(db);
- const bulk = db[targetCollName].initializeUnorderedBulkOp();
- for (let i = 0; i < maxBatchSize; ++i) {
- bulk.insert({_id: NumberInt(i), extraField: i * 3});
- }
- assert.commandWorked(bulk.execute({w: "majority"}));
- }
-
- testWriteAggSpec({$out: targetCollName}, defaultSetUpFn);
- testWriteAggSpec(
- {$merge: {into: targetCollName, whenMatched: "replace", whenNotMatched: "insert"}},
- defaultSetUpFn);
- testWriteAggSpec(
- {$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "insert"}},
- defaultSetUpFn);
- testWriteAggSpec(
- {$merge: {into: targetCollName, whenMatched: "keepExisting", whenNotMatched: "insert"}},
- defaultSetUpFn);
- testWriteAggSpec(
- {$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "insert", on: "_id"}},
- mergeUpdateSetupFn);
-
- // Failure cases.
- testWriteAggSpec(
- {$merge: {into: targetCollName, whenMatched: "replace", whenNotMatched: "fail", on: "_id"}},
- defaultSetUpFn,
- [ErrorCodes.MergeStageNoMatchingDocument]);
- testWriteAggSpec(
- {$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "fail", on: "_id"}},
- defaultSetUpFn,
- [ErrorCodes.MergeStageNoMatchingDocument]);
- testWriteAggSpec(
- {$merge: {into: targetCollName, whenMatched: "fail", whenNotMatched: "insert", on: "_id"}},
- mergeUpdateSetupFn,
- [ErrorCodes.DuplicateKey]);
-}
-
-jsTestLog("Testing against a replica set");
-const rst = new ReplSetTest({nodes: 2});
-rst.startSet();
-rst.initiate();
-testFn(new Mongo(rst.getURL()).getDB(dbName));
-rst.stopSet();
-
-jsTestLog("Testing against a sharded cluster");
-const st = new ShardingTest({shards: 1, rs: {nodes: 2}});
-testFn(st.s.getDB(dbName));
-st.stop();
-}());
diff --git a/src/mongo/db/ops/write_ops.cpp b/src/mongo/db/ops/write_ops.cpp
index f0ee04348b2..0ef8f329558 100644
--- a/src/mongo/db/ops/write_ops.cpp
+++ b/src/mongo/db/ops/write_ops.cpp
@@ -136,65 +136,6 @@ int32_t getStmtIdForWriteAt(const WriteCommandRequestBase& writeCommandBase, siz
return kFirstStmtId + writePos;
}
-int getUpdateSizeEstimate(const BSONObj& q,
- const write_ops::UpdateModification& u,
- const boost::optional<mongo::BSONObj>& c,
- const bool includeUpsertSupplied,
- const boost::optional<mongo::BSONObj>& collation,
- const boost::optional<std::vector<mongo::BSONObj>>& arrayFilters,
- const mongo::BSONObj& hint) {
- using UpdateOpEntry = write_ops::UpdateOpEntry;
-
- // This constant accounts for the null terminator in each field name and the BSONType byte for
- // each element.
- static const int kPerElementOverhead = 2;
- static const int kBoolSize = 1;
- int estSize = static_cast<int>(BSONObj::kMinBSONLength);
-
- // Add the sizes of the 'multi' and 'upsert' fields.
- estSize += UpdateOpEntry::kUpsertFieldName.size() + kBoolSize + kPerElementOverhead;
- estSize += UpdateOpEntry::kMultiFieldName.size() + kBoolSize + kPerElementOverhead;
-
- // Add the size of 'upsertSupplied' field if present.
- if (includeUpsertSupplied) {
- estSize += UpdateOpEntry::kUpsertSuppliedFieldName.size() + kBoolSize + kPerElementOverhead;
- }
-
- // Add the sizes of the 'q' and 'u' fields.
- estSize += (UpdateOpEntry::kQFieldName.size() + q.objsize() + kPerElementOverhead +
- UpdateOpEntry::kUFieldName.size() + u.objsize() + kPerElementOverhead);
-
- // Add the size of the 'c' field, if present.
- if (c) {
- estSize += (UpdateOpEntry::kCFieldName.size() + c->objsize() + kPerElementOverhead);
- }
-
- // Add the size of the 'collation' field, if present.
- if (collation) {
- estSize += (UpdateOpEntry::kCollationFieldName.size() + collation->objsize() +
- kPerElementOverhead);
- }
-
- // Add the size of the 'arrayFilters' field, if present.
- if (arrayFilters) {
- estSize += ([&]() {
- auto size = BSONObj::kMinBSONLength + UpdateOpEntry::kArrayFiltersFieldName.size() +
- kPerElementOverhead;
- for (auto&& filter : *arrayFilters) {
- size += filter.objsize();
- }
- return size;
- })();
- }
-
- // Add the size of 'hint' field if present.
- if (!hint.isEmpty()) {
- estSize += UpdateOpEntry::kHintFieldName.size() + hint.objsize() + kPerElementOverhead;
- }
-
- return estSize;
-}
-
bool isClassicalUpdateReplacement(const BSONObj& update) {
// An empty update object will be treated as replacement as firstElementFieldName() returns "".
return update.firstElementFieldName()[0] != '$';
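
The deleted getUpdateSizeEstimate() builds its estimate by starting from the minimum BSON object length and adding, per field, the field-name length, the payload size, and a small per-element overhead. Below is a simplified standalone C++ sketch of that accumulation pattern; the constants and the field subset are illustrative assumptions, not the authoritative BSON layout.

#include <iostream>
#include <string>

// Illustrative stand-ins for the BSON bookkeeping constants used by the removed code.
constexpr int kMinBSONLength = 5;       // empty BSON object: int32 length + trailing 0x00
constexpr int kPerElementOverhead = 2;  // type byte + field-name null terminator
constexpr int kBoolSize = 1;

int estimateUpdateEntrySize(int qSize, int uSize, bool hasCollation, int collationSize) {
    int est = kMinBSONLength;
    // Boolean flags are always serialized.
    est += std::string("upsert").size() + kBoolSize + kPerElementOverhead;
    est += std::string("multi").size() + kBoolSize + kPerElementOverhead;
    // Query and update payloads.
    est += std::string("q").size() + qSize + kPerElementOverhead;
    est += std::string("u").size() + uSize + kPerElementOverhead;
    // Optional fields only contribute when present.
    if (hasCollation) {
        est += std::string("collation").size() + collationSize + kPerElementOverhead;
    }
    return est;
}

int main() {
    std::cout << estimateUpdateEntrySize(40, 120, false, 0) << "\n";  // 188
    return 0;
}
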
diff --git a/src/mongo/db/ops/write_ops.h b/src/mongo/db/ops/write_ops.h
index a1201412a33..d78791fdfdc 100644
--- a/src/mongo/db/ops/write_ops.h
+++ b/src/mongo/db/ops/write_ops.h
@@ -104,18 +104,6 @@ const std::vector<BSONObj>& arrayFiltersOf(const T& opEntry) {
}
/**
- * Utility which estimates the size in bytes of an update statement with the given parameters, when
- * serialized in the format used for the update command.
- */
-int getUpdateSizeEstimate(const BSONObj& q,
- const write_ops::UpdateModification& u,
- const boost::optional<mongo::BSONObj>& c,
- bool includeUpsertSupplied,
- const boost::optional<mongo::BSONObj>& collation,
- const boost::optional<std::vector<mongo::BSONObj>>& arrayFilters,
- const mongo::BSONObj& hint);
-
-/**
* If the response from a write command contains any write errors, it will throw the first one. All
* the remaining errors will be disregarded.
*
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index 28bb4e31ff3..ae413052ddf 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -62,7 +62,7 @@ using MergeStrategy = MergeStrategyDescriptor::MergeStrategy;
using MergeStrategyDescriptorsMap = std::map<const MergeMode, const MergeStrategyDescriptor>;
using WhenMatched = MergeStrategyDescriptor::WhenMatched;
using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched;
-using BatchTransform = DocumentSourceMerge::BatchTransform;
+using BatchTransform = std::function<void(DocumentSourceMerge::BatchedObjects&)>;
using UpdateModification = write_ops::UpdateModification;
using UpsertType = MongoProcessInterface::UpsertType;
@@ -86,15 +86,17 @@ const auto kDefaultPipelineLet = BSON("new"
<< "$$ROOT");
/**
- * Creates a merge strategy which uses update semantics to perform a merge operation.
+ * Creates a merge strategy which uses update semantics to perform a merge operation. If
+ * 'BatchTransform' function is provided, it will be called to transform batched objects before
+ * passing them to the 'update'.
*/
-MergeStrategy makeUpdateStrategy() {
- return [](const auto& expCtx,
- const auto& ns,
- const auto& wc,
- auto epoch,
- auto&& batch,
- UpsertType upsert) {
+MergeStrategy makeUpdateStrategy(UpsertType upsert, BatchTransform transform) {
+ return [upsert, transform](
+ const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
+ if (transform) {
+ transform(batch);
+ }
+
constexpr auto multi = false;
uassertStatusOK(expCtx->mongoProcessInterface->update(
expCtx, ns, std::move(batch), wc, upsert, multi, epoch));
@@ -106,15 +108,16 @@ MergeStrategy makeUpdateStrategy() {
* that each document in the batch has a matching document in the 'ns' collection (note that a
* matching document may not be modified as a result of an update operation, yet it still will be
* counted as matching). If at least one document doesn't have a match, this strategy returns an
- * error.
+ * error. If 'BatchTransform' function is provided, it will be called to transform batched objects
+ * before passing them to the 'update'.
*/
-MergeStrategy makeStrictUpdateStrategy() {
- return [](const auto& expCtx,
- const auto& ns,
- const auto& wc,
- auto epoch,
- auto&& batch,
- UpsertType upsert) {
+MergeStrategy makeStrictUpdateStrategy(UpsertType upsert, BatchTransform transform) {
+ return [upsert, transform](
+ const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
+ if (transform) {
+ transform(batch);
+ }
+
const int64_t batchSize = batch.size();
constexpr auto multi = false;
auto updateResult = uassertStatusOK(expCtx->mongoProcessInterface->update(
@@ -130,12 +133,7 @@ MergeStrategy makeStrictUpdateStrategy() {
* Creates a merge strategy which uses insert semantics to perform a merge operation.
*/
MergeStrategy makeInsertStrategy() {
- return [](const auto& expCtx,
- const auto& ns,
- const auto& wc,
- auto epoch,
- auto&& batch,
- UpsertType upsertType) {
+ return [](const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
std::vector<BSONObj> objectsToInsert(batch.size());
// The batch stores replacement style updates, but for this "insert" style of $merge we'd
// like to just insert the new document without attempting any sort of replacement.
@@ -148,13 +146,15 @@ MergeStrategy makeInsertStrategy() {
}
/**
- * Creates a batched object transformation function which wraps 'obj' into the given 'updateOp'
- * operator.
+ * Creates a batched objects transformation function which wraps each element of the
+ * 'batch.modifications' array into the given 'updateOp' operator.
*/
BatchTransform makeUpdateTransform(const std::string& updateOp) {
- return [updateOp](auto& obj) {
- std::get<UpdateModification>(obj) = UpdateModification::parseFromClassicUpdate(
- BSON(updateOp << std::get<UpdateModification>(obj).getUpdateReplacement()));
+ return [updateOp](auto& batch) {
+ for (auto&& obj : batch) {
+ std::get<UpdateModification>(obj) = UpdateModification::parseFromClassicUpdate(
+ BSON(updateOp << std::get<UpdateModification>(obj).getUpdateReplacement()));
+ }
};
}
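
With the revert, the transform is applied to the whole batch inside the merge strategy rather than to a single batch object at construction time. A standalone C++ sketch of that batch-wide shape, with plain strings standing in for UpdateModification (all names here are illustrative):

#include <functional>
#include <iostream>
#include <string>
#include <vector>

using Batch = std::vector<std::string>;
using BatchTransform = std::function<void(Batch&)>;

// The returned functor rewrites every element of the batch, mirroring the
// reverted-to makeUpdateTransform() loop.
BatchTransform makeUpdateTransform(const std::string& updateOp) {
    return [updateOp](Batch& batch) {
        for (auto&& mod : batch) {
            mod = "{" + updateOp + ": " + mod + "}";  // wrap each modification in the operator
        }
    };
}

int main() {
    Batch batch{"{a: 1}", "{b: 2}"};
    makeUpdateTransform("$set")(batch);
    for (const auto& mod : batch) {
        std::cout << mod << "\n";  // {$set: {a: 1}}, {$set: {b: 2}}
    }
    return 0;
}
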
@@ -178,67 +178,48 @@ const MergeStrategyDescriptorsMap& getDescriptors() {
{kReplaceInsertMode,
{kReplaceInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(),
- {},
- UpsertType::kGenerateNewDoc}},
+ makeUpdateStrategy(UpsertType::kGenerateNewDoc, {})}},
// whenMatched: replace, whenNotMatched: fail
{kReplaceFailMode,
- {kReplaceFailMode,
- {ActionType::update},
- makeStrictUpdateStrategy(),
- {},
- UpsertType::kNone}},
+ {kReplaceFailMode, {ActionType::update}, makeStrictUpdateStrategy(UpsertType::kNone, {})}},
// whenMatched: replace, whenNotMatched: discard
{kReplaceDiscardMode,
- {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}},
+ {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}},
// whenMatched: merge, whenNotMatched: insert
{kMergeInsertMode,
{kMergeInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(),
- makeUpdateTransform("$set"),
- UpsertType::kGenerateNewDoc}},
+ makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$set"))}},
// whenMatched: merge, whenNotMatched: fail
{kMergeFailMode,
{kMergeFailMode,
{ActionType::update},
- makeStrictUpdateStrategy(),
- makeUpdateTransform("$set"),
- UpsertType::kNone}},
+ makeStrictUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}},
// whenMatched: merge, whenNotMatched: discard
{kMergeDiscardMode,
{kMergeDiscardMode,
{ActionType::update},
- makeUpdateStrategy(),
- makeUpdateTransform("$set"),
- UpsertType::kNone}},
+ makeUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}},
// whenMatched: keepExisting, whenNotMatched: insert
{kKeepExistingInsertMode,
{kKeepExistingInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(),
- makeUpdateTransform("$setOnInsert"),
- UpsertType::kGenerateNewDoc}},
+ makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$setOnInsert"))}},
// whenMatched: [pipeline], whenNotMatched: insert
{kPipelineInsertMode,
{kPipelineInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(),
- {},
- UpsertType::kInsertSuppliedDoc}},
+ makeUpdateStrategy(UpsertType::kInsertSuppliedDoc, {})}},
// whenMatched: [pipeline], whenNotMatched: fail
{kPipelineFailMode,
{kPipelineFailMode,
{ActionType::update},
- makeStrictUpdateStrategy(),
- {},
- UpsertType::kNone}},
+ makeStrictUpdateStrategy(UpsertType::kNone, {})}},
// whenMatched: [pipeline], whenNotMatched: discard
{kPipelineDiscardMode,
- {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}},
+ {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}},
// whenMatched: fail, whenNotMatched: insert
- {kFailInsertMode,
- {kFailInsertMode, {ActionType::insert}, makeInsertStrategy(), {}, UpsertType::kNone}}};
+ {kFailInsertMode, {kFailInsertMode, {ActionType::insert}, makeInsertStrategy()}}};
return mergeStrategyDescriptors;
}
@@ -564,14 +545,8 @@ std::pair<DocumentSourceMerge::BatchObject, int> DocumentSourceMerge::makeBatchO
auto mergeOnFields = extractMergeOnFieldsFromDoc(doc, _mergeOnFields);
auto mod = makeBatchUpdateModification(doc);
auto vars = resolveLetVariablesIfNeeded(doc);
- BatchObject batchObject{std::move(mergeOnFields), std::move(mod), std::move(vars)};
- if (_descriptor.transform) {
- _descriptor.transform(batchObject);
- }
-
- tassert(6628901, "_writeSizeEstimator should be initialized", _writeSizeEstimator);
- return {batchObject,
- _writeSizeEstimator->estimateUpdateSizeBytes(batchObject, _descriptor.upsertType)};
+ auto modSize = mod.objsize() + (vars ? vars->objsize() : 0);
+ return {{std::move(mergeOnFields), std::move(mod), std::move(vars)}, modSize};
}
void DocumentSourceMerge::spill(BatchedObjects&& batch) try {
@@ -580,8 +555,7 @@ void DocumentSourceMerge::spill(BatchedObjects&& batch) try {
? boost::optional<OID>(_targetCollectionVersion->epoch())
: boost::none;
- _descriptor.strategy(
- pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch), _descriptor.upsertType);
+ _descriptor.strategy(pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch));
} catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
uassertStatusOKWithContext(ex.toStatus(),
"$merge failed to update the matching document, did you "
diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h
index e3784ae545c..05a87f1e340 100644
--- a/src/mongo/db/pipeline/document_source_merge.h
+++ b/src/mongo/db/pipeline/document_source_merge.h
@@ -44,31 +44,24 @@ class DocumentSourceMerge final : public DocumentSourceWriter<MongoProcessInterf
public:
static constexpr StringData kStageName = "$merge"_sd;
- using BatchTransform = std::function<void(MongoProcessInterface::BatchObject&)>;
-
- // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions the
- // client should be authorized to perform in order to be able to execute a merge operation using
- // this merge strategy. If a 'BatchTransform' function is provided, it will be called when
- // constructing a batch object to transform updates.
+ // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions
+ // the client should be authorized to perform in order to be able to execute a merge operation
+ // using this merge strategy.
struct MergeStrategyDescriptor {
using WhenMatched = MergeWhenMatchedModeEnum;
using WhenNotMatched = MergeWhenNotMatchedModeEnum;
using MergeMode = std::pair<WhenMatched, WhenNotMatched>;
- using UpsertType = MongoProcessInterface::UpsertType;
// A function encapsulating a merge strategy for the $merge stage based on the pair of
// whenMatched/whenNotMatched modes.
using MergeStrategy = std::function<void(const boost::intrusive_ptr<ExpressionContext>&,
const NamespaceString&,
const WriteConcernOptions&,
boost::optional<OID>,
- BatchedObjects&&,
- UpsertType upsert)>;
+ BatchedObjects&&)>;
MergeMode mode;
ActionSet actions;
MergeStrategy strategy;
- BatchTransform transform;
- UpsertType upsertType;
};
/**
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 602db5815f7..88883cb6231 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -131,8 +131,7 @@ private:
std::pair<BSONObj, int> makeBatchObject(Document&& doc) const override {
auto obj = doc.toBson();
- tassert(6628900, "_writeSizeEstimator should be initialized", _writeSizeEstimator);
- return {obj, _writeSizeEstimator->estimateInsertSizeBytes(obj)};
+ return {obj, obj.objsize()};
}
void waitWhileFailPointEnabled() override;
diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h
index 284699deb30..1965bd4dee6 100644
--- a/src/mongo/db/pipeline/document_source_writer.h
+++ b/src/mongo/db/pipeline/document_source_writer.h
@@ -104,9 +104,7 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(stageName, expCtx),
_outputNs(std::move(outputNs)),
- _writeConcern(expCtx->opCtx->getWriteConcern()),
- _writeSizeEstimator(
- expCtx->mongoProcessInterface->getWriteSizeEstimator(expCtx->opCtx, outputNs)) {}
+ _writeConcern(expCtx->opCtx->getWriteConcern()) {}
DepsTracker::State getDependencies(DepsTracker* deps) const override {
deps->needWholeDocument = true;
@@ -171,9 +169,6 @@ protected:
// respect the writeConcern of the original command.
WriteConcernOptions _writeConcern;
- // An interface that is used to estimate the size of each write operation.
- const std::unique_ptr<MongoProcessInterface::WriteSizeEstimator> _writeSizeEstimator;
-
private:
bool _initialized{false};
bool _done{false};
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
index 3e4897e9fc9..57e4c15f3cc 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
@@ -152,12 +152,6 @@ std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsR
return {"_id"};
}
-std::unique_ptr<CommonProcessInterface::WriteSizeEstimator>
-CommonProcessInterface::getWriteSizeEstimator(OperationContext* opCtx,
- const NamespaceString& ns) const {
- return std::make_unique<LocalWriteSizeEstimator>();
-}
-
void CommonProcessInterface::updateClientOperationTime(OperationContext* opCtx) const {
// In order to support causal consistency in a replica set or a sharded cluster when reading
// with secondary read preference, the secondary must propagate the primary's operation time
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h
index 3e8625a2562..513edd5a6f4 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h
@@ -46,48 +46,6 @@ public:
virtual ~CommonProcessInterface() = default;
/**
- * Estimates the size of writes that will be executed on the current node. Note that this
- * does not account for the full size of an update statement.
- */
- class LocalWriteSizeEstimator final : public WriteSizeEstimator {
- public:
- int estimateInsertSizeBytes(const BSONObj& insert) const override {
- return insert.objsize();
- }
-
- int estimateUpdateSizeBytes(const BatchObject& batchObject,
- UpsertType type) const override {
- int size = std::get<write_ops::UpdateModification>(batchObject).objsize();
- if (auto vars = std::get<boost::optional<BSONObj>>(batchObject)) {
- size += vars->objsize();
- }
- return size;
- }
- };
-
- /**
- * Estimate the size of writes that will be sent to the replica set primary.
- */
- class TargetPrimaryWriteSizeEstimator final : public WriteSizeEstimator {
- public:
- int estimateInsertSizeBytes(const BSONObj& insert) const override {
- return insert.objsize() + write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
- }
-
- int estimateUpdateSizeBytes(const BatchObject& batchObject,
- UpsertType type) const override {
- return getUpdateSizeEstimate(std::get<BSONObj>(batchObject),
- std::get<write_ops::UpdateModification>(batchObject),
- std::get<boost::optional<BSONObj>>(batchObject),
- type != UpsertType::kNone /* includeUpsertSupplied */,
- boost::none /* collation */,
- boost::none /* arrayFilters */,
- BSONObj() /* hint*/) +
- write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
- }
- };
-
- /**
* Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and
* each of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special
* type of index.
@@ -106,9 +64,6 @@ public:
virtual std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
OperationContext*, const NamespaceString&) const override;
- std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
- OperationContext* opCtx, const NamespaceString& ns) const override;
-
virtual void updateClientOperationTime(OperationContext* opCtx) const final;
boost::optional<ChunkVersion> refreshAndGetCollectionVersion(
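
The two estimators deleted above encode different cost models: the local one charges only the object size, while the primary-targeting one pads each write with a per-element command overhead. A standalone C++ sketch of that split follows; the overhead value is an assumed placeholder, not the real kWriteCommandBSONArrayPerElementOverheadBytes.

#include <iostream>
#include <memory>

class WriteSizeEstimator {
public:
    virtual ~WriteSizeEstimator() = default;
    virtual int estimateInsertSizeBytes(int docSize) const = 0;
};

// Writes executed on the current node: the document size is the whole cost.
class LocalWriteSizeEstimator final : public WriteSizeEstimator {
public:
    int estimateInsertSizeBytes(int docSize) const override {
        return docSize;
    }
};

// Writes forwarded to the primary: add per-element command serialization overhead.
class TargetPrimaryWriteSizeEstimator final : public WriteSizeEstimator {
public:
    int estimateInsertSizeBytes(int docSize) const override {
        return docSize + kPerElementOverheadBytes;
    }

private:
    static constexpr int kPerElementOverheadBytes = 7;  // assumed placeholder value
};

int main() {
    std::unique_ptr<WriteSizeEstimator> est =
        std::make_unique<TargetPrimaryWriteSizeEstimator>();
    std::cout << est->estimateInsertSizeBytes(1024) << "\n";  // 1031
    return 0;
}
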
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index a0183fd9d33..0e36e13613f 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -78,7 +78,7 @@ class MongoProcessInterface {
public:
/**
* Storage for a batch of BSON Objects to be updated in the write namespace. For each element
- * in the batch we store a tuple of the following elements:
* in the batch we store a tuple of the following elements:
* 1. BSONObj - specifies the query that identifies a document in the collection to be
* updated.
* 2. write_ops::UpdateModification - either the new document we want to upsert or insert into
@@ -106,19 +106,6 @@ public:
enum class CurrentOpBacktraceMode { kIncludeBacktrace, kExcludeBacktrace };
/**
- * Interface which estimates the size of a given write operation.
- */
- class WriteSizeEstimator {
- public:
- virtual ~WriteSizeEstimator() = default;
-
- virtual int estimateInsertSizeBytes(const BSONObj& insert) const = 0;
-
- virtual int estimateUpdateSizeBytes(const BatchObject& batchObject,
- UpsertType type) const = 0;
- };
-
- /**
* Factory function to create MongoProcessInterface of the right type. The implementation will
* be installed by a lib higher up in the link graph depending on the application type.
*/
@@ -140,12 +127,6 @@ public:
virtual ~MongoProcessInterface(){};
/**
- * Returns an instance of a 'WriteSizeEstimator' interface.
- */
- virtual std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
- OperationContext* opCtx, const NamespaceString& ns) const = 0;
-
- /**
* Creates a new TransactionHistoryIterator object. Only applicable in processes which support
* locally traversing the oplog.
*/
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
index 2ca328947ab..c3dcee83248 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
@@ -101,12 +101,6 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
-std::unique_ptr<CommonProcessInterface::WriteSizeEstimator>
-MongosProcessInterface::getWriteSizeEstimator(OperationContext* opCtx,
- const NamespaceString& ns) const {
- return std::make_unique<TargetPrimaryWriteSizeEstimator>();
-}
-
std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline(
Pipeline* ownedPipeline,
ShardTargetingPolicy shardTargetingPolicy,
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index 786440cb5af..3bb10f83f90 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -45,9 +45,6 @@ public:
virtual ~MongosProcessInterface() = default;
- std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
- OperationContext* opCtx, const NamespaceString& ns) const final;
-
boost::optional<Document> lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
index 02c183d43a0..7d72c064123 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
@@ -43,15 +43,6 @@ class ReplicaSetNodeProcessInterface final : public NonShardServerProcessInterfa
public:
using NonShardServerProcessInterface::NonShardServerProcessInterface;
- std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
- OperationContext* opCtx, const NamespaceString& ns) const override {
- if (_canWriteLocally(opCtx, ns)) {
- return std::make_unique<LocalWriteSizeEstimator>();
- } else {
- return std::make_unique<TargetPrimaryWriteSizeEstimator>();
- }
- }
-
static std::shared_ptr<executor::TaskExecutor> getReplicaSetNodeExecutor(
ServiceContext* service);
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index ab0d6f2a8ab..f406cdfb726 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -52,22 +52,6 @@ public:
MONGO_UNREACHABLE;
}
- class StubWriteSizeEstimator final : public WriteSizeEstimator {
- public:
- int estimateInsertSizeBytes(const BSONObj& insert) const override {
- MONGO_UNREACHABLE;
- }
-
- int estimateUpdateSizeBytes(const BatchObject& batchObject,
- UpsertType type) const override {
- MONGO_UNREACHABLE;
- }
- };
- std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
- OperationContext* opCtx, const NamespaceString& ns) const override {
- return std::make_unique<StubWriteSizeEstimator>();
- }
-
bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override {
return false;
}
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 26f9a4d3488..4c2061dd07f 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -59,6 +59,7 @@ struct WriteErrorComp {
// batches before serializing.
//
// TODO: Revisit when we revisit command limits in general
+const int kEstUpdateOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
const int kEstDeleteOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
/**
@@ -158,17 +159,51 @@ int getWriteSizeBytes(const WriteOp& writeOp) {
return item.getDocument().objsize();
} else if (batchType == BatchedCommandRequest::BatchType_Update) {
// Note: Be conservative here - it's okay if we send slightly too many batches.
- const auto& update = item.getUpdate();
- auto estSize = write_ops::getUpdateSizeEstimate(update.getQ(),
- update.getU(),
- update.getC(),
- update.getUpsertSupplied().has_value(),
- update.getCollation(),
- update.getArrayFilters(),
- update.getHint());
+ auto estSize = static_cast<int>(BSONObj::kMinBSONLength);
+ static const auto boolSize = 1;
+
+ // Add the size of the 'collation' field, if present.
+ estSize += !item.getUpdate().getCollation() ? 0
+ : (UpdateOpEntry::kCollationFieldName.size() +
+ item.getUpdate().getCollation()->objsize());
+
+ // Add the size of the 'arrayFilters' field, if present.
+ estSize += !item.getUpdate().getArrayFilters() ? 0 : ([&item]() {
+ auto size = BSONObj::kMinBSONLength + UpdateOpEntry::kArrayFiltersFieldName.size();
+ for (auto&& filter : *item.getUpdate().getArrayFilters()) {
+ size += filter.objsize();
+ }
+ return size;
+ })();
+
+ // Add the sizes of the 'multi' and 'upsert' fields.
+ estSize += UpdateOpEntry::kUpsertFieldName.size() + boolSize;
+ estSize += UpdateOpEntry::kMultiFieldName.size() + boolSize;
+
+ // Add the size of 'upsertSupplied' field if present.
+ if (auto upsertSupplied = item.getUpdate().getUpsertSupplied()) {
+ estSize += UpdateOpEntry::kUpsertSuppliedFieldName.size() + boolSize;
+ }
+
+ // Add the sizes of the 'q' and 'u' fields.
+ estSize += (UpdateOpEntry::kQFieldName.size() + item.getUpdate().getQ().objsize() +
+ UpdateOpEntry::kUFieldName.size() + item.getUpdate().getU().objsize());
+
+ // Add the size of the 'c' field if present.
+ if (auto constants = item.getUpdate().getC()) {
+ estSize += UpdateOpEntry::kCFieldName.size() + item.getUpdate().getC()->objsize();
+ }
+
+ // Add the size of 'hint' field if present.
+ if (auto hint = item.getUpdate().getHint(); !hint.isEmpty()) {
+ estSize += UpdateOpEntry::kHintFieldName.size() + hint.objsize();
+ }
+
+ // Finally, add the constant updateOp overhead size.
+ estSize += kEstUpdateOverheadBytes;
// When running a debug build, verify that estSize is at least the BSON serialization size.
- dassert(estSize >= update.toBSON().objsize());
+ dassert(estSize >= item.getUpdate().toBSON().objsize());
return estSize;
} else if (batchType == BatchedCommandRequest::BatchType_Delete) {
// Note: Be conservative here - it's okay if we send slightly too many batches.
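
The restored getWriteSizeBytes() pads the summed per-field estimate with a fixed overhead derived from the gap between the internal and user BSON size limits. A standalone C++ sketch of that padding; the two limit values (16 MB user limit, 16 KB internal headroom) are assumptions for illustration, not read from this diff.

#include <iostream>

constexpr int kBSONObjMaxUserSize = 16 * 1024 * 1024;              // assumed 16MB user limit
constexpr int kBSONObjMaxInternalSize = kBSONObjMaxUserSize + 16 * 1024;  // assumed 16KB headroom
constexpr int kEstUpdateOverheadBytes =
    (kBSONObjMaxInternalSize - kBSONObjMaxUserSize) / 100;          // fixed per-op padding

int main() {
    const int fieldEstimate = 4096;  // pretend sum of the q/u/collation/... field sizes
    std::cout << kEstUpdateOverheadBytes << "\n";                   // 163 under these assumptions
    std::cout << fieldEstimate + kEstUpdateOverheadBytes << "\n";   // padded estimate
    return 0;
}
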