diff options
author | Mihai Andrei <mihai.andrei@mongodb.com> | 2023-05-11 15:10:12 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-11 18:05:42 +0000 |
commit | 1e233afdb0def2aed7521ae087b575c95245e257 (patch) | |
tree | 5a2be72bb51c8597a01bda589f2b13b27d578753 /src/mongo/db/pipeline | |
parent | 5d0653dce309f18986c754d81d6ab2997ef95735 (diff) | |
download | mongo-1e233afdb0def2aed7521ae087b575c95245e257.tar.gz |
SERVER-74806 Account for header size when computing initial size of batch write
Diffstat (limited to 'src/mongo/db/pipeline')
20 files changed, 384 insertions, 257 deletions
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp index 722de5936f9..896a50ad0dc 100644 --- a/src/mongo/db/pipeline/document_source_merge.cpp +++ b/src/mongo/db/pipeline/document_source_merge.cpp @@ -60,6 +60,7 @@ namespace { using MergeStrategyDescriptor = DocumentSourceMerge::MergeStrategyDescriptor; using MergeMode = MergeStrategyDescriptor::MergeMode; using MergeStrategy = MergeStrategyDescriptor::MergeStrategy; +using BatchedCommandGenerator = MergeStrategyDescriptor::BatchedCommandGenerator; using MergeStrategyDescriptorsMap = std::map<const MergeMode, const MergeStrategyDescriptor>; using WhenMatched = MergeStrategyDescriptor::WhenMatched; using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched; @@ -86,6 +87,55 @@ constexpr auto kPipelineDiscardMode = MergeMode{WhenMatched::kPipeline, WhenNotM const auto kDefaultPipelineLet = BSON("new" << "$$ROOT"); +BatchedCommandGenerator makeInsertCommandGenerator() { + return [](const auto& expCtx, const auto& ns) -> BatchedCommandRequest { + return DocumentSourceMerge::DocumentSourceWriter::makeInsertCommand( + ns, expCtx->bypassDocumentValidation); + }; +} + +BatchedCommandGenerator makeUpdateCommandGenerator() { + return [](const auto& expCtx, const auto& ns) -> BatchedCommandRequest { + write_ops::UpdateCommandRequest updateOp(ns); + updateOp.setWriteCommandRequestBase([&] { + write_ops::WriteCommandRequestBase wcb; + wcb.setOrdered(false); + wcb.setBypassDocumentValidation(expCtx->bypassDocumentValidation); + return wcb; + }()); + auto [constants, letParams] = + expCtx->variablesParseState.transitionalCompatibilitySerialize(expCtx->variables); + updateOp.setLegacyRuntimeConstants(std::move(constants)); + if (!letParams.isEmpty()) { + updateOp.setLet(std::move(letParams)); + } + return BatchedCommandRequest(std::move(updateOp)); + }; +} + +/** + * Converts 'batch' into a vector of UpdateOpEntries. + */ +std::vector<write_ops::UpdateOpEntry> constructUpdateEntries( + DocumentSourceMerge::DocumentSourceWriter::BatchedObjects&& batch, + UpsertType upsert, + bool multi) { + std::vector<write_ops::UpdateOpEntry> updateEntries; + for (auto&& obj : batch) { + write_ops::UpdateOpEntry entry; + auto&& [q, u, c] = obj; + entry.setQ(std::move(q)); + entry.setU(std::move(u)); + entry.setC(std::move(c)); + entry.setUpsert(upsert != UpsertType::kNone); + entry.setUpsertSupplied({{entry.getUpsert(), upsert == UpsertType::kInsertSuppliedDoc}}); + entry.setMulti(multi); + + updateEntries.push_back(std::move(entry)); + } + return updateEntries; +} + /** * Creates a merge strategy which uses update semantics to perform a merge operation. */ @@ -95,10 +145,13 @@ MergeStrategy makeUpdateStrategy() { const auto& wc, auto epoch, auto&& batch, + auto&& bcr, UpsertType upsert) { constexpr auto multi = false; + auto updateCommand = bcr.extractUpdateRequest(); + updateCommand->setUpdates(constructUpdateEntries(std::move(batch), upsert, multi)); uassertStatusOK(expCtx->mongoProcessInterface->update( - expCtx, ns, std::move(batch), wc, upsert, multi, epoch)); + expCtx, ns, std::move(updateCommand), wc, upsert, multi, epoch)); }; } @@ -115,11 +168,14 @@ MergeStrategy makeStrictUpdateStrategy() { const auto& wc, auto epoch, auto&& batch, + auto&& bcr, UpsertType upsert) { const int64_t batchSize = batch.size(); constexpr auto multi = false; + auto updateCommand = bcr.extractUpdateRequest(); + updateCommand->setUpdates(constructUpdateEntries(std::move(batch), upsert, multi)); auto updateResult = uassertStatusOK(expCtx->mongoProcessInterface->update( - expCtx, ns, std::move(batch), wc, upsert, multi, epoch)); + expCtx, ns, std::move(updateCommand), wc, upsert, multi, epoch)); uassert(ErrorCodes::MergeStageNoMatchingDocument, "{} could not find a matching document in the target collection " "for at least one document in the source collection"_format(kStageName), @@ -136,6 +192,7 @@ MergeStrategy makeInsertStrategy() { const auto& wc, auto epoch, auto&& batch, + auto&& bcr, UpsertType upsertType) { std::vector<BSONObj> objectsToInsert(batch.size()); // The batch stores replacement style updates, but for this "insert" style of $merge we'd @@ -143,8 +200,10 @@ MergeStrategy makeInsertStrategy() { std::transform(batch.begin(), batch.end(), objectsToInsert.begin(), [](const auto& obj) { return std::get<UpdateModification>(obj).getUpdateReplacement(); }); - uassertStatusOK(expCtx->mongoProcessInterface->insert( - expCtx, ns, std::move(objectsToInsert), wc, epoch)); + auto insertCommand = bcr.extractInsertRequest(); + insertCommand->setDocuments(std::move(objectsToInsert)); + uassertStatusOK( + expCtx->mongoProcessInterface->insert(expCtx, ns, std::move(insertCommand), wc, epoch)); }; } @@ -174,72 +233,95 @@ const MergeStrategyDescriptorsMap& getDescriptors() { // be initialized first. By wrapping the map into a function we can guarantee that it won't be // initialized until the first use, which is when the program already started and all global // variables had been initialized. - static const auto mergeStrategyDescriptors = MergeStrategyDescriptorsMap{ - // whenMatched: replace, whenNotMatched: insert - {kReplaceInsertMode, - {kReplaceInsertMode, - {ActionType::insert, ActionType::update}, - makeUpdateStrategy(), - {}, - UpsertType::kGenerateNewDoc}}, - // whenMatched: replace, whenNotMatched: fail - {kReplaceFailMode, - {kReplaceFailMode, - {ActionType::update}, - makeStrictUpdateStrategy(), - {}, - UpsertType::kNone}}, - // whenMatched: replace, whenNotMatched: discard - {kReplaceDiscardMode, - {kReplaceDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}}, - // whenMatched: merge, whenNotMatched: insert - {kMergeInsertMode, - {kMergeInsertMode, - {ActionType::insert, ActionType::update}, - makeUpdateStrategy(), - makeUpdateTransform("$set"), - UpsertType::kGenerateNewDoc}}, - // whenMatched: merge, whenNotMatched: fail - {kMergeFailMode, - {kMergeFailMode, - {ActionType::update}, - makeStrictUpdateStrategy(), - makeUpdateTransform("$set"), - UpsertType::kNone}}, - // whenMatched: merge, whenNotMatched: discard - {kMergeDiscardMode, - {kMergeDiscardMode, - {ActionType::update}, - makeUpdateStrategy(), - makeUpdateTransform("$set"), - UpsertType::kNone}}, - // whenMatched: keepExisting, whenNotMatched: insert - {kKeepExistingInsertMode, - {kKeepExistingInsertMode, - {ActionType::insert, ActionType::update}, - makeUpdateStrategy(), - makeUpdateTransform("$setOnInsert"), - UpsertType::kGenerateNewDoc}}, - // whenMatched: [pipeline], whenNotMatched: insert - {kPipelineInsertMode, - {kPipelineInsertMode, - {ActionType::insert, ActionType::update}, - makeUpdateStrategy(), - {}, - UpsertType::kInsertSuppliedDoc}}, - // whenMatched: [pipeline], whenNotMatched: fail - {kPipelineFailMode, - {kPipelineFailMode, - {ActionType::update}, - makeStrictUpdateStrategy(), - {}, - UpsertType::kNone}}, - // whenMatched: [pipeline], whenNotMatched: discard - {kPipelineDiscardMode, - {kPipelineDiscardMode, {ActionType::update}, makeUpdateStrategy(), {}, UpsertType::kNone}}, - // whenMatched: fail, whenNotMatched: insert - {kFailInsertMode, - {kFailInsertMode, {ActionType::insert}, makeInsertStrategy(), {}, UpsertType::kNone}}}; + static const auto mergeStrategyDescriptors = + MergeStrategyDescriptorsMap{// whenMatched: replace, whenNotMatched: insert + {kReplaceInsertMode, + {kReplaceInsertMode, + {ActionType::insert, ActionType::update}, + makeUpdateStrategy(), + {}, + UpsertType::kGenerateNewDoc, + makeUpdateCommandGenerator()}}, + // whenMatched: replace, whenNotMatched: fail + {kReplaceFailMode, + {kReplaceFailMode, + {ActionType::update}, + makeStrictUpdateStrategy(), + {}, + UpsertType::kNone, + makeUpdateCommandGenerator()}}, + // whenMatched: replace, whenNotMatched: discard + {kReplaceDiscardMode, + {kReplaceDiscardMode, + {ActionType::update}, + makeUpdateStrategy(), + {}, + UpsertType::kNone, + makeUpdateCommandGenerator()}}, + // whenMatched: merge, whenNotMatched: insert + {kMergeInsertMode, + {kMergeInsertMode, + {ActionType::insert, ActionType::update}, + makeUpdateStrategy(), + makeUpdateTransform("$set"), + UpsertType::kGenerateNewDoc, + makeUpdateCommandGenerator()}}, + // whenMatched: merge, whenNotMatched: fail + {kMergeFailMode, + {kMergeFailMode, + {ActionType::update}, + makeStrictUpdateStrategy(), + makeUpdateTransform("$set"), + UpsertType::kNone, + makeUpdateCommandGenerator()}}, + // whenMatched: merge, whenNotMatched: discard + {kMergeDiscardMode, + {kMergeDiscardMode, + {ActionType::update}, + makeUpdateStrategy(), + makeUpdateTransform("$set"), + UpsertType::kNone, + makeUpdateCommandGenerator()}}, + // whenMatched: keepExisting, whenNotMatched: insert + {kKeepExistingInsertMode, + {kKeepExistingInsertMode, + {ActionType::insert, ActionType::update}, + makeUpdateStrategy(), + makeUpdateTransform("$setOnInsert"), + UpsertType::kGenerateNewDoc, + makeUpdateCommandGenerator()}}, + // whenMatched: [pipeline], whenNotMatched: insert + {kPipelineInsertMode, + {kPipelineInsertMode, + {ActionType::insert, ActionType::update}, + makeUpdateStrategy(), + {}, + UpsertType::kInsertSuppliedDoc, + makeUpdateCommandGenerator()}}, + // whenMatched: [pipeline], whenNotMatched: fail + {kPipelineFailMode, + {kPipelineFailMode, + {ActionType::update}, + makeStrictUpdateStrategy(), + {}, + UpsertType::kNone, + makeUpdateCommandGenerator()}}, + // whenMatched: [pipeline], whenNotMatched: discard + {kPipelineDiscardMode, + {kPipelineDiscardMode, + {ActionType::update}, + makeUpdateStrategy(), + {}, + UpsertType::kNone, + makeUpdateCommandGenerator()}}, + // whenMatched: fail, whenNotMatched: insert + {kFailInsertMode, + {kFailInsertMode, + {ActionType::insert}, + makeInsertStrategy(), + {}, + UpsertType::kNone, + makeInsertCommandGenerator()}}}; return mergeStrategyDescriptors; } @@ -599,14 +681,19 @@ std::pair<DocumentSourceMerge::BatchObject, int> DocumentSourceMerge::makeBatchO _writeSizeEstimator->estimateUpdateSizeBytes(batchObject, _descriptor.upsertType)}; } -void DocumentSourceMerge::spill(BatchedObjects&& batch) try { +void DocumentSourceMerge::spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) try { DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); auto targetEpoch = _targetCollectionPlacementVersion ? boost::optional<OID>(_targetCollectionPlacementVersion->epoch()) : boost::none; - _descriptor.strategy( - pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch), _descriptor.upsertType); + _descriptor.strategy(pExpCtx, + _outputNs, + _writeConcern, + targetEpoch, + std::move(batch), + std::move(bcr), + _descriptor.upsertType); } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) { uassertStatusOKWithContext(ex.toStatus(), "$merge failed to update the matching document, did you " @@ -630,6 +717,10 @@ void DocumentSourceMerge::spill(BatchedObjects&& batch) try { } } +BatchedCommandRequest DocumentSourceMerge::initializeBatchedWriteRequest() const { + return _descriptor.batchedCommandGenerator(pExpCtx, _outputNs); +} + void DocumentSourceMerge::waitWhileFailPointEnabled() { CurOpFailpointHelpers::waitWhileFailPointEnabled( &hangWhileBuildingDocumentSourceMergeBatch, diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h index b0eed7a6df9..0349374fadb 100644 --- a/src/mongo/db/pipeline/document_source_merge.h +++ b/src/mongo/db/pipeline/document_source_merge.h @@ -48,8 +48,9 @@ public: // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions the // client should be authorized to perform in order to be able to execute a merge operation using - // this merge strategy. If a 'BatchTransform' function is provided, it will be called when - // constructing a batch object to transform updates. + // this merge strategy. Additionally holds a 'BatchedCommandGenerator' that will initialize a + // BatchedWriteRequest for executing the batch write. If a 'BatchTransform' function is + // provided, it will be called when constructing a batch object to transform updates. struct MergeStrategyDescriptor { using WhenMatched = MergeWhenMatchedModeEnum; using WhenNotMatched = MergeWhenNotMatchedModeEnum; @@ -62,13 +63,19 @@ public: const WriteConcernOptions&, boost::optional<OID>, BatchedObjects&&, + BatchedCommandRequest&&, UpsertType upsert)>; + // A function object that will be invoked to generate a BatchedCommandRequest. + using BatchedCommandGenerator = std::function<BatchedCommandRequest( + const boost::intrusive_ptr<ExpressionContext>&, const NamespaceString&)>; + MergeMode mode; ActionSet actions; MergeStrategy strategy; BatchTransform transform; UpsertType upsertType; + BatchedCommandGenerator batchedCommandGenerator; }; /** @@ -218,7 +225,9 @@ private: return bob.obj(); } - void spill(BatchedObjects&& batch) override; + void spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) override; + + BatchedCommandRequest initializeBatchedWriteRequest() const override; void waitWhileFailPointEnabled() override; diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index 8763cb0874c..e45933d4f73 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -284,6 +284,13 @@ void DocumentSourceOut::finalize() { _timeseriesStateConsistent = true; } +BatchedCommandRequest DocumentSourceOut::initializeBatchedWriteRequest() const { + // Note that our insert targets '_tempNs' (or the associated timeseries view) since we will + // never write to 'outputNs' directly. + const auto& targetNss = _timeseries ? _tempNs.getTimeseriesViewNamespace() : _tempNs; + return DocumentSourceWriter::makeInsertCommand(targetNss, pExpCtx->bypassDocumentValidation); +} + boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create( NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx, diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 60d6a865c1d..becd7998bd0 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -127,20 +127,23 @@ private: void finalize() override; - void spill(BatchedObjects&& batch) override { + void spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) override { DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); + auto insertCommand = bcr.extractInsertRequest(); + insertCommand->setDocuments(std::move(batch)); auto targetEpoch = boost::none; + if (_timeseries) { uassertStatusOK(pExpCtx->mongoProcessInterface->insertTimeseries( pExpCtx, _tempNs.getTimeseriesViewNamespace(), - std::move(batch), + std::move(insertCommand), _writeConcern, targetEpoch)); } else { uassertStatusOK(pExpCtx->mongoProcessInterface->insert( - pExpCtx, _tempNs, std::move(batch), _writeConcern, targetEpoch)); + pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch)); } } @@ -150,6 +153,8 @@ private: return {obj, _writeSizeEstimator->estimateInsertSizeBytes(obj)}; } + BatchedCommandRequest initializeBatchedWriteRequest() const override; + void waitWhileFailPointEnabled() override; /** diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h index 124625d6f94..b2c5363d429 100644 --- a/src/mongo/db/pipeline/document_source_writer.h +++ b/src/mongo/db/pipeline/document_source_writer.h @@ -39,6 +39,7 @@ #include "mongo/db/read_concern.h" #include "mongo/db/storage/recovery_unit.h" #include "mongo/rpc/metadata/impersonated_user_metadata.h" +#include "mongo/s/write_ops/batched_command_request.h" namespace mongo { using namespace fmt::literals; @@ -84,11 +85,14 @@ public: /** * This is a base abstract class for all stages performing a write operation into an output * collection. The writes are organized in batches in which elements are objects of the templated - * type 'B'. A subclass must override two methods to be able to write into the output collection: + * type 'B'. A subclass must override the following methods to be able to write into the output + * collection: * - * 1. 'makeBatchObject()' - to create an object of type 'B' from the given 'Document', which is, + * - 'makeBatchObject()' - creates an object of type 'B' from the given 'Document', which is, * essentially, a result of the input source's 'getNext()' . - * 2. 'spill()' - to write the batch into the output collection. + * - 'spill()' - writes the batch into the output collection. + * - 'initializeBatchedWriteRequest()' - initializes the request object for writing a batch to + * the output collection. * * Two other virtual methods exist which a subclass may override: 'initialize()' and 'finalize()', * which are called before the first element is read from the input source, and after the last one @@ -100,6 +104,18 @@ public: using BatchObject = B; using BatchedObjects = std::vector<BatchObject>; + static BatchedCommandRequest makeInsertCommand(const NamespaceString& outputNs, + bool bypassDocumentValidation) { + write_ops::InsertCommandRequest insertOp(outputNs); + insertOp.setWriteCommandRequestBase([&] { + write_ops::WriteCommandRequestBase wcb; + wcb.setOrdered(false); + wcb.setBypassDocumentValidation(bypassDocumentValidation); + return wcb; + }()); + return BatchedCommandRequest(std::move(insertOp)); + } + DocumentSourceWriter(const char* stageName, NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx) @@ -146,9 +162,31 @@ protected: virtual void finalize() {} /** - * Writes the documents in 'batch' to the output namespace. + * Writes the documents in 'batch' to the output namespace via 'bcr'. + */ + virtual void spill(BatchedCommandRequest&& bcr, BatchedObjects&& batch) = 0; + + /** + * Estimates the size of the header of a batch write (that is, the size of the write command + * minus the size of write statements themselves). + */ + int estimateWriteHeaderSize(const BatchedCommandRequest& bcr) const { + using BatchType = BatchedCommandRequest::BatchType; + switch (bcr.getBatchType()) { + case BatchType::BatchType_Insert: + return _writeSizeEstimator->estimateInsertHeaderSize(bcr.getInsertRequest()); + case BatchType::BatchType_Update: + return _writeSizeEstimator->estimateUpdateHeaderSize(bcr.getUpdateRequest()); + case BatchType::BatchType_Delete: + break; + } + MONGO_UNREACHABLE; + } + + /** + * Constructs and configures a BatchedCommandRequest for performing a batch write. */ - virtual void spill(BatchedObjects&& batch) = 0; + virtual BatchedCommandRequest initializeBatchedWriteRequest() const = 0; /** * Creates a batch object from the given document and returns it to the caller along with the @@ -210,15 +248,20 @@ DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() { // and assume the rest can fit in the 16KB already built into BSONObjMaxUserSize. const auto estimatedMetadataSizeBytes = rpc::estimateImpersonatedUserMetadataSize(pExpCtx->opCtx); + + BatchedCommandRequest batchWrite = initializeBatchedWriteRequest(); + const auto writeHeaderSize = estimateWriteHeaderSize(batchWrite); + const auto initialRequestSize = estimatedMetadataSizeBytes + writeHeaderSize; + uassert(7637800, "Unable to proceed with write while metadata size ({}KB) exceeds {}KB"_format( - estimatedMetadataSizeBytes / 1024, BSONObjMaxUserSize / 1024), - estimatedMetadataSizeBytes <= BSONObjMaxUserSize); + initialRequestSize / 1024, BSONObjMaxUserSize / 1024), + initialRequestSize <= BSONObjMaxUserSize); - const auto maxBatchSizeBytes = BSONObjMaxUserSize - estimatedMetadataSizeBytes; - BatchedObjects batch; - std::size_t bufferedBytes = 0; + const auto maxBatchSizeBytes = BSONObjMaxUserSize - initialRequestSize; + BatchedObjects batch; + size_t bufferedBytes = 0; auto nextInput = pSource->getNext(); for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { waitWhileFailPointEnabled(); @@ -230,14 +273,15 @@ DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() { if (!batch.empty() && (bufferedBytes > maxBatchSizeBytes || batch.size() >= write_ops::kMaxWriteBatchSize)) { - spill(std::move(batch)); + spill(std::move(batchWrite), std::move(batch)); batch.clear(); + batchWrite = initializeBatchedWriteRequest(); bufferedBytes = objSize; } batch.push_back(obj); } if (!batch.empty()) { - spill(std::move(batch)); + spill(std::move(batchWrite), std::move(batch)); batch.clear(); } diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index 9c2dd64172f..c3db7716d48 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -788,59 +788,6 @@ CommonMongodProcessInterface::ensureFieldsUniqueOrResolveDocumentKey( return {*fieldPaths, targetCollectionPlacementVersion}; } -write_ops::InsertCommandRequest CommonMongodProcessInterface::buildInsertOp( - const NamespaceString& nss, std::vector<BSONObj>&& objs, bool bypassDocValidation) { - write_ops::InsertCommandRequest insertOp(nss); - insertOp.setDocuments(std::move(objs)); - insertOp.setWriteCommandRequestBase([&] { - write_ops::WriteCommandRequestBase wcb; - wcb.setOrdered(false); - wcb.setBypassDocumentValidation(bypassDocValidation); - return wcb; - }()); - return insertOp; -} - -write_ops::UpdateCommandRequest CommonMongodProcessInterface::buildUpdateOp( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - BatchedObjects&& batch, - UpsertType upsert, - bool multi) { - write_ops::UpdateCommandRequest updateOp(nss); - updateOp.setUpdates([&] { - std::vector<write_ops::UpdateOpEntry> updateEntries; - for (auto&& obj : batch) { - updateEntries.push_back([&] { - write_ops::UpdateOpEntry entry; - auto&& [q, u, c] = obj; - entry.setQ(std::move(q)); - entry.setU(std::move(u)); - entry.setC(std::move(c)); - entry.setUpsert(upsert != UpsertType::kNone); - entry.setUpsertSupplied( - {{entry.getUpsert(), upsert == UpsertType::kInsertSuppliedDoc}}); - entry.setMulti(multi); - return entry; - }()); - } - return updateEntries; - }()); - updateOp.setWriteCommandRequestBase([&] { - write_ops::WriteCommandRequestBase wcb; - wcb.setOrdered(false); - wcb.setBypassDocumentValidation(expCtx->bypassDocumentValidation); - return wcb; - }()); - auto [constants, letParams] = - expCtx->variablesParseState.transitionalCompatibilitySerialize(expCtx->variables); - updateOp.setLegacyRuntimeConstants(std::move(constants)); - if (!letParams.isEmpty()) { - updateOp.setLet(std::move(letParams)); - } - return updateOp; -} - BSONObj CommonMongodProcessInterface::_convertRenameToInternalRename( OperationContext* opCtx, const NamespaceString& sourceNs, diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h index 09b3ed8fa33..07e2d0370ce 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h @@ -145,23 +145,6 @@ protected: const Document& documentKey, MakePipelineOptions opts); - /** - * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'. - */ - write_ops::InsertCommandRequest buildInsertOp(const NamespaceString& nss, - std::vector<BSONObj>&& objs, - bool bypassDocValidation); - - /** - * Builds an ordered update op on namespace 'nss' with update entries contained in 'batch'. - */ - write_ops::UpdateCommandRequest buildUpdateOp( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - BatchedObjects&& batch, - UpsertType upsert, - bool multi); - BSONObj _reportCurrentOpForClient(OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps, diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp index c3a978ca766..5cf57474365 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp @@ -168,12 +168,6 @@ std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsR return {"_id"}; } -std::unique_ptr<CommonProcessInterface::WriteSizeEstimator> -CommonProcessInterface::getWriteSizeEstimator(OperationContext* opCtx, - const NamespaceString& ns) const { - return std::make_unique<LocalWriteSizeEstimator>(); -} - void CommonProcessInterface::updateClientOperationTime(OperationContext* opCtx) const { // In order to support causal consistency in a replica set or a sharded cluster when reading // with secondary read preference, the secondary must propagate the primary's operation time diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h index b3b5f26468f..4a4f1d1e990 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h @@ -32,6 +32,7 @@ #include <vector> #include "mongo/bson/bsonobj.h" +#include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/process_interface/mongo_process_interface.h" namespace mongo { @@ -47,10 +48,22 @@ public: /** * Estimates the size of writes that will be executed on the current node. Note that this - * does not account for the full size of an update statement. + * does not account for the full size of an update statement because in the case of local + * writes, we will not have to serialize to BSON and are therefore not subject to the 16MB + * BSONObj size limit. */ class LocalWriteSizeEstimator final : public WriteSizeEstimator { public: + int estimateInsertHeaderSize( + const write_ops::InsertCommandRequest& insertReq) const override { + return 0; + } + + int estimateUpdateHeaderSize( + const write_ops::UpdateCommandRequest& insertReq) const override { + return 0; + } + int estimateInsertSizeBytes(const BSONObj& insert) const override { return insert.objsize(); } @@ -70,6 +83,16 @@ public: */ class TargetPrimaryWriteSizeEstimator final : public WriteSizeEstimator { public: + int estimateInsertHeaderSize( + const write_ops::InsertCommandRequest& insertReq) const override { + return write_ops::getInsertHeaderSizeEstimate(insertReq); + } + + int estimateUpdateHeaderSize( + const write_ops::UpdateCommandRequest& updateReq) const override { + return write_ops::getUpdateHeaderSizeEstimate(updateReq); + } + int estimateInsertSizeBytes(const BSONObj& insert) const override { return insert.objsize() + write_ops::kWriteCommandBSONArrayPerElementOverheadBytes; } @@ -109,8 +132,6 @@ public: virtual std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter( OperationContext*, const NamespaceString&) const override; - std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( - OperationContext* opCtx, const NamespaceString& ns) const override; virtual void updateClientOperationTime(OperationContext* opCtx) const final; diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index 1aa7d6be8de..05ee7dd7ad7 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -112,8 +112,19 @@ public: public: virtual ~WriteSizeEstimator() = default; + /** + * Set of functions which estimate the entire size of a write command except for the array + * of write statements themselves. + */ + virtual int estimateInsertHeaderSize( + const write_ops::InsertCommandRequest& insertReq) const = 0; + virtual int estimateUpdateHeaderSize( + const write_ops::UpdateCommandRequest& updateReq) const = 0; + + /** + * Set of functions which estimate the size of a single write statement. + */ virtual int estimateInsertSizeBytes(const BSONObj& insert) const = 0; - virtual int estimateUpdateSizeBytes(const BatchObject& batchObject, UpsertType type) const = 0; }; @@ -168,34 +179,35 @@ public: virtual void updateClientOperationTime(OperationContext* opCtx) const = 0; /** - * Inserts 'objs' into 'ns' and returns an error Status if the insert fails. If 'targetEpoch' is - * set, throws ErrorCodes::StaleEpoch if the targeted collection does not have the same epoch or - * the epoch changes during the course of the insert. + * Executes 'insertCommand' against 'ns' and returns an error Status if the insert fails. If + * 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the targeted collection does not have + * the same epoch or the epoch changes during the course of the insert. */ virtual Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) = 0; virtual Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) = 0; /** - * Updates the documents matching 'queries' with the objects 'updates'. Returns an error Status - * if any of the updates fail, otherwise returns an 'UpdateResult' objects with the details of - * the update operation. If 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the targeted - * collection does not have the same epoch, or if the epoch changes during the update. - */ - virtual StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - BatchedObjects&& batch, - const WriteConcernOptions& wc, - UpsertType upsert, - bool multi, - boost::optional<OID> targetEpoch) = 0; + * Executes the updates described by 'updateCommand'. Returns an error Status if any of the + * updates fail, otherwise returns an 'UpdateResult' objects with the details of the update + * operation. If 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the targeted collection + * does not have the same epoch, or if the epoch changes during the update. + */ + virtual StatusWith<UpdateResult> update( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, + const WriteConcernOptions& wc, + UpsertType upsert, + bool multi, + boost::optional<OID> targetEpoch) = 0; /** * Returns index usage statistics for each index on collection 'ns' along with additional diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index 9e4a6f7cce7..33e900c06a9 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -98,7 +98,7 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace -std::unique_ptr<CommonProcessInterface::WriteSizeEstimator> +std::unique_ptr<MongoProcessInterface::WriteSizeEstimator> MongosProcessInterface::getWriteSizeEstimator(OperationContext* opCtx, const NamespaceString& ns) const { return std::make_unique<TargetPrimaryWriteSizeEstimator>(); diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h index a7dea455057..0ea14f6dcc2 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h @@ -72,7 +72,7 @@ public: Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID>) final { MONGO_UNREACHABLE; @@ -80,7 +80,7 @@ public: Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) final { MONGO_UNREACHABLE; @@ -88,7 +88,7 @@ public: StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp index e5bceac9686..f153109e0e6 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp @@ -108,13 +108,13 @@ boost::optional<Document> NonShardServerProcessInterface::lookupSingleDocument( return lookedUpDocument; } -Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) { - auto writeResults = write_ops_exec::performInserts( - expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); +Status NonShardServerProcessInterface::insert( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, + const WriteConcernOptions& wc, + boost::optional<OID> targetEpoch) { + auto writeResults = write_ops_exec::performInserts(expCtx->opCtx, *insertCommand); // Need to check each result in the batch since the writes are unordered. for (const auto& result : writeResults.results) { @@ -128,12 +128,11 @@ Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<Express Status NonShardServerProcessInterface::insertTimeseries( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) { try { - auto insertReply = write_ops_exec::performTimeseriesWrites( - expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + auto insertReply = write_ops_exec::performTimeseriesWrites(expCtx->opCtx, *insertCommand); checkWriteErrors(insertReply.getWriteCommandReplyBase()); } catch (DBException& ex) { @@ -146,13 +145,12 @@ Status NonShardServerProcessInterface::insertTimeseries( StatusWith<MongoProcessInterface::UpdateResult> NonShardServerProcessInterface::update( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, boost::optional<OID> targetEpoch) { - auto writeResults = write_ops_exec::performUpdates( - expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); + auto writeResults = write_ops_exec::performUpdates(expCtx->opCtx, *updateCommand); // Need to check each result in the batch since the writes are unordered. UpdateResult updateResult; diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h index 9bae8030aac..639cc22d044 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h @@ -91,19 +91,19 @@ public: Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) override; Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) override; StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp index ad504860169..3195b504faf 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp @@ -69,26 +69,27 @@ void ReplicaSetNodeProcessInterface::setReplicaSetNodeExecutor( replicaSetNodeExecutor(service) = std::move(executor); } -Status ReplicaSetNodeProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) { +Status ReplicaSetNodeProcessInterface::insert( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, + const WriteConcernOptions& wc, + boost::optional<OID> targetEpoch) { auto&& opCtx = expCtx->opCtx; if (_canWriteLocally(opCtx, ns)) { - return NonShardServerProcessInterface::insert(expCtx, ns, std::move(objs), wc, targetEpoch); + return NonShardServerProcessInterface::insert( + expCtx, ns, std::move(insertCommand), wc, targetEpoch); } - BatchedCommandRequest insertCommand( - buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + BatchedCommandRequest batchInsertCommand(std::move(insertCommand)); - return _executeCommandOnPrimary(opCtx, ns, std::move(insertCommand.toBSON())).getStatus(); + return _executeCommandOnPrimary(opCtx, ns, batchInsertCommand.toBSON()).getStatus(); } StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::update( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, @@ -96,11 +97,11 @@ StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface:: auto&& opCtx = expCtx->opCtx; if (_canWriteLocally(opCtx, ns)) { return NonShardServerProcessInterface::update( - expCtx, ns, std::move(batch), wc, upsert, multi, targetEpoch); + expCtx, ns, std::move(updateCommand), wc, upsert, multi, targetEpoch); } + BatchedCommandRequest batchUpdateCommand(std::move(updateCommand)); - BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); - auto result = _executeCommandOnPrimary(opCtx, ns, std::move(updateCommand.toBSON())); + auto result = _executeCommandOnPrimary(opCtx, ns, batchUpdateCommand.toBSON()); if (!result.isOK()) { return result.getStatus(); } @@ -142,14 +143,15 @@ void ReplicaSetNodeProcessInterface::createTimeseriesView(OperationContext* opCt Status ReplicaSetNodeProcessInterface::insertTimeseries( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) { if (_canWriteLocally(expCtx->opCtx, ns)) { return NonShardServerProcessInterface::insertTimeseries( - expCtx, ns, std::move(objs), wc, targetEpoch); + expCtx, ns, std::move(insertCommand), wc, targetEpoch); } else { - return ReplicaSetNodeProcessInterface::insert(expCtx, ns, std::move(objs), wc, targetEpoch); + return ReplicaSetNodeProcessInterface::insert( + expCtx, ns, std::move(insertCommand), wc, targetEpoch); } } diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h index cf5bb5a90ef..94f45559384 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h @@ -68,12 +68,13 @@ public: Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) final; + StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, @@ -100,7 +101,7 @@ public: Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch); diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index bfea6b001a6..dd9367b09e2 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -123,20 +123,20 @@ boost::optional<Document> ShardServerProcessInterface::lookupSingleDocument( return doLookupSingleDocument(expCtx, nss, collectionUUID, documentKey, std::move(opts)); } -Status ShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) { +Status ShardServerProcessInterface::insert( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, + const WriteConcernOptions& wc, + boost::optional<OID> targetEpoch) { BatchedCommandResponse response; BatchWriteExecStats stats; - BatchedCommandRequest insertCommand( - buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + BatchedCommandRequest batchInsertCommand(std::move(insertCommand)); - insertCommand.setWriteConcern(wc.toBSON()); + batchInsertCommand.setWriteConcern(wc.toBSON()); - cluster::write(expCtx->opCtx, insertCommand, &stats, &response, targetEpoch); + cluster::write(expCtx->opCtx, batchInsertCommand, &stats, &response, targetEpoch); return response.toStatus(); } @@ -144,7 +144,7 @@ Status ShardServerProcessInterface::insert(const boost::intrusive_ptr<Expression StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::update( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, @@ -152,11 +152,10 @@ StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::upd BatchedCommandResponse response; BatchWriteExecStats stats; - BatchedCommandRequest updateCommand(buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); - - updateCommand.setWriteConcern(wc.toBSON()); + BatchedCommandRequest batchUpdateCommand(std::move(updateCommand)); + batchUpdateCommand.setWriteConcern(wc.toBSON()); - cluster::write(expCtx->opCtx, updateCommand, &stats, &response, targetEpoch); + cluster::write(expCtx->opCtx, batchUpdateCommand, &stats, &response, targetEpoch); if (auto status = response.toStatus(); status != Status::OK()) { return status; @@ -402,10 +401,11 @@ void ShardServerProcessInterface::createTimeseriesView(OperationContext* opCtx, Status ShardServerProcessInterface::insertTimeseries( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) { - return ShardServerProcessInterface::insert(expCtx, ns, std::move(objs), wc, targetEpoch); + return ShardServerProcessInterface::insert( + expCtx, ns, std::move(insertCommand), wc, targetEpoch); } std::unique_ptr<Pipeline, PipelineDeleter> diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h index c81af91b6e0..aef9845f135 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h @@ -53,6 +53,11 @@ public: const NamespaceString& nss, ChunkVersion targetCollectionPlacementVersion) const final; + std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( + OperationContext* opCtx, const NamespaceString& ns) const final { + return std::make_unique<TargetPrimaryWriteSizeEstimator>(); + } + std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter( OperationContext*, const NamespaceString&) const final { // We don't expect anyone to use this method on the shard itself (yet). This is currently @@ -71,23 +76,15 @@ public: const Document& documentKey, boost::optional<BSONObj> readConcern) final; - /** - * Inserts the documents 'objs' into the namespace 'ns' using the ClusterWriter for locking, - * routing, stale config handling, etc. - */ Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) final; - /** - * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking, - * routing, stale config handling, etc. - */ StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, @@ -149,7 +146,7 @@ public: Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) final; }; diff --git a/src/mongo/db/pipeline/process_interface/standalone_process_interface.h b/src/mongo/db/pipeline/process_interface/standalone_process_interface.h index aceff8e6928..dc562b9089e 100644 --- a/src/mongo/db/pipeline/process_interface/standalone_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/standalone_process_interface.h @@ -41,6 +41,11 @@ public: StandaloneProcessInterface(std::shared_ptr<executor::TaskExecutor> exec) : NonShardServerProcessInterface(std::move(exec)) {} + std::unique_ptr<MongoProcessInterface::WriteSizeEstimator> getWriteSizeEstimator( + OperationContext* opCtx, const NamespaceString& ns) const final { + return std::make_unique<LocalWriteSizeEstimator>(); + } + virtual ~StandaloneProcessInterface() = default; }; diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index 60783a7709b..93ed117a53d 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -55,6 +55,16 @@ public: class StubWriteSizeEstimator final : public WriteSizeEstimator { public: + int estimateInsertHeaderSize( + const write_ops::InsertCommandRequest& insertReq) const override { + return 0; + } + + int estimateUpdateHeaderSize( + const write_ops::UpdateCommandRequest& insertReq) const override { + return 0; + } + int estimateInsertSizeBytes(const BSONObj& insert) const override { MONGO_UNREACHABLE; } @@ -64,6 +74,7 @@ public: MONGO_UNREACHABLE; } }; + std::unique_ptr<WriteSizeEstimator> getWriteSizeEstimator( OperationContext* opCtx, const NamespaceString& ns) const override { return std::make_unique<StubWriteSizeEstimator>(); @@ -77,7 +88,7 @@ public: Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID>) override { MONGO_UNREACHABLE; @@ -85,7 +96,7 @@ public: Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs, + std::unique_ptr<write_ops::InsertCommandRequest> insertCommand, const WriteConcernOptions& wc, boost::optional<OID> targetEpoch) override { MONGO_UNREACHABLE; @@ -93,7 +104,7 @@ public: StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - BatchedObjects&& batch, + std::unique_ptr<write_ops::UpdateCommandRequest> updateCommand, const WriteConcernOptions& wc, UpsertType upsert, bool multi, |