diff options
17 files changed, 232 insertions, 198 deletions
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 16dc1d93843..325e74d7c51 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -160,6 +160,7 @@ env.Library( '$BUILD_DIR/mongo/db/logical_session_cache_impl', '$BUILD_DIR/mongo/db/logical_session_id', '$BUILD_DIR/mongo/db/logical_session_id_helpers', + '$BUILD_DIR/mongo/db/pipeline/change_stream_pipeline', '$BUILD_DIR/mongo/db/pipeline/pipeline', '$BUILD_DIR/mongo/db/repl/isself', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 1554ab17ab3..2ad08caa453 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -223,17 +223,10 @@ pipelineEnv.InjectThirdParty(libraries=['snappy']) pipelineEnv.Library( target='pipeline', source=[ - 'change_stream_document_diff_parser.cpp', 'document_source.cpp', 'document_source_add_fields.cpp', 'document_source_bucket.cpp', 'document_source_bucket_auto.cpp', - 'document_source_change_stream.cpp', - 'document_source_change_stream_close_cursor.cpp', - 'document_source_change_stream_transform.cpp', - 'document_source_change_stream_unwind_transactions.cpp', - 'document_source_check_invalidate.cpp', - 'document_source_check_resume_token.cpp', 'document_source_coll_stats.cpp', 'document_source_count.cpp', 'document_source_current_op.cpp', @@ -251,8 +244,6 @@ pipelineEnv.Library( 'document_source_list_local_sessions.cpp', 'document_source_list_sessions.cpp', 'document_source_lookup.cpp', - 'document_source_lookup_change_post_image.cpp', - 'document_source_lookup_change_pre_image.cpp', 'document_source_match.cpp', 'document_source_merge.cpp', 'document_source_operation_metrics.cpp', @@ -339,6 +330,28 @@ pipelineEnv.Library( ) env.Library( + target="change_stream_pipeline", + source=[ + 'change_stream_document_diff_parser.cpp', + 'document_source_change_stream.cpp', + 'document_source_change_stream_close_cursor.cpp', + 'document_source_change_stream_transform.cpp', + 'document_source_change_stream_unwind_transactions.cpp', + 'document_source_check_invalidate.cpp', + 'document_source_check_resume_token.cpp', + 'document_source_lookup_change_post_image.cpp', + 'document_source_lookup_change_pre_image.cpp', + 'document_source_update_on_add_shard.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/pipeline/pipeline', + '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers', + '$BUILD_DIR/mongo/s/query/router_exec_stage', + ], +) + + +env.Library( target='runtime_constants_idl', source=[ 'legacy_runtime_constants.idl', @@ -497,6 +510,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/util/clock_source_mock', 'accumulator', 'aggregation_request_helper', + 'change_stream_pipeline', 'document_source_mock', 'document_sources_idl', 'expression_context', diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index b450cae4c72..05133ece477 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -46,6 +46,7 @@ #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" #include "mongo/db/pipeline/document_source_lookup_change_pre_image.h" #include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/pipeline/document_source_update_on_add_shard.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/pipeline/resume_token.h" @@ -448,6 +449,23 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression // The resume stage must come after the check invalidate stage so that the former can determine // whether the event that matches the resume token should be followed by an "invalidate" event. stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, startAfterInvalidate)); + + // The resume stage 'DocumentSourceCheckResumability' should come before the split point stage + // 'DocumentSourceUpdateOnAddShard'. + if (resumeStage && + resumeStage->getSourceName() == DocumentSourceCheckResumability::kStageName) { + stages.push_back(resumeStage); + resumeStage.reset(); + } + + // If the pipeline is built on MongoS, then the stage 'DocumentSourceUpdateOnAddShard' acts as + // the split point for the pipline. All stages before this stages will run on shards and all + // stages after and inclusive of this stage will run on the MongoS. + if (expCtx->inMongos) { + stages.push_back(DocumentSourceUpdateOnAddShard::create(expCtx)); + } + + // This resume stage should be 'DocumentSourceEnsureResumeTokenPresent'. if (resumeStage) { stages.push_back(resumeStage); } @@ -491,16 +509,8 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( auto stages = buildPipeline(expCtx, spec, elem); - const bool csOptFeatureFlag = - feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV(); - - if (expCtx->inMongos && csOptFeatureFlag) { - // TODO SERVER-55491: replace with DocumentSourceUpdateOnAddShard. - stages.push_back(DocumentSourceChangeStreamPipelineSplitter::create(expCtx)); - } - if (!expCtx->needsMerge) { - if (!csOptFeatureFlag) { + if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { // There should only be one close cursor stage. If we're on the shards and producing // input to be merged, do not add a close cursor stage, since the mongos will already // have one. @@ -527,28 +537,6 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( return stages; } -BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(BSONObj originalCmdObj, - Document resumeToken) { - Document originalCmd(originalCmdObj); - auto pipeline = originalCmd[AggregateCommandRequest::kPipelineFieldName].getArray(); - // A $changeStream must be the first element of the pipeline in order to be able - // to replace (or add) a resume token. - invariant(!pipeline[0][DocumentSourceChangeStream::kStageName].missing()); - - MutableDocument changeStreamStage( - pipeline[0][DocumentSourceChangeStream::kStageName].getDocument()); - changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = Value(resumeToken); - - // If the command was initially specified with a startAtOperationTime, we need to remove it to - // use the new resume token. - changeStreamStage[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] = Value(); - pipeline[0] = - Value(Document{{DocumentSourceChangeStream::kStageName, changeStreamStage.freeze()}}); - MutableDocument newCmd(std::move(originalCmd)); - newCmd[AggregateCommandRequest::kPipelineFieldName] = Value(pipeline); - return newCmd.freeze().toBson(); -} - void DocumentSourceChangeStream::assertIsLegalSpecification( const intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) { // If 'allChangesForCluster' is true, the stream must be opened on the 'admin' database with diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index eb40d2908ab..4368ffb01fd 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -184,14 +184,6 @@ public: BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); /** - * Given a BSON object containing an aggregation command with a $changeStream stage, and a - * resume token, returns a new BSON object with the same command except with the addition of a - * resumeAfter: option containing the resume token. If there was a previous resumeAfter: - * option, it is removed. - */ - static BSONObj replaceResumeTokenInCommand(BSONObj originalCmdObj, Document resumeToken); - - /** * Helper used by various change stream stages. Used for asserting that a certain Value of a * field has a certain type. Will uassert() if the field does not have the expected type. */ @@ -269,57 +261,4 @@ private: using DocumentSourceMatch::DocumentSourceMatch; }; -/** - * A DocumentSource that if part of the pipeline, directly passes on the received documents to the - * next stages without interpreting it and marks where a sharded change streams pipeline should be - * split. This stage should only ever be created by a mongoS. - * - * TODO SERVER-55491: replace this class with DocumentSourceUpdateOnAddShard. - */ -class DocumentSourceChangeStreamPipelineSplitter final : public DocumentSource { -public: - static constexpr StringData kStageName = "$_internalChangeStreamPipelineSplitter"_sd; - - static boost::intrusive_ptr<DocumentSourceChangeStreamPipelineSplitter> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx) { - return new DocumentSourceChangeStreamPipelineSplitter(expCtx); - } - - const char* getSourceName() const final { - return DocumentSourceChangeStreamPipelineSplitter::kStageName.rawData(); - } - - StageConstraints constraints(Pipeline::SplitState pipeState) const final { - return {StreamType::kStreaming, - PositionRequirement::kNone, - HostTypeRequirement::kMongoS, - DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed, - TransactionRequirement::kNotAllowed, - LookupRequirement::kNotAllowed, - UnionRequirement::kNotAllowed, - ChangeStreamRequirement::kChangeStreamStage}; - } - - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { - return (explain ? Value(Document{{kStageName, Document{}}}) : Value()); - } - - boost::optional<DistributedPlanLogic> distributedPlanLogic() final { - return DistributedPlanLogic{nullptr, nullptr, change_stream_constants::kSortSpec}; - } - -private: - DocumentSourceChangeStreamPipelineSplitter( - const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(kStageName, expCtx) { - invariant(expCtx->inMongos); - } - - GetNextResult doGetNext() final { - // Pass on the document to the next stage without interpreting. - return pSource->getNext(); - } -}; - } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h index 5713af11d58..59be3f550b3 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h @@ -77,10 +77,7 @@ public: } boost::optional<DistributedPlanLogic> distributedPlanLogic() final { - // This stage must run on mongos to ensure it sees any invalidation in the correct order, - // and to ensure that all remote cursors are cleaned up properly. - // {shardsStage, mergingStage, sortPattern} - return DistributedPlanLogic{nullptr, this, change_stream_constants::kSortSpec}; + return boost::none; } private: diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index c3e19a4a450..2af09ea780d 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -85,7 +85,7 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - boost::optional<DistributedPlanLogic> distributedPlanLogic() override { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } @@ -134,19 +134,6 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - boost::optional<DistributedPlanLogic> distributedPlanLogic() final { - DistributedPlanLogic logic; - // This stage must run on mongos to ensure it sees the resume token, which could have come - // from any shard. We also must include a mergingPresorted $sort stage to communicate to - // the AsyncResultsMerger that we need to merge the streams in a particular order. - logic.mergingStage = this; - // Also add logic to the shards to ensure that each shard has enough oplog history to resume - // the change stream. - logic.shardsStage = DocumentSourceCheckResumability::create(pExpCtx, _tokenFromClient); - logic.inputSortPattern = change_stream_constants::kSortSpec; - return logic; - }; - static boost::intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp index d531065306f..7aec2272d25 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.cpp +++ b/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp @@ -27,13 +27,16 @@ * it in the license file. */ -#include "mongo/s/query/document_source_update_on_add_shard.h" +#include "mongo/db/pipeline/document_source_update_on_add_shard.h" #include <algorithm> #include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/sharded_agg_helpers.h" +#include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" #include "mongo/s/query/async_results_merger_params_gen.h" #include "mongo/s/query/establish_cursors.h" @@ -59,25 +62,26 @@ bool isShardConfigEvent(const Document& eventDoc) { } // namespace boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> DocumentSourceUpdateOnAddShard::create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors, - std::vector<ShardId> shardsWithCursors, - BSONObj cmdToRunOnNewShards) { - return new DocumentSourceUpdateOnAddShard( - expCtx, mergeCursors, std::move(shardsWithCursors), cmdToRunOnNewShards); + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourceUpdateOnAddShard(expCtx); } DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors, - std::vector<ShardId>&& shardsWithCursors, - BSONObj cmdToRunOnNewShards) - : DocumentSource(kStageName, expCtx), - _mergeCursors(mergeCursors), - _shardsWithCursors(shardsWithCursors.begin(), shardsWithCursors.end()), - _cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {} + const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(kStageName, expCtx) {} DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::doGetNext() { + // For the first call to the 'doGetNext', the '_mergeCursors' will be null and must be + // populated. We also resolve the original aggregation command from the expression context. + if (!_mergeCursors) { + _mergeCursors = dynamic_cast<DocumentSourceMergeCursors*>(pSource); + _originalAggregateCommand = pExpCtx->originalAggregateCommand.getOwned(); + + tassert(5549100, "Missing $mergeCursors stage", _mergeCursors); + tassert( + 5549101, "Empty $changeStream command object", !_originalAggregateCommand.isEmpty()); + } + auto childResult = pSource->getNext(); // If this is an insertion into the 'config.shards' collection, open a cursor on the new shard. @@ -107,17 +111,12 @@ std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsO auto newShard = uassertStatusOK(ShardType::fromBSON(newShardSpec.getDocument().toBson())); // Make sure we are not attempting to open a cursor on a shard that already has one. - if (!_shardsWithCursors.insert(newShard.getName()).second) { + if (_mergeCursors->getShardIds().count(newShard.getName()) != 0) { return {}; } - // We must start the new cursor from the moment at which the shard became visible. - const auto newShardAddedTime = LogicalTime{ - newShardDetectedObj[DocumentSourceChangeStream::kClusterTimeField].getTimestamp()}; - auto resumeTokenForNewShard = - ResumeToken::makeHighWaterMarkToken(newShardAddedTime.addTicks(1).asTimestamp()); - auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand( - _cmdToRunOnNewShards, resumeTokenForNewShard.toDocument()); + auto cmdObj = createUpdatedCommandForNewShard( + newShardDetectedObj[DocumentSourceChangeStream::kClusterTimeField].getTimestamp()); const bool allowPartialResults = false; // partial results are not allowed return establishCursors(opCtx, @@ -128,4 +127,59 @@ std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsO allowPartialResults); } +BSONObj DocumentSourceUpdateOnAddShard::createUpdatedCommandForNewShard(Timestamp shardAddedTime) { + // We must start the new cursor from the moment at which the shard became visible. + const auto newShardAddedTime = LogicalTime{shardAddedTime}; + auto resumeTokenForNewShard = + ResumeToken::makeHighWaterMarkToken(newShardAddedTime.addTicks(1).asTimestamp()); + + // Create a new shard command object containing the new resume token. + auto shardCommand = replaceResumeTokenInCommand(resumeTokenForNewShard.toDocument()); + + auto* opCtx = pExpCtx->opCtx; + bool apiStrict = APIParameters::get(opCtx).getAPIStrict().value_or(false); + + // Create the 'AggregateCommandRequest' object which will help in creating the parsed pipeline. + auto aggCmdRequest = aggregation_request_helper::parseFromBSON( + pExpCtx->ns, shardCommand, boost::none, apiStrict); + + // Parse and optimize the pipeline. + auto pipeline = Pipeline::parse(aggCmdRequest.getPipeline(), pExpCtx); + pipeline->optimizePipeline(); + + // Split the full pipeline to get the shard pipeline. + auto splitPipelines = sharded_agg_helpers::splitPipeline(std::move(pipeline)); + + // Create the new command that will run on the shard. + return sharded_agg_helpers::createCommandForTargetedShards(pExpCtx, + Document{shardCommand}, + splitPipelines, + boost::none, /* exhangeSpec */ + true /* needsMerge */); +} + +BSONObj DocumentSourceUpdateOnAddShard::replaceResumeTokenInCommand(Document resumeToken) { + Document originalCmd(_originalAggregateCommand); + auto pipeline = originalCmd[AggregateCommandRequest::kPipelineFieldName].getArray(); + + // A $changeStream must be the first element of the pipeline in order to be able + // to replace (or add) a resume token. + tassert(5549102, + "Invalid $changeStream command object", + !pipeline[0][DocumentSourceChangeStream::kStageName].missing()); + + MutableDocument changeStreamStage( + pipeline[0][DocumentSourceChangeStream::kStageName].getDocument()); + changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = Value(resumeToken); + + // If the command was initially specified with a startAtOperationTime, we need to remove it to + // use the new resume token. + changeStreamStage[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] = Value(); + pipeline[0] = + Value(Document{{DocumentSourceChangeStream::kStageName, changeStreamStage.freeze()}}); + MutableDocument newCmd(std::move(originalCmd)); + newCmd[AggregateCommandRequest::kPipelineFieldName] = Value(pipeline); + return newCmd.freeze().toBson(); +} + } // namespace mongo diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/db/pipeline/document_source_update_on_add_shard.h index 621b508bf63..82e981572c1 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.h +++ b/src/mongo/db/pipeline/document_source_update_on_add_shard.h @@ -31,6 +31,7 @@ #include <memory> +#include "mongo/db/pipeline/change_stream_constants.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/s/shard_id.h" @@ -53,15 +54,10 @@ public: * by 'mergeCursorsStage' whenever a new shard is detected by a change stream. */ static boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> create( - const boost::intrusive_ptr<ExpressionContext>&, - const boost::intrusive_ptr<DocumentSourceMergeCursors>&, - std::vector<ShardId> shardsWithCursors, - BSONObj cmdToRunOnNewShards); + const boost::intrusive_ptr<ExpressionContext>&); Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { - // We only ever expect to add this stage if the pipeline is being executed locally on a - // mongos. In this case, it should never be serialized. - MONGO_UNREACHABLE; + return (explain ? Value(Document{{kStageName, Value()}}) : Value()); } virtual StageConstraints constraints(Pipeline::SplitState) const { @@ -77,14 +73,11 @@ public: } boost::optional<DistributedPlanLogic> distributedPlanLogic() final { - return boost::none; + return DistributedPlanLogic{nullptr, this, change_stream_constants::kSortSpec}; } private: - DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&, - const boost::intrusive_ptr<DocumentSourceMergeCursors>&, - std::vector<ShardId>&& shardsWithCursors, - BSONObj cmdToRunOnNewShards); + DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&); GetNextResult doGetNext() final; @@ -98,8 +91,21 @@ private: */ std::vector<RemoteCursor> establishShardCursorsOnNewShards(const Document& newShardDetectedObj); + /** + * Updates the $changeStream stage in the '_originalAggregateCommand' to reflect the start time + * for the newly-added shard(s), then generates the final command object to be run on those + * shards. + */ + BSONObj createUpdatedCommandForNewShard(Timestamp shardAddedTime); + + /** + * Given the '_originalAggregateCommand' and a resume token, returns a new BSON object with the + * same command except with the addition of a resumeAfter option containing the resume token. + * If there was a previous resumeAfter option, it will be removed. + */ + BSONObj replaceResumeTokenInCommand(Document resumeToken); + boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursors; - std::set<ShardId> _shardsWithCursors; - BSONObj _cmdToRunOnNewShards; + BSONObj _originalAggregateCommand; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index d63c9f4962a..c4cb0f6e411 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -214,6 +214,9 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith( expCtx->exprUnstableForApiV1 = exprUnstableForApiV1; expCtx->exprDeprectedForApiV1 = exprDeprectedForApiV1; + expCtx->initialPostBatchResumeToken = initialPostBatchResumeToken.getOwned(); + expCtx->originalAggregateCommand = originalAggregateCommand.getOwned(); + // Note that we intentionally skip copying the value of '_interruptCounter' because 'expCtx' is // intended to be used for executing a separate aggregation pipeline. diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 84d9f46236d..5ca843b1498 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -383,6 +383,9 @@ public: enum class CollationMatchesDefault { kNoDefault, kYes, kNo }; CollationMatchesDefault collationMatchesDefault = CollationMatchesDefault::kNoDefault; + // When non-empty, contains the unmodified user provided aggregation command. + BSONObj originalAggregateCommand; + protected: static const int kInterruptCheckPeriod = 128; diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 9fe9104da25..7d405768fc2 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -47,6 +47,7 @@ #include "mongo/db/pipeline/document_source_skip.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_source_unwind.h" +#include "mongo/db/pipeline/document_source_update_on_add_shard.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/semantic_analysis.h" #include "mongo/db/vector_clock.h" @@ -56,7 +57,6 @@ #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/query/cluster_query_knobs_gen.h" #include "mongo/s/query/document_source_merge_cursors.h" -#include "mongo/s/query/document_source_update_on_add_shard.h" #include "mongo/s/query/establish_cursors.h" #include "mongo/s/transaction_router.h" #include "mongo/util/fail_point.h" @@ -273,11 +273,8 @@ boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pi continue; } - // TODO SERVER-55491: remove this 'if' to make the invariant unconditional. - if (distributedPlanLogic->shardsStage && distributedPlanLogic->mergingStage) { - // A source may not simultaneously be present on both sides of the split. - invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage); - } + // A source may not simultaneously be present on both sides of the split. + invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage); if (distributedPlanLogic->shardsStage) shardPipe->push_back(std::move(distributedPlanLogic->shardsStage)); @@ -837,8 +834,15 @@ BSONObj createPassthroughCommandForShard( targetedCmd[AggregateCommandRequest::kPipelineFieldName] = Value(pipeline->serialize()); } - return genericTransformForShards( - std::move(targetedCmd), expCtx, explainVerbosity, collationObj); + auto shardCommand = + genericTransformForShards(std::move(targetedCmd), expCtx, explainVerbosity, collationObj); + + // Apply filter and RW concern to the final shard command. + return CommandHelpers::filterCommandRequestForPassthrough( + applyReadWriteConcern(expCtx->opCtx, + true, /* appendRC */ + !explainVerbosity, /* appendWC */ + shardCommand)); } BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -876,8 +880,14 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionCont targetedCmd[AggregateCommandRequest::kExchangeFieldName] = exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value(); - return genericTransformForShards( + auto shardCommand = genericTransformForShards( std::move(targetedCmd), expCtx, expCtx->explain, expCtx->getCollatorBSON()); + + // Apply RW concern to the final shard command. + return applyReadWriteConcern(expCtx->opCtx, + true, /* appendRC */ + !expCtx->explain, /* appendWC */ + shardCommand); } /** @@ -951,15 +961,12 @@ DispatchShardPipelineResults dispatchShardPipeline( } // Generate the command object for the targeted shards. - BSONObj targetedCommand = applyReadWriteConcern( - opCtx, - true, /* appendRC */ - !expCtx->explain, /* appendWC */ - splitPipelines - ? createCommandForTargetedShards( - expCtx, serializedCommand, *splitPipelines, exchangeSpec, true) - : createPassthroughCommandForShard( - expCtx, serializedCommand, expCtx->explain, pipeline.get(), collationObj)); + BSONObj targetedCommand = + (splitPipelines + ? createCommandForTargetedShards( + expCtx, serializedCommand, *splitPipelines, exchangeSpec, true /* needsMerge */) + : createPassthroughCommandForShard( + expCtx, serializedCommand, expCtx->explain, pipeline.get(), collationObj)); // A $changeStream pipeline must run on all shards, and will also open an extra cursor on the // config server in order to monitor for new shards. To guarantee that we do not miss any @@ -1097,16 +1104,9 @@ void addMergeCursorsSource(Pipeline* mergePipeline, armParams.setRemotes(std::move(remoteCursors)); - // For change streams, we need to set up a custom stage to establish cursors on new shards when - // they are added, to ensure we don't miss results from the new shards. auto mergeCursorsStage = DocumentSourceMergeCursors::create(mergePipeline->getContext(), std::move(armParams)); - if (hasChangeStream) { - mergePipeline->addInitialSource(DocumentSourceUpdateOnAddShard::create( - mergePipeline->getContext(), mergeCursorsStage, targetedShards, cmdSentToShards)); - } - mergePipeline->addInitialSource(std::move(mergeCursorsStage)); } diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index 2c05aee0f20..37dee4875f3 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -56,7 +56,6 @@ env.Library( target="router_exec_stage", source=[ 'document_source_merge_cursors.cpp', - 'document_source_update_on_add_shard.cpp', 'router_stage_limit.cpp', 'router_stage_mock.cpp', 'router_stage_pipeline.cpp', diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 111fde047f1..5ac4777ad48 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -128,6 +128,8 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, tassert( 5493704, "Found invalidated cursor on the first batch", !_remotes.back().invalidated); + _remotes.back().shardId = remote.getShardId().toString(); + // We don't check the return value of _addBatchToBuffer here; if there was an error, // it will be stored in the remote and the first call to ready() will return true. _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse()); diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 2521315c8e3..b9f68107a40 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -105,7 +105,8 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext( const AggregateCommandRequest& request, BSONObj collationObj, boost::optional<UUID> uuid, - StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces) { + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces, + bool hasChangeStream) { std::unique_ptr<CollatorInterface> collation; if (!collationObj.isEmpty()) { @@ -126,6 +127,15 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext( uuid); mergeCtx->inMongos = true; + + // Serialize the 'AggregateCommandRequest' and save it so that the original command can be + // reconstructed for dispatch to a new shard, which is sometimes necessary for change streams + // pipelines. + if (hasChangeStream) { + mergeCtx->originalAggregateCommand = + aggregation_request_helper::serializeToCommandObj(request); + } + return mergeCtx; } @@ -293,8 +303,12 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator, // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by // the pipeline's stages. - expCtx = makeExpressionContext( - opCtx, request, collationObj, uuid, resolveInvolvedNamespaces(involvedNamespaces)); + expCtx = makeExpressionContext(opCtx, + request, + collationObj, + uuid, + resolveInvolvedNamespaces(involvedNamespaces), + hasChangeStream); // Parse and optimize the full pipeline. auto pipeline = Pipeline::parse(request.getPipeline(), expCtx); diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 8827907d5a2..8714c6b1aad 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -410,14 +410,15 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline( consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none); - auto consumerCmdObj = sharded_agg_helpers::createCommandForTargetedShards( - expCtx, serializedCommand, consumerPipelines.back(), boost::none, false); + auto consumerCmdObj = + sharded_agg_helpers::createCommandForTargetedShards(expCtx, + serializedCommand, + consumerPipelines.back(), + boost::none, /* exchangeSpec */ + false /* needsMerge */); requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx], - applyReadWriteConcern(opCtx, - true, /* appendRC */ - !expCtx->explain, /* appendWC */ - consumerCmdObj)); + consumerCmdObj); } auto cursors = establishCursors(opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), @@ -620,13 +621,11 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>& // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an // explain if necessary, and rewrites the result into a format safe to forward to shards. - BSONObj cmdObj = - applyReadWriteConcern(opCtx, - true, /* appendRC */ - !explain, /* appendWC */ - CommandHelpers::filterCommandRequestForPassthrough( - sharded_agg_helpers::createPassthroughCommandForShard( - expCtx, serializedCommand, explain, nullptr, BSONObj()))); + BSONObj cmdObj = sharded_agg_helpers::createPassthroughCommandForShard(expCtx, + serializedCommand, + explain, + nullptr, /* pipeline */ + BSONObj()); const auto shardId = cm.dbPrimary(); const auto cmdObjWithShardVersion = (shardId != ShardId::kConfigServerId) diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp index a08e966817a..737ad721006 100644 --- a/src/mongo/s/query/document_source_merge_cursors.cpp +++ b/src/mongo/s/query/document_source_merge_cursors.cpp @@ -53,6 +53,9 @@ DocumentSourceMergeCursors::DocumentSourceMergeCursors( _armParamsObj(std::move(ownedParamsSpec)), _armParams(std::move(armParams)) { _armParams->setRecordRemoteOpWaitTime(true); + + // Populate the shard ids from the 'RemoteCursor'. + recordRemoteCursorShardIds(_armParams->getRemotes()); } std::size_t DocumentSourceMergeCursors::getNumRemotes() const { @@ -153,4 +156,13 @@ void DocumentSourceMergeCursors::doDispose() { } } + +void DocumentSourceMergeCursors::recordRemoteCursorShardIds( + const std::vector<RemoteCursor>& remoteCursors) { + for (const auto& remoteCursor : remoteCursors) { + tassert(5549103, "Encountered invalid shard ID", !remoteCursor.getShardId().empty()); + _shardsWithCursors.emplace(remoteCursor.getShardId().toString()); + } +} + } // namespace mongo diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h index b637e21e27a..d79c192d4b3 100644 --- a/src/mongo/s/query/document_source_merge_cursors.h +++ b/src/mongo/s/query/document_source_merge_cursors.h @@ -103,6 +103,13 @@ public: std::size_t getNumRemotes() const; /** + * Returns the set of shard ids whose cursor has already been established. + */ + const std::set<ShardId>& getShardIds() const { + return _shardsWithCursors; + } + + /** * Returns the high water mark sort key for the given cursor, if it exists; otherwise, returns * an empty BSONObj. Calling this method causes the underlying BlockingResultsMerger to be * populated and assumes ownership of the remote cursors. @@ -133,6 +140,7 @@ public: */ void addNewShardCursors(std::vector<RemoteCursor>&& newCursors) { invariant(_blockingResultsMerger); + recordRemoteCursorShardIds(newCursors); _blockingResultsMerger->addNewShardCursors(std::move(newCursors)); } @@ -159,6 +167,11 @@ private: */ void populateMerger(); + /** + * Adds the shard Ids of the given remote cursors into the _shardsWithCursors set. + */ + void recordRemoteCursorShardIds(const std::vector<RemoteCursor>& remoteCursors); + // When we have parsed the params out of a BSONObj, the object needs to stay around while the // params are in use. We store them here. boost::optional<BSONObj> _armParamsObj; @@ -182,6 +195,9 @@ private: // Indicates whether the cursors stored in _armParams are "owned", meaning the cursors should be // killed upon disposal of this DocumentSource. bool _ownCursors = true; + + // Set containing shard ids with valid cursors. + std::set<ShardId> _shardsWithCursors; }; } // namespace mongo |