summary | refs | log | tree | commit | diff
path: root/src/mongo/db/pipeline/document_source_out.h
diff options
context:
space:
mode:
author: Charlie Swanson <charlie.swanson@mongodb.com> 2018-11-29 09:56:42 -0500
committer: Charlie Swanson <charlie.swanson@mongodb.com> 2019-02-13 07:30:10 -0500
commit: 87194fbe0c24525bc1f2d674012fe6978eca77d2 (patch)
tree: 0f13b14046152f0b475ae16d9a95b5e2ba0c4cbb /src/mongo/db/pipeline/document_source_out.h
parent: 69f26fa3798b0d7927858aa704243cdac676c6e9 (diff)
download: mongo-87194fbe0c24525bc1f2d674012fe6978eca77d2.tar.gz
SERVER-38311 Change out merging strategy
Allows an $out stage to run in parallel on all shards when both the target collection and the aggregation's input collection are sharded.
Diffstat (limited to 'src/mongo/db/pipeline/document_source_out.h')
-rw-r--r-- src/mongo/db/pipeline/document_source_out.h 39
1 files changed, 29 insertions, 10 deletions
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 906a30f10ee..da09c728e79 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -65,7 +65,7 @@ public:
/**
* Abstract class for the $out aggregation stage.
*/
-class DocumentSourceOut : public DocumentSource, public NeedsMergerDocumentSource {
+class DocumentSourceOut : public DocumentSource {
public:
/**
* A "lite parsed" $out stage is similar to other stages involving foreign collections except in
@@ -121,12 +121,22 @@ public:
}
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ // A $out to an unsharded collection should merge on the primary shard to perform local
+ // writes. A $out to a sharded collection has no requirement, since each shard can perform
+ // its own portion of the write. We use 'kAnyShard' to direct it to execute on one of the
+ // shards in case some of the writes happen to end up being local.
+ //
+ // Note that this decision is inherently racy and subject to become stale. This is okay
+ // because either choice will work correctly, we are simply applying a heuristic
+ // optimization.
+ auto hostTypeRequirement = HostTypeRequirement::kPrimaryShard;
+ if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs) &&
+ _mode != WriteModeEnum::kModeReplaceCollection) {
+ hostTypeRequirement = HostTypeRequirement::kAnyShard;
+ }
return {StreamType::kStreaming,
PositionRequirement::kLast,
- // A $out to an unsharded collection should merge on the primary shard to perform
- // local writes. A $out to a sharded collection has no requirement, since each shard
- // can perform its own portion of the write.
- HostTypeRequirement::kPrimaryShard,
+ hostTypeRequirement,
DiskUseRequirement::kWritesPersistentData,
FacetRequirement::kNotAllowed,
TransactionRequirement::kNotAllowed};
@@ -140,12 +150,21 @@ public:
return _mode;
}
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return nullptr;
- }
- MergingLogic mergingLogic() final {
- return {this};
+ boost::optional<MergingLogic> mergingLogic() final {
+ // It should always be faster to avoid splitting the pipeline if the output collection is
+ // sharded. If we avoid splitting the pipeline then each shard can perform the writes to the
+ // target collection in parallel.
+ //
+ // Note that this decision is inherently racy and subject to become stale. This is okay
+ // because either choice will work correctly, we are simply applying a heuristic
+ // optimization.
+ if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)) {
+ return boost::none;
+ }
+ // {shardsStage, mergingStage, sortPattern}
+ return MergingLogic{nullptr, this, boost::none};
}
+
virtual bool canRunInParallelBeforeOut(
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final {
// If someone is asking the question, this must be the $out stage in question, so yes!