author     Anton Korshunov <anton.korshunov@mongodb.com>    2019-05-23 12:35:21 +0100
committer  Anton Korshunov <anton.korshunov@mongodb.com>    2019-05-30 23:18:38 +0100
commit     fc05d715eb813ddc72d38c74c6a1c4e447ae1b76 (patch)
tree       187f98a85f493eb39a0243a87d09f6dfbf4aa7e7 /src/mongo
parent     32287881c1fdd01708a70f912f6775f0afaa5114 (diff)
download   mongo-fc05d715eb813ddc72d38c74c6a1c4e447ae1b76.tar.gz
SERVER-40432 Undo 4.2 changes to $out
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/pipeline/SConscript | 18
-rw-r--r--  src/mongo/db/pipeline/document_source.h | 7
-rw-r--r--  src/mongo/db/pipeline/document_source_group.cpp | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_group.h | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_merge.cpp | 300
-rw-r--r--  src/mongo/db/pipeline/document_source_merge.h | 116
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_test.cpp | 48
-rw-r--r--  src/mongo/db/pipeline/document_source_out.cpp | 522
-rw-r--r--  src/mongo/db/pipeline/document_source_out.h | 259
-rw-r--r--  src/mongo/db/pipeline/document_source_out.idl | 84
-rw-r--r--  src/mongo/db/pipeline/document_source_out_replace_coll.cpp | 139
-rw-r--r--  src/mongo/db/pipeline/document_source_out_replace_coll.h | 73
-rw-r--r--  src/mongo/db/pipeline/document_source_out_test.cpp | 316
-rw-r--r--  src/mongo/db/pipeline/document_source_sort.cpp | 4
-rw-r--r--  src/mongo/db/pipeline/document_source_sort.h | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_writer.h | 228
-rw-r--r--  src/mongo/db/pipeline/mongo_process_common.cpp | 13
-rw-r--r--  src/mongo/db/pipeline/mongo_process_common.h | 6
-rw-r--r--  src/mongo/db/pipeline/mongo_process_interface.h | 29
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.cpp | 58
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.h | 14
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface_test.cpp | 86
-rw-r--r--  src/mongo/db/pipeline/pipeline_test.cpp | 67
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.cpp | 38
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.h | 12
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone_test.cpp | 130
-rw-r--r--  src/mongo/db/pipeline/stub_mongo_process_interface.h | 22
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner.cpp | 2
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner_test.cpp | 236
29 files changed, 1128 insertions, 1705 deletions
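
The new document_source_writer.h listed above is not shown in this excerpt, but the hunks below show $merge and $out being rebased onto it: each stage now overrides makeBatchObject(), spill(), and waitWhileFailPointEnabled(), while the shared base class owns the size-limited batching loop that previously lived in each stage's getNext(). What follows is a minimal, self-contained sketch of that template-method shape, not the actual DocumentSourceWriter interface; it uses standard-library stand-ins for Document/BSONObj, a tiny batch limit, and omits the fail-point hook.

// Self-contained illustration (std:: stand-ins, not the real MongoDB types or
// limits): subclasses say how one document becomes a batch entry and how a batch
// is flushed; the base class owns the size-limited batching loop.
#include <iostream>
#include <string>
#include <utility>
#include <vector>

template <typename B>
class DocumentWriter {
public:
    virtual ~DocumentWriter() = default;

    // Consume a stream of documents, spilling whenever the buffered size limit
    // would be exceeded (the real stage also caps the number of entries).
    void write(const std::vector<std::string>& docs) {
        std::vector<B> batch;
        int bufferedBytes = 0;
        const int kMaxBatchBytes = 15;  // tiny stand-in for BSONObjMaxUserSize
        for (const auto& doc : docs) {
            auto [obj, size] = makeBatchObject(doc);
            bufferedBytes += size;
            if (!batch.empty() && bufferedBytes > kMaxBatchBytes) {
                spill(std::move(batch));
                batch.clear();
                bufferedBytes = size;
            }
            batch.push_back(std::move(obj));
        }
        if (!batch.empty()) {
            spill(std::move(batch));
        }
    }

protected:
    virtual std::pair<B, int> makeBatchObject(const std::string& doc) const = 0;
    virtual void spill(std::vector<B>&& batch) = 0;
};

// An $out-like writer: batch entries are whole documents, flushed as inserts.
class InsertWriter : public DocumentWriter<std::string> {
protected:
    std::pair<std::string, int> makeBatchObject(const std::string& doc) const override {
        return {doc, static_cast<int>(doc.size())};
    }
    void spill(std::vector<std::string>&& batch) override {
        std::cout << "inserting batch of " << batch.size() << " documents\n";
    }
};

int main() {
    InsertWriter writer;
    writer.write({"{a: 1}", "{b: 2}", "{c: 3}"});  // spills a batch of 2, then 1
    return 0;
}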
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index a778d314936..dac668b3ae3 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -398,7 +398,6 @@ pipelineeEnv.Library(
'document_source_match.cpp',
'document_source_merge.cpp',
'document_source_out.cpp',
- 'document_source_out_replace_coll.cpp',
'document_source_plan_cache_stats.cpp',
'document_source_project.cpp',
'document_source_queue.cpp',
@@ -612,7 +611,6 @@ env.Library(
env.Idlc('document_source_list_sessions.idl')[0],
env.Idlc('document_source_merge.idl')[0],
env.Idlc('document_source_merge_modes.idl')[0],
- env.Idlc('document_source_out.idl')[0],
env.Idlc('document_source_replace_root.idl')[0],
env.Idlc('exchange_spec.idl')[0],
env.Idlc('value.idl')[0],
@@ -655,3 +653,19 @@ env.CppUnitTest(
'expression',
],
)
+
+env.CppUnitTest(
+ target='process_interface_test',
+ source=[
+ 'mongos_process_interface_test.cpp',
+ 'process_interface_standalone_test.cpp'
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/service_context_test_fixture',
+ 'mongo_process_common',
+ 'mongo_process_interface',
+ 'mongos_process_interface',
+ 'process_interface_shardsvr',
+ 'process_interface_standalone'
+ ],
+ )
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 5e8244e7984..49689de85dd 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -461,10 +461,11 @@ public:
/**
* Returns true if it would be correct to execute this stage in parallel across the shards in
- * cases where the final stage is an $out. For example, a $group stage which is just merging the
- * groups from the shards can be run in parallel since it will preserve the shard key.
+ * cases where the final stage is a stage which can perform a write operation, such as $merge.
+ * For example, a $group stage which is just merging the groups from the shards can be run in
+ * parallel since it will preserve the shard key.
*/
- virtual bool canRunInParallelBeforeOut(
+ virtual bool canRunInParallelBeforeWriteStage(
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const {
return false;
}
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index c2d2c0d7509..d8ce48bca13 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -701,7 +701,7 @@ bool DocumentSourceGroup::pathIncludedInGroupKeys(const std::string& dottedPath)
});
}
-bool DocumentSourceGroup::canRunInParallelBeforeOut(
+bool DocumentSourceGroup::canRunInParallelBeforeWriteStage(
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const {
if (_doingMerge) {
return true; // This is fine.
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 8324fc2b7f5..29a46df1f9f 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -158,7 +158,7 @@ public:
bool usedDisk() final;
boost::optional<DistributedPlanLogic> distributedPlanLogic() final;
- bool canRunInParallelBeforeOut(
+ bool canRunInParallelBeforeWriteStage(
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final;
/**
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index 3578d0117d9..988d3e242f8 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -39,7 +39,6 @@
#include "mongo/db/curop_failpoint_helpers.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/pipeline/document_path_support.h"
-#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -227,23 +226,6 @@ bool isSupportedMergeMode(WhenMatched whenMatched, WhenNotMatched whenNotMatched
}
/**
- * Parses the fields of the $merge 'on' from the user-specified 'fields', returning a set of field
- * paths. Throws if 'fields' contains duplicate elements.
- */
-std::set<FieldPath> parseMergeOnFieldsFromSpec(const std::vector<std::string>& fields) {
- std::set<FieldPath> mergeOnFields;
-
- for (const auto& field : fields) {
- const auto res = mergeOnFields.insert(FieldPath(field));
- uassert(ErrorCodes::BadValue,
- "Found a duplicate field '{}' in {} 'on'"_format(field, kStageName),
- res.second);
- }
-
- return mergeOnFields;
-}
-
-/**
* Extracts the fields of $merge 'on' from 'doc' and returns the key as a BSONObj. Throws if any
* field of the 'on' extracted from 'doc' is nullish or an array.
*/
@@ -265,93 +247,6 @@ BSONObj extractMergeOnFieldsFromDoc(const Document& doc, const std::set<FieldPat
}
/**
- * Extracts $merge 'on' fields from the $merge spec when the pipeline is executed on mongoD, or use
- * a default _id field if the user hasn't supplied the 'on' field. For the user supplied field
- * ensures that it can be used to uniquely identify documents for merge.
- */
-std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> resolveMergeOnFieldsOnMongoD(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const DocumentSourceMergeSpec& spec,
- const NamespaceString& outputNs) {
- invariant(!expCtx->inMongos);
- auto targetCollectionVersion = spec.getTargetCollectionVersion();
- if (targetCollectionVersion) {
- uassert(51123, "Unexpected target chunk version specified", expCtx->fromMongos);
- // If mongos has sent us a target shard version, we need to be sure we are prepared to
- // act as a router which is at least as recent as that mongos.
- expCtx->mongoProcessInterface->checkRoutingInfoEpochOrThrow(
- expCtx, outputNs, *targetCollectionVersion);
- }
-
- auto userSpecifiedMergeOnFields = spec.getOn();
- if (!userSpecifiedMergeOnFields) {
- uassert(51124, "Expected 'on' field to be provided from mongos", !expCtx->fromMongos);
- return {std::set<FieldPath>{"_id"}, targetCollectionVersion};
- }
-
- // Make sure the 'on' field has a supporting index. Skip this check if the command is sent
- // from mongos since the 'on' field check would've happened already.
- auto mergeOnFields = parseMergeOnFieldsFromSpec(*userSpecifiedMergeOnFields);
- if (!expCtx->fromMongos) {
- uassert(51183,
- "Cannot find index to verify that 'on' fields will be unique",
- expCtx->mongoProcessInterface->uniqueKeyIsSupportedByIndex(
- expCtx, outputNs, mergeOnFields));
- }
- return {mergeOnFields, targetCollectionVersion};
-}
-
-/**
- * Extracts $merge 'on' fields from the $merge spec when the pipeline is executed on mongoS. If the
- * user supplied the 'on' field, ensures that it can be used to uniquely identify documents for
- * merge. Otherwise, extracts the shard key and use it as the 'on' field.
- */
-std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> resolveMergeOnFieldsOnMongoS(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const DocumentSourceMergeSpec& spec,
- const NamespaceString& outputNs) {
- invariant(expCtx->inMongos);
- uassert(51179,
- "{} received unexpected 'targetCollectionVersion' on mongos"_format(kStageName),
- !spec.getTargetCollectionVersion());
-
- if (auto userSpecifiedMergeOnFields = spec.getOn()) {
- // Convert 'on' array to a vector of FieldPaths.
- auto mergeOnFields = parseMergeOnFieldsFromSpec(*userSpecifiedMergeOnFields);
- uassert(51190,
- "Cannot find index to verify that 'on' fields will be unique",
- expCtx->mongoProcessInterface->uniqueKeyIsSupportedByIndex(
- expCtx, outputNs, mergeOnFields));
-
- // If the user supplies the 'on' field we don't need to attach a ChunkVersion for the shards
- // since we are not at risk of 'guessing' the wrong shard key.
- return {mergeOnFields, boost::none};
- }
-
- // In case there are multiple shards which will perform this stage in parallel, we need to
- // figure out and attach the collection's shard version to ensure each shard is talking about
- // the same version of the collection. This mongos will coordinate that. We force a catalog
- // refresh to do so because there is no shard versioning protocol on this namespace and so we
- // otherwise could not be sure this node is (or will become) at all recent. We will also
- // figure out and attach the 'on' field to send to the shards.
-
- // There are cases where the aggregation could fail if the collection is dropped or re-created
- // during or near the time of the aggregation. This is okay - we are mostly paranoid that this
- // mongos is very stale and want to prevent returning an error if the collection was dropped a
- // long time ago. Because of this, we are okay with piggy-backing off another thread's request
- // to refresh the cache, simply waiting for that request to return instead of forcing another
- // refresh.
- boost::optional<ChunkVersion> targetCollectionVersion =
- expCtx->mongoProcessInterface->refreshAndGetCollectionVersion(expCtx, outputNs);
-
- auto docKeyPaths = expCtx->mongoProcessInterface->collectDocumentKeyFieldsActingAsRouter(
- expCtx->opCtx, outputNs);
- return {std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()),
- std::make_move_iterator(docKeyPaths.end())),
- targetCollectionVersion};
-}
-
-/**
* Parses a $merge stage specification and resolves the target database name and collection name.
* The $merge specification can be either a string or an object. If the target database name is not
* explicitly specified, it will be defaulted to 'defaultDb'.
@@ -432,18 +327,13 @@ DocumentSourceMerge::DocumentSourceMerge(NamespaceString outputNs,
boost::optional<BSONObj> letVariables,
boost::optional<std::vector<BSONObj>> pipeline,
std::set<FieldPath> mergeOnFields,
- boost::optional<ChunkVersion> targetCollectionVersion,
- bool serializeAsOutStage)
- : DocumentSource(expCtx),
- _writeConcern(expCtx->opCtx->getWriteConcern()),
- _outputNs(std::move(outputNs)),
+ boost::optional<ChunkVersion> targetCollectionVersion)
+ : DocumentSourceWriter(std::move(outputNs), expCtx),
_targetCollectionVersion(targetCollectionVersion),
- _done(false),
_descriptor(descriptor),
_pipeline(std::move(pipeline)),
_mergeOnFields(std::move(mergeOnFields)),
- _mergeOnFieldsIncludesId(_mergeOnFields.count("_id") == 1),
- _serializeAsOutStage(serializeAsOutStage) {
+ _mergeOnFieldsIncludesId(_mergeOnFields.count("_id") == 1) {
if (letVariables) {
_letVariables.emplace();
@@ -466,9 +356,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create(
boost::optional<BSONObj> letVariables,
boost::optional<std::vector<BSONObj>> pipeline,
std::set<FieldPath> mergeOnFields,
- boost::optional<ChunkVersion> targetCollectionVersion,
- bool serializeAsOutStage) {
-
+ boost::optional<ChunkVersion> targetCollectionVersion) {
uassert(51189,
"Combination of {} modes 'whenMatched: {}' and 'whenNotMatched: {}' "
"is not supported"_format(kStageName,
@@ -515,8 +403,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create(
std::move(letVariables),
std::move(pipeline),
std::move(mergeOnFields),
- targetCollectionVersion,
- serializeAsOutStage);
+ targetCollectionVersion);
}
boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::createFromBson(
@@ -531,10 +418,9 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::createFromBson(
mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->mode : kDefaultWhenMatched;
auto whenNotMatched = mergeSpec.getWhenNotMatched().value_or(kDefaultWhenNotMatched);
auto pipeline = mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->pipeline : boost::none;
- // TODO SERVER-40432: move resolveMergeOnFieldsOnMongo* into MongoProcessInterface.
- auto[mergeOnFields, targetCollectionVersion] = expCtx->inMongos
- ? resolveMergeOnFieldsOnMongoS(expCtx, mergeSpec, targetNss)
- : resolveMergeOnFieldsOnMongoD(expCtx, mergeSpec, targetNss);
+ auto[mergeOnFields, targetCollectionVersion] =
+ expCtx->mongoProcessInterface->ensureFieldsUniqueOrResolveDocumentKey(
+ expCtx, mergeSpec.getOn(), mergeSpec.getTargetCollectionVersion(), targetNss);
return DocumentSourceMerge::create(std::move(targetNss),
expCtx,
@@ -543,137 +429,61 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::createFromBson(
mergeSpec.getLet(),
std::move(pipeline),
std::move(mergeOnFields),
- targetCollectionVersion,
- false /* serialize as $out stage */);
+ targetCollectionVersion);
}
-DocumentSource::GetNextResult DocumentSourceMerge::getNext() {
- pExpCtx->checkForInterrupt();
-
- if (_done) {
- return GetNextResult::makeEOF();
- }
-
- if (!_initialized) {
- // Explain of a $merge should never try to actually execute any writes. We only ever expect
- // getNext() to be called for the 'executionStats' and 'allPlansExecution' explain modes.
- // This assertion should not be triggered for 'queryPlanner' explain of a $merge, which is
- // perfectly legal.
- uassert(51184,
- "explain of {} is not allowed with verbosity {}"_format(
- kStageName, ExplainOptions::verbosityString(*pExpCtx->explain)),
- !pExpCtx->explain);
- _initialized = true;
- }
-
- BatchedObjects batch;
- int bufferedBytes = 0;
-
- auto nextInput = pSource->getNext();
- for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
- CurOpFailpointHelpers::waitWhileFailPointEnabled(
- &hangWhileBuildingDocumentSourceMergeBatch,
- pExpCtx->opCtx,
- "hangWhileBuildingDocumentSourceMergeBatch",
- []() {
- log() << "Hanging aggregation due to 'hangWhileBuildingDocumentSourceMergeBatch' "
- << "failpoint";
- });
-
- auto doc = nextInput.releaseDocument();
-
- // Generate an _id if the uniqueKey includes _id but the document doesn't have one.
- if (_mergeOnFieldsIncludesId && doc.getField("_id"_sd).missing()) {
- MutableDocument mutableDoc(std::move(doc));
- mutableDoc["_id"_sd] = Value(OID::gen());
- doc = mutableDoc.freeze();
+Value DocumentSourceMerge::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
+ DocumentSourceMergeSpec spec;
+ spec.setTargetNss(_outputNs);
+ spec.setLet([&]() -> boost::optional<BSONObj> {
+ if (!_letVariables) {
+ return boost::none;
}
- auto mergeOnFields = extractMergeOnFieldsFromDoc(doc, _mergeOnFields);
- auto mod = makeBatchUpdateModification(doc);
- auto vars = resolveLetVariablesIfNeeded(doc);
- auto modSize = mod.objsize() + (vars ? vars->objsize() : 0);
-
- bufferedBytes += modSize;
- if (!batch.empty() &&
- (bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) {
- spill(std::move(batch));
- batch.clear();
- bufferedBytes = modSize;
+ BSONObjBuilder bob;
+ for (auto && [ name, expr ] : *_letVariables) {
+ bob << name << expr->serialize(static_cast<bool>(explain));
}
- batch.emplace_back(std::move(mergeOnFields), std::move(mod), std::move(vars));
- }
- if (!batch.empty()) {
- spill(std::move(batch));
- batch.clear();
- }
-
- switch (nextInput.getStatus()) {
- case GetNextResult::ReturnStatus::kAdvanced: {
- MONGO_UNREACHABLE; // We consumed all advances above.
+ return bob.obj();
+ }());
+ spec.setWhenMatched(MergeWhenMatchedPolicy{_descriptor.mode.first, _pipeline});
+ spec.setWhenNotMatched(_descriptor.mode.second);
+ spec.setOn([&]() {
+ std::vector<std::string> mergeOnFields;
+ for (auto path : _mergeOnFields) {
+ mergeOnFields.push_back(path.fullPath());
}
- case GetNextResult::ReturnStatus::kPauseExecution: {
- return nextInput; // Propagate the pause.
- }
- case GetNextResult::ReturnStatus::kEOF: {
- _done = true;
- return nextInput;
- }
- }
- MONGO_UNREACHABLE;
+ return mergeOnFields;
+ }());
+ spec.setTargetCollectionVersion(_targetCollectionVersion);
+ return Value(Document{{getSourceName(), spec.toBSON()}});
}
-Value DocumentSourceMerge::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
- if (_serializeAsOutStage) {
- uassert(ErrorCodes::BadValue,
- "Cannot serialize {} stage as $out for 'whenMatched: {}' and "
- "'whenNotMatched: {}'"_format(
- kStageName,
- MergeWhenMatchedMode_serializer(_descriptor.mode.first),
- MergeWhenNotMatchedMode_serializer(_descriptor.mode.second)),
- (_descriptor.mode.first == WhenMatched::kFail ||
- _descriptor.mode.first == WhenMatched::kReplace) &&
- (_descriptor.mode.second == WhenNotMatched::kInsert));
- DocumentSourceOutSpec spec;
- spec.setTargetDb(_outputNs.db());
- spec.setTargetCollection(_outputNs.coll());
- spec.setMode(_descriptor.mode.first == WhenMatched::kFail
- ? WriteModeEnum::kModeInsertDocuments
- : WriteModeEnum::kModeReplaceDocuments);
- spec.setUniqueKey([&]() {
- BSONObjBuilder uniqueKeyBob;
- for (auto path : _mergeOnFields) {
- uniqueKeyBob.append(path.fullPath(), 1);
- }
- return uniqueKeyBob.obj();
- }());
- spec.setTargetCollectionVersion(_targetCollectionVersion);
- return Value(Document{{DocumentSourceOut::kStageName.rawData(), spec.toBSON()}});
- } else {
- DocumentSourceMergeSpec spec;
- spec.setTargetNss(_outputNs);
- spec.setLet([&]() -> boost::optional<BSONObj> {
- if (!_letVariables) {
- return boost::none;
- }
-
- BSONObjBuilder bob;
- for (auto && [ name, expr ] : *_letVariables) {
- bob << name << expr->serialize(static_cast<bool>(explain));
- }
- return bob.obj();
- }());
- spec.setWhenMatched(MergeWhenMatchedPolicy{_descriptor.mode.first, _pipeline});
- spec.setWhenNotMatched(_descriptor.mode.second);
- spec.setOn([&]() {
- std::vector<std::string> mergeOnFields;
- for (auto path : _mergeOnFields) {
- mergeOnFields.push_back(path.fullPath());
- }
- return mergeOnFields;
- }());
- spec.setTargetCollectionVersion(_targetCollectionVersion);
- return Value(Document{{getSourceName(), spec.toBSON()}});
+std::pair<DocumentSourceMerge::BatchObject, int> DocumentSourceMerge::makeBatchObject(
+ Document&& doc) const {
+ // Generate an _id if the uniqueKey includes _id but the document doesn't have one.
+ if (_mergeOnFieldsIncludesId && doc.getField("_id"_sd).missing()) {
+ MutableDocument mutableDoc(std::move(doc));
+ mutableDoc["_id"_sd] = Value(OID::gen());
+ doc = mutableDoc.freeze();
}
+
+ auto mergeOnFields = extractMergeOnFieldsFromDoc(doc, _mergeOnFields);
+ auto mod = makeBatchUpdateModification(doc);
+ auto vars = resolveLetVariablesIfNeeded(doc);
+ auto modSize = mod.objsize() + (vars ? vars->objsize() : 0);
+ return {{std::move(mergeOnFields), std::move(mod), std::move(vars)}, modSize};
}
+
+void DocumentSourceMerge::waitWhileFailPointEnabled() {
+ CurOpFailpointHelpers::waitWhileFailPointEnabled(
+ &hangWhileBuildingDocumentSourceMergeBatch,
+ pExpCtx->opCtx,
+ "hangWhileBuildingDocumentSourceMergeBatch",
+ []() {
+ log() << "Hanging aggregation due to 'hangWhileBuildingDocumentSourceMergeBatch' "
+ << "failpoint";
+ });
+}
+
} // namespace mongo
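
As a complement to the makeBatchObject() implementation above: each $merge batch entry pairs a key built from the 'on' fields with the modification to apply (plus any resolved let variables), and an _id is generated when the key includes _id but the document lacks one. The sketch below mimics that shape with std:: containers only; the types, the size accounting, and the "generated-oid" placeholder are simplifications for illustration, not the real BSON-based code.

// Illustration only (std::map stands in for Document/BSONObj, "generated-oid"
// for OID::gen()): the shape of a $merge batch entry -- a key extracted from
// the 'on' fields plus the update to apply, sized for the batching loop.
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <utility>

using Doc = std::map<std::string, std::string>;

struct MergeBatchEntry {
    Doc key;     // values of the 'on' fields, used to match existing documents
    Doc update;  // the replacement (or pipeline update) to apply on a match
};

std::pair<MergeBatchEntry, int> makeMergeBatchEntry(Doc doc,
                                                    const std::set<std::string>& onFields) {
    // Generate an _id if the key includes _id but the document doesn't carry one.
    if (onFields.count("_id") && !doc.count("_id")) {
        doc["_id"] = "generated-oid";
    }
    MergeBatchEntry entry;
    for (const auto& field : onFields) {
        // The real stage uasserts if an 'on' field is missing, null, or an array.
        entry.key[field] = doc.at(field);
    }
    int approxSize = 0;
    for (const auto& [k, v] : doc) {
        approxSize += static_cast<int>(k.size() + v.size());
    }
    entry.update = std::move(doc);
    return {std::move(entry), approxSize};
}

int main() {
    auto [entry, size] = makeMergeBatchEntry({{"a", "1"}}, {"_id"});
    std::cout << "_id=" << entry.key["_id"] << ", approx size " << size << "\n";
    return 0;
}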
diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h
index aafae386acf..811ebf97591 100644
--- a/src/mongo/db/pipeline/document_source_merge.h
+++ b/src/mongo/db/pipeline/document_source_merge.h
@@ -29,9 +29,8 @@
#pragma once
-#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_merge_gen.h"
-#include "mongo/db/pipeline/document_source_out.h"
+#include "mongo/db/pipeline/document_source_writer.h"
namespace mongo {
@@ -40,10 +39,8 @@ namespace mongo {
* this class must be initialized (via a constructor) with a 'MergeDescriptor', which defines a
* a particular merge strategy for a pair of 'whenMatched' and 'whenNotMatched' merge modes.
*/
-class DocumentSourceMerge final : public DocumentSource {
+class DocumentSourceMerge final : public DocumentSourceWriter<MongoProcessInterface::BatchObject> {
public:
- using BatchedObjects = MongoProcessInterface::BatchedObjects;
-
static constexpr StringData kStageName = "$merge"_sd;
// A descriptor for a merge strategy. Holds a merge strategy function and a set of actions
@@ -84,38 +81,12 @@ public:
}
};
- /**
- * Builds a new $merge stage which will merge all documents into 'outputNs'. If
- * 'targetCollectionVersion' is provided then processing will stop with an error if the
- * collection's epoch changes during the course of execution. This is used as a mechanism to
- * prevent the shard key from changing.
- */
- DocumentSourceMerge(NamespaceString outputNs,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MergeStrategyDescriptor& descriptor,
- boost::optional<BSONObj> letVariables,
- boost::optional<std::vector<BSONObj>> pipeline,
- std::set<FieldPath> mergeOnFields,
- boost::optional<ChunkVersion> targetCollectionVersion,
- bool serializeAsOutStage);
-
virtual ~DocumentSourceMerge() = default;
const char* getSourceName() const final override {
return kStageName.rawData();
}
- DepsTracker::State getDependencies(DepsTracker* deps) const final override {
- deps->needWholeDocument = true;
- return DepsTracker::State::EXHAUSTIVE_ALL;
- }
-
- GetModPathsReturn getModifiedPaths() const final override {
- // For purposes of tracking which fields come from where, this stage does not modify any
- // fields.
- return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
- }
-
StageConstraints constraints(Pipeline::SplitState pipeState) const final override {
// A $merge to an unsharded collection should merge on the primary shard to perform local
// writes. A $merge to a sharded collection has no requirement, since each shard can perform
@@ -147,25 +118,12 @@ public:
if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)) {
return boost::none;
}
- // {shardsStage, mergingStage, sortPattern}
- return DistributedPlanLogic{nullptr, this, boost::none};
- }
-
- bool canRunInParallelBeforeOut(
- const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final override {
- // If someone is asking the question, this must be the $merge stage in question, so yes!
- return true;
+ return DocumentSourceWriter::distributedPlanLogic();
}
- GetNextResult getNext() final override;
-
Value serialize(
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final override;
- const NamespaceString& getOutputNs() const {
- return _outputNs;
- }
-
/**
* Creates a new $merge stage from the given arguments.
*/
@@ -177,8 +135,7 @@ public:
boost::optional<BSONObj> letVariables,
boost::optional<std::vector<BSONObj>> pipeline,
std::set<FieldPath> mergeOnFields,
- boost::optional<ChunkVersion> targetCollectionVersion,
- bool serializeAsOutStage);
+ boost::optional<ChunkVersion> targetCollectionVersion);
/**
* Parses a $merge stage from the user-supplied BSON.
@@ -188,28 +145,23 @@ public:
private:
/**
- * Writes the documents in 'batch' to the output namespace.
+ * Builds a new $merge stage which will merge all documents into 'outputNs'. If
+ * 'targetCollectionVersion' is provided then processing will stop with an error if the
+ * collection's epoch changes during the course of execution. This is used as a mechanism to
+ * prevent the shard key from changing.
*/
- void spill(BatchedObjects&& batch) {
- OutStageWriteBlock writeBlock(pExpCtx->opCtx);
-
- try {
- auto targetEpoch = _targetCollectionVersion
- ? boost::optional<OID>(_targetCollectionVersion->epoch())
- : boost::none;
-
- _descriptor.strategy(pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch));
- } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
- uassertStatusOKWithContext(ex.toStatus(),
- "$merge failed to update the matching document, did you "
- "attempt to modify the _id or the shard key?");
- }
- }
+ DocumentSourceMerge(NamespaceString outputNs,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MergeStrategyDescriptor& descriptor,
+ boost::optional<BSONObj> letVariables,
+ boost::optional<std::vector<BSONObj>> pipeline,
+ std::set<FieldPath> mergeOnFields,
+ boost::optional<ChunkVersion> targetCollectionVersion);
/**
* Creates an UpdateModification object from the given 'doc' to be used with the batched update.
*/
- auto makeBatchUpdateModification(const Document& doc) {
+ auto makeBatchUpdateModification(const Document& doc) const {
return _pipeline ? write_ops::UpdateModification(*_pipeline)
: write_ops::UpdateModification(doc.toBson());
}
@@ -218,7 +170,7 @@ private:
* Resolves 'let' defined variables against the 'doc' and stores the results in the returned
* BSON.
*/
- boost::optional<BSONObj> resolveLetVariablesIfNeeded(const Document& doc) {
+ boost::optional<BSONObj> resolveLetVariablesIfNeeded(const Document& doc) const {
// When we resolve 'let' variables, an empty BSON object or boost::none won't make any
// difference at the end-point (in the PipelineExecutor), as in both cases we will end up
// with the update pipeline ExpressionContext not being populated with any variables, so we
@@ -234,18 +186,27 @@ private:
return bob.obj();
}
- // Stash the writeConcern of the original command as the operation context may change by the
- // time we start to spill $merge writes. This is because certain aggregations (e.g. $exchange)
- // establish cursors with batchSize 0 then run subsequent getMore's which use a new operation
- // context. The getMore's will not have an attached writeConcern however we still want to
- // respect the writeConcern of the original command.
- WriteConcernOptions _writeConcern;
+ void spill(BatchedObjects&& batch) override {
+ DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx);
- const NamespaceString _outputNs;
- boost::optional<ChunkVersion> _targetCollectionVersion;
+ try {
+ auto targetEpoch = _targetCollectionVersion
+ ? boost::optional<OID>(_targetCollectionVersion->epoch())
+ : boost::none;
- bool _initialized = false;
- bool _done = false;
+ _descriptor.strategy(pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch));
+ } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
+ uassertStatusOKWithContext(ex.toStatus(),
+ "$merge failed to update the matching document, did you "
+ "attempt to modify the _id or the shard key?");
+ }
+ }
+
+ void waitWhileFailPointEnabled() override;
+
+ std::pair<BatchObject, int> makeBatchObject(Document&& doc) const override;
+
+ boost::optional<ChunkVersion> _targetCollectionVersion;
// A merge descriptor contains a merge strategy function describing how to merge two
// collections, as well as some other metadata needed to perform the merge operation. This is
@@ -271,11 +232,6 @@ private:
// True if '_mergeOnFields' contains the _id. We store this as a separate boolean to avoid
// repeated lookups into the set.
bool _mergeOnFieldsIncludesId;
-
- // If true, display this stage in the explain output as an $out stage rather that $merge. This
- // is used when the $merge stage was used an alias for $out's 'insertDocuments' and
- // 'replaceDocuments' modes.
- bool _serializeAsOutStage;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_merge_test.cpp b/src/mongo/db/pipeline/document_source_merge_test.cpp
index f61afee8210..50e75e9d264 100644
--- a/src/mongo/db/pipeline/document_source_merge_test.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source_merge.h"
#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/db/pipeline/process_interface_standalone.h"
namespace mongo {
namespace {
@@ -393,53 +394,6 @@ TEST_F(DocumentSourceMergeTest, FailsToParseIfOnFieldIsNotStringOrArrayOfStrings
ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51186);
}
-TEST_F(DocumentSourceMergeTest, FailsToParseIfOnFieldHasDuplicateFields) {
- auto spec = BSON("$merge" << BSON("into"
- << "target_collection"
- << "on"
- << BSON_ARRAY("_id"
- << "_id")));
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::BadValue);
-
- spec = BSON("$merge" << BSON("into"
- << "test"
- << "on"
- << BSON_ARRAY("x"
- << "y"
- << "x")));
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::BadValue);
-}
-
-TEST_F(DocumentSourceMergeTest, FailsToParseIfTargetCollectionVersionIsSpecifiedOnMongos) {
- auto spec = BSON("$merge" << BSON("into"
- << "target_collection"
- << "on"
- << "_id"
- << "targetCollectionVersion"
- << ChunkVersion(0, 0, OID::gen()).toBSON()));
- getExpCtx()->inMongos = true;
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51179);
-
- // Test that 'targetCollectionVersion' is accepted if _from_ mongos.
- getExpCtx()->inMongos = false;
- getExpCtx()->fromMongos = true;
- ASSERT(createMergeStage(spec) != nullptr);
-
- // Test that 'targetCollectionVersion' is not accepted if on mongod but not from mongos.
- getExpCtx()->inMongos = false;
- getExpCtx()->fromMongos = false;
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51123);
-}
-
-TEST_F(DocumentSourceMergeTest, FailsToParseIfOnFieldIsNotSentFromMongos) {
- auto spec = BSON("$merge" << BSON("into"
- << "target_collection"
- << "targetCollectionVersion"
- << ChunkVersion(0, 0, OID::gen()).toBSON()));
- getExpCtx()->fromMongos = true;
- ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51124);
-}
-
TEST_F(DocumentSourceMergeTest, CorrectlyUsesTargetDbThatMatchesAggregationDb) {
const auto targetDbSameAsAggregationDb = getExpCtx()->ns.db();
const auto targetColl = "target_collection";
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index d9ddf73015d..ab340885632 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -31,454 +31,204 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/pipeline/document_source_out.h"
+
+#include <fmt/format.h>
+
#include "mongo/db/curop_failpoint_helpers.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/pipeline/document_path_support.h"
-#include "mongo/db/pipeline/document_source_merge.h"
-#include "mongo/db/pipeline/document_source_out.h"
-#include "mongo/db/pipeline/document_source_out_gen.h"
-#include "mongo/db/pipeline/document_source_out_replace_coll.h"
-#include "mongo/util/fail_point_service.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
namespace mongo {
+using namespace fmt::literals;
+
+static AtomicWord<unsigned> aggOutCounter;
MONGO_FAIL_POINT_DEFINE(hangWhileBuildingDocumentSourceOutBatch);
+REGISTER_DOCUMENT_SOURCE(out,
+ DocumentSourceOut::LiteParsed::parse,
+ DocumentSourceOut::createFromBson);
-using boost::intrusive_ptr;
-using std::vector;
+DocumentSourceOut::~DocumentSourceOut() {
+ DESTRUCTOR_GUARD(
+ // Make sure we drop the temp collection if anything goes wrong. Errors are ignored
+ // here because nothing can be done about them. Additionally, if this fails and the
+ // collection is left behind, it will be cleaned up next time the server is started.
+ if (_tempNs.size()) {
+ auto cleanupClient =
+ pExpCtx->opCtx->getServiceContext()->makeClient("$out_replace_coll_cleanup");
+ AlternativeClientRegion acr(cleanupClient);
+ // Create a new operation context so that any interrupts on the current operation will
+ // not affect the dropCollection operation below.
+ auto cleanupOpCtx = cc().makeOperationContext();
+
+ DocumentSourceWriteBlock writeBlock(cleanupOpCtx.get());
+
+ // Reset the operation context back to original once dropCollection is done.
+ ON_BLOCK_EXIT(
+ [this] { pExpCtx->mongoProcessInterface->setOperationContext(pExpCtx->opCtx); });
+
+ pExpCtx->mongoProcessInterface->setOperationContext(cleanupOpCtx.get());
+ pExpCtx->mongoProcessInterface->directClient()->dropCollection(_tempNs.ns());
+ });
+}
std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::parse(
const AggregationRequest& request, const BSONElement& spec) {
uassert(ErrorCodes::TypeMismatch,
- str::stream() << "$out stage requires a string or object argument, but found "
- << typeName(spec.type()),
- spec.type() == BSONType::String || spec.type() == BSONType::Object);
-
- NamespaceString targetNss;
- bool allowSharded;
- WriteModeEnum mode;
- if (spec.type() == BSONType::String) {
- targetNss = NamespaceString(request.getNamespaceString().db(), spec.valueStringData());
- allowSharded = false;
- mode = WriteModeEnum::kModeReplaceCollection;
- } else if (spec.type() == BSONType::Object) {
- auto outSpec =
- DocumentSourceOutSpec::parse(IDLParserErrorContext("$out"), spec.embeddedObject());
-
- if (auto targetDb = outSpec.getTargetDb()) {
- targetNss = NamespaceString(*targetDb, outSpec.getTargetCollection());
- } else {
- targetNss =
- NamespaceString(request.getNamespaceString().db(), outSpec.getTargetCollection());
- }
-
- mode = outSpec.getMode();
-
- // Sharded output collections are not allowed with mode "replaceCollection".
- allowSharded = mode != WriteModeEnum::kModeReplaceCollection;
- }
+ "{} stage requires a string argument, but found {}"_format(kStageName,
+ typeName(spec.type())),
+ spec.type() == BSONType::String);
+ NamespaceString targetNss{request.getNamespaceString().db(), spec.valueStringData()};
uassert(ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid $out target namespace, " << targetNss.ns(),
+ "Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()),
targetNss.isValid());
- // All modes require the "insert" action.
- ActionSet actions{ActionType::insert};
- switch (mode) {
- case WriteModeEnum::kModeReplaceCollection:
- actions.addAction(ActionType::remove);
- break;
- case WriteModeEnum::kModeReplaceDocuments:
- actions.addAction(ActionType::update);
- break;
- case WriteModeEnum::kModeInsertDocuments:
- // "insertDocuments" mode only requires the "insert" action.
- break;
- }
-
+ ActionSet actions{ActionType::insert, ActionType::remove};
if (request.shouldBypassDocumentValidation()) {
actions.addAction(ActionType::bypassDocumentValidation);
}
PrivilegeVector privileges{Privilege(ResourcePattern::forExactNamespace(targetNss), actions)};
- return stdx::make_unique<DocumentSourceOut::LiteParsed>(
- std::move(targetNss), std::move(privileges), allowSharded);
+ return stdx::make_unique<DocumentSourceOut::LiteParsed>(std::move(targetNss),
+ std::move(privileges));
}
-REGISTER_DOCUMENT_SOURCE(out,
- DocumentSourceOut::LiteParsed::parse,
- DocumentSourceOut::createFromBson);
+void DocumentSourceOut::initialize() {
+ DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx);
-const char* DocumentSourceOut::getSourceName() const {
- return "$out";
-}
+ DBClientBase* conn = pExpCtx->mongoProcessInterface->directClient();
-namespace {
-/**
- * Parses the fields of the 'uniqueKey' from the user-specified 'obj' from the $out spec, returning
- * a set of field paths. Throws if 'obj' is invalid.
- */
-std::set<FieldPath> parseUniqueKeyFromSpec(const BSONObj& obj) {
- std::set<FieldPath> uniqueKey;
- for (const auto& elem : obj) {
- uassert(ErrorCodes::TypeMismatch,
- str::stream() << "All fields of $out uniqueKey must be the number 1, but '"
- << elem.fieldNameStringData()
- << "' is of type "
- << elem.type(),
- elem.isNumber());
-
- uassert(ErrorCodes::BadValue,
- str::stream() << "All fields of $out uniqueKey must be the number 1, but '"
- << elem.fieldNameStringData()
- << "' has the invalid value "
- << elem.numberDouble(),
- elem.numberDouble() == 1.0);
-
- const auto res = uniqueKey.insert(FieldPath(elem.fieldNameStringData()));
- uassert(ErrorCodes::BadValue,
- str::stream() << "Found a duplicate field '" << elem.fieldNameStringData()
- << "' in $out uniqueKey",
- res.second);
- }
+ const auto& outputNs = getOutputNs();
+ _tempNs = NamespaceString(str::stream() << outputNs.db() << ".tmp.agg_out."
+ << aggOutCounter.addAndFetch(1));
- uassert(ErrorCodes::InvalidOptions,
- "If explicitly specifying $out uniqueKey, must include at least one field",
- uniqueKey.size() > 0);
- return uniqueKey;
-}
+ // Save the original collection options and index specs so we can check they didn't change
+ // during computation.
+ _originalOutOptions = pExpCtx->mongoProcessInterface->getCollectionOptions(outputNs);
+ _originalIndexes = conn->getIndexSpecs(outputNs.ns());
-/**
- * Extracts the fields of 'uniqueKey' from 'doc' and returns the key as a BSONObj. Throws if any
- * field of the 'uniqueKey' extracted from 'doc' is nullish or an array.
- */
-BSONObj extractUniqueKeyFromDoc(const Document& doc, const std::set<FieldPath>& uniqueKey) {
- MutableDocument result;
- for (const auto& field : uniqueKey) {
- auto value = doc.getNestedField(field);
- uassert(50943,
- str::stream() << "$out write error: uniqueKey field '" << field.fullPath()
- << "' is an array in the document '"
- << doc.toString()
- << "'",
- !value.isArray());
- uassert(
- 50905,
- str::stream() << "$out write error: uniqueKey field '" << field.fullPath()
- << "' cannot be missing, null, undefined or an array. Full document: '"
- << doc.toString()
- << "'",
- !value.nullish());
- result.addField(field.fullPath(), std::move(value));
- }
- return result.freeze().toBson();
-}
+ // Check if it's capped to make sure we have a chance of succeeding before we do all the work.
+ // If the collection becomes capped during processing, the collection options will have changed,
+ // and the $out will fail.
+ uassert(17152,
+ "namespace '{}' is capped so it can't be used for {}"_format(outputNs.ns(), kStageName),
+ _originalOutOptions["capped"].eoo());
-void ensureUniqueKeyHasSupportingIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& outputNs,
- const std::set<FieldPath>& uniqueKey,
- const BSONObj& userSpecifiedUniqueKey) {
- uassert(
- 50938,
- str::stream() << "Cannot find index to verify that $out's unique key will be unique: "
- << userSpecifiedUniqueKey,
- expCtx->mongoProcessInterface->uniqueKeyIsSupportedByIndex(expCtx, outputNs, uniqueKey));
-}
-} // namespace
+ // We will write all results into a temporary collection, then rename the temporary
+ // collection to be the target collection once we are done.
+ _tempNs = NamespaceString(str::stream() << outputNs.db() << ".tmp.agg_out."
+ << aggOutCounter.addAndFetch(1));
-DocumentSource::GetNextResult DocumentSourceOut::getNext() {
- pExpCtx->checkForInterrupt();
+ // Create temp collection, copying options from the existing output collection if any.
+ {
+ BSONObjBuilder cmd;
+ cmd << "create" << _tempNs.coll();
+ cmd << "temp" << true;
+ cmd.appendElementsUnique(_originalOutOptions);
- if (_done) {
- return GetNextResult::makeEOF();
+ BSONObj info;
+ uassert(16994,
+ "failed to create temporary {} collection '{}': {}"_format(
+ kStageName, _tempNs.ns(), getStatusFromCommandResult(info).reason()),
+ conn->runCommand(outputNs.db().toString(), cmd.done(), info));
}
- if (!_initialized) {
- // Explain of a $out should never try to actually execute any writes. We only ever expect
- // getNext() to be called for the 'executionStats' and 'allPlansExecution' explain modes.
- // This assertion should not be triggered for 'queryPlanner' explain of a $out, which is
- // perfectly legal.
- uassert(51029,
- str::stream() << "explain of $out is not allowed with verbosity: "
- << ExplainOptions::verbosityString(*pExpCtx->explain),
- !pExpCtx->explain);
-
- initializeWriteNs();
- _initialized = true;
+ if (_originalIndexes.empty()) {
+ return;
}
- BatchedObjects batch;
- int bufferedBytes = 0;
-
- auto nextInput = pSource->getNext();
- for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
- // clang-format off
- CurOpFailpointHelpers::waitWhileFailPointEnabled(
- &hangWhileBuildingDocumentSourceOutBatch,
- pExpCtx->opCtx,
- "hangWhileBuildingDocumentSourceOutBatch",
- []() {
- log() << "Hanging aggregation due to 'hangWhileBuildingDocumentSourceOutBatch' "
- << "failpoint";
- });
- // clang-format on
-
- auto doc = nextInput.releaseDocument();
-
- // Generate an _id if the uniqueKey includes _id but the document doesn't have one.
- if (_uniqueKeyIncludesId && doc.getField("_id"_sd).missing()) {
- MutableDocument mutableDoc(std::move(doc));
- mutableDoc["_id"_sd] = Value(OID::gen());
- doc = mutableDoc.freeze();
- }
-
- // Extract the unique key before converting the document to BSON.
- auto uniqueKey = extractUniqueKeyFromDoc(doc, _uniqueKeyFields);
- auto insertObj = doc.toBson();
-
- bufferedBytes += insertObj.objsize();
- if (!batch.empty() &&
- (bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) {
- spill(std::move(batch));
- batch.clear();
- bufferedBytes = insertObj.objsize();
- }
- batch.emplace(std::move(insertObj), std::move(uniqueKey));
+ // Copy the indexes of the output collection to the temp collection.
+ std::vector<BSONObj> tempNsIndexes;
+ for (const auto& indexSpec : _originalIndexes) {
+ // Replace the spec's 'ns' field value, which is the original collection, with the temp
+ // collection.
+ tempNsIndexes.push_back(indexSpec.addField(BSON("ns" << _tempNs.ns()).firstElement()));
}
- if (!batch.empty()) {
- spill(std::move(batch));
- batch.clear();
+ try {
+ conn->createIndexes(_tempNs.ns(), tempNsIndexes);
+ } catch (DBException& ex) {
+ ex.addContext("Copying indexes for $out failed");
+ throw;
}
+}
- switch (nextInput.getStatus()) {
- case GetNextResult::ReturnStatus::kAdvanced: {
- MONGO_UNREACHABLE; // We consumed all advances above.
- }
- case GetNextResult::ReturnStatus::kPauseExecution: {
- return nextInput; // Propagate the pause.
- }
- case GetNextResult::ReturnStatus::kEOF: {
-
- finalize();
- _done = true;
-
- // $out doesn't currently produce any outputs.
- return nextInput;
- }
- }
- MONGO_UNREACHABLE;
+void DocumentSourceOut::finalize() {
+ DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx);
+
+ const auto& outputNs = getOutputNs();
+ auto renameCommandObj =
+ BSON("renameCollection" << _tempNs.ns() << "to" << outputNs.ns() << "dropTarget" << true);
+
+ pExpCtx->mongoProcessInterface->renameIfOptionsAndIndexesHaveNotChanged(
+ pExpCtx->opCtx, renameCommandObj, outputNs, _originalOutOptions, _originalIndexes);
+
+ // The rename succeeded, so the temp collection no longer exists.
+ _tempNs = {};
}
-intrusive_ptr<DocumentSource> DocumentSourceOut::create(
- NamespaceString outputNs,
- const intrusive_ptr<ExpressionContext>& expCtx,
- WriteModeEnum mode,
- std::set<FieldPath> uniqueKey,
- boost::optional<ChunkVersion> targetCollectionVersion) {
+boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create(
+ NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
// TODO (SERVER-36832): Allow this combination.
- uassert(
- 50939,
- str::stream() << "$out with mode " << WriteMode_serializer(mode)
- << " is not supported when the output collection is in a different database",
- !(mode == WriteModeEnum::kModeReplaceCollection && outputNs.db() != expCtx->ns.db()));
-
- uassert(50992,
- str::stream() << "$out with mode " << WriteMode_serializer(mode)
- << " is not supported when the output collection is the same as the"
- << " aggregation collection",
- mode == WriteModeEnum::kModeReplaceCollection || expCtx->ns != outputNs);
+ uassert(50939,
+ "{} is not supported when the output collection is in a different "
+ "database"_format(kStageName),
+ outputNs.db() == expCtx->ns.db());
uassert(ErrorCodes::OperationNotSupportedInTransaction,
- "$out cannot be used in a transaction",
+ "{} cannot be used in a transaction"_format(kStageName),
!expCtx->inMultiDocumentTransaction);
auto readConcernLevel = repl::ReadConcernArgs::get(expCtx->opCtx).getLevel();
uassert(ErrorCodes::InvalidOptions,
- "$out cannot be used with a 'linearizable' read concern level",
+ "{} cannot be used with a 'linearizable' read concern level"_format(kStageName),
readConcernLevel != repl::ReadConcernLevel::kLinearizableReadConcern);
- // Although we perform a check for "replaceCollection" mode with a sharded output collection
- // during lite parsing, we need to do it here as well in case mongos is stale or the command is
- // sent directly to the shard.
- if (mode == WriteModeEnum::kModeReplaceCollection) {
- uassert(17017,
- str::stream() << "$out with mode " << WriteMode_serializer(mode)
- << " is not supported to an existing *sharded* output collection.",
- !expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, outputNs));
- }
- uassert(17385, "Can't $out to special collection: " + outputNs.coll(), !outputNs.isSpecial());
-
- switch (mode) {
- case WriteModeEnum::kModeReplaceCollection:
- return new DocumentSourceOutReplaceColl(
- std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetCollectionVersion);
- case WriteModeEnum::kModeInsertDocuments:
- return DocumentSourceMerge::create(std::move(outputNs),
- expCtx,
- MergeWhenMatchedModeEnum::kFail,
- MergeWhenNotMatchedModeEnum::kInsert,
- boost::none, /* no variables */
- boost::none, /* no custom pipeline */
- std::move(uniqueKey),
- targetCollectionVersion,
- true /* serialize as $out stage */);
- case WriteModeEnum::kModeReplaceDocuments:
- return DocumentSourceMerge::create(std::move(outputNs),
- expCtx,
- MergeWhenMatchedModeEnum::kReplace,
- MergeWhenNotMatchedModeEnum::kInsert,
- boost::none, /* no variables */
- boost::none, /* no custom pipeline */
- std::move(uniqueKey),
- targetCollectionVersion,
- true /* serialize as $out stage */);
- default:
- MONGO_UNREACHABLE;
- }
-}
+ uassert(17017,
+ "{} is not supported to an existing *sharded* output collection"_format(kStageName),
+ !expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, outputNs));
-DocumentSourceOut::DocumentSourceOut(NamespaceString outputNs,
- const intrusive_ptr<ExpressionContext>& expCtx,
- WriteModeEnum mode,
- std::set<FieldPath> uniqueKey,
- boost::optional<ChunkVersion> targetCollectionVersion)
- : DocumentSource(expCtx),
- _writeConcern(expCtx->opCtx->getWriteConcern()),
- _outputNs(std::move(outputNs)),
- _targetCollectionVersion(targetCollectionVersion),
- _done(false),
- _mode(mode),
- _uniqueKeyFields(std::move(uniqueKey)),
- _uniqueKeyIncludesId(_uniqueKeyFields.count("_id") == 1) {}
-
-intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
- BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
-
- auto mode = WriteModeEnum::kModeReplaceCollection;
- std::set<FieldPath> uniqueKey;
- NamespaceString outputNs;
- boost::optional<ChunkVersion> targetCollectionVersion;
- if (elem.type() == BSONType::String) {
- outputNs = NamespaceString(expCtx->ns.db().toString() + '.' + elem.str());
- uniqueKey.emplace("_id");
- } else if (elem.type() == BSONType::Object) {
- auto spec =
- DocumentSourceOutSpec::parse(IDLParserErrorContext("$out"), elem.embeddedObject());
- mode = spec.getMode();
-
- // Retrieve the target database from the user command, otherwise use the namespace from the
- // expression context.
- auto dbName = spec.getTargetDb() ? *spec.getTargetDb() : expCtx->ns.db();
- outputNs = NamespaceString(dbName, spec.getTargetCollection());
-
- std::tie(uniqueKey, targetCollectionVersion) = expCtx->inMongos
- ? resolveUniqueKeyOnMongoS(expCtx, spec, outputNs)
- : resolveUniqueKeyOnMongoD(expCtx, spec, outputNs);
- } else {
- uasserted(16990,
- str::stream() << "$out only supports a string or object argument, not "
- << typeName(elem.type()));
- }
+ uassert(17385,
+ "Can't {} to special collection: {}"_format(kStageName, outputNs.coll()),
+ !outputNs.isSpecial());
- return create(std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetCollectionVersion);
+ return new DocumentSourceOut(std::move(outputNs), expCtx);
}
-std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>>
-DocumentSourceOut::resolveUniqueKeyOnMongoD(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const DocumentSourceOutSpec& spec,
- const NamespaceString& outputNs) {
- invariant(!expCtx->inMongos);
- auto targetCollectionVersion = spec.getTargetCollectionVersion();
- if (targetCollectionVersion) {
- uassert(51018, "Unexpected target chunk version specified", expCtx->fromMongos);
- // If mongos has sent us a target shard version, we need to be sure we are prepared to
- // act as a router which is at least as recent as that mongos.
- expCtx->mongoProcessInterface->checkRoutingInfoEpochOrThrow(
- expCtx, outputNs, *targetCollectionVersion);
- }
-
- auto userSpecifiedUniqueKey = spec.getUniqueKey();
- if (!userSpecifiedUniqueKey) {
- uassert(51017, "Expected uniqueKey to be provided from mongos", !expCtx->fromMongos);
- return {std::set<FieldPath>{"_id"}, targetCollectionVersion};
- }
+boost::intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(16990,
+ "{} only supports a string argument, but found {}"_format(kStageName,
+ typeName(elem.type())),
+ elem.type() == BSONType::String);
- // Make sure the uniqueKey has a supporting index. Skip this check if the command is sent
- // from mongos since the uniqueKey check would've happened already.
- auto uniqueKey = parseUniqueKeyFromSpec(userSpecifiedUniqueKey.get());
- if (!expCtx->fromMongos) {
- ensureUniqueKeyHasSupportingIndex(expCtx, outputNs, uniqueKey, *userSpecifiedUniqueKey);
- }
- return {uniqueKey, targetCollectionVersion};
+ return create({expCtx->ns.db(), elem.str()}, expCtx);
}
-std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>>
-DocumentSourceOut::resolveUniqueKeyOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const DocumentSourceOutSpec& spec,
- const NamespaceString& outputNs) {
- invariant(expCtx->inMongos);
- uassert(50984,
- "$out received unexpected 'targetCollectionVersion' on mongos",
- !spec.getTargetCollectionVersion());
-
- if (auto userSpecifiedUniqueKey = spec.getUniqueKey()) {
- // Convert unique key object to a vector of FieldPaths.
- auto uniqueKey = parseUniqueKeyFromSpec(userSpecifiedUniqueKey.get());
- ensureUniqueKeyHasSupportingIndex(expCtx, outputNs, uniqueKey, *userSpecifiedUniqueKey);
-
- // If the user supplies the uniqueKey we don't need to attach a ChunkVersion for the shards
- // since we are not at risk of 'guessing' the wrong shard key.
- return {uniqueKey, boost::none};
- }
+Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
+ massert(17000,
+ "{} shouldn't have different db than input"_format(kStageName),
+ _outputNs.db() == pExpCtx->ns.db());
- // In case there are multiple shards which will perform this $out in parallel, we need to figure
- // out and attach the collection's shard version to ensure each shard is talking about the same
- // version of the collection. This mongos will coordinate that. We force a catalog refresh to do
- // so because there is no shard versioning protocol on this namespace and so we otherwise could
- // not be sure this node is (or will be come) at all recent. We will also figure out and attach
- // the uniqueKey to send to the shards. We don't need to do this for 'replaceCollection' mode
- // since that mode cannot currently target a sharded collection.
-
- // There are cases where the aggregation could fail if the collection is dropped or re-created
- // during or near the time of the aggregation. This is okay - we are mostly paranoid that this
- // mongos is very stale and want to prevent returning an error if the collection was dropped a
- // long time ago. Because of this, we are okay with piggy-backing off another thread's request
- // to refresh the cache, simply waiting for that request to return instead of forcing another
- // refresh.
- boost::optional<ChunkVersion> targetCollectionVersion =
- spec.getMode() == WriteModeEnum::kModeReplaceCollection
- ? boost::none
- : expCtx->mongoProcessInterface->refreshAndGetCollectionVersion(expCtx, outputNs);
-
- auto docKeyPaths = expCtx->mongoProcessInterface->collectDocumentKeyFieldsActingAsRouter(
- expCtx->opCtx, outputNs);
- return {std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()),
- std::make_move_iterator(docKeyPaths.end())),
- targetCollectionVersion};
+ return Value(DOC(getSourceName() << _outputNs.coll()));
}
-Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
- DocumentSourceOutSpec spec;
- spec.setTargetDb(_outputNs.db());
- spec.setTargetCollection(_outputNs.coll());
- spec.setMode(_mode);
- spec.setUniqueKey([&]() {
- BSONObjBuilder uniqueKeyBob;
- for (auto path : _uniqueKeyFields) {
- uniqueKeyBob.append(path.fullPath(), 1);
- }
- return uniqueKeyBob.obj();
- }());
- spec.setTargetCollectionVersion(_targetCollectionVersion);
- return Value(Document{{getSourceName(), spec.toBSON()}});
+void DocumentSourceOut::waitWhileFailPointEnabled() {
+ CurOpFailpointHelpers::waitWhileFailPointEnabled(
+ &hangWhileBuildingDocumentSourceOutBatch,
+ pExpCtx->opCtx,
+ "hangWhileBuildingDocumentSourceOutBatch",
+ []() {
+ log() << "Hanging aggregation due to 'hangWhileBuildingDocumentSourceOutBatch' "
+ << "failpoint";
+ });
}
-DepsTracker::State DocumentSourceOut::getDependencies(DepsTracker* deps) const {
- deps->needWholeDocument = true;
- return DepsTracker::State::EXHAUSTIVE_ALL;
-}
} // namespace mongo
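
The initialize()/finalize() pair above restores the pre-4.2 $out behaviour: write into a uniquely named temporary collection that copies the target's options and indexes, then rename it over the target with dropTarget: true so readers only ever see the old or the new collection. A rough sketch of that sequence follows; the log lines stand in for the real create/createIndexes/renameCollection commands issued through the process interface.

// Rough sketch (log output only, no driver calls): the temp-collection workflow
// that DocumentSourceOut's initialize()/finalize() implement above.
#include <iostream>
#include <string>

static int aggOutCounter = 0;  // stand-in for the AtomicWord<unsigned> counter

struct OutStageSketch {
    std::string db;
    std::string target;
    std::string tempColl;

    void initialize() {
        // Unique temp namespace in the same database as the target collection.
        tempColl = "tmp.agg_out." + std::to_string(++aggOutCounter);
        std::cout << "create " << db << "." << tempColl << " {temp: true, <options copied from "
                  << db << "." << target << ">}\n";
        std::cout << "createIndexes " << db << "." << tempColl
                  << " <target's index specs with 'ns' rewritten>\n";
    }

    void finalize() {
        // Atomically swap the temp collection in for the target.
        std::cout << "renameCollection " << db << "." << tempColl << " -> " << db << "." << target
                  << " {dropTarget: true}\n";
        tempColl.clear();  // rename succeeded; the destructor has nothing to clean up
    }
};

int main() {
    OutStageSketch out{"test", "target_collection", ""};
    out.initialize();
    // ... batches of results are inserted into the temp collection here ...
    out.finalize();
    return 0;
}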
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index bfb9bb08771..7efb8450aed 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -29,46 +29,13 @@
#pragma once
-#include "mongo/db/db_raii.h"
-#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/document_source_out_gen.h"
-#include "mongo/db/write_concern_options.h"
-#include "mongo/s/chunk_version.h"
+#include "mongo/db/pipeline/document_source_writer.h"
namespace mongo {
-
-/**
- * Manipulates the state of the OperationContext so that while this object is in scope, reads and
- * writes will use a local read concern and see the latest version of the data. It will also reset
- * ignore_prepared on the recovery unit so that any reads or writes will block on a conflict with a
- * prepared transaction. Resets the OperationContext back to its original state upon destruction.
- */
-class OutStageWriteBlock {
- OperationContext* _opCtx;
- repl::ReadConcernArgs _originalArgs;
- RecoveryUnit::ReadSource _originalSource;
- EnforcePrepareConflictsBlock _enforcePrepareConflictsBlock;
-
-public:
- OutStageWriteBlock(OperationContext* opCtx)
- : _opCtx(opCtx), _enforcePrepareConflictsBlock(opCtx) {
- _originalArgs = repl::ReadConcernArgs::get(_opCtx);
- _originalSource = _opCtx->recoveryUnit()->getTimestampReadSource();
-
- repl::ReadConcernArgs::get(_opCtx) = repl::ReadConcernArgs();
- _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::kUnset);
- }
-
- ~OutStageWriteBlock() {
- repl::ReadConcernArgs::get(_opCtx) = _originalArgs;
- _opCtx->recoveryUnit()->setTimestampReadSource(_originalSource);
- }
-};
-
/**
- * Abstract class for the $out aggregation stage.
+ * Implementation for the $out aggregation stage.
*/
-class DocumentSourceOut : public DocumentSource {
+class DocumentSourceOut final : public DocumentSourceWriter<BSONObj> {
public:
static constexpr StringData kStageName = "$out"_sd;
@@ -78,172 +45,46 @@ public:
*/
class LiteParsed final : public LiteParsedDocumentSourceForeignCollections {
public:
+ using LiteParsedDocumentSourceForeignCollections::
+ LiteParsedDocumentSourceForeignCollections;
+
+
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec);
- LiteParsed(NamespaceString outNss, PrivilegeVector privileges, bool allowShardedOutNss)
- : LiteParsedDocumentSourceForeignCollections(outNss, privileges),
- _allowShardedOutNss(allowShardedOutNss) {}
-
bool allowShardedForeignCollection(NamespaceString nss) const final {
- return _allowShardedOutNss ? true : (_foreignNssSet.find(nss) == _foreignNssSet.end());
+ return _foreignNssSet.find(nss) == _foreignNssSet.end();
}
bool allowedToPassthroughFromMongos() const final {
- // Do not allow passthrough from mongos even if the source collection is unsharded. This
- // ensures that the unique index verification happens once on mongos and can be bypassed
- // on the shards.
return false;
}
-
- private:
- bool _allowShardedOutNss;
};
- /**
- * Builds a new $out stage which will spill all documents into 'outputNs' as inserts. If
- * 'targetCollectionVersion' is provided then processing will stop with an error if the
- * collection's epoch changes during the course of execution. This is used as a mechanism to
- * prevent the shard key from changing.
- */
- DocumentSourceOut(NamespaceString outputNs,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- WriteModeEnum mode,
- std::set<FieldPath> uniqueKey,
- boost::optional<ChunkVersion> targetCollectionVersion);
+ ~DocumentSourceOut() override;
- virtual ~DocumentSourceOut() = default;
-
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- DepsTracker::State getDependencies(DepsTracker* deps) const final;
- /**
- * For purposes of tracking which fields come from where, this stage does not modify any fields.
- */
- GetModPathsReturn getModifiedPaths() const final {
- return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
+ const char* getSourceName() const final override {
+ return kStageName.rawData();
}
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- // A $out to an unsharded collection should merge on the primary shard to perform local
- // writes. A $out to a sharded collection has no requirement, since each shard can perform
- // its own portion of the write. We use 'kAnyShard' to direct it to execute on one of the
- // shards in case some of the writes happen to end up being local.
- //
- // Note that this decision is inherently racy and subject to become stale. This is okay
- // because either choice will work correctly, we are simply applying a heuristic
- // optimization.
- auto hostTypeRequirement = HostTypeRequirement::kPrimaryShard;
- if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs) &&
- _mode != WriteModeEnum::kModeReplaceCollection) {
- hostTypeRequirement = HostTypeRequirement::kAnyShard;
- }
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final override {
return {StreamType::kStreaming,
PositionRequirement::kLast,
- hostTypeRequirement,
+ HostTypeRequirement::kPrimaryShard,
DiskUseRequirement::kWritesPersistentData,
FacetRequirement::kNotAllowed,
TransactionRequirement::kNotAllowed,
LookupRequirement::kNotAllowed};
}
- const NamespaceString& getOutputNs() const {
- return _outputNs;
- }
-
- WriteModeEnum getMode() const {
- return _mode;
- }
-
- boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
- // It should always be faster to avoid splitting the pipeline if the output collection is
- // sharded. If we avoid splitting the pipeline then each shard can perform the writes to the
- // target collection in parallel.
- //
- // Note that this decision is inherently racy and subject to become stale. This is okay
- // because either choice will work correctly, we are simply applying a heuristic
- // optimization.
- if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)) {
- return boost::none;
- }
- // {shardsStage, mergingStage, sortPattern}
- return DistributedPlanLogic{nullptr, this, boost::none};
- }
-
- virtual bool canRunInParallelBeforeOut(
- const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final {
- // If someone is asking the question, this must be the $out stage in question, so yes!
- return true;
- }
-
-
- /**
- * Retrieves the namespace to direct each batch to, which may be a temporary namespace or the
- * final output namespace.
- */
- virtual const NamespaceString& getWriteNs() const = 0;
-
- /**
- * Prepares the DocumentSource to be able to write incoming batches to the desired collection.
- */
- virtual void initializeWriteNs() = 0;
-
- /**
- * Storage for a batch of BSON Objects to be inserted/updated to the write namespace. The
- * extracted unique key values are also stored in a batch, used by $out with mode
- * "replaceDocuments" as the query portion of the update.
- *
- */
- struct BatchedObjects {
- void emplace(BSONObj&& obj, BSONObj&& key) {
- objects.emplace_back(std::move(obj));
- uniqueKeys.emplace_back(std::move(key));
- }
-
- bool empty() const {
- return objects.empty();
- }
-
- size_t size() const {
- return objects.size();
- }
-
- void clear() {
- objects.clear();
- uniqueKeys.clear();
- }
-
- std::vector<BSONObj> objects;
- // Store the unique keys as BSON objects instead of Documents for compatibility with the
- // batch update command. (e.g. {q: <array of uniqueKeys>, u: <array of objects>})
- std::vector<BSONObj> uniqueKeys;
- };
-
- /**
- * Writes the documents in 'batch' to the write namespace.
- */
- virtual void spill(BatchedObjects&& batch) {
- OutStageWriteBlock writeBlock(pExpCtx->opCtx);
-
- pExpCtx->mongoProcessInterface->insert(
- pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern, _targetEpoch());
- };
-
- /**
- * Finalize the output collection, called when there are no more documents to write.
- */
- virtual void finalize() = 0;
+ Value serialize(
+ boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final override;
/**
* Creates a new $out stage from the given arguments.
*/
static boost::intrusive_ptr<DocumentSource> create(
- NamespaceString outputNs,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- WriteModeEnum,
- std::set<FieldPath> uniqueKey = std::set<FieldPath>{"_id"},
- boost::optional<ChunkVersion> targetCollectionVersion = boost::none);
+ NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
* Parses a $out stage from the user-supplied BSON.
@@ -251,59 +92,37 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-protected:
- // Stash the writeConcern of the original command as the operation context may change by the
- // time we start to spill $out writes. This is because certain aggregations (e.g. $exchange)
- // establish cursors with batchSize 0 then run subsequent getMore's which use a new operation
- // context. The getMore's will not have an attached writeConcern however we still want to
- // respect the writeConcern of the original command.
- WriteConcernOptions _writeConcern;
+private:
+ DocumentSourceOut(NamespaceString outputNs,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceWriter(std::move(outputNs), expCtx) {}
- const NamespaceString _outputNs;
- boost::optional<ChunkVersion> _targetCollectionVersion;
+ void initialize() override;
- boost::optional<OID> _targetEpoch() {
- return _targetCollectionVersion ? boost::optional<OID>(_targetCollectionVersion->epoch())
- : boost::none;
- }
+ void finalize() override;
-private:
- /**
- * If 'spec' does not specify a uniqueKey, uses the sharding catalog to pick a default key of
- * the shard key + _id. Returns a pair of the uniqueKey (either from the spec or generated), and
- * an optional ChunkVersion, populated with the version stored in the sharding catalog when we
- * asked for the shard key.
- */
- static std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> resolveUniqueKeyOnMongoS(
- const boost::intrusive_ptr<ExpressionContext>&,
- const DocumentSourceOutSpec& spec,
- const NamespaceString& outputNs);
+ void spill(BatchedObjects&& batch) override {
+ DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx);
- /**
- * Ensures that 'spec' contains a uniqueKey which has a supporting index - either because the
- * uniqueKey was sent from mongos or because there is a corresponding unique index. Returns the
- * target ChunkVersion already attached to 'spec', but verifies that this node's cached routing
- * table agrees on the epoch for that version before returning. Throws a StaleConfigException if
- * not.
- */
- static std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> resolveUniqueKeyOnMongoD(
- const boost::intrusive_ptr<ExpressionContext>&,
- const DocumentSourceOutSpec& spec,
- const NamespaceString& outputNs);
+ auto targetEpoch = boost::none;
+ pExpCtx->mongoProcessInterface->insert(
+ pExpCtx, _tempNs, std::move(batch), _writeConcern, targetEpoch);
+ }
- bool _initialized = false;
- bool _done = false;
+ std::pair<BSONObj, int> makeBatchObject(Document&& doc) const override {
+ auto obj = doc.toBson();
+ return {obj, obj.objsize()};
+ }
- WriteModeEnum _mode;
+ void waitWhileFailPointEnabled() override;
- // Holds the unique key used for uniquely identifying documents. There must exist a unique index
- // with this key pattern (up to order). Default is "_id" for unsharded collections, and "_id"
- // plus the shard key for sharded collections.
- std::set<FieldPath> _uniqueKeyFields;
+ // Holds on to the original collection options and index specs so we can check they didn't
+ // change during computation.
+ BSONObj _originalOutOptions;
+ std::list<BSONObj> _originalIndexes;
- // True if '_uniqueKeyFields' contains the _id. We store this as a separate boolean to avoid
- // repeated lookups into the set.
- bool _uniqueKeyIncludesId;
+ // The temporary namespace for the $out writes.
+ NamespaceString _tempNs;
};
} // namespace mongo
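The rewritten header above only fills in the DocumentSourceWriter hooks: initialize() prepares a temporary collection, spill() appends each batch to it (the spill() body writes to _tempNs), and finalize() installs the result over the target namespace, which is what the retained _tempNs, _originalOutOptions and _originalIndexes members are there for, mirroring the rename-with-dropTarget step in the deleted replace_coll code further down. Below is a minimal standalone sketch of that contract in plain C++ with toy in-memory collections; every name in it is illustrative and none of it is the server's actual API.

#include <map>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

// A "collection" is just a list of documents and the "catalog" maps namespaces to them.
using Collection = std::vector<std::string>;
using Catalog = std::map<std::string, Collection>;

class OutWriter {
public:
    OutWriter(Catalog& catalog, std::string targetNs)
        : _catalog(catalog), _targetNs(std::move(targetNs)) {}

    // initialize(): create a fresh temporary collection that will receive all writes.
    void initialize() {
        _tempNs = _targetNs + ".tmp.agg_out";
        _catalog[_tempNs] = {};
    }

    // spill(): append one batch of documents to the temporary collection.
    void spill(std::vector<std::string>&& batch) {
        auto& tmp = _catalog[_tempNs];
        tmp.insert(tmp.end(),
                   std::make_move_iterator(batch.begin()),
                   std::make_move_iterator(batch.end()));
    }

    // finalize(): replace the target with the temporary collection in one step,
    // standing in for renameCollection with dropTarget=true.
    void finalize() {
        _catalog[_targetNs] = std::move(_catalog[_tempNs]);
        _catalog.erase(_tempNs);
        _tempNs.clear();
    }

private:
    Catalog& _catalog;
    std::string _targetNs;
    std::string _tempNs;
};

}  // namespace sketch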
diff --git a/src/mongo/db/pipeline/document_source_out.idl b/src/mongo/db/pipeline/document_source_out.idl
deleted file mode 100644
index a0b3407c46e..00000000000
--- a/src/mongo/db/pipeline/document_source_out.idl
+++ /dev/null
@@ -1,84 +0,0 @@
-# Copyright (C) 2018-present MongoDB, Inc.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the Server Side Public License, version 1,
-# as published by MongoDB, Inc.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# Server Side Public License for more details.
-#
-# You should have received a copy of the Server Side Public License
-# along with this program. If not, see
-# <http://www.mongodb.com/licensing/server-side-public-license>.
-#
-# As a special exception, the copyright holders give permission to link the
-# code of portions of this program with the OpenSSL library under certain
-# conditions as described in each individual source file and distribute
-# linked combinations including the program with the OpenSSL library. You
-# must comply with the Server Side Public License in all respects for
-# all of the code used other than as permitted herein. If you modify file(s)
-# with this exception, you may extend this exception to your version of the
-# file(s), but you are not obligated to do so. If you do not wish to do so,
-# delete this exception statement from your version. If you delete this
-# exception statement from all source files in the program, then also delete
-# it in the license file.
-#
-
-# Document source out stage IDL file
-
-global:
- cpp_namespace: "mongo"
- cpp_includes:
- - "mongo/s/chunk_version.h"
-
-imports:
- - "mongo/idl/basic_types.idl"
- - "mongo/s/chunk_version.idl"
-
-enums:
- WriteMode:
- description: "Possible write mode values."
- type: string
- values:
- kModeReplaceCollection: "replaceCollection"
- kModeInsertDocuments: "insertDocuments"
- kModeReplaceDocuments: "replaceDocuments"
-
-structs:
- DocumentSourceOutSpec:
- description: A document used to specify the $out stage of an aggregation pipeline.
- strict: true
- fields:
- to:
- cpp_name: targetCollection
- type: string
- description: Name of the target collection.
-
- db:
- cpp_name: targetDb
- type: string
- optional: true
- description: Name of the target database, defaults to the database of the
- aggregation.
-
- mode:
- cpp_name: mode
- type: WriteMode
- description: The write mode for the output operation.
-
- uniqueKey:
- cpp_name: uniqueKey
- type: object
- optional: true
- description: Document of fields representing the unique key.
-
- targetCollectionVersion:
- type: ChunkVersion
- optional: true
- description: If set, the collection's ChunkVersion found when parsed on mongos. Can
- be used to check if a collection has since been dropped and re-created,
- in which case the shard key may have changed. As of this writing, this
- also can be used to detect if the collection has gone from unsharded to
- sharded, and thus now has a shard key.
diff --git a/src/mongo/db/pipeline/document_source_out_replace_coll.cpp b/src/mongo/db/pipeline/document_source_out_replace_coll.cpp
deleted file mode 100644
index 4c7bf19d1fd..00000000000
--- a/src/mongo/db/pipeline/document_source_out_replace_coll.cpp
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/pipeline/document_source_out_replace_coll.h"
-
-#include "mongo/rpc/get_status_from_command_result.h"
-
-namespace mongo {
-
-static AtomicWord<unsigned> aggOutCounter;
-
-DocumentSourceOutReplaceColl::~DocumentSourceOutReplaceColl() {
- DESTRUCTOR_GUARD(
- // Make sure we drop the temp collection if anything goes wrong. Errors are ignored
- // here because nothing can be done about them. Additionally, if this fails and the
- // collection is left behind, it will be cleaned up next time the server is started.
- if (_tempNs.size()) {
- auto cleanupClient =
- pExpCtx->opCtx->getServiceContext()->makeClient("$out_replace_coll_cleanup");
- AlternativeClientRegion acr(cleanupClient);
- // Create a new operation context so that any interrputs on the current operation will
- // not affect the dropCollection operation below.
- auto cleanupOpCtx = cc().makeOperationContext();
-
- OutStageWriteBlock writeBlock(cleanupOpCtx.get());
-
- // Reset the operation context back to original once dropCollection is done.
- ON_BLOCK_EXIT(
- [this] { pExpCtx->mongoProcessInterface->setOperationContext(pExpCtx->opCtx); });
-
- pExpCtx->mongoProcessInterface->setOperationContext(cleanupOpCtx.get());
- pExpCtx->mongoProcessInterface->directClient()->dropCollection(_tempNs.ns());
- });
-}
-
-void DocumentSourceOutReplaceColl::initializeWriteNs() {
- OutStageWriteBlock writeBlock(pExpCtx->opCtx);
-
- DBClientBase* conn = pExpCtx->mongoProcessInterface->directClient();
-
- const auto& outputNs = getOutputNs();
- _tempNs = NamespaceString(str::stream() << outputNs.db() << ".tmp.agg_out."
- << aggOutCounter.addAndFetch(1));
-
- // Save the original collection options and index specs so we can check they didn't change
- // during computation.
- _originalOutOptions = pExpCtx->mongoProcessInterface->getCollectionOptions(outputNs);
- _originalIndexes = conn->getIndexSpecs(outputNs.ns());
-
- // Check if it's capped to make sure we have a chance of succeeding before we do all the work.
- // If the collection becomes capped during processing, the collection options will have changed,
- // and the $out will fail.
- uassert(17152,
- str::stream() << "namespace '" << outputNs.ns()
- << "' is capped so it can't be used for $out",
- _originalOutOptions["capped"].eoo());
-
- // We will write all results into a temporary collection, then rename the temporary
- // collection to be the target collection once we are done.
- _tempNs = NamespaceString(str::stream() << outputNs.db() << ".tmp.agg_out."
- << aggOutCounter.addAndFetch(1));
-
- // Create temp collection, copying options from the existing output collection if any.
- {
- BSONObjBuilder cmd;
- cmd << "create" << _tempNs.coll();
- cmd << "temp" << true;
- cmd.appendElementsUnique(_originalOutOptions);
-
- BSONObj info;
- uassert(16994,
- str::stream() << "failed to create temporary $out collection '" << _tempNs.ns()
- << "': "
- << getStatusFromCommandResult(info).reason(),
- conn->runCommand(outputNs.db().toString(), cmd.done(), info));
- }
-
- if (_originalIndexes.empty()) {
- return;
- }
-
- // Copy the indexes of the output collection to the temp collection.
- std::vector<BSONObj> tempNsIndexes;
- for (const auto& indexSpec : _originalIndexes) {
- // Replace the spec's 'ns' field value, which is the original collection, with the temp
- // collection.
- tempNsIndexes.push_back(indexSpec.addField(BSON("ns" << _tempNs.ns()).firstElement()));
- }
- try {
- conn->createIndexes(_tempNs.ns(), tempNsIndexes);
- } catch (DBException& ex) {
- ex.addContext("Copying indexes for $out failed");
- throw;
- }
-};
-
-void DocumentSourceOutReplaceColl::finalize() {
- OutStageWriteBlock writeBlock(pExpCtx->opCtx);
-
- const auto& outputNs = getOutputNs();
- auto renameCommandObj =
- BSON("renameCollection" << _tempNs.ns() << "to" << outputNs.ns() << "dropTarget" << true);
-
- pExpCtx->mongoProcessInterface->renameIfOptionsAndIndexesHaveNotChanged(
- pExpCtx->opCtx, renameCommandObj, outputNs, _originalOutOptions, _originalIndexes);
-
- // The rename succeeded, so the temp collection no longer exists.
- _tempNs = {};
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_out_replace_coll.h b/src/mongo/db/pipeline/document_source_out_replace_coll.h
deleted file mode 100644
index db0dde23ec0..00000000000
--- a/src/mongo/db/pipeline/document_source_out_replace_coll.h
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/pipeline/document_source_out.h"
-#include "mongo/util/destructor_guard.h"
-
-namespace mongo {
-
-/**
- * Version of $out which directs writes to a temporary collection, then renames the temp collection
- * to the target collection with the 'dropTarget' option set to true.
- */
-class DocumentSourceOutReplaceColl final : public DocumentSourceOut {
-public:
- using DocumentSourceOut::DocumentSourceOut;
-
- ~DocumentSourceOutReplaceColl();
-
- /**
- * Sets up a temp collection which contains the same indexes and options as the output
- * collection. All writes will be directed to the temp collection.
- */
- void initializeWriteNs() final;
-
- /**
- * Renames the temp collection to the output collection with the 'dropTarget' option set to
- * true.
- */
- void finalize() final;
-
- const NamespaceString& getWriteNs() const final {
- return _tempNs;
- };
-
-private:
- // Holds on to the original collection options and index specs so we can check they didn't
- // change during computation.
- BSONObj _originalOutOptions;
- std::list<BSONObj> _originalIndexes;
-
- // The temporary namespace for the $out writes.
- NamespaceString _tempNs;
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_out_test.cpp b/src/mongo/db/pipeline/document_source_out_test.cpp
index 6105b2cf84d..69aa6105a72 100644
--- a/src/mongo/db/pipeline/document_source_out_test.cpp
+++ b/src/mongo/db/pipeline/document_source_out_test.cpp
@@ -41,12 +41,6 @@ namespace {
using boost::intrusive_ptr;
-StringData kModeFieldName = DocumentSourceOutSpec::kModeFieldName;
-StringData kUniqueKeyFieldName = DocumentSourceOutSpec::kUniqueKeyFieldName;
-StringData kDefaultMode = WriteMode_serializer(WriteModeEnum::kModeReplaceCollection);
-StringData kInsertDocumentsMode = WriteMode_serializer(WriteModeEnum::kModeInsertDocuments);
-StringData kReplaceDocumentsMode = WriteMode_serializer(WriteModeEnum::kModeReplaceDocuments);
-
/**
 * For the purposes of this test, assume every collection is unsharded. Stages may ask this during
* setup. For example, to compute its constraints, the $out stage needs to know if the output
@@ -95,6 +89,9 @@ TEST_F(DocumentSourceOutTest, FailsToParseIncorrectType) {
spec = BSON("$out" << BSONArray());
ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 16990);
+
+ spec = BSON("$out" << BSONObj());
+ ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 16990);
}
TEST_F(DocumentSourceOutTest, AcceptsStringArgument) {
@@ -104,319 +101,18 @@ TEST_F(DocumentSourceOutTest, AcceptsStringArgument) {
ASSERT_EQ(outStage->getOutputNs().coll(), "some_collection");
}
-TEST_F(DocumentSourceOutTest, SerializeDefaultsModeRecreateCollection) {
+TEST_F(DocumentSourceOutTest, SerializeToString) {
BSONObj spec = BSON("$out"
<< "some_collection");
auto outStage = createOutStage(spec);
auto serialized = outStage->serialize().getDocument();
- ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
+ ASSERT_EQ(serialized["$out"].getStringData(), "some_collection");
// Make sure we can reparse the serialized BSON.
auto reparsedOutStage = createOutStage(serialized.toBson());
auto reSerialized = reparsedOutStage->serialize().getDocument();
- ASSERT_EQ(reSerialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
-}
-
-TEST_F(DocumentSourceOutTest, SerializeUniqueKeyDefaultsToId) {
- BSONObj spec = BSON("$out" << BSON("to"
- << "target"
- << "mode"
- << kDefaultMode));
- auto outStage = createOutStage(spec);
- auto serialized = outStage->serialize().getDocument();
- ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
- ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(),
- (Document{{"_id", 1}}));
-
- spec = BSON("$out"
- << "some_collection");
- outStage = createOutStage(spec);
- serialized = outStage->serialize().getDocument();
- ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
- ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(),
- (Document{{"_id", 1}}));
-}
-
-TEST_F(DocumentSourceOutTest, SerializeCompoundUniqueKey) {
- BSONObj spec = BSON("$out" << BSON("to"
- << "target"
- << "mode"
- << kDefaultMode
- << "uniqueKey"
- << BSON("_id" << 1 << "shardKey" << 1)));
- auto outStage = createOutStage(spec);
- auto serialized = outStage->serialize().getDocument();
- ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
- ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(),
- (Document{{"_id", 1}, {"shardKey", 1}}));
-}
-
-TEST_F(DocumentSourceOutTest, SerializeDottedPathUniqueKey) {
- BSONObj spec = BSON("$out" << BSON("to"
- << "target"
- << "mode"
- << kDefaultMode
- << "uniqueKey"
- << BSON("_id" << 1 << "a.b" << 1)));
- auto outStage = createOutStage(spec);
- auto serialized = outStage->serialize().getDocument();
- ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
- ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(),
- (Document{{"_id", 1}, {"a.b", 1}}));
-
- spec = BSON("$out" << BSON("to"
- << "target"
- << "mode"
- << kDefaultMode
- << "uniqueKey"
- << BSON("_id.a" << 1)));
- outStage = createOutStage(spec);
- serialized = outStage->serialize().getDocument();
- ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
- ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(),
- (Document{{"_id.a", 1}}));
-}
-
-TEST_F(DocumentSourceOutTest, SerializeDottedPathUniqueKeySharedPrefix) {
- BSONObj spec = BSON("$out" << BSON("to"
- << "target"
- << "mode"
- << kDefaultMode
- << "uniqueKey"
- << BSON("_id" << 1 << "a.b" << 1 << "a.c" << 1)));
- auto outStage = createOutStage(spec);
- auto serialized = outStage->serialize().getDocument();
- ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
- ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(),
- (Document{{"_id", 1}, {"a.b", 1}, {"a.c", 1}}));
-}
-
-TEST_F(DocumentSourceOutTest, FailsToParseIfToIsNotString) {
- BSONObj spec = BSON("$out" << BSONObj());
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 40414);
-
- spec = BSON("$out" << BSON("to" << 1));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch);
-
- spec = BSON("$out" << BSON("to" << BSON("a" << 1)));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch);
-}
-
-TEST_F(DocumentSourceOutTest, FailsToParseIfToIsNotAValidUserCollection) {
- BSONObj spec = BSON("$out" << BSON("to"
- << "$test"
- << "mode"
- << kDefaultMode));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 17385);
-
- spec = BSON("$out" << BSON("to"
- << "system.views"
- << "mode"
- << kDefaultMode));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 17385);
-
- spec = BSON("$out" << BSON("to"
- << ".test."
- << "mode"
- << kDefaultMode));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::InvalidNamespace);
-}
-
-TEST_F(DocumentSourceOutTest, FailsToParseIfDbIsNotString) {
- BSONObj spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << kDefaultMode
- << "db"
- << true));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch);
-
- spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << kDefaultMode
- << "db"
- << BSONArray()));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch);
-
- spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << kDefaultMode
- << "db"
- << BSON(""
- << "test")));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch);
-}
-
-TEST_F(DocumentSourceOutTest, FailsToParseIfDbIsNotAValidDatabaseName) {
- BSONObj spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << kInsertDocumentsMode
- << "db"
- << "$invalid"));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 17385);
-
- spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << kInsertDocumentsMode
- << "db"
- << ".test"));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::InvalidNamespace);
-}
-
-TEST_F(DocumentSourceOutTest, FailsToParseIfModeIsNotString) {
- BSONObj spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << true));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch);
-
- spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << BSONArray()));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch);
-
- spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << BSON("" << kDefaultMode)));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch);
-}
-
-TEST_F(DocumentSourceOutTest, CorrectlyAddressesMatchingTargetAndAggregationNamespaces) {
- const auto targetNsSameAsAggregationNs = getExpCtx()->ns;
- const auto targetColl = targetNsSameAsAggregationNs.coll();
- const auto targetDb = targetNsSameAsAggregationNs.db();
-
- BSONObj spec = BSON(
- "$out" << BSON("to" << targetColl << "mode" << kInsertDocumentsMode << "db" << targetDb));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 50992);
-
- spec = BSON(
- "$out" << BSON("to" << targetColl << "mode" << kReplaceDocumentsMode << "db" << targetDb));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 50992);
-
- spec = BSON("$out" << BSON("to" << targetColl << "mode" << kDefaultMode << "db" << targetDb));
- auto outStage = createOutStage(spec);
- ASSERT_EQ(outStage->getOutputNs().db(), targetNsSameAsAggregationNs.db());
- ASSERT_EQ(outStage->getOutputNs().coll(), targetNsSameAsAggregationNs.coll());
-}
-
-TEST_F(DocumentSourceOutTest, FailsToParseIfModeIsUnsupportedString) {
- BSONObj spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << "unsupported"));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::BadValue);
-
- spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << "merge"));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::BadValue);
-}
-
-TEST_F(DocumentSourceOutTest, FailsToParseIfUniqueKeyIsNotAnObject) {
- BSONObj spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << kDefaultMode
- << "uniqueKey"
- << 1));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch);
-
- spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << kDefaultMode
- << "uniqueKey"
- << BSONArray()));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch);
-
- spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << kDefaultMode
- << "uniqueKey"
- << "_id"));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch);
-}
-
-TEST_F(DocumentSourceOutTest, FailsToParseIfUniqueKeyHasDuplicateFields) {
- BSONObj spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << kDefaultMode
- << "uniqueKey"
- << BSON("_id" << 1 << "_id" << 1)));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::BadValue);
-
- spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << kDefaultMode
- << "uniqueKey"
- << BSON("x" << 1 << "y" << 1 << "x" << 1)));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::BadValue);
-}
-
-TEST_F(DocumentSourceOutTest, FailsToParseIfTargetCollectionVersionIsSpecifiedOnMongos) {
- BSONObj spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << kDefaultMode
- << "uniqueKey"
- << BSON("_id" << 1)
- << "targetCollectionVersion"
- << ChunkVersion(0, 0, OID::gen()).toBSON()));
- getExpCtx()->inMongos = true;
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 50984);
-
- // Test that 'targetCollectionVersion' is accepted if _from_ mongos.
- getExpCtx()->inMongos = false;
- getExpCtx()->fromMongos = true;
- ASSERT(createOutStage(spec) != nullptr);
-
- // Test that 'targetCollectionVersion' is not accepted if on mongod but not from mongos.
- getExpCtx()->inMongos = false;
- getExpCtx()->fromMongos = false;
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 51018);
+ ASSERT_EQ(reSerialized["$out"].getStringData(), "some_collection");
}
-TEST_F(DocumentSourceOutTest, FailsToParseifUniqueKeyIsNotSentFromMongos) {
- BSONObj spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << kDefaultMode
- << "targetCollectionVersion"
- << ChunkVersion(0, 0, OID::gen()).toBSON()));
- getExpCtx()->fromMongos = true;
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 51017);
-}
-
-TEST_F(DocumentSourceOutTest, CorrectlyUsesTargetDbThatMatchesAggregationDb) {
- const auto targetDbSameAsAggregationDb = getExpCtx()->ns.db();
- const auto targetColl = "test"_sd;
- BSONObj spec = BSON("$out" << BSON("to" << targetColl << "mode" << kDefaultMode << "db"
- << targetDbSameAsAggregationDb));
-
- auto outStage = createOutStage(spec);
- ASSERT_EQ(outStage->getOutputNs().db(), targetDbSameAsAggregationDb);
- ASSERT_EQ(outStage->getOutputNs().coll(), targetColl);
-}
-
-// TODO (SERVER-36832): Allow "replaceCollection" to a foreign database.
-TEST_F(DocumentSourceOutTest, CorrectlyUsesForeignTargetDb) {
- const auto foreignDb = "someOtherDb"_sd;
- const auto targetColl = "test"_sd;
- BSONObj spec =
- BSON("$out" << BSON("to" << targetColl << "mode" << kDefaultMode << "db" << foreignDb));
-
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 50939);
-}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 88a05b076bb..e9f86b947ed 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -542,11 +542,11 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distri
return split;
}
-bool DocumentSourceSort::canRunInParallelBeforeOut(
+bool DocumentSourceSort::canRunInParallelBeforeWriteStage(
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const {
// This is an interesting special case. If there are no further stages which require merging the
// streams into one, a $sort should not require it. This is only the case because the sort order
- // doesn't matter for a pipeline ending with a $out stage. We may encounter it here as an
+ // doesn't matter for a pipeline ending with a write stage. We may encounter it here as an
// intermediate stage before a final $group with a $sort, which would make sense. Should we
// extend our analysis to detect if an exchange is appropriate in a general pipeline, a $sort
// would generally require merging the streams before producing output.
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index ca68b71ad72..0e0310c5ca8 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -91,7 +91,7 @@ public:
DepsTracker::State getDependencies(DepsTracker* deps) const final;
boost::optional<DistributedPlanLogic> distributedPlanLogic() final;
- bool canRunInParallelBeforeOut(
+ bool canRunInParallelBeforeWriteStage(
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final;
/**
diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h
new file mode 100644
index 00000000000..fd10532d469
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_writer.h
@@ -0,0 +1,228 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/platform/basic.h"
+
+#include <fmt/format.h>
+
+#include "mongo/db/db_raii.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/read_concern.h"
+#include "mongo/db/storage/recovery_unit.h"
+
+namespace mongo {
+using namespace fmt::literals;
+
+/**
+ * Manipulates the state of the OperationContext so that while this object is in scope, reads and
+ * writes will use a local read concern and see the latest version of the data. It will also reset
+ * ignore_prepared on the recovery unit so that any reads or writes will block on a conflict with a
+ * prepared transaction. Resets the OperationContext back to its original state upon destruction.
+ */
+class DocumentSourceWriteBlock {
+ OperationContext* _opCtx;
+ repl::ReadConcernArgs _originalArgs;
+ RecoveryUnit::ReadSource _originalSource;
+ EnforcePrepareConflictsBlock _enforcePrepareConflictsBlock;
+
+public:
+ DocumentSourceWriteBlock(OperationContext* opCtx)
+ : _opCtx(opCtx), _enforcePrepareConflictsBlock(opCtx) {
+ _originalArgs = repl::ReadConcernArgs::get(_opCtx);
+ _originalSource = _opCtx->recoveryUnit()->getTimestampReadSource();
+
+ repl::ReadConcernArgs::get(_opCtx) = repl::ReadConcernArgs();
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::kUnset);
+ }
+
+ ~DocumentSourceWriteBlock() {
+ repl::ReadConcernArgs::get(_opCtx) = _originalArgs;
+ _opCtx->recoveryUnit()->setTimestampReadSource(_originalSource);
+ }
+};
+
+/**
+ * This is a base abstract class for all stages performing a write operation into an output
+ * collection. The writes are organized in batches in which elements are objects of the templated
+ * type 'B'. A subclass must override two methods to be able to write into the output collection:
+ *
+ * 1. 'makeBatchObject()' - to create an object of type 'B' from the given 'Document', which is,
+ *    essentially, a result of the input source's 'getNext()'.
+ * 2. 'spill()' - to write the batch into the output collection.
+ *
+ * Two other virtual methods exist which a subclass may override: 'initialize()' and 'finalize()',
+ * which are called before the first element is read from the input source, and after the last one
+ * has been read, respectively.
+ */
+template <typename B>
+class DocumentSourceWriter : public DocumentSource {
+public:
+ using BatchObject = B;
+ using BatchedObjects = std::vector<BatchObject>;
+
+ DocumentSourceWriter(NamespaceString outputNs,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSource(expCtx),
+ _outputNs(std::move(outputNs)),
+ _writeConcern(expCtx->opCtx->getWriteConcern()) {}
+
+ DepsTracker::State getDependencies(DepsTracker* deps) const override {
+ deps->needWholeDocument = true;
+ return DepsTracker::State::EXHAUSTIVE_ALL;
+ }
+
+ GetModPathsReturn getModifiedPaths() const override {
+ // For purposes of tracking which fields come from where, the writer stage does not modify
+ // any fields by default.
+ return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
+ }
+
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
+ return DistributedPlanLogic{nullptr, this, boost::none};
+ }
+
+ bool canRunInParallelBeforeWriteStage(
+ const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const override {
+ return true;
+ }
+
+ const NamespaceString& getOutputNs() const {
+ return _outputNs;
+ }
+
+ GetNextResult getNext() final override;
+
+protected:
+ /**
+ * Prepares the stage to be able to write incoming batches.
+ */
+ virtual void initialize() {}
+
+ /**
+ * Finalize the output collection, called when there are no more documents to write.
+ */
+ virtual void finalize() {}
+
+ /**
+ * Writes the documents in 'batch' to the output namespace.
+ */
+ virtual void spill(BatchedObjects&& batch) = 0;
+
+ /**
+ * Creates a batch object from the given document and returns it to the caller along with the
+ * object size.
+ */
+ virtual std::pair<B, int> makeBatchObject(Document&& doc) const = 0;
+
+ /**
+     * A subclass may override this method to wait on a fail point right after the next input
+     * element has been retrieved, but before it has been processed.
+ */
+ virtual void waitWhileFailPointEnabled() {}
+
+ // The namespace where the output will be written to.
+ const NamespaceString _outputNs;
+
+ // Stash the writeConcern of the original command as the operation context may change by the
+ // time we start to spill writes. This is because certain aggregations (e.g. $exchange)
+ // establish cursors with batchSize 0 then run subsequent getMore's which use a new operation
+ // context. The getMore's will not have an attached writeConcern however we still want to
+ // respect the writeConcern of the original command.
+ WriteConcernOptions _writeConcern;
+
+private:
+ bool _initialized{false};
+ bool _done{false};
+};
+
+template <typename B>
+DocumentSource::GetNextResult DocumentSourceWriter<B>::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ if (_done) {
+ return GetNextResult::makeEOF();
+ }
+
+ if (!_initialized) {
+ // Explain should never try to actually execute any writes. We only ever expect
+ // getNext() to be called for the 'executionStats' and 'allPlansExecution' explain
+ // modes. This assertion should not be triggered for 'queryPlanner' explain, which
+ // is perfectly legal.
+ uassert(51029,
+ "explain of {} is not allowed with verbosity {}"_format(
+ getSourceName(), ExplainOptions::verbosityString(*pExpCtx->explain)),
+ !pExpCtx->explain);
+ initialize();
+ _initialized = true;
+ }
+
+ BatchedObjects batch;
+ int bufferedBytes = 0;
+
+ auto nextInput = pSource->getNext();
+ for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
+ waitWhileFailPointEnabled();
+
+ auto doc = nextInput.releaseDocument();
+ auto[obj, objSize] = makeBatchObject(std::move(doc));
+
+ bufferedBytes += objSize;
+ if (!batch.empty() &&
+ (bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) {
+ spill(std::move(batch));
+ batch.clear();
+ bufferedBytes = objSize;
+ }
+ batch.push_back(obj);
+ }
+ if (!batch.empty()) {
+ spill(std::move(batch));
+ batch.clear();
+ }
+
+ switch (nextInput.getStatus()) {
+ case GetNextResult::ReturnStatus::kAdvanced: {
+ MONGO_UNREACHABLE; // We consumed all advances above.
+ }
+ case GetNextResult::ReturnStatus::kPauseExecution: {
+ return nextInput; // Propagate the pause.
+ }
+ case GetNextResult::ReturnStatus::kEOF: {
+ _done = true;
+ finalize();
+ return nextInput;
+ }
+ }
+ MONGO_UNREACHABLE;
+}
+
+} // namespace mongo
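The heart of the new base class is the batching policy in getNext(): keep buffering batch objects until adding another would exceed either the byte limit or the per-batch document limit, then spill and start a new buffer, and spill whatever remains at EOF before finalize() runs. The standalone sketch below reproduces just that loop in plain C++; the two limits stand in for BSONObjMaxUserSize and write_ops::kMaxWriteBatchSize, and the concrete values used here are assumptions rather than values taken from the server headers.

#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

constexpr std::size_t kMaxBatchBytes = 16 * 1024 * 1024;  // assumed stand-in for BSONObjMaxUserSize
constexpr std::size_t kMaxBatchDocs = 100000;             // assumed stand-in for kMaxWriteBatchSize

using Spill = std::function<void(std::vector<std::string>&&)>;

// Buffers 'docs' into batches and hands each full batch to 'spill'.
void writeAll(const std::vector<std::string>& docs, const Spill& spill) {
    std::vector<std::string> batch;
    std::size_t bufferedBytes = 0;

    for (const auto& doc : docs) {
        const std::size_t objSize = doc.size();
        bufferedBytes += objSize;
        // If this document would push the current batch over a limit, flush it first.
        if (!batch.empty() &&
            (bufferedBytes > kMaxBatchBytes || batch.size() >= kMaxBatchDocs)) {
            spill(std::move(batch));
            batch.clear();
            bufferedBytes = objSize;
        }
        batch.push_back(doc);
    }
    // Flush the tail once the input is exhausted; finalize() would follow this.
    if (!batch.empty()) {
        spill(std::move(batch));
    }
}

}  // namespace sketch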
diff --git a/src/mongo/db/pipeline/mongo_process_common.cpp b/src/mongo/db/pipeline/mongo_process_common.cpp
index 5db5dd993ee..a2527aded86 100644
--- a/src/mongo/db/pipeline/mongo_process_common.cpp
+++ b/src/mongo/db/pipeline/mongo_process_common.cpp
@@ -167,4 +167,17 @@ std::vector<FieldPath> MongoProcessCommon::_shardKeyToDocumentKeyFields(
return result;
}
+std::set<FieldPath> MongoProcessCommon::_convertToFieldPaths(
+ const std::vector<std::string>& fields) const {
+ std::set<FieldPath> fieldPaths;
+
+ for (const auto& field : fields) {
+ const auto res = fieldPaths.insert(FieldPath(field));
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "Found a duplicate field '" << field << "'",
+ res.second);
+ }
+ return fieldPaths;
+}
+
} // namespace mongo
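The helper added above relies on std::set::insert() reporting, via the bool in its return value, whether the element was actually inserted, which is what turns a duplicate field name into an error. A tiny standalone equivalent using plain strings instead of FieldPath:

#include <set>
#include <stdexcept>
#include <string>
#include <vector>

namespace sketch {

std::set<std::string> convertToFieldPaths(const std::vector<std::string>& fields) {
    std::set<std::string> paths;
    for (const auto& field : fields) {
        // insert().second is false when the field was already present.
        if (!paths.insert(field).second) {
            throw std::invalid_argument("Found a duplicate field '" + field + "'");
        }
    }
    return paths;
}

}  // namespace sketch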
diff --git a/src/mongo/db/pipeline/mongo_process_common.h b/src/mongo/db/pipeline/mongo_process_common.h
index ad6bf255c5f..cef5cb8c7db 100644
--- a/src/mongo/db/pipeline/mongo_process_common.h
+++ b/src/mongo/db/pipeline/mongo_process_common.h
@@ -90,6 +90,12 @@ protected:
virtual void _reportCurrentOpsForIdleSessions(OperationContext* opCtx,
CurrentOpUserMode userMode,
std::vector<BSONObj>* ops) const = 0;
+
+ /**
+ * Converts an array of field names into a set of FieldPath. Throws if 'fields' contains
+ * duplicate elements.
+ */
+ std::set<FieldPath> _convertToFieldPaths(const std::vector<std::string>& fields) const;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index 87dce6017e9..7f2d8eab5d7 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -80,8 +80,9 @@ public:
* 3. boost::optional<BSONObj> - for pipeline-style updated, specifies variables that can be
* referred to in the pipeline performing the custom update.
*/
- using BatchedObjects =
- std::vector<std::tuple<BSONObj, write_ops::UpdateModification, boost::optional<BSONObj>>>;
+ using BatchObject =
+ std::tuple<BSONObj, write_ops::UpdateModification, boost::optional<BSONObj>>;
+ using BatchedObjects = std::vector<BatchObject>;
enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle };
enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers };
@@ -346,15 +347,16 @@ public:
/**
* Returns true if there is an index on 'nss' with properties that will guarantee that a
- * document with non-array values for each of 'uniqueKeyPaths' will have at most one matching
+ * document with non-array values for each of 'fieldPaths' will have at most one matching
* document in 'nss'.
*
* Specifically, such an index must include all the fields, be unique, not be a partial index,
* and match the operation's collation as given by 'expCtx'.
*/
- virtual bool uniqueKeyIsSupportedByIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss,
- const std::set<FieldPath>& uniqueKeyPaths) const = 0;
+ virtual bool fieldsHaveSupportingUniqueIndex(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ const std::set<FieldPath>& fieldPaths) const = 0;
/**
* Refreshes the CatalogCache entry for the namespace 'nss', and returns the epoch associated
@@ -376,6 +378,21 @@ public:
ChunkVersion targetCollectionVersion) const = 0;
virtual std::unique_ptr<ResourceYielder> getResourceYielder() const = 0;
+
+ /**
+ * If the user supplied the 'fields' array, ensures that it can be used to uniquely identify a
+ * document. Otherwise, picks a default unique key, which can be either the "_id" field, or
+     * a shard key, depending on the 'outputNs' collection type and the server type (mongod or
+ * mongos). Also returns an optional ChunkVersion, populated with the version stored in the
+ * sharding catalog when we asked for the shard key (on mongos only). On mongod, this is the
+ * value of the 'targetCollectionVersion' parameter, which is the target shard version of the
+ * collection, as sent by mongos.
+ */
+ virtual std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>>
+ ensureFieldsUniqueOrResolveDocumentKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<std::vector<std::string>> fields,
+ boost::optional<ChunkVersion> targetCollectionVersion,
+ const NamespaceString& outputNs) const = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index 5072c389bc0..331b02bcbcf 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -265,10 +265,10 @@ bool MongoSInterface::isSharded(OperationContext* opCtx, const NamespaceString&
return routingInfo.isOK() && routingInfo.getValue().cm();
}
-bool MongoSInterface::uniqueKeyIsSupportedByIndex(
+bool MongoSInterface::fieldsHaveSupportingUniqueIndex(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
- const std::set<FieldPath>& uniqueKeyPaths) const {
+ const std::set<FieldPath>& fieldPaths) const {
const auto opCtx = expCtx->opCtx;
const auto routingInfo =
uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
@@ -281,17 +281,59 @@ bool MongoSInterface::uniqueKeyIsSupportedByIndex(
BSON("listIndexes" << nss.coll()),
opCtx->hasDeadline() ? opCtx->getRemainingMaxTimeMillis() : Milliseconds(-1));
- // If the namespace does not exist, then the unique key *must* be _id only.
+ // If the namespace does not exist, then the field paths *must* be _id only.
if (response.getStatus() == ErrorCodes::NamespaceNotFound) {
- return uniqueKeyPaths == std::set<FieldPath>{"_id"};
+ return fieldPaths == std::set<FieldPath>{"_id"};
}
uassertStatusOK(response);
const auto& indexes = response.getValue().docs;
- return std::any_of(
- indexes.begin(), indexes.end(), [&expCtx, &uniqueKeyPaths](const auto& index) {
- return supportsUniqueKey(expCtx, index, uniqueKeyPaths);
- });
+ return std::any_of(indexes.begin(), indexes.end(), [&expCtx, &fieldPaths](const auto& index) {
+ return supportsUniqueKey(expCtx, index, fieldPaths);
+ });
+}
+
+std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>>
+MongoSInterface::ensureFieldsUniqueOrResolveDocumentKey(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<std::vector<std::string>> fields,
+ boost::optional<ChunkVersion> targetCollectionVersion,
+ const NamespaceString& outputNs) const {
+ invariant(expCtx->inMongos);
+ uassert(
+ 51179, "Received unexpected 'targetCollectionVersion' on mongos", !targetCollectionVersion);
+
+ if (fields) {
+ // Convert 'fields' array to a set of FieldPaths.
+ auto fieldPaths = _convertToFieldPaths(*fields);
+ uassert(51190,
+ "Cannot find index to verify that join fields will be unique",
+ fieldsHaveSupportingUniqueIndex(expCtx, outputNs, fieldPaths));
+
+ // If the user supplies the 'fields' array, we don't need to attach a ChunkVersion for the
+ // shards since we are not at risk of 'guessing' the wrong shard key.
+ return {fieldPaths, boost::none};
+ }
+
+ // In case there are multiple shards which will perform this stage in parallel, we need to
+ // figure out and attach the collection's shard version to ensure each shard is talking about
+ // the same version of the collection. This mongos will coordinate that. We force a catalog
+ // refresh to do so because there is no shard versioning protocol on this namespace and so we
+ // otherwise could not be sure this node is (or will become) at all recent. We will also
+ // figure out and attach the 'joinFields' to send to the shards.
+
+ // There are edge cases when the collection could be dropped or re-created during or near the
+ // time of the operation (for example, during aggregation). This is okay - we are mostly
+ // paranoid that this mongos is very stale and want to prevent returning an error if the
+ // collection was dropped a long time ago. Because of this, we are okay with piggy-backing off
+ // another thread's request to refresh the cache, simply waiting for that request to return
+ // instead of forcing another refresh.
+ targetCollectionVersion = refreshAndGetCollectionVersion(expCtx, outputNs);
+
+ auto docKeyPaths = collectDocumentKeyFieldsActingAsRouter(expCtx->opCtx, outputNs);
+ return {std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()),
+ std::make_move_iterator(docKeyPaths.end())),
+ targetCollectionVersion};
}
} // namespace mongo
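On mongos the new ensureFieldsUniqueOrResolveDocumentKey() has two branches: explicitly supplied fields are only validated against a supporting unique index and no shard version is attached, while the default path forces a routing refresh, attaches the resulting collection version, and falls back to the document key. The simplified sketch below captures that flow with the catalog dependencies stubbed out as callables; the types and names are illustrative only and do not correspond to the server's interfaces.

#include <functional>
#include <optional>
#include <set>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

using FieldSet = std::set<std::string>;

// Stand-ins for the process-interface dependencies used by the real method.
struct Deps {
    std::function<bool(const FieldSet&)> hasSupportingUniqueIndex;  // listIndexes check
    std::function<int()> refreshAndGetCollectionVersion;            // forced routing refresh
    std::function<FieldSet()> collectDocumentKeyFields;             // "_id" (+ shard key)
};

std::pair<FieldSet, std::optional<int>> resolveFields(
    std::optional<std::vector<std::string>> fields,
    std::optional<int> targetCollectionVersion,
    const Deps& deps) {
    // Mongos is the node that assigns the version, so it must not receive one.
    if (targetCollectionVersion) {
        throw std::invalid_argument("unexpected targetCollectionVersion on mongos");
    }

    if (fields) {
        FieldSet paths(fields->begin(), fields->end());
        if (!deps.hasSupportingUniqueIndex(paths)) {
            throw std::runtime_error("cannot find an index to verify the fields are unique");
        }
        // User-supplied fields: no shard version needs to be attached for the shards.
        return {std::move(paths), std::nullopt};
    }

    // No fields given: refresh the routing table, attach the resulting version, and use
    // the document key ("_id", plus the shard key for sharded targets) as the default.
    auto version = deps.refreshAndGetCollectionVersion();
    return {deps.collectDocumentKeyFields(), version};
}

}  // namespace sketch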
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 4c3b06ad30c..6a9d4b2431f 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -42,7 +42,7 @@ namespace mongo {
* Class to provide access to mongos-specific implementations of methods required by some
* document sources.
*/
-class MongoSInterface final : public MongoProcessCommon {
+class MongoSInterface : public MongoProcessCommon {
public:
static BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
const AggregationRequest& request,
@@ -217,9 +217,9 @@ public:
MONGO_UNREACHABLE;
}
- bool uniqueKeyIsSupportedByIndex(const boost::intrusive_ptr<ExpressionContext>&,
- const NamespaceString&,
- const std::set<FieldPath>& uniqueKeyPaths) const final;
+ bool fieldsHaveSupportingUniqueIndex(const boost::intrusive_ptr<ExpressionContext>&,
+ const NamespaceString&,
+ const std::set<FieldPath>& fieldPaths) const;
void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>&,
const NamespaceString&,
@@ -231,6 +231,12 @@ public:
return nullptr;
}
+ std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>>
+ ensureFieldsUniqueOrResolveDocumentKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<std::vector<std::string>> fields,
+ boost::optional<ChunkVersion> targetCollectionVersion,
+ const NamespaceString& outputNs) const override;
+
protected:
BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
Client* client,
diff --git a/src/mongo/db/pipeline/mongos_process_interface_test.cpp b/src/mongo/db/pipeline/mongos_process_interface_test.cpp
new file mode 100644
index 00000000000..91d79185481
--- /dev/null
+++ b/src/mongo/db/pipeline/mongos_process_interface_test.cpp
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/mongos_process_interface.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+class MongoProcessInterfaceForTest : public MongoSInterface {
+public:
+ using MongoSInterface::MongoSInterface;
+
+ bool fieldsHaveSupportingUniqueIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ const std::set<FieldPath>& fieldPaths) const override {
+ return hasSupportingIndexForFields;
+ }
+
+ bool hasSupportingIndexForFields{true};
+};
+
+class MongoSInterfaceTest : public AggregationContextFixture {
+public:
+ MongoSInterfaceTest() {
+ getExpCtx()->inMongos = true;
+ }
+
+ auto makeProcessInterface() {
+ return std::make_unique<MongoProcessInterfaceForTest>();
+ }
+};
+
+TEST_F(MongoSInterfaceTest, FailsToEnsureFieldsUniqueIfTargetCollectionVersionIsSpecified) {
+ auto expCtx = getExpCtx();
+ auto targetCollectionVersion = boost::make_optional(ChunkVersion(0, 0, OID::gen()));
+ auto processInterface = makeProcessInterface();
+
+ ASSERT_THROWS_CODE(processInterface->ensureFieldsUniqueOrResolveDocumentKey(
+ expCtx, {{"_id"}}, targetCollectionVersion, expCtx->ns),
+ AssertionException,
+ 51179);
+}
+
+TEST_F(MongoSInterfaceTest, FailsToEnsureFieldsUniqueIfNotSupportedByIndex) {
+ auto expCtx = getExpCtx();
+ auto targetCollectionVersion = boost::none;
+ auto processInterface = makeProcessInterface();
+
+ processInterface->hasSupportingIndexForFields = false;
+ ASSERT_THROWS_CODE(processInterface->ensureFieldsUniqueOrResolveDocumentKey(
+ expCtx, {{"x"}}, targetCollectionVersion, expCtx->ns),
+ AssertionException,
+ 51190);
+}
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 1b1f2748b66..d6179e48e78 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -1808,8 +1808,7 @@ public:
rawPipeline.push_back(stageElem.embeddedObject());
}
AggregationRequest request(kTestNss, rawPipeline);
- intrusive_ptr<ExpressionContextForTest> ctx =
- new ExpressionContextForTest(&_opCtx, request);
+ intrusive_ptr<ExpressionContextForTest> ctx = createExpressionContext(request);
TempDir tempDir("PipelineTest");
ctx->tempDir = tempDir.path();
@@ -1837,6 +1836,11 @@ public:
virtual ~Base() {}
+ virtual intrusive_ptr<ExpressionContextForTest> createExpressionContext(
+ const AggregationRequest& request) {
+ return new ExpressionContextForTest(&_opCtx, request);
+ }
+
protected:
std::unique_ptr<Pipeline, PipelineDeleter> mergePipe;
std::unique_ptr<Pipeline, PipelineDeleter> shardPipe;
@@ -2271,7 +2275,7 @@ class ShouldNotCoalesceUnwindNotOnAs : public Base {
namespace needsPrimaryShardMerger {
-class needsPrimaryShardMergerBase : public Base {
+class ShardMergerBase : public Base {
public:
void run() override {
Base::run();
@@ -2281,7 +2285,7 @@ public:
virtual bool needsPrimaryShardMerger() = 0;
};
-class Out : public needsPrimaryShardMergerBase {
+class Out : public ShardMergerBase {
bool needsPrimaryShardMerger() {
return true;
}
@@ -2292,13 +2296,56 @@ class Out : public needsPrimaryShardMergerBase {
return "[]";
}
string mergePipeJson() {
- return "[{$out: {to: 'outColl', db: 'a', mode: '" +
- WriteMode_serializer(WriteModeEnum::kModeReplaceCollection) +
- "', uniqueKey: {_id: 1}}}]";
+ return "[{$out: 'outColl'}]";
+ }
+};
+
+class MergeWithUnshardedCollection : public ShardMergerBase {
+ bool needsPrimaryShardMerger() {
+ return true;
+ }
+ string inputPipeJson() {
+ return "[{$merge: 'outColl'}]";
+ }
+ string shardPipeJson() {
+ return "[]";
+ }
+ string mergePipeJson() {
+ return "[{$merge: {into: {db: 'a', coll: 'outColl'}, on: '_id', "
+ "whenMatched: 'merge', whenNotMatched: 'insert'}}]";
+ }
+};
+
+class MergeWithShardedCollection : public ShardMergerBase {
+ intrusive_ptr<ExpressionContextForTest> createExpressionContext(
+ const AggregationRequest& request) override {
+ class ProcessInterface : public StubMongoProcessInterface {
+ bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override {
+ return true;
+ }
+ };
+
+ auto expCtx = ShardMergerBase::createExpressionContext(request);
+ expCtx->mongoProcessInterface = std::make_shared<ProcessInterface>();
+ return expCtx;
+ }
+
+ bool needsPrimaryShardMerger() {
+ return false;
+ }
+ string inputPipeJson() {
+ return "[{$merge: 'outColl'}]";
+ }
+ string shardPipeJson() {
+ return "[{$merge: {into: {db: 'a', coll: 'outColl'}, on: '_id', "
+ "whenMatched: 'merge', whenNotMatched: 'insert'}}]";
+ }
+ string mergePipeJson() {
+ return "[]";
}
};
-class Project : public needsPrimaryShardMergerBase {
+class Project : public ShardMergerBase {
bool needsPrimaryShardMerger() {
return false;
}
@@ -2313,7 +2360,7 @@ class Project : public needsPrimaryShardMergerBase {
}
};
-class LookUp : public needsPrimaryShardMergerBase {
+class LookUp : public ShardMergerBase {
bool needsPrimaryShardMerger() {
return true;
}
@@ -3252,6 +3299,8 @@ public:
ShardedMatchSortProjLimBecomesMatchTopKSortProj>();
add<Optimizations::Sharded::limitFieldsSentFromShardsToMerger::ShardAlreadyExhaustive>();
add<Optimizations::Sharded::needsPrimaryShardMerger::Out>();
+ add<Optimizations::Sharded::needsPrimaryShardMerger::MergeWithUnshardedCollection>();
+ add<Optimizations::Sharded::needsPrimaryShardMerger::MergeWithShardedCollection>();
add<Optimizations::Sharded::needsPrimaryShardMerger::Project>();
add<Optimizations::Sharded::needsPrimaryShardMerger::LookUp>();
}
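
The expected JSON in the tests above doubles as a worked example of how the stage's shorthand is split for a sharded cluster. Inferred from the test expectations (the defaults come from the stage's parser, not from this patch): a pipeline written as

    [{$merge: 'outColl'}]

against database 'a' is serialized for the cluster as

    [{$merge: {into: {db: 'a', coll: 'outColl'}, on: '_id',
               whenMatched: 'merge', whenNotMatched: 'insert'}}]

and the expanded stage runs on the primary-shard merger when the target collection is unsharded (MergeWithUnshardedCollection), or stays in the shards half of the split pipeline when the target is sharded (MergeWithShardedCollection).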
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 7d493a6ba7a..02f47a0fe1c 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -527,10 +527,10 @@ std::vector<BSONObj> MongoInterfaceStandalone::getMatchingPlanCacheEntryStats(
return planCache->getMatchingStats(serializer, predicate);
}
-bool MongoInterfaceStandalone::uniqueKeyIsSupportedByIndex(
+bool MongoInterfaceStandalone::fieldsHaveSupportingUniqueIndex(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
- const std::set<FieldPath>& uniqueKeyPaths) const {
+ const std::set<FieldPath>& fieldPaths) const {
auto* opCtx = expCtx->opCtx;
// We purposefully avoid a helper like AutoGetCollection here because we don't want to check the
// db version or do anything else. We simply want to protect against concurrent modifications to
@@ -541,13 +541,13 @@ bool MongoInterfaceStandalone::uniqueKeyIsSupportedByIndex(
auto db = databaseHolder->getDb(opCtx, nss.db());
auto collection = db ? db->getCollection(opCtx, nss) : nullptr;
if (!collection) {
- return uniqueKeyPaths == std::set<FieldPath>{"_id"};
+ return fieldPaths == std::set<FieldPath>{"_id"};
}
auto indexIterator = collection->getIndexCatalog()->getIndexIterator(opCtx, false);
while (indexIterator->more()) {
const IndexCatalogEntry* entry = indexIterator->next();
- if (supportsUniqueKey(expCtx, entry, uniqueKeyPaths)) {
+ if (supportsUniqueKey(expCtx, entry, fieldPaths)) {
return true;
}
}
@@ -635,4 +635,34 @@ std::unique_ptr<ResourceYielder> MongoInterfaceStandalone::getResourceYielder()
return std::make_unique<MongoDResourceYielder>();
}
+
+std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>>
+MongoInterfaceStandalone::ensureFieldsUniqueOrResolveDocumentKey(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<std::vector<std::string>> fields,
+ boost::optional<ChunkVersion> targetCollectionVersion,
+ const NamespaceString& outputNs) const {
+ if (targetCollectionVersion) {
+ uassert(51123, "Unexpected target chunk version specified", expCtx->fromMongos);
+ // If mongos has sent us a target shard version, we need to be sure we are prepared to
+ // act as a router which is at least as recent as that mongos.
+ checkRoutingInfoEpochOrThrow(expCtx, outputNs, *targetCollectionVersion);
+ }
+
+ if (!fields) {
+ uassert(51124, "Expected fields to be provided from mongos", !expCtx->fromMongos);
+ return {std::set<FieldPath>{"_id"}, targetCollectionVersion};
+ }
+
+ // Make sure the 'fields' array has a supporting index. Skip this check if the command is sent
+ // from mongos since the 'fields' check would've happened already.
+ auto fieldPaths = _convertToFieldPaths(*fields);
+ if (!expCtx->fromMongos) {
+ uassert(51183,
+ "Cannot find index to verify that join fields will be unique",
+ fieldsHaveSupportingUniqueIndex(expCtx, outputNs, fieldPaths));
+ }
+ return {fieldPaths, targetCollectionVersion};
+}
+
} // namespace mongo
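
The new ensureFieldsUniqueOrResolveDocumentKey() above centralizes the join-field validation for write stages. A minimal sketch of how a caller could use it, assuming only that 'expCtx->mongoProcessInterface' points at one of the implementations in this patch (the wrapper name and variables are illustrative, not part of the commit):

    // Illustrative caller: resolve and validate the join fields for a $merge-style write.
    std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> resolveOnFieldsForWrite(
        const boost::intrusive_ptr<ExpressionContext>& expCtx,
        boost::optional<std::vector<std::string>> userSpecifiedFields,
        boost::optional<ChunkVersion> targetCollectionVersion,
        const NamespaceString& outputNs) {
        // On a plain mongod this performs the unique-index check implemented above; when the
        // request was forwarded from mongos, the fields are trusted as already validated and
        // only the routing-table epoch is re-checked against 'targetCollectionVersion'.
        return expCtx->mongoProcessInterface->ensureFieldsUniqueOrResolveDocumentKey(
            expCtx, std::move(userSpecifiedFields), targetCollectionVersion, outputNs);
    }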
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index 9c9fc301c18..e08526d99db 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -127,9 +127,9 @@ public:
const NamespaceString&,
const MatchExpression*) const final;
- bool uniqueKeyIsSupportedByIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss,
- const std::set<FieldPath>& uniqueKeyPaths) const final;
+ bool fieldsHaveSupportingUniqueIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ const std::set<FieldPath>& fieldPaths) const;
virtual void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
@@ -139,6 +139,12 @@ public:
std::unique_ptr<ResourceYielder> getResourceYielder() const override;
+ std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>>
+ ensureFieldsUniqueOrResolveDocumentKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<std::vector<std::string>> fields,
+ boost::optional<ChunkVersion> targetCollectionVersion,
+ const NamespaceString& outputNs) const override;
+
protected:
BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
Client* client,
diff --git a/src/mongo/db/pipeline/process_interface_standalone_test.cpp b/src/mongo/db/pipeline/process_interface_standalone_test.cpp
new file mode 100644
index 00000000000..fa246fc2e9d
--- /dev/null
+++ b/src/mongo/db/pipeline/process_interface_standalone_test.cpp
@@ -0,0 +1,130 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/process_interface_standalone.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+class MongoProcessInterfaceForTest : public MongoInterfaceStandalone {
+public:
+ using MongoInterfaceStandalone::MongoInterfaceStandalone;
+
+ bool fieldsHaveSupportingUniqueIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ const std::set<FieldPath>& fields) const override {
+ return hasSupportingIndexForFields;
+ }
+
+ void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString&,
+ ChunkVersion) const override {
+ // Assume it always matches for our tests here.
+ return;
+ }
+
+ bool hasSupportingIndexForFields{true};
+};
+
+class ProcessInterfaceStandaloneTest : public AggregationContextFixture {
+public:
+ auto makeProcessInterface() {
+ return std::make_unique<MongoProcessInterfaceForTest>(getExpCtx()->opCtx);
+ }
+};
+
+TEST_F(ProcessInterfaceStandaloneTest, FailsToEnsureFieldsUniqueIfFieldsHaveDuplicates) {
+ auto expCtx = getExpCtx();
+ auto targetCollectionVersion = boost::none;
+ auto processInterface = makeProcessInterface();
+
+ ASSERT_THROWS_CODE(processInterface->ensureFieldsUniqueOrResolveDocumentKey(
+ expCtx, {{"_id", "_id"}}, targetCollectionVersion, expCtx->ns),
+ AssertionException,
+ ErrorCodes::BadValue);
+ ASSERT_THROWS_CODE(processInterface->ensureFieldsUniqueOrResolveDocumentKey(
+ expCtx, {{"x", "y", "x"}}, targetCollectionVersion, expCtx->ns),
+ AssertionException,
+ ErrorCodes::BadValue);
+}
+
+TEST_F(ProcessInterfaceStandaloneTest,
+ FailsToEnsureFieldsUniqueIfTargetCollectionVersionIsSpecifiedOnMongos) {
+ auto expCtx = getExpCtx();
+ auto targetCollectionVersion = boost::make_optional(ChunkVersion(0, 0, OID::gen()));
+ auto processInterface = makeProcessInterface();
+
+ // Test that 'targetCollectionVersion' is not accepted if not from mongos.
+ expCtx->fromMongos = false;
+ ASSERT_THROWS_CODE(processInterface->ensureFieldsUniqueOrResolveDocumentKey(
+ expCtx, {{"_id"}}, targetCollectionVersion, expCtx->ns),
+ AssertionException,
+ 51123);
+
+ // Test that 'targetCollectionVersion' is accepted if from mongos.
+ expCtx->fromMongos = true;
+ auto[joinKey, chunkVersion] = processInterface->ensureFieldsUniqueOrResolveDocumentKey(
+ expCtx, {{"_id"}}, targetCollectionVersion, expCtx->ns);
+ ASSERT_EQ(joinKey.size(), 1UL);
+ ASSERT_EQ(joinKey.count(FieldPath("_id")), 1UL);
+ ASSERT(chunkVersion);
+ ASSERT_EQ(*chunkVersion, *targetCollectionVersion);
+}
+
+TEST_F(ProcessInterfaceStandaloneTest, FailsToEnsureFieldsUniqueIfJoinFieldsAreNotSentFromMongos) {
+ auto expCtx = getExpCtx();
+ auto targetCollectionVersion = boost::make_optional(ChunkVersion(0, 0, OID::gen()));
+ auto processInterface = makeProcessInterface();
+
+ expCtx->fromMongos = true;
+ ASSERT_THROWS_CODE(processInterface->ensureFieldsUniqueOrResolveDocumentKey(
+ expCtx, boost::none, targetCollectionVersion, expCtx->ns),
+ AssertionException,
+ 51124);
+}
+
+TEST_F(ProcessInterfaceStandaloneTest,
+ FailsToEnsureFieldsUniqueIfFieldsDoesNotHaveSupportingUniqueIndex) {
+ auto expCtx = getExpCtx();
+ auto targetCollectionVersion = boost::none;
+ auto processInterface = makeProcessInterface();
+
+ expCtx->fromMongos = false;
+ processInterface->hasSupportingIndexForFields = false;
+ ASSERT_THROWS_CODE(processInterface->ensureFieldsUniqueOrResolveDocumentKey(
+ expCtx, {{"x"}}, targetCollectionVersion, expCtx->ns),
+ AssertionException,
+ 51183);
+}
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 77f76eb5c9b..f97f39ac2f5 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -202,9 +202,9 @@ public:
MONGO_UNREACHABLE;
}
- bool uniqueKeyIsSupportedByIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss,
- const std::set<FieldPath>& uniqueKeyPaths) const override {
+ bool fieldsHaveSupportingUniqueIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ const std::set<FieldPath>& fieldPaths) const override {
return true;
}
@@ -223,5 +223,21 @@ public:
std::unique_ptr<ResourceYielder> getResourceYielder() const override {
return nullptr;
}
+
+ std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>>
+ ensureFieldsUniqueOrResolveDocumentKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<std::vector<std::string>> fields,
+ boost::optional<ChunkVersion> targetCollectionVersion,
+ const NamespaceString& outputNs) const override {
+ if (!fields) {
+ return {std::set<FieldPath>{"_id"}, targetCollectionVersion};
+ }
+
+ std::set<FieldPath> fieldPaths;
+ for (const auto& field : *fields) {
+ fieldPaths.insert(FieldPath(field));
+ }
+ return {fieldPaths, targetCollectionVersion};
+ }
};
} // namespace mongo
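
Because the stub gives a trivial answer for the new hook, unit tests without a real catalog can exercise write stages simply by installing it on the test ExpressionContext, exactly as the planner tests below do:

    // Install the stub so ensureFieldsUniqueOrResolveDocumentKey() resolves to the trivial
    // behaviour above instead of consulting a real index catalog.
    expCtx()->mongoProcessInterface = std::make_shared<StubMongoProcessInterface>();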
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 200f83ebc26..165140819ed 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -290,7 +290,7 @@ ClusterClientCursorGuard convertPipelineToRouterStages(
bool stageCanRunInParallel(const boost::intrusive_ptr<DocumentSource>& stage,
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) {
if (stage->distributedPlanLogic()) {
- return stage->canRunInParallelBeforeOut(nameOfShardKeyFieldsUponEntryToStage);
+ return stage->canRunInParallelBeforeWriteStage(nameOfShardKeyFieldsUponEntryToStage);
} else {
// This stage is fine to execute in parallel on each stream. For example, a $match can be
// applied to each stream in parallel.
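
The renamed hook keeps the old contract: a stage that writes results reports whether the streams feeding it may safely write in parallel, given the shard-key fields still intact on entry to the stage. A hedged illustration of the shape of such an override, assuming a hypothetical '_onFields' member holding the stage's join fields (this is not the stage's actual logic):

    bool canRunInParallelBeforeWriteStage(
        const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const {
        // Illustrative policy: parallel writes are only considered safe when every field the
        // stage joins on is still a preserved shard-key field at this point in the pipeline.
        for (const auto& field : _onFields) {
            if (!nameOfShardKeyFieldsUponEntryToStage.count(field.fullPath()))
                return false;
        }
        return true;
    }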
diff --git a/src/mongo/s/query/cluster_aggregation_planner_test.cpp b/src/mongo/s/query/cluster_aggregation_planner_test.cpp
index a6a81e7a5d4..a51ba59007f 100644
--- a/src/mongo/s/query/cluster_aggregation_planner_test.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner_test.cpp
@@ -34,8 +34,8 @@
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_merge.h"
#include "mongo/db/pipeline/document_source_out.h"
-#include "mongo/db/pipeline/document_source_out_gen.h"
#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
@@ -49,6 +49,9 @@
namespace mongo {
namespace {
+using MergeStrategyDescriptor = DocumentSourceMerge::MergeStrategyDescriptor;
+using WhenMatched = MergeStrategyDescriptor::WhenMatched;
+using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched;
const NamespaceString kTestAggregateNss = NamespaceString{"unittests", "cluster_exchange"};
const NamespaceString kTestOutNss = NamespaceString{"unittests", "out_ns"};
@@ -118,8 +121,12 @@ public:
future.default_timed_get().get();
}
-private:
+protected:
boost::intrusive_ptr<ExpressionContext> _expCtx;
+ boost::optional<BSONObj> _mergeLetVariables;
+ boost::optional<std::vector<BSONObj>> _mergePipeline;
+ std::set<FieldPath> _mergeOnFields{"_id"};
+ boost::optional<ChunkVersion> _mergeTargetCollectionVersion;
};
TEST_F(ClusterExchangeTest, ShouldNotExchangeIfPipelineDoesNotEndWithOut) {
@@ -134,27 +141,32 @@ TEST_F(ClusterExchangeTest, ShouldNotExchangeIfPipelineDoesNotEndWithOut) {
mergePipe.get()));
}
-TEST_F(ClusterExchangeTest, ShouldNotExchangeIfPipelineEndsWithReplaceCollectionOut) {
+TEST_F(ClusterExchangeTest, ShouldNotExchangeIfPipelineEndsWithOut) {
setupNShards(2);
- // For this test pretend 'kTestOutNss' is not sharded so that we can use a "replaceCollection"
- // $out.
+ // For this test pretend 'kTestOutNss' is not sharded so that we can use $out.
const auto originalMongoProcessInterface = expCtx()->mongoProcessInterface;
expCtx()->mongoProcessInterface = std::make_shared<StubMongoProcessInterface>();
ON_BLOCK_EXIT([&]() { expCtx()->mongoProcessInterface = originalMongoProcessInterface; });
- auto mergePipe = unittest::assertGet(Pipeline::create(
- {DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeReplaceCollection)},
- expCtx()));
+ auto mergePipe = unittest::assertGet(
+ Pipeline::create({DocumentSourceOut::create(kTestOutNss, expCtx())}, expCtx()));
ASSERT_FALSE(cluster_aggregation_planner::checkIfEligibleForExchange(operationContext(),
mergePipe.get()));
}
-TEST_F(ClusterExchangeTest, SingleOutStageNotEligibleForExchangeIfOutputDatabaseDoesNotExist) {
+TEST_F(ClusterExchangeTest, SingleMergeStageNotEligibleForExchangeIfOutputDatabaseDoesNotExist) {
setupNShards(2);
- auto mergePipe = unittest::assertGet(Pipeline::create(
- {DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
- expCtx()));
+ auto mergePipe = unittest::assertGet(
+ Pipeline::create({DocumentSourceMerge::create(kTestOutNss,
+ expCtx(),
+ WhenMatched::kFail,
+ WhenNotMatched::kInsert,
+ _mergeLetVariables,
+ _mergePipeline,
+ _mergeOnFields,
+ _mergeTargetCollectionVersion)},
+ expCtx()));
auto future = launchAsync([&] {
ASSERT_THROWS_CODE(cluster_aggregation_planner::checkIfEligibleForExchange(
@@ -171,12 +183,20 @@ TEST_F(ClusterExchangeTest, SingleOutStageNotEligibleForExchangeIfOutputDatabase
}
// If the output collection doesn't exist, we don't know how to distribute the output documents so
-// cannot insert an $exchange. The $out stage should later create a new, unsharded collection.
-TEST_F(ClusterExchangeTest, SingleOutStageNotEligibleForExchangeIfOutputCollectionDoesNotExist) {
+// cannot insert an $exchange. The $merge stage should later create a new, unsharded collection.
+TEST_F(ClusterExchangeTest, SingleMergeStageNotEligibleForExchangeIfOutputCollectionDoesNotExist) {
setupNShards(2);
- auto mergePipe = unittest::assertGet(Pipeline::create(
- {DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
- expCtx()));
+ auto mergePipe = unittest::assertGet(
+ Pipeline::create({DocumentSourceMerge::create(kTestOutNss,
+ expCtx(),
+ WhenMatched::kFail,
+ WhenNotMatched::kInsert,
+ _mergeLetVariables,
+ _mergePipeline,
+ _mergeOnFields,
+ _mergeTargetCollectionVersion)},
+
+ expCtx()));
auto future = launchAsync([&] {
ASSERT_FALSE(cluster_aggregation_planner::checkIfEligibleForExchange(operationContext(),
@@ -191,15 +211,22 @@ TEST_F(ClusterExchangeTest, SingleOutStageNotEligibleForExchangeIfOutputCollecti
}
// A $limit stage requires a single merger.
-TEST_F(ClusterExchangeTest, LimitFollowedByOutStageIsNotEligibleForExchange) {
+TEST_F(ClusterExchangeTest, LimitFollowedByMergeStageIsNotEligibleForExchange) {
// Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
setupNShards(2);
loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss);
- auto mergePipe = unittest::assertGet(Pipeline::create(
- {DocumentSourceLimit::create(expCtx(), 6),
- DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
- expCtx()));
+ auto mergePipe = unittest::assertGet(
+ Pipeline::create({DocumentSourceLimit::create(expCtx(), 6),
+ DocumentSourceMerge::create(kTestOutNss,
+ expCtx(),
+ WhenMatched::kFail,
+ WhenNotMatched::kInsert,
+ _mergeLetVariables,
+ _mergePipeline,
+ _mergeOnFields,
+ _mergeTargetCollectionVersion)},
+ expCtx()));
auto future = launchAsync([&] {
ASSERT_FALSE(cluster_aggregation_planner::checkIfEligibleForExchange(operationContext(),
@@ -209,15 +236,22 @@ TEST_F(ClusterExchangeTest, LimitFollowedByOutStageIsNotEligibleForExchange) {
future.default_timed_get();
}
-TEST_F(ClusterExchangeTest, GroupFollowedByOutIsEligbleForExchange) {
+TEST_F(ClusterExchangeTest, GroupFollowedByMergeIsEligbleForExchange) {
// Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
setupNShards(2);
loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss);
- auto mergePipe = unittest::assertGet(Pipeline::create(
- {parse("{$group: {_id: '$x', $doingMerge: true}}"),
- DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
- expCtx()));
+ auto mergePipe = unittest::assertGet(
+ Pipeline::create({parse("{$group: {_id: '$x', $doingMerge: true}}"),
+ DocumentSourceMerge::create(kTestOutNss,
+ expCtx(),
+ WhenMatched::kFail,
+ WhenNotMatched::kInsert,
+ _mergeLetVariables,
+ _mergePipeline,
+ _mergeOnFields,
+ _mergeTargetCollectionVersion)},
+ expCtx()));
auto future = launchAsync([&] {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
@@ -242,12 +276,19 @@ TEST_F(ClusterExchangeTest, RenamesAreEligibleForExchange) {
setupNShards(2);
loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss);
- auto mergePipe = unittest::assertGet(Pipeline::create(
- {parse("{$group: {_id: '$x', $doingMerge: true}}"),
- parse("{$project: {temporarily_renamed: '$_id'}}"),
- parse("{$project: {_id: '$temporarily_renamed'}}"),
- DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
- expCtx()));
+ auto mergePipe = unittest::assertGet(
+ Pipeline::create({parse("{$group: {_id: '$x', $doingMerge: true}}"),
+ parse("{$project: {temporarily_renamed: '$_id'}}"),
+ parse("{$project: {_id: '$temporarily_renamed'}}"),
+ DocumentSourceMerge::create(kTestOutNss,
+ expCtx(),
+ WhenMatched::kFail,
+ WhenNotMatched::kInsert,
+ _mergeLetVariables,
+ _mergePipeline,
+ _mergeOnFields,
+ _mergeTargetCollectionVersion)},
+ expCtx()));
auto future = launchAsync([&] {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
@@ -276,11 +317,18 @@ TEST_F(ClusterExchangeTest, MatchesAreEligibleForExchange) {
setupNShards(2);
loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss);
- auto mergePipe = unittest::assertGet(Pipeline::create(
- {parse("{$group: {_id: '$x', $doingMerge: true}}"),
- parse("{$match: {_id: {$gte: 0}}}"),
- DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
- expCtx()));
+ auto mergePipe = unittest::assertGet(
+ Pipeline::create({parse("{$group: {_id: '$x', $doingMerge: true}}"),
+ parse("{$match: {_id: {$gte: 0}}}"),
+ DocumentSourceMerge::create(kTestOutNss,
+ expCtx(),
+ WhenMatched::kFail,
+ WhenNotMatched::kInsert,
+ _mergeLetVariables,
+ _mergePipeline,
+ _mergeOnFields,
+ _mergeTargetCollectionVersion)},
+ expCtx()));
auto future = launchAsync([&] {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
@@ -315,10 +363,17 @@ TEST_F(ClusterExchangeTest, SortThenGroupIsEligibleForExchange) {
// {$out: {to: "sharded_by_id", mode: "replaceDocuments"}}].
// No $sort stage appears in the merging half since we'd expect that to be absorbed by the
// $mergeCursors and AsyncResultsMerger.
- auto mergePipe = unittest::assertGet(Pipeline::create(
- {parse("{$group: {_id: '$x'}}"),
- DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
- expCtx()));
+ auto mergePipe = unittest::assertGet(
+ Pipeline::create({parse("{$group: {_id: '$x'}}"),
+ DocumentSourceMerge::create(kTestOutNss,
+ expCtx(),
+ WhenMatched::kFail,
+ WhenNotMatched::kInsert,
+ _mergeLetVariables,
+ _mergePipeline,
+ _mergeOnFields,
+ _mergeTargetCollectionVersion)},
+ expCtx()));
auto future = launchAsync([&] {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
@@ -350,13 +405,20 @@ TEST_F(ClusterExchangeTest, SortThenGroupIsEligibleForExchangeHash) {
// This would be the merging half of the pipeline if the original pipeline was
// [{$sort: {x: 1}},
// {$group: {_id: "$x"}},
- // {$out: {to: "sharded_by_id", mode: "replaceDocuments"}}].
+ // {$merge: {into: "sharded_by_id", whenMatched: "fail", whenNotMatched: "insert"}}].
// No $sort stage appears in the merging half since we'd expect that to be absorbed by the
// $mergeCursors and AsyncResultsMerger.
- auto mergePipe = unittest::assertGet(Pipeline::create(
- {parse("{$group: {_id: '$x'}}"),
- DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
- expCtx()));
+ auto mergePipe = unittest::assertGet(
+ Pipeline::create({parse("{$group: {_id: '$x'}}"),
+ DocumentSourceMerge::create(kTestOutNss,
+ expCtx(),
+ WhenMatched::kFail,
+ WhenNotMatched::kInsert,
+ _mergeLetVariables,
+ _mergePipeline,
+ _mergeOnFields,
+ _mergeTargetCollectionVersion)},
+ expCtx()));
auto future = launchAsync([&] {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
@@ -395,7 +457,14 @@ TEST_F(ClusterExchangeTest, ProjectThroughDottedFieldDoesNotPreserveShardKey) {
"}}"),
parse(
"{$project: {_id: '$_id.country', region: '$_id.region', population: 1, cities: 1}}"),
- DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
+ DocumentSourceMerge::create(kTestOutNss,
+ expCtx(),
+ WhenMatched::kFail,
+ WhenNotMatched::kInsert,
+ _mergeLetVariables,
+ _mergePipeline,
+ _mergeOnFields,
+ _mergeTargetCollectionVersion)},
expCtx()));
auto future = launchAsync([&] {
@@ -418,14 +487,21 @@ TEST_F(ClusterExchangeTest, WordCountUseCaseExample) {
// As an example of a pipeline that might replace a map reduce, imagine that we are performing a
// word count, and the shards part of the pipeline tokenized some text field of each document
// into {word: <token>, count: 1}. Then this is the merging half of the pipeline:
- auto mergePipe = unittest::assertGet(Pipeline::create(
- {parse("{$group: {"
- " _id: '$word',"
- " count: {$sum: 1},"
- " $doingMerge: true"
- "}}"),
- DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
- expCtx()));
+ auto mergePipe = unittest::assertGet(
+ Pipeline::create({parse("{$group: {"
+ " _id: '$word',"
+ " count: {$sum: 1},"
+ " $doingMerge: true"
+ "}}"),
+ DocumentSourceMerge::create(kTestOutNss,
+ expCtx(),
+ WhenMatched::kFail,
+ WhenNotMatched::kInsert,
+ _mergeLetVariables,
+ _mergePipeline,
+ _mergeOnFields,
+ _mergeTargetCollectionVersion)},
+ expCtx()));
auto future = launchAsync([&] {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
@@ -475,15 +551,22 @@ TEST_F(ClusterExchangeTest, WordCountUseCaseExampleShardedByWord) {
// As an example of a pipeline that might replace a map reduce, imagine that we are performing a
// word count, and the shards part of the pipeline tokenized some text field of each document
// into {word: <token>, count: 1}. Then this is the merging half of the pipeline:
- auto mergePipe = unittest::assertGet(Pipeline::create(
- {parse("{$group: {"
- " _id: '$word',"
- " count: {$sum: 1},"
- " $doingMerge: true"
- "}}"),
- parse("{$project: {word: '$_id', count: 1}}"),
- DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
- expCtx()));
+ auto mergePipe = unittest::assertGet(
+ Pipeline::create({parse("{$group: {"
+ " _id: '$word',"
+ " count: {$sum: 1},"
+ " $doingMerge: true"
+ "}}"),
+ parse("{$project: {word: '$_id', count: 1}}"),
+ DocumentSourceMerge::create(kTestOutNss,
+ expCtx(),
+ WhenMatched::kFail,
+ WhenNotMatched::kInsert,
+ _mergeLetVariables,
+ _mergePipeline,
+ _mergeOnFields,
+ _mergeTargetCollectionVersion)},
+ expCtx()));
auto future = launchAsync([&] {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
@@ -549,14 +632,21 @@ TEST_F(ClusterExchangeTest, CompoundShardKeyThreeShards) {
loadRoutingTable(kTestOutNss, epoch, shardKey, chunks);
- auto mergePipe = unittest::assertGet(Pipeline::create(
- {parse("{$group: {"
- " _id: '$x',"
- " $doingMerge: true"
- "}}"),
- parse("{$project: {x: '$_id', y: '$_id'}}"),
- DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
- expCtx()));
+ auto mergePipe = unittest::assertGet(
+ Pipeline::create({parse("{$group: {"
+ " _id: '$x',"
+ " $doingMerge: true"
+ "}}"),
+ parse("{$project: {x: '$_id', y: '$_id'}}"),
+ DocumentSourceMerge::create(kTestOutNss,
+ expCtx(),
+ WhenMatched::kFail,
+ WhenNotMatched::kInsert,
+ _mergeLetVariables,
+ _mergePipeline,
+ _mergeOnFields,
+ _mergeTargetCollectionVersion)},
+ expCtx()));
auto future = launchAsync([&] {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(