author    Mihai Andrei <mihai.andrei@10gen.com>  2022-10-05 03:38:33 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-10-05 04:03:03 +0000
commit    dad5d0a196ffb05b2c5c8b315c33ca46c8b65934 (patch)
tree      6e5af82babe479135c38e21d10bf40b4180d8d72
parent    38f102485006e7071c0d8ee015ec2da160a0d014 (diff)
download  mongo-dad5d0a196ffb05b2c5c8b315c33ca46c8b65934.tar.gz
SERVER-66289 Update write size estimation logic in DocumentSourceWriter
(cherry picked from commit 707ba0a0ade42c4540b9cabaaf5a257de944cc3e) (cherry picked from commit c172ccd37516f3c2118f349817cdb1841a2486b9) (cherry picked from commit 3f2cd9485a807eaeabc60bd99653cffd2942f662)
-rw-r--r--  jstests/aggregation/sources/merge/mode_merge_fail.js |   8
-rw-r--r--  jstests/aggregation/sources/merge/mode_replace_fail.js |   8
-rw-r--r--  jstests/noPassthrough/out_merge_on_secondary_batch_write.js | 111
-rw-r--r--  src/mongo/db/ops/write_ops.h |  27
-rw-r--r--  src/mongo/db/ops/write_ops_parsers.cpp |  62
-rw-r--r--  src/mongo/db/pipeline/document_source_merge.cpp | 128
-rw-r--r--  src/mongo/db/pipeline/document_source_merge.h |  31
-rw-r--r--  src/mongo/db/pipeline/document_source_out.h |   3
-rw-r--r--  src/mongo/db/pipeline/document_source_writer.h |   8
-rw-r--r--  src/mongo/db/pipeline/process_interface/common_process_interface.cpp |   6
-rw-r--r--  src/mongo/db/pipeline/process_interface/common_process_interface.h |  46
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongo_process_interface.h |  21
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp |   6
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongos_process_interface.h |   3
-rw-r--r--  src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h |   9
-rw-r--r--  src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h |  16
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.cpp |  43
17 files changed, 434 insertions, 102 deletions
diff --git a/jstests/aggregation/sources/merge/mode_merge_fail.js b/jstests/aggregation/sources/merge/mode_merge_fail.js
index 7235c8e1c7e..fcc13f8e871 100644
--- a/jstests/aggregation/sources/merge/mode_merge_fail.js
+++ b/jstests/aggregation/sources/merge/mode_merge_fail.js
@@ -97,9 +97,13 @@ const pipeline = [mergeStage];
// and updated.
(function testMergeUnorderedBatchUpdate() {
const maxBatchSize = 16 * 1024 * 1024; // 16MB
- const docSize = 1024 * 1024; // 1MB
+
+ // Each document is just under 1MB in order to allow for some extra space for writes that need
+ // to be serialized over the wire in certain cluster configurations. Otherwise, the number of
+ // modified/unmodified documents would be off by one depending on how our cluster is configured.
+ const docSize = 1024 * 1023;
const numDocs = 20;
- const maxDocsInBatch = maxBatchSize / docSize;
+ const maxDocsInBatch = Math.floor(maxBatchSize / docSize);
assert(source.drop());
dropWithoutImplicitRecreate(target.getName());
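For reference, the batch arithmetic behind the new constants (the same reasoning applies to the identical change in mode_replace_fail.js below):

    maxBatchSize / docSize = (16 * 1024 * 1024) / (1024 * 1023) ≈ 16.016, so Math.floor(...) = 16 documents per batch
    16 documents * (1024 * 1023) bytes = 16,760,832 bytes, roughly 16 KB under the 16 MB batch limit

With the old exact 1 MB documents a full batch consumed the limit exactly, so any per-write overhead counted for writes that must be serialized over the wire could push the 16th document into a second batch and shift the modified/unmodified counts by one.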
diff --git a/jstests/aggregation/sources/merge/mode_replace_fail.js b/jstests/aggregation/sources/merge/mode_replace_fail.js
index 88582238c8f..cc3df1b1ce7 100644
--- a/jstests/aggregation/sources/merge/mode_replace_fail.js
+++ b/jstests/aggregation/sources/merge/mode_replace_fail.js
@@ -90,9 +90,13 @@ const pipeline = [mergeStage];
// and updated.
(function testMergeUnorderedBatchUpdate() {
const maxBatchSize = 16 * 1024 * 1024; // 16MB
- const docSize = 1024 * 1024; // 1MB
+
+ // Each document is just under 1MB in order to allow for some extra space for writes that need
+ // to be serialized over the wire in certain cluster configurations. Otherwise, the number of
+ // modified/unmodified documents would be off by one depending on how our cluster is configured.
+ const docSize = 1024 * 1023;
const numDocs = 20;
- const maxDocsInBatch = maxBatchSize / docSize;
+ const maxDocsInBatch = Math.floor(maxBatchSize / docSize);
assert(source.drop());
dropWithoutImplicitRecreate(target.getName());
diff --git a/jstests/noPassthrough/out_merge_on_secondary_batch_write.js b/jstests/noPassthrough/out_merge_on_secondary_batch_write.js
new file mode 100644
index 00000000000..c51ff1afcf2
--- /dev/null
+++ b/jstests/noPassthrough/out_merge_on_secondary_batch_write.js
@@ -0,0 +1,111 @@
+/**
+ * Verifies that $out/$merge aggregations issued with a secondary read preference and writing more
+ * than 16 MB of data work as expected, in particular that they produce correctly sized write batches.
+ *
+ * @tags: [uses_$out, assumes_read_preference_unchanged, requires_replication]
+ */
+(function() {
+const dbName = "db";
+const collName = "movies";
+const targetCollName = "movies2";
+
+function testFn(db) {
+ const coll = db[collName];
+ coll.drop();
+ db[targetCollName].drop();
+
+ // Insert 4 MB more data than the maximum bytes allowed in a single write batch's worth of data
+ // serialized as a single BSONObj, so the aggregation must split its writes into multiple batches.
+ const hello = db.hello();
+ const maxBatchSize = hello.maxWriteBatchSize;
+ const totalDataSize = hello.maxBsonObjectSize + (4 * 1024 * 1024);
+ const sizePerDoc = totalDataSize / maxBatchSize;
+ const bigString = "a".repeat(sizePerDoc);
+ const bulk = coll.initializeUnorderedBulkOp();
+
+ for (let i = 0; i < maxBatchSize; ++i) {
+ bulk.insert({_id: NumberInt(i), foo: bigString});
+ }
+ assert.commandWorked(bulk.execute({w: "majority"}));
+
+ function defaultSetUpFn(db) {
+ db[targetCollName].drop({writeConcern: {w: "majority"}});
+ }
+
+ function cleanUpFn(db) {
+ db[targetCollName].drop({writeConcern: {w: "majority"}});
+ }
+
+ function testWriteAggSpec(aggWriteStageSpec, setUpFn, errorCodes = []) {
+ // Run 'aggWriteStageSpec' with both primary and secondary read preference.
+ for (const readPref of ["primary", "secondary"]) {
+ jsTestLog("Testing " + tojson(aggWriteStageSpec) + " with read preference " + readPref);
+ setUpFn(db);
+
+ // If the caller provided some error codes, assert that the command failed with one
+ // of these codes.
+ const fn = () =>
+ db[collName]
+ .aggregate([aggWriteStageSpec], {$readPreference: {mode: readPref}})
+ .itcount();
+ const errMsg = "Failed to run aggregate with read preference " + readPref;
+ if (errorCodes.length > 0) {
+ assert.throwsWithCode(fn, errorCodes, [] /* params */, errMsg);
+ } else {
+ assert.doesNotThrow(fn, [] /* params */, errMsg);
+ }
+ cleanUpFn(db);
+ }
+ }
+
+ // Set up documents in the output collection so that $merge will perform updates.
+ function mergeUpdateSetupFn(db) {
+ defaultSetUpFn(db);
+ const bulk = db[targetCollName].initializeUnorderedBulkOp();
+ for (let i = 0; i < maxBatchSize; ++i) {
+ bulk.insert({_id: NumberInt(i), extraField: i * 3});
+ }
+ assert.commandWorked(bulk.execute({w: "majority"}));
+ }
+
+ testWriteAggSpec({$out: targetCollName}, defaultSetUpFn);
+ testWriteAggSpec(
+ {$merge: {into: targetCollName, whenMatched: "replace", whenNotMatched: "insert"}},
+ defaultSetUpFn);
+ testWriteAggSpec(
+ {$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "insert"}},
+ defaultSetUpFn);
+ testWriteAggSpec(
+ {$merge: {into: targetCollName, whenMatched: "keepExisting", whenNotMatched: "insert"}},
+ defaultSetUpFn);
+ testWriteAggSpec(
+ {$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "insert", on: "_id"}},
+ mergeUpdateSetupFn);
+
+ // Failure cases.
+ testWriteAggSpec(
+ {$merge: {into: targetCollName, whenMatched: "replace", whenNotMatched: "fail", on: "_id"}},
+ defaultSetUpFn,
+ [ErrorCodes.MergeStageNoMatchingDocument]);
+ testWriteAggSpec(
+ {$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "fail", on: "_id"}},
+ defaultSetUpFn,
+ [ErrorCodes.MergeStageNoMatchingDocument]);
+ testWriteAggSpec(
+ {$merge: {into: targetCollName, whenMatched: "fail", whenNotMatched: "insert", on: "_id"}},
+ mergeUpdateSetupFn,
+ [ErrorCodes.DuplicateKey]);
+}
+
+jsTestLog("Testing against a replica set");
+const rst = new ReplSetTest({nodes: 2});
+rst.startSet();
+rst.initiate();
+testFn(new Mongo(rst.getURL()).getDB(dbName));
+rst.stopSet();
+
+jsTestLog("Testing against a sharded cluster");
+const st = new ShardingTest({shards: 1, rs: {nodes: 2}});
+testFn(st.s.getDB(dbName));
+st.stop();
+}());
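A rough check of the data sizing in testFn(), assuming the usual values reported by hello on a default deployment (maxWriteBatchSize = 100000, maxBsonObjectSize = 16777216; other configurations may report different numbers):

    totalDataSize = 16777216 + 4 * 1024 * 1024 = 20,971,520 bytes
    sizePerDoc    = 20,971,520 / 100000 = 209.7152, and "a".repeat(209.7152) truncates to a 209-character string
    100000 documents of roughly 230 bytes each come to well over 16 MB of data

so the $out/$merge stages under test cannot fit their writes into a single 16 MB batch regardless of read preference, which is exactly the situation the new size estimation logic has to get right.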
diff --git a/src/mongo/db/ops/write_ops.h b/src/mongo/db/ops/write_ops.h
index 8b741524480..aae387fec52 100644
--- a/src/mongo/db/ops/write_ops.h
+++ b/src/mongo/db/ops/write_ops.h
@@ -89,5 +89,32 @@ const std::vector<BSONObj>& arrayFiltersOf(const T& opEntry) {
return opEntry.getArrayFilters().get_value_or(emptyBSONArray);
}
+/**
+ * Utility which estimates the size in bytes of an update statement with the given parameters, when
+ * serialized in the format used for the update command.
+ */
+int getUpdateSizeEstimate(const BSONObj& q,
+ const write_ops::UpdateModification& u,
+ const boost::optional<mongo::BSONObj>& c,
+ bool includeUpsertSupplied,
+ const boost::optional<mongo::BSONObj>& collation,
+ const boost::optional<std::vector<mongo::BSONObj>>& arrayFilters,
+ const mongo::BSONObj& hint);
+
+/**
+ * If the response from a write command contains any write errors, it will throw the first one. All
+ * the remaining errors will be disregarded.
+ *
+ * Using this utility for anything other than single-document writes is suspect, because any
+ * errors after the first will be swallowed.
+ */
+void checkWriteErrors(const WriteCommandBase& reply);
+
+template <class T>
+T checkWriteErrors(T op) {
+ checkWriteErrors(op.getWriteCommandBase());
+ return std::move(op);
+}
+
} // namespace write_ops
} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp
index 51b5e1b2cf7..fcb5fe5ca0f 100644
--- a/src/mongo/db/ops/write_ops_parsers.cpp
+++ b/src/mongo/db/ops/write_ops_parsers.cpp
@@ -100,6 +100,68 @@ int32_t getStmtIdForWriteAt(const WriteCommandBase& writeCommandBase, size_t wri
return kFirstStmtId + writePos;
}
+int getUpdateSizeEstimate(const BSONObj& q,
+ const write_ops::UpdateModification& u,
+ const boost::optional<mongo::BSONObj>& c,
+ const bool includeUpsertSupplied,
+ const boost::optional<mongo::BSONObj>& collation,
+ const boost::optional<std::vector<mongo::BSONObj>>& arrayFilters,
+ const mongo::BSONObj& hint) {
+ using UpdateOpEntry = write_ops::UpdateOpEntry;
+
+ // This constant accounts for the null terminator in each field name and the BSONType byte for
+ // each element.
+ static const int kPerElementOverhead = 2;
+ static const int kBoolSize = 1;
+ int estSize = static_cast<int>(BSONObj::kMinBSONLength);
+
+ // Add the sizes of the 'multi' and 'upsert' fields.
+ estSize += UpdateOpEntry::kUpsertFieldName.size() + kBoolSize + kPerElementOverhead;
+ estSize += UpdateOpEntry::kMultiFieldName.size() + kBoolSize + kPerElementOverhead;
+
+ // Add the size of 'upsertSupplied' field if present.
+ if (includeUpsertSupplied) {
+ estSize += UpdateOpEntry::kUpsertSuppliedFieldName.size() + kBoolSize + kPerElementOverhead;
+ }
+
+ // Add the sizes of the 'q' and 'u' fields.
+ estSize += (UpdateOpEntry::kQFieldName.size() + q.objsize() + kPerElementOverhead +
+ UpdateOpEntry::kUFieldName.size() + u.objsize() + kPerElementOverhead);
+
+ // Add the size of the 'c' field, if present.
+ if (c) {
+ estSize += (UpdateOpEntry::kCFieldName.size() + c->objsize() + kPerElementOverhead);
+ }
+
+ // Add the size of the 'collation' field, if present.
+ if (collation) {
+ estSize += (UpdateOpEntry::kCollationFieldName.size() + collation->objsize() +
+ kPerElementOverhead);
+ }
+
+ // Add the size of the 'arrayFilters' field, if present.
+ if (arrayFilters) {
+ estSize += ([&]() {
+ auto size = BSONObj::kMinBSONLength + UpdateOpEntry::kArrayFiltersFieldName.size() +
+ kPerElementOverhead;
+ for (auto&& filter : *arrayFilters) {
+ // For each filter, we not only need to account for the size of the filter itself,
+ // but also for the per array element overhead.
+ size += filter.objsize();
+ size += write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
+ }
+ return size;
+ })();
+ }
+
+ // Add the size of 'hint' field if present.
+ if (!hint.isEmpty()) {
+ estSize += UpdateOpEntry::kHintFieldName.size() + hint.objsize() + kPerElementOverhead;
+ }
+
+ return estSize;
+}
+
} // namespace write_ops
write_ops::Insert InsertOp::parse(const OpMsgRequest& request) {
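A minimal standalone sketch of the arithmetic above (not MongoDB source: it stands in plain strings and assumed objsize() values for the real BSONObj/UpdateModification types), showing how the mandatory 'q', 'u', 'multi' and 'upsert' contributions add up for a small update entry:

#include <iostream>
#include <string>

int main() {
    constexpr int kMinBSONLength = 5;       // empty BSONObj: 4-byte length prefix + EOO byte
    constexpr int kPerElementOverhead = 2;  // BSONType byte + field-name null terminator
    constexpr int kBoolSize = 1;

    const std::string q = "q", u = "u", multi = "multi", upsert = "upsert";
    const int qObjsize = 14;  // assumed objsize() of a small query document
    const int uObjsize = 23;  // assumed objsize() of a small update document

    int estSize = kMinBSONLength;
    estSize += static_cast<int>(upsert.size()) + kBoolSize + kPerElementOverhead;  // 'upsert' field
    estSize += static_cast<int>(multi.size()) + kBoolSize + kPerElementOverhead;   // 'multi' field
    estSize += static_cast<int>(q.size()) + qObjsize + kPerElementOverhead;        // 'q' field
    estSize += static_cast<int>(u.size()) + uObjsize + kPerElementOverhead;        // 'u' field

    // 5 + (6+1+2) + (5+1+2) + (1+14+2) + (1+23+2) = 65
    std::cout << "estimated update entry size: " << estSize << " bytes\n";
    return 0;
}

The optional 'c', 'collation', 'arrayFilters' and 'hint' terms extend the sum in the same way, keeping the estimate at or above the eventual serialized size, the property the dassert in batch_write_op.cpp below verifies in debug builds.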
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index 09da5fea8d1..ce2a78e2437 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -57,7 +57,7 @@ using MergeStrategy = MergeStrategyDescriptor::MergeStrategy;
using MergeStrategyDescriptorsMap = std::map<const MergeMode, const MergeStrategyDescriptor>;
using WhenMatched = MergeStrategyDescriptor::WhenMatched;
using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched;
-using BatchTransform = std::function<void(DocumentSourceMerge::BatchedObjects&)>;
+using BatchTransform = DocumentSourceMerge::BatchTransform;
using UpdateModification = write_ops::UpdateModification;
using UpsertType = MongoProcessInterface::UpsertType;
@@ -81,17 +81,15 @@ const auto kDefaultPipelineLet = BSON("new"
<< "$$ROOT");
/**
- * Creates a merge strategy which uses update semantics to perform a merge operation. If
- * 'BatchTransform' function is provided, it will be called to transform batched objects before
- * passing them to the 'update'.
+ * Creates a merge strategy which uses update semantics to perform a merge operation.
*/
-MergeStrategy makeUpdateStrategy(UpsertType upsert, BatchTransform transform) {
- return [upsert, transform](
- const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
- if (transform) {
- transform(batch);
- }
-
+MergeStrategy makeUpdateStrategy() {
+ return [](const auto& expCtx,
+ const auto& ns,
+ const auto& wc,
+ auto epoch,
+ auto&& batch,
+ UpsertType upsert) {
constexpr auto multi = false;
uassertStatusOK(expCtx->mongoProcessInterface->update(
expCtx, ns, std::move(batch), wc, upsert, multi, epoch));
@@ -103,16 +101,15 @@ MergeStrategy makeUpdateStrategy(UpsertType upsert, BatchTransform transform) {
* that each document in the batch has a matching document in the 'ns' collection (note that a
* matching document may not be modified as a result of an update operation, yet it still will be
* counted as matching). If at least one document doesn't have a match, this strategy returns an
- * error. If 'BatchTransform' function is provided, it will be called to transform batched objects
- * before passing them to the 'update'.
+ * error.
*/
-MergeStrategy makeStrictUpdateStrategy(UpsertType upsert, BatchTransform transform) {
- return [upsert, transform](
- const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
- if (transform) {
- transform(batch);
- }
-
+MergeStrategy makeStrictUpdateStrategy() {
+ return [](const auto& expCtx,
+ const auto& ns,
+ const auto& wc,
+ auto epoch,
+ auto&& batch,
+ UpsertType upsert) {
const int64_t batchSize = batch.size();
constexpr auto multi = false;
auto updateResult = uassertStatusOK(expCtx->mongoProcessInterface->update(
@@ -128,7 +125,12 @@ MergeStrategy makeStrictUpdateStrategy(UpsertType upsert, BatchTransform transfo
* Creates a merge strategy which uses insert semantics to perform a merge operation.
*/
MergeStrategy makeInsertStrategy() {
- return [](const auto& expCtx, const auto& ns, const auto& wc, auto epoch, auto&& batch) {
+ return [](const auto& expCtx,
+ const auto& ns,
+ const auto& wc,
+ auto epoch,
+ auto&& batch,
+ UpsertType upsertType) {
std::vector<BSONObj> objectsToInsert(batch.size());
// The batch stores replacement style updates, but for this "insert" style of $merge we'd
// like to just insert the new document without attempting any sort of replacement.
@@ -141,15 +143,13 @@ MergeStrategy makeInsertStrategy() {
}
/**
- * Creates a batched objects transformation function which wraps each element of the
- * 'batch.modifications' array into the given 'updateOp' operator.
+ * Creates a batched object transformation function which wraps 'obj' into the given 'updateOp'
+ * operator.
*/
BatchTransform makeUpdateTransform(const std::string& updateOp) {
- return [updateOp](auto& batch) {
- for (auto&& obj : batch) {
- std::get<UpdateModification>(obj) =
- BSON(updateOp << std::get<UpdateModification>(obj).getUpdateClassic());
- }
+ return [updateOp](auto& obj) {
+ std::get<UpdateModification>(obj) =
+ BSON(updateOp << std::get<UpdateModification>(obj).getUpdateClassic());
};
}
@@ -173,48 +173,67 @@ const MergeStrategyDescriptorsMap& getDescriptors() {
{kReplaceInsertMode,
{kReplaceInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(UpsertType::kGenerateNewDoc, {})}},
+ makeUpdateStrategy(),
+ {},
+ UpsertType::kGenerateNewDoc}},
// whenMatched: replace, whenNotMatched: fail
{kReplaceFailMode,
- {kReplaceFailMode, {ActionType::update}, makeStrictUpdateStrategy(UpsertType::kNone, {})}},
+ {kReplaceFailMode,
+ {ActionType::update},
+ makeStrictUpdateStrategy(),
+ {},
+ UpsertType::kNone}},
// whenMatched: replace, whenNotMatched: discard
{kReplaceDiscardMode,
- {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}},
+ {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}},
// whenMatched: merge, whenNotMatched: insert
{kMergeInsertMode,
{kMergeInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$set"))}},
+ makeUpdateStrategy(),
+ makeUpdateTransform("$set"),
+ UpsertType::kGenerateNewDoc}},
// whenMatched: merge, whenNotMatched: fail
{kMergeFailMode,
{kMergeFailMode,
{ActionType::update},
- makeStrictUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}},
+ makeStrictUpdateStrategy(),
+ makeUpdateTransform("$set"),
+ UpsertType::kNone}},
// whenMatched: merge, whenNotMatched: discard
{kMergeDiscardMode,
{kMergeDiscardMode,
{ActionType::update},
- makeUpdateStrategy(UpsertType::kNone, makeUpdateTransform("$set"))}},
+ makeUpdateStrategy(),
+ makeUpdateTransform("$set"),
+ UpsertType::kNone}},
// whenMatched: keepExisting, whenNotMatched: insert
{kKeepExistingInsertMode,
{kKeepExistingInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(UpsertType::kGenerateNewDoc, makeUpdateTransform("$setOnInsert"))}},
+ makeUpdateStrategy(),
+ makeUpdateTransform("$setOnInsert"),
+ UpsertType::kGenerateNewDoc}},
// whenMatched: [pipeline], whenNotMatched: insert
{kPipelineInsertMode,
{kPipelineInsertMode,
{ActionType::insert, ActionType::update},
- makeUpdateStrategy(UpsertType::kInsertSuppliedDoc, {})}},
+ makeUpdateStrategy(),
+ {},
+ UpsertType::kInsertSuppliedDoc}},
// whenMatched: [pipeline], whenNotMatched: fail
{kPipelineFailMode,
{kPipelineFailMode,
{ActionType::update},
- makeStrictUpdateStrategy(UpsertType::kNone, {})}},
+ makeStrictUpdateStrategy(),
+ {},
+ UpsertType::kNone}},
// whenMatched: [pipeline], whenNotMatched: discard
{kPipelineDiscardMode,
- {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(UpsertType::kNone, {})}},
+ {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}},
// whenMatched: fail, whenNotMatched: insert
- {kFailInsertMode, {kFailInsertMode, {ActionType::insert}, makeInsertStrategy()}}};
+ {kFailInsertMode,
+ {kFailInsertMode, {ActionType::insert}, makeInsertStrategy(), {}, UpsertType::kNone}}};
return mergeStrategyDescriptors;
}
@@ -506,8 +525,35 @@ std::pair<DocumentSourceMerge::BatchObject, int> DocumentSourceMerge::makeBatchO
auto mergeOnFields = extractMergeOnFieldsFromDoc(doc, _mergeOnFields);
auto mod = makeBatchUpdateModification(doc);
auto vars = resolveLetVariablesIfNeeded(doc);
- auto modSize = mod.objsize() + (vars ? vars->objsize() : 0);
- return {{std::move(mergeOnFields), std::move(mod), std::move(vars)}, modSize};
+ BatchObject batchObject{std::move(mergeOnFields), std::move(mod), std::move(vars)};
+ if (_descriptor.transform) {
+ _descriptor.transform(batchObject);
+ }
+
+ invariant(_writeSizeEstimator);
+ return {batchObject,
+ _writeSizeEstimator->estimateUpdateSizeBytes(batchObject, _descriptor.upsertType)};
+}
+
+void DocumentSourceMerge::spill(BatchedObjects&& batch) {
+ DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx);
+
+ try {
+ auto targetEpoch = _targetCollectionVersion
+ ? boost::optional<OID>(_targetCollectionVersion->epoch())
+ : boost::none;
+
+ _descriptor.strategy(pExpCtx,
+ _outputNs,
+ _writeConcern,
+ targetEpoch,
+ std::move(batch),
+ _descriptor.upsertType);
+ } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
+ uassertStatusOKWithContext(ex.toStatus(),
+ "$merge failed to update the matching document, did you "
+ "attempt to modify the _id or the shard key?");
+ }
}
void DocumentSourceMerge::waitWhileFailPointEnabled() {
diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h
index 464b4cfdaa0..e563fa0d2cf 100644
--- a/src/mongo/db/pipeline/document_source_merge.h
+++ b/src/mongo/db/pipeline/document_source_merge.h
@@ -44,24 +44,31 @@ class DocumentSourceMerge final : public DocumentSourceWriter<MongoProcessInterf
public:
static constexpr StringData kStageName = "$merge"_sd;
- // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions
- // the client should be authorized to perform in order to be able to execute a merge operation
- // using this merge strategy.
+ using BatchTransform = std::function<void(MongoProcessInterface::BatchObject&)>;
+
+ // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions the
+ // client should be authorized to perform in order to be able to execute a merge operation using
+ // this merge strategy. If a 'BatchTransform' function is provided, it will be called when
+ // constructing a batch object to transform updates.
struct MergeStrategyDescriptor {
using WhenMatched = MergeWhenMatchedModeEnum;
using WhenNotMatched = MergeWhenNotMatchedModeEnum;
using MergeMode = std::pair<WhenMatched, WhenNotMatched>;
+ using UpsertType = MongoProcessInterface::UpsertType;
// A function encapsulating a merge strategy for the $merge stage based on the pair of
// whenMatched/whenNotMatched modes.
using MergeStrategy = std::function<void(const boost::intrusive_ptr<ExpressionContext>&,
const NamespaceString&,
const WriteConcernOptions&,
boost::optional<OID>,
- BatchedObjects&&)>;
+ BatchedObjects&&,
+ UpsertType upsert)>;
MergeMode mode;
ActionSet actions;
MergeStrategy strategy;
+ BatchTransform transform;
+ UpsertType upsertType;
};
/**
@@ -214,21 +221,7 @@ private:
return bob.obj();
}
- void spill(BatchedObjects&& batch) override {
- DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx);
-
- try {
- auto targetEpoch = _targetCollectionVersion
- ? boost::optional<OID>(_targetCollectionVersion->epoch())
- : boost::none;
-
- _descriptor.strategy(pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch));
- } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
- uassertStatusOKWithContext(ex.toStatus(),
- "$merge failed to update the matching document, did you "
- "attempt to modify the _id or the shard key?");
- }
- }
+ void spill(BatchedObjects&& batch) override;
void waitWhileFailPointEnabled() override;
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index b01182ffcbd..10795a2c135 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -129,7 +129,8 @@ private:
std::pair<BSONObj, int> makeBatchObject(Document&& doc) const override {
auto obj = doc.toBson();
- return {obj, obj.objsize()};
+ invariant(_writeSizeEstimator);
+ return {obj, _writeSizeEstimator->estimateInsertSizeBytes(obj)};
}
void waitWhileFailPointEnabled() override;
diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h
index 46bd0bf29eb..e0739382c7f 100644
--- a/src/mongo/db/pipeline/document_source_writer.h
+++ b/src/mongo/db/pipeline/document_source_writer.h
@@ -94,13 +94,14 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(stageName, expCtx),
_outputNs(std::move(outputNs)),
- _writeConcern(expCtx->opCtx->getWriteConcern()) {
+ _writeConcern(expCtx->opCtx->getWriteConcern()),
+ _writeSizeEstimator(
+ expCtx->mongoProcessInterface->getWriteSizeEstimator(expCtx->opCtx, outputNs)) {
uassert(31476,
"Cluster must be fully upgraded to version 4.4 with featureCompatibilityVersion"
" 4.4 before attempting to run $out/$merge with a non-primary read preference",
pExpCtx->mongoProcessInterface->supportsReadPreferenceForWriteOp(pExpCtx));
}
-
DepsTracker::State getDependencies(DepsTracker* deps) const override {
deps->needWholeDocument = true;
return DepsTracker::State::EXHAUSTIVE_ALL;
@@ -164,6 +165,9 @@ protected:
// respect the writeConcern of the original command.
WriteConcernOptions _writeConcern;
+ // An interface that is used to estimate the size of each write operation.
+ const std::unique_ptr<MongoProcessInterface::WriteSizeEstimator> _writeSizeEstimator;
+
private:
bool _initialized{false};
bool _done{false};
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
index 5cd9327fafa..b43902c3f42 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
@@ -149,6 +149,12 @@ std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsR
return {"_id"};
}
+std::unique_ptr<CommonProcessInterface::WriteSizeEstimator>
+CommonProcessInterface::getWriteSizeEstimator(OperationContext* opCtx,
+ const NamespaceString& ns) const {
+ return std::make_unique<LocalWriteSizeEstimator>();
+}
+
void CommonProcessInterface::updateClientOperationTime(OperationContext* opCtx) const {
// In order to support causal consistency in a replica set or a sharded cluster when reading
// with secondary read preference, the secondary must propagate the primary's operation time
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h
index 75c3e01c513..7900054aa34 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h
@@ -46,6 +46,49 @@ public:
virtual ~CommonProcessInterface() = default;
/**
+ * Estimates the size of writes that will be executed on the current node. Note that this
+ * does not account for the full size of an update statement.
+ */
+ class LocalWriteSizeEstimator final : public WriteSizeEstimator {
+ public:
+ int estimateInsertSizeBytes(const BSONObj& insert) const override {
+ return insert.objsize();
+ }
+
+ int estimateUpdateSizeBytes(const BatchObject& batchObject,
+ UpsertType type) const override {
+ int size = std::get<write_ops::UpdateModification>(batchObject).objsize();
+ if (auto vars = std::get<boost::optional<BSONObj>>(batchObject)) {
+ size += vars->objsize();
+ }
+ return size;
+ }
+ };
+
+ /**
+ * Estimate the size of writes that will be sent to the replica set primary.
+ */
+ class TargetPrimaryWriteSizeEstimator final : public WriteSizeEstimator {
+ public:
+ int estimateInsertSizeBytes(const BSONObj& insert) const override {
+ return insert.objsize() + write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
+ }
+
+ int estimateUpdateSizeBytes(const BatchObject& batchObject,
+ UpsertType type) const override {
+ return write_ops::getUpdateSizeEstimate(
+ std::get<BSONObj>(batchObject),
+ std::get<write_ops::UpdateModification>(batchObject),
+ std::get<boost::optional<BSONObj>>(batchObject),
+ type != UpsertType::kNone /* includeUpsertSupplied */,
+ boost::none /* collation */,
+ boost::none /* arrayFilters */,
+ BSONObj() /* hint*/) +
+ write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
+ }
+ };
+
+ /**
* Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and
* each of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special
* type of index.
@@ -64,6 +107,9 @@ public:
virtual std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
OperationContext*, const NamespaceString&) const override;
+ std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+ OperationContext* opCtx, const NamespaceString& ns) const override;
+
virtual void updateClientOperationTime(OperationContext* opCtx) const final;
boost::optional<ChunkVersion> refreshAndGetCollectionVersion(
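To illustrate how the two estimators diverge, and why the jstest changes above leave 1 KB of headroom per document, take a replacement-style $merge write with no let variables, an assumed 30-byte '_id' query document and a 1024 * 1023 = 1,047,552-byte replacement document:

    LocalWriteSizeEstimator:          u.objsize() = 1,047,552 bytes
    TargetPrimaryWriteSizeEstimator:  5 + (1+30+2) + (1+1,047,552+2) + (6+1+2) + (5+1+2)
                                      + kWriteCommandBSONArrayPerElementOverheadBytes ≈ 1,047,610+ bytes

The difference is only a few dozen bytes per document, but a batch of sixteen 1 MB documents used to hit the 16 MB ceiling exactly, so counting the update-command framing (as a node that must forward its writes to the primary now does) is enough to change where the batch is split, which is the off-by-one the updated aggregation tests guard against.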
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index e10d959c932..9a29d0b5df9 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -74,7 +74,7 @@ class MongoProcessInterface {
public:
/**
* Storage for a batch of BSON Objects to be updated in the write namespace. For each element
- * in the batch we store a tuple of the folliwng elements:
+ * in the batch we store a tuple of the following elements:
* 1. BSONObj - specifies the query that identifies a document in the to collection to be
* updated.
* 2. write_ops::UpdateModification - either the new document we want to upsert or insert into
@@ -102,6 +102,19 @@ public:
enum class CurrentOpBacktraceMode { kIncludeBacktrace, kExcludeBacktrace };
/**
+ * Interface which estimates the size of a given write operation.
+ */
+ class WriteSizeEstimator {
+ public:
+ virtual ~WriteSizeEstimator() = default;
+
+ virtual int estimateInsertSizeBytes(const BSONObj& insert) const = 0;
+
+ virtual int estimateUpdateSizeBytes(const BatchObject& batchObject,
+ UpsertType type) const = 0;
+ };
+
+ /**
* Factory function to create MongoProcessInterface of the right type. The implementation will
* be installed by a lib higher up in the link graph depending on the application type.
*/
@@ -123,6 +136,12 @@ public:
virtual ~MongoProcessInterface(){};
/**
+ * Returns an instance of a 'WriteSizeEstimator' interface.
+ */
+ virtual std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+ OperationContext* opCtx, const NamespaceString& ns) const = 0;
+
+ /**
* Creates a new TransactionHistoryIterator object. Only applicable in processes which support
* locally traversing the oplog.
*/
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
index a9fd76aabb3..e5d497aa0a2 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
@@ -105,6 +105,12 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
+std::unique_ptr<CommonProcessInterface::WriteSizeEstimator>
+MongosProcessInterface::getWriteSizeEstimator(OperationContext* opCtx,
+ const NamespaceString& ns) const {
+ return std::make_unique<TargetPrimaryWriteSizeEstimator>();
+}
+
std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline(
Pipeline* ownedPipeline, bool allowTargetingShards) {
// On mongos we can't have local cursors.
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index 639d17bf4f3..7141915b7d3 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -45,6 +45,9 @@ public:
virtual ~MongosProcessInterface() = default;
+ std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+ OperationContext* opCtx, const NamespaceString& ns) const final;
+
boost::optional<Document> lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
index 0d954fd1035..b8e14a5a52e 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
@@ -44,6 +44,15 @@ class ReplicaSetNodeProcessInterface final : public NonShardServerProcessInterfa
public:
using NonShardServerProcessInterface::NonShardServerProcessInterface;
+ std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+ OperationContext* opCtx, const NamespaceString& ns) const override {
+ if (_canWriteLocally(opCtx, ns)) {
+ return std::make_unique<LocalWriteSizeEstimator>();
+ } else {
+ return std::make_unique<TargetPrimaryWriteSizeEstimator>();
+ }
+ }
+
static std::shared_ptr<executor::TaskExecutor> getReplicaSetNodeExecutor(
ServiceContext* service);
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index 1b0b4863ef8..fcf0458ea3c 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -53,6 +53,22 @@ public:
MONGO_UNREACHABLE;
}
+ class StubWriteSizeEstimator final : public WriteSizeEstimator {
+ public:
+ int estimateInsertSizeBytes(const BSONObj& insert) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ int estimateUpdateSizeBytes(const BatchObject& batchObject,
+ UpsertType type) const override {
+ MONGO_UNREACHABLE;
+ }
+ };
+ std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+ OperationContext* opCtx, const NamespaceString& ns) const override {
+ return std::make_unique<StubWriteSizeEstimator>();
+ }
+
bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override {
return false;
}
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 338e208a99f..4cee9a41252 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -65,7 +65,6 @@ struct WriteErrorDetailComp {
// batches before serializing.
//
// TODO: Revisit when we revisit command limits in general
-const int kEstUpdateOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
const int kEstDeleteOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
/**
@@ -171,41 +170,17 @@ int getWriteSizeBytes(const WriteOp& writeOp) {
return item.getDocument().objsize();
} else if (batchType == BatchedCommandRequest::BatchType_Update) {
// Note: Be conservative here - it's okay if we send slightly too many batches.
- auto estSize = static_cast<int>(BSONObj::kMinBSONLength);
- static const auto boolSize = 1;
-
- // Add the size of the 'collation' field, if present.
- estSize += !item.getUpdate().getCollation() ? 0
- : (UpdateOpEntry::kCollationFieldName.size() +
- item.getUpdate().getCollation()->objsize());
-
- // Add the size of the 'arrayFilters' field, if present.
- estSize += !item.getUpdate().getArrayFilters() ? 0 : ([&item]() {
- auto size = BSONObj::kMinBSONLength + UpdateOpEntry::kArrayFiltersFieldName.size();
- for (auto&& filter : *item.getUpdate().getArrayFilters()) {
- size += filter.objsize();
- }
- return size;
- })();
-
- // Add the sizes of the 'multi' and 'upsert' fields.
- estSize += UpdateOpEntry::kUpsertFieldName.size() + boolSize;
- estSize += UpdateOpEntry::kMultiFieldName.size() + boolSize;
-
- // Add the sizes of the 'q' and 'u' fields.
- estSize += (UpdateOpEntry::kQFieldName.size() + item.getUpdate().getQ().objsize() +
- UpdateOpEntry::kUFieldName.size() + item.getUpdate().getU().objsize());
-
- // Add the size of the 'c' field if present.
- if (auto constants = item.getUpdate().getC()) {
- estSize += UpdateOpEntry::kCFieldName.size() + item.getUpdate().getC()->objsize();
- }
-
- // Finally, add the constant updateOp overhead size.
- estSize += kEstUpdateOverheadBytes;
+ const auto& update = item.getUpdate();
+ auto estSize = write_ops::getUpdateSizeEstimate(update.getQ(),
+ update.getU(),
+ update.getC(),
+ update.getUpsertSupplied().has_value(),
+ update.getCollation(),
+ update.getArrayFilters(),
+ update.getHint());
// When running a debug build, verify that estSize is at least the BSON serialization size.
- dassert(estSize >= item.getUpdate().toBSON().objsize());
+ dassert(estSize >= update.toBSON().objsize());
return estSize;
} else if (batchType == BatchedCommandRequest::BatchType_Delete) {
// Note: Be conservative here - it's okay if we send slightly too many batches.