diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-07-02 18:23:25 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-08-15 13:30:12 -0400 |
commit | ee06e6cbe5a75775f76836449558be2f6a98ddfd (patch) | |
tree | d4dbf37110d25f7f4876337a7b1e11abe251fac5 /src | |
parent | a5bde2f3e9afc3f72da01788b76829fb29c2f4e7 (diff) | |
download | mongo-ee06e6cbe5a75775f76836449558be2f6a98ddfd.tar.gz |
SERVER-33323 Refactor agg cursor merging on mongos
This commit makes it so that aggregations will always use a
$mergeCursors as a wrapper around a AsyncResultsMerger, which is new
behavior for mongos. As part of this refactor, we can delete the concept
of a 'merging presorted' $sort stage (which is now handled by the
AsyncResultsMerger) and delete the DocumentSourceRouterAdapter stage
which talked to a RouterStageMerge, instead directly using a
$mergeCursors stage.
Diffstat (limited to 'src')
57 files changed, 1514 insertions, 1359 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index e9157f816d0..150fa1deb51 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -168,6 +168,17 @@ env.Library( ) env.Library( + target='cluster_aggregation_planner', + source=[ + 'cluster_aggregation_planner.cpp', + ], + LIBDEPS=[ + 'pipeline', + '$BUILD_DIR/mongo/s/query/cluster_client_cursor', + ] +) + +env.Library( target='granularity_rounder', source=[ 'granularity_rounder.cpp', @@ -303,7 +314,6 @@ pipelineeEnv.InjectThirdPartyIncludePaths(libraries=['snappy']) pipelineeEnv.Library( target='pipeline', source=[ - "cluster_aggregation_planner.cpp", 'document_source.cpp', 'document_source_add_fields.cpp', 'document_source_backup_cursor.cpp', @@ -334,6 +344,7 @@ pipelineeEnv.Library( 'document_source_lookup_change_post_image.cpp', 'document_source_match.cpp', 'document_source_merge_cursors.cpp', + 'document_source_update_on_add_shard.cpp', 'document_source_out.cpp', 'document_source_out_replace_coll.cpp', 'document_source_project.cpp', @@ -439,8 +450,9 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/repl/replmocks', '$BUILD_DIR/mongo/db/service_context_test_fixture', '$BUILD_DIR/mongo/s/is_mongos', - 'document_value_test_util', + 'cluster_aggregation_planner', 'document_source_mock', + 'document_value_test_util', 'pipeline', ], ) diff --git a/src/mongo/db/pipeline/cluster_aggregation_planner.cpp b/src/mongo/db/pipeline/cluster_aggregation_planner.cpp index 8fa76c1db79..c9dcfaf2da3 100644 --- a/src/mongo/db/pipeline/cluster_aggregation_planner.cpp +++ b/src/mongo/db/pipeline/cluster_aggregation_planner.cpp @@ -30,11 +30,21 @@ #include "mongo/db/pipeline/cluster_aggregation_planner.h" +#include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/document_source_project.h" +#include "mongo/db/pipeline/document_source_skip.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_source_unwind.h" +#include "mongo/db/pipeline/document_source_update_on_add_shard.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/s/grid.h" +#include "mongo/s/query/router_stage_limit.h" +#include "mongo/s/query/router_stage_pipeline.h" +#include "mongo/s/query/router_stage_remove_metadata_fields.h" +#include "mongo/s/query/router_stage_skip.h" +#include "mongo/s/shard_id.h" namespace mongo { namespace cluster_aggregation_planner { @@ -47,8 +57,10 @@ namespace { * It is not safe to call this optimization multiple times. * * NOTE: looks for NeedsMergerDocumentSources and uses that API + * + * Returns the sort specification if the input streams are sorted, and false otherwise. */ -void findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) { +boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pipeline* mergePipe) { while (!mergePipe->getSources().empty()) { boost::intrusive_ptr<DocumentSource> current = mergePipe->popFront(); @@ -58,28 +70,25 @@ void findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) { if (!splittable) { // Move the source from the merger _sources to the shard _sources. - shardPipe->pushBack(current); + shardPipe->push_back(current); } else { // Split this source into 'merge' and 'shard' _sources. boost::intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource(); - auto mergeSources = splittable->getMergeSources(); + auto mergeLogic = splittable->mergingLogic(); // A source may not simultaneously be present on both sides of the split. - invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) == - mergeSources.end()); + invariant(shardSource != mergeLogic.mergingStage); if (shardSource) - shardPipe->pushBack(shardSource); + shardPipe->push_back(std::move(shardSource)); - // Add the stages in reverse order, so that they appear in the pipeline in the same - // order as they were returned by the stage. - for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) { - mergePipe->addInitialSource(*it); - } + if (mergeLogic.mergingStage) + mergePipe->addInitialSource(std::move(mergeLogic.mergingStage)); - break; + return mergeLogic.inputSortPattern; } } + return boost::none; } /** @@ -131,48 +140,118 @@ void limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe) BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->getContext()); shardPipe->pushBack(project); } -} // namespace -void performSplitPipelineOptimizations(Pipeline* shardPipeline, Pipeline* mergingPipeline) { - // The order in which optimizations are applied can have significant impact on the - // efficiency of the final pipeline. Be Careful! - findSplitPoint(shardPipeline, mergingPipeline); - moveFinalUnwindFromShardsToMerger(shardPipeline, mergingPipeline); - limitFieldsSentFromShardsToMerger(shardPipeline, mergingPipeline); +bool isMergeSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) { + return (dynamic_cast<DocumentSourceLimit*>(stage.get()) || + dynamic_cast<DocumentSourceMergeCursors*>(stage.get()) || + dynamic_cast<DocumentSourceSkip*>(stage.get())); } -boost::optional<BSONObj> popLeadingMergeSort(Pipeline* pipeline) { - // Remove a leading $sort iff it is a mergesort, since the ARM cannot handle blocking $sort. - auto frontSort = pipeline->popFrontWithNameAndCriteria( - DocumentSourceSort::kStageName, [](const DocumentSource* const source) { - return static_cast<const DocumentSourceSort* const>(source)->mergingPresorted(); - }); - - if (frontSort) { - auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get()); - if (auto sortLimit = sortStage->getLimitSrc()) { - // There was a limit stage absorbed into the sort stage, so we need to preserve that. - pipeline->addInitialSource(sortLimit); +bool isAllLimitsAndSkips(Pipeline* pipeline) { + const auto stages = pipeline->getSources(); + return std::all_of( + stages.begin(), stages.end(), [](const auto& stage) { return isMergeSkipOrLimit(stage); }); +} + +ClusterClientCursorGuard convertPipelineToRouterStages( + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, ClusterClientCursorParams&& cursorParams) { + auto* opCtx = pipeline->getContext()->opCtx; + + // We expect the pipeline to be fully executable at this point, so if the pipeline was all skips + // and limits we expect it to start with a $mergeCursors stage. + auto mergeCursors = + checked_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()); + // Replace the pipeline with RouterExecStages. + std::unique_ptr<RouterExecStage> root = mergeCursors->convertToRouterStage(); + pipeline->popFront(); + while (!pipeline->getSources().empty()) { + if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) { + root = std::make_unique<RouterStageSkip>( + opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip()); + } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) { + root = std::make_unique<RouterStageLimit>( + opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit()); + } else { + // We previously checked that everything was a $mergeCursors, $skip, or $limit. We + // already popped off the $mergeCursors, so everything else should be a $skip or a + // $limit. + MONGO_UNREACHABLE; } - return sortStage - ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging) - .toBson(); } - return boost::none; + // We are executing the pipeline without using an actual Pipeline, so we need to strip out any + // Document metadata ourselves. + return ClusterClientCursorImpl::make( + opCtx, + std::make_unique<RouterStageRemoveMetadataFields>( + opCtx, std::move(root), Document::allMetadataFieldNames), + std::move(cursorParams)); +} +} // namespace + +SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline) { + auto& expCtx = pipeline->getContext(); + // Re-brand 'pipeline' as the merging pipeline. We will move stages one by one from the merging + // half to the shards, as possible. + auto mergePipeline = std::move(pipeline); + + Pipeline::SourceContainer shardStages; + boost::optional<BSONObj> inputsSort = findSplitPoint(&shardStages, mergePipeline.get()); + auto shardsPipeline = uassertStatusOK(Pipeline::create(std::move(shardStages), expCtx)); + + // The order in which optimizations are applied can have significant impact on the efficiency of + // the final pipeline. Be Careful! + moveFinalUnwindFromShardsToMerger(shardsPipeline.get(), mergePipeline.get()); + limitFieldsSentFromShardsToMerger(shardsPipeline.get(), mergePipeline.get()); + shardsPipeline->setSplitState(Pipeline::SplitState::kSplitForShards); + mergePipeline->setSplitState(Pipeline::SplitState::kSplitForMerge); + + return {std::move(shardsPipeline), std::move(mergePipeline), std::move(inputsSort)}; } void addMergeCursorsSource(Pipeline* mergePipeline, + const LiteParsedPipeline& liteParsedPipeline, + BSONObj cmdSentToShards, std::vector<RemoteCursor> remoteCursors, + const std::vector<ShardId>& targetedShards, + boost::optional<BSONObj> shardCursorsSortSpec, executor::TaskExecutor* executor) { + auto* opCtx = mergePipeline->getContext()->opCtx; AsyncResultsMergerParams armParams; - if (auto sort = popLeadingMergeSort(mergePipeline)) { - armParams.setSort(std::move(*sort)); - } + armParams.setSort(shardCursorsSortSpec); armParams.setRemotes(std::move(remoteCursors)); armParams.setTailableMode(mergePipeline->getContext()->tailableMode); armParams.setNss(mergePipeline->getContext()->ns); - mergePipeline->addInitialSource(DocumentSourceMergeCursors::create( - executor, std::move(armParams), mergePipeline->getContext())); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(opCtx->getLogicalSessionId()); + sessionInfo.setTxnNumber(opCtx->getTxnNumber()); + armParams.setOperationSessionInfo(sessionInfo); + + // For change streams, we need to set up a custom stage to establish cursors on new shards when + // they are added, to ensure we don't miss results from the new shards. + auto mergeCursorsStage = DocumentSourceMergeCursors::create( + executor, std::move(armParams), mergePipeline->getContext()); + if (liteParsedPipeline.hasChangeStream()) { + mergePipeline->addInitialSource(DocumentSourceUpdateOnAddShard::create( + mergePipeline->getContext(), + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + mergeCursorsStage, + targetedShards, + cmdSentToShards)); + } + mergePipeline->addInitialSource(std::move(mergeCursorsStage)); +} + +ClusterClientCursorGuard buildClusterCursor(OperationContext* opCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + ClusterClientCursorParams&& cursorParams) { + if (isAllLimitsAndSkips(pipeline.get())) { + // We can optimize this Pipeline to avoid going through any DocumentSources at all and thus + // skip the expensive BSON->Document->BSON conversion. + return convertPipelineToRouterStages(std::move(pipeline), std::move(cursorParams)); + } + return ClusterClientCursorImpl::make( + opCtx, std::make_unique<RouterStagePipeline>(std::move(pipeline)), std::move(cursorParams)); } } // namespace cluster_aggregation_planner diff --git a/src/mongo/db/pipeline/cluster_aggregation_planner.h b/src/mongo/db/pipeline/cluster_aggregation_planner.h index f62f158f6ca..3b3aaa63df4 100644 --- a/src/mongo/db/pipeline/cluster_aggregation_planner.h +++ b/src/mongo/db/pipeline/cluster_aggregation_planner.h @@ -28,31 +28,66 @@ #pragma once +#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/s/query/cluster_client_cursor_impl.h" namespace mongo { namespace cluster_aggregation_planner { /** - * Performs optimizations with the aim of reducing computing time and network traffic when a - * pipeline has been split into two pieces. Modifies 'shardPipeline' and 'mergingPipeline' such that - * they may contain different stages, but still compute the same results when executed. + * Represents the two halves of a pipeline that will execute in a sharded cluster. 'shardsPipeline' + * will execute in parallel on each shard, and 'mergePipeline' will execute on the merge host - + * either one of the shards or a mongos. */ -void performSplitPipelineOptimizations(Pipeline* shardPipeline, Pipeline* mergingPipeline); +struct SplitPipeline { + SplitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> shardsPipeline, + std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline, + boost::optional<BSONObj> shardCursorsSortSpec) + : shardsPipeline(std::move(shardsPipeline)), + mergePipeline(std::move(mergePipeline)), + shardCursorsSortSpec(std::move(shardCursorsSortSpec)) {} + + std::unique_ptr<Pipeline, PipelineDeleter> shardsPipeline; + std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline; + + // If set, the cursors from the shards are expected to be sorted according to this spec, and to + // have populated a "$sortKey" metadata field which can be used to compare the results. + boost::optional<BSONObj> shardCursorsSortSpec; +}; /** - * Rips off an initial $sort stage that can be handled by cursor merging machinery. Returns the - * sort key pattern of such a $sort stage if there was one, and boost::none otherwise. + * Split the current Pipeline into a Pipeline for each shard, and a Pipeline that combines the + * results within a merging process. This call also performs optimizations with the aim of reducing + * computing time and network traffic when a pipeline has been split into two pieces. + * + * The 'mergePipeline' returned as part of the SplitPipeline here is not ready to execute until the + * 'shardsPipeline' has been sent to the shards and cursors have been established. Once cursors have + * been established, the merge pipeline can be made executable by calling 'addMergeCursorsSource()' */ -boost::optional<BSONObj> popLeadingMergeSort(Pipeline* mergePipeline); +SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline); /** * Creates a new DocumentSourceMergeCursors from the provided 'remoteCursors' and adds it to the * front of 'mergePipeline'. */ void addMergeCursorsSource(Pipeline* mergePipeline, + const LiteParsedPipeline&, + BSONObj cmdSentToShards, std::vector<RemoteCursor> remoteCursors, + const std::vector<ShardId>& targetedShards, + boost::optional<BSONObj> shardCursorsSortSpec, executor::TaskExecutor*); +/** + * Builds a ClusterClientCursor which will execute 'pipeline'. If 'pipeline' consists entirely of + * $skip and $limit stages, the pipeline is eliminated entirely and replaced with a RouterExecStage + * tree that does same thing but will avoid using a RouterStagePipeline. Avoiding a + * RouterStagePipeline will remove an expensive conversion from BSONObj -> Document for each result. + */ +ClusterClientCursorGuard buildClusterCursor(OperationContext* opCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + ClusterClientCursorParams&&); + } // namespace cluster_aggregation_planner } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 4aae392153d..682b03b76e1 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -698,6 +698,22 @@ private: class NeedsMergerDocumentSource { public: /** + * A struct representing the information needed to merge the cursors for the shards half of this + * pipeline. If 'inputSortPattern' is set, each document is expected to have sort key metadata + * which will be serialized in the '$sortKey' field. 'inputSortPattern' will then be used to + * describe which fields are ascending and which fields are descending when merging the streams + * together. + */ + struct MergingLogic { + MergingLogic(boost::intrusive_ptr<DocumentSource>&& mergingStage, + boost::optional<BSONObj> inputSortPattern = boost::none) + : mergingStage(std::move(mergingStage)), inputSortPattern(inputSortPattern) {} + + boost::intrusive_ptr<DocumentSource> mergingStage; + boost::optional<BSONObj> inputSortPattern; + }; + + /** * Returns a source to be run on the shards, or NULL if no work should be done on the shards for * this stage. Must not mutate the existing source object; if different behaviour is required in * the split-pipeline case, a new source should be created and configured appropriately. It is @@ -708,12 +724,12 @@ public: virtual boost::intrusive_ptr<DocumentSource> getShardSource() = 0; /** - * Returns a list of stages that combine results from the shards. Subclasses of this class - * should not return an empty list. Must not mutate the existing source object; if different - * behaviour is required, a new source should be created and configured appropriately. It is an - * error for getMergeSources() to return a pointer to the same object as getShardSource(). + * Returns a struct representing what needs to be done to merge each shard's pipeline into a + * single stream of results. Must not mutate the existing source object; if different behaviour + * is required, a new source should be created and configured appropriately. It is an error for + * mergingLogic() to return a pointer to the same object as getShardSource(). */ - virtual std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() = 0; + virtual MergingLogic mergingLogic() = 0; protected: // It is invalid to delete through a NeedsMergerDocumentSource-typed pointer. diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h index 9bd2c9f2877..18700913240 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.h +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -63,7 +63,7 @@ public: boost::intrusive_ptr<DocumentSource> getShardSource() final { return nullptr; } - std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final { + MergingLogic mergingLogic() final { return {this}; } diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index ac10c64b0ad..448ac608724 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -442,8 +442,8 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( return stages; } -BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj originalCmdObj, - const BSONObj resumeToken) { +BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(BSONObj originalCmdObj, + Document resumeToken) { Document originalCmd(originalCmdObj); auto pipeline = originalCmd[AggregationRequest::kPipelineName].getArray(); // A $changeStream must be the first element of the pipeline in order to be able @@ -454,12 +454,12 @@ BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj or pipeline[0][DocumentSourceChangeStream::kStageName].getDocument()); changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = Value(resumeToken); - // If the command was initially specified with a startAtOperationTime, we need to remove it - // to use the new resume token. + // If the command was initially specified with a startAtOperationTime, we need to remove it to + // use the new resume token. changeStreamStage[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] = Value(); pipeline[0] = Value(Document{{DocumentSourceChangeStream::kStageName, changeStreamStage.freeze()}}); - MutableDocument newCmd(originalCmd); + MutableDocument newCmd(std::move(originalCmd)); newCmd[AggregationRequest::kPipelineName] = Value(pipeline); return newCmd.freeze().toBson(); } diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 802a717294e..5f4ee1c528c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -182,8 +182,7 @@ public: * resumeAfter: option containing the resume token. If there was a previous resumeAfter: * option, it is removed. */ - static BSONObj replaceResumeTokenInCommand(const BSONObj originalCmdObj, - const BSONObj resumeToken); + static BSONObj replaceResumeTokenInCommand(BSONObj originalCmdObj, Document resumeToken); /** * Helper used by various change stream stages. Used for asserting that a certain Value of a diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h index cda09f2257e..31fa2704005 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h @@ -54,8 +54,9 @@ public: invariant(pipeState != Pipeline::SplitState::kSplitForShards); return {StreamType::kStreaming, PositionRequirement::kNone, - (pipeState == Pipeline::SplitState::kUnsplit ? HostTypeRequirement::kNone - : HostTypeRequirement::kMongoS), + // If this is parsed on mongos it should stay on mongos. If we're not in a sharded + // cluster then it's okay to run on mongod. + HostTypeRequirement::kLocalOnly, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, @@ -77,20 +78,10 @@ public: return nullptr; } - std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final { + MergingLogic mergingLogic() final { // This stage must run on mongos to ensure it sees any invalidation in the correct order, - // and to ensure that all remote cursors are cleaned up properly. We also must include a - // mergingPresorted $sort stage to communicate to the AsyncResultsMerger that we need to - // merge the streams in a particular order. - const bool mergingPresorted = true; - const long long noLimit = -1; - auto sortMergingPresorted = - DocumentSourceSort::create(pExpCtx, - change_stream_constants::kSortSpec, - noLimit, - internalDocumentSourceSortMaxBlockingSortBytes.load(), - mergingPresorted); - return {sortMergingPresorted, this}; + // and to ensure that all remote cursors are cleaned up properly. + return {this, change_stream_constants::kSortSpec}; } private: diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index f7d970bab32..ff3a32f4246 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -105,13 +105,12 @@ public: GetNextResult getNext() final; const char* getSourceName() const final; - StageConstraints constraints(Pipeline::SplitState pipeState) const final { - // This stage should never be in the shards part of a split pipeline. - invariant(pipeState != Pipeline::SplitState::kSplitForShards); + StageConstraints constraints(Pipeline::SplitState) const final { return {StreamType::kStreaming, PositionRequirement::kNone, - (pipeState == Pipeline::SplitState::kUnsplit ? HostTypeRequirement::kNone - : HostTypeRequirement::kMongoS), + // If this is parsed on mongos it should stay on mongos. If we're not in a sharded + // cluster then it's okay to run on mongod. + HostTypeRequirement::kLocalOnly, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, @@ -128,19 +127,11 @@ public: _tokenFromClient.getClusterTime()); }; - std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final { + MergingLogic mergingLogic() final { // This stage must run on mongos to ensure it sees the resume token, which could have come // from any shard. We also must include a mergingPresorted $sort stage to communicate to // the AsyncResultsMerger that we need to merge the streams in a particular order. - const bool mergingPresorted = true; - const long long noLimit = -1; - auto sortMergingPresorted = - DocumentSourceSort::create(pExpCtx, - change_stream_constants::kSortSpec, - noLimit, - internalDocumentSourceSortMaxBlockingSortBytes.load(), - mergingPresorted); - return {sortMergingPresorted, this}; + return {this, change_stream_constants::kSortSpec}; }; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h index b4231424032..ba4cb56ecdb 100644 --- a/src/mongo/db/pipeline/document_source_exchange.h +++ b/src/mongo/db/pipeline/document_source_exchange.h @@ -167,7 +167,7 @@ public: boost::intrusive_ptr<DocumentSource> getShardSource() final { return this; } - std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final { + MergingLogic mergingLogic() final { // TODO SERVER-35974 we have to revisit this when we implement consumers. return {this}; } diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index d97dec116df..92e9f2f05d2 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -125,7 +125,7 @@ public: boost::intrusive_ptr<DocumentSource> getShardSource() final { return nullptr; } - std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final { + MergingLogic mergingLogic() final { return {this}; } diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp index 1d8387309ae..8b62a8c21f8 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.cpp +++ b/src/mongo/db/pipeline/document_source_geo_near.cpp @@ -235,8 +235,8 @@ DepsTracker::State DocumentSourceGeoNear::getDependencies(DepsTracker* deps) con DocumentSourceGeoNear::DocumentSourceGeoNear(const intrusive_ptr<ExpressionContext>& pExpCtx) : DocumentSource(pExpCtx), coordsIsArray(false), spherical(false) {} -std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceGeoNear::getMergeSources() { - return {DocumentSourceSort::create( - pExpCtx, BSON(distanceField->fullPath() << 1 << "$mergePresorted" << true))}; +NeedsMergerDocumentSource::MergingLogic DocumentSourceGeoNear::mergingLogic() { + return {nullptr, BSON(distanceField->fullPath() << 1)}; } + } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index a0a6ea878ad..9237faeddcd 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -128,7 +128,8 @@ public: /** * In a sharded cluster, this becomes a merge sort by distance, from nearest to furthest. */ - std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final; + MergingLogic mergingLogic() final; + private: explicit DocumentSourceGeoNear(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index fc5d6b1e2ee..1563c6ba14b 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -839,13 +839,13 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::getShardSource() { return this; // No modifications necessary when on shard } -std::list<intrusive_ptr<DocumentSource>> DocumentSourceGroup::getMergeSources() { - intrusive_ptr<DocumentSourceGroup> pMerger(new DocumentSourceGroup(pExpCtx)); - pMerger->setDoingMerge(true); +NeedsMergerDocumentSource::MergingLogic DocumentSourceGroup::mergingLogic() { + intrusive_ptr<DocumentSourceGroup> mergingGroup(new DocumentSourceGroup(pExpCtx)); + mergingGroup->setDoingMerge(true); VariablesParseState vps = pExpCtx->variablesParseState; /* the merger will use the same grouping key */ - pMerger->setIdExpression(ExpressionFieldPath::parse(pExpCtx, "$$ROOT._id", vps)); + mergingGroup->setIdExpression(ExpressionFieldPath::parse(pExpCtx, "$$ROOT._id", vps)); for (auto&& accumulatedField : _accumulatedFields) { // The merger's output field names will be the same, as will the accumulator factories. @@ -855,12 +855,12 @@ std::list<intrusive_ptr<DocumentSource>> DocumentSourceGroup::getMergeSources() auto copiedAccumuledField = accumulatedField; copiedAccumuledField.expression = ExpressionFieldPath::parse(pExpCtx, "$$ROOT." + accumulatedField.fieldName, vps); - pMerger->addAccumulator(copiedAccumuledField); + mergingGroup->addAccumulator(copiedAccumuledField); } - return {pMerger}; -} + return {mergingGroup}; } +} // namespace mongo #include "mongo/db/sorter/sorter.cpp" // Explicit instantiation unneeded since we aren't exposing Sorter outside of this file. diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 265ef8502d6..421c244641e 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -100,7 +100,7 @@ public: // Virtuals for NeedsMergerDocumentSource. boost::intrusive_ptr<DocumentSource> getShardSource() final; - std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final; + MergingLogic mergingLogic() final; /** * Returns true if this $group stage used disk during execution and false otherwise. diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index 4062c497a53..3a8cecedf54 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -543,10 +543,11 @@ protected: // case only one shard is in use. NeedsMergerDocumentSource* splittable = dynamic_cast<NeedsMergerDocumentSource*>(group()); ASSERT(splittable); - auto routerSources = splittable->getMergeSources(); - ASSERT_EQ(routerSources.size(), 1UL); - ASSERT_NOT_EQUALS(group(), routerSources.front().get()); - return routerSources.front(); + auto mergeLogic = splittable->mergingLogic(); + ASSERT(mergeLogic.mergingStage); + ASSERT_NOT_EQUALS(group(), mergeLogic.mergingStage); + ASSERT_FALSE(static_cast<bool>(mergeLogic.inputSortPattern)); + return mergeLogic.mergingStage; } void checkResultSet(const intrusive_ptr<DocumentSource>& sink) { // Load the results from the DocumentSourceGroup and sort them by _id. diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h index 2d5bf946ead..b96c5cdba51 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h @@ -62,7 +62,7 @@ public: return nullptr; } - std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final { + MergingLogic mergingLogic() final { return {this}; } diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp index 8256d055805..97afa6331b4 100644 --- a/src/mongo/db/pipeline/document_source_limit.cpp +++ b/src/mongo/db/pipeline/document_source_limit.cpp @@ -1,30 +1,30 @@ /** -* Copyright (C) 2011 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ + * Copyright (C) 2011 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ #include "mongo/platform/basic.h" @@ -101,4 +101,4 @@ intrusive_ptr<DocumentSource> DocumentSourceLimit::createFromBson( long long limit = elem.numberLong(); return DocumentSourceLimit::create(pExpCtx, limit); } -} +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index aeda5902054..703e35ee2eb 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -92,7 +92,7 @@ public: * merge pipeline. Unlike the shards source, it is necessary for this stage to run on the * merging host in order to produce correct pipeline output. */ - std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final { + MergingLogic mergingLogic() final { return {DocumentSourceLimit::create(pExpCtx, _limit)}; } diff --git a/src/mongo/db/pipeline/document_source_limit_test.cpp b/src/mongo/db/pipeline/document_source_limit_test.cpp index eb3ba447457..611ace66cd4 100644 --- a/src/mongo/db/pipeline/document_source_limit_test.cpp +++ b/src/mongo/db/pipeline/document_source_limit_test.cpp @@ -59,6 +59,23 @@ TEST_F(DocumentSourceLimitTest, ShouldDisposeSourceWhenLimitIsReached) { ASSERT_TRUE(source->isDisposed); } +TEST_F(DocumentSourceLimitTest, ShouldNotBeAbleToLimitToZeroDocuments) { + auto source = DocumentSourceMock::create({"{a: 1}", "{a: 2}"}); + ASSERT_THROWS_CODE(DocumentSourceLimit::create(getExpCtx(), 0), AssertionException, 15958); +} + +TEST_F(DocumentSourceLimitTest, ShouldRejectUserLimitOfZero) { + ASSERT_THROWS_CODE( + DocumentSourceLimit::createFromBson(BSON("$limit" << 0).firstElement(), getExpCtx()), + AssertionException, + 15958); + + // A $limit with size 1 should be okay. + auto shouldNotThrow = + DocumentSourceLimit::createFromBson(BSON("$limit" << 1).firstElement(), getExpCtx()); + ASSERT(dynamic_cast<DocumentSourceLimit*>(shouldNotThrow.get())); +} + TEST_F(DocumentSourceLimitTest, TwoLimitStagesShouldCombineIntoOne) { Pipeline::SourceContainer container; auto firstLimit = DocumentSourceLimit::create(getExpCtx(), 10); diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index f9597592b69..d059cbc3a9e 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -134,7 +134,7 @@ public: return nullptr; } - std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final { + MergingLogic mergingLogic() final { return {this}; } diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index 83d6b1e1d42..573667176e8 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -62,9 +62,10 @@ public: invariant(pipeState != Pipeline::SplitState::kSplitForShards); StageConstraints constraints(StreamType::kStreaming, PositionRequirement::kNone, - pipeState == Pipeline::SplitState::kUnsplit - ? HostTypeRequirement::kNone - : HostTypeRequirement::kMongoS, + // If this is parsed on mongos it should stay on mongos. If + // we're not in a sharded cluster then it's okay to run on + // mongod. + HostTypeRequirement::kLocalOnly, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp index bce0a16d6be..795755893de 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp @@ -30,6 +30,7 @@ #include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/query/find_common.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/s/grid.h" @@ -51,14 +52,39 @@ DocumentSourceMergeCursors::DocumentSourceMergeCursors( _executor(executor), _armParams(std::move(armParams)) {} +std::size_t DocumentSourceMergeCursors::getNumRemotes() const { + if (_armParams) { + return _armParams->getRemotes().size(); + } + return _blockingResultsMerger->getNumRemotes(); +} + +bool DocumentSourceMergeCursors::remotesExhausted() const { + if (_armParams) { + // We haven't started iteration yet. + return false; + } + return _blockingResultsMerger->remotesExhausted(); +} + +void DocumentSourceMergeCursors::populateMerger() { + invariant(!_blockingResultsMerger); + invariant(_armParams); + _blockingResultsMerger.emplace(pExpCtx->opCtx, std::move(*_armParams), _executor); + _armParams = boost::none; +} + +std::unique_ptr<RouterStageMerge> DocumentSourceMergeCursors::convertToRouterStage() { + invariant(!_blockingResultsMerger, "Expected conversion to happen before execution"); + return stdx::make_unique<RouterStageMerge>(pExpCtx->opCtx, _executor, std::move(*_armParams)); +} + DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() { - // We don't expect or support tailable cursors to be executing through this stage. - invariant(pExpCtx->tailableMode == TailableModeEnum::kNormal); - if (!_arm) { - _arm.emplace(pExpCtx->opCtx, _executor, std::move(*_armParams)); - _armParams = boost::none; + if (!_blockingResultsMerger) { + populateMerger(); } - auto next = uassertStatusOK(_arm->blockingNext()); + + auto next = uassertStatusOK(_blockingResultsMerger->next(pExpCtx->opCtx, _execContext)); if (next.isEOF()) { return GetNextResult::makeEOF(); } @@ -67,7 +93,7 @@ DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() { Value DocumentSourceMergeCursors::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { - invariant(!_arm); + invariant(!_blockingResultsMerger); invariant(_armParams); return Value(Document{{kStageName, _armParams->toBSON()}}); } @@ -86,7 +112,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson( std::move(ownedObj)); } -boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create( +boost::intrusive_ptr<DocumentSourceMergeCursors> DocumentSourceMergeCursors::create( executor::TaskExecutor* executor, AsyncResultsMergerParams params, const boost::intrusive_ptr<ExpressionContext>& expCtx) { @@ -94,20 +120,20 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create( } void DocumentSourceMergeCursors::detachFromOperationContext() { - if (_arm) { - _arm->detachFromOperationContext(); + if (_blockingResultsMerger) { + _blockingResultsMerger->detachFromOperationContext(); } } void DocumentSourceMergeCursors::reattachToOperationContext(OperationContext* opCtx) { - if (_arm) { - _arm->reattachToOperationContext(opCtx); + if (_blockingResultsMerger) { + _blockingResultsMerger->reattachToOperationContext(opCtx); } } void DocumentSourceMergeCursors::doDispose() { - if (_arm) { - _arm->blockingKill(pExpCtx->opCtx); + if (_blockingResultsMerger) { + _blockingResultsMerger->kill(pExpCtx->opCtx); } } diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index 4034a267aba..7f033fb8c15 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -30,7 +30,8 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/executor/task_executor.h" -#include "mongo/s/query/async_results_merger.h" +#include "mongo/s/query/blocking_results_merger.h" +#include "mongo/s/query/router_stage_merge.h" namespace mongo { @@ -57,11 +58,17 @@ public: /** * Creates a new DocumentSourceMergeCursors from the given parameters. */ - static boost::intrusive_ptr<DocumentSource> create( + static boost::intrusive_ptr<DocumentSourceMergeCursors> create( executor::TaskExecutor*, AsyncResultsMergerParams, const boost::intrusive_ptr<ExpressionContext>&); + /** + * Extracts the remote cursors and converts the execution machinery from a DocumentSource to a + * RouterStage interface. Can only be called at planning time before any call to getNext(). + */ + std::unique_ptr<RouterStageMerge> convertToRouterStage(); + const char* getSourceName() const final { return kStageName.rawData(); } @@ -90,6 +97,35 @@ public: GetNextResult getNext() final; + std::size_t getNumRemotes() const; + + bool remotesExhausted() const; + + void setExecContext(RouterExecStage::ExecContext execContext) { + _execContext = execContext; + } + + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + if (!_blockingResultsMerger) { + // In cases where a cursor was established with a batchSize of 0, the first getMore + // might specify a custom maxTimeMS (AKA await data timeout). In these cases we will not + // have iterated the cursor yet so will not have populated the merger, but need to + // remember/track the custom await data timeout. We will soon iterate the cursor, so we + // just populate the merger now and let it track the await data timeout itself. + populateMerger(); + } + return _blockingResultsMerger->setAwaitDataTimeout(awaitDataTimeout); + } + + /** + * Adds the specified shard cursors to the set of cursors to be merged. The results from the + * new cursors will be returned as normal through getNext(). + */ + void addNewShardCursors(std::vector<RemoteCursor>&& newCursors) { + invariant(_blockingResultsMerger); + _blockingResultsMerger->addNewShardCursors(std::move(newCursors)); + } + protected: void doDispose() final; @@ -99,20 +135,33 @@ private: const boost::intrusive_ptr<ExpressionContext>&, boost::optional<BSONObj> ownedParamsSpec = boost::none); + /** + * Converts '_armParams' into the execution machinery to merge the cursors. See below for why + * this is done lazily. Clears '_armParams' and populates '_blockingResultsMerger'. + */ + void populateMerger(); + // When we have parsed the params out of a BSONObj, the object needs to stay around while the // params are in use. We store them here. boost::optional<BSONObj> _armParamsObj; executor::TaskExecutor* _executor; - // '_armParams' is populated until the first call to getNext(). Upon the first call to getNext() - // '_arm' will be populated using '_armParams', and '_armParams' will become boost::none. So if - // getNext() is never called we will never populate '_arm'. If we did so the destruction of this - // stage would cause the cursors within the ARM to be killed prematurely. For example, if this - // stage is parsed on mongos then forwarded to the shards, it should not kill the cursors when - // it goes out of scope on mongos. + // '_blockingResultsMerger' is lazily populated. Until we need to use it, '_armParams' will be + // populated with the parameters. Once we start using '_blockingResultsMerger', '_armParams' + // will become boost::none. We do this to prevent populating '_blockingResultsMerger' on mongos + // before serializing this stage and sending it to a shard to perform the merge. If we always + // populated '_blockingResultsMerger', then the destruction of this stage would cause the + // cursors within '_blockingResultsMerger' to be killed prematurely. For example, if this stage + // is parsed on mongos then forwarded to the shards, it should not kill the cursors when it goes + // out of scope on mongos. boost::optional<AsyncResultsMergerParams> _armParams; - boost::optional<AsyncResultsMerger> _arm; + boost::optional<BlockingResultsMerger> _blockingResultsMerger; + + // The ExecContext is needed because if we're a tailable, awaitData cursor, we only want to + // 'await data' if we 1) are in a getMore and 2) don't already have data to return. This context + // allows us to determine which situation we're in. + RouterExecStage::ExecContext _execContext; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 5168346957b..f8ee8829038 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -66,7 +66,7 @@ public: boost::intrusive_ptr<DocumentSource> getShardSource() final { return NULL; } - std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final { + MergingLogic mergingLogic() final { return {this}; } diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp index 0f7583bd07b..03098a48d36 100644 --- a/src/mongo/db/pipeline/document_source_sample.cpp +++ b/src/mongo/db/pipeline/document_source_sample.cpp @@ -119,13 +119,13 @@ intrusive_ptr<DocumentSource> DocumentSourceSample::getShardSource() { return this; } -std::list<intrusive_ptr<DocumentSource>> DocumentSourceSample::getMergeSources() { - // Just need to merge the pre-sorted documents by their random values. - BSONObjBuilder randMergeSortSpec; - - randMergeSortSpec.appendElements(randSortSpec); - randMergeSortSpec.append("$mergePresorted", true); - - return {DocumentSourceSort::create(pExpCtx, randMergeSortSpec.obj(), _size)}; +NeedsMergerDocumentSource::MergingLogic DocumentSourceSample::mergingLogic() { + // On the merger we need to merge the pre-sorted documents by their random values, then limit to + // the number we need. Here we don't use 'randSortSpec' because it uses a metadata sort which + // the merging logic does not understand. The merging logic will use the serialized sort key, + // and this sort pattern is just used to communicate ascending/descending information. A pattern + // like {$meta: "randVal"} is neither ascending nor descending, and so will not be useful when + // constructing the merging logic. + return {_size > 0 ? DocumentSourceLimit::create(pExpCtx, _size) : nullptr, BSON("$rand" << -1)}; } -} // mongo +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h index 6195328b90d..d6f8f858d64 100644 --- a/src/mongo/db/pipeline/document_source_sample.h +++ b/src/mongo/db/pipeline/document_source_sample.h @@ -57,7 +57,7 @@ public: } boost::intrusive_ptr<DocumentSource> getShardSource() final; - std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final; + MergingLogic mergingLogic() final; long long getSampleSize() const { return _size; diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h index 927957d87a7..014092865f6 100644 --- a/src/mongo/db/pipeline/document_source_skip.h +++ b/src/mongo/db/pipeline/document_source_skip.h @@ -82,12 +82,10 @@ public: return DepsTracker::State::SEE_NEXT; // This doesn't affect needed fields } - // Virtuals for NeedsMergerDocumentSource - // Need to run on rounter. Can't run on shards. boost::intrusive_ptr<DocumentSource> getShardSource() final { - return NULL; + return nullptr; } - std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final { + MergingLogic mergingLogic() final { return {this}; } diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index d992fb8053c..cb753b400f9 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -1,30 +1,30 @@ /** -* Copyright (C) 2011 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ + * Copyright (C) 2011 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ #include "mongo/platform/basic.h" @@ -44,9 +44,9 @@ namespace mongo { using boost::intrusive_ptr; -using std::unique_ptr; using std::make_pair; using std::string; +using std::unique_ptr; using std::vector; namespace { @@ -98,7 +98,7 @@ Value deserializeSortKey(size_t sortPatternSize, BSONObj bsonSortKey) { constexpr StringData DocumentSourceSort::kStageName; DocumentSourceSort::DocumentSourceSort(const intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSource(pExpCtx), _mergingPresorted(false) {} + : DocumentSource(pExpCtx) {} REGISTER_DOCUMENT_SOURCE(sort, LiteParsedDocumentSourceDefault::parse, @@ -106,8 +106,6 @@ REGISTER_DOCUMENT_SOURCE(sort, DocumentSource::GetNextResult DocumentSourceSort::getNext() { pExpCtx->checkForInterrupt(); - invariant(!_mergingPresorted); // A presorted-merge should be optimized into the merge, and - // never executed. if (!_populated) { const auto populationResult = populate(); @@ -118,9 +116,6 @@ DocumentSource::GetNextResult DocumentSourceSort::getNext() { } if (!_output || !_output->more()) { - // Need to be sure connections are marked as done so they can be returned to the connection - // pool. This only needs to happen in the _mergingPresorted case, but it doesn't hurt to - // always do it. dispose(); return GetNextResult::makeEOF(); } @@ -131,17 +126,12 @@ DocumentSource::GetNextResult DocumentSourceSort::getNext() { void DocumentSourceSort::serializeToArray( std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { if (explain) { // always one Value for combined $sort + $limit - array.push_back(Value(DOC( - kStageName << DOC("sortKey" << sortKeyPattern(SortKeySerialization::kForExplain) - << "mergePresorted" - << (_mergingPresorted ? Value(true) : Value()) - << "limit" - << (_limitSrc ? Value(_limitSrc->getLimit()) : Value()))))); + array.push_back( + Value(DOC(kStageName << DOC( + "sortKey" << sortKeyPattern(SortKeySerialization::kForExplain) << "limit" + << (_limitSrc ? Value(_limitSrc->getLimit()) : Value()))))); } else { // one Value for $sort and maybe a Value for $limit MutableDocument inner(sortKeyPattern(SortKeySerialization::kForPipelineSerialization)); - if (_mergingPresorted) { - inner["$mergePresorted"] = Value(true); - } array.push_back(Value(DOC(kStageName << inner.freeze()))); if (_limitSrc) { @@ -243,24 +233,16 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create( const intrusive_ptr<ExpressionContext>& pExpCtx, BSONObj sortOrder, long long limit, - boost::optional<uint64_t> maxMemoryUsageBytes, - bool mergingPresorted) { + boost::optional<uint64_t> maxMemoryUsageBytes) { intrusive_ptr<DocumentSourceSort> pSort(new DocumentSourceSort(pExpCtx)); pSort->_maxMemoryUsageBytes = maxMemoryUsageBytes ? *maxMemoryUsageBytes : internalDocumentSourceSortMaxBlockingSortBytes.load(); pSort->_rawSort = sortOrder.getOwned(); - pSort->_mergingPresorted = mergingPresorted; for (auto&& keyField : sortOrder) { auto fieldName = keyField.fieldNameStringData(); - if ("$mergePresorted" == fieldName) { - verify(keyField.Bool()); - pSort->_mergingPresorted = true; - continue; - } - SortPatternPart patternPart; if (keyField.type() == Object) { @@ -524,25 +506,14 @@ int DocumentSourceSort::compare(const Value& lhs, const Value& rhs) const { } intrusive_ptr<DocumentSource> DocumentSourceSort::getShardSource() { - verify(!_mergingPresorted); return this; } -std::list<intrusive_ptr<DocumentSource>> DocumentSourceSort::getMergeSources() { - verify(!_mergingPresorted); - intrusive_ptr<DocumentSourceSort> other = new DocumentSourceSort(pExpCtx); - other->_sortPattern = _sortPattern; - other->_sortKeyGen = SortKeyGenerator{ - other->sortKeyPattern(SortKeySerialization::kForPipelineSerialization).toBson(), - pExpCtx->getCollator()}; - other->_paths = _paths; - other->_limitSrc = _limitSrc; - other->_maxMemoryUsageBytes = _maxMemoryUsageBytes; - other->_mergingPresorted = true; - other->_rawSort = _rawSort; - return {other}; -} +NeedsMergerDocumentSource::MergingLogic DocumentSourceSort::mergingLogic() { + return {_limitSrc ? DocumentSourceLimit::create(pExpCtx, _limitSrc->getLimit()) : nullptr, + sortKeyPattern(SortKeySerialization::kForSortKeyMerging).toBson()}; } +} // namespace mongo #include "mongo/db/sorter/sorter.cpp" // Explicit instantiation unneeded since we aren't exposing Sorter outside of this file. diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index d2165acdb89..ac860c5b7a5 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -61,16 +61,14 @@ public: return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; } - StageConstraints constraints(Pipeline::SplitState pipeState) const final { - StageConstraints constraints( - _mergingPresorted ? StreamType::kStreaming : StreamType::kBlocking, - PositionRequirement::kNone, - HostTypeRequirement::kNone, - _mergingPresorted ? DiskUseRequirement::kNoDiskUse : DiskUseRequirement::kWritesTmpData, - _mergingPresorted ? FacetRequirement::kNotAllowed : FacetRequirement::kAllowed, - TransactionRequirement::kAllowed, - _mergingPresorted ? ChangeStreamRequirement::kWhitelist - : ChangeStreamRequirement::kBlacklist); + StageConstraints constraints(Pipeline::SplitState) const final { + StageConstraints constraints(StreamType::kBlocking, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kWritesTmpData, + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed, + ChangeStreamRequirement::kBlacklist); // Can't swap with a $match if a limit has been absorbed, as $match can't swap with $limit. constraints.canSwapWithMatch = !_limitSrc; @@ -84,7 +82,7 @@ public: DepsTracker::State getDependencies(DepsTracker* deps) const final; boost::intrusive_ptr<DocumentSource> getShardSource() final; - std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final; + MergingLogic mergingLogic() final; /** * Write out a Document whose contents are the sort key pattern. @@ -105,15 +103,7 @@ public: const boost::intrusive_ptr<ExpressionContext>& pExpCtx, BSONObj sortOrder, long long limit = -1, - boost::optional<uint64_t> maxMemoryUsageBytes = boost::none, - bool mergingPresorted = false); - - /** - * Returns true if this $sort stage is merging presorted streams. - */ - bool mergingPresorted() const { - return _mergingPresorted; - } + boost::optional<uint64_t> maxMemoryUsageBytes = boost::none); /** * Returns -1 for no limit. @@ -267,7 +257,6 @@ private: uint64_t _maxMemoryUsageBytes; bool _done; - bool _mergingPresorted; // TODO SERVER-34009 Remove this flag. std::unique_ptr<MySorter> _sorter; std::unique_ptr<MySorter::Iterator> _output; bool _usedDisk = false; diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp index 42a6590dca6..4aacdd1c4cc 100644 --- a/src/mongo/db/pipeline/document_source_sort_test.cpp +++ b/src/mongo/db/pipeline/document_source_sort_test.cpp @@ -131,7 +131,7 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) { ASSERT_BSONOBJ_EQ(arr[0].getDocument().toBson(), BSON("$sort" << BSON("a" << 1))); ASSERT(sort()->getShardSource() != nullptr); - ASSERT(!sort()->getMergeSources().empty()); + ASSERT(sort()->mergingLogic().mergingStage == nullptr); } container.push_back(DocumentSourceLimit::create(expCtx, 10)); @@ -158,7 +158,8 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) { DOC_ARRAY(DOC("$sort" << DOC("a" << 1)) << DOC("$limit" << sort()->getLimit()))); ASSERT(sort()->getShardSource() != nullptr); - ASSERT(!sort()->getMergeSources().empty()); + ASSERT(sort()->mergingLogic().mergingStage != nullptr); + ASSERT(dynamic_cast<DocumentSourceLimit*>(sort()->mergingLogic().mergingStage.get())); } TEST_F(DocumentSourceSortTest, Dependencies) { diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.cpp b/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp index 61fa2a9176d..7177d705c67 100644 --- a/src/mongo/s/query/router_stage_update_on_add_shard.cpp +++ b/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2017 MongoDB Inc. + * Copyright (C) 2018 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, @@ -26,95 +26,98 @@ * then also delete it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery -#include "mongo/s/query/router_stage_update_on_add_shard.h" +#include "mongo/db/pipeline/document_source_update_on_add_shard.h" #include <algorithm> -#include "mongo/base/checked_cast.h" #include "mongo/db/pipeline/document_source_change_stream.h" -#include "mongo/executor/task_executor_pool.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" +#include "mongo/s/query/async_results_merger_params_gen.h" #include "mongo/s/query/establish_cursors.h" -#include "mongo/s/query/router_stage_merge.h" -#include "mongo/util/log.h" namespace mongo { namespace { // Returns true if the change stream document has an 'operationType' of 'newShardDetected'. -bool needsUpdate(const StatusWith<ClusterQueryResult>& childResult) { - if (!childResult.isOK() || childResult.getValue().isEOF()) { - return false; - } - return ((*childResult.getValue().getResult())[DocumentSourceChangeStream::kOperationTypeField] - .str() == DocumentSourceChangeStream::kNewShardDetectedOpType); +bool needsUpdate(const Document& childResult) { + return childResult[DocumentSourceChangeStream::kOperationTypeField].getStringData() == + DocumentSourceChangeStream::kNewShardDetectedOpType; } +} // namespace + +boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> DocumentSourceUpdateOnAddShard::create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + executor::TaskExecutor* executor, + const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors, + std::vector<ShardId> shardsWithCursors, + BSONObj cmdToRunOnNewShards) { + return new DocumentSourceUpdateOnAddShard( + expCtx, executor, mergeCursors, std::move(shardsWithCursors), cmdToRunOnNewShards); } -RouterStageUpdateOnAddShard::RouterStageUpdateOnAddShard(OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params, - std::vector<ShardId> shardIds, - BSONObj cmdToRunOnNewShards) - : RouterExecStage(opCtx, stdx::make_unique<RouterStageMerge>(opCtx, executor, params)), - _params(params), - _shardIds(std::move(shardIds)), - _cmdToRunOnNewShards(cmdToRunOnNewShards) {} +DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + executor::TaskExecutor* executor, + const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors, + std::vector<ShardId>&& shardsWithCursors, + BSONObj cmdToRunOnNewShards) + : DocumentSource(expCtx), + _executor(executor), + _mergeCursors(mergeCursors), + _shardsWithCursors(std::move(shardsWithCursors)), + _cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {} + +DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::getNext() { + auto childResult = pSource->getNext(); -StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next( - RouterExecStage::ExecContext execContext) { - auto childStage = getChildStage(); - auto childResult = childStage->next(execContext); - while (needsUpdate(childResult)) { - addNewShardCursors(*childResult.getValue().getResult()); - childResult = childStage->next(execContext); + while (childResult.isAdvanced() && needsUpdate(childResult.getDocument())) { + addNewShardCursors(childResult.getDocument()); + childResult = pSource->getNext(); } return childResult; } -void RouterStageUpdateOnAddShard::addNewShardCursors(BSONObj newShardDetectedObj) { - checked_cast<RouterStageMerge*>(getChildStage()) - ->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj)); +void DocumentSourceUpdateOnAddShard::addNewShardCursors(const Document& newShardDetectedObj) { + _mergeCursors->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj)); } -std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards( - const BSONObj& newShardDetectedObj) { - auto* opCtx = getOpCtx(); - // Reload the shard registry. We need to ensure a reload initiated after calling this method +std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsOnNewShards( + const Document& newShardDetectedObj) { + auto* opCtx = pExpCtx->opCtx; + // Reload the shard registry. We need to ensure a reload initiated after calling this method // caused the reload, otherwise we aren't guaranteed to get all the new shards. auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); if (!shardRegistry->reload(opCtx)) { // A 'false' return from shardRegistry.reload() means a reload was already in progress and - // it completed before reload() returned. So another reload(), regardless of return - // value, will ensure a reload started after the first call to reload(). + // it completed before reload() returned. So another reload(), regardless of return value, + // will ensure a reload started after the first call to reload(). shardRegistry->reload(opCtx); } std::vector<ShardId> shardIds, newShardIds; shardRegistry->getAllShardIdsNoReload(&shardIds); - std::sort(_shardIds.begin(), _shardIds.end()); + std::sort(_shardsWithCursors.begin(), _shardsWithCursors.end()); std::sort(shardIds.begin(), shardIds.end()); std::set_difference(shardIds.begin(), shardIds.end(), - _shardIds.begin(), - _shardIds.end(), + _shardsWithCursors.begin(), + _shardsWithCursors.end(), std::back_inserter(newShardIds)); auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand( _cmdToRunOnNewShards, - newShardDetectedObj[DocumentSourceChangeStream::kIdField].embeddedObject()); + newShardDetectedObj[DocumentSourceChangeStream::kIdField].getDocument()); std::vector<std::pair<ShardId, BSONObj>> requests; for (const auto& shardId : newShardIds) { requests.emplace_back(shardId, cmdObj); - _shardIds.push_back(shardId); + _shardsWithCursors.push_back(shardId); } const bool allowPartialResults = false; // partial results are not allowed return establishCursors(opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - _params->nsString, - _params->readPreference.value_or(ReadPreferenceSetting()), + _executor, + pExpCtx->ns, + ReadPreferenceSetting::get(opCtx), requests, allowPartialResults); } diff --git a/src/mongo/db/pipeline/document_source_update_on_add_shard.h b/src/mongo/db/pipeline/document_source_update_on_add_shard.h new file mode 100644 index 00000000000..4c6e0473528 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_update_on_add_shard.h @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_merge_cursors.h" +#include "mongo/executor/task_executor.h" +#include "mongo/s/shard_id.h" + +namespace mongo { + +/** + * An internal stage used as part of the change streams infrastructure to listen for an event + * signaling that a new shard now has potentially matching data. For example, this stage will + * detect if a collection is being watched and a chunk for that collection migrates to a shard for + * the first time. When this event is detected, this stage will establish a new cursor on that + * shard and add it to the cursors being merged. + */ +class DocumentSourceUpdateOnAddShard final : public DocumentSource { +public: + /** + * Creates a new stage which will establish a new cursor and add it to the cursors being merged + * by 'mergeCursorsStage' whenever a new shard is detected by a change stream. + */ + static boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> create( + const boost::intrusive_ptr<ExpressionContext>&, + executor::TaskExecutor*, + const boost::intrusive_ptr<DocumentSourceMergeCursors>&, + std::vector<ShardId> shardsWithCursors, + BSONObj cmdToRunOnNewShards); + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { + // We only ever expect to add this stage if the pipeline is being executed locally on a + // mongos. In this case, it should never be serialized. + MONGO_UNREACHABLE; + } + + virtual StageConstraints constraints(Pipeline::SplitState) const { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kMongoS, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; + } + + GetNextResult getNext() final; + +private: + DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&, + executor::TaskExecutor*, + const boost::intrusive_ptr<DocumentSourceMergeCursors>&, + std::vector<ShardId>&& shardsWithCursors, + BSONObj cmdToRunOnNewShards); + + /** + * Establish the new cursors and tell the RouterStageMerge about them. + */ + void addNewShardCursors(const Document& newShardDetectedObj); + + /** + * Open the cursors on the new shards. + */ + std::vector<RemoteCursor> establishShardCursorsOnNewShards(const Document& newShardDetectedObj); + + executor::TaskExecutor* _executor; + boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursors; + std::vector<ShardId> _shardsWithCursors; + BSONObj _cmdToRunOnNewShards; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 31b90963034..1a82bf1569a 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -38,7 +38,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/accumulator.h" -#include "mongo/db/pipeline/cluster_aggregation_planner.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_geo_near.h" @@ -335,25 +334,6 @@ bool Pipeline::usedDisk() { _sources.begin(), _sources.end(), [](const auto& stage) { return stage->usedDisk(); }); } -std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() { - invariant(!isSplitForShards()); - invariant(!isSplitForMerge()); - - // Create and initialize the shard spec we'll return. We start with an empty pipeline on the - // shards and all work being done in the merger. Optimizations can move operations between - // the pipelines to be more efficient. - std::unique_ptr<Pipeline, PipelineDeleter> shardPipeline(new Pipeline(pCtx), - PipelineDeleter(pCtx->opCtx)); - - cluster_aggregation_planner::performSplitPipelineOptimizations(shardPipeline.get(), this); - shardPipeline->_splitState = SplitState::kSplitForShards; - _splitState = SplitState::kSplitForMerge; - - stitch(); - - return shardPipeline; -} - BSONObj Pipeline::getInitialQuery() const { if (_sources.empty()) return BSONObj(); @@ -399,12 +379,13 @@ bool Pipeline::canRunOnMongos() const { } bool Pipeline::requiredToRunOnMongos() const { - invariant(!isSplitForShards()); + invariant(_splitState != SplitState::kSplitForShards); for (auto&& stage : _sources) { // If this pipeline is capable of splitting before the mongoS-only stage, then the pipeline // as a whole is not required to run on mongoS. - if (isUnsplit() && dynamic_cast<NeedsMergerDocumentSource*>(stage.get())) { + if (_splitState == SplitState::kUnsplit && + dynamic_cast<NeedsMergerDocumentSource*>(stage.get())) { return false; } diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 0bfb2602d2c..d4a2c813060 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -48,9 +48,9 @@ namespace mongo { class BSONObj; class BSONObjBuilder; -class ExpressionContext; -class DocumentSource; class CollatorInterface; +class DocumentSource; +class ExpressionContext; class OperationContext; class PipelineDeleter; @@ -172,17 +172,11 @@ public: bool usedDisk(); /** - * Split the current Pipeline into a Pipeline for each shard, and a Pipeline that combines the - * results within mongos. This permanently alters this pipeline for the merging operation, and - * returns a Pipeline object that should be executed on each targeted shard. - */ - std::unique_ptr<Pipeline, PipelineDeleter> splitForSharded(); - - /** - * Returns true if this pipeline has not been split. + * Communicates to the pipeline which part of a split pipeline it is when the pipeline has been + * split in two. */ - bool isUnsplit() const { - return _splitState == SplitState::kUnsplit; + void setSplitState(SplitState state) { + _splitState = state; } /** diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 9d28060c00d..371f5f9dc89 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -34,6 +34,7 @@ #include "mongo/db/operation_context_noop.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/cluster_aggregation_planner.h" #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" @@ -1757,13 +1758,17 @@ public: mergePipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx)); mergePipe->optimizePipeline(); - shardPipe = mergePipe->splitForSharded(); - ASSERT(shardPipe); + auto splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(mergePipe)); - ASSERT_VALUE_EQ(Value(shardPipe->writeExplainOps(ExplainOptions::Verbosity::kQueryPlanner)), + ASSERT_VALUE_EQ(Value(splitPipeline.shardsPipeline->writeExplainOps( + ExplainOptions::Verbosity::kQueryPlanner)), Value(shardPipeExpected["pipeline"])); - ASSERT_VALUE_EQ(Value(mergePipe->writeExplainOps(ExplainOptions::Verbosity::kQueryPlanner)), + ASSERT_VALUE_EQ(Value(splitPipeline.mergePipeline->writeExplainOps( + ExplainOptions::Verbosity::kQueryPlanner)), Value(mergePipeExpected["pipeline"])); + + shardPipe = std::move(splitPipeline.shardsPipeline); + mergePipe = std::move(splitPipeline.mergePipeline); } virtual ~Base() {} @@ -1936,7 +1941,7 @@ class ShardedSortMatchProjSkipLimBecomesMatchTopKSortSkipProj : public Base { "]"; } string mergePipeJson() { - return "[{$sort: {sortKey: {a: 1}, mergePresorted: true, limit: 8}}" + return "[{$limit: 8}" ",{$skip: 3}" ",{$project: {_id: true, a: true}}" "]"; @@ -1974,7 +1979,7 @@ class ShardedSortProjLimBecomesTopKSortProj : public Base { "]"; } string mergePipeJson() { - return "[{$sort: {sortKey: {a: 1}, mergePresorted: true, limit: 5}}" + return "[{$limit: 5}" ",{$project: {_id: true, a: true}}" "]"; } @@ -1994,8 +1999,7 @@ class ShardedSortGroupProjLimDoesNotBecomeTopKSortProjGroup : public Base { "]"; } string mergePipeJson() { - return "[{$sort: {sortKey: {a: 1}, mergePresorted: true}}" - ",{$group : {_id: {a: '$a'}}}" + return "[{$group : {_id: {a: '$a'}}}" ",{$project: {_id: true, a: true}}" ",{$limit: 5}" "]"; @@ -2017,7 +2021,7 @@ class ShardedMatchSortProjLimBecomesMatchTopKSortProj : public Base { "]"; } string mergePipeJson() { - return "[{$sort: {sortKey: {a: -1}, mergePresorted: true, limit: 6}}" + return "[{$limit: 6}" ",{$project: {_id: true, a: true}}" "]"; } @@ -2232,14 +2236,14 @@ DEATH_TEST_F(PipelineMustRunOnMongoSTest, // $_internalSplitPipeline. ASSERT_FALSE(pipeline->requiredToRunOnMongos()); - auto shardPipe = pipeline->splitForSharded(); - ASSERT(shardPipe); + auto splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline)); + ASSERT(splitPipeline.shardsPipeline); + ASSERT(splitPipeline.mergePipeline); - // The merge half of the pipeline must run on mongoS. - ASSERT_TRUE(pipeline->requiredToRunOnMongos()); + ASSERT_TRUE(splitPipeline.mergePipeline->requiredToRunOnMongos()); // Calling 'requiredToRunOnMongos' on the shard pipeline will hit an invariant. - shardPipe->requiredToRunOnMongos(); + splitPipeline.shardsPipeline->requiredToRunOnMongos(); } TEST_F(PipelineMustRunOnMongoSTest, SplitMongoSMergePipelineAssertsIfShardStagePresent) { @@ -2260,12 +2264,12 @@ TEST_F(PipelineMustRunOnMongoSTest, SplitMongoSMergePipelineAssertsIfShardStageP // $_internalSplitPipeline. ASSERT_FALSE(pipeline->requiredToRunOnMongos()); - auto shardPipe = pipeline->splitForSharded(); - ASSERT(shardPipe); + auto splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline)); // The merge pipeline must run on mongoS, but $out needs to run on the primary shard. - ASSERT_THROWS_CODE( - pipeline->requiredToRunOnMongos(), AssertionException, ErrorCodes::IllegalOperation); + ASSERT_THROWS_CODE(splitPipeline.mergePipeline->requiredToRunOnMongos(), + AssertionException, + ErrorCodes::IllegalOperation); } TEST_F(PipelineMustRunOnMongoSTest, SplittablePipelineAssertsIfMongoSStageOnShardSideOfSplit) { diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index 5c75e96236e..f39ee98cc70 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -108,10 +108,10 @@ env.Library( '$BUILD_DIR/mongo/db/auth/saslauth', '$BUILD_DIR/mongo/db/commands/core', '$BUILD_DIR/mongo/db/commands/current_op_common', - '$BUILD_DIR/mongo/db/commands/servers', '$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers', '$BUILD_DIR/mongo/db/commands/kill_common', '$BUILD_DIR/mongo/db/commands/profile_common', + '$BUILD_DIR/mongo/db/commands/servers', '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/commands/write_commands_common', '$BUILD_DIR/mongo/db/ftdc/ftdc_server', @@ -119,10 +119,12 @@ env.Library( '$BUILD_DIR/mongo/db/logical_session_cache_impl', '$BUILD_DIR/mongo/db/pipeline/aggregation', '$BUILD_DIR/mongo/db/pipeline/mongos_process_interface', + '$BUILD_DIR/mongo/db/pipeline/cluster_aggregation_planner', '$BUILD_DIR/mongo/db/stats/counters', '$BUILD_DIR/mongo/db/views/views', '$BUILD_DIR/mongo/executor/async_multicaster', '$BUILD_DIR/mongo/rpc/client_metadata', + '$BUILD_DIR/mongo/s/query/cluster_client_cursor', '$BUILD_DIR/mongo/s/sharding_api', '$BUILD_DIR/mongo/s/sharding_legacy_api', '$BUILD_DIR/mongo/s/transaction/router_session', diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index d06c5426555..fbf2446467b 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -1,30 +1,30 @@ /** -* Copyright (C) 2016 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand @@ -66,7 +66,7 @@ #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_query_knobs.h" #include "mongo/s/query/establish_cursors.h" -#include "mongo/s/query/router_stage_update_on_add_shard.h" +#include "mongo/s/query/router_stage_pipeline.h" #include "mongo/s/query/store_possible_cursor.h" #include "mongo/s/stale_exception.h" #include "mongo/util/fail_point.h" @@ -75,6 +75,8 @@ namespace mongo { +using SplitPipeline = cluster_aggregation_planner::SplitPipeline; + MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors); namespace { @@ -104,38 +106,6 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v return explainCommandBuilder.freeze(); } -Status appendExplainResults( - const std::vector<AsyncRequestsSender::Response>& shardResults, - const boost::intrusive_ptr<ExpressionContext>& mergeCtx, - const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards, - const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging, - BSONObjBuilder* result) { - if (pipelineForTargetedShards->isSplitForShards()) { - *result << "mergeType" - << (pipelineForMerging->canRunOnMongos() - ? "mongos" - : pipelineForMerging->needsPrimaryShardMerger() ? "primaryShard" - : "anyShard") - << "splitPipeline" - << Document{ - {"shardsPart", - pipelineForTargetedShards->writeExplainOps(*mergeCtx->explain)}, - {"mergerPart", pipelineForMerging->writeExplainOps(*mergeCtx->explain)}}; - } else { - *result << "splitPipeline" << BSONNULL; - } - - BSONObjBuilder shardExplains(result->subobjStart("shards")); - for (const auto& shardResult : shardResults) { - invariant(shardResult.shardHostAndPort); - shardExplains.append(shardResult.shardId.toString(), - BSON("host" << shardResult.shardHostAndPort->toString() << "stages" - << shardResult.swResponse.getValue().data["stages"])); - } - - return Status::OK(); -} - Status appendCursorResponseToCommandResult(const ShardId& shardId, const BSONObj cursorResponse, BSONObjBuilder* result) { @@ -246,11 +216,10 @@ BSONObj createCommandForTargetedShards( return appendAllowImplicitCreate(cmdObj, true); } -BSONObj createCommandForMergingShard( - const AggregationRequest& request, - const boost::intrusive_ptr<ExpressionContext>& mergeCtx, - const BSONObj originalCmdObj, - const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging) { +BSONObj createCommandForMergingShard(const AggregationRequest& request, + const boost::intrusive_ptr<ExpressionContext>& mergeCtx, + const BSONObj originalCmdObj, + const Pipeline* pipelineForMerging) { MutableDocument mergeCmd(request.serializeToCommandObj()); mergeCmd["pipeline"] = Value(pipelineForMerging->serialize()); @@ -339,12 +308,11 @@ struct DispatchShardPipelineResults { // Populated if this *is* an explain, this vector represents the results from each shard. std::vector<AsyncRequestsSender::Response> remoteExplainOutput; - // The half of the pipeline that was sent to each shard, or the entire pipeline if there was - // only one shard targeted. - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards; + // The split version of the pipeline if more than one shard was targeted, otherwise boost::none. + boost::optional<SplitPipeline> splitPipeline; - // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr. - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; + // If the pipeline targeted a single shard, this is the pipeline to run on that shard. + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard; // The command object to send to the targeted shards. BSONObj commandForTargetedShards; @@ -380,8 +348,7 @@ DispatchShardPipelineResults dispatchShardPipeline( const auto shardQuery = pipeline->getInitialQuery(); - auto pipelineForTargetedShards = std::move(pipeline); - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; + boost::optional<SplitPipeline> splitPipeline; auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss); @@ -415,13 +382,19 @@ DispatchShardPipelineResults dispatchShardPipeline( *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId())); if (needsSplit) { - pipelineForMerging = std::move(pipelineForTargetedShards); - pipelineForTargetedShards = pipelineForMerging->splitForSharded(); + splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline)); } // Generate the command object for the targeted shards. - BSONObj targetedCommand = createCommandForTargetedShards( - opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, collationObj, atClusterTime); + BSONObj targetedCommand = splitPipeline + ? createCommandForTargetedShards(opCtx, + aggRequest, + originalCmdObj, + splitPipeline->shardsPipeline, + collationObj, + atClusterTime) + : createCommandForTargetedShards( + opCtx, aggRequest, originalCmdObj, pipeline, collationObj, atClusterTime); // Refresh the shard registry if we're targeting all shards. We need the shard registry // to be at least as current as the logical time used when creating the command for @@ -480,11 +453,40 @@ DispatchShardPipelineResults dispatchShardPipeline( return DispatchShardPipelineResults{needsPrimaryShardMerge, std::move(cursors), std::move(shardResults), - std::move(pipelineForTargetedShards), - std::move(pipelineForMerging), + std::move(splitPipeline), + std::move(pipeline), targetedCommand}; } +Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults, + const boost::intrusive_ptr<ExpressionContext>& mergeCtx, + BSONObjBuilder* result) { + if (dispatchResults.splitPipeline) { + auto* mergePipeline = dispatchResults.splitPipeline->mergePipeline.get(); + *result << "mergeType" + << (mergePipeline->canRunOnMongos() + ? "mongos" + : mergePipeline->needsPrimaryShardMerger() ? "primaryShard" : "anyShard") + << "splitPipeline" + << Document{{"shardsPart", + dispatchResults.splitPipeline->shardsPipeline->writeExplainOps( + *mergeCtx->explain)}, + {"mergerPart", mergePipeline->writeExplainOps(*mergeCtx->explain)}}; + } else { + *result << "splitPipeline" << BSONNULL; + } + + BSONObjBuilder shardExplains(result->subobjStart("shards")); + for (const auto& shardResult : dispatchResults.remoteExplainOutput) { + invariant(shardResult.shardHostAndPort); + shardExplains.append(shardResult.shardId.toString(), + BSON("host" << shardResult.shardHostAndPort->toString() << "stages" + << shardResult.swResponse.getValue().data["stages"])); + } + + return Status::OK(); +} + Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx, const NamespaceString& nss, const BSONObj mergeCmdObj, @@ -500,20 +502,18 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx, Shard::RetryPolicy::kIdempotent)); } -BSONObj establishMergingMongosCursor(OperationContext* opCtx, - const AggregationRequest& request, - const NamespaceString& requestedNss, - BSONObj cmdToRunOnNewShards, - const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging, - std::vector<RemoteCursor> cursors) { +BSONObj establishMergingMongosCursor( + OperationContext* opCtx, + const AggregationRequest& request, + const NamespaceString& requestedNss, + BSONObj cmdToRunOnNewShards, + const LiteParsedPipeline& liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging) { ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx)); params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned(); params.tailableMode = pipelineForMerging->getContext()->tailableMode; - params.mergePipeline = std::move(pipelineForMerging); - params.remotes = std::move(cursors); // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch // size we pass here is used for getMores, so do not specify a batch size if the initial request // had a batch size of 0. @@ -523,25 +523,8 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, params.lsid = opCtx->getLogicalSessionId(); params.txnNumber = opCtx->getTxnNumber(); - if (liteParsedPipeline.hasChangeStream()) { - // For change streams, we need to set up a custom stage to establish cursors on new shards - // when they are added. Be careful to extract the targeted shard IDs before the remote - // cursors are transferred from the ClusterClientCursorParams to the AsyncResultsMerger. - std::vector<ShardId> shardIds; - for (const auto& remote : params.remotes) { - shardIds.emplace_back(remote.getShardId().toString()); - } - - params.createCustomCursorSource = [cmdToRunOnNewShards, - shardIds](OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params) { - return stdx::make_unique<RouterStageUpdateOnAddShard>( - opCtx, executor, params, std::move(shardIds), cmdToRunOnNewShards); - }; - } - auto ccc = ClusterClientCursorImpl::make( - opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); + auto ccc = cluster_aggregation_planner::buildClusterCursor( + opCtx, std::move(pipelineForMerging), std::move(params)); auto cursorState = ClusterCursorManager::CursorState::NotExhausted; @@ -726,17 +709,15 @@ std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID( } ShardId pickMergingShard(OperationContext* opCtx, - const DispatchShardPipelineResults& dispatchResults, + bool needsPrimaryShardMerge, + const std::vector<ShardId>& targetedShards, ShardId primaryShard) { auto& prng = opCtx->getClient()->getPrng(); // If we cannot merge on mongoS, establish the merge cursor on a shard. Perform the merging // command on random shard, unless the pipeline dictates that it needs to be run on the primary // shard for the database. - return dispatchResults.needsPrimaryShardMerge - ? primaryShard - : dispatchResults.remoteCursors[prng.nextInt32(dispatchResults.remoteCursors.size())] - .getShardId() - .toString(); + return needsPrimaryShardMerge ? primaryShard + : targetedShards[prng.nextInt32(targetedShards.size())]; } // "Resolve" involved namespaces and verify that none of them are sharded unless allowed by the @@ -798,7 +779,6 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx BSONObj cmdObj, const LiteParsedPipeline& litePipe, std::unique_ptr<Pipeline, PipelineDeleter> pipeline, - std::vector<RemoteCursor>&& cursors, BSONObjBuilder* result) { // We should never receive a pipeline intended for the shards, or which cannot run on mongoS. invariant(!pipeline->isSplitForShards()); @@ -807,8 +787,7 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx const auto& requestedNss = namespaces.requestedNss; const auto opCtx = expCtx->opCtx; - // If this is an unsplit mongoS-only pipeline, verify that the first stage can produce input for - // the remainder of the pipeline. + // Verify that the first stage can produce input for the remainder of the pipeline. uassert(ErrorCodes::IllegalOperation, str::stream() << "Aggregation pipeline must be run on mongoS, but " << pipeline->getSources().front()->getSourceName() @@ -816,7 +795,6 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx pipeline->isSplitForMerge() || !pipeline->getSources().front()->constraints().requiresInputDocSource); - // If this is an explain and the pipeline is not split, write the explain output and return. if (expCtx->explain && !pipeline->isSplitForMerge()) { *result << "splitPipeline" << BSONNULL << "mongos" << Document{{"host", getHostNameCachedAndPort()}, @@ -826,7 +804,7 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx // Register the new mongoS cursor, and retrieve the initial batch of results. auto cursorResponse = establishMergingMongosCursor( - opCtx, request, requestedNss, cmdObj, litePipe, std::move(pipeline), std::move(cursors)); + opCtx, request, requestedNss, cmdObj, litePipe, std::move(pipeline)); // We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline // can never run on mongoS. Filter the command response and return immediately. @@ -840,25 +818,38 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex BSONObj cmdObj, const LiteParsedPipeline& litePipe, const boost::optional<CachedCollectionRoutingInfo>& routingInfo, - DispatchShardPipelineResults& shardDispatchResults, + DispatchShardPipelineResults&& shardDispatchResults, BSONObjBuilder* result) { // We should never be in a situation where we call this function on a non-merge pipeline. - auto& mergingPipeline = shardDispatchResults.pipelineForMerging; - invariant(mergingPipeline && mergingPipeline->isSplitForMerge()); + invariant(shardDispatchResults.splitPipeline); + auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get(); + auto* opCtx = expCtx->opCtx; + + std::vector<ShardId> targetedShards; + targetedShards.reserve(shardDispatchResults.remoteCursors.size()); + for (auto&& remoteCursor : shardDispatchResults.remoteCursors) { + targetedShards.emplace_back(remoteCursor.getShardId().toString()); + } - const auto opCtx = expCtx->opCtx; + cluster_aggregation_planner::addMergeCursorsSource( + mergePipeline, + litePipe, + shardDispatchResults.commandForTargetedShards, + std::move(shardDispatchResults.remoteCursors), + targetedShards, + shardDispatchResults.splitPipeline->shardCursorsSortSpec, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); // First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS, // then ignore the internalQueryProhibitMergingOnMongoS parameter. - if (mergingPipeline->requiredToRunOnMongos() || - (!internalQueryProhibitMergingOnMongoS.load() && mergingPipeline->canRunOnMongos())) { + if (mergePipeline->requiredToRunOnMongos() || + (!internalQueryProhibitMergingOnMongoS.load() && mergePipeline->canRunOnMongos())) { return runPipelineOnMongoS(expCtx, namespaces, request, shardDispatchResults.commandForTargetedShards, litePipe, - std::move(mergingPipeline), - std::move(shardDispatchResults.remoteCursors), + std::move(shardDispatchResults.splitPipeline->mergePipeline), result); } @@ -873,15 +864,12 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex "merging on a shard", !opCtx->getTxnNumber()); - ShardId mergingShardId = - pickMergingShard(opCtx, shardDispatchResults, routingInfo->db().primaryId()); - - cluster_aggregation_planner::addMergeCursorsSource( - mergingPipeline.get(), - std::move(shardDispatchResults.remoteCursors), - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); + ShardId mergingShardId = pickMergingShard(opCtx, + shardDispatchResults.needsPrimaryShardMerge, + targetedShards, + routingInfo->db().primaryId()); - auto mergeCmdObj = createCommandForMergingShard(request, expCtx, cmdObj, mergingPipeline); + auto mergeCmdObj = createCommandForMergingShard(request, expCtx, cmdObj, mergePipeline); // Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return. auto mergeResponse = @@ -964,7 +952,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // Check whether the entire pipeline must be run on mongoS. if (pipeline->requiredToRunOnMongos()) { return runPipelineOnMongoS( - expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), {}, result); + expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), result); } // If not, split the pipeline as necessary and dispatch to the relevant shards. @@ -980,18 +968,14 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // write the results to the output builder, and return immediately. if (expCtx->explain) { uassertAllShardsSupportExplain(shardDispatchResults.remoteExplainOutput); - return appendExplainResults(std::move(shardDispatchResults.remoteExplainOutput), - expCtx, - shardDispatchResults.pipelineForTargetedShards, - shardDispatchResults.pipelineForMerging, - result); + return appendExplainResults(std::move(shardDispatchResults), expCtx, result); } // If this isn't an explain, then we must have established cursors on at least one shard. invariant(shardDispatchResults.remoteCursors.size() > 0); // If we sent the entire pipeline to a single shard, store the remote cursor and return. - if (!shardDispatchResults.pipelineForTargetedShards->isSplitForShards()) { + if (!shardDispatchResults.splitPipeline) { invariant(shardDispatchResults.remoteCursors.size() == 1); auto& remoteCursor = shardDispatchResults.remoteCursors.front(); const auto reply = uassertStatusOK(storePossibleCursor( @@ -1001,8 +985,14 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } // If we reach here, we have a merge pipeline to dispatch. - return dispatchMergingPipeline( - expCtx, namespaces, request, cmdObj, litePipe, routingInfo, shardDispatchResults, result); + return dispatchMergingPipeline(expCtx, + namespaces, + request, + cmdObj, + litePipe, + routingInfo, + std::move(shardDispatchResults), + result); } void ClusterAggregate::uassertAllShardsSupportExplain( diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index f153381d12c..ae0c24ad21e 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -40,7 +40,6 @@ env.Library( "cluster_client_cursor_impl.cpp", ], LIBDEPS=[ - "$BUILD_DIR/mongo/db/pipeline/pipeline", "router_exec_stage", ], ) @@ -48,20 +47,14 @@ env.Library( env.Library( target="router_exec_stage", source=[ - "document_source_router_adapter.cpp", "router_stage_limit.cpp", - "router_stage_merge.cpp", "router_stage_mock.cpp", "router_stage_pipeline.cpp", "router_stage_remove_metadata_fields.cpp", "router_stage_skip.cpp", - "router_stage_update_on_add_shard.cpp", ], LIBDEPS=[ - "$BUILD_DIR/mongo/db/query/query_common", "async_results_merger", - ], - LIBDEPS_PRIVATE=[ "$BUILD_DIR/mongo/db/pipeline/pipeline", ], ) @@ -83,11 +76,13 @@ env.Library( target="async_results_merger", source=[ "async_results_merger.cpp", + "blocking_results_merger.cpp", "establish_cursors.cpp", env.Idlc('async_results_merger_params.idl')[0], ], LIBDEPS=[ "$BUILD_DIR/mongo/db/query/command_request_response", + "$BUILD_DIR/mongo/db/query/query_common", "$BUILD_DIR/mongo/executor/task_executor_interface", "$BUILD_DIR/mongo/s/async_requests_sender", "$BUILD_DIR/mongo/s/client/sharding_client", @@ -106,9 +101,11 @@ env.Library( ) env.CppUnitTest( - target="async_results_merger_test", + target="results_merger_test", source=[ + "blocking_results_merger_test.cpp", "async_results_merger_test.cpp", + "results_merger_test_fixture.cpp", ], LIBDEPS=[ 'async_results_merger', diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index f5268ac3408..3cc6756c843 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -88,8 +88,7 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, // This strange initialization is to work around the fact that the IDL does not currently // support a default value for an enum. The default tailable mode should be 'kNormal', but // since that is not supported we treat boost::none (unspecified) to mean 'kNormal'. - _tailableMode(params.getTailableMode() ? *params.getTailableMode() - : TailableModeEnum::kNormal), + _tailableMode(params.getTailableMode().value_or(TailableModeEnum::kNormal)), _params(std::move(params)), _mergeQueue(MergingComparator(_remotes, _params.getSort() ? *_params.getSort() : BSONObj(), @@ -116,12 +115,12 @@ AsyncResultsMerger::~AsyncResultsMerger() { invariant(_remotesExhausted(lk) || _lifecycleState == kKillComplete); } -bool AsyncResultsMerger::remotesExhausted() { +bool AsyncResultsMerger::remotesExhausted() const { stdx::lock_guard<stdx::mutex> lk(_mutex); return _remotesExhausted(lk); } -bool AsyncResultsMerger::_remotesExhausted(WithLock) { +bool AsyncResultsMerger::_remotesExhausted(WithLock) const { for (const auto& remote : _remotes) { if (!remote.exhausted()) { return false; @@ -769,36 +768,4 @@ bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const _sort) > 0; } -void AsyncResultsMerger::blockingKill(OperationContext* opCtx) { - auto killEvent = kill(opCtx); - if (!killEvent) { - // We are shutting down. - return; - } - _executor->waitForEvent(killEvent); -} - -StatusWith<ClusterQueryResult> AsyncResultsMerger::blockingNext() { - while (!ready()) { - auto nextEventStatus = nextEvent(); - if (!nextEventStatus.isOK()) { - return nextEventStatus.getStatus(); - } - auto event = nextEventStatus.getValue(); - - // Block until there are further results to return. - auto status = _executor->waitForEvent(_opCtx, event); - - if (!status.isOK()) { - return status.getStatus(); - } - - // We have not provided a deadline, so if the wait returns without interruption, we do not - // expect to have timed out. - invariant(status.getValue() == stdx::cv_status::no_timeout); - } - - return nextReady(); -} - } // namespace mongo diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 5f8a18194d2..488e03d2ee5 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -109,7 +109,7 @@ public: /** * Returns true if all of the remote cursors are exhausted. */ - bool remotesExhausted(); + bool remotesExhausted() const; /** * Sets the maxTimeMS value that the ARM should forward with any internally issued getMore @@ -167,12 +167,6 @@ public: StatusWith<ClusterQueryResult> nextReady(); /** - * Blocks until the next result is ready, all remote cursors are exhausted, or there is an - * error. - */ - StatusWith<ClusterQueryResult> blockingNext(); - - /** * Schedules remote work as required in order to make further results available. If there is an * error in scheduling this work, returns a non-ok status. On success, returns an event handle. * The caller can pass this event handle to 'executor' in order to be blocked until further @@ -238,11 +232,6 @@ public: */ executor::TaskExecutor::EventHandle kill(OperationContext* opCtx); - /** - * A blocking version of kill() that will not return until this is safe to destroy. - */ - void blockingKill(OperationContext*); - private: /** * We instantiate one of these per remote host. It contains the buffer of results we've @@ -346,7 +335,7 @@ private: /** * Checks whether or not the remote cursors are all exhausted. */ - bool _remotesExhausted(WithLock); + bool _remotesExhausted(WithLock) const; // // Helpers for ready(). @@ -433,7 +422,7 @@ private: AsyncResultsMergerParams _params; // Must be acquired before accessing any data members (other than _params, which is read-only). - stdx::mutex _mutex; + mutable stdx::mutex _mutex; // Data tracking the state of our communication with each of the remote nodes. std::vector<RemoteCursorData> _remotes; diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 7960d22f018..b852cf33f79 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -30,18 +30,13 @@ #include "mongo/s/query/async_results_merger.h" -#include "mongo/client/remote_command_targeter_factory_mock.h" -#include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/json.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/query_request.h" -#include "mongo/executor/network_interface_mock.h" #include "mongo/executor/task_executor.h" -#include "mongo/executor/thread_pool_task_executor_test_fixture.h" -#include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/sharding_router_test_fixture.h" +#include "mongo/s/query/results_merger_test_fixture.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" @@ -50,212 +45,11 @@ namespace mongo { namespace { -using executor::NetworkInterfaceMock; -using executor::RemoteCommandRequest; -using executor::RemoteCommandResponse; - -using ResponseStatus = executor::TaskExecutor::ResponseStatus; - -const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345); -const std::vector<ShardId> kTestShardIds = { - ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")}; -const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", 12345), - HostAndPort("FakeShard2Host", 12345), - HostAndPort("FakeShard3Host", 12345)}; - -const NamespaceString kTestNss("testdb.testcoll"); - LogicalSessionId parseSessionIdFromCmd(BSONObj cmdObj) { return LogicalSessionId::parse(IDLParserErrorContext("lsid"), cmdObj["lsid"].Obj()); } -class AsyncResultsMergerTest : public ShardingTestFixture { -public: - AsyncResultsMergerTest() {} - - void setUp() override { - setRemote(HostAndPort("ClientHost", 12345)); - - configTargeter()->setFindHostReturnValue(kTestConfigShardHost); - - std::vector<ShardType> shards; - - for (size_t i = 0; i < kTestShardIds.size(); i++) { - ShardType shardType; - shardType.setName(kTestShardIds[i].toString()); - shardType.setHost(kTestShardHosts[i].toString()); - - shards.push_back(shardType); - - std::unique_ptr<RemoteCommandTargeterMock> targeter( - stdx::make_unique<RemoteCommandTargeterMock>()); - targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i])); - targeter->setFindHostReturnValue(kTestShardHosts[i]); - - targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]), - std::move(targeter)); - } - - setupShards(shards); - } - -protected: - /** - * Constructs an ARM with the given vector of existing cursors. - * - * If 'findCmd' is not set, the default AsyncResultsMergerParams are used. - * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams. - * - * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the - * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.' - */ - std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors( - std::vector<RemoteCursor> remoteCursors, - boost::optional<BSONObj> findCmd = boost::none, - boost::optional<std::int64_t> getMoreBatchSize = boost::none) { - AsyncResultsMergerParams params; - params.setNss(kTestNss); - params.setRemotes(std::move(remoteCursors)); - - - if (findCmd) { - const auto qr = unittest::assertGet( - QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */)); - if (!qr->getSort().isEmpty()) { - params.setSort(qr->getSort().getOwned()); - } - - if (getMoreBatchSize) { - params.setBatchSize(getMoreBatchSize); - } else { - params.setBatchSize(qr->getBatchSize() - ? boost::optional<std::int64_t>( - static_cast<std::int64_t>(*qr->getBatchSize())) - : boost::none); - } - params.setTailableMode(qr->getTailableMode()); - params.setAllowPartialResults(qr->isAllowPartialResults()); - } - - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(operationContext()->getLogicalSessionId()); - sessionInfo.setTxnNumber(operationContext()->getTxnNumber()); - params.setOperationSessionInfo(sessionInfo); - - return stdx::make_unique<AsyncResultsMerger>( - operationContext(), executor(), std::move(params)); - } - - /** - * Schedules a "CommandOnShardedViewNotSupportedOnMongod" error response w/ view definition. - */ - void scheduleNetworkViewResponse(const std::string& ns, const std::string& pipelineJsonArr) { - BSONObjBuilder viewDefBob; - viewDefBob.append("ns", ns); - viewDefBob.append("pipeline", fromjson(pipelineJsonArr)); - - BSONObjBuilder bob; - bob.append("resolvedView", viewDefBob.obj()); - bob.append("ok", 0.0); - bob.append("errmsg", "Command on view must be executed by mongos"); - bob.append("code", 169); - - std::vector<BSONObj> batch = {bob.obj()}; - scheduleNetworkResponseObjs(batch); - } - - /** - * Schedules a list of cursor responses to be returned by the mock network. - */ - void scheduleNetworkResponses(std::vector<CursorResponse> responses) { - std::vector<BSONObj> objs; - for (const auto& cursorResponse : responses) { - // For tests of the AsyncResultsMerger, all CursorRepsonses scheduled by the tests are - // subsequent responses, since the AsyncResultsMerger will only ever run getMores. - objs.push_back(cursorResponse.toBSON(CursorResponse::ResponseType::SubsequentResponse)); - } - scheduleNetworkResponseObjs(objs); - } - - /** - * Schedules a list of raw BSON command responses to be returned by the mock network. - */ - void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) { - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - for (const auto& obj : objs) { - ASSERT_TRUE(net->hasReadyRequests()); - Milliseconds millis(0); - RemoteCommandResponse response(obj, millis); - executor::TaskExecutor::ResponseStatus responseStatus(response); - net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); - } - net->runReadyNetworkOperations(); - net->exitNetwork(); - } - - RemoteCommandRequest getNthPendingRequest(size_t n) { - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - ASSERT_TRUE(net->hasReadyRequests()); - NetworkInterfaceMock::NetworkOperationIterator noi = net->getNthUnscheduledRequest(n); - RemoteCommandRequest retRequest = noi->getRequest(); - net->exitNetwork(); - return retRequest; - } - - bool networkHasReadyRequests() { - NetworkInterfaceMock::InNetworkGuard guard(network()); - return guard->hasReadyRequests(); - } - - void scheduleErrorResponse(ResponseStatus rs) { - invariant(!rs.isOK()); - rs.elapsedMillis = Milliseconds(0); - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - ASSERT_TRUE(net->hasReadyRequests()); - net->scheduleResponse(net->getNextReadyRequest(), net->now(), rs); - net->runReadyNetworkOperations(); - net->exitNetwork(); - } - - void runReadyCallbacks() { - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - net->runReadyNetworkOperations(); - net->exitNetwork(); - } - - void blackHoleNextRequest() { - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - ASSERT_TRUE(net->hasReadyRequests()); - net->blackHole(net->getNextReadyRequest()); - net->exitNetwork(); - } -}; - -void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) { - ASSERT_TRUE(killCmd.hasElement("killCursors")); - ASSERT_EQ(killCmd["cursors"].type(), BSONType::Array); - - size_t numCursors = 0; - for (auto&& cursor : killCmd["cursors"].Obj()) { - ASSERT_EQ(cursor.type(), BSONType::NumberLong); - ASSERT_EQ(cursor.numberLong(), cursorId); - ++numCursors; - } - ASSERT_EQ(numCursors, 1u); -} - -RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) { - RemoteCursor remoteCursor; - remoteCursor.setShardId(std::move(shardId)); - remoteCursor.setHostAndPort(std::move(host)); - remoteCursor.setCursorResponse(std::move(response)); - return remoteCursor; -} +using AsyncResultsMergerTest = ResultsMergerTestFixture; TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) { std::vector<RemoteCursor> cursors; @@ -1888,76 +1682,6 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin executor()->waitForEvent(killEvent); } -TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) { - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); - - // Before any requests are scheduled, ARM is not ready to return results. - ASSERT_FALSE(arm->ready()); - ASSERT_FALSE(arm->remotesExhausted()); - - // Issue a blocking wait for the next result asynchronously on a different thread. - auto future = launchAsync([&]() { - auto next = unittest::assertGet(arm->blockingNext()); - ASSERT_FALSE(next.isEOF()); - ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1)); - next = unittest::assertGet(arm->blockingNext()); - ASSERT_TRUE(next.isEOF()); - }); - - // Schedule the response to the getMore which will return the next result and mark the cursor as - // exhausted. - onCommand([&](const auto& request) { - ASSERT(request.cmdObj["getMore"]); - return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)}) - .toBSON(CursorResponse::ResponseType::SubsequentResponse); - }); - - future.timed_get(kFutureTimeout); -} - -TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) { - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); - - // Issue a blocking wait for the next result asynchronously on a different thread. - auto future = launchAsync([&]() { - auto nextStatus = arm->blockingNext(); - ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted); - }); - - // Now mark the OperationContext as killed from this thread. - { - stdx::lock_guard<Client> lk(*operationContext()->getClient()); - operationContext()->markKilled(ErrorCodes::Interrupted); - } - future.timed_get(kFutureTimeout); - // Be careful not to use a blocking kill here, since the main thread is in charge of running the - // callbacks, and we'd block on ourselves. - auto killEvent = arm->kill(operationContext()); - - assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1); - runReadyCallbacks(); - executor()->waitForEvent(killEvent); -} - -TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilKilled) { - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); - - // Before any requests are scheduled, ARM is not ready to return results. - ASSERT_FALSE(arm->ready()); - ASSERT_FALSE(arm->remotesExhausted()); - - arm->blockingKill(operationContext()); -} - TEST_F(AsyncResultsMergerTest, GetMoresShouldNotIncludeLSIDOrTxnNumberIfNoneSpecified) { std::vector<RemoteCursor> cursors; cursors.emplace_back( diff --git a/src/mongo/s/query/blocking_results_merger.cpp b/src/mongo/s/query/blocking_results_merger.cpp new file mode 100644 index 00000000000..f5ba2af0bf6 --- /dev/null +++ b/src/mongo/s/query/blocking_results_merger.cpp @@ -0,0 +1,140 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/query/find_common.h" +#include "mongo/s/query/blocking_results_merger.h" + +namespace mongo { + +BlockingResultsMerger::BlockingResultsMerger(OperationContext* opCtx, + AsyncResultsMergerParams&& armParams, + executor::TaskExecutor* executor) + : _tailableMode(armParams.getTailableMode().value_or(TailableModeEnum::kNormal)), + _executor(executor), + _arm(opCtx, executor, std::move(armParams)) {} + +StatusWith<ClusterQueryResult> BlockingResultsMerger::awaitNextWithTimeout( + OperationContext* opCtx, RouterExecStage::ExecContext execCtx) { + invariant(_tailableMode == TailableModeEnum::kTailableAndAwaitData); + // If we are in kInitialFind or kGetMoreWithAtLeastOneResultInBatch context and the ARM is not + // ready, we don't block. Fall straight through to the return statement. + while (!_arm.ready() && execCtx == RouterExecStage::ExecContext::kGetMoreNoResultsYet) { + auto nextEventStatus = getNextEvent(); + if (!nextEventStatus.isOK()) { + return nextEventStatus.getStatus(); + } + auto event = nextEventStatus.getValue(); + + // Block until there are further results to return, or our time limit is exceeded. + auto waitStatus = + _executor->waitForEvent(opCtx, event, awaitDataState(opCtx).waitForInsertsDeadline); + + if (!waitStatus.isOK()) { + return waitStatus.getStatus(); + } + // Swallow timeout errors for tailable awaitData cursors, stash the event that we were + // waiting on, and return EOF. + if (waitStatus == stdx::cv_status::timeout) { + _leftoverEventFromLastTimeout = std::move(event); + return ClusterQueryResult{}; + } + } + + // We reach this point either if the ARM is ready, or if the ARM is !ready and we are in + // kInitialFind or kGetMoreWithAtLeastOneResultInBatch ExecContext. In the latter case, we + // return EOF immediately rather than blocking for further results. + return _arm.ready() ? _arm.nextReady() : ClusterQueryResult{}; +} + +StatusWith<ClusterQueryResult> BlockingResultsMerger::blockUntilNext(OperationContext* opCtx) { + while (!_arm.ready()) { + auto nextEventStatus = _arm.nextEvent(); + if (!nextEventStatus.isOK()) { + return nextEventStatus.getStatus(); + } + auto event = nextEventStatus.getValue(); + + // Block until there are further results to return. + auto status = _executor->waitForEvent(opCtx, event); + + if (!status.isOK()) { + return status.getStatus(); + } + + // We have not provided a deadline, so if the wait returns without interruption, we do not + // expect to have timed out. + invariant(status.getValue() == stdx::cv_status::no_timeout); + } + + return _arm.nextReady(); +} +StatusWith<ClusterQueryResult> BlockingResultsMerger::next(OperationContext* opCtx, + RouterExecStage::ExecContext execCtx) { + // Non-tailable and tailable non-awaitData cursors always block until ready(). AwaitData + // cursors wait for ready() only until a specified time limit is exceeded. + return (_tailableMode == TailableModeEnum::kTailableAndAwaitData + ? awaitNextWithTimeout(opCtx, execCtx) + : blockUntilNext(opCtx)); +} + +StatusWith<executor::TaskExecutor::EventHandle> BlockingResultsMerger::getNextEvent() { + // If we abandoned a previous event due to a mongoS-side timeout, wait for it first. + if (_leftoverEventFromLastTimeout) { + invariant(_tailableMode == TailableModeEnum::kTailableAndAwaitData); + // If we have an outstanding event from last time, then we might have to manually schedule + // some getMores for the cursors. If a remote response came back while we were between + // getMores (from the user to mongos), the response may have been an empty batch, and the + // ARM would not be able to ask for the next batch immediately since it is not attached to + // an OperationContext. Now that we have a valid OperationContext, we schedule the getMores + // ourselves. + Status getMoreStatus = _arm.scheduleGetMores(); + if (!getMoreStatus.isOK()) { + return getMoreStatus; + } + + // Return the leftover event and clear '_leftoverEventFromLastTimeout'. + auto event = _leftoverEventFromLastTimeout; + _leftoverEventFromLastTimeout = executor::TaskExecutor::EventHandle(); + return event; + } + + return _arm.nextEvent(); +} + +void BlockingResultsMerger::kill(OperationContext* opCtx) { + auto killEvent = _arm.kill(opCtx); + if (!killEvent) { + // We are shutting down. + return; + } + _executor->waitForEvent(killEvent); +} + +} // namespace mongo diff --git a/src/mongo/s/query/blocking_results_merger.h b/src/mongo/s/query/blocking_results_merger.h new file mode 100644 index 00000000000..cbc96cbbfc0 --- /dev/null +++ b/src/mongo/s/query/blocking_results_merger.h @@ -0,0 +1,113 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/s/query/async_results_merger.h" +#include "mongo/s/query/router_exec_stage.h" + +namespace mongo { + +/** + * Layers a simpler blocking interface on top of the AsyncResultsMerger from which this + * BlockingResultsMerger is constructed. + */ +class BlockingResultsMerger { +public: + BlockingResultsMerger(OperationContext* opCtx, + AsyncResultsMergerParams&& arm, + executor::TaskExecutor*); + + /** + * Blocks until the next result is available or an error is detected. + */ + StatusWith<ClusterQueryResult> next(OperationContext*, RouterExecStage::ExecContext); + + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + return _arm.setAwaitDataTimeout(awaitDataTimeout); + } + + void reattachToOperationContext(OperationContext* opCtx) { + _arm.reattachToOperationContext(opCtx); + } + + void detachFromOperationContext() { + _arm.detachFromOperationContext(); + } + + bool remotesExhausted() const { + return _arm.remotesExhausted(); + } + + std::size_t getNumRemotes() const { + return _arm.getNumRemotes(); + } + + void addNewShardCursors(std::vector<RemoteCursor>&& newCursors) { + _arm.addNewShardCursors(std::move(newCursors)); + } + + /** + * Blocks until '_arm' has been killed, which involves cleaning up any remote cursors managed + * by this results merger. + */ + void kill(OperationContext* opCtx); + +private: + /** + * Awaits the next result from the ARM with no time limit. + */ + StatusWith<ClusterQueryResult> blockUntilNext(OperationContext* opCtx); + + /** + * Awaits the next result from the ARM up to the time limit specified on 'opCtx'. If this is the + * user's initial find or we have already obtained at least one result for this batch, this + * method returns EOF immediately rather than blocking. + */ + StatusWith<ClusterQueryResult> awaitNextWithTimeout(OperationContext* opCtx, + RouterExecStage::ExecContext execCtx); + + /** + * Returns the next event to wait upon - either a new event from the ARM, or a valid preceding + * event which we scheduled during the previous call to next(). + */ + StatusWith<executor::TaskExecutor::EventHandle> getNextEvent(); + + TailableModeEnum _tailableMode; + executor::TaskExecutor* _executor; + + // In a case where we have a tailable, awaitData cursor, a call to 'next()' will block waiting + // for an event generated by '_arm', but may time out waiting for this event to be triggered. + // While it's waiting, the time limit for the 'awaitData' piece of the cursor may have been + // exceeded. When this happens, we use '_leftoverEventFromLastTimeout' to remember the old event + // and pick back up waiting for it on the next call to 'next()'. + executor::TaskExecutor::EventHandle _leftoverEventFromLastTimeout; + AsyncResultsMerger _arm; +}; + +} // namespace mongo diff --git a/src/mongo/s/query/blocking_results_merger_test.cpp b/src/mongo/s/query/blocking_results_merger_test.cpp new file mode 100644 index 00000000000..821eda4d8ad --- /dev/null +++ b/src/mongo/s/query/blocking_results_merger_test.cpp @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/query/blocking_results_merger.h" +#include "mongo/s/query/results_merger_test_fixture.h" + +namespace mongo { + +namespace { + +using BlockingResultsMergerTest = ResultsMergerTestFixture; + +TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilKilled) { + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + BlockingResultsMerger blockingMerger( + operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor()); + + blockingMerger.kill(operationContext()); +} + +TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReady) { + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + BlockingResultsMerger blockingMerger( + operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor()); + + // Issue a blocking wait for the next result asynchronously on a different thread. + auto future = launchAsync([&]() { + auto next = unittest::assertGet( + blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind)); + ASSERT_FALSE(next.isEOF()); + ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1)); + next = unittest::assertGet( + blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind)); + ASSERT_TRUE(next.isEOF()); + }); + + // Schedule the response to the getMore which will return the next result and mark the cursor as + // exhausted. + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["getMore"]); + return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)}) + .toBSON(CursorResponse::ResponseType::SubsequentResponse); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(ResultsMergerTestFixture, ShouldBeInterruptableDuringBlockingNext) { + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto params = makeARMParamsFromExistingCursors(std::move(cursors)); + params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); + BlockingResultsMerger blockingMerger(operationContext(), std::move(params), executor()); + + // Issue a blocking wait for the next result asynchronously on a different thread. + auto future = launchAsync([&]() { + auto nextStatus = blockingMerger.next(operationContext(), + RouterExecStage::ExecContext::kGetMoreNoResultsYet); + ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted); + }); + + // Now mark the OperationContext as killed from this thread. + { + stdx::lock_guard<Client> lk(*operationContext()->getClient()); + operationContext()->markKilled(ErrorCodes::Interrupted); + } + // Wait for the merger to be interrupted. + future.timed_get(kFutureTimeout); + + // Now that we've seen it interrupted, kill it. We have to do this in another thread because + // killing a BlockingResultsMerger involves running a killCursors, and this main thread is in + // charge of scheduling the response to that request. + future = launchAsync([&]() { blockingMerger.kill(operationContext()); }); + while (!networkHasReadyRequests() || !getNthPendingRequest(0u).cmdObj["killCursors"]) { + // Wait for the kill to schedule it's killCursors. It may schedule a getMore first before + // cancelling it, so wait until the pending request is actually a killCursors. + } + assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1); + + // Run the callback for the killCursors. We don't actually inspect the value so we don't have to + // schedule a response. + runReadyCallbacks(); + future.timed_get(kFutureTimeout); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 1b3a665df5e..acda45f66f0 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -30,14 +30,8 @@ #include "mongo/s/query/cluster_client_cursor_impl.h" -#include "mongo/db/pipeline/cluster_aggregation_planner.h" -#include "mongo/db/pipeline/document_source_limit.h" -#include "mongo/db/pipeline/document_source_skip.h" -#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/s/query/router_stage_limit.h" #include "mongo/s/query/router_stage_merge.h" -#include "mongo/s/query/router_stage_mock.h" -#include "mongo/s/query/router_stage_pipeline.h" #include "mongo/s/query/router_stage_remove_metadata_fields.h" #include "mongo/s/query/router_stage_skip.h" #include "mongo/stdx/memory.h" @@ -70,6 +64,14 @@ ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx, return ClusterClientCursorGuard(opCtx, std::move(cursor)); } +ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx, + std::unique_ptr<RouterExecStage> root, + ClusterClientCursorParams&& params) { + std::unique_ptr<ClusterClientCursor> cursor(new ClusterClientCursorImpl( + opCtx, std::move(root), std::move(params), opCtx->getLogicalSessionId())); + return ClusterClientCursorGuard(opCtx, std::move(cursor)); +} + ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams&& params, @@ -84,7 +86,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, } ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, - std::unique_ptr<RouterStageMock> root, + std::unique_ptr<RouterExecStage> root, ClusterClientCursorParams&& params, boost::optional<LogicalSessionId> lsid) : _params(std::move(params)), _root(std::move(root)), _lsid(lsid), _opCtx(opCtx) { @@ -183,81 +185,13 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc return _params.readPreference; } -namespace { - -bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) { - return (dynamic_cast<DocumentSourceLimit*>(stage.get()) || - dynamic_cast<DocumentSourceSkip*>(stage.get())); -} - -bool isAllLimitsAndSkips(Pipeline* pipeline) { - const auto stages = pipeline->getSources(); - return std::all_of( - stages.begin(), stages.end(), [&](const auto& stage) { return isSkipOrLimit(stage); }); -} - -/** - * Creates the initial stage to feed data into the execution plan. By default, a RouterExecMerge - * stage, or a custom stage if specified in 'params->creatCustomMerge'. - */ -std::unique_ptr<RouterExecStage> createInitialStage(OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params) { - if (params->createCustomCursorSource) { - return params->createCustomCursorSource(opCtx, executor, params); - } else { - return stdx::make_unique<RouterStageMerge>(opCtx, executor, params); - } -} - -std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* executor, - ClusterClientCursorParams* params) { - invariant(params->mergePipeline); - invariant(!params->skip); - invariant(!params->limit); - auto* pipeline = params->mergePipeline.get(); - auto* opCtx = pipeline->getContext()->opCtx; - - std::unique_ptr<RouterExecStage> root = createInitialStage(opCtx, executor, params); - if (!isAllLimitsAndSkips(pipeline)) { - return stdx::make_unique<RouterStagePipeline>(std::move(root), - std::move(params->mergePipeline)); - } - - // After extracting an optional leading $sort, the pipeline consisted entirely of $skip and - // $limit stages. Avoid creating a RouterStagePipeline (which will go through an expensive - // conversion from BSONObj -> Document for each result), and create a RouterExecStage tree - // instead. - while (!pipeline->getSources().empty()) { - invariant(isSkipOrLimit(pipeline->getSources().front())); - if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) { - root = stdx::make_unique<RouterStageSkip>( - opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip()); - } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) { - root = stdx::make_unique<RouterStageLimit>( - opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit()); - } - } - // We are executing the pipeline without using an actual Pipeline, so we need to strip out any - // Document metadata ourselves. - return stdx::make_unique<RouterStageRemoveMetadataFields>( - opCtx, std::move(root), Document::allMetadataFieldNames); -} -} // namespace - std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) { const auto skip = params->skip; const auto limit = params->limit; - if (params->mergePipeline) { - if (auto sort = - cluster_aggregation_planner::popLeadingMergeSort(params->mergePipeline.get())) { - params->sort = *sort; - } - return buildPipelinePlan(executor, params); - } - std::unique_ptr<RouterExecStage> root = createInitialStage(opCtx, executor, params); + std::unique_ptr<RouterExecStage> root = + std::make_unique<RouterStageMerge>(opCtx, executor, params->extractARMParams()); if (skip) { root = stdx::make_unique<RouterStageSkip>(opCtx, std::move(root), *skip); diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 36f9d3995c8..04e97cad3d9 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -83,12 +83,21 @@ class ClusterClientCursorImpl final : public ClusterClientCursor { public: /** - * Constructs a CCC whose safe cleanup is ensured by an RAII object. + * Constructs a cluster query plan and CCC from the given parameters whose safe cleanup is + * ensured by an RAII object. */ static ClusterClientCursorGuard make(OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams&& params); + /** + * Constructs a CCC from the given execution tree 'root'. The CCC's safe cleanup is ensured by + * an RAII object. + */ + static ClusterClientCursorGuard make(OperationContext* opCtx, + std::unique_ptr<RouterExecStage> root, + ClusterClientCursorParams&& params); + StatusWith<ClusterQueryResult> next(RouterExecStage::ExecContext) final; void kill(OperationContext* opCtx) final; @@ -122,12 +131,11 @@ public: boost::optional<ReadPreferenceSetting> getReadPreference() const final; public: - /** private for tests */ /** * Constructs a CCC whose result set is generated by a mock execution stage. */ ClusterClientCursorImpl(OperationContext* opCtx, - std::unique_ptr<RouterStageMock> root, + std::unique_ptr<RouterExecStage> root, ClusterClientCursorParams&& params, boost::optional<LogicalSessionId> lsid); diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index c2d300ee19e..a853d26a99f 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -123,9 +123,6 @@ struct ClusterClientCursorParams { // Should be forwarded to the remote hosts in 'cmdObj'. boost::optional<long long> limit; - // If set, we use this pipeline to merge the output of aggregations on each remote. - std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline; - // Whether this cursor is tailing a capped collection, and whether it has the awaitData option // set. TailableModeEnum tailableMode = TailableModeEnum::kNormal; @@ -133,12 +130,6 @@ struct ClusterClientCursorParams { // Set if a readPreference must be respected throughout the lifetime of the cursor. boost::optional<ReadPreferenceSetting> readPreference; - // If valid, is called to return the RouterExecStage which becomes the initial source in this - // cursor's execution plan. Otherwise, a RouterStageMerge is used. - stdx::function<std::unique_ptr<RouterExecStage>( - OperationContext*, executor::TaskExecutor*, ClusterClientCursorParams*)> - createCustomCursorSource; - // Whether the client indicated that it is willing to receive partial results in the case of an // unreachable host. bool isAllowPartialResults = false; diff --git a/src/mongo/s/query/document_source_router_adapter.cpp b/src/mongo/s/query/document_source_router_adapter.cpp deleted file mode 100644 index 26a944ed5cc..00000000000 --- a/src/mongo/s/query/document_source_router_adapter.cpp +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/s/query/document_source_router_adapter.h" - -#include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/expression_context.h" - -namespace mongo { - -boost::intrusive_ptr<DocumentSourceRouterAdapter> DocumentSourceRouterAdapter::create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<RouterExecStage> childStage) { - return new DocumentSourceRouterAdapter(expCtx, std::move(childStage)); -} - -DocumentSource::GetNextResult DocumentSourceRouterAdapter::getNext() { - auto next = uassertStatusOK(_child->next(_execContext)); - if (auto nextObj = next.getResult()) { - return Document::fromBsonWithMetaData(*nextObj); - } - return GetNextResult::makeEOF(); -} - -void DocumentSourceRouterAdapter::doDispose() { - _child->kill(pExpCtx->opCtx); -} - -void DocumentSourceRouterAdapter::reattachToOperationContext(OperationContext* opCtx) { - _child->reattachToOperationContext(opCtx); -} - -void DocumentSourceRouterAdapter::detachFromOperationContext() { - _child->detachFromOperationContext(); -} - -Value DocumentSourceRouterAdapter::serialize( - boost::optional<ExplainOptions::Verbosity> explain) const { - invariant(explain); // We shouldn't need to serialize this stage to send it anywhere. - return Value(); // Return the empty value to hide this stage from explain output. -} - -std::size_t DocumentSourceRouterAdapter::getNumRemotes() const { - return _child->getNumRemotes(); -} - -bool DocumentSourceRouterAdapter::remotesExhausted() { - return _child->remotesExhausted(); -} - -DocumentSourceRouterAdapter::DocumentSourceRouterAdapter( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<RouterExecStage> childStage) - : DocumentSource(expCtx), _child(std::move(childStage)) {} - -} // namespace mongo diff --git a/src/mongo/s/query/document_source_router_adapter.h b/src/mongo/s/query/document_source_router_adapter.h deleted file mode 100644 index a7db7734539..00000000000 --- a/src/mongo/s/query/document_source_router_adapter.h +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - -#include "mongo/s/query/router_exec_stage.h" - -#include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/pipeline.h" - -namespace mongo { -/** - * A class that acts as an adapter between the RouterExecStage and DocumentSource interfaces, - * translating results from an input RouterExecStage into DocumentSource::GetNextResults. - */ -class DocumentSourceRouterAdapter final : public DocumentSource { -public: - static boost::intrusive_ptr<DocumentSourceRouterAdapter> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<RouterExecStage> childStage); - - StageConstraints constraints(Pipeline::SplitState pipeState) const final { - return {StreamType::kStreaming, - PositionRequirement::kFirst, - HostTypeRequirement::kMongoS, - DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed, - TransactionRequirement::kAllowed}; - } - - GetNextResult getNext() final; - void doDispose() final; - void reattachToOperationContext(OperationContext* opCtx) final; - void detachFromOperationContext() final; - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; - bool remotesExhausted(); - std::size_t getNumRemotes() const; - - void setExecContext(RouterExecStage::ExecContext execContext) { - _execContext = execContext; - } - - Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) const { - return _child->setAwaitDataTimeout(awaitDataTimeout); - } - -private: - DocumentSourceRouterAdapter(const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<RouterExecStage> childStage); - - std::unique_ptr<RouterExecStage> _child; - RouterExecStage::ExecContext _execContext; -}; -} // namespace mongo diff --git a/src/mongo/s/query/results_merger_test_fixture.cpp b/src/mongo/s/query/results_merger_test_fixture.cpp new file mode 100644 index 00000000000..57033523c68 --- /dev/null +++ b/src/mongo/s/query/results_merger_test_fixture.cpp @@ -0,0 +1,76 @@ +/** + * Copyright 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/remote_command_targeter_factory_mock.h" +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/query/results_merger_test_fixture.h" + +namespace mongo { +const HostAndPort ResultsMergerTestFixture::kTestConfigShardHost = + HostAndPort("FakeConfigHost", 12345); +const std::vector<ShardId> ResultsMergerTestFixture::kTestShardIds = { + ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")}; +const std::vector<HostAndPort> ResultsMergerTestFixture::kTestShardHosts = { + HostAndPort("FakeShard1Host", 12345), + HostAndPort("FakeShard2Host", 12345), + HostAndPort("FakeShard3Host", 12345)}; + +const NamespaceString ResultsMergerTestFixture::kTestNss = NamespaceString{"testdb.testcoll"}; + +void ResultsMergerTestFixture::setUp() { + setRemote(HostAndPort("ClientHost", 12345)); + + configTargeter()->setFindHostReturnValue(kTestConfigShardHost); + + std::vector<ShardType> shards; + + for (size_t i = 0; i < kTestShardIds.size(); i++) { + ShardType shardType; + shardType.setName(kTestShardIds[i].toString()); + shardType.setHost(kTestShardHosts[i].toString()); + + shards.push_back(shardType); + + std::unique_ptr<RemoteCommandTargeterMock> targeter( + stdx::make_unique<RemoteCommandTargeterMock>()); + targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i])); + targeter->setFindHostReturnValue(kTestShardHosts[i]); + + targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]), + std::move(targeter)); + } + + setupShards(shards); +} + +} // namespace mongo diff --git a/src/mongo/s/query/results_merger_test_fixture.h b/src/mongo/s/query/results_merger_test_fixture.h new file mode 100644 index 00000000000..1252f22b793 --- /dev/null +++ b/src/mongo/s/query/results_merger_test_fixture.h @@ -0,0 +1,228 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/s/query/async_results_merger.h" +#include "mongo/s/sharding_router_test_fixture.h" + +namespace mongo { + +/** + * Test fixture which is useful to both the tests for AsyncResultsMerger and BlockingResultsMerger. + */ +class ResultsMergerTestFixture : public ShardingTestFixture { +public: + static const HostAndPort kTestConfigShardHost; + static const std::vector<ShardId> kTestShardIds; + static const std::vector<HostAndPort> kTestShardHosts; + + static const NamespaceString kTestNss; + + ResultsMergerTestFixture() {} + + void setUp() override; + +protected: + /** + * Constructs an AsyncResultsMergerParams object with the given vector of existing cursors. + * + * If 'findCmd' is not set, the default AsyncResultsMergerParams are used. Otherwise, the + * 'findCmd' is used to construct the AsyncResultsMergerParams. + * + * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the + * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.' + */ + AsyncResultsMergerParams makeARMParamsFromExistingCursors( + std::vector<RemoteCursor> remoteCursors, + boost::optional<BSONObj> findCmd = boost::none, + boost::optional<std::int64_t> getMoreBatchSize = boost::none) { + AsyncResultsMergerParams params; + params.setNss(kTestNss); + params.setRemotes(std::move(remoteCursors)); + + + if (findCmd) { + const auto qr = unittest::assertGet( + QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */)); + if (!qr->getSort().isEmpty()) { + params.setSort(qr->getSort().getOwned()); + } + + if (getMoreBatchSize) { + params.setBatchSize(getMoreBatchSize); + } else { + params.setBatchSize(qr->getBatchSize() + ? boost::optional<std::int64_t>( + static_cast<std::int64_t>(*qr->getBatchSize())) + : boost::none); + } + params.setTailableMode(qr->getTailableMode()); + params.setAllowPartialResults(qr->isAllowPartialResults()); + } + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(operationContext()->getLogicalSessionId()); + sessionInfo.setTxnNumber(operationContext()->getTxnNumber()); + params.setOperationSessionInfo(sessionInfo); + return params; + } + /** + * Constructs an ARM with the given vector of existing cursors. + * + * If 'findCmd' is not set, the default AsyncResultsMergerParams are used. + * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams. + * + * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the + * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.' + */ + std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors( + std::vector<RemoteCursor> remoteCursors, + boost::optional<BSONObj> findCmd = boost::none, + boost::optional<std::int64_t> getMoreBatchSize = boost::none) { + + return stdx::make_unique<AsyncResultsMerger>( + operationContext(), + executor(), + makeARMParamsFromExistingCursors(std::move(remoteCursors), findCmd, getMoreBatchSize)); + } + + /** + * Schedules a "CommandOnShardedViewNotSupportedOnMongod" error response w/ view definition. + */ + void scheduleNetworkViewResponse(const std::string& ns, const std::string& pipelineJsonArr) { + BSONObjBuilder viewDefBob; + viewDefBob.append("ns", ns); + viewDefBob.append("pipeline", fromjson(pipelineJsonArr)); + + BSONObjBuilder bob; + bob.append("resolvedView", viewDefBob.obj()); + bob.append("ok", 0.0); + bob.append("errmsg", "Command on view must be executed by mongos"); + bob.append("code", 169); + + std::vector<BSONObj> batch = {bob.obj()}; + scheduleNetworkResponseObjs(batch); + } + + /** + * Schedules a list of cursor responses to be returned by the mock network. + */ + void scheduleNetworkResponses(std::vector<CursorResponse> responses) { + std::vector<BSONObj> objs; + for (const auto& cursorResponse : responses) { + // For tests of the AsyncResultsMerger, all CursorRepsonses scheduled by the tests are + // subsequent responses, since the AsyncResultsMerger will only ever run getMores. + objs.push_back(cursorResponse.toBSON(CursorResponse::ResponseType::SubsequentResponse)); + } + scheduleNetworkResponseObjs(objs); + } + + /** + * Schedules a list of raw BSON command responses to be returned by the mock network. + */ + void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) { + executor::NetworkInterfaceMock* net = network(); + net->enterNetwork(); + for (const auto& obj : objs) { + ASSERT_TRUE(net->hasReadyRequests()); + Milliseconds millis(0); + executor::RemoteCommandResponse response(obj, millis); + executor::TaskExecutor::ResponseStatus responseStatus(response); + net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); + } + net->runReadyNetworkOperations(); + net->exitNetwork(); + } + + executor::RemoteCommandRequest getNthPendingRequest(size_t n) { + executor::NetworkInterfaceMock* net = network(); + net->enterNetwork(); + ASSERT_TRUE(net->hasReadyRequests()); + executor::NetworkInterfaceMock::NetworkOperationIterator noi = + net->getNthUnscheduledRequest(n); + executor::RemoteCommandRequest retRequest = noi->getRequest(); + net->exitNetwork(); + return retRequest; + } + + bool networkHasReadyRequests() { + executor::NetworkInterfaceMock::InNetworkGuard guard(network()); + return guard->hasReadyRequests(); + } + + void scheduleErrorResponse(executor::ResponseStatus rs) { + invariant(!rs.isOK()); + rs.elapsedMillis = Milliseconds(0); + executor::NetworkInterfaceMock* net = network(); + net->enterNetwork(); + ASSERT_TRUE(net->hasReadyRequests()); + net->scheduleResponse(net->getNextReadyRequest(), net->now(), rs); + net->runReadyNetworkOperations(); + net->exitNetwork(); + } + + void runReadyCallbacks() { + executor::NetworkInterfaceMock* net = network(); + net->enterNetwork(); + net->runReadyNetworkOperations(); + net->exitNetwork(); + } + + void blackHoleNextRequest() { + executor::NetworkInterfaceMock* net = network(); + net->enterNetwork(); + ASSERT_TRUE(net->hasReadyRequests()); + net->blackHole(net->getNextReadyRequest()); + net->exitNetwork(); + } + + void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) { + std::cout << "CHARLIE: " << killCmd; + ASSERT_TRUE(killCmd.hasElement("killCursors")); + ASSERT_EQ(killCmd["cursors"].type(), BSONType::Array); + + size_t numCursors = 0; + for (auto&& cursor : killCmd["cursors"].Obj()) { + ASSERT_EQ(cursor.type(), BSONType::NumberLong); + ASSERT_EQ(cursor.numberLong(), cursorId); + ++numCursors; + } + ASSERT_EQ(numCursors, 1u); + } + + RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) { + RemoteCursor remoteCursor; + remoteCursor.setShardId(std::move(shardId)); + remoteCursor.setHostAndPort(std::move(host)); + remoteCursor.setCursorResponse(std::move(response)); + return remoteCursor; + } +}; + +} // namespace mongo
\ No newline at end of file diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp deleted file mode 100644 index 967c9f60b35..00000000000 --- a/src/mongo/s/query/router_stage_merge.cpp +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery - -#include "mongo/platform/basic.h" - -#include "mongo/s/query/router_stage_merge.h" - -#include "mongo/db/query/find_common.h" -#include "mongo/util/scopeguard.h" - -namespace mongo { - -RouterStageMerge::RouterStageMerge(OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params) - : RouterExecStage(opCtx), - _executor(executor), - _params(params), - _arm(opCtx, executor, params->extractARMParams()) {} - -StatusWith<ClusterQueryResult> RouterStageMerge::next(ExecContext execCtx) { - // Non-tailable and tailable non-awaitData cursors always block until ready(). AwaitData - // cursors wait for ready() only until a specified time limit is exceeded. - return (_params->tailableMode == TailableModeEnum::kTailableAndAwaitData - ? awaitNextWithTimeout(execCtx) - : _arm.blockingNext()); -} - -StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContext execCtx) { - invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData); - // If we are in kInitialFind or kGetMoreWithAtLeastOneResultInBatch context and the ARM is not - // ready, we don't block. Fall straight through to the return statement. - while (!_arm.ready() && execCtx == ExecContext::kGetMoreNoResultsYet) { - auto nextEventStatus = getNextEvent(); - if (!nextEventStatus.isOK()) { - return nextEventStatus.getStatus(); - } - auto event = nextEventStatus.getValue(); - - // Block until there are further results to return, or our time limit is exceeded. - auto waitStatus = _executor->waitForEvent( - getOpCtx(), event, awaitDataState(getOpCtx()).waitForInsertsDeadline); - - if (!waitStatus.isOK()) { - return waitStatus.getStatus(); - } - // Swallow timeout errors for tailable awaitData cursors, stash the event that we were - // waiting on, and return EOF. - if (waitStatus == stdx::cv_status::timeout) { - _leftoverEventFromLastTimeout = std::move(event); - return ClusterQueryResult{}; - } - } - - // We reach this point either if the ARM is ready, or if the ARM is !ready and we are in - // kInitialFind or kGetMoreWithAtLeastOneResultInBatch ExecContext. In the latter case, we - // return EOF immediately rather than blocking for further results. - return _arm.ready() ? _arm.nextReady() : ClusterQueryResult{}; -} - -StatusWith<EventHandle> RouterStageMerge::getNextEvent() { - // If we abandoned a previous event due to a mongoS-side timeout, wait for it first. - if (_leftoverEventFromLastTimeout) { - invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData); - // If we have an outstanding event from last time, then we might have to manually schedule - // some getMores for the cursors. If a remote response came back while we were between - // getMores (from the user to mongos), the response may have been an empty batch, and the - // ARM would not be able to ask for the next batch immediately since it is not attached to - // an OperationContext. Now that we have a valid OperationContext, we schedule the getMores - // ourselves. - Status getMoreStatus = _arm.scheduleGetMores(); - if (!getMoreStatus.isOK()) { - return getMoreStatus; - } - - // Return the leftover event and clear '_leftoverEventFromLastTimeout'. - auto event = _leftoverEventFromLastTimeout; - _leftoverEventFromLastTimeout = EventHandle(); - return event; - } - - return _arm.nextEvent(); -} - -void RouterStageMerge::kill(OperationContext* opCtx) { - _arm.blockingKill(opCtx); -} - -bool RouterStageMerge::remotesExhausted() { - return _arm.remotesExhausted(); -} - -std::size_t RouterStageMerge::getNumRemotes() const { - return _arm.getNumRemotes(); -} - -Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { - return _arm.setAwaitDataTimeout(awaitDataTimeout); -} - -void RouterStageMerge::addNewShardCursors(std::vector<RemoteCursor>&& newShards) { - _arm.addNewShardCursors(std::move(newShards)); -} - -} // namespace mongo diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index b6bfee146b6..c0a847f7bd2 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -29,74 +29,56 @@ #pragma once #include "mongo/executor/task_executor.h" -#include "mongo/s/query/async_results_merger.h" +#include "mongo/s/query/blocking_results_merger.h" #include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/s/query/router_exec_stage.h" #include "mongo/util/net/hostandport.h" namespace mongo { -namespace { -using EventHandle = executor::TaskExecutor::EventHandle; -} // namespace - /** - * Draws results from the AsyncResultsMerger, which is the underlying source of the stream of merged - * documents manipulated by the RouterExecStage pipeline. Used to present a stream of documents - * merged from the shards to the stages later in the pipeline. + * Serves as an adapter between the RouterExecStage interface and the BlockingResultsMerger + * interface, providing a single stream of results populated from many remote streams. */ class RouterStageMerge final : public RouterExecStage { public: RouterStageMerge(OperationContext* opCtx, executor::TaskExecutor* executor, - ClusterClientCursorParams* params); - - StatusWith<ClusterQueryResult> next(ExecContext) final; + AsyncResultsMergerParams&& armParams) + : RouterExecStage(opCtx), _resultsMerger(opCtx, std::move(armParams), executor) {} - void kill(OperationContext* opCtx) final; + StatusWith<ClusterQueryResult> next(ExecContext execCtx) final { + return _resultsMerger.next(getOpCtx(), execCtx); + } - bool remotesExhausted() final; + void kill(OperationContext* opCtx) final { + _resultsMerger.kill(opCtx); + } - std::size_t getNumRemotes() const final; + bool remotesExhausted() final { + return _resultsMerger.remotesExhausted(); + } - /** - * Adds the cursors in 'newShards' to those being merged by the ARM. - */ - void addNewShardCursors(std::vector<RemoteCursor>&& newShards); + std::size_t getNumRemotes() const final { + return _resultsMerger.getNumRemotes(); + } protected: - Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final { + return _resultsMerger.setAwaitDataTimeout(awaitDataTimeout); + } void doReattachToOperationContext() override { - _arm.reattachToOperationContext(getOpCtx()); + _resultsMerger.reattachToOperationContext(getOpCtx()); } virtual void doDetachFromOperationContext() { - _arm.detachFromOperationContext(); + _resultsMerger.detachFromOperationContext(); } private: - /** - * Awaits the next result from the ARM up to a specified time limit. If this is the user's - * initial find or we have already obtained at least one result for this batch, this method - * returns EOF immediately rather than blocking. - */ - StatusWith<ClusterQueryResult> awaitNextWithTimeout(ExecContext execCtx); - - /** - * Returns the next event to wait upon - either a new event from the ARM, or a valid preceding - * event which we scheduled during the previous call to next(). - */ - StatusWith<EventHandle> getNextEvent(); - - // Not owned here. - executor::TaskExecutor* _executor; - EventHandle _leftoverEventFromLastTimeout; - - ClusterClientCursorParams* _params; - // Schedules remote work and merges results from 'remotes'. - AsyncResultsMerger _arm; + BlockingResultsMerger _resultsMerger; }; } // namespace mongo diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index 5e94274b9ac..a5a97bdbdbc 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -35,26 +35,20 @@ #include "mongo/db/pipeline/document_source_list_local_sessions.h" #include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/expression_context.h" -#include "mongo/s/query/document_source_router_adapter.h" namespace mongo { -RouterStagePipeline::RouterStagePipeline(std::unique_ptr<RouterExecStage> child, - std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline) +RouterStagePipeline::RouterStagePipeline(std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline) : RouterExecStage(mergePipeline->getContext()->opCtx), - _mergePipeline(std::move(mergePipeline)), - _mongosOnlyPipeline(!_mergePipeline->isSplitForMerge()) { - if (!_mongosOnlyPipeline) { - // Add an adapter to the front of the pipeline to draw results from 'child'. - _routerAdapter = - DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child)), - _mergePipeline->addInitialSource(_routerAdapter); - } + _mergePipeline(std::move(mergePipeline)) { + invariant(!_mergePipeline->getSources().empty()); + _mergeCursorsStage = + dynamic_cast<DocumentSourceMergeCursors*>(_mergePipeline->getSources().front().get()); } StatusWith<ClusterQueryResult> RouterStagePipeline::next(RouterExecStage::ExecContext execContext) { - if (_routerAdapter) { - _routerAdapter->setExecContext(execContext); + if (_mergeCursorsStage) { + _mergeCursorsStage->setExecContext(execContext); } // Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF. @@ -85,15 +79,20 @@ void RouterStagePipeline::kill(OperationContext* opCtx) { } std::size_t RouterStagePipeline::getNumRemotes() const { - return _mongosOnlyPipeline ? 0 : _routerAdapter->getNumRemotes(); + if (_mergeCursorsStage) { + return _mergeCursorsStage->getNumRemotes(); + } + return 0; } bool RouterStagePipeline::remotesExhausted() { - return _mongosOnlyPipeline || _routerAdapter->remotesExhausted(); + return !_mergeCursorsStage || _mergeCursorsStage->remotesExhausted(); } Status RouterStagePipeline::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { - return _routerAdapter->setAwaitDataTimeout(awaitDataTimeout); + invariant(_mergeCursorsStage, + "The only cursors which should be tailable are those with remote cursors."); + return _mergeCursorsStage->setAwaitDataTimeout(awaitDataTimeout); } } // namespace mongo diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index c14ddf9f80b..43706b42cd9 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -31,8 +31,8 @@ #include "mongo/s/query/router_exec_stage.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/pipeline.h" -#include "mongo/s/query/document_source_router_adapter.h" namespace mongo { @@ -42,8 +42,7 @@ namespace mongo { */ class RouterStagePipeline final : public RouterExecStage { public: - RouterStagePipeline(std::unique_ptr<RouterExecStage> child, - std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline); + RouterStagePipeline(std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline); StatusWith<ClusterQueryResult> next(RouterExecStage::ExecContext execContext) final; @@ -61,8 +60,10 @@ protected: void doDetachFromOperationContext() final; private: - boost::intrusive_ptr<DocumentSourceRouterAdapter> _routerAdapter; std::unique_ptr<Pipeline, PipelineDeleter> _mergePipeline; - bool _mongosOnlyPipeline; + + // May be null if this pipeline is executing exclusively on mongos and will not contact the + // shards at all. + boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursorsStage; }; } // namespace mongo diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.h b/src/mongo/s/query/router_stage_update_on_add_shard.h deleted file mode 100644 index 00ee921e2af..00000000000 --- a/src/mongo/s/query/router_stage_update_on_add_shard.h +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ -#pragma once - -#include "mongo/executor/task_executor.h" -#include "mongo/s/query/cluster_client_cursor_params.h" -#include "mongo/s/query/router_exec_stage.h" - -namespace mongo { -/** - * Uses a RouterStageMerge to merge results, and monitors the merged stream for special - * sentinel documents which indicate the the set of cursors needs to be updated. When the - * sentinel is detected, removes it from the stream and updates the set of cursors. - * - * cmdToRunOnNewShards: Command to execute on the new shard to open the cursor. - */ -class RouterStageUpdateOnAddShard final : public RouterExecStage { -public: - RouterStageUpdateOnAddShard(OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params, - std::vector<ShardId> shardIds, - BSONObj cmdToRunOnNewShards); - - StatusWith<ClusterQueryResult> next(ExecContext) final; - -private: - /** - * Establish the new cursors and tell the RouterStageMerge about them. - * obj: The BSONObj which triggered the establishment of the new cursors - */ - void addNewShardCursors(BSONObj obj); - - /** - * Open the cursors on the new shards. - */ - std::vector<RemoteCursor> establishShardCursorsOnNewShards(const BSONObj& newShardDetectedObj); - - ClusterClientCursorParams* _params; - std::vector<ShardId> _shardIds; - BSONObj _cmdToRunOnNewShards; -}; -} |