summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2018-07-02 18:23:25 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2018-08-15 13:30:12 -0400
commitee06e6cbe5a75775f76836449558be2f6a98ddfd (patch)
treed4dbf37110d25f7f4876337a7b1e11abe251fac5
parenta5bde2f3e9afc3f72da01788b76829fb29c2f4e7 (diff)
downloadmongo-ee06e6cbe5a75775f76836449558be2f6a98ddfd.tar.gz
SERVER-33323 Refactor agg cursor merging on mongos
This commit makes it so that aggregations will always use a $mergeCursors as a wrapper around a AsyncResultsMerger, which is new behavior for mongos. As part of this refactor, we can delete the concept of a 'merging presorted' $sort stage (which is now handled by the AsyncResultsMerger) and delete the DocumentSourceRouterAdapter stage which talked to a RouterStageMerge, instead directly using a $mergeCursors stage.
-rw-r--r--src/mongo/db/pipeline/SConscript16
-rw-r--r--src/mongo/db/pipeline/cluster_aggregation_planner.cpp159
-rw-r--r--src/mongo/db/pipeline/cluster_aggregation_planner.h49
-rw-r--r--src/mongo/db/pipeline/document_source.h26
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.h2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp10
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h3
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_close_cursor.h21
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.h21
-rw-r--r--src/mongo/db/pipeline/document_source_exchange.h2
-rw-r--r--src/mongo/db/pipeline/document_source_facet.h2
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.h3
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp14
-rw-r--r--src/mongo/db/pipeline/document_source_group.h2
-rw-r--r--src/mongo/db/pipeline/document_source_group_test.cpp9
-rw-r--r--src/mongo/db/pipeline/document_source_internal_split_pipeline.h2
-rw-r--r--src/mongo/db/pipeline/document_source_limit.cpp54
-rw-r--r--src/mongo/db/pipeline/document_source_limit.h2
-rw-r--r--src/mongo/db/pipeline/document_source_limit_test.cpp17
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h2
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.h7
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.cpp54
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.h67
-rw-r--r--src/mongo/db/pipeline/document_source_out.h2
-rw-r--r--src/mongo/db/pipeline/document_source_sample.cpp18
-rw-r--r--src/mongo/db/pipeline/document_source_sample.h2
-rw-r--r--src/mongo/db/pipeline/document_source_skip.h6
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp103
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h31
-rw-r--r--src/mongo/db/pipeline/document_source_sort_test.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_update_on_add_shard.cpp (renamed from src/mongo/s/query/router_stage_update_on_add_shard.cpp)95
-rw-r--r--src/mongo/db/pipeline/document_source_update_on_add_shard.h98
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp25
-rw-r--r--src/mongo/db/pipeline/pipeline.h18
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp40
-rw-r--r--src/mongo/s/commands/SConscript4
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp272
-rw-r--r--src/mongo/s/query/SConscript13
-rw-r--r--src/mongo/s/query/async_results_merger.cpp39
-rw-r--r--src/mongo/s/query/async_results_merger.h17
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp280
-rw-r--r--src/mongo/s/query/blocking_results_merger.cpp140
-rw-r--r--src/mongo/s/query/blocking_results_merger.h113
-rw-r--r--src/mongo/s/query/blocking_results_merger_test.cpp119
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp88
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h14
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h9
-rw-r--r--src/mongo/s/query/document_source_router_adapter.cpp83
-rw-r--r--src/mongo/s/query/document_source_router_adapter.h79
-rw-r--r--src/mongo/s/query/results_merger_test_fixture.cpp76
-rw-r--r--src/mongo/s/query/results_merger_test_fixture.h228
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp132
-rw-r--r--src/mongo/s/query/router_stage_merge.h64
-rw-r--r--src/mongo/s/query/router_stage_pipeline.cpp31
-rw-r--r--src/mongo/s/query/router_stage_pipeline.h11
-rw-r--r--src/mongo/s/query/router_stage_update_on_add_shard.h68
57 files changed, 1514 insertions, 1359 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index e9157f816d0..150fa1deb51 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -168,6 +168,17 @@ env.Library(
)
env.Library(
+ target='cluster_aggregation_planner',
+ source=[
+ 'cluster_aggregation_planner.cpp',
+ ],
+ LIBDEPS=[
+ 'pipeline',
+ '$BUILD_DIR/mongo/s/query/cluster_client_cursor',
+ ]
+)
+
+env.Library(
target='granularity_rounder',
source=[
'granularity_rounder.cpp',
@@ -303,7 +314,6 @@ pipelineeEnv.InjectThirdPartyIncludePaths(libraries=['snappy'])
pipelineeEnv.Library(
target='pipeline',
source=[
- "cluster_aggregation_planner.cpp",
'document_source.cpp',
'document_source_add_fields.cpp',
'document_source_backup_cursor.cpp',
@@ -334,6 +344,7 @@ pipelineeEnv.Library(
'document_source_lookup_change_post_image.cpp',
'document_source_match.cpp',
'document_source_merge_cursors.cpp',
+ 'document_source_update_on_add_shard.cpp',
'document_source_out.cpp',
'document_source_out_replace_coll.cpp',
'document_source_project.cpp',
@@ -439,8 +450,9 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/repl/replmocks',
'$BUILD_DIR/mongo/db/service_context_test_fixture',
'$BUILD_DIR/mongo/s/is_mongos',
- 'document_value_test_util',
+ 'cluster_aggregation_planner',
'document_source_mock',
+ 'document_value_test_util',
'pipeline',
],
)
diff --git a/src/mongo/db/pipeline/cluster_aggregation_planner.cpp b/src/mongo/db/pipeline/cluster_aggregation_planner.cpp
index 8fa76c1db79..c9dcfaf2da3 100644
--- a/src/mongo/db/pipeline/cluster_aggregation_planner.cpp
+++ b/src/mongo/db/pipeline/cluster_aggregation_planner.cpp
@@ -30,11 +30,21 @@
#include "mongo/db/pipeline/cluster_aggregation_planner.h"
+#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_source_unwind.h"
+#include "mongo/db/pipeline/document_source_update_on_add_shard.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/query/router_stage_limit.h"
+#include "mongo/s/query/router_stage_pipeline.h"
+#include "mongo/s/query/router_stage_remove_metadata_fields.h"
+#include "mongo/s/query/router_stage_skip.h"
+#include "mongo/s/shard_id.h"
namespace mongo {
namespace cluster_aggregation_planner {
@@ -47,8 +57,10 @@ namespace {
* It is not safe to call this optimization multiple times.
*
* NOTE: looks for NeedsMergerDocumentSources and uses that API
+ *
+ * Returns the sort specification if the input streams are sorted, and false otherwise.
*/
-void findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
+boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pipeline* mergePipe) {
while (!mergePipe->getSources().empty()) {
boost::intrusive_ptr<DocumentSource> current = mergePipe->popFront();
@@ -58,28 +70,25 @@ void findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
if (!splittable) {
// Move the source from the merger _sources to the shard _sources.
- shardPipe->pushBack(current);
+ shardPipe->push_back(current);
} else {
// Split this source into 'merge' and 'shard' _sources.
boost::intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
- auto mergeSources = splittable->getMergeSources();
+ auto mergeLogic = splittable->mergingLogic();
// A source may not simultaneously be present on both sides of the split.
- invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) ==
- mergeSources.end());
+ invariant(shardSource != mergeLogic.mergingStage);
if (shardSource)
- shardPipe->pushBack(shardSource);
+ shardPipe->push_back(std::move(shardSource));
- // Add the stages in reverse order, so that they appear in the pipeline in the same
- // order as they were returned by the stage.
- for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) {
- mergePipe->addInitialSource(*it);
- }
+ if (mergeLogic.mergingStage)
+ mergePipe->addInitialSource(std::move(mergeLogic.mergingStage));
- break;
+ return mergeLogic.inputSortPattern;
}
}
+ return boost::none;
}
/**
@@ -131,48 +140,118 @@ void limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe)
BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->getContext());
shardPipe->pushBack(project);
}
-} // namespace
-void performSplitPipelineOptimizations(Pipeline* shardPipeline, Pipeline* mergingPipeline) {
- // The order in which optimizations are applied can have significant impact on the
- // efficiency of the final pipeline. Be Careful!
- findSplitPoint(shardPipeline, mergingPipeline);
- moveFinalUnwindFromShardsToMerger(shardPipeline, mergingPipeline);
- limitFieldsSentFromShardsToMerger(shardPipeline, mergingPipeline);
+bool isMergeSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) {
+ return (dynamic_cast<DocumentSourceLimit*>(stage.get()) ||
+ dynamic_cast<DocumentSourceMergeCursors*>(stage.get()) ||
+ dynamic_cast<DocumentSourceSkip*>(stage.get()));
}
-boost::optional<BSONObj> popLeadingMergeSort(Pipeline* pipeline) {
- // Remove a leading $sort iff it is a mergesort, since the ARM cannot handle blocking $sort.
- auto frontSort = pipeline->popFrontWithNameAndCriteria(
- DocumentSourceSort::kStageName, [](const DocumentSource* const source) {
- return static_cast<const DocumentSourceSort* const>(source)->mergingPresorted();
- });
-
- if (frontSort) {
- auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get());
- if (auto sortLimit = sortStage->getLimitSrc()) {
- // There was a limit stage absorbed into the sort stage, so we need to preserve that.
- pipeline->addInitialSource(sortLimit);
+bool isAllLimitsAndSkips(Pipeline* pipeline) {
+ const auto stages = pipeline->getSources();
+ return std::all_of(
+ stages.begin(), stages.end(), [](const auto& stage) { return isMergeSkipOrLimit(stage); });
+}
+
+ClusterClientCursorGuard convertPipelineToRouterStages(
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline, ClusterClientCursorParams&& cursorParams) {
+ auto* opCtx = pipeline->getContext()->opCtx;
+
+ // We expect the pipeline to be fully executable at this point, so if the pipeline was all skips
+ // and limits we expect it to start with a $mergeCursors stage.
+ auto mergeCursors =
+ checked_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get());
+ // Replace the pipeline with RouterExecStages.
+ std::unique_ptr<RouterExecStage> root = mergeCursors->convertToRouterStage();
+ pipeline->popFront();
+ while (!pipeline->getSources().empty()) {
+ if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) {
+ root = std::make_unique<RouterStageSkip>(
+ opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip());
+ } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) {
+ root = std::make_unique<RouterStageLimit>(
+ opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit());
+ } else {
+ // We previously checked that everything was a $mergeCursors, $skip, or $limit. We
+ // already popped off the $mergeCursors, so everything else should be a $skip or a
+ // $limit.
+ MONGO_UNREACHABLE;
}
- return sortStage
- ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging)
- .toBson();
}
- return boost::none;
+ // We are executing the pipeline without using an actual Pipeline, so we need to strip out any
+ // Document metadata ourselves.
+ return ClusterClientCursorImpl::make(
+ opCtx,
+ std::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx, std::move(root), Document::allMetadataFieldNames),
+ std::move(cursorParams));
+}
+} // namespace
+
+SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline) {
+ auto& expCtx = pipeline->getContext();
+ // Re-brand 'pipeline' as the merging pipeline. We will move stages one by one from the merging
+ // half to the shards, as possible.
+ auto mergePipeline = std::move(pipeline);
+
+ Pipeline::SourceContainer shardStages;
+ boost::optional<BSONObj> inputsSort = findSplitPoint(&shardStages, mergePipeline.get());
+ auto shardsPipeline = uassertStatusOK(Pipeline::create(std::move(shardStages), expCtx));
+
+ // The order in which optimizations are applied can have significant impact on the efficiency of
+ // the final pipeline. Be Careful!
+ moveFinalUnwindFromShardsToMerger(shardsPipeline.get(), mergePipeline.get());
+ limitFieldsSentFromShardsToMerger(shardsPipeline.get(), mergePipeline.get());
+ shardsPipeline->setSplitState(Pipeline::SplitState::kSplitForShards);
+ mergePipeline->setSplitState(Pipeline::SplitState::kSplitForMerge);
+
+ return {std::move(shardsPipeline), std::move(mergePipeline), std::move(inputsSort)};
}
void addMergeCursorsSource(Pipeline* mergePipeline,
+ const LiteParsedPipeline& liteParsedPipeline,
+ BSONObj cmdSentToShards,
std::vector<RemoteCursor> remoteCursors,
+ const std::vector<ShardId>& targetedShards,
+ boost::optional<BSONObj> shardCursorsSortSpec,
executor::TaskExecutor* executor) {
+ auto* opCtx = mergePipeline->getContext()->opCtx;
AsyncResultsMergerParams armParams;
- if (auto sort = popLeadingMergeSort(mergePipeline)) {
- armParams.setSort(std::move(*sort));
- }
+ armParams.setSort(shardCursorsSortSpec);
armParams.setRemotes(std::move(remoteCursors));
armParams.setTailableMode(mergePipeline->getContext()->tailableMode);
armParams.setNss(mergePipeline->getContext()->ns);
- mergePipeline->addInitialSource(DocumentSourceMergeCursors::create(
- executor, std::move(armParams), mergePipeline->getContext()));
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(opCtx->getLogicalSessionId());
+ sessionInfo.setTxnNumber(opCtx->getTxnNumber());
+ armParams.setOperationSessionInfo(sessionInfo);
+
+ // For change streams, we need to set up a custom stage to establish cursors on new shards when
+ // they are added, to ensure we don't miss results from the new shards.
+ auto mergeCursorsStage = DocumentSourceMergeCursors::create(
+ executor, std::move(armParams), mergePipeline->getContext());
+ if (liteParsedPipeline.hasChangeStream()) {
+ mergePipeline->addInitialSource(DocumentSourceUpdateOnAddShard::create(
+ mergePipeline->getContext(),
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ mergeCursorsStage,
+ targetedShards,
+ cmdSentToShards));
+ }
+ mergePipeline->addInitialSource(std::move(mergeCursorsStage));
+}
+
+ClusterClientCursorGuard buildClusterCursor(OperationContext* opCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ ClusterClientCursorParams&& cursorParams) {
+ if (isAllLimitsAndSkips(pipeline.get())) {
+ // We can optimize this Pipeline to avoid going through any DocumentSources at all and thus
+ // skip the expensive BSON->Document->BSON conversion.
+ return convertPipelineToRouterStages(std::move(pipeline), std::move(cursorParams));
+ }
+ return ClusterClientCursorImpl::make(
+ opCtx, std::make_unique<RouterStagePipeline>(std::move(pipeline)), std::move(cursorParams));
}
} // namespace cluster_aggregation_planner
diff --git a/src/mongo/db/pipeline/cluster_aggregation_planner.h b/src/mongo/db/pipeline/cluster_aggregation_planner.h
index f62f158f6ca..3b3aaa63df4 100644
--- a/src/mongo/db/pipeline/cluster_aggregation_planner.h
+++ b/src/mongo/db/pipeline/cluster_aggregation_planner.h
@@ -28,31 +28,66 @@
#pragma once
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/s/query/cluster_client_cursor_impl.h"
namespace mongo {
namespace cluster_aggregation_planner {
/**
- * Performs optimizations with the aim of reducing computing time and network traffic when a
- * pipeline has been split into two pieces. Modifies 'shardPipeline' and 'mergingPipeline' such that
- * they may contain different stages, but still compute the same results when executed.
+ * Represents the two halves of a pipeline that will execute in a sharded cluster. 'shardsPipeline'
+ * will execute in parallel on each shard, and 'mergePipeline' will execute on the merge host -
+ * either one of the shards or a mongos.
*/
-void performSplitPipelineOptimizations(Pipeline* shardPipeline, Pipeline* mergingPipeline);
+struct SplitPipeline {
+ SplitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> shardsPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline,
+ boost::optional<BSONObj> shardCursorsSortSpec)
+ : shardsPipeline(std::move(shardsPipeline)),
+ mergePipeline(std::move(mergePipeline)),
+ shardCursorsSortSpec(std::move(shardCursorsSortSpec)) {}
+
+ std::unique_ptr<Pipeline, PipelineDeleter> shardsPipeline;
+ std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline;
+
+ // If set, the cursors from the shards are expected to be sorted according to this spec, and to
+ // have populated a "$sortKey" metadata field which can be used to compare the results.
+ boost::optional<BSONObj> shardCursorsSortSpec;
+};
/**
- * Rips off an initial $sort stage that can be handled by cursor merging machinery. Returns the
- * sort key pattern of such a $sort stage if there was one, and boost::none otherwise.
+ * Split the current Pipeline into a Pipeline for each shard, and a Pipeline that combines the
+ * results within a merging process. This call also performs optimizations with the aim of reducing
+ * computing time and network traffic when a pipeline has been split into two pieces.
+ *
+ * The 'mergePipeline' returned as part of the SplitPipeline here is not ready to execute until the
+ * 'shardsPipeline' has been sent to the shards and cursors have been established. Once cursors have
+ * been established, the merge pipeline can be made executable by calling 'addMergeCursorsSource()'
*/
-boost::optional<BSONObj> popLeadingMergeSort(Pipeline* mergePipeline);
+SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline);
/**
* Creates a new DocumentSourceMergeCursors from the provided 'remoteCursors' and adds it to the
* front of 'mergePipeline'.
*/
void addMergeCursorsSource(Pipeline* mergePipeline,
+ const LiteParsedPipeline&,
+ BSONObj cmdSentToShards,
std::vector<RemoteCursor> remoteCursors,
+ const std::vector<ShardId>& targetedShards,
+ boost::optional<BSONObj> shardCursorsSortSpec,
executor::TaskExecutor*);
+/**
+ * Builds a ClusterClientCursor which will execute 'pipeline'. If 'pipeline' consists entirely of
+ * $skip and $limit stages, the pipeline is eliminated entirely and replaced with a RouterExecStage
+ * tree that does same thing but will avoid using a RouterStagePipeline. Avoiding a
+ * RouterStagePipeline will remove an expensive conversion from BSONObj -> Document for each result.
+ */
+ClusterClientCursorGuard buildClusterCursor(OperationContext* opCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ ClusterClientCursorParams&&);
+
} // namespace cluster_aggregation_planner
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 4aae392153d..682b03b76e1 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -698,6 +698,22 @@ private:
class NeedsMergerDocumentSource {
public:
/**
+ * A struct representing the information needed to merge the cursors for the shards half of this
+ * pipeline. If 'inputSortPattern' is set, each document is expected to have sort key metadata
+ * which will be serialized in the '$sortKey' field. 'inputSortPattern' will then be used to
+ * describe which fields are ascending and which fields are descending when merging the streams
+ * together.
+ */
+ struct MergingLogic {
+ MergingLogic(boost::intrusive_ptr<DocumentSource>&& mergingStage,
+ boost::optional<BSONObj> inputSortPattern = boost::none)
+ : mergingStage(std::move(mergingStage)), inputSortPattern(inputSortPattern) {}
+
+ boost::intrusive_ptr<DocumentSource> mergingStage;
+ boost::optional<BSONObj> inputSortPattern;
+ };
+
+ /**
* Returns a source to be run on the shards, or NULL if no work should be done on the shards for
* this stage. Must not mutate the existing source object; if different behaviour is required in
* the split-pipeline case, a new source should be created and configured appropriately. It is
@@ -708,12 +724,12 @@ public:
virtual boost::intrusive_ptr<DocumentSource> getShardSource() = 0;
/**
- * Returns a list of stages that combine results from the shards. Subclasses of this class
- * should not return an empty list. Must not mutate the existing source object; if different
- * behaviour is required, a new source should be created and configured appropriately. It is an
- * error for getMergeSources() to return a pointer to the same object as getShardSource().
+ * Returns a struct representing what needs to be done to merge each shard's pipeline into a
+ * single stream of results. Must not mutate the existing source object; if different behaviour
+ * is required, a new source should be created and configured appropriately. It is an error for
+ * mergingLogic() to return a pointer to the same object as getShardSource().
*/
- virtual std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() = 0;
+ virtual MergingLogic mergingLogic() = 0;
protected:
// It is invalid to delete through a NeedsMergerDocumentSource-typed pointer.
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h
index 9bd2c9f2877..18700913240 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.h
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.h
@@ -63,7 +63,7 @@ public:
boost::intrusive_ptr<DocumentSource> getShardSource() final {
return nullptr;
}
- std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final {
+ MergingLogic mergingLogic() final {
return {this};
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index ac10c64b0ad..448ac608724 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -442,8 +442,8 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
return stages;
}
-BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj originalCmdObj,
- const BSONObj resumeToken) {
+BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(BSONObj originalCmdObj,
+ Document resumeToken) {
Document originalCmd(originalCmdObj);
auto pipeline = originalCmd[AggregationRequest::kPipelineName].getArray();
// A $changeStream must be the first element of the pipeline in order to be able
@@ -454,12 +454,12 @@ BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj or
pipeline[0][DocumentSourceChangeStream::kStageName].getDocument());
changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = Value(resumeToken);
- // If the command was initially specified with a startAtOperationTime, we need to remove it
- // to use the new resume token.
+ // If the command was initially specified with a startAtOperationTime, we need to remove it to
+ // use the new resume token.
changeStreamStage[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] = Value();
pipeline[0] =
Value(Document{{DocumentSourceChangeStream::kStageName, changeStreamStage.freeze()}});
- MutableDocument newCmd(originalCmd);
+ MutableDocument newCmd(std::move(originalCmd));
newCmd[AggregationRequest::kPipelineName] = Value(pipeline);
return newCmd.freeze().toBson();
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 802a717294e..5f4ee1c528c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -182,8 +182,7 @@ public:
* resumeAfter: option containing the resume token. If there was a previous resumeAfter:
* option, it is removed.
*/
- static BSONObj replaceResumeTokenInCommand(const BSONObj originalCmdObj,
- const BSONObj resumeToken);
+ static BSONObj replaceResumeTokenInCommand(BSONObj originalCmdObj, Document resumeToken);
/**
* Helper used by various change stream stages. Used for asserting that a certain Value of a
diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
index cda09f2257e..31fa2704005 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
@@ -54,8 +54,9 @@ public:
invariant(pipeState != Pipeline::SplitState::kSplitForShards);
return {StreamType::kStreaming,
PositionRequirement::kNone,
- (pipeState == Pipeline::SplitState::kUnsplit ? HostTypeRequirement::kNone
- : HostTypeRequirement::kMongoS),
+ // If this is parsed on mongos it should stay on mongos. If we're not in a sharded
+ // cluster then it's okay to run on mongod.
+ HostTypeRequirement::kLocalOnly,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
TransactionRequirement::kNotAllowed,
@@ -77,20 +78,10 @@ public:
return nullptr;
}
- std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final {
+ MergingLogic mergingLogic() final {
// This stage must run on mongos to ensure it sees any invalidation in the correct order,
- // and to ensure that all remote cursors are cleaned up properly. We also must include a
- // mergingPresorted $sort stage to communicate to the AsyncResultsMerger that we need to
- // merge the streams in a particular order.
- const bool mergingPresorted = true;
- const long long noLimit = -1;
- auto sortMergingPresorted =
- DocumentSourceSort::create(pExpCtx,
- change_stream_constants::kSortSpec,
- noLimit,
- internalDocumentSourceSortMaxBlockingSortBytes.load(),
- mergingPresorted);
- return {sortMergingPresorted, this};
+ // and to ensure that all remote cursors are cleaned up properly.
+ return {this, change_stream_constants::kSortSpec};
}
private:
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index f7d970bab32..ff3a32f4246 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -105,13 +105,12 @@ public:
GetNextResult getNext() final;
const char* getSourceName() const final;
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- // This stage should never be in the shards part of a split pipeline.
- invariant(pipeState != Pipeline::SplitState::kSplitForShards);
+ StageConstraints constraints(Pipeline::SplitState) const final {
return {StreamType::kStreaming,
PositionRequirement::kNone,
- (pipeState == Pipeline::SplitState::kUnsplit ? HostTypeRequirement::kNone
- : HostTypeRequirement::kMongoS),
+ // If this is parsed on mongos it should stay on mongos. If we're not in a sharded
+ // cluster then it's okay to run on mongod.
+ HostTypeRequirement::kLocalOnly,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
TransactionRequirement::kNotAllowed,
@@ -128,19 +127,11 @@ public:
_tokenFromClient.getClusterTime());
};
- std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final {
+ MergingLogic mergingLogic() final {
// This stage must run on mongos to ensure it sees the resume token, which could have come
// from any shard. We also must include a mergingPresorted $sort stage to communicate to
// the AsyncResultsMerger that we need to merge the streams in a particular order.
- const bool mergingPresorted = true;
- const long long noLimit = -1;
- auto sortMergingPresorted =
- DocumentSourceSort::create(pExpCtx,
- change_stream_constants::kSortSpec,
- noLimit,
- internalDocumentSourceSortMaxBlockingSortBytes.load(),
- mergingPresorted);
- return {sortMergingPresorted, this};
+ return {this, change_stream_constants::kSortSpec};
};
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h
index b4231424032..ba4cb56ecdb 100644
--- a/src/mongo/db/pipeline/document_source_exchange.h
+++ b/src/mongo/db/pipeline/document_source_exchange.h
@@ -167,7 +167,7 @@ public:
boost::intrusive_ptr<DocumentSource> getShardSource() final {
return this;
}
- std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final {
+ MergingLogic mergingLogic() final {
// TODO SERVER-35974 we have to revisit this when we implement consumers.
return {this};
}
diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h
index d97dec116df..92e9f2f05d2 100644
--- a/src/mongo/db/pipeline/document_source_facet.h
+++ b/src/mongo/db/pipeline/document_source_facet.h
@@ -125,7 +125,7 @@ public:
boost::intrusive_ptr<DocumentSource> getShardSource() final {
return nullptr;
}
- std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final {
+ MergingLogic mergingLogic() final {
return {this};
}
diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp
index 1d8387309ae..8b62a8c21f8 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.cpp
+++ b/src/mongo/db/pipeline/document_source_geo_near.cpp
@@ -235,8 +235,8 @@ DepsTracker::State DocumentSourceGeoNear::getDependencies(DepsTracker* deps) con
DocumentSourceGeoNear::DocumentSourceGeoNear(const intrusive_ptr<ExpressionContext>& pExpCtx)
: DocumentSource(pExpCtx), coordsIsArray(false), spherical(false) {}
-std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceGeoNear::getMergeSources() {
- return {DocumentSourceSort::create(
- pExpCtx, BSON(distanceField->fullPath() << 1 << "$mergePresorted" << true))};
+NeedsMergerDocumentSource::MergingLogic DocumentSourceGeoNear::mergingLogic() {
+ return {nullptr, BSON(distanceField->fullPath() << 1)};
}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index a0a6ea878ad..9237faeddcd 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -128,7 +128,8 @@ public:
/**
* In a sharded cluster, this becomes a merge sort by distance, from nearest to furthest.
*/
- std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final;
+ MergingLogic mergingLogic() final;
+
private:
explicit DocumentSourceGeoNear(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index fc5d6b1e2ee..1563c6ba14b 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -839,13 +839,13 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::getShardSource() {
return this; // No modifications necessary when on shard
}
-std::list<intrusive_ptr<DocumentSource>> DocumentSourceGroup::getMergeSources() {
- intrusive_ptr<DocumentSourceGroup> pMerger(new DocumentSourceGroup(pExpCtx));
- pMerger->setDoingMerge(true);
+NeedsMergerDocumentSource::MergingLogic DocumentSourceGroup::mergingLogic() {
+ intrusive_ptr<DocumentSourceGroup> mergingGroup(new DocumentSourceGroup(pExpCtx));
+ mergingGroup->setDoingMerge(true);
VariablesParseState vps = pExpCtx->variablesParseState;
/* the merger will use the same grouping key */
- pMerger->setIdExpression(ExpressionFieldPath::parse(pExpCtx, "$$ROOT._id", vps));
+ mergingGroup->setIdExpression(ExpressionFieldPath::parse(pExpCtx, "$$ROOT._id", vps));
for (auto&& accumulatedField : _accumulatedFields) {
// The merger's output field names will be the same, as will the accumulator factories.
@@ -855,12 +855,12 @@ std::list<intrusive_ptr<DocumentSource>> DocumentSourceGroup::getMergeSources()
auto copiedAccumuledField = accumulatedField;
copiedAccumuledField.expression =
ExpressionFieldPath::parse(pExpCtx, "$$ROOT." + accumulatedField.fieldName, vps);
- pMerger->addAccumulator(copiedAccumuledField);
+ mergingGroup->addAccumulator(copiedAccumuledField);
}
- return {pMerger};
-}
+ return {mergingGroup};
}
+} // namespace mongo
#include "mongo/db/sorter/sorter.cpp"
// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file.
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 265ef8502d6..421c244641e 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -100,7 +100,7 @@ public:
// Virtuals for NeedsMergerDocumentSource.
boost::intrusive_ptr<DocumentSource> getShardSource() final;
- std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final;
+ MergingLogic mergingLogic() final;
/**
* Returns true if this $group stage used disk during execution and false otherwise.
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index 4062c497a53..3a8cecedf54 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -543,10 +543,11 @@ protected:
// case only one shard is in use.
NeedsMergerDocumentSource* splittable = dynamic_cast<NeedsMergerDocumentSource*>(group());
ASSERT(splittable);
- auto routerSources = splittable->getMergeSources();
- ASSERT_EQ(routerSources.size(), 1UL);
- ASSERT_NOT_EQUALS(group(), routerSources.front().get());
- return routerSources.front();
+ auto mergeLogic = splittable->mergingLogic();
+ ASSERT(mergeLogic.mergingStage);
+ ASSERT_NOT_EQUALS(group(), mergeLogic.mergingStage);
+ ASSERT_FALSE(static_cast<bool>(mergeLogic.inputSortPattern));
+ return mergeLogic.mergingStage;
}
void checkResultSet(const intrusive_ptr<DocumentSource>& sink) {
// Load the results from the DocumentSourceGroup and sort them by _id.
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
index 2d5bf946ead..b96c5cdba51 100644
--- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
@@ -62,7 +62,7 @@ public:
return nullptr;
}
- std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final {
+ MergingLogic mergingLogic() final {
return {this};
}
diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp
index 8256d055805..97afa6331b4 100644
--- a/src/mongo/db/pipeline/document_source_limit.cpp
+++ b/src/mongo/db/pipeline/document_source_limit.cpp
@@ -1,30 +1,30 @@
/**
-* Copyright (C) 2011 10gen Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
+ * Copyright (C) 2011 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
#include "mongo/platform/basic.h"
@@ -101,4 +101,4 @@ intrusive_ptr<DocumentSource> DocumentSourceLimit::createFromBson(
long long limit = elem.numberLong();
return DocumentSourceLimit::create(pExpCtx, limit);
}
-}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h
index aeda5902054..703e35ee2eb 100644
--- a/src/mongo/db/pipeline/document_source_limit.h
+++ b/src/mongo/db/pipeline/document_source_limit.h
@@ -92,7 +92,7 @@ public:
* merge pipeline. Unlike the shards source, it is necessary for this stage to run on the
* merging host in order to produce correct pipeline output.
*/
- std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final {
+ MergingLogic mergingLogic() final {
return {DocumentSourceLimit::create(pExpCtx, _limit)};
}
diff --git a/src/mongo/db/pipeline/document_source_limit_test.cpp b/src/mongo/db/pipeline/document_source_limit_test.cpp
index eb3ba447457..611ace66cd4 100644
--- a/src/mongo/db/pipeline/document_source_limit_test.cpp
+++ b/src/mongo/db/pipeline/document_source_limit_test.cpp
@@ -59,6 +59,23 @@ TEST_F(DocumentSourceLimitTest, ShouldDisposeSourceWhenLimitIsReached) {
ASSERT_TRUE(source->isDisposed);
}
+TEST_F(DocumentSourceLimitTest, ShouldNotBeAbleToLimitToZeroDocuments) {
+ auto source = DocumentSourceMock::create({"{a: 1}", "{a: 2}"});
+ ASSERT_THROWS_CODE(DocumentSourceLimit::create(getExpCtx(), 0), AssertionException, 15958);
+}
+
+TEST_F(DocumentSourceLimitTest, ShouldRejectUserLimitOfZero) {
+ ASSERT_THROWS_CODE(
+ DocumentSourceLimit::createFromBson(BSON("$limit" << 0).firstElement(), getExpCtx()),
+ AssertionException,
+ 15958);
+
+ // A $limit with size 1 should be okay.
+ auto shouldNotThrow =
+ DocumentSourceLimit::createFromBson(BSON("$limit" << 1).firstElement(), getExpCtx());
+ ASSERT(dynamic_cast<DocumentSourceLimit*>(shouldNotThrow.get()));
+}
+
TEST_F(DocumentSourceLimitTest, TwoLimitStagesShouldCombineIntoOne) {
Pipeline::SourceContainer container;
auto firstLimit = DocumentSourceLimit::create(getExpCtx(), 10);
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index f9597592b69..d059cbc3a9e 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -134,7 +134,7 @@ public:
return nullptr;
}
- std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final {
+ MergingLogic mergingLogic() final {
return {this};
}
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
index 83d6b1e1d42..573667176e8 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
@@ -62,9 +62,10 @@ public:
invariant(pipeState != Pipeline::SplitState::kSplitForShards);
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kNone,
- pipeState == Pipeline::SplitState::kUnsplit
- ? HostTypeRequirement::kNone
- : HostTypeRequirement::kMongoS,
+ // If this is parsed on mongos it should stay on mongos. If
+ // we're not in a sharded cluster then it's okay to run on
+ // mongod.
+ HostTypeRequirement::kLocalOnly,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
TransactionRequirement::kNotAllowed,
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
index bce0a16d6be..795755893de 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
@@ -30,6 +30,7 @@
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/query/find_common.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/grid.h"
@@ -51,14 +52,39 @@ DocumentSourceMergeCursors::DocumentSourceMergeCursors(
_executor(executor),
_armParams(std::move(armParams)) {}
+std::size_t DocumentSourceMergeCursors::getNumRemotes() const {
+ if (_armParams) {
+ return _armParams->getRemotes().size();
+ }
+ return _blockingResultsMerger->getNumRemotes();
+}
+
+bool DocumentSourceMergeCursors::remotesExhausted() const {
+ if (_armParams) {
+ // We haven't started iteration yet.
+ return false;
+ }
+ return _blockingResultsMerger->remotesExhausted();
+}
+
+void DocumentSourceMergeCursors::populateMerger() {
+ invariant(!_blockingResultsMerger);
+ invariant(_armParams);
+ _blockingResultsMerger.emplace(pExpCtx->opCtx, std::move(*_armParams), _executor);
+ _armParams = boost::none;
+}
+
+std::unique_ptr<RouterStageMerge> DocumentSourceMergeCursors::convertToRouterStage() {
+ invariant(!_blockingResultsMerger, "Expected conversion to happen before execution");
+ return stdx::make_unique<RouterStageMerge>(pExpCtx->opCtx, _executor, std::move(*_armParams));
+}
+
DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() {
- // We don't expect or support tailable cursors to be executing through this stage.
- invariant(pExpCtx->tailableMode == TailableModeEnum::kNormal);
- if (!_arm) {
- _arm.emplace(pExpCtx->opCtx, _executor, std::move(*_armParams));
- _armParams = boost::none;
+ if (!_blockingResultsMerger) {
+ populateMerger();
}
- auto next = uassertStatusOK(_arm->blockingNext());
+
+ auto next = uassertStatusOK(_blockingResultsMerger->next(pExpCtx->opCtx, _execContext));
if (next.isEOF()) {
return GetNextResult::makeEOF();
}
@@ -67,7 +93,7 @@ DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() {
Value DocumentSourceMergeCursors::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
- invariant(!_arm);
+ invariant(!_blockingResultsMerger);
invariant(_armParams);
return Value(Document{{kStageName, _armParams->toBSON()}});
}
@@ -86,7 +112,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson(
std::move(ownedObj));
}
-boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create(
+boost::intrusive_ptr<DocumentSourceMergeCursors> DocumentSourceMergeCursors::create(
executor::TaskExecutor* executor,
AsyncResultsMergerParams params,
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
@@ -94,20 +120,20 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create(
}
void DocumentSourceMergeCursors::detachFromOperationContext() {
- if (_arm) {
- _arm->detachFromOperationContext();
+ if (_blockingResultsMerger) {
+ _blockingResultsMerger->detachFromOperationContext();
}
}
void DocumentSourceMergeCursors::reattachToOperationContext(OperationContext* opCtx) {
- if (_arm) {
- _arm->reattachToOperationContext(opCtx);
+ if (_blockingResultsMerger) {
+ _blockingResultsMerger->reattachToOperationContext(opCtx);
}
}
void DocumentSourceMergeCursors::doDispose() {
- if (_arm) {
- _arm->blockingKill(pExpCtx->opCtx);
+ if (_blockingResultsMerger) {
+ _blockingResultsMerger->kill(pExpCtx->opCtx);
}
}
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index 4034a267aba..7f033fb8c15 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -30,7 +30,8 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/executor/task_executor.h"
-#include "mongo/s/query/async_results_merger.h"
+#include "mongo/s/query/blocking_results_merger.h"
+#include "mongo/s/query/router_stage_merge.h"
namespace mongo {
@@ -57,11 +58,17 @@ public:
/**
* Creates a new DocumentSourceMergeCursors from the given parameters.
*/
- static boost::intrusive_ptr<DocumentSource> create(
+ static boost::intrusive_ptr<DocumentSourceMergeCursors> create(
executor::TaskExecutor*,
AsyncResultsMergerParams,
const boost::intrusive_ptr<ExpressionContext>&);
+ /**
+ * Extracts the remote cursors and converts the execution machinery from a DocumentSource to a
+ * RouterStage interface. Can only be called at planning time before any call to getNext().
+ */
+ std::unique_ptr<RouterStageMerge> convertToRouterStage();
+
const char* getSourceName() const final {
return kStageName.rawData();
}
@@ -90,6 +97,35 @@ public:
GetNextResult getNext() final;
+ std::size_t getNumRemotes() const;
+
+ bool remotesExhausted() const;
+
+ void setExecContext(RouterExecStage::ExecContext execContext) {
+ _execContext = execContext;
+ }
+
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ if (!_blockingResultsMerger) {
+ // In cases where a cursor was established with a batchSize of 0, the first getMore
+ // might specify a custom maxTimeMS (AKA await data timeout). In these cases we will not
+ // have iterated the cursor yet so will not have populated the merger, but need to
+ // remember/track the custom await data timeout. We will soon iterate the cursor, so we
+ // just populate the merger now and let it track the await data timeout itself.
+ populateMerger();
+ }
+ return _blockingResultsMerger->setAwaitDataTimeout(awaitDataTimeout);
+ }
+
+ /**
+ * Adds the specified shard cursors to the set of cursors to be merged. The results from the
+ * new cursors will be returned as normal through getNext().
+ */
+ void addNewShardCursors(std::vector<RemoteCursor>&& newCursors) {
+ invariant(_blockingResultsMerger);
+ _blockingResultsMerger->addNewShardCursors(std::move(newCursors));
+ }
+
protected:
void doDispose() final;
@@ -99,20 +135,33 @@ private:
const boost::intrusive_ptr<ExpressionContext>&,
boost::optional<BSONObj> ownedParamsSpec = boost::none);
+ /**
+ * Converts '_armParams' into the execution machinery to merge the cursors. See below for why
+ * this is done lazily. Clears '_armParams' and populates '_blockingResultsMerger'.
+ */
+ void populateMerger();
+
// When we have parsed the params out of a BSONObj, the object needs to stay around while the
// params are in use. We store them here.
boost::optional<BSONObj> _armParamsObj;
executor::TaskExecutor* _executor;
- // '_armParams' is populated until the first call to getNext(). Upon the first call to getNext()
- // '_arm' will be populated using '_armParams', and '_armParams' will become boost::none. So if
- // getNext() is never called we will never populate '_arm'. If we did so the destruction of this
- // stage would cause the cursors within the ARM to be killed prematurely. For example, if this
- // stage is parsed on mongos then forwarded to the shards, it should not kill the cursors when
- // it goes out of scope on mongos.
+ // '_blockingResultsMerger' is lazily populated. Until we need to use it, '_armParams' will be
+ // populated with the parameters. Once we start using '_blockingResultsMerger', '_armParams'
+ // will become boost::none. We do this to prevent populating '_blockingResultsMerger' on mongos
+ // before serializing this stage and sending it to a shard to perform the merge. If we always
+ // populated '_blockingResultsMerger', then the destruction of this stage would cause the
+ // cursors within '_blockingResultsMerger' to be killed prematurely. For example, if this stage
+ // is parsed on mongos then forwarded to the shards, it should not kill the cursors when it goes
+ // out of scope on mongos.
boost::optional<AsyncResultsMergerParams> _armParams;
- boost::optional<AsyncResultsMerger> _arm;
+ boost::optional<BlockingResultsMerger> _blockingResultsMerger;
+
+ // The ExecContext is needed because if we're a tailable, awaitData cursor, we only want to
+ // 'await data' if we 1) are in a getMore and 2) don't already have data to return. This context
+ // allows us to determine which situation we're in.
+ RouterExecStage::ExecContext _execContext;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 5168346957b..f8ee8829038 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -66,7 +66,7 @@ public:
boost::intrusive_ptr<DocumentSource> getShardSource() final {
return NULL;
}
- std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final {
+ MergingLogic mergingLogic() final {
return {this};
}
diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp
index 0f7583bd07b..03098a48d36 100644
--- a/src/mongo/db/pipeline/document_source_sample.cpp
+++ b/src/mongo/db/pipeline/document_source_sample.cpp
@@ -119,13 +119,13 @@ intrusive_ptr<DocumentSource> DocumentSourceSample::getShardSource() {
return this;
}
-std::list<intrusive_ptr<DocumentSource>> DocumentSourceSample::getMergeSources() {
- // Just need to merge the pre-sorted documents by their random values.
- BSONObjBuilder randMergeSortSpec;
-
- randMergeSortSpec.appendElements(randSortSpec);
- randMergeSortSpec.append("$mergePresorted", true);
-
- return {DocumentSourceSort::create(pExpCtx, randMergeSortSpec.obj(), _size)};
+NeedsMergerDocumentSource::MergingLogic DocumentSourceSample::mergingLogic() {
+ // On the merger we need to merge the pre-sorted documents by their random values, then limit to
+ // the number we need. Here we don't use 'randSortSpec' because it uses a metadata sort which
+ // the merging logic does not understand. The merging logic will use the serialized sort key,
+ // and this sort pattern is just used to communicate ascending/descending information. A pattern
+ // like {$meta: "randVal"} is neither ascending nor descending, and so will not be useful when
+ // constructing the merging logic.
+ return {_size > 0 ? DocumentSourceLimit::create(pExpCtx, _size) : nullptr, BSON("$rand" << -1)};
}
-} // mongo
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h
index 6195328b90d..d6f8f858d64 100644
--- a/src/mongo/db/pipeline/document_source_sample.h
+++ b/src/mongo/db/pipeline/document_source_sample.h
@@ -57,7 +57,7 @@ public:
}
boost::intrusive_ptr<DocumentSource> getShardSource() final;
- std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final;
+ MergingLogic mergingLogic() final;
long long getSampleSize() const {
return _size;
diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h
index 927957d87a7..014092865f6 100644
--- a/src/mongo/db/pipeline/document_source_skip.h
+++ b/src/mongo/db/pipeline/document_source_skip.h
@@ -82,12 +82,10 @@ public:
return DepsTracker::State::SEE_NEXT; // This doesn't affect needed fields
}
- // Virtuals for NeedsMergerDocumentSource
- // Need to run on rounter. Can't run on shards.
boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return NULL;
+ return nullptr;
}
- std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final {
+ MergingLogic mergingLogic() final {
return {this};
}
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index d992fb8053c..cb753b400f9 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -1,30 +1,30 @@
/**
-* Copyright (C) 2011 10gen Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
+ * Copyright (C) 2011 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
#include "mongo/platform/basic.h"
@@ -44,9 +44,9 @@
namespace mongo {
using boost::intrusive_ptr;
-using std::unique_ptr;
using std::make_pair;
using std::string;
+using std::unique_ptr;
using std::vector;
namespace {
@@ -98,7 +98,7 @@ Value deserializeSortKey(size_t sortPatternSize, BSONObj bsonSortKey) {
constexpr StringData DocumentSourceSort::kStageName;
DocumentSourceSort::DocumentSourceSort(const intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSource(pExpCtx), _mergingPresorted(false) {}
+ : DocumentSource(pExpCtx) {}
REGISTER_DOCUMENT_SOURCE(sort,
LiteParsedDocumentSourceDefault::parse,
@@ -106,8 +106,6 @@ REGISTER_DOCUMENT_SOURCE(sort,
DocumentSource::GetNextResult DocumentSourceSort::getNext() {
pExpCtx->checkForInterrupt();
- invariant(!_mergingPresorted); // A presorted-merge should be optimized into the merge, and
- // never executed.
if (!_populated) {
const auto populationResult = populate();
@@ -118,9 +116,6 @@ DocumentSource::GetNextResult DocumentSourceSort::getNext() {
}
if (!_output || !_output->more()) {
- // Need to be sure connections are marked as done so they can be returned to the connection
- // pool. This only needs to happen in the _mergingPresorted case, but it doesn't hurt to
- // always do it.
dispose();
return GetNextResult::makeEOF();
}
@@ -131,17 +126,12 @@ DocumentSource::GetNextResult DocumentSourceSort::getNext() {
void DocumentSourceSort::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
if (explain) { // always one Value for combined $sort + $limit
- array.push_back(Value(DOC(
- kStageName << DOC("sortKey" << sortKeyPattern(SortKeySerialization::kForExplain)
- << "mergePresorted"
- << (_mergingPresorted ? Value(true) : Value())
- << "limit"
- << (_limitSrc ? Value(_limitSrc->getLimit()) : Value())))));
+ array.push_back(
+ Value(DOC(kStageName << DOC(
+ "sortKey" << sortKeyPattern(SortKeySerialization::kForExplain) << "limit"
+ << (_limitSrc ? Value(_limitSrc->getLimit()) : Value())))));
} else { // one Value for $sort and maybe a Value for $limit
MutableDocument inner(sortKeyPattern(SortKeySerialization::kForPipelineSerialization));
- if (_mergingPresorted) {
- inner["$mergePresorted"] = Value(true);
- }
array.push_back(Value(DOC(kStageName << inner.freeze())));
if (_limitSrc) {
@@ -243,24 +233,16 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create(
const intrusive_ptr<ExpressionContext>& pExpCtx,
BSONObj sortOrder,
long long limit,
- boost::optional<uint64_t> maxMemoryUsageBytes,
- bool mergingPresorted) {
+ boost::optional<uint64_t> maxMemoryUsageBytes) {
intrusive_ptr<DocumentSourceSort> pSort(new DocumentSourceSort(pExpCtx));
pSort->_maxMemoryUsageBytes = maxMemoryUsageBytes
? *maxMemoryUsageBytes
: internalDocumentSourceSortMaxBlockingSortBytes.load();
pSort->_rawSort = sortOrder.getOwned();
- pSort->_mergingPresorted = mergingPresorted;
for (auto&& keyField : sortOrder) {
auto fieldName = keyField.fieldNameStringData();
- if ("$mergePresorted" == fieldName) {
- verify(keyField.Bool());
- pSort->_mergingPresorted = true;
- continue;
- }
-
SortPatternPart patternPart;
if (keyField.type() == Object) {
@@ -524,25 +506,14 @@ int DocumentSourceSort::compare(const Value& lhs, const Value& rhs) const {
}
intrusive_ptr<DocumentSource> DocumentSourceSort::getShardSource() {
- verify(!_mergingPresorted);
return this;
}
-std::list<intrusive_ptr<DocumentSource>> DocumentSourceSort::getMergeSources() {
- verify(!_mergingPresorted);
- intrusive_ptr<DocumentSourceSort> other = new DocumentSourceSort(pExpCtx);
- other->_sortPattern = _sortPattern;
- other->_sortKeyGen = SortKeyGenerator{
- other->sortKeyPattern(SortKeySerialization::kForPipelineSerialization).toBson(),
- pExpCtx->getCollator()};
- other->_paths = _paths;
- other->_limitSrc = _limitSrc;
- other->_maxMemoryUsageBytes = _maxMemoryUsageBytes;
- other->_mergingPresorted = true;
- other->_rawSort = _rawSort;
- return {other};
-}
+NeedsMergerDocumentSource::MergingLogic DocumentSourceSort::mergingLogic() {
+ return {_limitSrc ? DocumentSourceLimit::create(pExpCtx, _limitSrc->getLimit()) : nullptr,
+ sortKeyPattern(SortKeySerialization::kForSortKeyMerging).toBson()};
}
+} // namespace mongo
#include "mongo/db/sorter/sorter.cpp"
// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file.
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index d2165acdb89..ac860c5b7a5 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -61,16 +61,14 @@ public:
return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
}
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- StageConstraints constraints(
- _mergingPresorted ? StreamType::kStreaming : StreamType::kBlocking,
- PositionRequirement::kNone,
- HostTypeRequirement::kNone,
- _mergingPresorted ? DiskUseRequirement::kNoDiskUse : DiskUseRequirement::kWritesTmpData,
- _mergingPresorted ? FacetRequirement::kNotAllowed : FacetRequirement::kAllowed,
- TransactionRequirement::kAllowed,
- _mergingPresorted ? ChangeStreamRequirement::kWhitelist
- : ChangeStreamRequirement::kBlacklist);
+ StageConstraints constraints(Pipeline::SplitState) const final {
+ StageConstraints constraints(StreamType::kBlocking,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kWritesTmpData,
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed,
+ ChangeStreamRequirement::kBlacklist);
// Can't swap with a $match if a limit has been absorbed, as $match can't swap with $limit.
constraints.canSwapWithMatch = !_limitSrc;
@@ -84,7 +82,7 @@ public:
DepsTracker::State getDependencies(DepsTracker* deps) const final;
boost::intrusive_ptr<DocumentSource> getShardSource() final;
- std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final;
+ MergingLogic mergingLogic() final;
/**
* Write out a Document whose contents are the sort key pattern.
@@ -105,15 +103,7 @@ public:
const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
BSONObj sortOrder,
long long limit = -1,
- boost::optional<uint64_t> maxMemoryUsageBytes = boost::none,
- bool mergingPresorted = false);
-
- /**
- * Returns true if this $sort stage is merging presorted streams.
- */
- bool mergingPresorted() const {
- return _mergingPresorted;
- }
+ boost::optional<uint64_t> maxMemoryUsageBytes = boost::none);
/**
* Returns -1 for no limit.
@@ -267,7 +257,6 @@ private:
uint64_t _maxMemoryUsageBytes;
bool _done;
- bool _mergingPresorted; // TODO SERVER-34009 Remove this flag.
std::unique_ptr<MySorter> _sorter;
std::unique_ptr<MySorter::Iterator> _output;
bool _usedDisk = false;
diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp
index 42a6590dca6..4aacdd1c4cc 100644
--- a/src/mongo/db/pipeline/document_source_sort_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_test.cpp
@@ -131,7 +131,7 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) {
ASSERT_BSONOBJ_EQ(arr[0].getDocument().toBson(), BSON("$sort" << BSON("a" << 1)));
ASSERT(sort()->getShardSource() != nullptr);
- ASSERT(!sort()->getMergeSources().empty());
+ ASSERT(sort()->mergingLogic().mergingStage == nullptr);
}
container.push_back(DocumentSourceLimit::create(expCtx, 10));
@@ -158,7 +158,8 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) {
DOC_ARRAY(DOC("$sort" << DOC("a" << 1)) << DOC("$limit" << sort()->getLimit())));
ASSERT(sort()->getShardSource() != nullptr);
- ASSERT(!sort()->getMergeSources().empty());
+ ASSERT(sort()->mergingLogic().mergingStage != nullptr);
+ ASSERT(dynamic_cast<DocumentSourceLimit*>(sort()->mergingLogic().mergingStage.get()));
}
TEST_F(DocumentSourceSortTest, Dependencies) {
diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.cpp b/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp
index 61fa2a9176d..7177d705c67 100644
--- a/src/mongo/s/query/router_stage_update_on_add_shard.cpp
+++ b/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2017 MongoDB Inc.
+ * Copyright (C) 2018 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
@@ -26,95 +26,98 @@
* then also delete it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
-#include "mongo/s/query/router_stage_update_on_add_shard.h"
+#include "mongo/db/pipeline/document_source_update_on_add_shard.h"
#include <algorithm>
-#include "mongo/base/checked_cast.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
-#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
+#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/s/query/establish_cursors.h"
-#include "mongo/s/query/router_stage_merge.h"
-#include "mongo/util/log.h"
namespace mongo {
namespace {
// Returns true if the change stream document has an 'operationType' of 'newShardDetected'.
-bool needsUpdate(const StatusWith<ClusterQueryResult>& childResult) {
- if (!childResult.isOK() || childResult.getValue().isEOF()) {
- return false;
- }
- return ((*childResult.getValue().getResult())[DocumentSourceChangeStream::kOperationTypeField]
- .str() == DocumentSourceChangeStream::kNewShardDetectedOpType);
+bool needsUpdate(const Document& childResult) {
+ return childResult[DocumentSourceChangeStream::kOperationTypeField].getStringData() ==
+ DocumentSourceChangeStream::kNewShardDetectedOpType;
}
+} // namespace
+
+boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> DocumentSourceUpdateOnAddShard::create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ executor::TaskExecutor* executor,
+ const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors,
+ std::vector<ShardId> shardsWithCursors,
+ BSONObj cmdToRunOnNewShards) {
+ return new DocumentSourceUpdateOnAddShard(
+ expCtx, executor, mergeCursors, std::move(shardsWithCursors), cmdToRunOnNewShards);
}
-RouterStageUpdateOnAddShard::RouterStageUpdateOnAddShard(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params,
- std::vector<ShardId> shardIds,
- BSONObj cmdToRunOnNewShards)
- : RouterExecStage(opCtx, stdx::make_unique<RouterStageMerge>(opCtx, executor, params)),
- _params(params),
- _shardIds(std::move(shardIds)),
- _cmdToRunOnNewShards(cmdToRunOnNewShards) {}
+DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ executor::TaskExecutor* executor,
+ const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors,
+ std::vector<ShardId>&& shardsWithCursors,
+ BSONObj cmdToRunOnNewShards)
+ : DocumentSource(expCtx),
+ _executor(executor),
+ _mergeCursors(mergeCursors),
+ _shardsWithCursors(std::move(shardsWithCursors)),
+ _cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {}
+
+DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::getNext() {
+ auto childResult = pSource->getNext();
-StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next(
- RouterExecStage::ExecContext execContext) {
- auto childStage = getChildStage();
- auto childResult = childStage->next(execContext);
- while (needsUpdate(childResult)) {
- addNewShardCursors(*childResult.getValue().getResult());
- childResult = childStage->next(execContext);
+ while (childResult.isAdvanced() && needsUpdate(childResult.getDocument())) {
+ addNewShardCursors(childResult.getDocument());
+ childResult = pSource->getNext();
}
return childResult;
}
-void RouterStageUpdateOnAddShard::addNewShardCursors(BSONObj newShardDetectedObj) {
- checked_cast<RouterStageMerge*>(getChildStage())
- ->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj));
+void DocumentSourceUpdateOnAddShard::addNewShardCursors(const Document& newShardDetectedObj) {
+ _mergeCursors->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj));
}
-std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(
- const BSONObj& newShardDetectedObj) {
- auto* opCtx = getOpCtx();
- // Reload the shard registry. We need to ensure a reload initiated after calling this method
+std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsOnNewShards(
+ const Document& newShardDetectedObj) {
+ auto* opCtx = pExpCtx->opCtx;
+ // Reload the shard registry. We need to ensure a reload initiated after calling this method
// caused the reload, otherwise we aren't guaranteed to get all the new shards.
auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
if (!shardRegistry->reload(opCtx)) {
// A 'false' return from shardRegistry.reload() means a reload was already in progress and
- // it completed before reload() returned. So another reload(), regardless of return
- // value, will ensure a reload started after the first call to reload().
+ // it completed before reload() returned. So another reload(), regardless of return value,
+ // will ensure a reload started after the first call to reload().
shardRegistry->reload(opCtx);
}
std::vector<ShardId> shardIds, newShardIds;
shardRegistry->getAllShardIdsNoReload(&shardIds);
- std::sort(_shardIds.begin(), _shardIds.end());
+ std::sort(_shardsWithCursors.begin(), _shardsWithCursors.end());
std::sort(shardIds.begin(), shardIds.end());
std::set_difference(shardIds.begin(),
shardIds.end(),
- _shardIds.begin(),
- _shardIds.end(),
+ _shardsWithCursors.begin(),
+ _shardsWithCursors.end(),
std::back_inserter(newShardIds));
auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
_cmdToRunOnNewShards,
- newShardDetectedObj[DocumentSourceChangeStream::kIdField].embeddedObject());
+ newShardDetectedObj[DocumentSourceChangeStream::kIdField].getDocument());
std::vector<std::pair<ShardId, BSONObj>> requests;
for (const auto& shardId : newShardIds) {
requests.emplace_back(shardId, cmdObj);
- _shardIds.push_back(shardId);
+ _shardsWithCursors.push_back(shardId);
}
const bool allowPartialResults = false; // partial results are not allowed
return establishCursors(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- _params->nsString,
- _params->readPreference.value_or(ReadPreferenceSetting()),
+ _executor,
+ pExpCtx->ns,
+ ReadPreferenceSetting::get(opCtx),
requests,
allowPartialResults);
}
diff --git a/src/mongo/db/pipeline/document_source_update_on_add_shard.h b/src/mongo/db/pipeline/document_source_update_on_add_shard.h
new file mode 100644
index 00000000000..4c6e0473528
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_update_on_add_shard.h
@@ -0,0 +1,98 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/shard_id.h"
+
+namespace mongo {
+
+/**
+ * An internal stage used as part of the change streams infrastructure to listen for an event
+ * signaling that a new shard now has potentially matching data. For example, this stage will
+ * detect if a collection is being watched and a chunk for that collection migrates to a shard for
+ * the first time. When this event is detected, this stage will establish a new cursor on that
+ * shard and add it to the cursors being merged.
+ */
+class DocumentSourceUpdateOnAddShard final : public DocumentSource {
+public:
+ /**
+ * Creates a new stage which will establish a new cursor and add it to the cursors being merged
+ * by 'mergeCursorsStage' whenever a new shard is detected by a change stream.
+ */
+ static boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> create(
+ const boost::intrusive_ptr<ExpressionContext>&,
+ executor::TaskExecutor*,
+ const boost::intrusive_ptr<DocumentSourceMergeCursors>&,
+ std::vector<ShardId> shardsWithCursors,
+ BSONObj cmdToRunOnNewShards);
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
+ // We only ever expect to add this stage if the pipeline is being executed locally on a
+ // mongos. In this case, it should never be serialized.
+ MONGO_UNREACHABLE;
+ }
+
+ virtual StageConstraints constraints(Pipeline::SplitState) const {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kMongoS,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
+ }
+
+ GetNextResult getNext() final;
+
+private:
+ DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&,
+ executor::TaskExecutor*,
+ const boost::intrusive_ptr<DocumentSourceMergeCursors>&,
+ std::vector<ShardId>&& shardsWithCursors,
+ BSONObj cmdToRunOnNewShards);
+
+ /**
+ * Establish the new cursors and tell the RouterStageMerge about them.
+ */
+ void addNewShardCursors(const Document& newShardDetectedObj);
+
+ /**
+ * Open the cursors on the new shards.
+ */
+ std::vector<RemoteCursor> establishShardCursorsOnNewShards(const Document& newShardDetectedObj);
+
+ executor::TaskExecutor* _executor;
+ boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursors;
+ std::vector<ShardId> _shardsWithCursors;
+ BSONObj _cmdToRunOnNewShards;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 31b90963034..1a82bf1569a 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -38,7 +38,6 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/accumulator.h"
-#include "mongo/db/pipeline/cluster_aggregation_planner.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_geo_near.h"
@@ -335,25 +334,6 @@ bool Pipeline::usedDisk() {
_sources.begin(), _sources.end(), [](const auto& stage) { return stage->usedDisk(); });
}
-std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() {
- invariant(!isSplitForShards());
- invariant(!isSplitForMerge());
-
- // Create and initialize the shard spec we'll return. We start with an empty pipeline on the
- // shards and all work being done in the merger. Optimizations can move operations between
- // the pipelines to be more efficient.
- std::unique_ptr<Pipeline, PipelineDeleter> shardPipeline(new Pipeline(pCtx),
- PipelineDeleter(pCtx->opCtx));
-
- cluster_aggregation_planner::performSplitPipelineOptimizations(shardPipeline.get(), this);
- shardPipeline->_splitState = SplitState::kSplitForShards;
- _splitState = SplitState::kSplitForMerge;
-
- stitch();
-
- return shardPipeline;
-}
-
BSONObj Pipeline::getInitialQuery() const {
if (_sources.empty())
return BSONObj();
@@ -399,12 +379,13 @@ bool Pipeline::canRunOnMongos() const {
}
bool Pipeline::requiredToRunOnMongos() const {
- invariant(!isSplitForShards());
+ invariant(_splitState != SplitState::kSplitForShards);
for (auto&& stage : _sources) {
// If this pipeline is capable of splitting before the mongoS-only stage, then the pipeline
// as a whole is not required to run on mongoS.
- if (isUnsplit() && dynamic_cast<NeedsMergerDocumentSource*>(stage.get())) {
+ if (_splitState == SplitState::kUnsplit &&
+ dynamic_cast<NeedsMergerDocumentSource*>(stage.get())) {
return false;
}
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 0bfb2602d2c..d4a2c813060 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -48,9 +48,9 @@
namespace mongo {
class BSONObj;
class BSONObjBuilder;
-class ExpressionContext;
-class DocumentSource;
class CollatorInterface;
+class DocumentSource;
+class ExpressionContext;
class OperationContext;
class PipelineDeleter;
@@ -172,17 +172,11 @@ public:
bool usedDisk();
/**
- * Split the current Pipeline into a Pipeline for each shard, and a Pipeline that combines the
- * results within mongos. This permanently alters this pipeline for the merging operation, and
- * returns a Pipeline object that should be executed on each targeted shard.
- */
- std::unique_ptr<Pipeline, PipelineDeleter> splitForSharded();
-
- /**
- * Returns true if this pipeline has not been split.
+ * Communicates to the pipeline which part of a split pipeline it is when the pipeline has been
+ * split in two.
*/
- bool isUnsplit() const {
- return _splitState == SplitState::kUnsplit;
+ void setSplitState(SplitState state) {
+ _splitState = state;
}
/**
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 9d28060c00d..371f5f9dc89 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/cluster_aggregation_planner.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
@@ -1757,13 +1758,17 @@ public:
mergePipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx));
mergePipe->optimizePipeline();
- shardPipe = mergePipe->splitForSharded();
- ASSERT(shardPipe);
+ auto splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(mergePipe));
- ASSERT_VALUE_EQ(Value(shardPipe->writeExplainOps(ExplainOptions::Verbosity::kQueryPlanner)),
+ ASSERT_VALUE_EQ(Value(splitPipeline.shardsPipeline->writeExplainOps(
+ ExplainOptions::Verbosity::kQueryPlanner)),
Value(shardPipeExpected["pipeline"]));
- ASSERT_VALUE_EQ(Value(mergePipe->writeExplainOps(ExplainOptions::Verbosity::kQueryPlanner)),
+ ASSERT_VALUE_EQ(Value(splitPipeline.mergePipeline->writeExplainOps(
+ ExplainOptions::Verbosity::kQueryPlanner)),
Value(mergePipeExpected["pipeline"]));
+
+ shardPipe = std::move(splitPipeline.shardsPipeline);
+ mergePipe = std::move(splitPipeline.mergePipeline);
}
virtual ~Base() {}
@@ -1936,7 +1941,7 @@ class ShardedSortMatchProjSkipLimBecomesMatchTopKSortSkipProj : public Base {
"]";
}
string mergePipeJson() {
- return "[{$sort: {sortKey: {a: 1}, mergePresorted: true, limit: 8}}"
+ return "[{$limit: 8}"
",{$skip: 3}"
",{$project: {_id: true, a: true}}"
"]";
@@ -1974,7 +1979,7 @@ class ShardedSortProjLimBecomesTopKSortProj : public Base {
"]";
}
string mergePipeJson() {
- return "[{$sort: {sortKey: {a: 1}, mergePresorted: true, limit: 5}}"
+ return "[{$limit: 5}"
",{$project: {_id: true, a: true}}"
"]";
}
@@ -1994,8 +1999,7 @@ class ShardedSortGroupProjLimDoesNotBecomeTopKSortProjGroup : public Base {
"]";
}
string mergePipeJson() {
- return "[{$sort: {sortKey: {a: 1}, mergePresorted: true}}"
- ",{$group : {_id: {a: '$a'}}}"
+ return "[{$group : {_id: {a: '$a'}}}"
",{$project: {_id: true, a: true}}"
",{$limit: 5}"
"]";
@@ -2017,7 +2021,7 @@ class ShardedMatchSortProjLimBecomesMatchTopKSortProj : public Base {
"]";
}
string mergePipeJson() {
- return "[{$sort: {sortKey: {a: -1}, mergePresorted: true, limit: 6}}"
+ return "[{$limit: 6}"
",{$project: {_id: true, a: true}}"
"]";
}
@@ -2232,14 +2236,14 @@ DEATH_TEST_F(PipelineMustRunOnMongoSTest,
// $_internalSplitPipeline.
ASSERT_FALSE(pipeline->requiredToRunOnMongos());
- auto shardPipe = pipeline->splitForSharded();
- ASSERT(shardPipe);
+ auto splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
+ ASSERT(splitPipeline.shardsPipeline);
+ ASSERT(splitPipeline.mergePipeline);
- // The merge half of the pipeline must run on mongoS.
- ASSERT_TRUE(pipeline->requiredToRunOnMongos());
+ ASSERT_TRUE(splitPipeline.mergePipeline->requiredToRunOnMongos());
// Calling 'requiredToRunOnMongos' on the shard pipeline will hit an invariant.
- shardPipe->requiredToRunOnMongos();
+ splitPipeline.shardsPipeline->requiredToRunOnMongos();
}
TEST_F(PipelineMustRunOnMongoSTest, SplitMongoSMergePipelineAssertsIfShardStagePresent) {
@@ -2260,12 +2264,12 @@ TEST_F(PipelineMustRunOnMongoSTest, SplitMongoSMergePipelineAssertsIfShardStageP
// $_internalSplitPipeline.
ASSERT_FALSE(pipeline->requiredToRunOnMongos());
- auto shardPipe = pipeline->splitForSharded();
- ASSERT(shardPipe);
+ auto splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
// The merge pipeline must run on mongoS, but $out needs to run on the primary shard.
- ASSERT_THROWS_CODE(
- pipeline->requiredToRunOnMongos(), AssertionException, ErrorCodes::IllegalOperation);
+ ASSERT_THROWS_CODE(splitPipeline.mergePipeline->requiredToRunOnMongos(),
+ AssertionException,
+ ErrorCodes::IllegalOperation);
}
TEST_F(PipelineMustRunOnMongoSTest, SplittablePipelineAssertsIfMongoSStageOnShardSideOfSplit) {
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index 5c75e96236e..f39ee98cc70 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -108,10 +108,10 @@ env.Library(
'$BUILD_DIR/mongo/db/auth/saslauth',
'$BUILD_DIR/mongo/db/commands/core',
'$BUILD_DIR/mongo/db/commands/current_op_common',
- '$BUILD_DIR/mongo/db/commands/servers',
'$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers',
'$BUILD_DIR/mongo/db/commands/kill_common',
'$BUILD_DIR/mongo/db/commands/profile_common',
+ '$BUILD_DIR/mongo/db/commands/servers',
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/commands/write_commands_common',
'$BUILD_DIR/mongo/db/ftdc/ftdc_server',
@@ -119,10 +119,12 @@ env.Library(
'$BUILD_DIR/mongo/db/logical_session_cache_impl',
'$BUILD_DIR/mongo/db/pipeline/aggregation',
'$BUILD_DIR/mongo/db/pipeline/mongos_process_interface',
+ '$BUILD_DIR/mongo/db/pipeline/cluster_aggregation_planner',
'$BUILD_DIR/mongo/db/stats/counters',
'$BUILD_DIR/mongo/db/views/views',
'$BUILD_DIR/mongo/executor/async_multicaster',
'$BUILD_DIR/mongo/rpc/client_metadata',
+ '$BUILD_DIR/mongo/s/query/cluster_client_cursor',
'$BUILD_DIR/mongo/s/sharding_api',
'$BUILD_DIR/mongo/s/sharding_legacy_api',
'$BUILD_DIR/mongo/s/transaction/router_session',
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index d06c5426555..fbf2446467b 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -1,30 +1,30 @@
/**
-* Copyright (C) 2016 MongoDB Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
@@ -66,7 +66,7 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_query_knobs.h"
#include "mongo/s/query/establish_cursors.h"
-#include "mongo/s/query/router_stage_update_on_add_shard.h"
+#include "mongo/s/query/router_stage_pipeline.h"
#include "mongo/s/query/store_possible_cursor.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/fail_point.h"
@@ -75,6 +75,8 @@
namespace mongo {
+using SplitPipeline = cluster_aggregation_planner::SplitPipeline;
+
MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors);
namespace {
@@ -104,38 +106,6 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v
return explainCommandBuilder.freeze();
}
-Status appendExplainResults(
- const std::vector<AsyncRequestsSender::Response>& shardResults,
- const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
- const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards,
- const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging,
- BSONObjBuilder* result) {
- if (pipelineForTargetedShards->isSplitForShards()) {
- *result << "mergeType"
- << (pipelineForMerging->canRunOnMongos()
- ? "mongos"
- : pipelineForMerging->needsPrimaryShardMerger() ? "primaryShard"
- : "anyShard")
- << "splitPipeline"
- << Document{
- {"shardsPart",
- pipelineForTargetedShards->writeExplainOps(*mergeCtx->explain)},
- {"mergerPart", pipelineForMerging->writeExplainOps(*mergeCtx->explain)}};
- } else {
- *result << "splitPipeline" << BSONNULL;
- }
-
- BSONObjBuilder shardExplains(result->subobjStart("shards"));
- for (const auto& shardResult : shardResults) {
- invariant(shardResult.shardHostAndPort);
- shardExplains.append(shardResult.shardId.toString(),
- BSON("host" << shardResult.shardHostAndPort->toString() << "stages"
- << shardResult.swResponse.getValue().data["stages"]));
- }
-
- return Status::OK();
-}
-
Status appendCursorResponseToCommandResult(const ShardId& shardId,
const BSONObj cursorResponse,
BSONObjBuilder* result) {
@@ -246,11 +216,10 @@ BSONObj createCommandForTargetedShards(
return appendAllowImplicitCreate(cmdObj, true);
}
-BSONObj createCommandForMergingShard(
- const AggregationRequest& request,
- const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
- const BSONObj originalCmdObj,
- const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging) {
+BSONObj createCommandForMergingShard(const AggregationRequest& request,
+ const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
+ const BSONObj originalCmdObj,
+ const Pipeline* pipelineForMerging) {
MutableDocument mergeCmd(request.serializeToCommandObj());
mergeCmd["pipeline"] = Value(pipelineForMerging->serialize());
@@ -339,12 +308,11 @@ struct DispatchShardPipelineResults {
// Populated if this *is* an explain, this vector represents the results from each shard.
std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
- // The half of the pipeline that was sent to each shard, or the entire pipeline if there was
- // only one shard targeted.
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards;
+ // The split version of the pipeline if more than one shard was targeted, otherwise boost::none.
+ boost::optional<SplitPipeline> splitPipeline;
- // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr.
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
+ // If the pipeline targeted a single shard, this is the pipeline to run on that shard.
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard;
// The command object to send to the targeted shards.
BSONObj commandForTargetedShards;
@@ -380,8 +348,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
const auto shardQuery = pipeline->getInitialQuery();
- auto pipelineForTargetedShards = std::move(pipeline);
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
+ boost::optional<SplitPipeline> splitPipeline;
auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
@@ -415,13 +382,19 @@ DispatchShardPipelineResults dispatchShardPipeline(
*(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));
if (needsSplit) {
- pipelineForMerging = std::move(pipelineForTargetedShards);
- pipelineForTargetedShards = pipelineForMerging->splitForSharded();
+ splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
}
// Generate the command object for the targeted shards.
- BSONObj targetedCommand = createCommandForTargetedShards(
- opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, collationObj, atClusterTime);
+ BSONObj targetedCommand = splitPipeline
+ ? createCommandForTargetedShards(opCtx,
+ aggRequest,
+ originalCmdObj,
+ splitPipeline->shardsPipeline,
+ collationObj,
+ atClusterTime)
+ : createCommandForTargetedShards(
+ opCtx, aggRequest, originalCmdObj, pipeline, collationObj, atClusterTime);
// Refresh the shard registry if we're targeting all shards. We need the shard registry
// to be at least as current as the logical time used when creating the command for
@@ -480,11 +453,40 @@ DispatchShardPipelineResults dispatchShardPipeline(
return DispatchShardPipelineResults{needsPrimaryShardMerge,
std::move(cursors),
std::move(shardResults),
- std::move(pipelineForTargetedShards),
- std::move(pipelineForMerging),
+ std::move(splitPipeline),
+ std::move(pipeline),
targetedCommand};
}
+Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults,
+ const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
+ BSONObjBuilder* result) {
+ if (dispatchResults.splitPipeline) {
+ auto* mergePipeline = dispatchResults.splitPipeline->mergePipeline.get();
+ *result << "mergeType"
+ << (mergePipeline->canRunOnMongos()
+ ? "mongos"
+ : mergePipeline->needsPrimaryShardMerger() ? "primaryShard" : "anyShard")
+ << "splitPipeline"
+ << Document{{"shardsPart",
+ dispatchResults.splitPipeline->shardsPipeline->writeExplainOps(
+ *mergeCtx->explain)},
+ {"mergerPart", mergePipeline->writeExplainOps(*mergeCtx->explain)}};
+ } else {
+ *result << "splitPipeline" << BSONNULL;
+ }
+
+ BSONObjBuilder shardExplains(result->subobjStart("shards"));
+ for (const auto& shardResult : dispatchResults.remoteExplainOutput) {
+ invariant(shardResult.shardHostAndPort);
+ shardExplains.append(shardResult.shardId.toString(),
+ BSON("host" << shardResult.shardHostAndPort->toString() << "stages"
+ << shardResult.swResponse.getValue().data["stages"]));
+ }
+
+ return Status::OK();
+}
+
Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj mergeCmdObj,
@@ -500,20 +502,18 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
Shard::RetryPolicy::kIdempotent));
}
-BSONObj establishMergingMongosCursor(OperationContext* opCtx,
- const AggregationRequest& request,
- const NamespaceString& requestedNss,
- BSONObj cmdToRunOnNewShards,
- const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
- std::vector<RemoteCursor> cursors) {
+BSONObj establishMergingMongosCursor(
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ const NamespaceString& requestedNss,
+ BSONObj cmdToRunOnNewShards,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging) {
ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx));
params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
params.tailableMode = pipelineForMerging->getContext()->tailableMode;
- params.mergePipeline = std::move(pipelineForMerging);
- params.remotes = std::move(cursors);
// A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch
// size we pass here is used for getMores, so do not specify a batch size if the initial request
// had a batch size of 0.
@@ -523,25 +523,8 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
params.lsid = opCtx->getLogicalSessionId();
params.txnNumber = opCtx->getTxnNumber();
- if (liteParsedPipeline.hasChangeStream()) {
- // For change streams, we need to set up a custom stage to establish cursors on new shards
- // when they are added. Be careful to extract the targeted shard IDs before the remote
- // cursors are transferred from the ClusterClientCursorParams to the AsyncResultsMerger.
- std::vector<ShardId> shardIds;
- for (const auto& remote : params.remotes) {
- shardIds.emplace_back(remote.getShardId().toString());
- }
-
- params.createCustomCursorSource = [cmdToRunOnNewShards,
- shardIds](OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params) {
- return stdx::make_unique<RouterStageUpdateOnAddShard>(
- opCtx, executor, params, std::move(shardIds), cmdToRunOnNewShards);
- };
- }
- auto ccc = ClusterClientCursorImpl::make(
- opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
+ auto ccc = cluster_aggregation_planner::buildClusterCursor(
+ opCtx, std::move(pipelineForMerging), std::move(params));
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
@@ -726,17 +709,15 @@ std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID(
}
ShardId pickMergingShard(OperationContext* opCtx,
- const DispatchShardPipelineResults& dispatchResults,
+ bool needsPrimaryShardMerge,
+ const std::vector<ShardId>& targetedShards,
ShardId primaryShard) {
auto& prng = opCtx->getClient()->getPrng();
// If we cannot merge on mongoS, establish the merge cursor on a shard. Perform the merging
// command on random shard, unless the pipeline dictates that it needs to be run on the primary
// shard for the database.
- return dispatchResults.needsPrimaryShardMerge
- ? primaryShard
- : dispatchResults.remoteCursors[prng.nextInt32(dispatchResults.remoteCursors.size())]
- .getShardId()
- .toString();
+ return needsPrimaryShardMerge ? primaryShard
+ : targetedShards[prng.nextInt32(targetedShards.size())];
}
// "Resolve" involved namespaces and verify that none of them are sharded unless allowed by the
@@ -798,7 +779,6 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
BSONObj cmdObj,
const LiteParsedPipeline& litePipe,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- std::vector<RemoteCursor>&& cursors,
BSONObjBuilder* result) {
// We should never receive a pipeline intended for the shards, or which cannot run on mongoS.
invariant(!pipeline->isSplitForShards());
@@ -807,8 +787,7 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
const auto& requestedNss = namespaces.requestedNss;
const auto opCtx = expCtx->opCtx;
- // If this is an unsplit mongoS-only pipeline, verify that the first stage can produce input for
- // the remainder of the pipeline.
+ // Verify that the first stage can produce input for the remainder of the pipeline.
uassert(ErrorCodes::IllegalOperation,
str::stream() << "Aggregation pipeline must be run on mongoS, but "
<< pipeline->getSources().front()->getSourceName()
@@ -816,7 +795,6 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
pipeline->isSplitForMerge() ||
!pipeline->getSources().front()->constraints().requiresInputDocSource);
- // If this is an explain and the pipeline is not split, write the explain output and return.
if (expCtx->explain && !pipeline->isSplitForMerge()) {
*result << "splitPipeline" << BSONNULL << "mongos"
<< Document{{"host", getHostNameCachedAndPort()},
@@ -826,7 +804,7 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
// Register the new mongoS cursor, and retrieve the initial batch of results.
auto cursorResponse = establishMergingMongosCursor(
- opCtx, request, requestedNss, cmdObj, litePipe, std::move(pipeline), std::move(cursors));
+ opCtx, request, requestedNss, cmdObj, litePipe, std::move(pipeline));
// We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline
// can never run on mongoS. Filter the command response and return immediately.
@@ -840,25 +818,38 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
BSONObj cmdObj,
const LiteParsedPipeline& litePipe,
const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- DispatchShardPipelineResults& shardDispatchResults,
+ DispatchShardPipelineResults&& shardDispatchResults,
BSONObjBuilder* result) {
// We should never be in a situation where we call this function on a non-merge pipeline.
- auto& mergingPipeline = shardDispatchResults.pipelineForMerging;
- invariant(mergingPipeline && mergingPipeline->isSplitForMerge());
+ invariant(shardDispatchResults.splitPipeline);
+ auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get();
+ auto* opCtx = expCtx->opCtx;
+
+ std::vector<ShardId> targetedShards;
+ targetedShards.reserve(shardDispatchResults.remoteCursors.size());
+ for (auto&& remoteCursor : shardDispatchResults.remoteCursors) {
+ targetedShards.emplace_back(remoteCursor.getShardId().toString());
+ }
- const auto opCtx = expCtx->opCtx;
+ cluster_aggregation_planner::addMergeCursorsSource(
+ mergePipeline,
+ litePipe,
+ shardDispatchResults.commandForTargetedShards,
+ std::move(shardDispatchResults.remoteCursors),
+ targetedShards,
+ shardDispatchResults.splitPipeline->shardCursorsSortSpec,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
// First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS,
// then ignore the internalQueryProhibitMergingOnMongoS parameter.
- if (mergingPipeline->requiredToRunOnMongos() ||
- (!internalQueryProhibitMergingOnMongoS.load() && mergingPipeline->canRunOnMongos())) {
+ if (mergePipeline->requiredToRunOnMongos() ||
+ (!internalQueryProhibitMergingOnMongoS.load() && mergePipeline->canRunOnMongos())) {
return runPipelineOnMongoS(expCtx,
namespaces,
request,
shardDispatchResults.commandForTargetedShards,
litePipe,
- std::move(mergingPipeline),
- std::move(shardDispatchResults.remoteCursors),
+ std::move(shardDispatchResults.splitPipeline->mergePipeline),
result);
}
@@ -873,15 +864,12 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
"merging on a shard",
!opCtx->getTxnNumber());
- ShardId mergingShardId =
- pickMergingShard(opCtx, shardDispatchResults, routingInfo->db().primaryId());
-
- cluster_aggregation_planner::addMergeCursorsSource(
- mergingPipeline.get(),
- std::move(shardDispatchResults.remoteCursors),
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
+ ShardId mergingShardId = pickMergingShard(opCtx,
+ shardDispatchResults.needsPrimaryShardMerge,
+ targetedShards,
+ routingInfo->db().primaryId());
- auto mergeCmdObj = createCommandForMergingShard(request, expCtx, cmdObj, mergingPipeline);
+ auto mergeCmdObj = createCommandForMergingShard(request, expCtx, cmdObj, mergePipeline);
// Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return.
auto mergeResponse =
@@ -964,7 +952,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// Check whether the entire pipeline must be run on mongoS.
if (pipeline->requiredToRunOnMongos()) {
return runPipelineOnMongoS(
- expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), {}, result);
+ expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), result);
}
// If not, split the pipeline as necessary and dispatch to the relevant shards.
@@ -980,18 +968,14 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// write the results to the output builder, and return immediately.
if (expCtx->explain) {
uassertAllShardsSupportExplain(shardDispatchResults.remoteExplainOutput);
- return appendExplainResults(std::move(shardDispatchResults.remoteExplainOutput),
- expCtx,
- shardDispatchResults.pipelineForTargetedShards,
- shardDispatchResults.pipelineForMerging,
- result);
+ return appendExplainResults(std::move(shardDispatchResults), expCtx, result);
}
// If this isn't an explain, then we must have established cursors on at least one shard.
invariant(shardDispatchResults.remoteCursors.size() > 0);
// If we sent the entire pipeline to a single shard, store the remote cursor and return.
- if (!shardDispatchResults.pipelineForTargetedShards->isSplitForShards()) {
+ if (!shardDispatchResults.splitPipeline) {
invariant(shardDispatchResults.remoteCursors.size() == 1);
auto& remoteCursor = shardDispatchResults.remoteCursors.front();
const auto reply = uassertStatusOK(storePossibleCursor(
@@ -1001,8 +985,14 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
// If we reach here, we have a merge pipeline to dispatch.
- return dispatchMergingPipeline(
- expCtx, namespaces, request, cmdObj, litePipe, routingInfo, shardDispatchResults, result);
+ return dispatchMergingPipeline(expCtx,
+ namespaces,
+ request,
+ cmdObj,
+ litePipe,
+ routingInfo,
+ std::move(shardDispatchResults),
+ result);
}
void ClusterAggregate::uassertAllShardsSupportExplain(
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index f153381d12c..ae0c24ad21e 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -40,7 +40,6 @@ env.Library(
"cluster_client_cursor_impl.cpp",
],
LIBDEPS=[
- "$BUILD_DIR/mongo/db/pipeline/pipeline",
"router_exec_stage",
],
)
@@ -48,20 +47,14 @@ env.Library(
env.Library(
target="router_exec_stage",
source=[
- "document_source_router_adapter.cpp",
"router_stage_limit.cpp",
- "router_stage_merge.cpp",
"router_stage_mock.cpp",
"router_stage_pipeline.cpp",
"router_stage_remove_metadata_fields.cpp",
"router_stage_skip.cpp",
- "router_stage_update_on_add_shard.cpp",
],
LIBDEPS=[
- "$BUILD_DIR/mongo/db/query/query_common",
"async_results_merger",
- ],
- LIBDEPS_PRIVATE=[
"$BUILD_DIR/mongo/db/pipeline/pipeline",
],
)
@@ -83,11 +76,13 @@ env.Library(
target="async_results_merger",
source=[
"async_results_merger.cpp",
+ "blocking_results_merger.cpp",
"establish_cursors.cpp",
env.Idlc('async_results_merger_params.idl')[0],
],
LIBDEPS=[
"$BUILD_DIR/mongo/db/query/command_request_response",
+ "$BUILD_DIR/mongo/db/query/query_common",
"$BUILD_DIR/mongo/executor/task_executor_interface",
"$BUILD_DIR/mongo/s/async_requests_sender",
"$BUILD_DIR/mongo/s/client/sharding_client",
@@ -106,9 +101,11 @@ env.Library(
)
env.CppUnitTest(
- target="async_results_merger_test",
+ target="results_merger_test",
source=[
+ "blocking_results_merger_test.cpp",
"async_results_merger_test.cpp",
+ "results_merger_test_fixture.cpp",
],
LIBDEPS=[
'async_results_merger',
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index f5268ac3408..3cc6756c843 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -88,8 +88,7 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
// This strange initialization is to work around the fact that the IDL does not currently
// support a default value for an enum. The default tailable mode should be 'kNormal', but
// since that is not supported we treat boost::none (unspecified) to mean 'kNormal'.
- _tailableMode(params.getTailableMode() ? *params.getTailableMode()
- : TailableModeEnum::kNormal),
+ _tailableMode(params.getTailableMode().value_or(TailableModeEnum::kNormal)),
_params(std::move(params)),
_mergeQueue(MergingComparator(_remotes,
_params.getSort() ? *_params.getSort() : BSONObj(),
@@ -116,12 +115,12 @@ AsyncResultsMerger::~AsyncResultsMerger() {
invariant(_remotesExhausted(lk) || _lifecycleState == kKillComplete);
}
-bool AsyncResultsMerger::remotesExhausted() {
+bool AsyncResultsMerger::remotesExhausted() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return _remotesExhausted(lk);
}
-bool AsyncResultsMerger::_remotesExhausted(WithLock) {
+bool AsyncResultsMerger::_remotesExhausted(WithLock) const {
for (const auto& remote : _remotes) {
if (!remote.exhausted()) {
return false;
@@ -769,36 +768,4 @@ bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const
_sort) > 0;
}
-void AsyncResultsMerger::blockingKill(OperationContext* opCtx) {
- auto killEvent = kill(opCtx);
- if (!killEvent) {
- // We are shutting down.
- return;
- }
- _executor->waitForEvent(killEvent);
-}
-
-StatusWith<ClusterQueryResult> AsyncResultsMerger::blockingNext() {
- while (!ready()) {
- auto nextEventStatus = nextEvent();
- if (!nextEventStatus.isOK()) {
- return nextEventStatus.getStatus();
- }
- auto event = nextEventStatus.getValue();
-
- // Block until there are further results to return.
- auto status = _executor->waitForEvent(_opCtx, event);
-
- if (!status.isOK()) {
- return status.getStatus();
- }
-
- // We have not provided a deadline, so if the wait returns without interruption, we do not
- // expect to have timed out.
- invariant(status.getValue() == stdx::cv_status::no_timeout);
- }
-
- return nextReady();
-}
-
} // namespace mongo
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 5f8a18194d2..488e03d2ee5 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -109,7 +109,7 @@ public:
/**
* Returns true if all of the remote cursors are exhausted.
*/
- bool remotesExhausted();
+ bool remotesExhausted() const;
/**
* Sets the maxTimeMS value that the ARM should forward with any internally issued getMore
@@ -167,12 +167,6 @@ public:
StatusWith<ClusterQueryResult> nextReady();
/**
- * Blocks until the next result is ready, all remote cursors are exhausted, or there is an
- * error.
- */
- StatusWith<ClusterQueryResult> blockingNext();
-
- /**
* Schedules remote work as required in order to make further results available. If there is an
* error in scheduling this work, returns a non-ok status. On success, returns an event handle.
* The caller can pass this event handle to 'executor' in order to be blocked until further
@@ -238,11 +232,6 @@ public:
*/
executor::TaskExecutor::EventHandle kill(OperationContext* opCtx);
- /**
- * A blocking version of kill() that will not return until this is safe to destroy.
- */
- void blockingKill(OperationContext*);
-
private:
/**
* We instantiate one of these per remote host. It contains the buffer of results we've
@@ -346,7 +335,7 @@ private:
/**
* Checks whether or not the remote cursors are all exhausted.
*/
- bool _remotesExhausted(WithLock);
+ bool _remotesExhausted(WithLock) const;
//
// Helpers for ready().
@@ -433,7 +422,7 @@ private:
AsyncResultsMergerParams _params;
// Must be acquired before accessing any data members (other than _params, which is read-only).
- stdx::mutex _mutex;
+ mutable stdx::mutex _mutex;
// Data tracking the state of our communication with each of the remote nodes.
std::vector<RemoteCursorData> _remotes;
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 7960d22f018..b852cf33f79 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -30,18 +30,13 @@
#include "mongo/s/query/async_results_merger.h"
-#include "mongo/client/remote_command_targeter_factory_mock.h"
-#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/json.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/query_request.h"
-#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/task_executor.h"
-#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
-#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/sharding_router_test_fixture.h"
+#include "mongo/s/query/results_merger_test_fixture.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
@@ -50,212 +45,11 @@ namespace mongo {
namespace {
-using executor::NetworkInterfaceMock;
-using executor::RemoteCommandRequest;
-using executor::RemoteCommandResponse;
-
-using ResponseStatus = executor::TaskExecutor::ResponseStatus;
-
-const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
-const std::vector<ShardId> kTestShardIds = {
- ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")};
-const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", 12345),
- HostAndPort("FakeShard2Host", 12345),
- HostAndPort("FakeShard3Host", 12345)};
-
-const NamespaceString kTestNss("testdb.testcoll");
-
LogicalSessionId parseSessionIdFromCmd(BSONObj cmdObj) {
return LogicalSessionId::parse(IDLParserErrorContext("lsid"), cmdObj["lsid"].Obj());
}
-class AsyncResultsMergerTest : public ShardingTestFixture {
-public:
- AsyncResultsMergerTest() {}
-
- void setUp() override {
- setRemote(HostAndPort("ClientHost", 12345));
-
- configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
-
- std::vector<ShardType> shards;
-
- for (size_t i = 0; i < kTestShardIds.size(); i++) {
- ShardType shardType;
- shardType.setName(kTestShardIds[i].toString());
- shardType.setHost(kTestShardHosts[i].toString());
-
- shards.push_back(shardType);
-
- std::unique_ptr<RemoteCommandTargeterMock> targeter(
- stdx::make_unique<RemoteCommandTargeterMock>());
- targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i]));
- targeter->setFindHostReturnValue(kTestShardHosts[i]);
-
- targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]),
- std::move(targeter));
- }
-
- setupShards(shards);
- }
-
-protected:
- /**
- * Constructs an ARM with the given vector of existing cursors.
- *
- * If 'findCmd' is not set, the default AsyncResultsMergerParams are used.
- * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams.
- *
- * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the
- * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.'
- */
- std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors(
- std::vector<RemoteCursor> remoteCursors,
- boost::optional<BSONObj> findCmd = boost::none,
- boost::optional<std::int64_t> getMoreBatchSize = boost::none) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- params.setRemotes(std::move(remoteCursors));
-
-
- if (findCmd) {
- const auto qr = unittest::assertGet(
- QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */));
- if (!qr->getSort().isEmpty()) {
- params.setSort(qr->getSort().getOwned());
- }
-
- if (getMoreBatchSize) {
- params.setBatchSize(getMoreBatchSize);
- } else {
- params.setBatchSize(qr->getBatchSize()
- ? boost::optional<std::int64_t>(
- static_cast<std::int64_t>(*qr->getBatchSize()))
- : boost::none);
- }
- params.setTailableMode(qr->getTailableMode());
- params.setAllowPartialResults(qr->isAllowPartialResults());
- }
-
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(operationContext()->getLogicalSessionId());
- sessionInfo.setTxnNumber(operationContext()->getTxnNumber());
- params.setOperationSessionInfo(sessionInfo);
-
- return stdx::make_unique<AsyncResultsMerger>(
- operationContext(), executor(), std::move(params));
- }
-
- /**
- * Schedules a "CommandOnShardedViewNotSupportedOnMongod" error response w/ view definition.
- */
- void scheduleNetworkViewResponse(const std::string& ns, const std::string& pipelineJsonArr) {
- BSONObjBuilder viewDefBob;
- viewDefBob.append("ns", ns);
- viewDefBob.append("pipeline", fromjson(pipelineJsonArr));
-
- BSONObjBuilder bob;
- bob.append("resolvedView", viewDefBob.obj());
- bob.append("ok", 0.0);
- bob.append("errmsg", "Command on view must be executed by mongos");
- bob.append("code", 169);
-
- std::vector<BSONObj> batch = {bob.obj()};
- scheduleNetworkResponseObjs(batch);
- }
-
- /**
- * Schedules a list of cursor responses to be returned by the mock network.
- */
- void scheduleNetworkResponses(std::vector<CursorResponse> responses) {
- std::vector<BSONObj> objs;
- for (const auto& cursorResponse : responses) {
- // For tests of the AsyncResultsMerger, all CursorRepsonses scheduled by the tests are
- // subsequent responses, since the AsyncResultsMerger will only ever run getMores.
- objs.push_back(cursorResponse.toBSON(CursorResponse::ResponseType::SubsequentResponse));
- }
- scheduleNetworkResponseObjs(objs);
- }
-
- /**
- * Schedules a list of raw BSON command responses to be returned by the mock network.
- */
- void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) {
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- for (const auto& obj : objs) {
- ASSERT_TRUE(net->hasReadyRequests());
- Milliseconds millis(0);
- RemoteCommandResponse response(obj, millis);
- executor::TaskExecutor::ResponseStatus responseStatus(response);
- net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus);
- }
- net->runReadyNetworkOperations();
- net->exitNetwork();
- }
-
- RemoteCommandRequest getNthPendingRequest(size_t n) {
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- ASSERT_TRUE(net->hasReadyRequests());
- NetworkInterfaceMock::NetworkOperationIterator noi = net->getNthUnscheduledRequest(n);
- RemoteCommandRequest retRequest = noi->getRequest();
- net->exitNetwork();
- return retRequest;
- }
-
- bool networkHasReadyRequests() {
- NetworkInterfaceMock::InNetworkGuard guard(network());
- return guard->hasReadyRequests();
- }
-
- void scheduleErrorResponse(ResponseStatus rs) {
- invariant(!rs.isOK());
- rs.elapsedMillis = Milliseconds(0);
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- ASSERT_TRUE(net->hasReadyRequests());
- net->scheduleResponse(net->getNextReadyRequest(), net->now(), rs);
- net->runReadyNetworkOperations();
- net->exitNetwork();
- }
-
- void runReadyCallbacks() {
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- net->runReadyNetworkOperations();
- net->exitNetwork();
- }
-
- void blackHoleNextRequest() {
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- ASSERT_TRUE(net->hasReadyRequests());
- net->blackHole(net->getNextReadyRequest());
- net->exitNetwork();
- }
-};
-
-void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) {
- ASSERT_TRUE(killCmd.hasElement("killCursors"));
- ASSERT_EQ(killCmd["cursors"].type(), BSONType::Array);
-
- size_t numCursors = 0;
- for (auto&& cursor : killCmd["cursors"].Obj()) {
- ASSERT_EQ(cursor.type(), BSONType::NumberLong);
- ASSERT_EQ(cursor.numberLong(), cursorId);
- ++numCursors;
- }
- ASSERT_EQ(numCursors, 1u);
-}
-
-RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) {
- RemoteCursor remoteCursor;
- remoteCursor.setShardId(std::move(shardId));
- remoteCursor.setHostAndPort(std::move(host));
- remoteCursor.setCursorResponse(std::move(response));
- return remoteCursor;
-}
+using AsyncResultsMergerTest = ResultsMergerTestFixture;
TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
std::vector<RemoteCursor> cursors;
@@ -1888,76 +1682,6 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin
executor()->waitForEvent(killEvent);
}
-TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
-
- // Before any requests are scheduled, ARM is not ready to return results.
- ASSERT_FALSE(arm->ready());
- ASSERT_FALSE(arm->remotesExhausted());
-
- // Issue a blocking wait for the next result asynchronously on a different thread.
- auto future = launchAsync([&]() {
- auto next = unittest::assertGet(arm->blockingNext());
- ASSERT_FALSE(next.isEOF());
- ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1));
- next = unittest::assertGet(arm->blockingNext());
- ASSERT_TRUE(next.isEOF());
- });
-
- // Schedule the response to the getMore which will return the next result and mark the cursor as
- // exhausted.
- onCommand([&](const auto& request) {
- ASSERT(request.cmdObj["getMore"]);
- return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
- .toBSON(CursorResponse::ResponseType::SubsequentResponse);
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
-
- // Issue a blocking wait for the next result asynchronously on a different thread.
- auto future = launchAsync([&]() {
- auto nextStatus = arm->blockingNext();
- ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted);
- });
-
- // Now mark the OperationContext as killed from this thread.
- {
- stdx::lock_guard<Client> lk(*operationContext()->getClient());
- operationContext()->markKilled(ErrorCodes::Interrupted);
- }
- future.timed_get(kFutureTimeout);
- // Be careful not to use a blocking kill here, since the main thread is in charge of running the
- // callbacks, and we'd block on ourselves.
- auto killEvent = arm->kill(operationContext());
-
- assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1);
- runReadyCallbacks();
- executor()->waitForEvent(killEvent);
-}
-
-TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilKilled) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
-
- // Before any requests are scheduled, ARM is not ready to return results.
- ASSERT_FALSE(arm->ready());
- ASSERT_FALSE(arm->remotesExhausted());
-
- arm->blockingKill(operationContext());
-}
-
TEST_F(AsyncResultsMergerTest, GetMoresShouldNotIncludeLSIDOrTxnNumberIfNoneSpecified) {
std::vector<RemoteCursor> cursors;
cursors.emplace_back(
diff --git a/src/mongo/s/query/blocking_results_merger.cpp b/src/mongo/s/query/blocking_results_merger.cpp
new file mode 100644
index 00000000000..f5ba2af0bf6
--- /dev/null
+++ b/src/mongo/s/query/blocking_results_merger.cpp
@@ -0,0 +1,140 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/query/find_common.h"
+#include "mongo/s/query/blocking_results_merger.h"
+
+namespace mongo {
+
+BlockingResultsMerger::BlockingResultsMerger(OperationContext* opCtx,
+ AsyncResultsMergerParams&& armParams,
+ executor::TaskExecutor* executor)
+ : _tailableMode(armParams.getTailableMode().value_or(TailableModeEnum::kNormal)),
+ _executor(executor),
+ _arm(opCtx, executor, std::move(armParams)) {}
+
+StatusWith<ClusterQueryResult> BlockingResultsMerger::awaitNextWithTimeout(
+ OperationContext* opCtx, RouterExecStage::ExecContext execCtx) {
+ invariant(_tailableMode == TailableModeEnum::kTailableAndAwaitData);
+ // If we are in kInitialFind or kGetMoreWithAtLeastOneResultInBatch context and the ARM is not
+ // ready, we don't block. Fall straight through to the return statement.
+ while (!_arm.ready() && execCtx == RouterExecStage::ExecContext::kGetMoreNoResultsYet) {
+ auto nextEventStatus = getNextEvent();
+ if (!nextEventStatus.isOK()) {
+ return nextEventStatus.getStatus();
+ }
+ auto event = nextEventStatus.getValue();
+
+ // Block until there are further results to return, or our time limit is exceeded.
+ auto waitStatus =
+ _executor->waitForEvent(opCtx, event, awaitDataState(opCtx).waitForInsertsDeadline);
+
+ if (!waitStatus.isOK()) {
+ return waitStatus.getStatus();
+ }
+ // Swallow timeout errors for tailable awaitData cursors, stash the event that we were
+ // waiting on, and return EOF.
+ if (waitStatus == stdx::cv_status::timeout) {
+ _leftoverEventFromLastTimeout = std::move(event);
+ return ClusterQueryResult{};
+ }
+ }
+
+ // We reach this point either if the ARM is ready, or if the ARM is !ready and we are in
+ // kInitialFind or kGetMoreWithAtLeastOneResultInBatch ExecContext. In the latter case, we
+ // return EOF immediately rather than blocking for further results.
+ return _arm.ready() ? _arm.nextReady() : ClusterQueryResult{};
+}
+
+StatusWith<ClusterQueryResult> BlockingResultsMerger::blockUntilNext(OperationContext* opCtx) {
+ while (!_arm.ready()) {
+ auto nextEventStatus = _arm.nextEvent();
+ if (!nextEventStatus.isOK()) {
+ return nextEventStatus.getStatus();
+ }
+ auto event = nextEventStatus.getValue();
+
+ // Block until there are further results to return.
+ auto status = _executor->waitForEvent(opCtx, event);
+
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+
+ // We have not provided a deadline, so if the wait returns without interruption, we do not
+ // expect to have timed out.
+ invariant(status.getValue() == stdx::cv_status::no_timeout);
+ }
+
+ return _arm.nextReady();
+}
+StatusWith<ClusterQueryResult> BlockingResultsMerger::next(OperationContext* opCtx,
+ RouterExecStage::ExecContext execCtx) {
+ // Non-tailable and tailable non-awaitData cursors always block until ready(). AwaitData
+ // cursors wait for ready() only until a specified time limit is exceeded.
+ return (_tailableMode == TailableModeEnum::kTailableAndAwaitData
+ ? awaitNextWithTimeout(opCtx, execCtx)
+ : blockUntilNext(opCtx));
+}
+
+StatusWith<executor::TaskExecutor::EventHandle> BlockingResultsMerger::getNextEvent() {
+ // If we abandoned a previous event due to a mongoS-side timeout, wait for it first.
+ if (_leftoverEventFromLastTimeout) {
+ invariant(_tailableMode == TailableModeEnum::kTailableAndAwaitData);
+ // If we have an outstanding event from last time, then we might have to manually schedule
+ // some getMores for the cursors. If a remote response came back while we were between
+ // getMores (from the user to mongos), the response may have been an empty batch, and the
+ // ARM would not be able to ask for the next batch immediately since it is not attached to
+ // an OperationContext. Now that we have a valid OperationContext, we schedule the getMores
+ // ourselves.
+ Status getMoreStatus = _arm.scheduleGetMores();
+ if (!getMoreStatus.isOK()) {
+ return getMoreStatus;
+ }
+
+ // Return the leftover event and clear '_leftoverEventFromLastTimeout'.
+ auto event = _leftoverEventFromLastTimeout;
+ _leftoverEventFromLastTimeout = executor::TaskExecutor::EventHandle();
+ return event;
+ }
+
+ return _arm.nextEvent();
+}
+
+void BlockingResultsMerger::kill(OperationContext* opCtx) {
+ auto killEvent = _arm.kill(opCtx);
+ if (!killEvent) {
+ // We are shutting down.
+ return;
+ }
+ _executor->waitForEvent(killEvent);
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/query/blocking_results_merger.h b/src/mongo/s/query/blocking_results_merger.h
new file mode 100644
index 00000000000..cbc96cbbfc0
--- /dev/null
+++ b/src/mongo/s/query/blocking_results_merger.h
@@ -0,0 +1,113 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/query/async_results_merger.h"
+#include "mongo/s/query/router_exec_stage.h"
+
+namespace mongo {
+
+/**
+ * Layers a simpler blocking interface on top of the AsyncResultsMerger from which this
+ * BlockingResultsMerger is constructed.
+ */
+class BlockingResultsMerger {
+public:
+ BlockingResultsMerger(OperationContext* opCtx,
+ AsyncResultsMergerParams&& arm,
+ executor::TaskExecutor*);
+
+ /**
+ * Blocks until the next result is available or an error is detected.
+ */
+ StatusWith<ClusterQueryResult> next(OperationContext*, RouterExecStage::ExecContext);
+
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return _arm.setAwaitDataTimeout(awaitDataTimeout);
+ }
+
+ void reattachToOperationContext(OperationContext* opCtx) {
+ _arm.reattachToOperationContext(opCtx);
+ }
+
+ void detachFromOperationContext() {
+ _arm.detachFromOperationContext();
+ }
+
+ bool remotesExhausted() const {
+ return _arm.remotesExhausted();
+ }
+
+ std::size_t getNumRemotes() const {
+ return _arm.getNumRemotes();
+ }
+
+ void addNewShardCursors(std::vector<RemoteCursor>&& newCursors) {
+ _arm.addNewShardCursors(std::move(newCursors));
+ }
+
+ /**
+ * Blocks until '_arm' has been killed, which involves cleaning up any remote cursors managed
+ * by this results merger.
+ */
+ void kill(OperationContext* opCtx);
+
+private:
+ /**
+ * Awaits the next result from the ARM with no time limit.
+ */
+ StatusWith<ClusterQueryResult> blockUntilNext(OperationContext* opCtx);
+
+ /**
+ * Awaits the next result from the ARM up to the time limit specified on 'opCtx'. If this is the
+ * user's initial find or we have already obtained at least one result for this batch, this
+ * method returns EOF immediately rather than blocking.
+ */
+ StatusWith<ClusterQueryResult> awaitNextWithTimeout(OperationContext* opCtx,
+ RouterExecStage::ExecContext execCtx);
+
+ /**
+ * Returns the next event to wait upon - either a new event from the ARM, or a valid preceding
+ * event which we scheduled during the previous call to next().
+ */
+ StatusWith<executor::TaskExecutor::EventHandle> getNextEvent();
+
+ TailableModeEnum _tailableMode;
+ executor::TaskExecutor* _executor;
+
+ // In a case where we have a tailable, awaitData cursor, a call to 'next()' will block waiting
+ // for an event generated by '_arm', but may time out waiting for this event to be triggered.
+ // While it's waiting, the time limit for the 'awaitData' piece of the cursor may have been
+ // exceeded. When this happens, we use '_leftoverEventFromLastTimeout' to remember the old event
+ // and pick back up waiting for it on the next call to 'next()'.
+ executor::TaskExecutor::EventHandle _leftoverEventFromLastTimeout;
+ AsyncResultsMerger _arm;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/query/blocking_results_merger_test.cpp b/src/mongo/s/query/blocking_results_merger_test.cpp
new file mode 100644
index 00000000000..821eda4d8ad
--- /dev/null
+++ b/src/mongo/s/query/blocking_results_merger_test.cpp
@@ -0,0 +1,119 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/query/blocking_results_merger.h"
+#include "mongo/s/query/results_merger_test_fixture.h"
+
+namespace mongo {
+
+namespace {
+
+using BlockingResultsMergerTest = ResultsMergerTestFixture;
+
+TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilKilled) {
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ BlockingResultsMerger blockingMerger(
+ operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor());
+
+ blockingMerger.kill(operationContext());
+}
+
+TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReady) {
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ BlockingResultsMerger blockingMerger(
+ operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor());
+
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ auto next = unittest::assertGet(
+ blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind));
+ ASSERT_FALSE(next.isEOF());
+ ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1));
+ next = unittest::assertGet(
+ blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind));
+ ASSERT_TRUE(next.isEOF());
+ });
+
+ // Schedule the response to the getMore which will return the next result and mark the cursor as
+ // exhausted.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
+ .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ResultsMergerTestFixture, ShouldBeInterruptableDuringBlockingNext) {
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto params = makeARMParamsFromExistingCursors(std::move(cursors));
+ params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+ BlockingResultsMerger blockingMerger(operationContext(), std::move(params), executor());
+
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ auto nextStatus = blockingMerger.next(operationContext(),
+ RouterExecStage::ExecContext::kGetMoreNoResultsYet);
+ ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted);
+ });
+
+ // Now mark the OperationContext as killed from this thread.
+ {
+ stdx::lock_guard<Client> lk(*operationContext()->getClient());
+ operationContext()->markKilled(ErrorCodes::Interrupted);
+ }
+ // Wait for the merger to be interrupted.
+ future.timed_get(kFutureTimeout);
+
+ // Now that we've seen it interrupted, kill it. We have to do this in another thread because
+ // killing a BlockingResultsMerger involves running a killCursors, and this main thread is in
+ // charge of scheduling the response to that request.
+ future = launchAsync([&]() { blockingMerger.kill(operationContext()); });
+ while (!networkHasReadyRequests() || !getNthPendingRequest(0u).cmdObj["killCursors"]) {
+ // Wait for the kill to schedule it's killCursors. It may schedule a getMore first before
+ // cancelling it, so wait until the pending request is actually a killCursors.
+ }
+ assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1);
+
+ // Run the callback for the killCursors. We don't actually inspect the value so we don't have to
+ // schedule a response.
+ runReadyCallbacks();
+ future.timed_get(kFutureTimeout);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 1b3a665df5e..acda45f66f0 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -30,14 +30,8 @@
#include "mongo/s/query/cluster_client_cursor_impl.h"
-#include "mongo/db/pipeline/cluster_aggregation_planner.h"
-#include "mongo/db/pipeline/document_source_limit.h"
-#include "mongo/db/pipeline/document_source_skip.h"
-#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/s/query/router_stage_limit.h"
#include "mongo/s/query/router_stage_merge.h"
-#include "mongo/s/query/router_stage_mock.h"
-#include "mongo/s/query/router_stage_pipeline.h"
#include "mongo/s/query/router_stage_remove_metadata_fields.h"
#include "mongo/s/query/router_stage_skip.h"
#include "mongo/stdx/memory.h"
@@ -70,6 +64,14 @@ ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx,
return ClusterClientCursorGuard(opCtx, std::move(cursor));
}
+ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx,
+ std::unique_ptr<RouterExecStage> root,
+ ClusterClientCursorParams&& params) {
+ std::unique_ptr<ClusterClientCursor> cursor(new ClusterClientCursorImpl(
+ opCtx, std::move(root), std::move(params), opCtx->getLogicalSessionId()));
+ return ClusterClientCursorGuard(opCtx, std::move(cursor));
+}
+
ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams&& params,
@@ -84,7 +86,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
}
ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
- std::unique_ptr<RouterStageMock> root,
+ std::unique_ptr<RouterExecStage> root,
ClusterClientCursorParams&& params,
boost::optional<LogicalSessionId> lsid)
: _params(std::move(params)), _root(std::move(root)), _lsid(lsid), _opCtx(opCtx) {
@@ -183,81 +185,13 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc
return _params.readPreference;
}
-namespace {
-
-bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) {
- return (dynamic_cast<DocumentSourceLimit*>(stage.get()) ||
- dynamic_cast<DocumentSourceSkip*>(stage.get()));
-}
-
-bool isAllLimitsAndSkips(Pipeline* pipeline) {
- const auto stages = pipeline->getSources();
- return std::all_of(
- stages.begin(), stages.end(), [&](const auto& stage) { return isSkipOrLimit(stage); });
-}
-
-/**
- * Creates the initial stage to feed data into the execution plan. By default, a RouterExecMerge
- * stage, or a custom stage if specified in 'params->creatCustomMerge'.
- */
-std::unique_ptr<RouterExecStage> createInitialStage(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params) {
- if (params->createCustomCursorSource) {
- return params->createCustomCursorSource(opCtx, executor, params);
- } else {
- return stdx::make_unique<RouterStageMerge>(opCtx, executor, params);
- }
-}
-
-std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* executor,
- ClusterClientCursorParams* params) {
- invariant(params->mergePipeline);
- invariant(!params->skip);
- invariant(!params->limit);
- auto* pipeline = params->mergePipeline.get();
- auto* opCtx = pipeline->getContext()->opCtx;
-
- std::unique_ptr<RouterExecStage> root = createInitialStage(opCtx, executor, params);
- if (!isAllLimitsAndSkips(pipeline)) {
- return stdx::make_unique<RouterStagePipeline>(std::move(root),
- std::move(params->mergePipeline));
- }
-
- // After extracting an optional leading $sort, the pipeline consisted entirely of $skip and
- // $limit stages. Avoid creating a RouterStagePipeline (which will go through an expensive
- // conversion from BSONObj -> Document for each result), and create a RouterExecStage tree
- // instead.
- while (!pipeline->getSources().empty()) {
- invariant(isSkipOrLimit(pipeline->getSources().front()));
- if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) {
- root = stdx::make_unique<RouterStageSkip>(
- opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip());
- } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) {
- root = stdx::make_unique<RouterStageLimit>(
- opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit());
- }
- }
- // We are executing the pipeline without using an actual Pipeline, so we need to strip out any
- // Document metadata ourselves.
- return stdx::make_unique<RouterStageRemoveMetadataFields>(
- opCtx, std::move(root), Document::allMetadataFieldNames);
-}
-} // namespace
-
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) {
const auto skip = params->skip;
const auto limit = params->limit;
- if (params->mergePipeline) {
- if (auto sort =
- cluster_aggregation_planner::popLeadingMergeSort(params->mergePipeline.get())) {
- params->sort = *sort;
- }
- return buildPipelinePlan(executor, params);
- }
- std::unique_ptr<RouterExecStage> root = createInitialStage(opCtx, executor, params);
+ std::unique_ptr<RouterExecStage> root =
+ std::make_unique<RouterStageMerge>(opCtx, executor, params->extractARMParams());
if (skip) {
root = stdx::make_unique<RouterStageSkip>(opCtx, std::move(root), *skip);
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 36f9d3995c8..04e97cad3d9 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -83,12 +83,21 @@ class ClusterClientCursorImpl final : public ClusterClientCursor {
public:
/**
- * Constructs a CCC whose safe cleanup is ensured by an RAII object.
+ * Constructs a cluster query plan and CCC from the given parameters whose safe cleanup is
+ * ensured by an RAII object.
*/
static ClusterClientCursorGuard make(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams&& params);
+ /**
+ * Constructs a CCC from the given execution tree 'root'. The CCC's safe cleanup is ensured by
+ * an RAII object.
+ */
+ static ClusterClientCursorGuard make(OperationContext* opCtx,
+ std::unique_ptr<RouterExecStage> root,
+ ClusterClientCursorParams&& params);
+
StatusWith<ClusterQueryResult> next(RouterExecStage::ExecContext) final;
void kill(OperationContext* opCtx) final;
@@ -122,12 +131,11 @@ public:
boost::optional<ReadPreferenceSetting> getReadPreference() const final;
public:
- /** private for tests */
/**
* Constructs a CCC whose result set is generated by a mock execution stage.
*/
ClusterClientCursorImpl(OperationContext* opCtx,
- std::unique_ptr<RouterStageMock> root,
+ std::unique_ptr<RouterExecStage> root,
ClusterClientCursorParams&& params,
boost::optional<LogicalSessionId> lsid);
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index c2d300ee19e..a853d26a99f 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -123,9 +123,6 @@ struct ClusterClientCursorParams {
// Should be forwarded to the remote hosts in 'cmdObj'.
boost::optional<long long> limit;
- // If set, we use this pipeline to merge the output of aggregations on each remote.
- std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline;
-
// Whether this cursor is tailing a capped collection, and whether it has the awaitData option
// set.
TailableModeEnum tailableMode = TailableModeEnum::kNormal;
@@ -133,12 +130,6 @@ struct ClusterClientCursorParams {
// Set if a readPreference must be respected throughout the lifetime of the cursor.
boost::optional<ReadPreferenceSetting> readPreference;
- // If valid, is called to return the RouterExecStage which becomes the initial source in this
- // cursor's execution plan. Otherwise, a RouterStageMerge is used.
- stdx::function<std::unique_ptr<RouterExecStage>(
- OperationContext*, executor::TaskExecutor*, ClusterClientCursorParams*)>
- createCustomCursorSource;
-
// Whether the client indicated that it is willing to receive partial results in the case of an
// unreachable host.
bool isAllowPartialResults = false;
diff --git a/src/mongo/s/query/document_source_router_adapter.cpp b/src/mongo/s/query/document_source_router_adapter.cpp
deleted file mode 100644
index 26a944ed5cc..00000000000
--- a/src/mongo/s/query/document_source_router_adapter.cpp
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/query/document_source_router_adapter.h"
-
-#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/expression_context.h"
-
-namespace mongo {
-
-boost::intrusive_ptr<DocumentSourceRouterAdapter> DocumentSourceRouterAdapter::create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::unique_ptr<RouterExecStage> childStage) {
- return new DocumentSourceRouterAdapter(expCtx, std::move(childStage));
-}
-
-DocumentSource::GetNextResult DocumentSourceRouterAdapter::getNext() {
- auto next = uassertStatusOK(_child->next(_execContext));
- if (auto nextObj = next.getResult()) {
- return Document::fromBsonWithMetaData(*nextObj);
- }
- return GetNextResult::makeEOF();
-}
-
-void DocumentSourceRouterAdapter::doDispose() {
- _child->kill(pExpCtx->opCtx);
-}
-
-void DocumentSourceRouterAdapter::reattachToOperationContext(OperationContext* opCtx) {
- _child->reattachToOperationContext(opCtx);
-}
-
-void DocumentSourceRouterAdapter::detachFromOperationContext() {
- _child->detachFromOperationContext();
-}
-
-Value DocumentSourceRouterAdapter::serialize(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- invariant(explain); // We shouldn't need to serialize this stage to send it anywhere.
- return Value(); // Return the empty value to hide this stage from explain output.
-}
-
-std::size_t DocumentSourceRouterAdapter::getNumRemotes() const {
- return _child->getNumRemotes();
-}
-
-bool DocumentSourceRouterAdapter::remotesExhausted() {
- return _child->remotesExhausted();
-}
-
-DocumentSourceRouterAdapter::DocumentSourceRouterAdapter(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::unique_ptr<RouterExecStage> childStage)
- : DocumentSource(expCtx), _child(std::move(childStage)) {}
-
-} // namespace mongo
diff --git a/src/mongo/s/query/document_source_router_adapter.h b/src/mongo/s/query/document_source_router_adapter.h
deleted file mode 100644
index a7db7734539..00000000000
--- a/src/mongo/s/query/document_source_router_adapter.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#pragma once
-
-#include "mongo/s/query/router_exec_stage.h"
-
-#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/pipeline.h"
-
-namespace mongo {
-/**
- * A class that acts as an adapter between the RouterExecStage and DocumentSource interfaces,
- * translating results from an input RouterExecStage into DocumentSource::GetNextResults.
- */
-class DocumentSourceRouterAdapter final : public DocumentSource {
-public:
- static boost::intrusive_ptr<DocumentSourceRouterAdapter> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::unique_ptr<RouterExecStage> childStage);
-
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- return {StreamType::kStreaming,
- PositionRequirement::kFirst,
- HostTypeRequirement::kMongoS,
- DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed,
- TransactionRequirement::kAllowed};
- }
-
- GetNextResult getNext() final;
- void doDispose() final;
- void reattachToOperationContext(OperationContext* opCtx) final;
- void detachFromOperationContext() final;
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
- bool remotesExhausted();
- std::size_t getNumRemotes() const;
-
- void setExecContext(RouterExecStage::ExecContext execContext) {
- _execContext = execContext;
- }
-
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) const {
- return _child->setAwaitDataTimeout(awaitDataTimeout);
- }
-
-private:
- DocumentSourceRouterAdapter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::unique_ptr<RouterExecStage> childStage);
-
- std::unique_ptr<RouterExecStage> _child;
- RouterExecStage::ExecContext _execContext;
-};
-} // namespace mongo
diff --git a/src/mongo/s/query/results_merger_test_fixture.cpp b/src/mongo/s/query/results_merger_test_fixture.cpp
new file mode 100644
index 00000000000..57033523c68
--- /dev/null
+++ b/src/mongo/s/query/results_merger_test_fixture.cpp
@@ -0,0 +1,76 @@
+/**
+ * Copyright 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/remote_command_targeter_factory_mock.h"
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/query/results_merger_test_fixture.h"
+
+namespace mongo {
+const HostAndPort ResultsMergerTestFixture::kTestConfigShardHost =
+ HostAndPort("FakeConfigHost", 12345);
+const std::vector<ShardId> ResultsMergerTestFixture::kTestShardIds = {
+ ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")};
+const std::vector<HostAndPort> ResultsMergerTestFixture::kTestShardHosts = {
+ HostAndPort("FakeShard1Host", 12345),
+ HostAndPort("FakeShard2Host", 12345),
+ HostAndPort("FakeShard3Host", 12345)};
+
+const NamespaceString ResultsMergerTestFixture::kTestNss = NamespaceString{"testdb.testcoll"};
+
+void ResultsMergerTestFixture::setUp() {
+ setRemote(HostAndPort("ClientHost", 12345));
+
+ configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
+
+ std::vector<ShardType> shards;
+
+ for (size_t i = 0; i < kTestShardIds.size(); i++) {
+ ShardType shardType;
+ shardType.setName(kTestShardIds[i].toString());
+ shardType.setHost(kTestShardHosts[i].toString());
+
+ shards.push_back(shardType);
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i]));
+ targeter->setFindHostReturnValue(kTestShardHosts[i]);
+
+ targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]),
+ std::move(targeter));
+ }
+
+ setupShards(shards);
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/query/results_merger_test_fixture.h b/src/mongo/s/query/results_merger_test_fixture.h
new file mode 100644
index 00000000000..1252f22b793
--- /dev/null
+++ b/src/mongo/s/query/results_merger_test_fixture.h
@@ -0,0 +1,228 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/query/async_results_merger.h"
+#include "mongo/s/sharding_router_test_fixture.h"
+
+namespace mongo {
+
+/**
+ * Test fixture which is useful to both the tests for AsyncResultsMerger and BlockingResultsMerger.
+ */
+class ResultsMergerTestFixture : public ShardingTestFixture {
+public:
+ static const HostAndPort kTestConfigShardHost;
+ static const std::vector<ShardId> kTestShardIds;
+ static const std::vector<HostAndPort> kTestShardHosts;
+
+ static const NamespaceString kTestNss;
+
+ ResultsMergerTestFixture() {}
+
+ void setUp() override;
+
+protected:
+ /**
+ * Constructs an AsyncResultsMergerParams object with the given vector of existing cursors.
+ *
+ * If 'findCmd' is not set, the default AsyncResultsMergerParams are used. Otherwise, the
+ * 'findCmd' is used to construct the AsyncResultsMergerParams.
+ *
+ * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the
+ * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.'
+ */
+ AsyncResultsMergerParams makeARMParamsFromExistingCursors(
+ std::vector<RemoteCursor> remoteCursors,
+ boost::optional<BSONObj> findCmd = boost::none,
+ boost::optional<std::int64_t> getMoreBatchSize = boost::none) {
+ AsyncResultsMergerParams params;
+ params.setNss(kTestNss);
+ params.setRemotes(std::move(remoteCursors));
+
+
+ if (findCmd) {
+ const auto qr = unittest::assertGet(
+ QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */));
+ if (!qr->getSort().isEmpty()) {
+ params.setSort(qr->getSort().getOwned());
+ }
+
+ if (getMoreBatchSize) {
+ params.setBatchSize(getMoreBatchSize);
+ } else {
+ params.setBatchSize(qr->getBatchSize()
+ ? boost::optional<std::int64_t>(
+ static_cast<std::int64_t>(*qr->getBatchSize()))
+ : boost::none);
+ }
+ params.setTailableMode(qr->getTailableMode());
+ params.setAllowPartialResults(qr->isAllowPartialResults());
+ }
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(operationContext()->getLogicalSessionId());
+ sessionInfo.setTxnNumber(operationContext()->getTxnNumber());
+ params.setOperationSessionInfo(sessionInfo);
+ return params;
+ }
+ /**
+ * Constructs an ARM with the given vector of existing cursors.
+ *
+ * If 'findCmd' is not set, the default AsyncResultsMergerParams are used.
+ * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams.
+ *
+ * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the
+ * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.'
+ */
+ std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors(
+ std::vector<RemoteCursor> remoteCursors,
+ boost::optional<BSONObj> findCmd = boost::none,
+ boost::optional<std::int64_t> getMoreBatchSize = boost::none) {
+
+ return stdx::make_unique<AsyncResultsMerger>(
+ operationContext(),
+ executor(),
+ makeARMParamsFromExistingCursors(std::move(remoteCursors), findCmd, getMoreBatchSize));
+ }
+
+ /**
+ * Schedules a "CommandOnShardedViewNotSupportedOnMongod" error response w/ view definition.
+ */
+ void scheduleNetworkViewResponse(const std::string& ns, const std::string& pipelineJsonArr) {
+ BSONObjBuilder viewDefBob;
+ viewDefBob.append("ns", ns);
+ viewDefBob.append("pipeline", fromjson(pipelineJsonArr));
+
+ BSONObjBuilder bob;
+ bob.append("resolvedView", viewDefBob.obj());
+ bob.append("ok", 0.0);
+ bob.append("errmsg", "Command on view must be executed by mongos");
+ bob.append("code", 169);
+
+ std::vector<BSONObj> batch = {bob.obj()};
+ scheduleNetworkResponseObjs(batch);
+ }
+
+ /**
+ * Schedules a list of cursor responses to be returned by the mock network.
+ */
+ void scheduleNetworkResponses(std::vector<CursorResponse> responses) {
+ std::vector<BSONObj> objs;
+ for (const auto& cursorResponse : responses) {
+ // For tests of the AsyncResultsMerger, all CursorRepsonses scheduled by the tests are
+ // subsequent responses, since the AsyncResultsMerger will only ever run getMores.
+ objs.push_back(cursorResponse.toBSON(CursorResponse::ResponseType::SubsequentResponse));
+ }
+ scheduleNetworkResponseObjs(objs);
+ }
+
+ /**
+ * Schedules a list of raw BSON command responses to be returned by the mock network.
+ */
+ void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) {
+ executor::NetworkInterfaceMock* net = network();
+ net->enterNetwork();
+ for (const auto& obj : objs) {
+ ASSERT_TRUE(net->hasReadyRequests());
+ Milliseconds millis(0);
+ executor::RemoteCommandResponse response(obj, millis);
+ executor::TaskExecutor::ResponseStatus responseStatus(response);
+ net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus);
+ }
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+ }
+
+ executor::RemoteCommandRequest getNthPendingRequest(size_t n) {
+ executor::NetworkInterfaceMock* net = network();
+ net->enterNetwork();
+ ASSERT_TRUE(net->hasReadyRequests());
+ executor::NetworkInterfaceMock::NetworkOperationIterator noi =
+ net->getNthUnscheduledRequest(n);
+ executor::RemoteCommandRequest retRequest = noi->getRequest();
+ net->exitNetwork();
+ return retRequest;
+ }
+
+ bool networkHasReadyRequests() {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+ return guard->hasReadyRequests();
+ }
+
+ void scheduleErrorResponse(executor::ResponseStatus rs) {
+ invariant(!rs.isOK());
+ rs.elapsedMillis = Milliseconds(0);
+ executor::NetworkInterfaceMock* net = network();
+ net->enterNetwork();
+ ASSERT_TRUE(net->hasReadyRequests());
+ net->scheduleResponse(net->getNextReadyRequest(), net->now(), rs);
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+ }
+
+ void runReadyCallbacks() {
+ executor::NetworkInterfaceMock* net = network();
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+ }
+
+ void blackHoleNextRequest() {
+ executor::NetworkInterfaceMock* net = network();
+ net->enterNetwork();
+ ASSERT_TRUE(net->hasReadyRequests());
+ net->blackHole(net->getNextReadyRequest());
+ net->exitNetwork();
+ }
+
+ void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) {
+ std::cout << "CHARLIE: " << killCmd;
+ ASSERT_TRUE(killCmd.hasElement("killCursors"));
+ ASSERT_EQ(killCmd["cursors"].type(), BSONType::Array);
+
+ size_t numCursors = 0;
+ for (auto&& cursor : killCmd["cursors"].Obj()) {
+ ASSERT_EQ(cursor.type(), BSONType::NumberLong);
+ ASSERT_EQ(cursor.numberLong(), cursorId);
+ ++numCursors;
+ }
+ ASSERT_EQ(numCursors, 1u);
+ }
+
+ RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) {
+ RemoteCursor remoteCursor;
+ remoteCursor.setShardId(std::move(shardId));
+ remoteCursor.setHostAndPort(std::move(host));
+ remoteCursor.setCursorResponse(std::move(response));
+ return remoteCursor;
+ }
+};
+
+} // namespace mongo \ No newline at end of file
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
deleted file mode 100644
index 967c9f60b35..00000000000
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/query/router_stage_merge.h"
-
-#include "mongo/db/query/find_common.h"
-#include "mongo/util/scopeguard.h"
-
-namespace mongo {
-
-RouterStageMerge::RouterStageMerge(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params)
- : RouterExecStage(opCtx),
- _executor(executor),
- _params(params),
- _arm(opCtx, executor, params->extractARMParams()) {}
-
-StatusWith<ClusterQueryResult> RouterStageMerge::next(ExecContext execCtx) {
- // Non-tailable and tailable non-awaitData cursors always block until ready(). AwaitData
- // cursors wait for ready() only until a specified time limit is exceeded.
- return (_params->tailableMode == TailableModeEnum::kTailableAndAwaitData
- ? awaitNextWithTimeout(execCtx)
- : _arm.blockingNext());
-}
-
-StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContext execCtx) {
- invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData);
- // If we are in kInitialFind or kGetMoreWithAtLeastOneResultInBatch context and the ARM is not
- // ready, we don't block. Fall straight through to the return statement.
- while (!_arm.ready() && execCtx == ExecContext::kGetMoreNoResultsYet) {
- auto nextEventStatus = getNextEvent();
- if (!nextEventStatus.isOK()) {
- return nextEventStatus.getStatus();
- }
- auto event = nextEventStatus.getValue();
-
- // Block until there are further results to return, or our time limit is exceeded.
- auto waitStatus = _executor->waitForEvent(
- getOpCtx(), event, awaitDataState(getOpCtx()).waitForInsertsDeadline);
-
- if (!waitStatus.isOK()) {
- return waitStatus.getStatus();
- }
- // Swallow timeout errors for tailable awaitData cursors, stash the event that we were
- // waiting on, and return EOF.
- if (waitStatus == stdx::cv_status::timeout) {
- _leftoverEventFromLastTimeout = std::move(event);
- return ClusterQueryResult{};
- }
- }
-
- // We reach this point either if the ARM is ready, or if the ARM is !ready and we are in
- // kInitialFind or kGetMoreWithAtLeastOneResultInBatch ExecContext. In the latter case, we
- // return EOF immediately rather than blocking for further results.
- return _arm.ready() ? _arm.nextReady() : ClusterQueryResult{};
-}
-
-StatusWith<EventHandle> RouterStageMerge::getNextEvent() {
- // If we abandoned a previous event due to a mongoS-side timeout, wait for it first.
- if (_leftoverEventFromLastTimeout) {
- invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData);
- // If we have an outstanding event from last time, then we might have to manually schedule
- // some getMores for the cursors. If a remote response came back while we were between
- // getMores (from the user to mongos), the response may have been an empty batch, and the
- // ARM would not be able to ask for the next batch immediately since it is not attached to
- // an OperationContext. Now that we have a valid OperationContext, we schedule the getMores
- // ourselves.
- Status getMoreStatus = _arm.scheduleGetMores();
- if (!getMoreStatus.isOK()) {
- return getMoreStatus;
- }
-
- // Return the leftover event and clear '_leftoverEventFromLastTimeout'.
- auto event = _leftoverEventFromLastTimeout;
- _leftoverEventFromLastTimeout = EventHandle();
- return event;
- }
-
- return _arm.nextEvent();
-}
-
-void RouterStageMerge::kill(OperationContext* opCtx) {
- _arm.blockingKill(opCtx);
-}
-
-bool RouterStageMerge::remotesExhausted() {
- return _arm.remotesExhausted();
-}
-
-std::size_t RouterStageMerge::getNumRemotes() const {
- return _arm.getNumRemotes();
-}
-
-Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
- return _arm.setAwaitDataTimeout(awaitDataTimeout);
-}
-
-void RouterStageMerge::addNewShardCursors(std::vector<RemoteCursor>&& newShards) {
- _arm.addNewShardCursors(std::move(newShards));
-}
-
-} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index b6bfee146b6..c0a847f7bd2 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -29,74 +29,56 @@
#pragma once
#include "mongo/executor/task_executor.h"
-#include "mongo/s/query/async_results_merger.h"
+#include "mongo/s/query/blocking_results_merger.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/router_exec_stage.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
-namespace {
-using EventHandle = executor::TaskExecutor::EventHandle;
-} // namespace
-
/**
- * Draws results from the AsyncResultsMerger, which is the underlying source of the stream of merged
- * documents manipulated by the RouterExecStage pipeline. Used to present a stream of documents
- * merged from the shards to the stages later in the pipeline.
+ * Serves as an adapter between the RouterExecStage interface and the BlockingResultsMerger
+ * interface, providing a single stream of results populated from many remote streams.
*/
class RouterStageMerge final : public RouterExecStage {
public:
RouterStageMerge(OperationContext* opCtx,
executor::TaskExecutor* executor,
- ClusterClientCursorParams* params);
-
- StatusWith<ClusterQueryResult> next(ExecContext) final;
+ AsyncResultsMergerParams&& armParams)
+ : RouterExecStage(opCtx), _resultsMerger(opCtx, std::move(armParams), executor) {}
- void kill(OperationContext* opCtx) final;
+ StatusWith<ClusterQueryResult> next(ExecContext execCtx) final {
+ return _resultsMerger.next(getOpCtx(), execCtx);
+ }
- bool remotesExhausted() final;
+ void kill(OperationContext* opCtx) final {
+ _resultsMerger.kill(opCtx);
+ }
- std::size_t getNumRemotes() const final;
+ bool remotesExhausted() final {
+ return _resultsMerger.remotesExhausted();
+ }
- /**
- * Adds the cursors in 'newShards' to those being merged by the ARM.
- */
- void addNewShardCursors(std::vector<RemoteCursor>&& newShards);
+ std::size_t getNumRemotes() const final {
+ return _resultsMerger.getNumRemotes();
+ }
protected:
- Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+ Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final {
+ return _resultsMerger.setAwaitDataTimeout(awaitDataTimeout);
+ }
void doReattachToOperationContext() override {
- _arm.reattachToOperationContext(getOpCtx());
+ _resultsMerger.reattachToOperationContext(getOpCtx());
}
virtual void doDetachFromOperationContext() {
- _arm.detachFromOperationContext();
+ _resultsMerger.detachFromOperationContext();
}
private:
- /**
- * Awaits the next result from the ARM up to a specified time limit. If this is the user's
- * initial find or we have already obtained at least one result for this batch, this method
- * returns EOF immediately rather than blocking.
- */
- StatusWith<ClusterQueryResult> awaitNextWithTimeout(ExecContext execCtx);
-
- /**
- * Returns the next event to wait upon - either a new event from the ARM, or a valid preceding
- * event which we scheduled during the previous call to next().
- */
- StatusWith<EventHandle> getNextEvent();
-
- // Not owned here.
- executor::TaskExecutor* _executor;
- EventHandle _leftoverEventFromLastTimeout;
-
- ClusterClientCursorParams* _params;
-
// Schedules remote work and merges results from 'remotes'.
- AsyncResultsMerger _arm;
+ BlockingResultsMerger _resultsMerger;
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index 5e94274b9ac..a5a97bdbdbc 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -35,26 +35,20 @@
#include "mongo/db/pipeline/document_source_list_local_sessions.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/expression_context.h"
-#include "mongo/s/query/document_source_router_adapter.h"
namespace mongo {
-RouterStagePipeline::RouterStagePipeline(std::unique_ptr<RouterExecStage> child,
- std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline)
+RouterStagePipeline::RouterStagePipeline(std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline)
: RouterExecStage(mergePipeline->getContext()->opCtx),
- _mergePipeline(std::move(mergePipeline)),
- _mongosOnlyPipeline(!_mergePipeline->isSplitForMerge()) {
- if (!_mongosOnlyPipeline) {
- // Add an adapter to the front of the pipeline to draw results from 'child'.
- _routerAdapter =
- DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child)),
- _mergePipeline->addInitialSource(_routerAdapter);
- }
+ _mergePipeline(std::move(mergePipeline)) {
+ invariant(!_mergePipeline->getSources().empty());
+ _mergeCursorsStage =
+ dynamic_cast<DocumentSourceMergeCursors*>(_mergePipeline->getSources().front().get());
}
StatusWith<ClusterQueryResult> RouterStagePipeline::next(RouterExecStage::ExecContext execContext) {
- if (_routerAdapter) {
- _routerAdapter->setExecContext(execContext);
+ if (_mergeCursorsStage) {
+ _mergeCursorsStage->setExecContext(execContext);
}
// Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF.
@@ -85,15 +79,20 @@ void RouterStagePipeline::kill(OperationContext* opCtx) {
}
std::size_t RouterStagePipeline::getNumRemotes() const {
- return _mongosOnlyPipeline ? 0 : _routerAdapter->getNumRemotes();
+ if (_mergeCursorsStage) {
+ return _mergeCursorsStage->getNumRemotes();
+ }
+ return 0;
}
bool RouterStagePipeline::remotesExhausted() {
- return _mongosOnlyPipeline || _routerAdapter->remotesExhausted();
+ return !_mergeCursorsStage || _mergeCursorsStage->remotesExhausted();
}
Status RouterStagePipeline::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
- return _routerAdapter->setAwaitDataTimeout(awaitDataTimeout);
+ invariant(_mergeCursorsStage,
+ "The only cursors which should be tailable are those with remote cursors.");
+ return _mergeCursorsStage->setAwaitDataTimeout(awaitDataTimeout);
}
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index c14ddf9f80b..43706b42cd9 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -31,8 +31,8 @@
#include "mongo/s/query/router_exec_stage.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/pipeline.h"
-#include "mongo/s/query/document_source_router_adapter.h"
namespace mongo {
@@ -42,8 +42,7 @@ namespace mongo {
*/
class RouterStagePipeline final : public RouterExecStage {
public:
- RouterStagePipeline(std::unique_ptr<RouterExecStage> child,
- std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline);
+ RouterStagePipeline(std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline);
StatusWith<ClusterQueryResult> next(RouterExecStage::ExecContext execContext) final;
@@ -61,8 +60,10 @@ protected:
void doDetachFromOperationContext() final;
private:
- boost::intrusive_ptr<DocumentSourceRouterAdapter> _routerAdapter;
std::unique_ptr<Pipeline, PipelineDeleter> _mergePipeline;
- bool _mongosOnlyPipeline;
+
+ // May be null if this pipeline is executing exclusively on mongos and will not contact the
+ // shards at all.
+ boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursorsStage;
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.h b/src/mongo/s/query/router_stage_update_on_add_shard.h
deleted file mode 100644
index 00ee921e2af..00000000000
--- a/src/mongo/s/query/router_stage_update_on_add_shard.h
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-#pragma once
-
-#include "mongo/executor/task_executor.h"
-#include "mongo/s/query/cluster_client_cursor_params.h"
-#include "mongo/s/query/router_exec_stage.h"
-
-namespace mongo {
-/**
- * Uses a RouterStageMerge to merge results, and monitors the merged stream for special
- * sentinel documents which indicate the the set of cursors needs to be updated. When the
- * sentinel is detected, removes it from the stream and updates the set of cursors.
- *
- * cmdToRunOnNewShards: Command to execute on the new shard to open the cursor.
- */
-class RouterStageUpdateOnAddShard final : public RouterExecStage {
-public:
- RouterStageUpdateOnAddShard(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params,
- std::vector<ShardId> shardIds,
- BSONObj cmdToRunOnNewShards);
-
- StatusWith<ClusterQueryResult> next(ExecContext) final;
-
-private:
- /**
- * Establish the new cursors and tell the RouterStageMerge about them.
- * obj: The BSONObj which triggered the establishment of the new cursors
- */
- void addNewShardCursors(BSONObj obj);
-
- /**
- * Open the cursors on the new shards.
- */
- std::vector<RemoteCursor> establishShardCursorsOnNewShards(const BSONObj& newShardDetectedObj);
-
- ClusterClientCursorParams* _params;
- std::vector<ShardId> _shardIds;
- BSONObj _cmdToRunOnNewShards;
-};
-}