diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-11-29 09:56:42 -0500 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2019-02-13 07:30:10 -0500 |
commit | 87194fbe0c24525bc1f2d674012fe6978eca77d2 (patch) | |
tree | 0f13b14046152f0b475ae16d9a95b5e2ba0c4cbb /src/mongo/db/pipeline/document_source_out.h | |
parent | 69f26fa3798b0d7927858aa704243cdac676c6e9 (diff) | |
download | mongo-87194fbe0c24525bc1f2d674012fe6978eca77d2.tar.gz |
SERVER-38311 Change out merging strategy
Allows an $out stage to run in parallel on all shards if the target
collection is sharded and so is the input collection to the aggregate.
Diffstat (limited to 'src/mongo/db/pipeline/document_source_out.h')
-rw-r--r-- | src/mongo/db/pipeline/document_source_out.h | 39 |
1 files changed, 29 insertions, 10 deletions
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 906a30f10ee..da09c728e79 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -65,7 +65,7 @@ public:
 /**
  * Abstract class for the $out aggregation stage.
  */
-class DocumentSourceOut : public DocumentSource, public NeedsMergerDocumentSource {
+class DocumentSourceOut : public DocumentSource {
 public:
     /**
      * A "lite parsed" $out stage is similar to other stages involving foreign collections except in
@@ -121,12 +121,22 @@ public:
     }
 
     StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+        // A $out to an unsharded collection should merge on the primary shard to perform local
+        // writes. A $out to a sharded collection has no requirement, since each shard can perform
+        // its own portion of the write. We use 'kAnyShard' to direct it to execute on one of the
+        // shards in case some of the writes happen to end up being local.
+        //
+        // Note that this decision is inherently racy and subject to become stale. This is okay
+        // because either choice will work correctly, we are simply applying a heuristic
+        // optimization.
+        auto hostTypeRequirement = HostTypeRequirement::kPrimaryShard;
+        if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs) &&
+            _mode != WriteModeEnum::kModeReplaceCollection) {
+            hostTypeRequirement = HostTypeRequirement::kAnyShard;
+        }
         return {StreamType::kStreaming,
                 PositionRequirement::kLast,
-                // A $out to an unsharded collection should merge on the primary shard to perform
-                // local writes. A $out to a sharded collection has no requirement, since each shard
-                // can perform its own portion of the write.
-                HostTypeRequirement::kPrimaryShard,
+                hostTypeRequirement,
                 DiskUseRequirement::kWritesPersistentData,
                 FacetRequirement::kNotAllowed,
                 TransactionRequirement::kNotAllowed};
@@ -140,12 +150,21 @@ public:
         return _mode;
     }
 
-    boost::intrusive_ptr<DocumentSource> getShardSource() final {
-        return nullptr;
-    }
-    MergingLogic mergingLogic() final {
-        return {this};
+    boost::optional<MergingLogic> mergingLogic() final {
+        // It should always be faster to avoid splitting the pipeline if the output collection is
+        // sharded. If we avoid splitting the pipeline then each shard can perform the writes to the
+        // target collection in parallel.
+        //
+        // Note that this decision is inherently racy and subject to become stale. This is okay
+        // because either choice will work correctly, we are simply applying a heuristic
+        // optimization.
+        if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)) {
+            return boost::none;
+        }
+        // {shardsStage, mergingStage, sortPattern}
+        return MergingLogic{nullptr, this, boost::none};
     }
+
     virtual bool canRunInParallelBeforeOut(
         const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final {
         // If someone is asking the question, this must be the $out stage in question, so yes!