summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorMihai Andrei <mihai.andrei@mongodb.com>2023-05-11 15:10:12 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-05-11 18:05:42 +0000
commit1e233afdb0def2aed7521ae087b575c95245e257 (patch)
tree5a2be72bb51c8597a01bda589f2b13b27d578753 /src/mongo/db/pipeline
parent5d0653dce309f18986c754d81d6ab2997ef95735 (diff)
downloadmongo-1e233afdb0def2aed7521ae087b575c95245e257.tar.gz
SERVER-74806 Account for header size when computing initial size of batch write
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source_merge.cpp237
-rw-r--r--src/mongo/db/pipeline/document_source_merge.h15
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp7
-rw-r--r--src/mongo/db/pipeline/document_source_out.h11
-rw-r--r--src/mongo/db/pipeline/document_source_writer.h68
-rw-r--r--src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp53
-rw-r--r--src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h17
-rw-r--r--src/mongo/db/pipeline/process_interface/common_process_interface.cpp6
-rw-r--r--src/mongo/db/pipeline/process_interface/common_process_interface.h27
-rw-r--r--src/mongo/db/pipeline/process_interface/mongo_process_interface.h48
-rw-r--r--src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp2
-rw-r--r--src/mongo/db/pipeline/process_interface/mongos_process_interface.h6
-rw-r--r--src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp24
-rw-r--r--src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h6
-rw-r--r--src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp34
-rw-r--r--src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h7
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp32
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h19
-rw-r--r--src/mongo/db/pipeline/process_interface/standalone_process_interface.h5
-rw-r--r--src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h17
20 files changed, 384 insertions, 257 deletions
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index 722de5936f9..896a50ad0dc 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -60,6 +60,7 @@ namespace {
using MergeStrategyDescriptor = DocumentSourceMerge::MergeStrategyDescriptor;
using MergeMode = MergeStrategyDescriptor::MergeMode;
using MergeStrategy = MergeStrategyDescriptor::MergeStrategy;
+using BatchedCommandGenerator = MergeStrategyDescriptor::BatchedCommandGenerator;
using MergeStrategyDescriptorsMap = std::map<const MergeMode, const MergeStrategyDescriptor>;
using WhenMatched = MergeStrategyDescriptor::WhenMatched;
using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched;
@@ -86,6 +87,55 @@ constexpr auto kPipelineDiscardMode = MergeMode{WhenMatched::kPipeline, WhenNotM
const auto kDefaultPipelineLet = BSON("new"
<< "$$ROOT");
+BatchedCommandGenerator makeInsertCommandGenerator() {
+ return [](const auto& expCtx, const auto& ns) -> BatchedCommandRequest {
+ return DocumentSourceMerge::DocumentSourceWriter::makeInsertCommand(
+ ns, expCtx->bypassDocumentValidation);
+ };
+}
+
+BatchedCommandGenerator makeUpdateCommandGenerator() {
+ return [](const auto& expCtx, const auto& ns) -> BatchedCommandRequest {
+ write_ops::UpdateCommandRequest updateOp(ns);
+ updateOp.setWriteCommandRequestBase([&] {
+ write_ops::WriteCommandRequestBase wcb;
+ wcb.setOrdered(false);
+ wcb.setBypassDocumentValidation(expCtx->bypassDocumentValidation);
+ return wcb;
+ }());
+ auto [constants, letParams] =
+ expCtx->variablesParseState.transitionalCompatibilitySerialize(expCtx->variables);
+ updateOp.setLegacyRuntimeConstants(std::move(constants));
+ if (!letParams.isEmpty()) {
+ updateOp.setLet(std::move(letParams));
+ }
+ return BatchedCommandRequest(std::move(updateOp));
+ };
+}
+
+/**
+ * Converts 'batch' into a vector of UpdateOpEntries.
+ */
+std::vector<write_ops::UpdateOpEntry> constructUpdateEntries(
+ DocumentSourceMerge::DocumentSourceWriter::BatchedObjects&& batch,
+ UpsertType upsert,
+ bool multi) {
+ std::vector<write_ops::UpdateOpEntry> updateEntries;
+ for (auto&& obj : batch) {
+ write_ops::UpdateOpEntry entry;
+ auto&& [q, u, c] = obj;
+ entry.setQ(std::move(q));
+ entry.setU(std::move(u));
+ entry.setC(std::move(c));
+ entry.setUpsert(upsert != UpsertType::kNone);
+ entry.setUpsertSupplied({{entry.getUpsert(), upsert == UpsertType::kInsertSuppliedDoc}});
+ entry.setMulti(multi);
+
+ updateEntries.push_back(std::move(entry));
+ }
+ return updateEntries;
+}
+
/**
* Creates a merge strategy which uses update semantics to perform a merge operation.
*/
@@ -95,10 +145,13 @@ MergeStrategy makeUpdateStrategy() {
const auto& wc,
auto epoch,
auto&& batch,
+ auto&& bcr,
UpsertType upsert) {
constexpr auto multi = false;
+ auto updateCommand = bcr.extractUpdateRequest();
+ updateCommand->setUpdates(constructUpdateEntries(std::move(batch), upsert, multi));
uassertStatusOK(expCtx->mongoProcessInterface->update(
- expCtx, ns, std::move(batch), wc, upsert, multi, epoch));
+ expCtx, ns, std::move(updateCommand), wc, upsert, multi, epoch));
};
}
@@ -115,11 +168,14 @@ MergeStrategy makeStrictUpdateStrategy() {
const auto& wc,
auto epoch,
auto&& batch,
+ auto&& bcr,
UpsertType upsert) {
const int64_t batchSize = batch.size();
constexpr auto multi = false;
+ auto updateCommand = bcr.extractUpdateRequest();
+ updateCommand->setUpdates(constructUpdateEntries(std::move(batch), upsert, multi));
auto updateResult = uassertStatusOK(expCtx->mongoProcessInterface->update(
- expCtx, ns, std::move(batch), wc, upsert, multi, epoch));
+ expCtx, ns, std::move(updateCommand), wc, upsert, multi, epoch));
uassert(ErrorCodes::MergeStageNoMatchingDocument,
"{} could not find a matching document in the target collection "
"for at least one document in the source collection"_format(kStageName),
@@ -136,6 +192,7 @@ MergeStrategy makeInsertStrategy() {
const auto& wc,
auto epoch,
auto&& batch,
+ auto&& bcr,
UpsertType upsertType) {
std::vector<BSONObj> objectsToInsert(batch.size());
// The batch stores replacement style updates, but for this "insert" style of $merge we'd
@@ -143,8 +200,10 @@ MergeStrategy makeInsertStrategy() {
std::transform(batch.begin(), batch.end(), objectsToInsert.begin(), [](const auto& obj) {
return std::get<UpdateModification>(obj).getUpdateReplacement();
});
- uassertStatusOK(expCtx->mongoProcessInterface->insert(
- expCtx, ns, std::move(objectsToInsert), wc, epoch));
+ auto insertCommand = bcr.extractInsertRequest();
+ insertCommand->setDocuments(std::move(objectsToInsert));
+ uassertStatusOK(
+ expCtx->mongoProcessInterface->insert(expCtx, ns, std::move(insertCommand), wc, epoch));
};
}
@@ -174,72 +233,95 @@ const MergeStrategyDescriptorsMap& getDescriptors() {
// be initialized first. By wrapping the map into a function we can guarantee that it won't be
// initialized until the first use, which is when the program already started and all global
// variables had been initialized.
- static const auto mergeStrategyDescriptors = MergeStrategyDescriptorsMap{
- // whenMatched: replace, whenNotMatched: insert
- {kReplaceInsertMode,
- {kReplaceInsertMode,
- {ActionType::insert, ActionType::update},
- makeUpdateStrategy(),
- {},
- UpsertType::kGenerateNewDoc}},
- // whenMatched: replace, whenNotMatched: fail
- {kReplaceFailMode,
- {kReplaceFailMode,
- {ActionType::update},
- makeStrictUpdateStrategy(),
- {},
- UpsertType::kNone}},
- // whenMatched: replace, whenNotMatched: discard
- {kReplaceDiscardMode,
- {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}},
- // whenMatched: merge, whenNotMatched: insert
- {kMergeInsertMode,
- {kMergeInsertMode,
- {ActionType::insert, ActionType::update},
- makeUpdateStrategy(),
- makeUpdateTransform("$set"),
- UpsertType::kGenerateNewDoc}},
- // whenMatched: merge, whenNotMatched: fail
- {kMergeFailMode,
- {kMergeFailMode,
- {ActionType::update},
- makeStrictUpdateStrategy(),
- makeUpdateTransform("$set"),
- UpsertType::kNone}},
- // whenMatched: merge, whenNotMatched: discard
- {kMergeDiscardMode,
- {kMergeDiscardMode,
- {ActionType::update},
- makeUpdateStrategy(),
- makeUpdateTransform("$set"),
- UpsertType::kNone}},
- // whenMatched: keepExisting, whenNotMatched: insert
- {kKeepExistingInsertMode,
- {kKeepExistingInsertMode,
- {ActionType::insert, ActionType::update},
- makeUpdateStrategy(),
- makeUpdateTransform("$setOnInsert"),
- UpsertType::kGenerateNewDoc}},
- // whenMatched: [pipeline], whenNotMatched: insert
- {kPipelineInsertMode,
- {kPipelineInsertMode,
- {ActionType::insert, ActionType::update},
- makeUpdateStrategy(),
- {},
- UpsertType::kInsertSuppliedDoc}},
- // whenMatched: [pipeline], whenNotMatched: fail
- {kPipelineFailMode,
- {kPipelineFailMode,
- {ActionType::update},
- makeStrictUpdateStrategy(),
- {},
- UpsertType::kNone}},
- // whenMatched: [pipeline], whenNotMatched: discard
- {kPipelineDiscardMode,
- {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}},
- // whenMatched: fail, whenNotMatched: insert
- {kFailInsertMode,
- {kFailInsertMode, {ActionType::insert}, makeInsertStrategy(), {}, UpsertType::kNone}}};
+ static const auto mergeStrategyDescriptors =
+ MergeStrategyDescriptorsMap{// whenMatched: replace, whenNotMatched: insert
+ {kReplaceInsertMode,
+ {kReplaceInsertMode,
+ {ActionType::insert, ActionType::update},
+ makeUpdateStrategy(),
+ {},
+ UpsertType::kGenerateNewDoc,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: replace, whenNotMatched: fail
+ {kReplaceFailMode,
+ {kReplaceFailMode,
+ {ActionType::update},
+ makeStrictUpdateStrategy(),
+ {},
+ UpsertType::kNone,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: replace, whenNotMatched: discard
+ {kReplaceDiscardMode,
+ {kReplaceDiscardMode,
+ {ActionType::update},
+ makeUpdateStrategy(),
+ {},
+ UpsertType::kNone,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: merge, whenNotMatched: insert
+ {kMergeInsertMode,
+ {kMergeInsertMode,
+ {ActionType::insert, ActionType::update},
+ makeUpdateStrategy(),
+ makeUpdateTransform("$set"),
+ UpsertType::kGenerateNewDoc,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: merge, whenNotMatched: fail
+ {kMergeFailMode,
+ {kMergeFailMode,
+ {ActionType::update},
+ makeStrictUpdateStrategy(),
+ makeUpdateTransform("$set"),
+ UpsertType::kNone,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: merge, whenNotMatched: discard
+ {kMergeDiscardMode,
+ {kMergeDiscardMode,
+ {ActionType::update},
+ makeUpdateStrategy(),
+ makeUpdateTransform("$set"),
+ UpsertType::kNone,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: keepExisting, whenNotMatched: insert
+ {kKeepExistingInsertMode,
+ {kKeepExistingInsertMode,
+ {ActionType::insert, ActionType::update},
+ makeUpdateStrategy(),
+ makeUpdateTransform("$setOnInsert"),
+ UpsertType::kGenerateNewDoc,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: [pipeline], whenNotMatched: insert
+ {kPipelineInsertMode,
+ {kPipelineInsertMode,
+ {ActionType::insert, ActionType::update},
+ makeUpdateStrategy(),
+ {},
+ UpsertType::kInsertSuppliedDoc,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: [pipeline], whenNotMatched: fail
+ {kPipelineFailMode,
+ {kPipelineFailMode,
+ {ActionType::update},
+ makeStrictUpdateStrategy(),
+ {},
+ UpsertType::kNone,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: [pipeline], whenNotMatched: discard
+ {kPipelineDiscardMode,
+ {kPipelineDiscardMode,
+ {ActionType::update},
+ makeUpdateStrategy(),
+ {},
+ UpsertType::kNone,
+ makeUpdateCommandGenerator()}},
+ // whenMatched: fail, whenNotMatched: insert
+ {kFailInsertMode,
+ {kFailInsertMode,
+ {ActionType::insert},
+ makeInsertStrategy(),
+ {},
+ UpsertType::kNone,
+ makeInsertCommandGenerator()}}};
return mergeStrategyDescriptors;
}
@@ -599,14 +681,19 @@ std::pair<DocumentSourceMerge::BatchObject, int> DocumentSourceMerge::makeBatchO
_writeSizeEstimator->estimateUpdateSizeBytes(batchObject, _descriptor.upsertType)};
}
-void DocumentSourceMerge::spill(BatchedObjects&& batch) try {
+void DocumentSourceMerge::spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) try {
DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx);
auto targetEpoch = _targetCollectionPlacementVersion
? boost::optional<OID>(_targetCollectionPlacementVersion->epoch())
: boost::none;
- _descriptor.strategy(
- pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch), _descriptor.upsertType);
+ _descriptor.strategy(pExpCtx,
+ _outputNs,
+ _writeConcern,
+ targetEpoch,
+ std::move(batch),
+ std::move(bcr),
+ _descriptor.upsertType);
} catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
uassertStatusOKWithContext(ex.toStatus(),
"$merge failed to update the matching document, did you "
@@ -630,6 +717,10 @@ void DocumentSourceMerge::spill(BatchedObjects&& batch) try {
}
}
+BatchedCommandRequest DocumentSourceMerge::initializeBatchedWriteRequest() const {
+ return _descriptor.batchedCommandGenerator(pExpCtx, _outputNs);
+}
+
void DocumentSourceMerge::waitWhileFailPointEnabled() {
CurOpFailpointHelpers::waitWhileFailPointEnabled(
&hangWhileBuildingDocumentSourceMergeBatch,
diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h
index b0eed7a6df9..0349374fadb 100644
--- a/src/mongo/db/pipeline/document_source_merge.h
+++ b/src/mongo/db/pipeline/document_source_merge.h
@@ -48,8 +48,9 @@ public:
// A descriptor for a merge strategy. Holds a merge strategy function and a set of actions the
// client should be authorized to perform in order to be able to execute a merge operation using
- // this merge strategy. If a 'BatchTransform' function is provided, it will be called when
- // constructing a batch object to transform updates.
+ // this merge strategy. Additionally holds a 'BatchedCommandGenerator' that will initialize a
+ // BatchedWriteRequest for executing the batch write. If a 'BatchTransform' function is
+ // provided, it will be called when constructing a batch object to transform updates.
struct MergeStrategyDescriptor {
using WhenMatched = MergeWhenMatchedModeEnum;
using WhenNotMatched = MergeWhenNotMatchedModeEnum;
@@ -62,13 +63,19 @@ public:
const WriteConcernOptions&,
boost::optional<OID>,
BatchedObjects&&,
+ BatchedCommandRequest&&,
UpsertType upsert)>;
+ // A function object that will be invoked to generate a BatchedCommandRequest.
+ using BatchedCommandGenerator = std::function<BatchedCommandRequest(
+ const boost::intrusive_ptr<ExpressionContext>&, const NamespaceString&)>;
+
MergeMode mode;
ActionSet actions;
MergeStrategy strategy;
BatchTransform transform;
UpsertType upsertType;
+ BatchedCommandGenerator batchedCommandGenerator;
};
/**
@@ -218,7 +225,9 @@ private:
return bob.obj();
}
- void spill(BatchedObjects&& batch) override;
+ void spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) override;
+
+ BatchedCommandRequest initializeBatchedWriteRequest() const override;
void waitWhileFailPointEnabled() override;
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index 8763cb0874c..e45933d4f73 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -284,6 +284,13 @@ void DocumentSourceOut::finalize() {
_timeseriesStateConsistent = true;
}
+BatchedCommandRequest DocumentSourceOut::initializeBatchedWriteRequest() const {
+ // Note that our insert targets '_tempNs' (or the associated timeseries view) since we will
+ // never write to 'outputNs' directly.
+ const auto& targetNss = _timeseries ? _tempNs.getTimeseriesViewNamespace() : _tempNs;
+ return DocumentSourceWriter::makeInsertCommand(targetNss, pExpCtx->bypassDocumentValidation);
+}
+
boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create(
NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 60d6a865c1d..becd7998bd0 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -127,20 +127,23 @@ private:
void finalize() override;
- void spill(BatchedObjects&& batch) override {
+ void spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) override {
DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx);
+ auto insertCommand = bcr.extractInsertRequest();
+ insertCommand->setDocuments(std::move(batch));
auto targetEpoch = boost::none;
+
if (_timeseries) {
uassertStatusOK(pExpCtx->mongoProcessInterface->insertTimeseries(
pExpCtx,
_tempNs.getTimeseriesViewNamespace(),
- std::move(batch),
+ std::move(insertCommand),
_writeConcern,
targetEpoch));
} else {
uassertStatusOK(pExpCtx->mongoProcessInterface->insert(
- pExpCtx, _tempNs, std::move(batch), _writeConcern, targetEpoch));
+ pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch));
}
}
@@ -150,6 +153,8 @@ private:
return {obj, _writeSizeEstimator->estimateInsertSizeBytes(obj)};
}
+ BatchedCommandRequest initializeBatchedWriteRequest() const override;
+
void waitWhileFailPointEnabled() override;
/**
diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h
index 124625d6f94..b2c5363d429 100644
--- a/src/mongo/db/pipeline/document_source_writer.h
+++ b/src/mongo/db/pipeline/document_source_writer.h
@@ -39,6 +39,7 @@
#include "mongo/db/read_concern.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/rpc/metadata/impersonated_user_metadata.h"
+#include "mongo/s/write_ops/batched_command_request.h"
namespace mongo {
using namespace fmt::literals;
@@ -84,11 +85,14 @@ public:
/**
* This is a base abstract class for all stages performing a write operation into an output
* collection. The writes are organized in batches in which elements are objects of the templated
- * type 'B'. A subclass must override two methods to be able to write into the output collection:
+ * type 'B'. A subclass must override the following methods to be able to write into the output
+ * collection:
*
- * 1. 'makeBatchObject()' - to create an object of type 'B' from the given 'Document', which is,
+ * - 'makeBatchObject()' - creates an object of type 'B' from the given 'Document', which is,
* essentially, a result of the input source's 'getNext()' .
- * 2. 'spill()' - to write the batch into the output collection.
+ * - 'spill()' - writes the batch into the output collection.
+ * - 'initializeBatchedWriteRequest()' - initializes the request object for writing a batch to
+ * the output collection.
*
* Two other virtual methods exist which a subclass may override: 'initialize()' and 'finalize()',
* which are called before the first element is read from the input source, and after the last one
@@ -100,6 +104,18 @@ public:
using BatchObject = B;
using BatchedObjects = std::vector<BatchObject>;
+ static BatchedCommandRequest makeInsertCommand(const NamespaceString& outputNs,
+ bool bypassDocumentValidation) {
+ write_ops::InsertCommandRequest insertOp(outputNs);
+ insertOp.setWriteCommandRequestBase([&] {
+ write_ops::WriteCommandRequestBase wcb;
+ wcb.setOrdered(false);
+ wcb.setBypassDocumentValidation(bypassDocumentValidation);
+ return wcb;
+ }());
+ return BatchedCommandRequest(std::move(insertOp));
+ }
+
DocumentSourceWriter(const char* stageName,
NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
@@ -146,9 +162,31 @@ protected:
virtual void finalize() {}
/**
- * Writes the documents in 'batch' to the output namespace.
+ * Writes the documents in 'batch' to the output namespace via 'bcr'.
+ */
+ virtual void spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) = 0;
+
+ /**
+ * Estimates the size of the header of a batch write (that is, the size of the write command
+ * minus the size of write statements themselves).
+ */
+ int estimateWriteHeaderSize(const BatchedCommandRequest& bcr) const {
+ using BatchType = BatchedCommandRequest::BatchType;
+ switch (bcr.getBatchType()) {
+ case BatchType::BatchType_Insert:
+ return _writeSizeEstimator->estimateInsertHeaderSize(bcr.getInsertRequest());
+ case BatchType::BatchType_Update:
+ return _writeSizeEstimator->estimateUpdateHeaderSize(bcr.getUpdateRequest());
+ case BatchType::BatchType_Delete:
+ break;
+ }
+ MONGO_UNREACHABLE;
+ }
+
+ /**
+ * Constructs and configures a BatchedCommandRequest for performing a batch write.
*/
- virtual void spill(BatchedObjects&& batch) = 0;
+ virtual BatchedCommandRequest initializeBatchedWriteRequest() const = 0;
/**
* Creates a batch object from the given document and returns it to the caller along with the
@@ -210,15 +248,20 @@ DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() {
// and assume the rest can fit in the 16KB already built into BSONObjMaxUserSize.
const auto estimatedMetadataSizeBytes =
rpc::estimateImpersonatedUserMetadataSize(pExpCtx->opCtx);
+
+ BatchedCommandRequest batchWrite = initializeBatchedWriteRequest();
+ const auto writeHeaderSize = estimateWriteHeaderSize(batchWrite);
+ const auto initialRequestSize = estimatedMetadataSizeBytes + writeHeaderSize;
+
uassert(7637800,
"Unable to proceed with write while metadata size ({}KB) exceeds {}KB"_format(
- estimatedMetadataSizeBytes / 1024, BSONObjMaxUserSize / 1024),
- estimatedMetadataSizeBytes <= BSONObjMaxUserSize);
+ initialRequestSize / 1024, BSONObjMaxUserSize / 1024),
+ initialRequestSize <= BSONObjMaxUserSize);
- const auto maxBatchSizeBytes = BSONObjMaxUserSize - estimatedMetadataSizeBytes;
- BatchedObjects batch;
- std::size_t bufferedBytes = 0;
+ const auto maxBatchSizeBytes = BSONObjMaxUserSize - initialRequestSize;
+ BatchedObjects batch;
+ size_t bufferedBytes = 0;
auto nextInput = pSource->getNext();
for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
waitWhileFailPointEnabled();
@@ -230,14 +273,15 @@ DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() {
if (!batch.empty() &&
(bufferedBytes > maxBatchSizeBytes ||
batch.size() >= write_ops::kMaxWriteBatchSize)) {
- spill(std::move(batch));
+ spill(std::move(batchWrite), std::move(batch));
batch.clear();
+ batchWrite = initializeBatchedWriteRequest();
bufferedBytes = objSize;
}
batch.push_back(obj);
}
if (!batch.empty()) {
- spill(std::move(batch));
+ spill(std::move(batchWrite), std::move(batch));
batch.clear();
}
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
index 9c2dd64172f..c3db7716d48 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
@@ -788,59 +788,6 @@ CommonMongodProcessInterface::ensureFieldsUniqueOrResolveDocumentKey(
return {*fieldPaths, targetCollectionPlacementVersion};
}
-write_ops::InsertCommandRequest CommonMongodProcessInterface::buildInsertOp(
- const NamespaceString& nss, std::vector<BSONObj>&& objs, bool bypassDocValidation) {
- write_ops::InsertCommandRequest insertOp(nss);
- insertOp.setDocuments(std::move(objs));
- insertOp.setWriteCommandRequestBase([&] {
- write_ops::WriteCommandRequestBase wcb;
- wcb.setOrdered(false);
- wcb.setBypassDocumentValidation(bypassDocValidation);
- return wcb;
- }());
- return insertOp;
-}
-
-write_ops::UpdateCommandRequest CommonMongodProcessInterface::buildUpdateOp(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss,
- BatchedObjects&& batch,
- UpsertType upsert,
- bool multi) {
- write_ops::UpdateCommandRequest updateOp(nss);
- updateOp.setUpdates([&] {
- std::vector<write_ops::UpdateOpEntry> updateEntries;
- for (auto&& obj : batch) {
- updateEntries.push_back([&] {
- write_ops::UpdateOpEntry entry;
- auto&& [q, u, c] = obj;
- entry.setQ(std::move(q));
- entry.setU(std::move(u));
- entry.setC(std::move(c));
- entry.setUpsert(upsert != UpsertType::kNone);
- entry.setUpsertSupplied(
- {{entry.getUpsert(), upsert == UpsertType::kInsertSuppliedDoc}});
- entry.setMulti(multi);
- return entry;
- }());
- }
- return updateEntries;
- }());
- updateOp.setWriteCommandRequestBase([&] {
- write_ops::WriteCommandRequestBase wcb;
- wcb.setOrdered(false);
- wcb.setBypassDocumentValidation(expCtx->bypassDocumentValidation);
- return wcb;
- }());
- auto [constants, letParams] =
- expCtx->variablesParseState.transitionalCompatibilitySerialize(expCtx->variables);
- updateOp.setLegacyRuntimeConstants(std::move(constants));
- if (!letParams.isEmpty()) {
- updateOp.setLet(std::move(letParams));
- }
- return updateOp;
-}
-
BSONObj CommonMongodProcessInterface::_convertRenameToInternalRename(
OperationContext* opCtx,
const NamespaceString& sourceNs,
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
index 09b3ed8fa33..07e2d0370ce 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
@@ -145,23 +145,6 @@ protected:
const Document& documentKey,
MakePipelineOptions opts);
- /**
- * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'.
- */
- write_ops::InsertCommandRequest buildInsertOp(const NamespaceString& nss,
- std::vector<BSONObj>&& objs,
- bool bypassDocValidation);
-
- /**
- * Builds an ordered update op on namespace 'nss' with update entries contained in 'batch'.
- */
- write_ops::UpdateCommandRequest buildUpdateOp(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss,
- BatchedObjects&& batch,
- UpsertType upsert,
- bool multi);
-
BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
Client* client,
CurrentOpTruncateMode truncateOps,
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
index c3a978ca766..5cf57474365 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
@@ -168,12 +168,6 @@ std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsR
return {"_id"};
}
-std::unique_ptr<CommonProcessInterface::WriteSizeEstimator>
-CommonProcessInterface::getWriteSizeEstimator(OperationContext* opCtx,
- const NamespaceString& ns) const {
- return std::make_unique<LocalWriteSizeEstimator>();
-}
-
void CommonProcessInterface::updateClientOperationTime(OperationContext* opCtx) const {
// In order to support causal consistency in a replica set or a sharded cluster when reading
// with secondary read preference, the secondary must propagate the primary's operation time
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h
index b3b5f26468f..4a4f1d1e990 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h
@@ -32,6 +32,7 @@
#include <vector>
#include "mongo/bson/bsonobj.h"
+#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/process_interface/mongo_process_interface.h"
namespace mongo {
@@ -47,10 +48,22 @@ public:
/**
* Estimates the size of writes that will be executed on the current node. Note that this
- * does not account for the full size of an update statement.
+ * does not account for the full size of an update statement because in the case of local
+ * writes, we will not have to serialize to BSON and are therefore not subject to the 16MB
+ * BSONObj size limit.
*/
class LocalWriteSizeEstimator final : public WriteSizeEstimator {
public:
+ int estimateInsertHeaderSize(
+ const write_ops::InsertCommandRequest& insertReq) const override {
+ return 0;
+ }
+
+ int estimateUpdateHeaderSize(
+ const write_ops::UpdateCommandRequest& insertReq) const override {
+ return 0;
+ }
+
int estimateInsertSizeBytes(const BSONObj& insert) const override {
return insert.objsize();
}
@@ -70,6 +83,16 @@ public:
*/
class TargetPrimaryWriteSizeEstimator final : public WriteSizeEstimator {
public:
+ int estimateInsertHeaderSize(
+ const write_ops::InsertCommandRequest& insertReq) const override {
+ return write_ops::getInsertHeaderSizeEstimate(insertReq);
+ }
+
+ int estimateUpdateHeaderSize(
+ const write_ops::UpdateCommandRequest& updateReq) const override {
+ return write_ops::getUpdateHeaderSizeEstimate(updateReq);
+ }
+
int estimateInsertSizeBytes(const BSONObj& insert) const override {
return insert.objsize() + write_ops::kWriteCommandBSONArrayPerElementOverheadBytes;
}
@@ -109,8 +132,6 @@ public:
virtual std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
OperationContext*, const NamespaceString&) const override;
- std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
- OperationContext* opCtx, const NamespaceString& ns) const override;
virtual void updateClientOperationTime(OperationContext* opCtx) const final;
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index 1aa7d6be8de..05ee7dd7ad7 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -112,8 +112,19 @@ public:
public:
virtual ~WriteSizeEstimator() = default;
+ /**
+ * Set of functions which estimate the entire size of a write command except for the array
+ * of write statements themselves.
+ */
+ virtual int estimateInsertHeaderSize(
+ const write_ops::InsertCommandRequest& insertReq) const = 0;
+ virtual int estimateUpdateHeaderSize(
+ const write_ops::UpdateCommandRequest& updateReq) const = 0;
+
+ /**
+ * Set of functions which estimate the size of a single write statement.
+ */
virtual int estimateInsertSizeBytes(const BSONObj& insert) const = 0;
-
virtual int estimateUpdateSizeBytes(const BatchObject& batchObject,
UpsertType type) const = 0;
};
@@ -168,34 +179,35 @@ public:
virtual void updateClientOperationTime(OperationContext* opCtx) const = 0;
/**
- * Inserts 'objs' into 'ns' and returns an error Status if the insert fails. If 'targetEpoch' is
- * set, throws ErrorCodes::StaleEpoch if the targeted collection does not have the same epoch or
- * the epoch changes during the course of the insert.
+ * Executes 'insertCommand' against 'ns' and returns an error Status if the insert fails. If
+ * 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the targeted collection does not have
+ * the same epoch or the epoch changes during the course of the insert.
*/
virtual Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) = 0;
virtual Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) = 0;
/**
- * Updates the documents matching 'queries' with the objects 'updates'. Returns an error Status
- * if any of the updates fail, otherwise returns an 'UpdateResult' objects with the details of
- * the update operation. If 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the targeted
- * collection does not have the same epoch, or if the epoch changes during the update.
- */
- virtual StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- BatchedObjects&& batch,
- const WriteConcernOptions& wc,
- UpsertType upsert,
- bool multi,
- boost::optional<OID> targetEpoch) = 0;
+ * Executes the updates described by 'updateCommand'. Returns an error Status if any of the
+ * updates fail, otherwise returns an 'UpdateResult' objects with the details of the update
+ * operation. If 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the targeted collection
+ * does not have the same epoch, or if the epoch changes during the update.
+ */
+ virtual StatusWith<UpdateResult> update(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
+ const WriteConcernOptions& wc,
+ UpsertType upsert,
+ bool multi,
+ boost::optional<OID> targetEpoch) = 0;
/**
* Returns index usage statistics for each index on collection 'ns' along with additional
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
index 9e4a6f7cce7..33e900c06a9 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
@@ -98,7 +98,7 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
-std::unique_ptr<CommonProcessInterface::WriteSizeEstimator>
+std::unique_ptr<MongoProcessInterface::WriteSizeEstimator>
MongosProcessInterface::getWriteSizeEstimator(OperationContext* opCtx,
const NamespaceString& ns) const {
return std::make_unique<TargetPrimaryWriteSizeEstimator>();
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index a7dea455057..0ea14f6dcc2 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -72,7 +72,7 @@ public:
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID>) final {
MONGO_UNREACHABLE;
@@ -80,7 +80,7 @@ public:
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final {
MONGO_UNREACHABLE;
@@ -88,7 +88,7 @@ public:
StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
index e5bceac9686..f153109e0e6 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
@@ -108,13 +108,13 @@ boost::optional<Document> NonShardServerProcessInterface::lookupSingleDocument(
return lookedUpDocument;
}
-Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc,
- boost::optional<OID> targetEpoch) {
- auto writeResults = write_ops_exec::performInserts(
- expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+Status NonShardServerProcessInterface::insert(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) {
+ auto writeResults = write_ops_exec::performInserts(expCtx->opCtx, *insertCommand);
// Need to check each result in the batch since the writes are unordered.
for (const auto& result : writeResults.results) {
@@ -128,12 +128,11 @@ Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<Express
Status NonShardServerProcessInterface::insertTimeseries(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) {
try {
- auto insertReply = write_ops_exec::performTimeseriesWrites(
- expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+ auto insertReply = write_ops_exec::performTimeseriesWrites(expCtx->opCtx, *insertCommand);
checkWriteErrors(insertReply.getWriteCommandReplyBase());
} catch (DBException& ex) {
@@ -146,13 +145,12 @@ Status NonShardServerProcessInterface::insertTimeseries(
StatusWith<MongoProcessInterface::UpdateResult> NonShardServerProcessInterface::update(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
boost::optional<OID> targetEpoch) {
- auto writeResults = write_ops_exec::performUpdates(
- expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
+ auto writeResults = write_ops_exec::performUpdates(expCtx->opCtx, *updateCommand);
// Need to check each result in the batch since the writes are unordered.
UpdateResult updateResult;
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
index 9bae8030aac..639cc22d044 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
@@ -91,19 +91,19 @@ public:
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) override;
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) override;
StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
index ad504860169..3195b504faf 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
@@ -69,26 +69,27 @@ void ReplicaSetNodeProcessInterface::setReplicaSetNodeExecutor(
replicaSetNodeExecutor(service) = std::move(executor);
}
-Status ReplicaSetNodeProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc,
- boost::optional<OID> targetEpoch) {
+Status ReplicaSetNodeProcessInterface::insert(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) {
auto&& opCtx = expCtx->opCtx;
if (_canWriteLocally(opCtx, ns)) {
- return NonShardServerProcessInterface::insert(expCtx, ns, std::move(objs), wc, targetEpoch);
+ return NonShardServerProcessInterface::insert(
+ expCtx, ns, std::move(insertCommand), wc, targetEpoch);
}
- BatchedCommandRequest insertCommand(
- buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+ BatchedCommandRequest batchInsertCommand(std::move(insertCommand));
- return _executeCommandOnPrimary(opCtx, ns, std::move(insertCommand.toBSON())).getStatus();
+ return _executeCommandOnPrimary(opCtx, ns, batchInsertCommand.toBSON()).getStatus();
}
StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::update(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
@@ -96,11 +97,11 @@ StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::
auto&& opCtx = expCtx->opCtx;
if (_canWriteLocally(opCtx, ns)) {
return NonShardServerProcessInterface::update(
- expCtx, ns, std::move(batch), wc, upsert, multi, targetEpoch);
+ expCtx, ns, std::move(updateCommand), wc, upsert, multi, targetEpoch);
}
+ BatchedCommandRequest batchUpdateCommand(std::move(updateCommand));
- BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
- auto result = _executeCommandOnPrimary(opCtx, ns, std::move(updateCommand.toBSON()));
+ auto result = _executeCommandOnPrimary(opCtx, ns, batchUpdateCommand.toBSON());
if (!result.isOK()) {
return result.getStatus();
}
@@ -142,14 +143,15 @@ void ReplicaSetNodeProcessInterface::createTimeseriesView(OperationContext* opCt
Status ReplicaSetNodeProcessInterface::insertTimeseries(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) {
if (_canWriteLocally(expCtx->opCtx, ns)) {
return NonShardServerProcessInterface::insertTimeseries(
- expCtx, ns, std::move(objs), wc, targetEpoch);
+ expCtx, ns, std::move(insertCommand), wc, targetEpoch);
} else {
- return ReplicaSetNodeProcessInterface::insert(expCtx, ns, std::move(objs), wc, targetEpoch);
+ return ReplicaSetNodeProcessInterface::insert(
+ expCtx, ns, std::move(insertCommand), wc, targetEpoch);
}
}
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
index cf5bb5a90ef..94f45559384 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
@@ -68,12 +68,13 @@ public:
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final;
+
StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
@@ -100,7 +101,7 @@ public:
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch);
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index bfea6b001a6..dd9367b09e2 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -123,20 +123,20 @@ boost::optional<Document> ShardServerProcessInterface::lookupSingleDocument(
return doLookupSingleDocument(expCtx, nss, collectionUUID, documentKey, std::move(opts));
}
-Status ShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc,
- boost::optional<OID> targetEpoch) {
+Status ShardServerProcessInterface::insert(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- BatchedCommandRequest insertCommand(
- buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+ BatchedCommandRequest batchInsertCommand(std::move(insertCommand));
- insertCommand.setWriteConcern(wc.toBSON());
+ batchInsertCommand.setWriteConcern(wc.toBSON());
- cluster::write(expCtx->opCtx, insertCommand, &stats, &response, targetEpoch);
+ cluster::write(expCtx->opCtx, batchInsertCommand, &stats, &response, targetEpoch);
return response.toStatus();
}
@@ -144,7 +144,7 @@ Status ShardServerProcessInterface::insert(const boost::intrusive_ptr<Expression
StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::update(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
@@ -152,11 +152,10 @@ StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::upd
BatchedCommandResponse response;
BatchWriteExecStats stats;
- BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
-
- updateCommand.setWriteConcern(wc.toBSON());
+ BatchedCommandRequest batchUpdateCommand(std::move(updateCommand));
+ batchUpdateCommand.setWriteConcern(wc.toBSON());
- cluster::write(expCtx->opCtx, updateCommand, &stats, &response, targetEpoch);
+ cluster::write(expCtx->opCtx, batchUpdateCommand, &stats, &response, targetEpoch);
if (auto status = response.toStatus(); status != Status::OK()) {
return status;
@@ -402,10 +401,11 @@ void ShardServerProcessInterface::createTimeseriesView(OperationContext* opCtx,
Status ShardServerProcessInterface::insertTimeseries(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) {
- return ShardServerProcessInterface::insert(expCtx, ns, std::move(objs), wc, targetEpoch);
+ return ShardServerProcessInterface::insert(
+ expCtx, ns, std::move(insertCommand), wc, targetEpoch);
}
std::unique_ptr<Pipeline, PipelineDeleter>
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
index c81af91b6e0..aef9845f135 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
@@ -53,6 +53,11 @@ public:
const NamespaceString& nss,
ChunkVersion targetCollectionPlacementVersion) const final;
+ std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
+ OperationContext* opCtx, const NamespaceString& ns) const final {
+ return std::make_unique<TargetPrimaryWriteSizeEstimator>();
+ }
+
std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
OperationContext*, const NamespaceString&) const final {
// We don't expect anyone to use this method on the shard itself (yet). This is currently
@@ -71,23 +76,15 @@ public:
const Document& documentKey,
boost::optional<BSONObj> readConcern) final;
- /**
- * Inserts the documents 'objs' into the namespace 'ns' using the ClusterWriter for locking,
- * routing, stale config handling, etc.
- */
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final;
- /**
- * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking,
- * routing, stale config handling, etc.
- */
StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,
@@ -149,7 +146,7 @@ public:
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final;
};
diff --git a/src/mongo/db/pipeline/process_interface/standalone_process_interface.h b/src/mongo/db/pipeline/process_interface/standalone_process_interface.h
index aceff8e6928..dc562b9089e 100644
--- a/src/mongo/db/pipeline/process_interface/standalone_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/standalone_process_interface.h
@@ -41,6 +41,11 @@ public:
StandaloneProcessInterface(std::shared_ptr<executor::TaskExecutor> exec)
: NonShardServerProcessInterface(std::move(exec)) {}
+ std::unique_ptr<MongoProcessInterface::WriteSizeEstimator> getWriteSizeEstimator(
+ OperationContext* opCtx, const NamespaceString& ns) const final {
+ return std::make_unique<LocalWriteSizeEstimator>();
+ }
+
virtual ~StandaloneProcessInterface() = default;
};
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index 60783a7709b..93ed117a53d 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -55,6 +55,16 @@ public:
class StubWriteSizeEstimator final : public WriteSizeEstimator {
public:
+ int estimateInsertHeaderSize(
+ const write_ops::InsertCommandRequest& insertReq) const override {
+ return 0;
+ }
+
+ int estimateUpdateHeaderSize(
+ const write_ops::UpdateCommandRequest& insertReq) const override {
+ return 0;
+ }
+
int estimateInsertSizeBytes(const BSONObj& insert) const override {
MONGO_UNREACHABLE;
}
@@ -64,6 +74,7 @@ public:
MONGO_UNREACHABLE;
}
};
+
std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator(
OperationContext* opCtx, const NamespaceString& ns) const override {
return std::make_unique<StubWriteSizeEstimator>();
@@ -77,7 +88,7 @@ public:
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID>) override {
MONGO_UNREACHABLE;
@@ -85,7 +96,7 @@ public:
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
+ std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) override {
MONGO_UNREACHABLE;
@@ -93,7 +104,7 @@ public:
StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- BatchedObjects&& batch,
+ std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand,
const WriteConcernOptions& wc,
UpsertType upsert,
bool multi,