summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_merge.h
diff options
context:
space:
mode:
authorAnton Korshunov <anton.korshunov@mongodb.com>2019-05-23 12:35:21 +0100
committerAnton Korshunov <anton.korshunov@mongodb.com>2019-05-30 23:18:38 +0100
commitfc05d715eb813ddc72d38c74c6a1c4e447ae1b76 (patch)
tree187f98a85f493eb39a0243a87d09f6dfbf4aa7e7 /src/mongo/db/pipeline/document_source_merge.h
parent32287881c1fdd01708a70f912f6775f0afaa5114 (diff)
downloadmongo-fc05d715eb813ddc72d38c74c6a1c4e447ae1b76.tar.gz
SERVER-40432 Undo 4.2 changes to $out
Diffstat (limited to 'src/mongo/db/pipeline/document_source_merge.h')
-rw-r--r--src/mongo/db/pipeline/document_source_merge.h116
1 files changed, 36 insertions, 80 deletions
diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h
index aafae386acf..811ebf97591 100644
--- a/src/mongo/db/pipeline/document_source_merge.h
+++ b/src/mongo/db/pipeline/document_source_merge.h
@@ -29,9 +29,8 @@
#pragma once
-#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_merge_gen.h"
-#include "mongo/db/pipeline/document_source_out.h"
+#include "mongo/db/pipeline/document_source_writer.h"
namespace mongo {
@@ -40,10 +39,8 @@ namespace mongo {
* this class must be initialized (via a constructor) with a 'MergeDescriptor', which defines a
* a particular merge strategy for a pair of 'whenMatched' and 'whenNotMatched' merge modes.
*/
-class DocumentSourceMerge final : public DocumentSource {
+class DocumentSourceMerge final : public DocumentSourceWriter<MongoProcessInterface::BatchObject> {
public:
- using BatchedObjects = MongoProcessInterface::BatchedObjects;
-
static constexpr StringData kStageName = "$merge"_sd;
// A descriptor for a merge strategy. Holds a merge strategy function and a set of actions
@@ -84,38 +81,12 @@ public:
}
};
- /**
- * Builds a new $merge stage which will merge all documents into 'outputNs'. If
- * 'targetCollectionVersion' is provided then processing will stop with an error if the
- * collection's epoch changes during the course of execution. This is used as a mechanism to
- * prevent the shard key from changing.
- */
- DocumentSourceMerge(NamespaceString outputNs,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MergeStrategyDescriptor& descriptor,
- boost::optional<BSONObj> letVariables,
- boost::optional<std::vector<BSONObj>> pipeline,
- std::set<FieldPath> mergeOnFields,
- boost::optional<ChunkVersion> targetCollectionVersion,
- bool serializeAsOutStage);
-
virtual ~DocumentSourceMerge() = default;
const char* getSourceName() const final override {
return kStageName.rawData();
}
- DepsTracker::State getDependencies(DepsTracker* deps) const final override {
- deps->needWholeDocument = true;
- return DepsTracker::State::EXHAUSTIVE_ALL;
- }
-
- GetModPathsReturn getModifiedPaths() const final override {
- // For purposes of tracking which fields come from where, this stage does not modify any
- // fields.
- return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
- }
-
StageConstraints constraints(Pipeline::SplitState pipeState) const final override {
// A $merge to an unsharded collection should merge on the primary shard to perform local
// writes. A $merge to a sharded collection has no requirement, since each shard can perform
@@ -147,25 +118,12 @@ public:
if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)) {
return boost::none;
}
- // {shardsStage, mergingStage, sortPattern}
- return DistributedPlanLogic{nullptr, this, boost::none};
- }
-
- bool canRunInParallelBeforeOut(
- const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final override {
- // If someone is asking the question, this must be the $merge stage in question, so yes!
- return true;
+ return DocumentSourceWriter::distributedPlanLogic();
}
- GetNextResult getNext() final override;
-
Value serialize(
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final override;
- const NamespaceString& getOutputNs() const {
- return _outputNs;
- }
-
/**
* Creates a new $merge stage from the given arguments.
*/
@@ -177,8 +135,7 @@ public:
boost::optional<BSONObj> letVariables,
boost::optional<std::vector<BSONObj>> pipeline,
std::set<FieldPath> mergeOnFields,
- boost::optional<ChunkVersion> targetCollectionVersion,
- bool serializeAsOutStage);
+ boost::optional<ChunkVersion> targetCollectionVersion);
/**
* Parses a $merge stage from the user-supplied BSON.
@@ -188,28 +145,23 @@ public:
private:
/**
- * Writes the documents in 'batch' to the output namespace.
+ * Builds a new $merge stage which will merge all documents into 'outputNs'. If
+ * 'targetCollectionVersion' is provided then processing will stop with an error if the
+ * collection's epoch changes during the course of execution. This is used as a mechanism to
+ * prevent the shard key from changing.
*/
- void spill(BatchedObjects&& batch) {
- OutStageWriteBlock writeBlock(pExpCtx->opCtx);
-
- try {
- auto targetEpoch = _targetCollectionVersion
- ? boost::optional<OID>(_targetCollectionVersion->epoch())
- : boost::none;
-
- _descriptor.strategy(pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch));
- } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
- uassertStatusOKWithContext(ex.toStatus(),
- "$merge failed to update the matching document, did you "
- "attempt to modify the _id or the shard key?");
- }
- }
+ DocumentSourceMerge(NamespaceString outputNs,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MergeStrategyDescriptor& descriptor,
+ boost::optional<BSONObj> letVariables,
+ boost::optional<std::vector<BSONObj>> pipeline,
+ std::set<FieldPath> mergeOnFields,
+ boost::optional<ChunkVersion> targetCollectionVersion);
/**
* Creates an UpdateModification object from the given 'doc' to be used with the batched update.
*/
- auto makeBatchUpdateModification(const Document& doc) {
+ auto makeBatchUpdateModification(const Document& doc) const {
return _pipeline ? write_ops::UpdateModification(*_pipeline)
: write_ops::UpdateModification(doc.toBson());
}
@@ -218,7 +170,7 @@ private:
* Resolves 'let' defined variables against the 'doc' and stores the results in the returned
* BSON.
*/
- boost::optional<BSONObj> resolveLetVariablesIfNeeded(const Document& doc) {
+ boost::optional<BSONObj> resolveLetVariablesIfNeeded(const Document& doc) const {
// When we resolve 'let' variables, an empty BSON object or boost::none won't make any
// difference at the end-point (in the PipelineExecutor), as in both cases we will end up
// with the update pipeline ExpressionContext not being populated with any variables, so we
@@ -234,18 +186,27 @@ private:
return bob.obj();
}
- // Stash the writeConcern of the original command as the operation context may change by the
- // time we start to spill $merge writes. This is because certain aggregations (e.g. $exchange)
- // establish cursors with batchSize 0 then run subsequent getMore's which use a new operation
- // context. The getMore's will not have an attached writeConcern however we still want to
- // respect the writeConcern of the original command.
- WriteConcernOptions _writeConcern;
+ void spill(BatchedObjects&& batch) override {
+ DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx);
- const NamespaceString _outputNs;
- boost::optional<ChunkVersion> _targetCollectionVersion;
+ try {
+ auto targetEpoch = _targetCollectionVersion
+ ? boost::optional<OID>(_targetCollectionVersion->epoch())
+ : boost::none;
- bool _initialized = false;
- bool _done = false;
+ _descriptor.strategy(pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch));
+ } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
+ uassertStatusOKWithContext(ex.toStatus(),
+ "$merge failed to update the matching document, did you "
+ "attempt to modify the _id or the shard key?");
+ }
+ }
+
+ void waitWhileFailPointEnabled() override;
+
+ std::pair<BatchObject, int> makeBatchObject(Document&& doc) const override;
+
+ boost::optional<ChunkVersion> _targetCollectionVersion;
// A merge descriptor contains a merge strategy function describing how to merge two
// collections, as well as some other metadata needed to perform the merge operation. This is
@@ -271,11 +232,6 @@ private:
// True if '_mergeOnFields' contains the _id. We store this as a separate boolean to avoid
// repeated lookups into the set.
bool _mergeOnFieldsIncludesId;
-
- // If true, display this stage in the explain output as an $out stage rather that $merge. This
- // is used when the $merge stage was used an alias for $out's 'insertDocuments' and
- // 'replaceDocuments' modes.
- bool _serializeAsOutStage;
};
} // namespace mongo