summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorHenrik Edin <henrik.edin@mongodb.com>2018-09-24 14:14:47 -0400
committerHenrik Edin <henrik.edin@mongodb.com>2018-10-03 16:40:40 -0400
commit9b3e7447b40c106ec4b83b176084525c4bb45133 (patch)
tree59f491d32afcdd20cf2807ad5989ce096e434719 /src/mongo/s
parent509f243d99e693e826807a26db703095120bbd73 (diff)
downloadmongo-9b3e7447b40c106ec4b83b176084525c4bb45133.tar.gz
SERVER-37297 Move aggregation pipeline stages that require sharding out of the pipeline lib.
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/query/SConscript16
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp2
-rw-r--r--src/mongo/s/query/cluster_aggregate.h2
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp4
-rw-r--r--src/mongo/s/query/document_source_merge_cursors.cpp141
-rw-r--r--src/mongo/s/query/document_source_merge_cursors.h167
-rw-r--r--src/mongo/s/query/document_source_update_on_add_shard.cpp125
-rw-r--r--src/mongo/s/query/document_source_update_on_add_shard.h98
-rw-r--r--src/mongo/s/query/router_stage_pipeline.cpp2
-rw-r--r--src/mongo/s/query/router_stage_pipeline.h2
10 files changed, 546 insertions, 13 deletions
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index e156eafc303..015ea1c66d1 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -64,15 +64,17 @@ env.Library(
env.Library(
target="router_exec_stage",
source=[
- "router_stage_limit.cpp",
- "router_stage_mock.cpp",
- "router_stage_pipeline.cpp",
- "router_stage_remove_metadata_fields.cpp",
- "router_stage_skip.cpp",
+ 'document_source_merge_cursors.cpp',
+ 'document_source_update_on_add_shard.cpp',
+ 'router_stage_limit.cpp',
+ 'router_stage_mock.cpp',
+ 'router_stage_pipeline.cpp',
+ 'router_stage_remove_metadata_fields.cpp',
+ 'router_stage_skip.cpp',
],
LIBDEPS=[
- "async_results_merger",
- "$BUILD_DIR/mongo/db/pipeline/pipeline",
+ '$BUILD_DIR/mongo/db/pipeline/pipeline',
+ 'async_results_merger',
],
)
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index c145908ec5e..14e3059ed73 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -43,7 +43,6 @@
#include "mongo/db/logical_clock.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
-#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
@@ -65,6 +64,7 @@
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_query_knobs.h"
+#include "mongo/s/query/document_source_merge_cursors.h"
#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/query/router_stage_pipeline.h"
#include "mongo/s/query/store_possible_cursor.h"
diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h
index 9f45297819c..99adf22b418 100644
--- a/src/mongo/s/query/cluster_aggregate.h
+++ b/src/mongo/s/query/cluster_aggregate.h
@@ -36,10 +36,10 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/s/async_requests_sender.h"
#include "mongo/s/commands/strategy.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
+#include "mongo/s/query/document_source_merge_cursors.h"
namespace mongo {
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index a19b832cb30..3cf21281b11 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -33,17 +33,17 @@
#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_match.h"
-#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_source_unwind.h"
-#include "mongo/db/pipeline/document_source_update_on_add_shard.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_query_knobs.h"
+#include "mongo/s/query/document_source_merge_cursors.h"
+#include "mongo/s/query/document_source_update_on_add_shard.h"
#include "mongo/s/query/router_stage_limit.h"
#include "mongo/s/query/router_stage_pipeline.h"
#include "mongo/s/query/router_stage_remove_metadata_fields.h"
diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp
new file mode 100644
index 00000000000..5ee49e8d54b
--- /dev/null
+++ b/src/mongo/s/query/document_source_merge_cursors.cpp
@@ -0,0 +1,141 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/query/document_source_merge_cursors.h"
+
+#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/query/find_common.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+
+REGISTER_DOCUMENT_SOURCE(mergeCursors,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceMergeCursors::createFromBson);
+
+constexpr StringData DocumentSourceMergeCursors::kStageName;
+
+DocumentSourceMergeCursors::DocumentSourceMergeCursors(
+ executor::TaskExecutor* executor,
+ AsyncResultsMergerParams armParams,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<BSONObj> ownedParamsSpec)
+ : DocumentSource(expCtx),
+ _armParamsObj(std::move(ownedParamsSpec)),
+ _executor(executor),
+ _armParams(std::move(armParams)) {}
+
+std::size_t DocumentSourceMergeCursors::getNumRemotes() const {
+ if (_armParams) {
+ return _armParams->getRemotes().size();
+ }
+ return _blockingResultsMerger->getNumRemotes();
+}
+
+bool DocumentSourceMergeCursors::remotesExhausted() const {
+ if (_armParams) {
+ // We haven't started iteration yet.
+ return false;
+ }
+ return _blockingResultsMerger->remotesExhausted();
+}
+
+void DocumentSourceMergeCursors::populateMerger() {
+ invariant(!_blockingResultsMerger);
+ invariant(_armParams);
+ _blockingResultsMerger.emplace(pExpCtx->opCtx, std::move(*_armParams), _executor);
+ _armParams = boost::none;
+}
+
+std::unique_ptr<RouterStageMerge> DocumentSourceMergeCursors::convertToRouterStage() {
+ invariant(!_blockingResultsMerger, "Expected conversion to happen before execution");
+ return stdx::make_unique<RouterStageMerge>(pExpCtx->opCtx, _executor, std::move(*_armParams));
+}
+
+DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() {
+ if (!_blockingResultsMerger) {
+ populateMerger();
+ }
+
+ auto next = uassertStatusOK(_blockingResultsMerger->next(pExpCtx->opCtx, _execContext));
+ if (next.isEOF()) {
+ return GetNextResult::makeEOF();
+ }
+ return Document::fromBsonWithMetaData(*next.getResult());
+}
+
+Value DocumentSourceMergeCursors::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ invariant(!_blockingResultsMerger);
+ invariant(_armParams);
+ return Value(Document{{kStageName, _armParams->toBSON()}});
+}
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(17026,
+ "$mergeCursors stage expected an object as argument",
+ elem.type() == BSONType::Object);
+ auto ownedObj = elem.embeddedObject().getOwned();
+ auto armParams = AsyncResultsMergerParams::parse(IDLParserErrorContext(kStageName), ownedObj);
+ return new DocumentSourceMergeCursors(
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ std::move(armParams),
+ expCtx,
+ std::move(ownedObj));
+}
+
+boost::intrusive_ptr<DocumentSourceMergeCursors> DocumentSourceMergeCursors::create(
+ executor::TaskExecutor* executor,
+ AsyncResultsMergerParams params,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceMergeCursors(executor, std::move(params), expCtx);
+}
+
+void DocumentSourceMergeCursors::detachFromOperationContext() {
+ if (_blockingResultsMerger) {
+ _blockingResultsMerger->detachFromOperationContext();
+ }
+}
+
+void DocumentSourceMergeCursors::reattachToOperationContext(OperationContext* opCtx) {
+ if (_blockingResultsMerger) {
+ _blockingResultsMerger->reattachToOperationContext(opCtx);
+ }
+}
+
+void DocumentSourceMergeCursors::doDispose() {
+ if (_blockingResultsMerger) {
+ _blockingResultsMerger->kill(pExpCtx->opCtx);
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h
new file mode 100644
index 00000000000..8a2db705ab0
--- /dev/null
+++ b/src/mongo/s/query/document_source_merge_cursors.h
@@ -0,0 +1,167 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/query/blocking_results_merger.h"
+#include "mongo/s/query/router_stage_merge.h"
+
+namespace mongo {
+
+/**
+ * A stage used only internally to merge results that are being gathered from remote hosts, possibly
+ * including this host.
+ *
+ * Does not assume ownership of cursors until the first call to getNext(). This is to allow this
+ * stage to be used on mongos without actually iterating the cursors. For example, when this stage
+ * is parsed on mongos it may later be decided that the merging should happen on one of the shards.
+ * Then this stage is forwarded to the merging shard, and it should not kill the cursors when it
+ * goes out of scope on mongos.
+ */
+class DocumentSourceMergeCursors : public DocumentSource {
+public:
+ static constexpr StringData kStageName = "$mergeCursors"_sd;
+
+ /**
+ * Parses a serialized version of this stage.
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement, const boost::intrusive_ptr<ExpressionContext>&);
+
+ /**
+ * Creates a new DocumentSourceMergeCursors from the given parameters.
+ */
+ static boost::intrusive_ptr<DocumentSourceMergeCursors> create(
+ executor::TaskExecutor*,
+ AsyncResultsMergerParams,
+ const boost::intrusive_ptr<ExpressionContext>&);
+
+ /**
+ * Extracts the remote cursors and converts the execution machinery from a DocumentSource to a
+ * RouterStage interface. Can only be called at planning time before any call to getNext().
+ */
+ std::unique_ptr<RouterStageMerge> convertToRouterStage();
+
+ const char* getSourceName() const final {
+ return kStageName.rawData();
+ }
+
+ void detachFromOperationContext() final;
+ void reattachToOperationContext(OperationContext*) final;
+
+ /**
+ * Serializes this stage to be sent to perform the merging on a different host.
+ */
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ // TODO SERVER-33683: Permit $mergeCursors with readConcern
+ // level "snapshot".
+ TransactionRequirement::kNotAllowed);
+
+ constraints.requiresInputDocSource = false;
+ return constraints;
+ }
+
+ GetNextResult getNext() final;
+
+ std::size_t getNumRemotes() const;
+
+ bool remotesExhausted() const;
+
+ void setExecContext(RouterExecStage::ExecContext execContext) {
+ _execContext = execContext;
+ }
+
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ if (!_blockingResultsMerger) {
+ // In cases where a cursor was established with a batchSize of 0, the first getMore
+ // might specify a custom maxTimeMS (AKA await data timeout). In these cases we will not
+ // have iterated the cursor yet so will not have populated the merger, but need to
+ // remember/track the custom await data timeout. We will soon iterate the cursor, so we
+ // just populate the merger now and let it track the await data timeout itself.
+ populateMerger();
+ }
+ return _blockingResultsMerger->setAwaitDataTimeout(awaitDataTimeout);
+ }
+
+ /**
+ * Adds the specified shard cursors to the set of cursors to be merged. The results from the
+ * new cursors will be returned as normal through getNext().
+ */
+ void addNewShardCursors(std::vector<RemoteCursor>&& newCursors) {
+ invariant(_blockingResultsMerger);
+ _blockingResultsMerger->addNewShardCursors(std::move(newCursors));
+ }
+
+protected:
+ void doDispose() final;
+
+private:
+ DocumentSourceMergeCursors(executor::TaskExecutor*,
+ AsyncResultsMergerParams,
+ const boost::intrusive_ptr<ExpressionContext>&,
+ boost::optional<BSONObj> ownedParamsSpec = boost::none);
+
+ /**
+ * Converts '_armParams' into the execution machinery to merge the cursors. See below for why
+ * this is done lazily. Clears '_armParams' and populates '_blockingResultsMerger'.
+ */
+ void populateMerger();
+
+ // When we have parsed the params out of a BSONObj, the object needs to stay around while the
+ // params are in use. We store them here.
+ boost::optional<BSONObj> _armParamsObj;
+
+ executor::TaskExecutor* _executor;
+
+ // '_blockingResultsMerger' is lazily populated. Until we need to use it, '_armParams' will be
+ // populated with the parameters. Once we start using '_blockingResultsMerger', '_armParams'
+ // will become boost::none. We do this to prevent populating '_blockingResultsMerger' on mongos
+ // before serializing this stage and sending it to a shard to perform the merge. If we always
+ // populated '_blockingResultsMerger', then the destruction of this stage would cause the
+ // cursors within '_blockingResultsMerger' to be killed prematurely. For example, if this stage
+ // is parsed on mongos then forwarded to the shards, it should not kill the cursors when it goes
+ // out of scope on mongos.
+ boost::optional<AsyncResultsMergerParams> _armParams;
+ boost::optional<BlockingResultsMerger> _blockingResultsMerger;
+
+ // The ExecContext is needed because if we're a tailable, awaitData cursor, we only want to
+ // 'await data' if we 1) are in a getMore and 2) don't already have data to return. This context
+ // allows us to determine which situation we're in.
+ RouterExecStage::ExecContext _execContext = RouterExecStage::ExecContext::kInitialFind;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/s/query/document_source_update_on_add_shard.cpp
new file mode 100644
index 00000000000..40da845353a
--- /dev/null
+++ b/src/mongo/s/query/document_source_update_on_add_shard.cpp
@@ -0,0 +1,125 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/s/query/document_source_update_on_add_shard.h"
+
+#include <algorithm>
+
+#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/query/async_results_merger_params_gen.h"
+#include "mongo/s/query/establish_cursors.h"
+
+namespace mongo {
+namespace {
+
+// Returns true if the change stream document has an 'operationType' of 'newShardDetected'.
+bool needsUpdate(const Document& childResult) {
+ return childResult[DocumentSourceChangeStream::kOperationTypeField].getStringData() ==
+ DocumentSourceChangeStream::kNewShardDetectedOpType;
+}
+} // namespace
+
+boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> DocumentSourceUpdateOnAddShard::create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ executor::TaskExecutor* executor,
+ const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors,
+ std::vector<ShardId> shardsWithCursors,
+ BSONObj cmdToRunOnNewShards) {
+ return new DocumentSourceUpdateOnAddShard(
+ expCtx, executor, mergeCursors, std::move(shardsWithCursors), cmdToRunOnNewShards);
+}
+
+DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ executor::TaskExecutor* executor,
+ const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors,
+ std::vector<ShardId>&& shardsWithCursors,
+ BSONObj cmdToRunOnNewShards)
+ : DocumentSource(expCtx),
+ _executor(executor),
+ _mergeCursors(mergeCursors),
+ _shardsWithCursors(std::move(shardsWithCursors)),
+ _cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {}
+
+DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::getNext() {
+ auto childResult = pSource->getNext();
+
+ while (childResult.isAdvanced() && needsUpdate(childResult.getDocument())) {
+ addNewShardCursors(childResult.getDocument());
+ childResult = pSource->getNext();
+ }
+ return childResult;
+}
+
+void DocumentSourceUpdateOnAddShard::addNewShardCursors(const Document& newShardDetectedObj) {
+ _mergeCursors->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj));
+}
+
+std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsOnNewShards(
+ const Document& newShardDetectedObj) {
+ auto* opCtx = pExpCtx->opCtx;
+ // Reload the shard registry. We need to ensure a reload initiated after calling this method
+ // caused the reload, otherwise we aren't guaranteed to get all the new shards.
+ auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
+ if (!shardRegistry->reload(opCtx)) {
+ // A 'false' return from shardRegistry.reload() means a reload was already in progress and
+ // it completed before reload() returned. So another reload(), regardless of return value,
+ // will ensure a reload started after the first call to reload().
+ shardRegistry->reload(opCtx);
+ }
+
+ std::vector<ShardId> shardIds, newShardIds;
+ shardRegistry->getAllShardIdsNoReload(&shardIds);
+ std::sort(_shardsWithCursors.begin(), _shardsWithCursors.end());
+ std::sort(shardIds.begin(), shardIds.end());
+ std::set_difference(shardIds.begin(),
+ shardIds.end(),
+ _shardsWithCursors.begin(),
+ _shardsWithCursors.end(),
+ std::back_inserter(newShardIds));
+
+ auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
+ _cmdToRunOnNewShards,
+ newShardDetectedObj[DocumentSourceChangeStream::kIdField].getDocument());
+ std::vector<std::pair<ShardId, BSONObj>> requests;
+ for (const auto& shardId : newShardIds) {
+ requests.emplace_back(shardId, cmdObj);
+ _shardsWithCursors.push_back(shardId);
+ }
+ const bool allowPartialResults = false; // partial results are not allowed
+ return establishCursors(opCtx,
+ _executor,
+ pExpCtx->ns,
+ ReadPreferenceSetting::get(opCtx),
+ requests,
+ allowPartialResults);
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h
new file mode 100644
index 00000000000..279729807df
--- /dev/null
+++ b/src/mongo/s/query/document_source_update_on_add_shard.h
@@ -0,0 +1,98 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/query/document_source_merge_cursors.h"
+#include "mongo/s/shard_id.h"
+
+namespace mongo {
+
+/**
+ * An internal stage used as part of the change streams infrastructure to listen for an event
+ * signaling that a new shard now has potentially matching data. For example, this stage will
+ * detect if a collection is being watched and a chunk for that collection migrates to a shard for
+ * the first time. When this event is detected, this stage will establish a new cursor on that
+ * shard and add it to the cursors being merged.
+ */
+class DocumentSourceUpdateOnAddShard final : public DocumentSource {
+public:
+ /**
+ * Creates a new stage which will establish a new cursor and add it to the cursors being merged
+ * by 'mergeCursorsStage' whenever a new shard is detected by a change stream.
+ */
+ static boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> create(
+ const boost::intrusive_ptr<ExpressionContext>&,
+ executor::TaskExecutor*,
+ const boost::intrusive_ptr<DocumentSourceMergeCursors>&,
+ std::vector<ShardId> shardsWithCursors,
+ BSONObj cmdToRunOnNewShards);
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
+ // We only ever expect to add this stage if the pipeline is being executed locally on a
+ // mongos. In this case, it should never be serialized.
+ MONGO_UNREACHABLE;
+ }
+
+ virtual StageConstraints constraints(Pipeline::SplitState) const {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kMongoS,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
+ }
+
+ GetNextResult getNext() final;
+
+private:
+ DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&,
+ executor::TaskExecutor*,
+ const boost::intrusive_ptr<DocumentSourceMergeCursors>&,
+ std::vector<ShardId>&& shardsWithCursors,
+ BSONObj cmdToRunOnNewShards);
+
+ /**
+ * Establish the new cursors and tell the RouterStageMerge about them.
+ */
+ void addNewShardCursors(const Document& newShardDetectedObj);
+
+ /**
+ * Open the cursors on the new shards.
+ */
+ std::vector<RemoteCursor> establishShardCursorsOnNewShards(const Document& newShardDetectedObj);
+
+ executor::TaskExecutor* _executor;
+ boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursors;
+ std::vector<ShardId> _shardsWithCursors;
+ BSONObj _cmdToRunOnNewShards;
+};
+} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index a5a97bdbdbc..e90131c027e 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -33,8 +33,8 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_list_local_sessions.h"
-#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/s/query/document_source_merge_cursors.h"
namespace mongo {
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index 43706b42cd9..ef7e1b39ad1 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -31,8 +31,8 @@
#include "mongo/s/query/router_exec_stage.h"
#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/s/query/document_source_merge_cursors.h"
namespace mongo {