diff options
author | Anton Korshunov <anton.korshunov@mongodb.com> | 2019-05-23 12:35:21 +0100 |
---|---|---|
committer | Anton Korshunov <anton.korshunov@mongodb.com> | 2019-05-30 23:18:38 +0100 |
commit | fc05d715eb813ddc72d38c74c6a1c4e447ae1b76 (patch) | |
tree | 187f98a85f493eb39a0243a87d09f6dfbf4aa7e7 /src/mongo/db/pipeline/document_source_merge.h | |
parent | 32287881c1fdd01708a70f912f6775f0afaa5114 (diff) | |
download | mongo-fc05d715eb813ddc72d38c74c6a1c4e447ae1b76.tar.gz |
SERVER-40432 Undo 4.2 changes to $out
Diffstat (limited to 'src/mongo/db/pipeline/document_source_merge.h')
-rw-r--r-- | src/mongo/db/pipeline/document_source_merge.h | 116 |
1 files changed, 36 insertions, 80 deletions
diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h index aafae386acf..811ebf97591 100644 --- a/src/mongo/db/pipeline/document_source_merge.h +++ b/src/mongo/db/pipeline/document_source_merge.h @@ -29,9 +29,8 @@ #pragma once -#include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_merge_gen.h" -#include "mongo/db/pipeline/document_source_out.h" +#include "mongo/db/pipeline/document_source_writer.h" namespace mongo { @@ -40,10 +39,8 @@ namespace mongo { * this class must be initialized (via a constructor) with a 'MergeDescriptor', which defines a * a particular merge strategy for a pair of 'whenMatched' and 'whenNotMatched' merge modes. */ -class DocumentSourceMerge final : public DocumentSource { +class DocumentSourceMerge final : public DocumentSourceWriter<MongoProcessInterface::BatchObject> { public: - using BatchedObjects = MongoProcessInterface::BatchedObjects; - static constexpr StringData kStageName = "$merge"_sd; // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions @@ -84,38 +81,12 @@ public: } }; - /** - * Builds a new $merge stage which will merge all documents into 'outputNs'. If - * 'targetCollectionVersion' is provided then processing will stop with an error if the - * collection's epoch changes during the course of execution. This is used as a mechanism to - * prevent the shard key from changing. - */ - DocumentSourceMerge(NamespaceString outputNs, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MergeStrategyDescriptor& descriptor, - boost::optional<BSONObj> letVariables, - boost::optional<std::vector<BSONObj>> pipeline, - std::set<FieldPath> mergeOnFields, - boost::optional<ChunkVersion> targetCollectionVersion, - bool serializeAsOutStage); - virtual ~DocumentSourceMerge() = default; const char* getSourceName() const final override { return kStageName.rawData(); } - DepsTracker::State getDependencies(DepsTracker* deps) const final override { - deps->needWholeDocument = true; - return DepsTracker::State::EXHAUSTIVE_ALL; - } - - GetModPathsReturn getModifiedPaths() const final override { - // For purposes of tracking which fields come from where, this stage does not modify any - // fields. - return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; - } - StageConstraints constraints(Pipeline::SplitState pipeState) const final override { // A $merge to an unsharded collection should merge on the primary shard to perform local // writes. A $merge to a sharded collection has no requirement, since each shard can perform @@ -147,25 +118,12 @@ public: if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)) { return boost::none; } - // {shardsStage, mergingStage, sortPattern} - return DistributedPlanLogic{nullptr, this, boost::none}; - } - - bool canRunInParallelBeforeOut( - const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final override { - // If someone is asking the question, this must be the $merge stage in question, so yes! - return true; + return DocumentSourceWriter::distributedPlanLogic(); } - GetNextResult getNext() final override; - Value serialize( boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final override; - const NamespaceString& getOutputNs() const { - return _outputNs; - } - /** * Creates a new $merge stage from the given arguments. */ @@ -177,8 +135,7 @@ public: boost::optional<BSONObj> letVariables, boost::optional<std::vector<BSONObj>> pipeline, std::set<FieldPath> mergeOnFields, - boost::optional<ChunkVersion> targetCollectionVersion, - bool serializeAsOutStage); + boost::optional<ChunkVersion> targetCollectionVersion); /** * Parses a $merge stage from the user-supplied BSON. @@ -188,28 +145,23 @@ public: private: /** - * Writes the documents in 'batch' to the output namespace. + * Builds a new $merge stage which will merge all documents into 'outputNs'. If + * 'targetCollectionVersion' is provided then processing will stop with an error if the + * collection's epoch changes during the course of execution. This is used as a mechanism to + * prevent the shard key from changing. */ - void spill(BatchedObjects&& batch) { - OutStageWriteBlock writeBlock(pExpCtx->opCtx); - - try { - auto targetEpoch = _targetCollectionVersion - ? boost::optional<OID>(_targetCollectionVersion->epoch()) - : boost::none; - - _descriptor.strategy(pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch)); - } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) { - uassertStatusOKWithContext(ex.toStatus(), - "$merge failed to update the matching document, did you " - "attempt to modify the _id or the shard key?"); - } - } + DocumentSourceMerge(NamespaceString outputNs, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MergeStrategyDescriptor& descriptor, + boost::optional<BSONObj> letVariables, + boost::optional<std::vector<BSONObj>> pipeline, + std::set<FieldPath> mergeOnFields, + boost::optional<ChunkVersion> targetCollectionVersion); /** * Creates an UpdateModification object from the given 'doc' to be used with the batched update. */ - auto makeBatchUpdateModification(const Document& doc) { + auto makeBatchUpdateModification(const Document& doc) const { return _pipeline ? write_ops::UpdateModification(*_pipeline) : write_ops::UpdateModification(doc.toBson()); } @@ -218,7 +170,7 @@ private: * Resolves 'let' defined variables against the 'doc' and stores the results in the returned * BSON. */ - boost::optional<BSONObj> resolveLetVariablesIfNeeded(const Document& doc) { + boost::optional<BSONObj> resolveLetVariablesIfNeeded(const Document& doc) const { // When we resolve 'let' variables, an empty BSON object or boost::none won't make any // difference at the end-point (in the PipelineExecutor), as in both cases we will end up // with the update pipeline ExpressionContext not being populated with any variables, so we @@ -234,18 +186,27 @@ private: return bob.obj(); } - // Stash the writeConcern of the original command as the operation context may change by the - // time we start to spill $merge writes. This is because certain aggregations (e.g. $exchange) - // establish cursors with batchSize 0 then run subsequent getMore's which use a new operation - // context. The getMore's will not have an attached writeConcern however we still want to - // respect the writeConcern of the original command. - WriteConcernOptions _writeConcern; + void spill(BatchedObjects&& batch) override { + DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); - const NamespaceString _outputNs; - boost::optional<ChunkVersion> _targetCollectionVersion; + try { + auto targetEpoch = _targetCollectionVersion + ? boost::optional<OID>(_targetCollectionVersion->epoch()) + : boost::none; - bool _initialized = false; - bool _done = false; + _descriptor.strategy(pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch)); + } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) { + uassertStatusOKWithContext(ex.toStatus(), + "$merge failed to update the matching document, did you " + "attempt to modify the _id or the shard key?"); + } + } + + void waitWhileFailPointEnabled() override; + + std::pair<BatchObject, int> makeBatchObject(Document&& doc) const override; + + boost::optional<ChunkVersion> _targetCollectionVersion; // A merge descriptor contains a merge strategy function describing how to merge two // collections, as well as some other metadata needed to perform the merge operation. This is @@ -271,11 +232,6 @@ private: // True if '_mergeOnFields' contains the _id. We store this as a separate boolean to avoid // repeated lookups into the set. bool _mergeOnFieldsIncludesId; - - // If true, display this stage in the explain output as an $out stage rather that $merge. This - // is used when the $merge stage was used an alias for $out's 'insertDocuments' and - // 'replaceDocuments' modes. - bool _serializeAsOutStage; }; } // namespace mongo |