From 9b3e7447b40c106ec4b83b176084525c4bb45133 Mon Sep 17 00:00:00 2001 From: Henrik Edin Date: Mon, 24 Sep 2018 14:14:47 -0400 Subject: SERVER-37297 Move aggregation pipeline stages that require sharding out of the pipeline lib. --- src/mongo/s/query/SConscript | 16 +- src/mongo/s/query/cluster_aggregate.cpp | 2 +- src/mongo/s/query/cluster_aggregate.h | 2 +- src/mongo/s/query/cluster_aggregation_planner.cpp | 4 +- .../s/query/document_source_merge_cursors.cpp | 141 +++++++++++++++++ src/mongo/s/query/document_source_merge_cursors.h | 167 +++++++++++++++++++++ .../query/document_source_update_on_add_shard.cpp | 125 +++++++++++++++ .../s/query/document_source_update_on_add_shard.h | 98 ++++++++++++ src/mongo/s/query/router_stage_pipeline.cpp | 2 +- src/mongo/s/query/router_stage_pipeline.h | 2 +- 10 files changed, 546 insertions(+), 13 deletions(-) create mode 100644 src/mongo/s/query/document_source_merge_cursors.cpp create mode 100644 src/mongo/s/query/document_source_merge_cursors.h create mode 100644 src/mongo/s/query/document_source_update_on_add_shard.cpp create mode 100644 src/mongo/s/query/document_source_update_on_add_shard.h (limited to 'src/mongo/s') diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index e156eafc303..015ea1c66d1 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -64,15 +64,17 @@ env.Library( env.Library( target="router_exec_stage", source=[ - "router_stage_limit.cpp", - "router_stage_mock.cpp", - "router_stage_pipeline.cpp", - "router_stage_remove_metadata_fields.cpp", - "router_stage_skip.cpp", + 'document_source_merge_cursors.cpp', + 'document_source_update_on_add_shard.cpp', + 'router_stage_limit.cpp', + 'router_stage_mock.cpp', + 'router_stage_pipeline.cpp', + 'router_stage_remove_metadata_fields.cpp', + 'router_stage_skip.cpp', ], LIBDEPS=[ - "async_results_merger", - "$BUILD_DIR/mongo/db/pipeline/pipeline", + '$BUILD_DIR/mongo/db/pipeline/pipeline', + 'async_results_merger', ], ) diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index c145908ec5e..14e3059ed73 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -43,7 +43,6 @@ #include "mongo/db/logical_clock.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/document_source_change_stream.h" -#include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" @@ -65,6 +64,7 @@ #include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_query_knobs.h" +#include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/s/query/establish_cursors.h" #include "mongo/s/query/router_stage_pipeline.h" #include "mongo/s/query/store_possible_cursor.h" diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h index 9f45297819c..99adf22b418 100644 --- a/src/mongo/s/query/cluster_aggregate.h +++ b/src/mongo/s/query/cluster_aggregate.h @@ -36,10 +36,10 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/s/async_requests_sender.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/query/cluster_client_cursor_params.h" +#include "mongo/s/query/document_source_merge_cursors.h" namespace mongo { diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index a19b832cb30..3cf21281b11 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -33,17 +33,17 @@ #include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_match.h" -#include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/document_source_skip.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_source_unwind.h" -#include "mongo/db/pipeline/document_source_update_on_add_shard.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_query_knobs.h" +#include "mongo/s/query/document_source_merge_cursors.h" +#include "mongo/s/query/document_source_update_on_add_shard.h" #include "mongo/s/query/router_stage_limit.h" #include "mongo/s/query/router_stage_pipeline.h" #include "mongo/s/query/router_stage_remove_metadata_fields.h" diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp new file mode 100644 index 00000000000..5ee49e8d54b --- /dev/null +++ b/src/mongo/s/query/document_source_merge_cursors.cpp @@ -0,0 +1,141 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/query/document_source_merge_cursors.h" + +#include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/query/find_common.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/s/grid.h" + +namespace mongo { + +REGISTER_DOCUMENT_SOURCE(mergeCursors, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceMergeCursors::createFromBson); + +constexpr StringData DocumentSourceMergeCursors::kStageName; + +DocumentSourceMergeCursors::DocumentSourceMergeCursors( + executor::TaskExecutor* executor, + AsyncResultsMergerParams armParams, + const boost::intrusive_ptr& expCtx, + boost::optional ownedParamsSpec) + : DocumentSource(expCtx), + _armParamsObj(std::move(ownedParamsSpec)), + _executor(executor), + _armParams(std::move(armParams)) {} + +std::size_t DocumentSourceMergeCursors::getNumRemotes() const { + if (_armParams) { + return _armParams->getRemotes().size(); + } + return _blockingResultsMerger->getNumRemotes(); +} + +bool DocumentSourceMergeCursors::remotesExhausted() const { + if (_armParams) { + // We haven't started iteration yet. + return false; + } + return _blockingResultsMerger->remotesExhausted(); +} + +void DocumentSourceMergeCursors::populateMerger() { + invariant(!_blockingResultsMerger); + invariant(_armParams); + _blockingResultsMerger.emplace(pExpCtx->opCtx, std::move(*_armParams), _executor); + _armParams = boost::none; +} + +std::unique_ptr DocumentSourceMergeCursors::convertToRouterStage() { + invariant(!_blockingResultsMerger, "Expected conversion to happen before execution"); + return stdx::make_unique(pExpCtx->opCtx, _executor, std::move(*_armParams)); +} + +DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() { + if (!_blockingResultsMerger) { + populateMerger(); + } + + auto next = uassertStatusOK(_blockingResultsMerger->next(pExpCtx->opCtx, _execContext)); + if (next.isEOF()) { + return GetNextResult::makeEOF(); + } + return Document::fromBsonWithMetaData(*next.getResult()); +} + +Value DocumentSourceMergeCursors::serialize( + boost::optional explain) const { + invariant(!_blockingResultsMerger); + invariant(_armParams); + return Value(Document{{kStageName, _armParams->toBSON()}}); +} + +boost::intrusive_ptr DocumentSourceMergeCursors::createFromBson( + BSONElement elem, const boost::intrusive_ptr& expCtx) { + uassert(17026, + "$mergeCursors stage expected an object as argument", + elem.type() == BSONType::Object); + auto ownedObj = elem.embeddedObject().getOwned(); + auto armParams = AsyncResultsMergerParams::parse(IDLParserErrorContext(kStageName), ownedObj); + return new DocumentSourceMergeCursors( + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), + std::move(armParams), + expCtx, + std::move(ownedObj)); +} + +boost::intrusive_ptr DocumentSourceMergeCursors::create( + executor::TaskExecutor* executor, + AsyncResultsMergerParams params, + const boost::intrusive_ptr& expCtx) { + return new DocumentSourceMergeCursors(executor, std::move(params), expCtx); +} + +void DocumentSourceMergeCursors::detachFromOperationContext() { + if (_blockingResultsMerger) { + _blockingResultsMerger->detachFromOperationContext(); + } +} + +void DocumentSourceMergeCursors::reattachToOperationContext(OperationContext* opCtx) { + if (_blockingResultsMerger) { + _blockingResultsMerger->reattachToOperationContext(opCtx); + } +} + +void DocumentSourceMergeCursors::doDispose() { + if (_blockingResultsMerger) { + _blockingResultsMerger->kill(pExpCtx->opCtx); + } +} + +} // namespace mongo diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h new file mode 100644 index 00000000000..8a2db705ab0 --- /dev/null +++ b/src/mongo/s/query/document_source_merge_cursors.h @@ -0,0 +1,167 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/executor/task_executor.h" +#include "mongo/s/query/blocking_results_merger.h" +#include "mongo/s/query/router_stage_merge.h" + +namespace mongo { + +/** + * A stage used only internally to merge results that are being gathered from remote hosts, possibly + * including this host. + * + * Does not assume ownership of cursors until the first call to getNext(). This is to allow this + * stage to be used on mongos without actually iterating the cursors. For example, when this stage + * is parsed on mongos it may later be decided that the merging should happen on one of the shards. + * Then this stage is forwarded to the merging shard, and it should not kill the cursors when it + * goes out of scope on mongos. + */ +class DocumentSourceMergeCursors : public DocumentSource { +public: + static constexpr StringData kStageName = "$mergeCursors"_sd; + + /** + * Parses a serialized version of this stage. + */ + static boost::intrusive_ptr createFromBson( + BSONElement, const boost::intrusive_ptr&); + + /** + * Creates a new DocumentSourceMergeCursors from the given parameters. + */ + static boost::intrusive_ptr create( + executor::TaskExecutor*, + AsyncResultsMergerParams, + const boost::intrusive_ptr&); + + /** + * Extracts the remote cursors and converts the execution machinery from a DocumentSource to a + * RouterStage interface. Can only be called at planning time before any call to getNext(). + */ + std::unique_ptr convertToRouterStage(); + + const char* getSourceName() const final { + return kStageName.rawData(); + } + + void detachFromOperationContext() final; + void reattachToOperationContext(OperationContext*) final; + + /** + * Serializes this stage to be sent to perform the merging on a different host. + */ + Value serialize(boost::optional explain = boost::none) const final; + + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + // TODO SERVER-33683: Permit $mergeCursors with readConcern + // level "snapshot". + TransactionRequirement::kNotAllowed); + + constraints.requiresInputDocSource = false; + return constraints; + } + + GetNextResult getNext() final; + + std::size_t getNumRemotes() const; + + bool remotesExhausted() const; + + void setExecContext(RouterExecStage::ExecContext execContext) { + _execContext = execContext; + } + + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + if (!_blockingResultsMerger) { + // In cases where a cursor was established with a batchSize of 0, the first getMore + // might specify a custom maxTimeMS (AKA await data timeout). In these cases we will not + // have iterated the cursor yet so will not have populated the merger, but need to + // remember/track the custom await data timeout. We will soon iterate the cursor, so we + // just populate the merger now and let it track the await data timeout itself. + populateMerger(); + } + return _blockingResultsMerger->setAwaitDataTimeout(awaitDataTimeout); + } + + /** + * Adds the specified shard cursors to the set of cursors to be merged. The results from the + * new cursors will be returned as normal through getNext(). + */ + void addNewShardCursors(std::vector&& newCursors) { + invariant(_blockingResultsMerger); + _blockingResultsMerger->addNewShardCursors(std::move(newCursors)); + } + +protected: + void doDispose() final; + +private: + DocumentSourceMergeCursors(executor::TaskExecutor*, + AsyncResultsMergerParams, + const boost::intrusive_ptr&, + boost::optional ownedParamsSpec = boost::none); + + /** + * Converts '_armParams' into the execution machinery to merge the cursors. See below for why + * this is done lazily. Clears '_armParams' and populates '_blockingResultsMerger'. + */ + void populateMerger(); + + // When we have parsed the params out of a BSONObj, the object needs to stay around while the + // params are in use. We store them here. + boost::optional _armParamsObj; + + executor::TaskExecutor* _executor; + + // '_blockingResultsMerger' is lazily populated. Until we need to use it, '_armParams' will be + // populated with the parameters. Once we start using '_blockingResultsMerger', '_armParams' + // will become boost::none. We do this to prevent populating '_blockingResultsMerger' on mongos + // before serializing this stage and sending it to a shard to perform the merge. If we always + // populated '_blockingResultsMerger', then the destruction of this stage would cause the + // cursors within '_blockingResultsMerger' to be killed prematurely. For example, if this stage + // is parsed on mongos then forwarded to the shards, it should not kill the cursors when it goes + // out of scope on mongos. + boost::optional _armParams; + boost::optional _blockingResultsMerger; + + // The ExecContext is needed because if we're a tailable, awaitData cursor, we only want to + // 'await data' if we 1) are in a getMore and 2) don't already have data to return. This context + // allows us to determine which situation we're in. + RouterExecStage::ExecContext _execContext = RouterExecStage::ExecContext::kInitialFind; +}; + +} // namespace mongo diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/s/query/document_source_update_on_add_shard.cpp new file mode 100644 index 00000000000..40da845353a --- /dev/null +++ b/src/mongo/s/query/document_source_update_on_add_shard.cpp @@ -0,0 +1,125 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/s/query/document_source_update_on_add_shard.h" + +#include + +#include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/s/query/async_results_merger_params_gen.h" +#include "mongo/s/query/establish_cursors.h" + +namespace mongo { +namespace { + +// Returns true if the change stream document has an 'operationType' of 'newShardDetected'. +bool needsUpdate(const Document& childResult) { + return childResult[DocumentSourceChangeStream::kOperationTypeField].getStringData() == + DocumentSourceChangeStream::kNewShardDetectedOpType; +} +} // namespace + +boost::intrusive_ptr DocumentSourceUpdateOnAddShard::create( + const boost::intrusive_ptr& expCtx, + executor::TaskExecutor* executor, + const boost::intrusive_ptr& mergeCursors, + std::vector shardsWithCursors, + BSONObj cmdToRunOnNewShards) { + return new DocumentSourceUpdateOnAddShard( + expCtx, executor, mergeCursors, std::move(shardsWithCursors), cmdToRunOnNewShards); +} + +DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard( + const boost::intrusive_ptr& expCtx, + executor::TaskExecutor* executor, + const boost::intrusive_ptr& mergeCursors, + std::vector&& shardsWithCursors, + BSONObj cmdToRunOnNewShards) + : DocumentSource(expCtx), + _executor(executor), + _mergeCursors(mergeCursors), + _shardsWithCursors(std::move(shardsWithCursors)), + _cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {} + +DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::getNext() { + auto childResult = pSource->getNext(); + + while (childResult.isAdvanced() && needsUpdate(childResult.getDocument())) { + addNewShardCursors(childResult.getDocument()); + childResult = pSource->getNext(); + } + return childResult; +} + +void DocumentSourceUpdateOnAddShard::addNewShardCursors(const Document& newShardDetectedObj) { + _mergeCursors->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj)); +} + +std::vector DocumentSourceUpdateOnAddShard::establishShardCursorsOnNewShards( + const Document& newShardDetectedObj) { + auto* opCtx = pExpCtx->opCtx; + // Reload the shard registry. We need to ensure a reload initiated after calling this method + // caused the reload, otherwise we aren't guaranteed to get all the new shards. + auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); + if (!shardRegistry->reload(opCtx)) { + // A 'false' return from shardRegistry.reload() means a reload was already in progress and + // it completed before reload() returned. So another reload(), regardless of return value, + // will ensure a reload started after the first call to reload(). + shardRegistry->reload(opCtx); + } + + std::vector shardIds, newShardIds; + shardRegistry->getAllShardIdsNoReload(&shardIds); + std::sort(_shardsWithCursors.begin(), _shardsWithCursors.end()); + std::sort(shardIds.begin(), shardIds.end()); + std::set_difference(shardIds.begin(), + shardIds.end(), + _shardsWithCursors.begin(), + _shardsWithCursors.end(), + std::back_inserter(newShardIds)); + + auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand( + _cmdToRunOnNewShards, + newShardDetectedObj[DocumentSourceChangeStream::kIdField].getDocument()); + std::vector> requests; + for (const auto& shardId : newShardIds) { + requests.emplace_back(shardId, cmdObj); + _shardsWithCursors.push_back(shardId); + } + const bool allowPartialResults = false; // partial results are not allowed + return establishCursors(opCtx, + _executor, + pExpCtx->ns, + ReadPreferenceSetting::get(opCtx), + requests, + allowPartialResults); +} + +} // namespace mongo diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h new file mode 100644 index 00000000000..279729807df --- /dev/null +++ b/src/mongo/s/query/document_source_update_on_add_shard.h @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/executor/task_executor.h" +#include "mongo/s/query/document_source_merge_cursors.h" +#include "mongo/s/shard_id.h" + +namespace mongo { + +/** + * An internal stage used as part of the change streams infrastructure to listen for an event + * signaling that a new shard now has potentially matching data. For example, this stage will + * detect if a collection is being watched and a chunk for that collection migrates to a shard for + * the first time. When this event is detected, this stage will establish a new cursor on that + * shard and add it to the cursors being merged. + */ +class DocumentSourceUpdateOnAddShard final : public DocumentSource { +public: + /** + * Creates a new stage which will establish a new cursor and add it to the cursors being merged + * by 'mergeCursorsStage' whenever a new shard is detected by a change stream. + */ + static boost::intrusive_ptr create( + const boost::intrusive_ptr&, + executor::TaskExecutor*, + const boost::intrusive_ptr&, + std::vector shardsWithCursors, + BSONObj cmdToRunOnNewShards); + + Value serialize(boost::optional explain) const final { + // We only ever expect to add this stage if the pipeline is being executed locally on a + // mongos. In this case, it should never be serialized. + MONGO_UNREACHABLE; + } + + virtual StageConstraints constraints(Pipeline::SplitState) const { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kMongoS, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; + } + + GetNextResult getNext() final; + +private: + DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr&, + executor::TaskExecutor*, + const boost::intrusive_ptr&, + std::vector&& shardsWithCursors, + BSONObj cmdToRunOnNewShards); + + /** + * Establish the new cursors and tell the RouterStageMerge about them. + */ + void addNewShardCursors(const Document& newShardDetectedObj); + + /** + * Open the cursors on the new shards. + */ + std::vector establishShardCursorsOnNewShards(const Document& newShardDetectedObj); + + executor::TaskExecutor* _executor; + boost::intrusive_ptr _mergeCursors; + std::vector _shardsWithCursors; + BSONObj _cmdToRunOnNewShards; +}; +} // namespace mongo diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index a5a97bdbdbc..e90131c027e 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -33,8 +33,8 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_list_local_sessions.h" -#include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/expression_context.h" +#include "mongo/s/query/document_source_merge_cursors.h" namespace mongo { diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index 43706b42cd9..ef7e1b39ad1 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -31,8 +31,8 @@ #include "mongo/s/query/router_exec_stage.h" #include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/s/query/document_source_merge_cursors.h" namespace mongo { -- cgit v1.2.1