Diffstat (limited to 'src/mongo/db'): 27 files changed, 964 insertions, 1631 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index a778d314936..dac668b3ae3 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -398,7 +398,6 @@ pipelineeEnv.Library( 'document_source_match.cpp', 'document_source_merge.cpp', 'document_source_out.cpp', - 'document_source_out_replace_coll.cpp', 'document_source_plan_cache_stats.cpp', 'document_source_project.cpp', 'document_source_queue.cpp', @@ -612,7 +611,6 @@ env.Library( env.Idlc('document_source_list_sessions.idl')[0], env.Idlc('document_source_merge.idl')[0], env.Idlc('document_source_merge_modes.idl')[0], - env.Idlc('document_source_out.idl')[0], env.Idlc('document_source_replace_root.idl')[0], env.Idlc('exchange_spec.idl')[0], env.Idlc('value.idl')[0], @@ -655,3 +653,19 @@ env.CppUnitTest( 'expression', ], ) + +env.CppUnitTest( + target='process_interface_test', + source=[ + 'mongos_process_interface_test.cpp', + 'process_interface_standalone_test.cpp' + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/service_context_test_fixture', + 'mongo_process_common', + 'mongo_process_interface', + 'mongos_process_interface', + 'process_interface_shardsvr', + 'process_interface_standalone' + ], + ) diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 5e8244e7984..49689de85dd 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -461,10 +461,11 @@ public: /** * Returns true if it would be correct to execute this stage in parallel across the shards in - * cases where the final stage is an $out. For example, a $group stage which is just merging the - * groups from the shards can be run in parallel since it will preserve the shard key. + * cases where the final stage is a stage which can perform a write operation, such as $merge. + * For example, a $group stage which is just merging the groups from the shards can be run in + * parallel since it will preserve the shard key. */ - virtual bool canRunInParallelBeforeOut( + virtual bool canRunInParallelBeforeWriteStage( const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const { return false; } diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index c2d2c0d7509..d8ce48bca13 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -701,7 +701,7 @@ bool DocumentSourceGroup::pathIncludedInGroupKeys(const std::string& dottedPath) }); } -bool DocumentSourceGroup::canRunInParallelBeforeOut( +bool DocumentSourceGroup::canRunInParallelBeforeWriteStage( const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const { if (_doingMerge) { return true; // This is fine. 
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 8324fc2b7f5..29a46df1f9f 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -158,7 +158,7 @@ public: bool usedDisk() final; boost::optional<DistributedPlanLogic> distributedPlanLogic() final; - bool canRunInParallelBeforeOut( + bool canRunInParallelBeforeWriteStage( const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final; /** diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp index 3578d0117d9..988d3e242f8 100644 --- a/src/mongo/db/pipeline/document_source_merge.cpp +++ b/src/mongo/db/pipeline/document_source_merge.cpp @@ -39,7 +39,6 @@ #include "mongo/db/curop_failpoint_helpers.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/pipeline/document_path_support.h" -#include "mongo/db/pipeline/document_source_out.h" #include "mongo/util/log.h" namespace mongo { @@ -227,23 +226,6 @@ bool isSupportedMergeMode(WhenMatched whenMatched, WhenNotMatched whenNotMatched } /** - * Parses the fields of the $merge 'on' from the user-specified 'fields', returning a set of field - * paths. Throws if 'fields' contains duplicate elements. - */ -std::set<FieldPath> parseMergeOnFieldsFromSpec(const std::vector<std::string>& fields) { - std::set<FieldPath> mergeOnFields; - - for (const auto& field : fields) { - const auto res = mergeOnFields.insert(FieldPath(field)); - uassert(ErrorCodes::BadValue, - "Found a duplicate field '{}' in {} 'on'"_format(field, kStageName), - res.second); - } - - return mergeOnFields; -} - -/** * Extracts the fields of $merge 'on' from 'doc' and returns the key as a BSONObj. Throws if any * field of the 'on' extracted from 'doc' is nullish or an array. */ @@ -265,93 +247,6 @@ BSONObj extractMergeOnFieldsFromDoc(const Document& doc, const std::set<FieldPat } /** - * Extracts $merge 'on' fields from the $merge spec when the pipeline is executed on mongoD, or use - * a default _id field if the user hasn't supplied the 'on' field. For the user supplied field - * ensures that it can be used to uniquely identify documents for merge. - */ -std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> resolveMergeOnFieldsOnMongoD( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const DocumentSourceMergeSpec& spec, - const NamespaceString& outputNs) { - invariant(!expCtx->inMongos); - auto targetCollectionVersion = spec.getTargetCollectionVersion(); - if (targetCollectionVersion) { - uassert(51123, "Unexpected target chunk version specified", expCtx->fromMongos); - // If mongos has sent us a target shard version, we need to be sure we are prepared to - // act as a router which is at least as recent as that mongos. - expCtx->mongoProcessInterface->checkRoutingInfoEpochOrThrow( - expCtx, outputNs, *targetCollectionVersion); - } - - auto userSpecifiedMergeOnFields = spec.getOn(); - if (!userSpecifiedMergeOnFields) { - uassert(51124, "Expected 'on' field to be provided from mongos", !expCtx->fromMongos); - return {std::set<FieldPath>{"_id"}, targetCollectionVersion}; - } - - // Make sure the 'on' field has a supporting index. Skip this check if the command is sent - // from mongos since the 'on' field check would've happened already. 
- auto mergeOnFields = parseMergeOnFieldsFromSpec(*userSpecifiedMergeOnFields); - if (!expCtx->fromMongos) { - uassert(51183, - "Cannot find index to verify that 'on' fields will be unique", - expCtx->mongoProcessInterface->uniqueKeyIsSupportedByIndex( - expCtx, outputNs, mergeOnFields)); - } - return {mergeOnFields, targetCollectionVersion}; -} - -/** - * Extracts $merge 'on' fields from the $merge spec when the pipeline is executed on mongoS. If the - * user supplied the 'on' field, ensures that it can be used to uniquely identify documents for - * merge. Otherwise, extracts the shard key and use it as the 'on' field. - */ -std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> resolveMergeOnFieldsOnMongoS( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const DocumentSourceMergeSpec& spec, - const NamespaceString& outputNs) { - invariant(expCtx->inMongos); - uassert(51179, - "{} received unexpected 'targetCollectionVersion' on mongos"_format(kStageName), - !spec.getTargetCollectionVersion()); - - if (auto userSpecifiedMergeOnFields = spec.getOn()) { - // Convert 'on' array to a vector of FieldPaths. - auto mergeOnFields = parseMergeOnFieldsFromSpec(*userSpecifiedMergeOnFields); - uassert(51190, - "Cannot find index to verify that 'on' fields will be unique", - expCtx->mongoProcessInterface->uniqueKeyIsSupportedByIndex( - expCtx, outputNs, mergeOnFields)); - - // If the user supplies the 'on' field we don't need to attach a ChunkVersion for the shards - // since we are not at risk of 'guessing' the wrong shard key. - return {mergeOnFields, boost::none}; - } - - // In case there are multiple shards which will perform this stage in parallel, we need to - // figure out and attach the collection's shard version to ensure each shard is talking about - // the same version of the collection. This mongos will coordinate that. We force a catalog - // refresh to do so because there is no shard versioning protocol on this namespace and so we - // otherwise could not be sure this node is (or will become) at all recent. We will also - // figure out and attach the 'on' field to send to the shards. - - // There are cases where the aggregation could fail if the collection is dropped or re-created - // during or near the time of the aggregation. This is okay - we are mostly paranoid that this - // mongos is very stale and want to prevent returning an error if the collection was dropped a - // long time ago. Because of this, we are okay with piggy-backing off another thread's request - // to refresh the cache, simply waiting for that request to return instead of forcing another - // refresh. - boost::optional<ChunkVersion> targetCollectionVersion = - expCtx->mongoProcessInterface->refreshAndGetCollectionVersion(expCtx, outputNs); - - auto docKeyPaths = expCtx->mongoProcessInterface->collectDocumentKeyFieldsActingAsRouter( - expCtx->opCtx, outputNs); - return {std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()), - std::make_move_iterator(docKeyPaths.end())), - targetCollectionVersion}; -} - -/** * Parses a $merge stage specification and resolves the target database name and collection name. * The $merge specification can be either a string or an object. If the target database name is not * explicitly specified, it will be defaulted to 'defaultDb'. 
@@ -432,18 +327,13 @@ DocumentSourceMerge::DocumentSourceMerge(NamespaceString outputNs, boost::optional<BSONObj> letVariables, boost::optional<std::vector<BSONObj>> pipeline, std::set<FieldPath> mergeOnFields, - boost::optional<ChunkVersion> targetCollectionVersion, - bool serializeAsOutStage) - : DocumentSource(expCtx), - _writeConcern(expCtx->opCtx->getWriteConcern()), - _outputNs(std::move(outputNs)), + boost::optional<ChunkVersion> targetCollectionVersion) + : DocumentSourceWriter(std::move(outputNs), expCtx), _targetCollectionVersion(targetCollectionVersion), - _done(false), _descriptor(descriptor), _pipeline(std::move(pipeline)), _mergeOnFields(std::move(mergeOnFields)), - _mergeOnFieldsIncludesId(_mergeOnFields.count("_id") == 1), - _serializeAsOutStage(serializeAsOutStage) { + _mergeOnFieldsIncludesId(_mergeOnFields.count("_id") == 1) { if (letVariables) { _letVariables.emplace(); @@ -466,9 +356,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create( boost::optional<BSONObj> letVariables, boost::optional<std::vector<BSONObj>> pipeline, std::set<FieldPath> mergeOnFields, - boost::optional<ChunkVersion> targetCollectionVersion, - bool serializeAsOutStage) { - + boost::optional<ChunkVersion> targetCollectionVersion) { uassert(51189, "Combination of {} modes 'whenMatched: {}' and 'whenNotMatched: {}' " "is not supported"_format(kStageName, @@ -515,8 +403,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::create( std::move(letVariables), std::move(pipeline), std::move(mergeOnFields), - targetCollectionVersion, - serializeAsOutStage); + targetCollectionVersion); } boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::createFromBson( @@ -531,10 +418,9 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::createFromBson( mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->mode : kDefaultWhenMatched; auto whenNotMatched = mergeSpec.getWhenNotMatched().value_or(kDefaultWhenNotMatched); auto pipeline = mergeSpec.getWhenMatched() ? mergeSpec.getWhenMatched()->pipeline : boost::none; - // TODO SERVER-40432: move resolveMergeOnFieldsOnMongo* into MongoProcessInterface. - auto[mergeOnFields, targetCollectionVersion] = expCtx->inMongos - ? resolveMergeOnFieldsOnMongoS(expCtx, mergeSpec, targetNss) - : resolveMergeOnFieldsOnMongoD(expCtx, mergeSpec, targetNss); + auto[mergeOnFields, targetCollectionVersion] = + expCtx->mongoProcessInterface->ensureFieldsUniqueOrResolveDocumentKey( + expCtx, mergeSpec.getOn(), mergeSpec.getTargetCollectionVersion(), targetNss); return DocumentSourceMerge::create(std::move(targetNss), expCtx, @@ -543,137 +429,61 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMerge::createFromBson( mergeSpec.getLet(), std::move(pipeline), std::move(mergeOnFields), - targetCollectionVersion, - false /* serialize as $out stage */); + targetCollectionVersion); } -DocumentSource::GetNextResult DocumentSourceMerge::getNext() { - pExpCtx->checkForInterrupt(); - - if (_done) { - return GetNextResult::makeEOF(); - } - - if (!_initialized) { - // Explain of a $merge should never try to actually execute any writes. We only ever expect - // getNext() to be called for the 'executionStats' and 'allPlansExecution' explain modes. - // This assertion should not be triggered for 'queryPlanner' explain of a $merge, which is - // perfectly legal. 
- uassert(51184, - "explain of {} is not allowed with verbosity {}"_format( - kStageName, ExplainOptions::verbosityString(*pExpCtx->explain)), - !pExpCtx->explain); - _initialized = true; - } - - BatchedObjects batch; - int bufferedBytes = 0; - - auto nextInput = pSource->getNext(); - for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { - CurOpFailpointHelpers::waitWhileFailPointEnabled( - &hangWhileBuildingDocumentSourceMergeBatch, - pExpCtx->opCtx, - "hangWhileBuildingDocumentSourceMergeBatch", - []() { - log() << "Hanging aggregation due to 'hangWhileBuildingDocumentSourceMergeBatch' " - << "failpoint"; - }); - - auto doc = nextInput.releaseDocument(); - - // Generate an _id if the uniqueKey includes _id but the document doesn't have one. - if (_mergeOnFieldsIncludesId && doc.getField("_id"_sd).missing()) { - MutableDocument mutableDoc(std::move(doc)); - mutableDoc["_id"_sd] = Value(OID::gen()); - doc = mutableDoc.freeze(); +Value DocumentSourceMerge::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { + DocumentSourceMergeSpec spec; + spec.setTargetNss(_outputNs); + spec.setLet([&]() -> boost::optional<BSONObj> { + if (!_letVariables) { + return boost::none; } - auto mergeOnFields = extractMergeOnFieldsFromDoc(doc, _mergeOnFields); - auto mod = makeBatchUpdateModification(doc); - auto vars = resolveLetVariablesIfNeeded(doc); - auto modSize = mod.objsize() + (vars ? vars->objsize() : 0); - - bufferedBytes += modSize; - if (!batch.empty() && - (bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) { - spill(std::move(batch)); - batch.clear(); - bufferedBytes = modSize; + BSONObjBuilder bob; + for (auto && [ name, expr ] : *_letVariables) { + bob << name << expr->serialize(static_cast<bool>(explain)); } - batch.emplace_back(std::move(mergeOnFields), std::move(mod), std::move(vars)); - } - if (!batch.empty()) { - spill(std::move(batch)); - batch.clear(); - } - - switch (nextInput.getStatus()) { - case GetNextResult::ReturnStatus::kAdvanced: { - MONGO_UNREACHABLE; // We consumed all advances above. + return bob.obj(); + }()); + spec.setWhenMatched(MergeWhenMatchedPolicy{_descriptor.mode.first, _pipeline}); + spec.setWhenNotMatched(_descriptor.mode.second); + spec.setOn([&]() { + std::vector<std::string> mergeOnFields; + for (auto path : _mergeOnFields) { + mergeOnFields.push_back(path.fullPath()); } - case GetNextResult::ReturnStatus::kPauseExecution: { - return nextInput; // Propagate the pause. - } - case GetNextResult::ReturnStatus::kEOF: { - _done = true; - return nextInput; - } - } - MONGO_UNREACHABLE; + return mergeOnFields; + }()); + spec.setTargetCollectionVersion(_targetCollectionVersion); + return Value(Document{{getSourceName(), spec.toBSON()}}); } -Value DocumentSourceMerge::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { - if (_serializeAsOutStage) { - uassert(ErrorCodes::BadValue, - "Cannot serialize {} stage as $out for 'whenMatched: {}' and " - "'whenNotMatched: {}'"_format( - kStageName, - MergeWhenMatchedMode_serializer(_descriptor.mode.first), - MergeWhenNotMatchedMode_serializer(_descriptor.mode.second)), - (_descriptor.mode.first == WhenMatched::kFail || - _descriptor.mode.first == WhenMatched::kReplace) && - (_descriptor.mode.second == WhenNotMatched::kInsert)); - DocumentSourceOutSpec spec; - spec.setTargetDb(_outputNs.db()); - spec.setTargetCollection(_outputNs.coll()); - spec.setMode(_descriptor.mode.first == WhenMatched::kFail - ? 
WriteModeEnum::kModeInsertDocuments - : WriteModeEnum::kModeReplaceDocuments); - spec.setUniqueKey([&]() { - BSONObjBuilder uniqueKeyBob; - for (auto path : _mergeOnFields) { - uniqueKeyBob.append(path.fullPath(), 1); - } - return uniqueKeyBob.obj(); - }()); - spec.setTargetCollectionVersion(_targetCollectionVersion); - return Value(Document{{DocumentSourceOut::kStageName.rawData(), spec.toBSON()}}); - } else { - DocumentSourceMergeSpec spec; - spec.setTargetNss(_outputNs); - spec.setLet([&]() -> boost::optional<BSONObj> { - if (!_letVariables) { - return boost::none; - } - - BSONObjBuilder bob; - for (auto && [ name, expr ] : *_letVariables) { - bob << name << expr->serialize(static_cast<bool>(explain)); - } - return bob.obj(); - }()); - spec.setWhenMatched(MergeWhenMatchedPolicy{_descriptor.mode.first, _pipeline}); - spec.setWhenNotMatched(_descriptor.mode.second); - spec.setOn([&]() { - std::vector<std::string> mergeOnFields; - for (auto path : _mergeOnFields) { - mergeOnFields.push_back(path.fullPath()); - } - return mergeOnFields; - }()); - spec.setTargetCollectionVersion(_targetCollectionVersion); - return Value(Document{{getSourceName(), spec.toBSON()}}); +std::pair<DocumentSourceMerge::BatchObject, int> DocumentSourceMerge::makeBatchObject( + Document&& doc) const { + // Generate an _id if the uniqueKey includes _id but the document doesn't have one. + if (_mergeOnFieldsIncludesId && doc.getField("_id"_sd).missing()) { + MutableDocument mutableDoc(std::move(doc)); + mutableDoc["_id"_sd] = Value(OID::gen()); + doc = mutableDoc.freeze(); } + + auto mergeOnFields = extractMergeOnFieldsFromDoc(doc, _mergeOnFields); + auto mod = makeBatchUpdateModification(doc); + auto vars = resolveLetVariablesIfNeeded(doc); + auto modSize = mod.objsize() + (vars ? vars->objsize() : 0); + return {{std::move(mergeOnFields), std::move(mod), std::move(vars)}, modSize}; } + +void DocumentSourceMerge::waitWhileFailPointEnabled() { + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &hangWhileBuildingDocumentSourceMergeBatch, + pExpCtx->opCtx, + "hangWhileBuildingDocumentSourceMergeBatch", + []() { + log() << "Hanging aggregation due to 'hangWhileBuildingDocumentSourceMergeBatch' " + << "failpoint"; + }); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h index aafae386acf..811ebf97591 100644 --- a/src/mongo/db/pipeline/document_source_merge.h +++ b/src/mongo/db/pipeline/document_source_merge.h @@ -29,9 +29,8 @@ #pragma once -#include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_merge_gen.h" -#include "mongo/db/pipeline/document_source_out.h" +#include "mongo/db/pipeline/document_source_writer.h" namespace mongo { @@ -40,10 +39,8 @@ namespace mongo { * this class must be initialized (via a constructor) with a 'MergeDescriptor', which defines a * a particular merge strategy for a pair of 'whenMatched' and 'whenNotMatched' merge modes. */ -class DocumentSourceMerge final : public DocumentSource { +class DocumentSourceMerge final : public DocumentSourceWriter<MongoProcessInterface::BatchObject> { public: - using BatchedObjects = MongoProcessInterface::BatchedObjects; - static constexpr StringData kStageName = "$merge"_sd; // A descriptor for a merge strategy. Holds a merge strategy function and a set of actions @@ -84,38 +81,12 @@ public: } }; - /** - * Builds a new $merge stage which will merge all documents into 'outputNs'. 
If - * 'targetCollectionVersion' is provided then processing will stop with an error if the - * collection's epoch changes during the course of execution. This is used as a mechanism to - * prevent the shard key from changing. - */ - DocumentSourceMerge(NamespaceString outputNs, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MergeStrategyDescriptor& descriptor, - boost::optional<BSONObj> letVariables, - boost::optional<std::vector<BSONObj>> pipeline, - std::set<FieldPath> mergeOnFields, - boost::optional<ChunkVersion> targetCollectionVersion, - bool serializeAsOutStage); - virtual ~DocumentSourceMerge() = default; const char* getSourceName() const final override { return kStageName.rawData(); } - DepsTracker::State getDependencies(DepsTracker* deps) const final override { - deps->needWholeDocument = true; - return DepsTracker::State::EXHAUSTIVE_ALL; - } - - GetModPathsReturn getModifiedPaths() const final override { - // For purposes of tracking which fields come from where, this stage does not modify any - // fields. - return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; - } - StageConstraints constraints(Pipeline::SplitState pipeState) const final override { // A $merge to an unsharded collection should merge on the primary shard to perform local // writes. A $merge to a sharded collection has no requirement, since each shard can perform @@ -147,25 +118,12 @@ public: if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)) { return boost::none; } - // {shardsStage, mergingStage, sortPattern} - return DistributedPlanLogic{nullptr, this, boost::none}; - } - - bool canRunInParallelBeforeOut( - const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final override { - // If someone is asking the question, this must be the $merge stage in question, so yes! - return true; + return DocumentSourceWriter::distributedPlanLogic(); } - GetNextResult getNext() final override; - Value serialize( boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final override; - const NamespaceString& getOutputNs() const { - return _outputNs; - } - /** * Creates a new $merge stage from the given arguments. */ @@ -177,8 +135,7 @@ public: boost::optional<BSONObj> letVariables, boost::optional<std::vector<BSONObj>> pipeline, std::set<FieldPath> mergeOnFields, - boost::optional<ChunkVersion> targetCollectionVersion, - bool serializeAsOutStage); + boost::optional<ChunkVersion> targetCollectionVersion); /** * Parses a $merge stage from the user-supplied BSON. @@ -188,28 +145,23 @@ public: private: /** - * Writes the documents in 'batch' to the output namespace. + * Builds a new $merge stage which will merge all documents into 'outputNs'. If + * 'targetCollectionVersion' is provided then processing will stop with an error if the + * collection's epoch changes during the course of execution. This is used as a mechanism to + * prevent the shard key from changing. */ - void spill(BatchedObjects&& batch) { - OutStageWriteBlock writeBlock(pExpCtx->opCtx); - - try { - auto targetEpoch = _targetCollectionVersion - ? 
boost::optional<OID>(_targetCollectionVersion->epoch()) - : boost::none; - - _descriptor.strategy(pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch)); - } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) { - uassertStatusOKWithContext(ex.toStatus(), - "$merge failed to update the matching document, did you " - "attempt to modify the _id or the shard key?"); - } - } + DocumentSourceMerge(NamespaceString outputNs, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MergeStrategyDescriptor& descriptor, + boost::optional<BSONObj> letVariables, + boost::optional<std::vector<BSONObj>> pipeline, + std::set<FieldPath> mergeOnFields, + boost::optional<ChunkVersion> targetCollectionVersion); /** * Creates an UpdateModification object from the given 'doc' to be used with the batched update. */ - auto makeBatchUpdateModification(const Document& doc) { + auto makeBatchUpdateModification(const Document& doc) const { return _pipeline ? write_ops::UpdateModification(*_pipeline) : write_ops::UpdateModification(doc.toBson()); } @@ -218,7 +170,7 @@ private: * Resolves 'let' defined variables against the 'doc' and stores the results in the returned * BSON. */ - boost::optional<BSONObj> resolveLetVariablesIfNeeded(const Document& doc) { + boost::optional<BSONObj> resolveLetVariablesIfNeeded(const Document& doc) const { // When we resolve 'let' variables, an empty BSON object or boost::none won't make any // difference at the end-point (in the PipelineExecutor), as in both cases we will end up // with the update pipeline ExpressionContext not being populated with any variables, so we @@ -234,18 +186,27 @@ private: return bob.obj(); } - // Stash the writeConcern of the original command as the operation context may change by the - // time we start to spill $merge writes. This is because certain aggregations (e.g. $exchange) - // establish cursors with batchSize 0 then run subsequent getMore's which use a new operation - // context. The getMore's will not have an attached writeConcern however we still want to - // respect the writeConcern of the original command. - WriteConcernOptions _writeConcern; + void spill(BatchedObjects&& batch) override { + DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); - const NamespaceString _outputNs; - boost::optional<ChunkVersion> _targetCollectionVersion; + try { + auto targetEpoch = _targetCollectionVersion + ? boost::optional<OID>(_targetCollectionVersion->epoch()) + : boost::none; - bool _initialized = false; - bool _done = false; + _descriptor.strategy(pExpCtx, _outputNs, _writeConcern, targetEpoch, std::move(batch)); + } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) { + uassertStatusOKWithContext(ex.toStatus(), + "$merge failed to update the matching document, did you " + "attempt to modify the _id or the shard key?"); + } + } + + void waitWhileFailPointEnabled() override; + + std::pair<BatchObject, int> makeBatchObject(Document&& doc) const override; + + boost::optional<ChunkVersion> _targetCollectionVersion; // A merge descriptor contains a merge strategy function describing how to merge two // collections, as well as some other metadata needed to perform the merge operation. This is @@ -271,11 +232,6 @@ private: // True if '_mergeOnFields' contains the _id. We store this as a separate boolean to avoid // repeated lookups into the set. bool _mergeOnFieldsIncludesId; - - // If true, display this stage in the explain output as an $out stage rather that $merge. 
This - // is used when the $merge stage was used an alias for $out's 'insertDocuments' and - // 'replaceDocuments' modes. - bool _serializeAsOutStage; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_merge_test.cpp b/src/mongo/db/pipeline/document_source_merge_test.cpp index f61afee8210..50e75e9d264 100644 --- a/src/mongo/db/pipeline/document_source_merge_test.cpp +++ b/src/mongo/db/pipeline/document_source_merge_test.cpp @@ -35,6 +35,7 @@ #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source_merge.h" #include "mongo/db/pipeline/document_value_test_util.h" +#include "mongo/db/pipeline/process_interface_standalone.h" namespace mongo { namespace { @@ -393,53 +394,6 @@ TEST_F(DocumentSourceMergeTest, FailsToParseIfOnFieldIsNotStringOrArrayOfStrings ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51186); } -TEST_F(DocumentSourceMergeTest, FailsToParseIfOnFieldHasDuplicateFields) { - auto spec = BSON("$merge" << BSON("into" - << "target_collection" - << "on" - << BSON_ARRAY("_id" - << "_id"))); - ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::BadValue); - - spec = BSON("$merge" << BSON("into" - << "test" - << "on" - << BSON_ARRAY("x" - << "y" - << "x"))); - ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, ErrorCodes::BadValue); -} - -TEST_F(DocumentSourceMergeTest, FailsToParseIfTargetCollectionVersionIsSpecifiedOnMongos) { - auto spec = BSON("$merge" << BSON("into" - << "target_collection" - << "on" - << "_id" - << "targetCollectionVersion" - << ChunkVersion(0, 0, OID::gen()).toBSON())); - getExpCtx()->inMongos = true; - ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51179); - - // Test that 'targetCollectionVersion' is accepted if _from_ mongos. - getExpCtx()->inMongos = false; - getExpCtx()->fromMongos = true; - ASSERT(createMergeStage(spec) != nullptr); - - // Test that 'targetCollectionVersion' is not accepted if on mongod but not from mongos. 
- getExpCtx()->inMongos = false; - getExpCtx()->fromMongos = false; - ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51123); -} - -TEST_F(DocumentSourceMergeTest, FailsToParseIfOnFieldIsNotSentFromMongos) { - auto spec = BSON("$merge" << BSON("into" - << "target_collection" - << "targetCollectionVersion" - << ChunkVersion(0, 0, OID::gen()).toBSON())); - getExpCtx()->fromMongos = true; - ASSERT_THROWS_CODE(createMergeStage(spec), AssertionException, 51124); -} - TEST_F(DocumentSourceMergeTest, CorrectlyUsesTargetDbThatMatchesAggregationDb) { const auto targetDbSameAsAggregationDb = getExpCtx()->ns.db(); const auto targetColl = "target_collection"; diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index d9ddf73015d..ab340885632 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -31,454 +31,204 @@ #include "mongo/platform/basic.h" +#include "mongo/db/pipeline/document_source_out.h" + +#include <fmt/format.h> + #include "mongo/db/curop_failpoint_helpers.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/pipeline/document_path_support.h" -#include "mongo/db/pipeline/document_source_merge.h" -#include "mongo/db/pipeline/document_source_out.h" -#include "mongo/db/pipeline/document_source_out_gen.h" -#include "mongo/db/pipeline/document_source_out_replace_coll.h" -#include "mongo/util/fail_point_service.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/util/destructor_guard.h" #include "mongo/util/log.h" namespace mongo { +using namespace fmt::literals; + +static AtomicWord<unsigned> aggOutCounter; MONGO_FAIL_POINT_DEFINE(hangWhileBuildingDocumentSourceOutBatch); +REGISTER_DOCUMENT_SOURCE(out, + DocumentSourceOut::LiteParsed::parse, + DocumentSourceOut::createFromBson); -using boost::intrusive_ptr; -using std::vector; +DocumentSourceOut::~DocumentSourceOut() { + DESTRUCTOR_GUARD( + // Make sure we drop the temp collection if anything goes wrong. Errors are ignored + // here because nothing can be done about them. Additionally, if this fails and the + // collection is left behind, it will be cleaned up next time the server is started. + if (_tempNs.size()) { + auto cleanupClient = + pExpCtx->opCtx->getServiceContext()->makeClient("$out_replace_coll_cleanup"); + AlternativeClientRegion acr(cleanupClient); + // Create a new operation context so that any interrupts on the current operation will + // not affect the dropCollection operation below. + auto cleanupOpCtx = cc().makeOperationContext(); + + DocumentSourceWriteBlock writeBlock(cleanupOpCtx.get()); + + // Reset the operation context back to original once dropCollection is done. 
+ ON_BLOCK_EXIT( + [this] { pExpCtx->mongoProcessInterface->setOperationContext(pExpCtx->opCtx); }); + + pExpCtx->mongoProcessInterface->setOperationContext(cleanupOpCtx.get()); + pExpCtx->mongoProcessInterface->directClient()->dropCollection(_tempNs.ns()); + }); +} std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::parse( const AggregationRequest& request, const BSONElement& spec) { uassert(ErrorCodes::TypeMismatch, - str::stream() << "$out stage requires a string or object argument, but found " - << typeName(spec.type()), - spec.type() == BSONType::String || spec.type() == BSONType::Object); - - NamespaceString targetNss; - bool allowSharded; - WriteModeEnum mode; - if (spec.type() == BSONType::String) { - targetNss = NamespaceString(request.getNamespaceString().db(), spec.valueStringData()); - allowSharded = false; - mode = WriteModeEnum::kModeReplaceCollection; - } else if (spec.type() == BSONType::Object) { - auto outSpec = - DocumentSourceOutSpec::parse(IDLParserErrorContext("$out"), spec.embeddedObject()); - - if (auto targetDb = outSpec.getTargetDb()) { - targetNss = NamespaceString(*targetDb, outSpec.getTargetCollection()); - } else { - targetNss = - NamespaceString(request.getNamespaceString().db(), outSpec.getTargetCollection()); - } - - mode = outSpec.getMode(); - - // Sharded output collections are not allowed with mode "replaceCollection". - allowSharded = mode != WriteModeEnum::kModeReplaceCollection; - } + "{} stage requires a string argument, but found {}"_format(kStageName, + typeName(spec.type())), + spec.type() == BSONType::String); + NamespaceString targetNss{request.getNamespaceString().db(), spec.valueStringData()}; uassert(ErrorCodes::InvalidNamespace, - str::stream() << "Invalid $out target namespace, " << targetNss.ns(), + "Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()), targetNss.isValid()); - // All modes require the "insert" action. - ActionSet actions{ActionType::insert}; - switch (mode) { - case WriteModeEnum::kModeReplaceCollection: - actions.addAction(ActionType::remove); - break; - case WriteModeEnum::kModeReplaceDocuments: - actions.addAction(ActionType::update); - break; - case WriteModeEnum::kModeInsertDocuments: - // "insertDocuments" mode only requires the "insert" action. - break; - } - + ActionSet actions{ActionType::insert, ActionType::remove}; if (request.shouldBypassDocumentValidation()) { actions.addAction(ActionType::bypassDocumentValidation); } PrivilegeVector privileges{Privilege(ResourcePattern::forExactNamespace(targetNss), actions)}; - return stdx::make_unique<DocumentSourceOut::LiteParsed>( - std::move(targetNss), std::move(privileges), allowSharded); + return stdx::make_unique<DocumentSourceOut::LiteParsed>(std::move(targetNss), + std::move(privileges)); } -REGISTER_DOCUMENT_SOURCE(out, - DocumentSourceOut::LiteParsed::parse, - DocumentSourceOut::createFromBson); +void DocumentSourceOut::initialize() { + DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); -const char* DocumentSourceOut::getSourceName() const { - return "$out"; -} + DBClientBase* conn = pExpCtx->mongoProcessInterface->directClient(); -namespace { -/** - * Parses the fields of the 'uniqueKey' from the user-specified 'obj' from the $out spec, returning - * a set of field paths. Throws if 'obj' is invalid. 
- */ -std::set<FieldPath> parseUniqueKeyFromSpec(const BSONObj& obj) { - std::set<FieldPath> uniqueKey; - for (const auto& elem : obj) { - uassert(ErrorCodes::TypeMismatch, - str::stream() << "All fields of $out uniqueKey must be the number 1, but '" - << elem.fieldNameStringData() - << "' is of type " - << elem.type(), - elem.isNumber()); - - uassert(ErrorCodes::BadValue, - str::stream() << "All fields of $out uniqueKey must be the number 1, but '" - << elem.fieldNameStringData() - << "' has the invalid value " - << elem.numberDouble(), - elem.numberDouble() == 1.0); - - const auto res = uniqueKey.insert(FieldPath(elem.fieldNameStringData())); - uassert(ErrorCodes::BadValue, - str::stream() << "Found a duplicate field '" << elem.fieldNameStringData() - << "' in $out uniqueKey", - res.second); - } + const auto& outputNs = getOutputNs(); + _tempNs = NamespaceString(str::stream() << outputNs.db() << ".tmp.agg_out." + << aggOutCounter.addAndFetch(1)); - uassert(ErrorCodes::InvalidOptions, - "If explicitly specifying $out uniqueKey, must include at least one field", - uniqueKey.size() > 0); - return uniqueKey; -} + // Save the original collection options and index specs so we can check they didn't change + // during computation. + _originalOutOptions = pExpCtx->mongoProcessInterface->getCollectionOptions(outputNs); + _originalIndexes = conn->getIndexSpecs(outputNs.ns()); -/** - * Extracts the fields of 'uniqueKey' from 'doc' and returns the key as a BSONObj. Throws if any - * field of the 'uniqueKey' extracted from 'doc' is nullish or an array. - */ -BSONObj extractUniqueKeyFromDoc(const Document& doc, const std::set<FieldPath>& uniqueKey) { - MutableDocument result; - for (const auto& field : uniqueKey) { - auto value = doc.getNestedField(field); - uassert(50943, - str::stream() << "$out write error: uniqueKey field '" << field.fullPath() - << "' is an array in the document '" - << doc.toString() - << "'", - !value.isArray()); - uassert( - 50905, - str::stream() << "$out write error: uniqueKey field '" << field.fullPath() - << "' cannot be missing, null, undefined or an array. Full document: '" - << doc.toString() - << "'", - !value.nullish()); - result.addField(field.fullPath(), std::move(value)); - } - return result.freeze().toBson(); -} + // Check if it's capped to make sure we have a chance of succeeding before we do all the work. + // If the collection becomes capped during processing, the collection options will have changed, + // and the $out will fail. + uassert(17152, + "namespace '{}' is capped so it can't be used for {}"_format(outputNs.ns(), kStageName), + _originalOutOptions["capped"].eoo()); -void ensureUniqueKeyHasSupportingIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& outputNs, - const std::set<FieldPath>& uniqueKey, - const BSONObj& userSpecifiedUniqueKey) { - uassert( - 50938, - str::stream() << "Cannot find index to verify that $out's unique key will be unique: " - << userSpecifiedUniqueKey, - expCtx->mongoProcessInterface->uniqueKeyIsSupportedByIndex(expCtx, outputNs, uniqueKey)); -} -} // namespace + // We will write all results into a temporary collection, then rename the temporary + // collection to be the target collection once we are done. + _tempNs = NamespaceString(str::stream() << outputNs.db() << ".tmp.agg_out." 
+ << aggOutCounter.addAndFetch(1)); -DocumentSource::GetNextResult DocumentSourceOut::getNext() { - pExpCtx->checkForInterrupt(); + // Create temp collection, copying options from the existing output collection if any. + { + BSONObjBuilder cmd; + cmd << "create" << _tempNs.coll(); + cmd << "temp" << true; + cmd.appendElementsUnique(_originalOutOptions); - if (_done) { - return GetNextResult::makeEOF(); + BSONObj info; + uassert(16994, + "failed to create temporary {} collection '{}': {}"_format( + kStageName, _tempNs.ns(), getStatusFromCommandResult(info).reason()), + conn->runCommand(outputNs.db().toString(), cmd.done(), info)); } - if (!_initialized) { - // Explain of a $out should never try to actually execute any writes. We only ever expect - // getNext() to be called for the 'executionStats' and 'allPlansExecution' explain modes. - // This assertion should not be triggered for 'queryPlanner' explain of a $out, which is - // perfectly legal. - uassert(51029, - str::stream() << "explain of $out is not allowed with verbosity: " - << ExplainOptions::verbosityString(*pExpCtx->explain), - !pExpCtx->explain); - - initializeWriteNs(); - _initialized = true; + if (_originalIndexes.empty()) { + return; } - BatchedObjects batch; - int bufferedBytes = 0; - - auto nextInput = pSource->getNext(); - for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { - // clang-format off - CurOpFailpointHelpers::waitWhileFailPointEnabled( - &hangWhileBuildingDocumentSourceOutBatch, - pExpCtx->opCtx, - "hangWhileBuildingDocumentSourceOutBatch", - []() { - log() << "Hanging aggregation due to 'hangWhileBuildingDocumentSourceOutBatch' " - << "failpoint"; - }); - // clang-format on - - auto doc = nextInput.releaseDocument(); - - // Generate an _id if the uniqueKey includes _id but the document doesn't have one. - if (_uniqueKeyIncludesId && doc.getField("_id"_sd).missing()) { - MutableDocument mutableDoc(std::move(doc)); - mutableDoc["_id"_sd] = Value(OID::gen()); - doc = mutableDoc.freeze(); - } - - // Extract the unique key before converting the document to BSON. - auto uniqueKey = extractUniqueKeyFromDoc(doc, _uniqueKeyFields); - auto insertObj = doc.toBson(); - - bufferedBytes += insertObj.objsize(); - if (!batch.empty() && - (bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) { - spill(std::move(batch)); - batch.clear(); - bufferedBytes = insertObj.objsize(); - } - batch.emplace(std::move(insertObj), std::move(uniqueKey)); + // Copy the indexes of the output collection to the temp collection. + std::vector<BSONObj> tempNsIndexes; + for (const auto& indexSpec : _originalIndexes) { + // Replace the spec's 'ns' field value, which is the original collection, with the temp + // collection. + tempNsIndexes.push_back(indexSpec.addField(BSON("ns" << _tempNs.ns()).firstElement())); } - if (!batch.empty()) { - spill(std::move(batch)); - batch.clear(); + try { + conn->createIndexes(_tempNs.ns(), tempNsIndexes); + } catch (DBException& ex) { + ex.addContext("Copying indexes for $out failed"); + throw; } +} - switch (nextInput.getStatus()) { - case GetNextResult::ReturnStatus::kAdvanced: { - MONGO_UNREACHABLE; // We consumed all advances above. - } - case GetNextResult::ReturnStatus::kPauseExecution: { - return nextInput; // Propagate the pause. - } - case GetNextResult::ReturnStatus::kEOF: { - - finalize(); - _done = true; - - // $out doesn't currently produce any outputs. 
- return nextInput; - } - } - MONGO_UNREACHABLE; +void DocumentSourceOut::finalize() { + DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); + + const auto& outputNs = getOutputNs(); + auto renameCommandObj = + BSON("renameCollection" << _tempNs.ns() << "to" << outputNs.ns() << "dropTarget" << true); + + pExpCtx->mongoProcessInterface->renameIfOptionsAndIndexesHaveNotChanged( + pExpCtx->opCtx, renameCommandObj, outputNs, _originalOutOptions, _originalIndexes); + + // The rename succeeded, so the temp collection no longer exists. + _tempNs = {}; } -intrusive_ptr<DocumentSource> DocumentSourceOut::create( - NamespaceString outputNs, - const intrusive_ptr<ExpressionContext>& expCtx, - WriteModeEnum mode, - std::set<FieldPath> uniqueKey, - boost::optional<ChunkVersion> targetCollectionVersion) { +boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create( + NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx) { // TODO (SERVER-36832): Allow this combination. - uassert( - 50939, - str::stream() << "$out with mode " << WriteMode_serializer(mode) - << " is not supported when the output collection is in a different database", - !(mode == WriteModeEnum::kModeReplaceCollection && outputNs.db() != expCtx->ns.db())); - - uassert(50992, - str::stream() << "$out with mode " << WriteMode_serializer(mode) - << " is not supported when the output collection is the same as the" - << " aggregation collection", - mode == WriteModeEnum::kModeReplaceCollection || expCtx->ns != outputNs); + uassert(50939, + "{} is not supported when the output collection is in a different " + "database"_format(kStageName), + outputNs.db() == expCtx->ns.db()); uassert(ErrorCodes::OperationNotSupportedInTransaction, - "$out cannot be used in a transaction", + "{} cannot be used in a transaction"_format(kStageName), !expCtx->inMultiDocumentTransaction); auto readConcernLevel = repl::ReadConcernArgs::get(expCtx->opCtx).getLevel(); uassert(ErrorCodes::InvalidOptions, - "$out cannot be used with a 'linearizable' read concern level", + "{} cannot be used with a 'linearizable' read concern level"_format(kStageName), readConcernLevel != repl::ReadConcernLevel::kLinearizableReadConcern); - // Although we perform a check for "replaceCollection" mode with a sharded output collection - // during lite parsing, we need to do it here as well in case mongos is stale or the command is - // sent directly to the shard. 
- if (mode == WriteModeEnum::kModeReplaceCollection) { - uassert(17017, - str::stream() << "$out with mode " << WriteMode_serializer(mode) - << " is not supported to an existing *sharded* output collection.", - !expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, outputNs)); - } - uassert(17385, "Can't $out to special collection: " + outputNs.coll(), !outputNs.isSpecial()); - - switch (mode) { - case WriteModeEnum::kModeReplaceCollection: - return new DocumentSourceOutReplaceColl( - std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetCollectionVersion); - case WriteModeEnum::kModeInsertDocuments: - return DocumentSourceMerge::create(std::move(outputNs), - expCtx, - MergeWhenMatchedModeEnum::kFail, - MergeWhenNotMatchedModeEnum::kInsert, - boost::none, /* no variables */ - boost::none, /* no custom pipeline */ - std::move(uniqueKey), - targetCollectionVersion, - true /* serialize as $out stage */); - case WriteModeEnum::kModeReplaceDocuments: - return DocumentSourceMerge::create(std::move(outputNs), - expCtx, - MergeWhenMatchedModeEnum::kReplace, - MergeWhenNotMatchedModeEnum::kInsert, - boost::none, /* no variables */ - boost::none, /* no custom pipeline */ - std::move(uniqueKey), - targetCollectionVersion, - true /* serialize as $out stage */); - default: - MONGO_UNREACHABLE; - } -} + uassert(17017, + "{} is not supported to an existing *sharded* output collection"_format(kStageName), + !expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, outputNs)); -DocumentSourceOut::DocumentSourceOut(NamespaceString outputNs, - const intrusive_ptr<ExpressionContext>& expCtx, - WriteModeEnum mode, - std::set<FieldPath> uniqueKey, - boost::optional<ChunkVersion> targetCollectionVersion) - : DocumentSource(expCtx), - _writeConcern(expCtx->opCtx->getWriteConcern()), - _outputNs(std::move(outputNs)), - _targetCollectionVersion(targetCollectionVersion), - _done(false), - _mode(mode), - _uniqueKeyFields(std::move(uniqueKey)), - _uniqueKeyIncludesId(_uniqueKeyFields.count("_id") == 1) {} - -intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson( - BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { - - auto mode = WriteModeEnum::kModeReplaceCollection; - std::set<FieldPath> uniqueKey; - NamespaceString outputNs; - boost::optional<ChunkVersion> targetCollectionVersion; - if (elem.type() == BSONType::String) { - outputNs = NamespaceString(expCtx->ns.db().toString() + '.' + elem.str()); - uniqueKey.emplace("_id"); - } else if (elem.type() == BSONType::Object) { - auto spec = - DocumentSourceOutSpec::parse(IDLParserErrorContext("$out"), elem.embeddedObject()); - mode = spec.getMode(); - - // Retrieve the target database from the user command, otherwise use the namespace from the - // expression context. - auto dbName = spec.getTargetDb() ? *spec.getTargetDb() : expCtx->ns.db(); - outputNs = NamespaceString(dbName, spec.getTargetCollection()); - - std::tie(uniqueKey, targetCollectionVersion) = expCtx->inMongos - ? 
resolveUniqueKeyOnMongoS(expCtx, spec, outputNs) - : resolveUniqueKeyOnMongoD(expCtx, spec, outputNs); - } else { - uasserted(16990, - str::stream() << "$out only supports a string or object argument, not " - << typeName(elem.type())); - } + uassert(17385, + "Can't {} to special collection: {}"_format(kStageName, outputNs.coll()), + !outputNs.isSpecial()); - return create(std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetCollectionVersion); + return new DocumentSourceOut(std::move(outputNs), expCtx); } -std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> -DocumentSourceOut::resolveUniqueKeyOnMongoD(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const DocumentSourceOutSpec& spec, - const NamespaceString& outputNs) { - invariant(!expCtx->inMongos); - auto targetCollectionVersion = spec.getTargetCollectionVersion(); - if (targetCollectionVersion) { - uassert(51018, "Unexpected target chunk version specified", expCtx->fromMongos); - // If mongos has sent us a target shard version, we need to be sure we are prepared to - // act as a router which is at least as recent as that mongos. - expCtx->mongoProcessInterface->checkRoutingInfoEpochOrThrow( - expCtx, outputNs, *targetCollectionVersion); - } - - auto userSpecifiedUniqueKey = spec.getUniqueKey(); - if (!userSpecifiedUniqueKey) { - uassert(51017, "Expected uniqueKey to be provided from mongos", !expCtx->fromMongos); - return {std::set<FieldPath>{"_id"}, targetCollectionVersion}; - } +boost::intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + uassert(16990, + "{} only supports a string argument, but found {}"_format(kStageName, + typeName(elem.type())), + elem.type() == BSONType::String); - // Make sure the uniqueKey has a supporting index. Skip this check if the command is sent - // from mongos since the uniqueKey check would've happened already. - auto uniqueKey = parseUniqueKeyFromSpec(userSpecifiedUniqueKey.get()); - if (!expCtx->fromMongos) { - ensureUniqueKeyHasSupportingIndex(expCtx, outputNs, uniqueKey, *userSpecifiedUniqueKey); - } - return {uniqueKey, targetCollectionVersion}; + return create({expCtx->ns.db(), elem.str()}, expCtx); } -std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> -DocumentSourceOut::resolveUniqueKeyOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const DocumentSourceOutSpec& spec, - const NamespaceString& outputNs) { - invariant(expCtx->inMongos); - uassert(50984, - "$out received unexpected 'targetCollectionVersion' on mongos", - !spec.getTargetCollectionVersion()); - - if (auto userSpecifiedUniqueKey = spec.getUniqueKey()) { - // Convert unique key object to a vector of FieldPaths. - auto uniqueKey = parseUniqueKeyFromSpec(userSpecifiedUniqueKey.get()); - ensureUniqueKeyHasSupportingIndex(expCtx, outputNs, uniqueKey, *userSpecifiedUniqueKey); - - // If the user supplies the uniqueKey we don't need to attach a ChunkVersion for the shards - // since we are not at risk of 'guessing' the wrong shard key. 
- return {uniqueKey, boost::none}; - } +Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { + massert(17000, + "{} shouldn't have different db than input"_format(kStageName), + _outputNs.db() == pExpCtx->ns.db()); - // In case there are multiple shards which will perform this $out in parallel, we need to figure - // out and attach the collection's shard version to ensure each shard is talking about the same - // version of the collection. This mongos will coordinate that. We force a catalog refresh to do - // so because there is no shard versioning protocol on this namespace and so we otherwise could - // not be sure this node is (or will be come) at all recent. We will also figure out and attach - // the uniqueKey to send to the shards. We don't need to do this for 'replaceCollection' mode - // since that mode cannot currently target a sharded collection. - - // There are cases where the aggregation could fail if the collection is dropped or re-created - // during or near the time of the aggregation. This is okay - we are mostly paranoid that this - // mongos is very stale and want to prevent returning an error if the collection was dropped a - // long time ago. Because of this, we are okay with piggy-backing off another thread's request - // to refresh the cache, simply waiting for that request to return instead of forcing another - // refresh. - boost::optional<ChunkVersion> targetCollectionVersion = - spec.getMode() == WriteModeEnum::kModeReplaceCollection - ? boost::none - : expCtx->mongoProcessInterface->refreshAndGetCollectionVersion(expCtx, outputNs); - - auto docKeyPaths = expCtx->mongoProcessInterface->collectDocumentKeyFieldsActingAsRouter( - expCtx->opCtx, outputNs); - return {std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()), - std::make_move_iterator(docKeyPaths.end())), - targetCollectionVersion}; + return Value(DOC(getSourceName() << _outputNs.coll())); } -Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { - DocumentSourceOutSpec spec; - spec.setTargetDb(_outputNs.db()); - spec.setTargetCollection(_outputNs.coll()); - spec.setMode(_mode); - spec.setUniqueKey([&]() { - BSONObjBuilder uniqueKeyBob; - for (auto path : _uniqueKeyFields) { - uniqueKeyBob.append(path.fullPath(), 1); - } - return uniqueKeyBob.obj(); - }()); - spec.setTargetCollectionVersion(_targetCollectionVersion); - return Value(Document{{getSourceName(), spec.toBSON()}}); +void DocumentSourceOut::waitWhileFailPointEnabled() { + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &hangWhileBuildingDocumentSourceOutBatch, + pExpCtx->opCtx, + "hangWhileBuildingDocumentSourceOutBatch", + []() { + log() << "Hanging aggregation due to 'hangWhileBuildingDocumentSourceOutBatch' " + << "failpoint"; + }); } -DepsTracker::State DocumentSourceOut::getDependencies(DepsTracker* deps) const { - deps->needWholeDocument = true; - return DepsTracker::State::EXHAUSTIVE_ALL; -} } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index bfb9bb08771..7efb8450aed 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -29,46 +29,13 @@ #pragma once -#include "mongo/db/db_raii.h" -#include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/document_source_out_gen.h" -#include "mongo/db/write_concern_options.h" -#include "mongo/s/chunk_version.h" +#include 
"mongo/db/pipeline/document_source_writer.h" namespace mongo { - -/** - * Manipulates the state of the OperationContext so that while this object is in scope, reads and - * writes will use a local read concern and see the latest version of the data. It will also reset - * ignore_prepared on the recovery unit so that any reads or writes will block on a conflict with a - * prepared transaction. Resets the OperationContext back to its original state upon destruction. - */ -class OutStageWriteBlock { - OperationContext* _opCtx; - repl::ReadConcernArgs _originalArgs; - RecoveryUnit::ReadSource _originalSource; - EnforcePrepareConflictsBlock _enforcePrepareConflictsBlock; - -public: - OutStageWriteBlock(OperationContext* opCtx) - : _opCtx(opCtx), _enforcePrepareConflictsBlock(opCtx) { - _originalArgs = repl::ReadConcernArgs::get(_opCtx); - _originalSource = _opCtx->recoveryUnit()->getTimestampReadSource(); - - repl::ReadConcernArgs::get(_opCtx) = repl::ReadConcernArgs(); - _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::kUnset); - } - - ~OutStageWriteBlock() { - repl::ReadConcernArgs::get(_opCtx) = _originalArgs; - _opCtx->recoveryUnit()->setTimestampReadSource(_originalSource); - } -}; - /** - * Abstract class for the $out aggregation stage. + * Implementation for the $out aggregation stage. */ -class DocumentSourceOut : public DocumentSource { +class DocumentSourceOut final : public DocumentSourceWriter<BSONObj> { public: static constexpr StringData kStageName = "$out"_sd; @@ -78,172 +45,46 @@ public: */ class LiteParsed final : public LiteParsedDocumentSourceForeignCollections { public: + using LiteParsedDocumentSourceForeignCollections:: + LiteParsedDocumentSourceForeignCollections; + + static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec); - LiteParsed(NamespaceString outNss, PrivilegeVector privileges, bool allowShardedOutNss) - : LiteParsedDocumentSourceForeignCollections(outNss, privileges), - _allowShardedOutNss(allowShardedOutNss) {} - bool allowShardedForeignCollection(NamespaceString nss) const final { - return _allowShardedOutNss ? true : (_foreignNssSet.find(nss) == _foreignNssSet.end()); + return _foreignNssSet.find(nss) == _foreignNssSet.end(); } bool allowedToPassthroughFromMongos() const final { - // Do not allow passthrough from mongos even if the source collection is unsharded. This - // ensures that the unique index verification happens once on mongos and can be bypassed - // on the shards. return false; } - - private: - bool _allowShardedOutNss; }; - /** - * Builds a new $out stage which will spill all documents into 'outputNs' as inserts. If - * 'targetCollectionVersion' is provided then processing will stop with an error if the - * collection's epoch changes during the course of execution. This is used as a mechanism to - * prevent the shard key from changing. - */ - DocumentSourceOut(NamespaceString outputNs, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - WriteModeEnum mode, - std::set<FieldPath> uniqueKey, - boost::optional<ChunkVersion> targetCollectionVersion); + ~DocumentSourceOut() override; - virtual ~DocumentSourceOut() = default; - - GetNextResult getNext() final; - const char* getSourceName() const final; - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - DepsTracker::State getDependencies(DepsTracker* deps) const final; - /** - * For purposes of tracking which fields come from where, this stage does not modify any fields. 
- */ - GetModPathsReturn getModifiedPaths() const final { - return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; + const char* getSourceName() const final override { + return kStageName.rawData(); } - StageConstraints constraints(Pipeline::SplitState pipeState) const final { - // A $out to an unsharded collection should merge on the primary shard to perform local - // writes. A $out to a sharded collection has no requirement, since each shard can perform - // its own portion of the write. We use 'kAnyShard' to direct it to execute on one of the - // shards in case some of the writes happen to end up being local. - // - // Note that this decision is inherently racy and subject to become stale. This is okay - // because either choice will work correctly, we are simply applying a heuristic - // optimization. - auto hostTypeRequirement = HostTypeRequirement::kPrimaryShard; - if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs) && - _mode != WriteModeEnum::kModeReplaceCollection) { - hostTypeRequirement = HostTypeRequirement::kAnyShard; - } + StageConstraints constraints(Pipeline::SplitState pipeState) const final override { return {StreamType::kStreaming, PositionRequirement::kLast, - hostTypeRequirement, + HostTypeRequirement::kPrimaryShard, DiskUseRequirement::kWritesPersistentData, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, LookupRequirement::kNotAllowed}; } - const NamespaceString& getOutputNs() const { - return _outputNs; - } - - WriteModeEnum getMode() const { - return _mode; - } - - boost::optional<DistributedPlanLogic> distributedPlanLogic() final { - // It should always be faster to avoid splitting the pipeline if the output collection is - // sharded. If we avoid splitting the pipeline then each shard can perform the writes to the - // target collection in parallel. - // - // Note that this decision is inherently racy and subject to become stale. This is okay - // because either choice will work correctly, we are simply applying a heuristic - // optimization. - if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)) { - return boost::none; - } - // {shardsStage, mergingStage, sortPattern} - return DistributedPlanLogic{nullptr, this, boost::none}; - } - - virtual bool canRunInParallelBeforeOut( - const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final { - // If someone is asking the question, this must be the $out stage in question, so yes! - return true; - } - - - /** - * Retrieves the namespace to direct each batch to, which may be a temporary namespace or the - * final output namespace. - */ - virtual const NamespaceString& getWriteNs() const = 0; - - /** - * Prepares the DocumentSource to be able to write incoming batches to the desired collection. - */ - virtual void initializeWriteNs() = 0; - - /** - * Storage for a batch of BSON Objects to be inserted/updated to the write namespace. The - * extracted unique key values are also stored in a batch, used by $out with mode - * "replaceDocuments" as the query portion of the update. 
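The constraints and distributed-plan logic removed above encode a placement heuristic: an unsharded output collection is written on the primary shard so the writes stay local, while a sharded output (in the modes that could target one) may be written from any shard in parallel. A minimal standalone sketch of that decision, using illustrative enums rather than the server's real HostTypeRequirement and write-mode types:

    #include <iostream>

    // Illustrative stand-ins; the server's enums are richer than this.
    enum class HostTypeRequirement { kPrimaryShard, kAnyShard };
    enum class WriteMode { kReplaceCollection, kInsertDocuments, kReplaceDocuments };

    // Mirrors the heuristic in the removed constraints(): prefer local writes on the primary
    // shard, but let each shard write its own portion when the target is sharded and the
    // mode is not 'replaceCollection' (which cannot target a sharded collection).
    HostTypeRequirement chooseHost(bool targetIsSharded, WriteMode mode) {
        if (targetIsSharded && mode != WriteMode::kReplaceCollection) {
            return HostTypeRequirement::kAnyShard;
        }
        return HostTypeRequirement::kPrimaryShard;
    }

    int main() {
        std::cout << (chooseHost(true, WriteMode::kInsertDocuments) ==
                      HostTypeRequirement::kAnyShard)
                  << "\n";  // prints 1
    }

As the diff shows, the new $out drops this heuristic entirely and always requires the primary shard, since the sharded write modes now live in $merge.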
- * - */ - struct BatchedObjects { - void emplace(BSONObj&& obj, BSONObj&& key) { - objects.emplace_back(std::move(obj)); - uniqueKeys.emplace_back(std::move(key)); - } - - bool empty() const { - return objects.empty(); - } - - size_t size() const { - return objects.size(); - } - - void clear() { - objects.clear(); - uniqueKeys.clear(); - } - - std::vector<BSONObj> objects; - // Store the unique keys as BSON objects instead of Documents for compatibility with the - // batch update command. (e.g. {q: <array of uniqueKeys>, u: <array of objects>}) - std::vector<BSONObj> uniqueKeys; - }; - - /** - * Writes the documents in 'batch' to the write namespace. - */ - virtual void spill(BatchedObjects&& batch) { - OutStageWriteBlock writeBlock(pExpCtx->opCtx); - - pExpCtx->mongoProcessInterface->insert( - pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern, _targetEpoch()); - }; - - /** - * Finalize the output collection, called when there are no more documents to write. - */ - virtual void finalize() = 0; + Value serialize( + boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final override; /** * Creates a new $out stage from the given arguments. */ static boost::intrusive_ptr<DocumentSource> create( - NamespaceString outputNs, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - WriteModeEnum, - std::set<FieldPath> uniqueKey = std::set<FieldPath>{"_id"}, - boost::optional<ChunkVersion> targetCollectionVersion = boost::none); + NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx); /** * Parses a $out stage from the user-supplied BSON. @@ -251,59 +92,37 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); -protected: - // Stash the writeConcern of the original command as the operation context may change by the - // time we start to spill $out writes. This is because certain aggregations (e.g. $exchange) - // establish cursors with batchSize 0 then run subsequent getMore's which use a new operation - // context. The getMore's will not have an attached writeConcern however we still want to - // respect the writeConcern of the original command. - WriteConcernOptions _writeConcern; +private: + DocumentSourceOut(NamespaceString outputNs, + const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSourceWriter(std::move(outputNs), expCtx) {} - const NamespaceString _outputNs; - boost::optional<ChunkVersion> _targetCollectionVersion; + void initialize() override; - boost::optional<OID> _targetEpoch() { - return _targetCollectionVersion ? boost::optional<OID>(_targetCollectionVersion->epoch()) - : boost::none; - } + void finalize() override; -private: - /** - * If 'spec' does not specify a uniqueKey, uses the sharding catalog to pick a default key of - * the shard key + _id. Returns a pair of the uniqueKey (either from the spec or generated), and - * an optional ChunkVersion, populated with the version stored in the sharding catalog when we - * asked for the shard key. 
- */ - static std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> resolveUniqueKeyOnMongoS( - const boost::intrusive_ptr<ExpressionContext>&, - const DocumentSourceOutSpec& spec, - const NamespaceString& outputNs); + void spill(BatchedObjects&& batch) override { + DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); - /** - * Ensures that 'spec' contains a uniqueKey which has a supporting index - either because the - * uniqueKey was sent from mongos or because there is a corresponding unique index. Returns the - * target ChunkVersion already attached to 'spec', but verifies that this node's cached routing - * table agrees on the epoch for that version before returning. Throws a StaleConfigException if - * not. - */ - static std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> resolveUniqueKeyOnMongoD( - const boost::intrusive_ptr<ExpressionContext>&, - const DocumentSourceOutSpec& spec, - const NamespaceString& outputNs); + auto targetEpoch = boost::none; + pExpCtx->mongoProcessInterface->insert( + pExpCtx, _tempNs, std::move(batch), _writeConcern, targetEpoch); + } - bool _initialized = false; - bool _done = false; + std::pair<BSONObj, int> makeBatchObject(Document&& doc) const override { + auto obj = doc.toBson(); + return {obj, obj.objsize()}; + } - WriteModeEnum _mode; + void waitWhileFailPointEnabled() override; - // Holds the unique key used for uniquely identifying documents. There must exist a unique index - // with this key pattern (up to order). Default is "_id" for unsharded collections, and "_id" - // plus the shard key for sharded collections. - std::set<FieldPath> _uniqueKeyFields; + // Holds on to the original collection options and index specs so we can check they didn't + // change during computation. + BSONObj _originalOutOptions; + std::list<BSONObj> _originalIndexes; - // True if '_uniqueKeyFields' contains the _id. We store this as a separate boolean to avoid - // repeated lookups into the set. - bool _uniqueKeyIncludesId; + // The temporary namespace for the $out writes. + NamespaceString _tempNs; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_out.idl b/src/mongo/db/pipeline/document_source_out.idl deleted file mode 100644 index a0b3407c46e..00000000000 --- a/src/mongo/db/pipeline/document_source_out.idl +++ /dev/null @@ -1,84 +0,0 @@ -# Copyright (C) 2018-present MongoDB, Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the Server Side Public License, version 1, -# as published by MongoDB, Inc. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Server Side Public License for more details. -# -# You should have received a copy of the Server Side Public License -# along with this program. If not, see -# <http://www.mongodb.com/licensing/server-side-public-license>. -# -# As a special exception, the copyright holders give permission to link the -# code of portions of this program with the OpenSSL library under certain -# conditions as described in each individual source file and distribute -# linked combinations including the program with the OpenSSL library. You -# must comply with the Server Side Public License in all respects for -# all of the code used other than as permitted herein. 
If you modify file(s) -# with this exception, you may extend this exception to your version of the -# file(s), but you are not obligated to do so. If you do not wish to do so, -# delete this exception statement from your version. If you delete this -# exception statement from all source files in the program, then also delete -# it in the license file. -# - -# Document source out stage IDL file - -global: - cpp_namespace: "mongo" - cpp_includes: - - "mongo/s/chunk_version.h" - -imports: - - "mongo/idl/basic_types.idl" - - "mongo/s/chunk_version.idl" - -enums: - WriteMode: - description: "Possible write mode values." - type: string - values: - kModeReplaceCollection: "replaceCollection" - kModeInsertDocuments: "insertDocuments" - kModeReplaceDocuments: "replaceDocuments" - -structs: - DocumentSourceOutSpec: - description: A document used to specify the $out stage of an aggregation pipeline. - strict: true - fields: - to: - cpp_name: targetCollection - type: string - description: Name of the target collection. - - db: - cpp_name: targetDb - type: string - optional: true - description: Name of the target database, defaults to the database of the - aggregation. - - mode: - cpp_name: mode - type: WriteMode - description: The write mode for the output operation. - - uniqueKey: - cpp_name: uniqueKey - type: object - optional: true - description: Document of fields representing the unique key. - - targetCollectionVersion: - type: ChunkVersion - optional: true - description: If set, the collection's ChunkVersion found when parsed on mongos. Can - be used to check if a collection has since been dropped and re-created, - in which case the shard key may have changed. As of this writing, this - also can be used to detect if the collection has gone from unsharded to - sharded, and thus now has a shard key. diff --git a/src/mongo/db/pipeline/document_source_out_replace_coll.cpp b/src/mongo/db/pipeline/document_source_out_replace_coll.cpp deleted file mode 100644 index 4c7bf19d1fd..00000000000 --- a/src/mongo/db/pipeline/document_source_out_replace_coll.cpp +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. 
If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/pipeline/document_source_out_replace_coll.h" - -#include "mongo/rpc/get_status_from_command_result.h" - -namespace mongo { - -static AtomicWord<unsigned> aggOutCounter; - -DocumentSourceOutReplaceColl::~DocumentSourceOutReplaceColl() { - DESTRUCTOR_GUARD( - // Make sure we drop the temp collection if anything goes wrong. Errors are ignored - // here because nothing can be done about them. Additionally, if this fails and the - // collection is left behind, it will be cleaned up next time the server is started. - if (_tempNs.size()) { - auto cleanupClient = - pExpCtx->opCtx->getServiceContext()->makeClient("$out_replace_coll_cleanup"); - AlternativeClientRegion acr(cleanupClient); - // Create a new operation context so that any interrputs on the current operation will - // not affect the dropCollection operation below. - auto cleanupOpCtx = cc().makeOperationContext(); - - OutStageWriteBlock writeBlock(cleanupOpCtx.get()); - - // Reset the operation context back to original once dropCollection is done. - ON_BLOCK_EXIT( - [this] { pExpCtx->mongoProcessInterface->setOperationContext(pExpCtx->opCtx); }); - - pExpCtx->mongoProcessInterface->setOperationContext(cleanupOpCtx.get()); - pExpCtx->mongoProcessInterface->directClient()->dropCollection(_tempNs.ns()); - }); -} - -void DocumentSourceOutReplaceColl::initializeWriteNs() { - OutStageWriteBlock writeBlock(pExpCtx->opCtx); - - DBClientBase* conn = pExpCtx->mongoProcessInterface->directClient(); - - const auto& outputNs = getOutputNs(); - _tempNs = NamespaceString(str::stream() << outputNs.db() << ".tmp.agg_out." - << aggOutCounter.addAndFetch(1)); - - // Save the original collection options and index specs so we can check they didn't change - // during computation. - _originalOutOptions = pExpCtx->mongoProcessInterface->getCollectionOptions(outputNs); - _originalIndexes = conn->getIndexSpecs(outputNs.ns()); - - // Check if it's capped to make sure we have a chance of succeeding before we do all the work. - // If the collection becomes capped during processing, the collection options will have changed, - // and the $out will fail. - uassert(17152, - str::stream() << "namespace '" << outputNs.ns() - << "' is capped so it can't be used for $out", - _originalOutOptions["capped"].eoo()); - - // We will write all results into a temporary collection, then rename the temporary - // collection to be the target collection once we are done. - _tempNs = NamespaceString(str::stream() << outputNs.db() << ".tmp.agg_out." - << aggOutCounter.addAndFetch(1)); - - // Create temp collection, copying options from the existing output collection if any. - { - BSONObjBuilder cmd; - cmd << "create" << _tempNs.coll(); - cmd << "temp" << true; - cmd.appendElementsUnique(_originalOutOptions); - - BSONObj info; - uassert(16994, - str::stream() << "failed to create temporary $out collection '" << _tempNs.ns() - << "': " - << getStatusFromCommandResult(info).reason(), - conn->runCommand(outputNs.db().toString(), cmd.done(), info)); - } - - if (_originalIndexes.empty()) { - return; - } - - // Copy the indexes of the output collection to the temp collection. - std::vector<BSONObj> tempNsIndexes; - for (const auto& indexSpec : _originalIndexes) { - // Replace the spec's 'ns' field value, which is the original collection, with the temp - // collection. 
- tempNsIndexes.push_back(indexSpec.addField(BSON("ns" << _tempNs.ns()).firstElement())); - } - try { - conn->createIndexes(_tempNs.ns(), tempNsIndexes); - } catch (DBException& ex) { - ex.addContext("Copying indexes for $out failed"); - throw; - } -}; - -void DocumentSourceOutReplaceColl::finalize() { - OutStageWriteBlock writeBlock(pExpCtx->opCtx); - - const auto& outputNs = getOutputNs(); - auto renameCommandObj = - BSON("renameCollection" << _tempNs.ns() << "to" << outputNs.ns() << "dropTarget" << true); - - pExpCtx->mongoProcessInterface->renameIfOptionsAndIndexesHaveNotChanged( - pExpCtx->opCtx, renameCommandObj, outputNs, _originalOutOptions, _originalIndexes); - - // The rename succeeded, so the temp collection no longer exists. - _tempNs = {}; -}; - -} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_out_replace_coll.h b/src/mongo/db/pipeline/document_source_out_replace_coll.h deleted file mode 100644 index db0dde23ec0..00000000000 --- a/src/mongo/db/pipeline/document_source_out_replace_coll.h +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/pipeline/document_source_out.h" -#include "mongo/util/destructor_guard.h" - -namespace mongo { - -/** - * Version of $out which directs writes to a temporary collection, then renames the temp collection - * to the target collection with the 'dropTarget' option set to true. - */ -class DocumentSourceOutReplaceColl final : public DocumentSourceOut { -public: - using DocumentSourceOut::DocumentSourceOut; - - ~DocumentSourceOutReplaceColl(); - - /** - * Sets up a temp collection which contains the same indexes and options as the output - * collection. All writes will be directed to the temp collection. - */ - void initializeWriteNs() final; - - /** - * Renames the temp collection to the output collection with the 'dropTarget' option set to - * true. 
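The deleted replaceCollection implementation captures the pattern that the new DocumentSourceOut::initialize() and finalize() now carry: stage all writes into a temporary "<db>.tmp.agg_out.<N>" collection created with the target's options and indexes, then atomically swap it in with renameCollection and dropTarget: true. A condensed sketch of that flow, reusing only calls that appear in the deleted code; it assumes the surrounding stage context (DBClientBase, NamespaceString, BSON helpers) and is not compilable in isolation:

    // Condensed from the deleted DocumentSourceOutReplaceColl; uassert/error handling elided.
    // 'tempNs' is the "<db>.tmp.agg_out.<N>" namespace the stage generates up front.
    void createTempCollection(DBClientBase* conn,
                              const NamespaceString& outputNs,
                              const NamespaceString& tempNs,
                              const BSONObj& originalOutOptions,
                              const std::vector<BSONObj>& tempNsIndexes) {
        // Create the temp collection with the target's options plus temp:true, so an
        // interrupted aggregation leaves nothing permanent behind.
        BSONObjBuilder cmd;
        cmd << "create" << tempNs.coll();
        cmd << "temp" << true;
        cmd.appendElementsUnique(originalOutOptions);

        BSONObj info;
        conn->runCommand(outputNs.db().toString(), cmd.done(), info);

        // Re-create the target's indexes on the temp collection (each spec re-pointed at tempNs).
        conn->createIndexes(tempNs.ns(), tempNsIndexes);
    }

    void renameTempIntoPlace(const NamespaceString& tempNs, const NamespaceString& outputNs) {
        // Atomically swap the temp collection in, dropping the previous target. The real code
        // routes this through renameIfOptionsAndIndexesHaveNotChanged(), which first verifies
        // that the target's options and indexes did not change while the aggregation ran.
        BSONObj renameCmd = BSON("renameCollection" << tempNs.ns() << "to" << outputNs.ns()
                                                     << "dropTarget" << true);
        (void)renameCmd;
    }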
- */ - void finalize() final; - - const NamespaceString& getWriteNs() const final { - return _tempNs; - }; - -private: - // Holds on to the original collection options and index specs so we can check they didn't - // change during computation. - BSONObj _originalOutOptions; - std::list<BSONObj> _originalIndexes; - - // The temporary namespace for the $out writes. - NamespaceString _tempNs; -}; - -} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_out_test.cpp b/src/mongo/db/pipeline/document_source_out_test.cpp index 6105b2cf84d..69aa6105a72 100644 --- a/src/mongo/db/pipeline/document_source_out_test.cpp +++ b/src/mongo/db/pipeline/document_source_out_test.cpp @@ -41,12 +41,6 @@ namespace { using boost::intrusive_ptr; -StringData kModeFieldName = DocumentSourceOutSpec::kModeFieldName; -StringData kUniqueKeyFieldName = DocumentSourceOutSpec::kUniqueKeyFieldName; -StringData kDefaultMode = WriteMode_serializer(WriteModeEnum::kModeReplaceCollection); -StringData kInsertDocumentsMode = WriteMode_serializer(WriteModeEnum::kModeInsertDocuments); -StringData kReplaceDocumentsMode = WriteMode_serializer(WriteModeEnum::kModeReplaceDocuments); - /** * For the purpsoses of this test, assume every collection is unsharded. Stages may ask this during * setup. For example, to compute its constraints, the $out stage needs to know if the output @@ -95,6 +89,9 @@ TEST_F(DocumentSourceOutTest, FailsToParseIncorrectType) { spec = BSON("$out" << BSONArray()); ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 16990); + + spec = BSON("$out" << BSONObj()); + ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 16990); } TEST_F(DocumentSourceOutTest, AcceptsStringArgument) { @@ -104,319 +101,18 @@ TEST_F(DocumentSourceOutTest, AcceptsStringArgument) { ASSERT_EQ(outStage->getOutputNs().coll(), "some_collection"); } -TEST_F(DocumentSourceOutTest, SerializeDefaultsModeRecreateCollection) { +TEST_F(DocumentSourceOutTest, SerializeToString) { BSONObj spec = BSON("$out" << "some_collection"); auto outStage = createOutStage(spec); auto serialized = outStage->serialize().getDocument(); - ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); + ASSERT_EQ(serialized["$out"].getStringData(), "some_collection"); // Make sure we can reparse the serialized BSON. 
auto reparsedOutStage = createOutStage(serialized.toBson()); auto reSerialized = reparsedOutStage->serialize().getDocument(); - ASSERT_EQ(reSerialized["$out"][kModeFieldName].getStringData(), kDefaultMode); -} - -TEST_F(DocumentSourceOutTest, SerializeUniqueKeyDefaultsToId) { - BSONObj spec = BSON("$out" << BSON("to" - << "target" - << "mode" - << kDefaultMode)); - auto outStage = createOutStage(spec); - auto serialized = outStage->serialize().getDocument(); - ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); - ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(), - (Document{{"_id", 1}})); - - spec = BSON("$out" - << "some_collection"); - outStage = createOutStage(spec); - serialized = outStage->serialize().getDocument(); - ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); - ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(), - (Document{{"_id", 1}})); -} - -TEST_F(DocumentSourceOutTest, SerializeCompoundUniqueKey) { - BSONObj spec = BSON("$out" << BSON("to" - << "target" - << "mode" - << kDefaultMode - << "uniqueKey" - << BSON("_id" << 1 << "shardKey" << 1))); - auto outStage = createOutStage(spec); - auto serialized = outStage->serialize().getDocument(); - ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); - ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(), - (Document{{"_id", 1}, {"shardKey", 1}})); -} - -TEST_F(DocumentSourceOutTest, SerializeDottedPathUniqueKey) { - BSONObj spec = BSON("$out" << BSON("to" - << "target" - << "mode" - << kDefaultMode - << "uniqueKey" - << BSON("_id" << 1 << "a.b" << 1))); - auto outStage = createOutStage(spec); - auto serialized = outStage->serialize().getDocument(); - ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); - ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(), - (Document{{"_id", 1}, {"a.b", 1}})); - - spec = BSON("$out" << BSON("to" - << "target" - << "mode" - << kDefaultMode - << "uniqueKey" - << BSON("_id.a" << 1))); - outStage = createOutStage(spec); - serialized = outStage->serialize().getDocument(); - ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); - ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(), - (Document{{"_id.a", 1}})); -} - -TEST_F(DocumentSourceOutTest, SerializeDottedPathUniqueKeySharedPrefix) { - BSONObj spec = BSON("$out" << BSON("to" - << "target" - << "mode" - << kDefaultMode - << "uniqueKey" - << BSON("_id" << 1 << "a.b" << 1 << "a.c" << 1))); - auto outStage = createOutStage(spec); - auto serialized = outStage->serialize().getDocument(); - ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); - ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(), - (Document{{"_id", 1}, {"a.b", 1}, {"a.c", 1}})); -} - -TEST_F(DocumentSourceOutTest, FailsToParseIfToIsNotString) { - BSONObj spec = BSON("$out" << BSONObj()); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 40414); - - spec = BSON("$out" << BSON("to" << 1)); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch); - - spec = BSON("$out" << BSON("to" << BSON("a" << 1))); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch); -} - -TEST_F(DocumentSourceOutTest, FailsToParseIfToIsNotAValidUserCollection) { - BSONObj spec = BSON("$out" << BSON("to" - << "$test" - << "mode" - << kDefaultMode)); - 
ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 17385); - - spec = BSON("$out" << BSON("to" - << "system.views" - << "mode" - << kDefaultMode)); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 17385); - - spec = BSON("$out" << BSON("to" - << ".test." - << "mode" - << kDefaultMode)); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::InvalidNamespace); -} - -TEST_F(DocumentSourceOutTest, FailsToParseIfDbIsNotString) { - BSONObj spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << kDefaultMode - << "db" - << true)); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch); - - spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << kDefaultMode - << "db" - << BSONArray())); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch); - - spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << kDefaultMode - << "db" - << BSON("" - << "test"))); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch); -} - -TEST_F(DocumentSourceOutTest, FailsToParseIfDbIsNotAValidDatabaseName) { - BSONObj spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << kInsertDocumentsMode - << "db" - << "$invalid")); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 17385); - - spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << kInsertDocumentsMode - << "db" - << ".test")); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::InvalidNamespace); -} - -TEST_F(DocumentSourceOutTest, FailsToParseIfModeIsNotString) { - BSONObj spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << true)); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch); - - spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << BSONArray())); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch); - - spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << BSON("" << kDefaultMode))); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch); -} - -TEST_F(DocumentSourceOutTest, CorrectlyAddressesMatchingTargetAndAggregationNamespaces) { - const auto targetNsSameAsAggregationNs = getExpCtx()->ns; - const auto targetColl = targetNsSameAsAggregationNs.coll(); - const auto targetDb = targetNsSameAsAggregationNs.db(); - - BSONObj spec = BSON( - "$out" << BSON("to" << targetColl << "mode" << kInsertDocumentsMode << "db" << targetDb)); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 50992); - - spec = BSON( - "$out" << BSON("to" << targetColl << "mode" << kReplaceDocumentsMode << "db" << targetDb)); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 50992); - - spec = BSON("$out" << BSON("to" << targetColl << "mode" << kDefaultMode << "db" << targetDb)); - auto outStage = createOutStage(spec); - ASSERT_EQ(outStage->getOutputNs().db(), targetNsSameAsAggregationNs.db()); - ASSERT_EQ(outStage->getOutputNs().coll(), targetNsSameAsAggregationNs.coll()); -} - -TEST_F(DocumentSourceOutTest, FailsToParseIfModeIsUnsupportedString) { - BSONObj spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << "unsupported")); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::BadValue); - - spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << "merge")); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::BadValue); -} - -TEST_F(DocumentSourceOutTest, 
FailsToParseIfUniqueKeyIsNotAnObject) { - BSONObj spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << kDefaultMode - << "uniqueKey" - << 1)); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch); - - spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << kDefaultMode - << "uniqueKey" - << BSONArray())); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch); - - spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << kDefaultMode - << "uniqueKey" - << "_id")); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::TypeMismatch); -} - -TEST_F(DocumentSourceOutTest, FailsToParseIfUniqueKeyHasDuplicateFields) { - BSONObj spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << kDefaultMode - << "uniqueKey" - << BSON("_id" << 1 << "_id" << 1))); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::BadValue); - - spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << kDefaultMode - << "uniqueKey" - << BSON("x" << 1 << "y" << 1 << "x" << 1))); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::BadValue); -} - -TEST_F(DocumentSourceOutTest, FailsToParseIfTargetCollectionVersionIsSpecifiedOnMongos) { - BSONObj spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << kDefaultMode - << "uniqueKey" - << BSON("_id" << 1) - << "targetCollectionVersion" - << ChunkVersion(0, 0, OID::gen()).toBSON())); - getExpCtx()->inMongos = true; - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 50984); - - // Test that 'targetCollectionVersion' is accepted if _from_ mongos. - getExpCtx()->inMongos = false; - getExpCtx()->fromMongos = true; - ASSERT(createOutStage(spec) != nullptr); - - // Test that 'targetCollectionVersion' is not accepted if on mongod but not from mongos. - getExpCtx()->inMongos = false; - getExpCtx()->fromMongos = false; - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 51018); + ASSERT_EQ(reSerialized["$out"].getStringData(), "some_collection"); } -TEST_F(DocumentSourceOutTest, FailsToParseifUniqueKeyIsNotSentFromMongos) { - BSONObj spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << kDefaultMode - << "targetCollectionVersion" - << ChunkVersion(0, 0, OID::gen()).toBSON())); - getExpCtx()->fromMongos = true; - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 51017); -} - -TEST_F(DocumentSourceOutTest, CorrectlyUsesTargetDbThatMatchesAggregationDb) { - const auto targetDbSameAsAggregationDb = getExpCtx()->ns.db(); - const auto targetColl = "test"_sd; - BSONObj spec = BSON("$out" << BSON("to" << targetColl << "mode" << kDefaultMode << "db" - << targetDbSameAsAggregationDb)); - - auto outStage = createOutStage(spec); - ASSERT_EQ(outStage->getOutputNs().db(), targetDbSameAsAggregationDb); - ASSERT_EQ(outStage->getOutputNs().coll(), targetColl); -} - -// TODO (SERVER-36832): Allow "replaceCollection" to a foreign database. 
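The net effect of the deleted tests is that $out no longer accepts the extended {to, db, mode, uniqueKey, targetCollectionVersion} document: only the bare collection-name form parses, and the merge-style behaviours are spelled with $merge instead. A short illustration in the style of the remaining unit tests (collection names are placeholders, and the rejected example extrapolates from the updated FailsToParseIncorrectType case, which asserts code 16990 for an object spec):

    // Accepted: the simple string form, which $out now also serializes back to.
    BSONObj ok = BSON("$out"
                      << "some_collection");

    // Rejected: the old extended document spec no longer parses (an object spec triggers
    // error 16990 in the updated test). Equivalent functionality moves to $merge, e.g.
    //   {$merge: {into: {db: 'a', coll: 'outColl'}, on: '_id',
    //             whenMatched: 'merge', whenNotMatched: 'insert'}}
    BSONObj rejected = BSON("$out" << BSON("to"
                                           << "target"
                                           << "mode"
                                           << "insertDocuments"));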
-TEST_F(DocumentSourceOutTest, CorrectlyUsesForeignTargetDb) { - const auto foreignDb = "someOtherDb"_sd; - const auto targetColl = "test"_sd; - BSONObj spec = - BSON("$out" << BSON("to" << targetColl << "mode" << kDefaultMode << "db" << foreignDb)); - - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 50939); -} } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 88a05b076bb..e9f86b947ed 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -542,11 +542,11 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distri return split; } -bool DocumentSourceSort::canRunInParallelBeforeOut( +bool DocumentSourceSort::canRunInParallelBeforeWriteStage( const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const { // This is an interesting special case. If there are no further stages which require merging the // streams into one, a $sort should not require it. This is only the case because the sort order - // doesn't matter for a pipeline ending with a $out stage. We may encounter it here as an + // doesn't matter for a pipeline ending with a write stage. We may encounter it here as an // intermediate stage before a final $group with a $sort, which would make sense. Should we // extend our analysis to detect if an exchange is appropriate in a general pipeline, a $sort // would generally require merging the streams before producing output. diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index ca68b71ad72..0e0310c5ca8 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -91,7 +91,7 @@ public: DepsTracker::State getDependencies(DepsTracker* deps) const final; boost::optional<DistributedPlanLogic> distributedPlanLogic() final; - bool canRunInParallelBeforeOut( + bool canRunInParallelBeforeWriteStage( const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final; /** diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h new file mode 100644 index 00000000000..fd10532d469 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_writer.h @@ -0,0 +1,228 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/platform/basic.h" + +#include <fmt/format.h> + +#include "mongo/db/db_raii.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/read_concern.h" +#include "mongo/db/storage/recovery_unit.h" + +namespace mongo { +using namespace fmt::literals; + +/** + * Manipulates the state of the OperationContext so that while this object is in scope, reads and + * writes will use a local read concern and see the latest version of the data. It will also reset + * ignore_prepared on the recovery unit so that any reads or writes will block on a conflict with a + * prepared transaction. Resets the OperationContext back to its original state upon destruction. + */ +class DocumentSourceWriteBlock { + OperationContext* _opCtx; + repl::ReadConcernArgs _originalArgs; + RecoveryUnit::ReadSource _originalSource; + EnforcePrepareConflictsBlock _enforcePrepareConflictsBlock; + +public: + DocumentSourceWriteBlock(OperationContext* opCtx) + : _opCtx(opCtx), _enforcePrepareConflictsBlock(opCtx) { + _originalArgs = repl::ReadConcernArgs::get(_opCtx); + _originalSource = _opCtx->recoveryUnit()->getTimestampReadSource(); + + repl::ReadConcernArgs::get(_opCtx) = repl::ReadConcernArgs(); + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::kUnset); + } + + ~DocumentSourceWriteBlock() { + repl::ReadConcernArgs::get(_opCtx) = _originalArgs; + _opCtx->recoveryUnit()->setTimestampReadSource(_originalSource); + } +}; + +/** + * This is a base abstract class for all stages performing a write operation into an output + * collection. The writes are organized in batches in which elements are objects of the templated + * type 'B'. A subclass must override two methods to be able to write into the output collection: + * + * 1. 'makeBatchObject()' - to create an object of type 'B' from the given 'Document', which is, + * essentially, a result of the input source's 'getNext()' . + * 2. 'spill()' - to write the batch into the output collection. + * + * Two other virtual methods exist which a subclass may override: 'initialize()' and 'finalize()', + * which are called before the first element is read from the input source, and after the last one + * has been read, respectively. + */ +template <typename B> +class DocumentSourceWriter : public DocumentSource { +public: + using BatchObject = B; + using BatchedObjects = std::vector<BatchObject>; + + DocumentSourceWriter(NamespaceString outputNs, + const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(expCtx), + _outputNs(std::move(outputNs)), + _writeConcern(expCtx->opCtx->getWriteConcern()) {} + + DepsTracker::State getDependencies(DepsTracker* deps) const override { + deps->needWholeDocument = true; + return DepsTracker::State::EXHAUSTIVE_ALL; + } + + GetModPathsReturn getModifiedPaths() const override { + // For purposes of tracking which fields come from where, the writer stage does not modify + // any fields by default. 
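DocumentSourceWriteBlock is the renamed OutStageWriteBlock: a scope guard that snapshots the operation's read-concern arguments and timestamp read source, switches them to local/latest for the duration of the writes, and restores the originals on destruction. The same save-switch-restore shape reduced to a standalone illustration, with plain integers standing in for the server's per-operation state:

    #include <cassert>

    // Stand-ins for the per-operation state the real guard manipulates.
    struct OperationState {
        int readConcernLevel = 2;  // imagine 2 == "majority"
        int timestampSource = 1;   // imagine 1 == "provided by the storage engine"
    };

    class ScopedLocalReadState {
    public:
        explicit ScopedLocalReadState(OperationState* state)
            : _state(state),
              _savedLevel(state->readConcernLevel),
              _savedSource(state->timestampSource) {
            // While in scope, read the latest data with a local read concern.
            _state->readConcernLevel = 0;  // imagine 0 == "local"
            _state->timestampSource = 0;   // imagine 0 == kUnset (no read timestamp)
        }
        ~ScopedLocalReadState() {
            // Restore whatever the operation was using before the writes.
            _state->readConcernLevel = _savedLevel;
            _state->timestampSource = _savedSource;
        }

    private:
        OperationState* _state;
        int _savedLevel;
        int _savedSource;
    };

    int main() {
        OperationState op;
        {
            ScopedLocalReadState guard(&op);
            assert(op.readConcernLevel == 0 && op.timestampSource == 0);
        }
        assert(op.readConcernLevel == 2 && op.timestampSource == 1);
    }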
+ return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; + } + + boost::optional<DistributedPlanLogic> distributedPlanLogic() override { + return DistributedPlanLogic{nullptr, this, boost::none}; + } + + bool canRunInParallelBeforeWriteStage( + const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const override { + return true; + } + + const NamespaceString& getOutputNs() const { + return _outputNs; + } + + GetNextResult getNext() final override; + +protected: + /** + * Prepares the stage to be able to write incoming batches. + */ + virtual void initialize() {} + + /** + * Finalize the output collection, called when there are no more documents to write. + */ + virtual void finalize() {} + + /** + * Writes the documents in 'batch' to the output namespace. + */ + virtual void spill(BatchedObjects&& batch) = 0; + + /** + * Creates a batch object from the given document and returns it to the caller along with the + * object size. + */ + virtual std::pair<B, int> makeBatchObject(Document&& doc) const = 0; + + /** + * A subclass may override this method to enable a fail point right after a next input element + * has been retrieved, but not processed yet. + */ + virtual void waitWhileFailPointEnabled() {} + + // The namespace where the output will be written to. + const NamespaceString _outputNs; + + // Stash the writeConcern of the original command as the operation context may change by the + // time we start to spill writes. This is because certain aggregations (e.g. $exchange) + // establish cursors with batchSize 0 then run subsequent getMore's which use a new operation + // context. The getMore's will not have an attached writeConcern however we still want to + // respect the writeConcern of the original command. + WriteConcernOptions _writeConcern; + +private: + bool _initialized{false}; + bool _done{false}; +}; + +template <typename B> +DocumentSource::GetNextResult DocumentSourceWriter<B>::getNext() { + pExpCtx->checkForInterrupt(); + + if (_done) { + return GetNextResult::makeEOF(); + } + + if (!_initialized) { + // Explain should never try to actually execute any writes. We only ever expect + // getNext() to be called for the 'executionStats' and 'allPlansExecution' explain + // modes. This assertion should not be triggered for 'queryPlanner' explain, which + // is perfectly legal. + uassert(51029, + "explain of {} is not allowed with verbosity {}"_format( + getSourceName(), ExplainOptions::verbosityString(*pExpCtx->explain)), + !pExpCtx->explain); + initialize(); + _initialized = true; + } + + BatchedObjects batch; + int bufferedBytes = 0; + + auto nextInput = pSource->getNext(); + for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { + waitWhileFailPointEnabled(); + + auto doc = nextInput.releaseDocument(); + auto[obj, objSize] = makeBatchObject(std::move(doc)); + + bufferedBytes += objSize; + if (!batch.empty() && + (bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) { + spill(std::move(batch)); + batch.clear(); + bufferedBytes = objSize; + } + batch.push_back(obj); + } + if (!batch.empty()) { + spill(std::move(batch)); + batch.clear(); + } + + switch (nextInput.getStatus()) { + case GetNextResult::ReturnStatus::kAdvanced: { + MONGO_UNREACHABLE; // We consumed all advances above. + } + case GetNextResult::ReturnStatus::kPauseExecution: { + return nextInput; // Propagate the pause. 
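DocumentSourceWriter::getNext() drains its source, converts each document with makeBatchObject(), and flushes via spill() whenever adding the next object would push the batch past the byte limit or the batch already holds the maximum number of writes; it refuses to run under execution-level explain (error 51029) and brackets the loop with initialize() and finalize(). A standalone reduction of that accumulate-and-flush loop, where toy limits and a toy sink replace BSONObjMaxUserSize, write_ops::kMaxWriteBatchSize and the process interface:

    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    constexpr std::size_t kMaxBatchBytes = 64;  // toy stand-in for BSONObjMaxUserSize
    constexpr std::size_t kMaxBatchCount = 3;   // toy stand-in for kMaxWriteBatchSize

    // Toy "spill": the real stage hands the batch to the process interface for insertion.
    void spill(std::vector<std::string>&& batch) {
        std::cout << "spilling " << batch.size() << " objects\n";
    }

    void writeAll(const std::vector<std::string>& source) {
        std::vector<std::string> batch;
        std::size_t bufferedBytes = 0;

        for (const auto& doc : source) {
            // makeBatchObject() returns the object plus its size; here the string is both.
            auto obj = doc;
            auto objSize = obj.size();

            bufferedBytes += objSize;
            if (!batch.empty() &&
                (bufferedBytes > kMaxBatchBytes || batch.size() >= kMaxBatchCount)) {
                // Flush what we have before adding the object that crossed the threshold.
                spill(std::move(batch));
                batch.clear();
                bufferedBytes = objSize;
            }
            batch.push_back(std::move(obj));
        }
        if (!batch.empty()) {
            spill(std::move(batch));  // final partial batch
        }
    }

    int main() {
        writeAll({"a", "bb", "ccc", "dddd", "eeeee"});
    }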
+ } + case GetNextResult::ReturnStatus::kEOF: { + _done = true; + finalize(); + return nextInput; + } + } + MONGO_UNREACHABLE; +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/mongo_process_common.cpp b/src/mongo/db/pipeline/mongo_process_common.cpp index 5db5dd993ee..a2527aded86 100644 --- a/src/mongo/db/pipeline/mongo_process_common.cpp +++ b/src/mongo/db/pipeline/mongo_process_common.cpp @@ -167,4 +167,17 @@ std::vector<FieldPath> MongoProcessCommon::_shardKeyToDocumentKeyFields( return result; } +std::set<FieldPath> MongoProcessCommon::_convertToFieldPaths( + const std::vector<std::string>& fields) const { + std::set<FieldPath> fieldPaths; + + for (const auto& field : fields) { + const auto res = fieldPaths.insert(FieldPath(field)); + uassert(ErrorCodes::BadValue, + str::stream() << "Found a duplicate field '" << field << "'", + res.second); + } + return fieldPaths; +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/mongo_process_common.h b/src/mongo/db/pipeline/mongo_process_common.h index ad6bf255c5f..cef5cb8c7db 100644 --- a/src/mongo/db/pipeline/mongo_process_common.h +++ b/src/mongo/db/pipeline/mongo_process_common.h @@ -90,6 +90,12 @@ protected: virtual void _reportCurrentOpsForIdleSessions(OperationContext* opCtx, CurrentOpUserMode userMode, std::vector<BSONObj>* ops) const = 0; + + /** + * Converts an array of field names into a set of FieldPath. Throws if 'fields' contains + * duplicate elements. + */ + std::set<FieldPath> _convertToFieldPaths(const std::vector<std::string>& fields) const; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index 87dce6017e9..7f2d8eab5d7 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -80,8 +80,9 @@ public: * 3. boost::optional<BSONObj> - for pipeline-style updated, specifies variables that can be * referred to in the pipeline performing the custom update. */ - using BatchedObjects = - std::vector<std::tuple<BSONObj, write_ops::UpdateModification, boost::optional<BSONObj>>>; + using BatchObject = + std::tuple<BSONObj, write_ops::UpdateModification, boost::optional<BSONObj>>; + using BatchedObjects = std::vector<BatchObject>; enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle }; enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers }; @@ -346,15 +347,16 @@ public: /** * Returns true if there is an index on 'nss' with properties that will guarantee that a - * document with non-array values for each of 'uniqueKeyPaths' will have at most one matching + * document with non-array values for each of 'fieldPaths' will have at most one matching * document in 'nss'. * * Specifically, such an index must include all the fields, be unique, not be a partial index, * and match the operation's collation as given by 'expCtx'. 
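The new MongoProcessCommon::_convertToFieldPaths helper simply folds the user-supplied field names into a std::set and uses the insertion result to reject duplicates; it backs both the mongos and mongod sides of the field resolution below. The same shape in standalone form, with std::string standing in for FieldPath and a std::runtime_error standing in for the uassert:

    #include <set>
    #include <stdexcept>
    #include <string>
    #include <vector>

    std::set<std::string> convertToFieldPaths(const std::vector<std::string>& fields) {
        std::set<std::string> fieldPaths;
        for (const auto& field : fields) {
            // set::insert returns {iterator, bool}; 'false' means the field was already present.
            const auto res = fieldPaths.insert(field);
            if (!res.second) {
                throw std::runtime_error("Found a duplicate field '" + field + "'");
            }
        }
        return fieldPaths;
    }

    int main() {
        auto ok = convertToFieldPaths({"_id", "a.b"});  // fine
        try {
            convertToFieldPaths({"x", "y", "x"});       // throws: duplicate 'x'
        } catch (const std::runtime_error&) {
        }
        return ok.size() == 2 ? 0 : 1;
    }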
*/ - virtual bool uniqueKeyIsSupportedByIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - const std::set<FieldPath>& uniqueKeyPaths) const = 0; + virtual bool fieldsHaveSupportingUniqueIndex( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + const std::set<FieldPath>& fieldPaths) const = 0; /** * Refreshes the CatalogCache entry for the namespace 'nss', and returns the epoch associated @@ -376,6 +378,21 @@ public: ChunkVersion targetCollectionVersion) const = 0; virtual std::unique_ptr<ResourceYielder> getResourceYielder() const = 0; + + /** + * If the user supplied the 'fields' array, ensures that it can be used to uniquely identify a + * document. Otherwise, picks a default unique key, which can be either the "_id" field, or + * or a shard key, depending on the 'outputNs' collection type and the server type (mongod or + * mongos). Also returns an optional ChunkVersion, populated with the version stored in the + * sharding catalog when we asked for the shard key (on mongos only). On mongod, this is the + * value of the 'targetCollectionVersion' parameter, which is the target shard version of the + * collection, as sent by mongos. + */ + virtual std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> + ensureFieldsUniqueOrResolveDocumentKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<std::vector<std::string>> fields, + boost::optional<ChunkVersion> targetCollectionVersion, + const NamespaceString& outputNs) const = 0; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp index 5072c389bc0..331b02bcbcf 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/mongos_process_interface.cpp @@ -265,10 +265,10 @@ bool MongoSInterface::isSharded(OperationContext* opCtx, const NamespaceString& return routingInfo.isOK() && routingInfo.getValue().cm(); } -bool MongoSInterface::uniqueKeyIsSupportedByIndex( +bool MongoSInterface::fieldsHaveSupportingUniqueIndex( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, - const std::set<FieldPath>& uniqueKeyPaths) const { + const std::set<FieldPath>& fieldPaths) const { const auto opCtx = expCtx->opCtx; const auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); @@ -281,17 +281,59 @@ bool MongoSInterface::uniqueKeyIsSupportedByIndex( BSON("listIndexes" << nss.coll()), opCtx->hasDeadline() ? opCtx->getRemainingMaxTimeMillis() : Milliseconds(-1)); - // If the namespace does not exist, then the unique key *must* be _id only. + // If the namespace does not exist, then the field paths *must* be _id only. 
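fieldsHaveSupportingUniqueIndex asks one question per candidate index: does it name exactly the requested fields, is it unique, is it not partial, and does its collation match the operation's? A standalone sketch of that predicate over a simplified, hypothetical index description; the real supportsUniqueKey helper instead takes the expression context plus an index catalog entry (mongod) or a listIndexes document (mongos):

    #include <set>
    #include <string>
    #include <vector>

    // Hypothetical, simplified view of an index; invented for illustration only.
    struct IndexDescription {
        std::set<std::string> keyFields;
        bool unique = false;
        bool partial = false;
        std::string collation;  // empty == simple collation
    };

    bool supportsUniqueKey(const IndexDescription& index,
                           const std::set<std::string>& fieldPaths,
                           const std::string& operationCollation) {
        // The index must cover exactly the requested fields (order is irrelevant here),
        // be unique, not be partial, and use the operation's collation.
        return index.unique && !index.partial && index.keyFields == fieldPaths &&
            index.collation == operationCollation;
    }

    bool fieldsHaveSupportingUniqueIndex(const std::vector<IndexDescription>& indexes,
                                         const std::set<std::string>& fieldPaths,
                                         const std::string& operationCollation) {
        // With no collection (and hence no indexes), only {_id} can be assumed unique,
        // mirroring the NamespaceNotFound branch above.
        if (indexes.empty()) {
            return fieldPaths == std::set<std::string>{"_id"};
        }
        for (const auto& index : indexes) {
            if (supportsUniqueKey(index, fieldPaths, operationCollation)) {
                return true;
            }
        }
        return false;
    }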
if (response.getStatus() == ErrorCodes::NamespaceNotFound) { - return uniqueKeyPaths == std::set<FieldPath>{"_id"}; + return fieldPaths == std::set<FieldPath>{"_id"}; } uassertStatusOK(response); const auto& indexes = response.getValue().docs; - return std::any_of( - indexes.begin(), indexes.end(), [&expCtx, &uniqueKeyPaths](const auto& index) { - return supportsUniqueKey(expCtx, index, uniqueKeyPaths); - }); + return std::any_of(indexes.begin(), indexes.end(), [&expCtx, &fieldPaths](const auto& index) { + return supportsUniqueKey(expCtx, index, fieldPaths); + }); +} + +std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> +MongoSInterface::ensureFieldsUniqueOrResolveDocumentKey( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<std::vector<std::string>> fields, + boost::optional<ChunkVersion> targetCollectionVersion, + const NamespaceString& outputNs) const { + invariant(expCtx->inMongos); + uassert( + 51179, "Received unexpected 'targetCollectionVersion' on mongos", !targetCollectionVersion); + + if (fields) { + // Convert 'fields' array to a set of FieldPaths. + auto fieldPaths = _convertToFieldPaths(*fields); + uassert(51190, + "Cannot find index to verify that join fields will be unique", + fieldsHaveSupportingUniqueIndex(expCtx, outputNs, fieldPaths)); + + // If the user supplies the 'fields' array, we don't need to attach a ChunkVersion for the + // shards since we are not at risk of 'guessing' the wrong shard key. + return {fieldPaths, boost::none}; + } + + // In case there are multiple shards which will perform this stage in parallel, we need to + // figure out and attach the collection's shard version to ensure each shard is talking about + // the same version of the collection. This mongos will coordinate that. We force a catalog + // refresh to do so because there is no shard versioning protocol on this namespace and so we + // otherwise could not be sure this node is (or will become) at all recent. We will also + // figure out and attach the 'joinFields' to send to the shards. + + // There are edge cases when the collection could be dropped or re-created during or near the + // time of the operation (for example, during aggregation). This is okay - we are mostly + // paranoid that this mongos is very stale and want to prevent returning an error if the + // collection was dropped a long time ago. Because of this, we are okay with piggy-backing off + // another thread's request to refresh the cache, simply waiting for that request to return + // instead of forcing another refresh. + targetCollectionVersion = refreshAndGetCollectionVersion(expCtx, outputNs); + + auto docKeyPaths = collectDocumentKeyFieldsActingAsRouter(expCtx->opCtx, outputNs); + return {std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()), + std::make_move_iterator(docKeyPaths.end())), + targetCollectionVersion}; } } // namespace mongo diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index 4c3b06ad30c..6a9d4b2431f 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -42,7 +42,7 @@ namespace mongo { * Class to provide access to mongos-specific implementations of methods required by some * document sources. 
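On mongos, the new ensureFieldsUniqueOrResolveDocumentKey has two branches: if the user supplied the fields, verify a supporting unique index and attach no shard version; otherwise force-refresh the routing information and fall back to the collection's document key (shard key plus _id), attaching the refreshed version so every shard agrees on the collection incarnation. A compressed, standalone sketch of that control flow with the catalog interactions stubbed out by a toy Catalog type:

    #include <optional>
    #include <set>
    #include <stdexcept>
    #include <string>
    #include <utility>
    #include <vector>

    using FieldSet = std::set<std::string>;
    struct ChunkVersion { int major = 0; };  // toy stand-in for the real ChunkVersion

    struct Catalog {  // stand-in for the routing/index lookups mongos performs
        bool hasSupportingUniqueIndex(const FieldSet&) const { return true; }
        ChunkVersion refreshAndGetCollectionVersion() const { return {42}; }
        FieldSet documentKeyActingAsRouter() const { return {"_id", "shardKey"}; }
    };

    std::pair<FieldSet, std::optional<ChunkVersion>> resolveOnMongos(
        const Catalog& catalog,
        const std::optional<std::vector<std::string>>& userFields,
        const std::optional<ChunkVersion>& targetCollectionVersion) {
        if (targetCollectionVersion) {
            // Error 51179 in the real code: mongos never receives a pre-resolved version.
            throw std::invalid_argument("unexpected 'targetCollectionVersion' on mongos");
        }
        if (userFields) {
            FieldSet fieldPaths(userFields->begin(), userFields->end());
            if (!catalog.hasSupportingUniqueIndex(fieldPaths)) {
                // Error 51190 in the real code.
                throw std::invalid_argument("no index to verify join fields are unique");
            }
            // User-specified fields: nothing was guessed, so no shard version is attached.
            return {fieldPaths, std::nullopt};
        }
        // Default: document key (shard key + _id) plus the refreshed collection version, so
        // all shards operate against the same incarnation of the output collection.
        return {catalog.documentKeyActingAsRouter(), catalog.refreshAndGetCollectionVersion()};
    }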
*/ -class MongoSInterface final : public MongoProcessCommon { +class MongoSInterface : public MongoProcessCommon { public: static BSONObj createPassthroughCommandForShard(OperationContext* opCtx, const AggregationRequest& request, @@ -217,9 +217,9 @@ public: MONGO_UNREACHABLE; } - bool uniqueKeyIsSupportedByIndex(const boost::intrusive_ptr<ExpressionContext>&, - const NamespaceString&, - const std::set<FieldPath>& uniqueKeyPaths) const final; + bool fieldsHaveSupportingUniqueIndex(const boost::intrusive_ptr<ExpressionContext>&, + const NamespaceString&, + const std::set<FieldPath>& fieldPaths) const; void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>&, const NamespaceString&, @@ -231,6 +231,12 @@ public: return nullptr; } + std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> + ensureFieldsUniqueOrResolveDocumentKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<std::vector<std::string>> fields, + boost::optional<ChunkVersion> targetCollectionVersion, + const NamespaceString& outputNs) const override; + protected: BSONObj _reportCurrentOpForClient(OperationContext* opCtx, Client* client, diff --git a/src/mongo/db/pipeline/mongos_process_interface_test.cpp b/src/mongo/db/pipeline/mongos_process_interface_test.cpp new file mode 100644 index 00000000000..91d79185481 --- /dev/null +++ b/src/mongo/db/pipeline/mongos_process_interface_test.cpp @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/mongos_process_interface.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +class MongoProcessInterfaceForTest : public MongoSInterface { +public: + using MongoSInterface::MongoSInterface; + + bool fieldsHaveSupportingUniqueIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + const std::set<FieldPath>& fieldPaths) const override { + return hasSupportingIndexForFields; + } + + bool hasSupportingIndexForFields{true}; +}; + +class MongoSInterfaceTest : public AggregationContextFixture { +public: + MongoSInterfaceTest() { + getExpCtx()->inMongos = true; + } + + auto makeProcessInterface() { + return std::make_unique<MongoProcessInterfaceForTest>(); + } +}; + +TEST_F(MongoSInterfaceTest, FailsToEnsureFieldsUniqueIfTargetCollectionVersionIsSpecified) { + auto expCtx = getExpCtx(); + auto targetCollectionVersion = boost::make_optional(ChunkVersion(0, 0, OID::gen())); + auto processInterface = makeProcessInterface(); + + ASSERT_THROWS_CODE(processInterface->ensureFieldsUniqueOrResolveDocumentKey( + expCtx, {{"_id"}}, targetCollectionVersion, expCtx->ns), + AssertionException, + 51179); +} + +TEST_F(MongoSInterfaceTest, FailsToEnsureFieldsUniqueIfNotSupportedByIndex) { + auto expCtx = getExpCtx(); + auto targetCollectionVersion = boost::none; + auto processInterface = makeProcessInterface(); + + processInterface->hasSupportingIndexForFields = false; + ASSERT_THROWS_CODE(processInterface->ensureFieldsUniqueOrResolveDocumentKey( + expCtx, {{"x"}}, targetCollectionVersion, expCtx->ns), + AssertionException, + 51190); +} +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 1b1f2748b66..d6179e48e78 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -1808,8 +1808,7 @@ public: rawPipeline.push_back(stageElem.embeddedObject()); } AggregationRequest request(kTestNss, rawPipeline); - intrusive_ptr<ExpressionContextForTest> ctx = - new ExpressionContextForTest(&_opCtx, request); + intrusive_ptr<ExpressionContextForTest> ctx = createExpressionContext(request); TempDir tempDir("PipelineTest"); ctx->tempDir = tempDir.path(); @@ -1837,6 +1836,11 @@ public: virtual ~Base() {} + virtual intrusive_ptr<ExpressionContextForTest> createExpressionContext( + const AggregationRequest& request) { + return new ExpressionContextForTest(&_opCtx, request); + } + protected: std::unique_ptr<Pipeline, PipelineDeleter> mergePipe; std::unique_ptr<Pipeline, PipelineDeleter> shardPipe; @@ -2271,7 +2275,7 @@ class ShouldNotCoalesceUnwindNotOnAs : public Base { namespace needsPrimaryShardMerger { -class needsPrimaryShardMergerBase : public Base { +class ShardMergerBase : public Base { public: void run() override { Base::run(); @@ -2281,7 +2285,7 @@ public: virtual bool needsPrimaryShardMerger() = 0; }; -class Out : public needsPrimaryShardMergerBase { +class Out : public ShardMergerBase { bool needsPrimaryShardMerger() { return true; } @@ -2292,13 +2296,56 @@ class Out : public needsPrimaryShardMergerBase { return "[]"; } string mergePipeJson() { - return "[{$out: {to: 'outColl', db: 'a', mode: '" + - WriteMode_serializer(WriteModeEnum::kModeReplaceCollection) + - "', uniqueKey: {_id: 1}}}]"; + return "[{$out: 'outColl'}]"; + } +}; + +class MergeWithUnshardedCollection : public ShardMergerBase { + bool 
needsPrimaryShardMerger() { + return true; + } + string inputPipeJson() { + return "[{$merge: 'outColl'}]"; + } + string shardPipeJson() { + return "[]"; + } + string mergePipeJson() { + return "[{$merge: {into: {db: 'a', coll: 'outColl'}, on: '_id', " + "whenMatched: 'merge', whenNotMatched: 'insert'}}]"; + } +}; + +class MergeWithShardedCollection : public ShardMergerBase { + intrusive_ptr<ExpressionContextForTest> createExpressionContext( + const AggregationRequest& request) override { + class ProcessInterface : public StubMongoProcessInterface { + bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override { + return true; + } + }; + + auto expCtx = ShardMergerBase::createExpressionContext(request); + expCtx->mongoProcessInterface = std::make_shared<ProcessInterface>(); + return expCtx; + } + + bool needsPrimaryShardMerger() { + return false; + } + string inputPipeJson() { + return "[{$merge: 'outColl'}]"; + } + string shardPipeJson() { + return "[{$merge: {into: {db: 'a', coll: 'outColl'}, on: '_id', " + "whenMatched: 'merge', whenNotMatched: 'insert'}}]"; + } + string mergePipeJson() { + return "[]"; } }; -class Project : public needsPrimaryShardMergerBase { +class Project : public ShardMergerBase { bool needsPrimaryShardMerger() { return false; } @@ -2313,7 +2360,7 @@ class Project : public needsPrimaryShardMergerBase { } }; -class LookUp : public needsPrimaryShardMergerBase { +class LookUp : public ShardMergerBase { bool needsPrimaryShardMerger() { return true; } @@ -3252,6 +3299,8 @@ public: ShardedMatchSortProjLimBecomesMatchTopKSortProj>(); add<Optimizations::Sharded::limitFieldsSentFromShardsToMerger::ShardAlreadyExhaustive>(); add<Optimizations::Sharded::needsPrimaryShardMerger::Out>(); + add<Optimizations::Sharded::needsPrimaryShardMerger::MergeWithUnshardedCollection>(); + add<Optimizations::Sharded::needsPrimaryShardMerger::MergeWithShardedCollection>(); add<Optimizations::Sharded::needsPrimaryShardMerger::Project>(); add<Optimizations::Sharded::needsPrimaryShardMerger::LookUp>(); } diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index 7d493a6ba7a..02f47a0fe1c 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -527,10 +527,10 @@ std::vector<BSONObj> MongoInterfaceStandalone::getMatchingPlanCacheEntryStats( return planCache->getMatchingStats(serializer, predicate); } -bool MongoInterfaceStandalone::uniqueKeyIsSupportedByIndex( +bool MongoInterfaceStandalone::fieldsHaveSupportingUniqueIndex( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, - const std::set<FieldPath>& uniqueKeyPaths) const { + const std::set<FieldPath>& fieldPaths) const { auto* opCtx = expCtx->opCtx; // We purposefully avoid a helper like AutoGetCollection here because we don't want to check the // db version or do anything else. We simply want to protect against concurrent modifications to @@ -541,13 +541,13 @@ bool MongoInterfaceStandalone::uniqueKeyIsSupportedByIndex( auto db = databaseHolder->getDb(opCtx, nss.db()); auto collection = db ? 
db->getCollection(opCtx, nss) : nullptr; if (!collection) { - return uniqueKeyPaths == std::set<FieldPath>{"_id"}; + return fieldPaths == std::set<FieldPath>{"_id"}; } auto indexIterator = collection->getIndexCatalog()->getIndexIterator(opCtx, false); while (indexIterator->more()) { const IndexCatalogEntry* entry = indexIterator->next(); - if (supportsUniqueKey(expCtx, entry, uniqueKeyPaths)) { + if (supportsUniqueKey(expCtx, entry, fieldPaths)) { return true; } } @@ -635,4 +635,34 @@ std::unique_ptr<ResourceYielder> MongoInterfaceStandalone::getResourceYielder() return std::make_unique<MongoDResourceYielder>(); } + +std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> +MongoInterfaceStandalone::ensureFieldsUniqueOrResolveDocumentKey( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<std::vector<std::string>> fields, + boost::optional<ChunkVersion> targetCollectionVersion, + const NamespaceString& outputNs) const { + if (targetCollectionVersion) { + uassert(51123, "Unexpected target chunk version specified", expCtx->fromMongos); + // If mongos has sent us a target shard version, we need to be sure we are prepared to + // act as a router which is at least as recent as that mongos. + checkRoutingInfoEpochOrThrow(expCtx, outputNs, *targetCollectionVersion); + } + + if (!fields) { + uassert(51124, "Expected fields to be provided from mongos", !expCtx->fromMongos); + return {std::set<FieldPath>{"_id"}, targetCollectionVersion}; + } + + // Make sure the 'fields' array has a supporting index. Skip this check if the command is sent + // from mongos since the 'fields' check would've happened already. + auto fieldPaths = _convertToFieldPaths(*fields); + if (!expCtx->fromMongos) { + uassert(51183, + "Cannot find index to verify that join fields will be unique", + fieldsHaveSupportingUniqueIndex(expCtx, outputNs, fieldPaths)); + } + return {fieldPaths, targetCollectionVersion}; +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index 9c9fc301c18..e08526d99db 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -127,9 +127,9 @@ public: const NamespaceString&, const MatchExpression*) const final; - bool uniqueKeyIsSupportedByIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - const std::set<FieldPath>& uniqueKeyPaths) const final; + bool fieldsHaveSupportingUniqueIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + const std::set<FieldPath>& fieldPaths) const; virtual void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, @@ -139,6 +139,12 @@ public: std::unique_ptr<ResourceYielder> getResourceYielder() const override; + std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> + ensureFieldsUniqueOrResolveDocumentKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<std::vector<std::string>> fields, + boost::optional<ChunkVersion> targetCollectionVersion, + const NamespaceString& outputNs) const override; + protected: BSONObj _reportCurrentOpForClient(OperationContext* opCtx, Client* client, diff --git a/src/mongo/db/pipeline/process_interface_standalone_test.cpp b/src/mongo/db/pipeline/process_interface_standalone_test.cpp new file mode 100644 index 00000000000..fa246fc2e9d --- /dev/null +++ 
b/src/mongo/db/pipeline/process_interface_standalone_test.cpp @@ -0,0 +1,130 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/process_interface_standalone.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +class MongoProcessInterfaceForTest : public MongoInterfaceStandalone { +public: + using MongoInterfaceStandalone::MongoInterfaceStandalone; + + bool fieldsHaveSupportingUniqueIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + const std::set<FieldPath>& fields) const override { + return hasSupportingIndexForFields; + } + + void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString&, + ChunkVersion) const override { + // Assume it always matches for our tests here. 
+ return; + } + + bool hasSupportingIndexForFields{true}; +}; + +class ProcessInterfaceStandaloneTest : public AggregationContextFixture { +public: + auto makeProcessInterface() { + return std::make_unique<MongoProcessInterfaceForTest>(getExpCtx()->opCtx); + } +}; + +TEST_F(ProcessInterfaceStandaloneTest, FailsToEnsureFieldsUniqueIfFieldsHaveDuplicates) { + auto expCtx = getExpCtx(); + auto targetCollectionVersion = boost::none; + auto processInterface = makeProcessInterface(); + + ASSERT_THROWS_CODE(processInterface->ensureFieldsUniqueOrResolveDocumentKey( + expCtx, {{"_id", "_id"}}, targetCollectionVersion, expCtx->ns), + AssertionException, + ErrorCodes::BadValue); + ASSERT_THROWS_CODE(processInterface->ensureFieldsUniqueOrResolveDocumentKey( + expCtx, {{"x", "y", "x"}}, targetCollectionVersion, expCtx->ns), + AssertionException, + ErrorCodes::BadValue); +} + +TEST_F(ProcessInterfaceStandaloneTest, + FailsToEnsureFieldsUniqueIfTargetCollectionVersionIsSpecifiedOnMongos) { + auto expCtx = getExpCtx(); + auto targetCollectionVersion = boost::make_optional(ChunkVersion(0, 0, OID::gen())); + auto processInterface = makeProcessInterface(); + + // Test that 'targetCollectionVersion' is not accepted if not from mongos. + expCtx->fromMongos = false; + ASSERT_THROWS_CODE(processInterface->ensureFieldsUniqueOrResolveDocumentKey( + expCtx, {{"_id"}}, targetCollectionVersion, expCtx->ns), + AssertionException, + 51123); + + // Test that 'targetCollectionVersion' is accepted if from mongos. + expCtx->fromMongos = true; + auto[joinKey, chunkVersion] = processInterface->ensureFieldsUniqueOrResolveDocumentKey( + expCtx, {{"_id"}}, targetCollectionVersion, expCtx->ns); + ASSERT_EQ(joinKey.size(), 1UL); + ASSERT_EQ(joinKey.count(FieldPath("_id")), 1UL); + ASSERT(chunkVersion); + ASSERT_EQ(*chunkVersion, *targetCollectionVersion); +} + +TEST_F(ProcessInterfaceStandaloneTest, FailsToEnsureFieldsUniqueIfJoinFieldsAreNotSentFromMongos) { + auto expCtx = getExpCtx(); + auto targetCollectionVersion = boost::make_optional(ChunkVersion(0, 0, OID::gen())); + auto processInterface = makeProcessInterface(); + + expCtx->fromMongos = true; + ASSERT_THROWS_CODE(processInterface->ensureFieldsUniqueOrResolveDocumentKey( + expCtx, boost::none, targetCollectionVersion, expCtx->ns), + AssertionException, + 51124); +} + +TEST_F(ProcessInterfaceStandaloneTest, + FailsToEnsureFieldsUniqueIfFieldsDoesNotHaveSupportingUniqueIndex) { + auto expCtx = getExpCtx(); + auto targetCollectionVersion = boost::none; + auto processInterface = makeProcessInterface(); + + expCtx->fromMongos = false; + processInterface->hasSupportingIndexForFields = false; + ASSERT_THROWS_CODE(processInterface->ensureFieldsUniqueOrResolveDocumentKey( + expCtx, {{"x"}}, targetCollectionVersion, expCtx->ns), + AssertionException, + 51183); +} +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 77f76eb5c9b..f97f39ac2f5 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -202,9 +202,9 @@ public: MONGO_UNREACHABLE; } - bool uniqueKeyIsSupportedByIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - const std::set<FieldPath>& uniqueKeyPaths) const override { + bool fieldsHaveSupportingUniqueIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + const std::set<FieldPath>& fieldPaths) const override { 
return true; } @@ -223,5 +223,21 @@ public: std::unique_ptr<ResourceYielder> getResourceYielder() const override { return nullptr; } + + std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> + ensureFieldsUniqueOrResolveDocumentKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<std::vector<std::string>> fields, + boost::optional<ChunkVersion> targetCollectionVersion, + const NamespaceString& outputNs) const override { + if (!fields) { + return {std::set<FieldPath>{"_id"}, targetCollectionVersion}; + } + + std::set<FieldPath> fieldPaths; + for (const auto& field : *fields) { + fieldPaths.insert(FieldPath(field)); + } + return {fieldPaths, targetCollectionVersion}; + } }; } // namespace mongo
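
The new ensureFieldsUniqueOrResolveDocumentKey() hook consolidates the 'on'-field validation that previously lived in document_source_merge.cpp: reject a target chunk version unless the command came from mongos, fall back to the {_id} document key when no fields were supplied, and otherwise require a supporting unique index. A minimal, compilable sketch of that control flow follows; the names ProcessInterfaceSketch, uassertSketch, and convertToFieldPaths, plus the use of std::optional, plain-string field paths, and int chunk versions, are illustrative stand-ins rather than the real server types.

    // Sketch of the control flow added to MongoInterfaceStandalone above.
    // Simplified stand-in types only; not the server's FieldPath/ChunkVersion/uassert.
    #include <iostream>
    #include <optional>
    #include <set>
    #include <stdexcept>
    #include <string>
    #include <utility>
    #include <vector>

    using FieldPathSet = std::set<std::string>;

    struct AssertionException : std::runtime_error {
        AssertionException(int c, const std::string& msg) : std::runtime_error(msg), code(c) {}
        int code;
    };

    void uassertSketch(int code, const std::string& msg, bool cond) {
        if (!cond)
            throw AssertionException(code, msg);
    }

    // Converts user-supplied field names to a set, rejecting duplicates
    // (the role played by _convertToFieldPaths() in the diff).
    FieldPathSet convertToFieldPaths(const std::vector<std::string>& fields) {
        FieldPathSet paths;
        for (const auto& f : fields)
            uassertSketch(/*BadValue*/ 2, "duplicate field '" + f + "'", paths.insert(f).second);
        return paths;
    }

    struct ProcessInterfaceSketch {
        bool fromMongos = false;
        bool hasSupportingUniqueIndex = true;  // stand-in for the index-catalog scan

        std::pair<FieldPathSet, std::optional<int>> ensureFieldsUniqueOrResolveDocumentKey(
            std::optional<std::vector<std::string>> fields,
            std::optional<int> targetCollectionVersion) const {
            // A target collection version is only legal when mongos forwarded the command.
            // (The real implementation also calls checkRoutingInfoEpochOrThrow() here.)
            if (targetCollectionVersion)
                uassertSketch(51123, "Unexpected target chunk version specified", fromMongos);

            // No user-supplied fields: default to the {_id} document key.
            if (!fields) {
                uassertSketch(51124, "Expected fields to be provided from mongos", !fromMongos);
                return {FieldPathSet{"_id"}, targetCollectionVersion};
            }

            // User-supplied fields must be backed by a unique index, unless mongos
            // has already performed that check.
            auto fieldPaths = convertToFieldPaths(*fields);
            if (!fromMongos)
                uassertSketch(51183,
                              "Cannot find index to verify that join fields will be unique",
                              hasSupportingUniqueIndex);
            return {fieldPaths, targetCollectionVersion};
        }
    };

    int main() {
        ProcessInterfaceSketch iface;

        // No fields supplied, not from mongos: resolves to the {_id} document key.
        auto [key, version] = iface.ensureFieldsUniqueOrResolveDocumentKey(std::nullopt, std::nullopt);
        std::cout << "resolved key size: " << key.size()
                  << ", version supplied: " << (version ? "yes" : "no") << '\n';

        // Fields supplied but no supporting unique index: rejected with 51183.
        iface.hasSupportingUniqueIndex = false;
        try {
            iface.ensureFieldsUniqueOrResolveDocumentKey(std::vector<std::string>{"x"}, std::nullopt);
        } catch (const AssertionException& ex) {
            std::cout << "rejected with code " << ex.code << '\n';
        }
    }

The unit tests added in this change exercise the same two seams: MongoProcessInterfaceForTest overrides fieldsHaveSupportingUniqueIndex() (and, on the standalone side, checkRoutingInfoEpochOrThrow()) so the document-key resolution logic can be driven through both the mongos and standalone interfaces without a real index catalog or routing table, which is why the methods lose their final qualifiers in this diff.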