diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-07-02 18:23:25 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-08-15 13:30:12 -0400 |
commit | ee06e6cbe5a75775f76836449558be2f6a98ddfd (patch) | |
tree | d4dbf37110d25f7f4876337a7b1e11abe251fac5 /src/mongo/s | |
parent | a5bde2f3e9afc3f72da01788b76829fb29c2f4e7 (diff) | |
download | mongo-ee06e6cbe5a75775f76836449558be2f6a98ddfd.tar.gz |
SERVER-33323 Refactor agg cursor merging on mongos
This commit makes it so that aggregations will always use a
$mergeCursors as a wrapper around a AsyncResultsMerger, which is new
behavior for mongos. As part of this refactor, we can delete the concept
of a 'merging presorted' $sort stage (which is now handled by the
AsyncResultsMerger) and delete the DocumentSourceRouterAdapter stage
which talked to a RouterStageMerge, instead directly using a
$mergeCursors stage.
Diffstat (limited to 'src/mongo/s')
22 files changed, 889 insertions, 1113 deletions
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index 5c75e96236e..f39ee98cc70 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -108,10 +108,10 @@ env.Library( '$BUILD_DIR/mongo/db/auth/saslauth', '$BUILD_DIR/mongo/db/commands/core', '$BUILD_DIR/mongo/db/commands/current_op_common', - '$BUILD_DIR/mongo/db/commands/servers', '$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers', '$BUILD_DIR/mongo/db/commands/kill_common', '$BUILD_DIR/mongo/db/commands/profile_common', + '$BUILD_DIR/mongo/db/commands/servers', '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/commands/write_commands_common', '$BUILD_DIR/mongo/db/ftdc/ftdc_server', @@ -119,10 +119,12 @@ env.Library( '$BUILD_DIR/mongo/db/logical_session_cache_impl', '$BUILD_DIR/mongo/db/pipeline/aggregation', '$BUILD_DIR/mongo/db/pipeline/mongos_process_interface', + '$BUILD_DIR/mongo/db/pipeline/cluster_aggregation_planner', '$BUILD_DIR/mongo/db/stats/counters', '$BUILD_DIR/mongo/db/views/views', '$BUILD_DIR/mongo/executor/async_multicaster', '$BUILD_DIR/mongo/rpc/client_metadata', + '$BUILD_DIR/mongo/s/query/cluster_client_cursor', '$BUILD_DIR/mongo/s/sharding_api', '$BUILD_DIR/mongo/s/sharding_legacy_api', '$BUILD_DIR/mongo/s/transaction/router_session', diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index d06c5426555..fbf2446467b 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -1,30 +1,30 @@ /** -* Copyright (C) 2016 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand @@ -66,7 +66,7 @@ #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_query_knobs.h" #include "mongo/s/query/establish_cursors.h" -#include "mongo/s/query/router_stage_update_on_add_shard.h" +#include "mongo/s/query/router_stage_pipeline.h" #include "mongo/s/query/store_possible_cursor.h" #include "mongo/s/stale_exception.h" #include "mongo/util/fail_point.h" @@ -75,6 +75,8 @@ namespace mongo { +using SplitPipeline = cluster_aggregation_planner::SplitPipeline; + MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors); namespace { @@ -104,38 +106,6 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v return explainCommandBuilder.freeze(); } -Status appendExplainResults( - const std::vector<AsyncRequestsSender::Response>& shardResults, - const boost::intrusive_ptr<ExpressionContext>& mergeCtx, - const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards, - const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging, - BSONObjBuilder* result) { - if (pipelineForTargetedShards->isSplitForShards()) { - *result << "mergeType" - << (pipelineForMerging->canRunOnMongos() - ? "mongos" - : pipelineForMerging->needsPrimaryShardMerger() ? "primaryShard" - : "anyShard") - << "splitPipeline" - << Document{ - {"shardsPart", - pipelineForTargetedShards->writeExplainOps(*mergeCtx->explain)}, - {"mergerPart", pipelineForMerging->writeExplainOps(*mergeCtx->explain)}}; - } else { - *result << "splitPipeline" << BSONNULL; - } - - BSONObjBuilder shardExplains(result->subobjStart("shards")); - for (const auto& shardResult : shardResults) { - invariant(shardResult.shardHostAndPort); - shardExplains.append(shardResult.shardId.toString(), - BSON("host" << shardResult.shardHostAndPort->toString() << "stages" - << shardResult.swResponse.getValue().data["stages"])); - } - - return Status::OK(); -} - Status appendCursorResponseToCommandResult(const ShardId& shardId, const BSONObj cursorResponse, BSONObjBuilder* result) { @@ -246,11 +216,10 @@ BSONObj createCommandForTargetedShards( return appendAllowImplicitCreate(cmdObj, true); } -BSONObj createCommandForMergingShard( - const AggregationRequest& request, - const boost::intrusive_ptr<ExpressionContext>& mergeCtx, - const BSONObj originalCmdObj, - const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging) { +BSONObj createCommandForMergingShard(const AggregationRequest& request, + const boost::intrusive_ptr<ExpressionContext>& mergeCtx, + const BSONObj originalCmdObj, + const Pipeline* pipelineForMerging) { MutableDocument mergeCmd(request.serializeToCommandObj()); mergeCmd["pipeline"] = Value(pipelineForMerging->serialize()); @@ -339,12 +308,11 @@ struct DispatchShardPipelineResults { // Populated if this *is* an explain, this vector represents the results from each shard. std::vector<AsyncRequestsSender::Response> remoteExplainOutput; - // The half of the pipeline that was sent to each shard, or the entire pipeline if there was - // only one shard targeted. - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards; + // The split version of the pipeline if more than one shard was targeted, otherwise boost::none. + boost::optional<SplitPipeline> splitPipeline; - // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr. - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; + // If the pipeline targeted a single shard, this is the pipeline to run on that shard. + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard; // The command object to send to the targeted shards. BSONObj commandForTargetedShards; @@ -380,8 +348,7 @@ DispatchShardPipelineResults dispatchShardPipeline( const auto shardQuery = pipeline->getInitialQuery(); - auto pipelineForTargetedShards = std::move(pipeline); - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; + boost::optional<SplitPipeline> splitPipeline; auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss); @@ -415,13 +382,19 @@ DispatchShardPipelineResults dispatchShardPipeline( *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId())); if (needsSplit) { - pipelineForMerging = std::move(pipelineForTargetedShards); - pipelineForTargetedShards = pipelineForMerging->splitForSharded(); + splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline)); } // Generate the command object for the targeted shards. - BSONObj targetedCommand = createCommandForTargetedShards( - opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, collationObj, atClusterTime); + BSONObj targetedCommand = splitPipeline + ? createCommandForTargetedShards(opCtx, + aggRequest, + originalCmdObj, + splitPipeline->shardsPipeline, + collationObj, + atClusterTime) + : createCommandForTargetedShards( + opCtx, aggRequest, originalCmdObj, pipeline, collationObj, atClusterTime); // Refresh the shard registry if we're targeting all shards. We need the shard registry // to be at least as current as the logical time used when creating the command for @@ -480,11 +453,40 @@ DispatchShardPipelineResults dispatchShardPipeline( return DispatchShardPipelineResults{needsPrimaryShardMerge, std::move(cursors), std::move(shardResults), - std::move(pipelineForTargetedShards), - std::move(pipelineForMerging), + std::move(splitPipeline), + std::move(pipeline), targetedCommand}; } +Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults, + const boost::intrusive_ptr<ExpressionContext>& mergeCtx, + BSONObjBuilder* result) { + if (dispatchResults.splitPipeline) { + auto* mergePipeline = dispatchResults.splitPipeline->mergePipeline.get(); + *result << "mergeType" + << (mergePipeline->canRunOnMongos() + ? "mongos" + : mergePipeline->needsPrimaryShardMerger() ? "primaryShard" : "anyShard") + << "splitPipeline" + << Document{{"shardsPart", + dispatchResults.splitPipeline->shardsPipeline->writeExplainOps( + *mergeCtx->explain)}, + {"mergerPart", mergePipeline->writeExplainOps(*mergeCtx->explain)}}; + } else { + *result << "splitPipeline" << BSONNULL; + } + + BSONObjBuilder shardExplains(result->subobjStart("shards")); + for (const auto& shardResult : dispatchResults.remoteExplainOutput) { + invariant(shardResult.shardHostAndPort); + shardExplains.append(shardResult.shardId.toString(), + BSON("host" << shardResult.shardHostAndPort->toString() << "stages" + << shardResult.swResponse.getValue().data["stages"])); + } + + return Status::OK(); +} + Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx, const NamespaceString& nss, const BSONObj mergeCmdObj, @@ -500,20 +502,18 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx, Shard::RetryPolicy::kIdempotent)); } -BSONObj establishMergingMongosCursor(OperationContext* opCtx, - const AggregationRequest& request, - const NamespaceString& requestedNss, - BSONObj cmdToRunOnNewShards, - const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging, - std::vector<RemoteCursor> cursors) { +BSONObj establishMergingMongosCursor( + OperationContext* opCtx, + const AggregationRequest& request, + const NamespaceString& requestedNss, + BSONObj cmdToRunOnNewShards, + const LiteParsedPipeline& liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging) { ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx)); params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned(); params.tailableMode = pipelineForMerging->getContext()->tailableMode; - params.mergePipeline = std::move(pipelineForMerging); - params.remotes = std::move(cursors); // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch // size we pass here is used for getMores, so do not specify a batch size if the initial request // had a batch size of 0. @@ -523,25 +523,8 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, params.lsid = opCtx->getLogicalSessionId(); params.txnNumber = opCtx->getTxnNumber(); - if (liteParsedPipeline.hasChangeStream()) { - // For change streams, we need to set up a custom stage to establish cursors on new shards - // when they are added. Be careful to extract the targeted shard IDs before the remote - // cursors are transferred from the ClusterClientCursorParams to the AsyncResultsMerger. - std::vector<ShardId> shardIds; - for (const auto& remote : params.remotes) { - shardIds.emplace_back(remote.getShardId().toString()); - } - - params.createCustomCursorSource = [cmdToRunOnNewShards, - shardIds](OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params) { - return stdx::make_unique<RouterStageUpdateOnAddShard>( - opCtx, executor, params, std::move(shardIds), cmdToRunOnNewShards); - }; - } - auto ccc = ClusterClientCursorImpl::make( - opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); + auto ccc = cluster_aggregation_planner::buildClusterCursor( + opCtx, std::move(pipelineForMerging), std::move(params)); auto cursorState = ClusterCursorManager::CursorState::NotExhausted; @@ -726,17 +709,15 @@ std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID( } ShardId pickMergingShard(OperationContext* opCtx, - const DispatchShardPipelineResults& dispatchResults, + bool needsPrimaryShardMerge, + const std::vector<ShardId>& targetedShards, ShardId primaryShard) { auto& prng = opCtx->getClient()->getPrng(); // If we cannot merge on mongoS, establish the merge cursor on a shard. Perform the merging // command on random shard, unless the pipeline dictates that it needs to be run on the primary // shard for the database. - return dispatchResults.needsPrimaryShardMerge - ? primaryShard - : dispatchResults.remoteCursors[prng.nextInt32(dispatchResults.remoteCursors.size())] - .getShardId() - .toString(); + return needsPrimaryShardMerge ? primaryShard + : targetedShards[prng.nextInt32(targetedShards.size())]; } // "Resolve" involved namespaces and verify that none of them are sharded unless allowed by the @@ -798,7 +779,6 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx BSONObj cmdObj, const LiteParsedPipeline& litePipe, std::unique_ptr<Pipeline, PipelineDeleter> pipeline, - std::vector<RemoteCursor>&& cursors, BSONObjBuilder* result) { // We should never receive a pipeline intended for the shards, or which cannot run on mongoS. invariant(!pipeline->isSplitForShards()); @@ -807,8 +787,7 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx const auto& requestedNss = namespaces.requestedNss; const auto opCtx = expCtx->opCtx; - // If this is an unsplit mongoS-only pipeline, verify that the first stage can produce input for - // the remainder of the pipeline. + // Verify that the first stage can produce input for the remainder of the pipeline. uassert(ErrorCodes::IllegalOperation, str::stream() << "Aggregation pipeline must be run on mongoS, but " << pipeline->getSources().front()->getSourceName() @@ -816,7 +795,6 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx pipeline->isSplitForMerge() || !pipeline->getSources().front()->constraints().requiresInputDocSource); - // If this is an explain and the pipeline is not split, write the explain output and return. if (expCtx->explain && !pipeline->isSplitForMerge()) { *result << "splitPipeline" << BSONNULL << "mongos" << Document{{"host", getHostNameCachedAndPort()}, @@ -826,7 +804,7 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx // Register the new mongoS cursor, and retrieve the initial batch of results. auto cursorResponse = establishMergingMongosCursor( - opCtx, request, requestedNss, cmdObj, litePipe, std::move(pipeline), std::move(cursors)); + opCtx, request, requestedNss, cmdObj, litePipe, std::move(pipeline)); // We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline // can never run on mongoS. Filter the command response and return immediately. @@ -840,25 +818,38 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex BSONObj cmdObj, const LiteParsedPipeline& litePipe, const boost::optional<CachedCollectionRoutingInfo>& routingInfo, - DispatchShardPipelineResults& shardDispatchResults, + DispatchShardPipelineResults&& shardDispatchResults, BSONObjBuilder* result) { // We should never be in a situation where we call this function on a non-merge pipeline. - auto& mergingPipeline = shardDispatchResults.pipelineForMerging; - invariant(mergingPipeline && mergingPipeline->isSplitForMerge()); + invariant(shardDispatchResults.splitPipeline); + auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get(); + auto* opCtx = expCtx->opCtx; + + std::vector<ShardId> targetedShards; + targetedShards.reserve(shardDispatchResults.remoteCursors.size()); + for (auto&& remoteCursor : shardDispatchResults.remoteCursors) { + targetedShards.emplace_back(remoteCursor.getShardId().toString()); + } - const auto opCtx = expCtx->opCtx; + cluster_aggregation_planner::addMergeCursorsSource( + mergePipeline, + litePipe, + shardDispatchResults.commandForTargetedShards, + std::move(shardDispatchResults.remoteCursors), + targetedShards, + shardDispatchResults.splitPipeline->shardCursorsSortSpec, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); // First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS, // then ignore the internalQueryProhibitMergingOnMongoS parameter. - if (mergingPipeline->requiredToRunOnMongos() || - (!internalQueryProhibitMergingOnMongoS.load() && mergingPipeline->canRunOnMongos())) { + if (mergePipeline->requiredToRunOnMongos() || + (!internalQueryProhibitMergingOnMongoS.load() && mergePipeline->canRunOnMongos())) { return runPipelineOnMongoS(expCtx, namespaces, request, shardDispatchResults.commandForTargetedShards, litePipe, - std::move(mergingPipeline), - std::move(shardDispatchResults.remoteCursors), + std::move(shardDispatchResults.splitPipeline->mergePipeline), result); } @@ -873,15 +864,12 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex "merging on a shard", !opCtx->getTxnNumber()); - ShardId mergingShardId = - pickMergingShard(opCtx, shardDispatchResults, routingInfo->db().primaryId()); - - cluster_aggregation_planner::addMergeCursorsSource( - mergingPipeline.get(), - std::move(shardDispatchResults.remoteCursors), - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); + ShardId mergingShardId = pickMergingShard(opCtx, + shardDispatchResults.needsPrimaryShardMerge, + targetedShards, + routingInfo->db().primaryId()); - auto mergeCmdObj = createCommandForMergingShard(request, expCtx, cmdObj, mergingPipeline); + auto mergeCmdObj = createCommandForMergingShard(request, expCtx, cmdObj, mergePipeline); // Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return. auto mergeResponse = @@ -964,7 +952,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // Check whether the entire pipeline must be run on mongoS. if (pipeline->requiredToRunOnMongos()) { return runPipelineOnMongoS( - expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), {}, result); + expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), result); } // If not, split the pipeline as necessary and dispatch to the relevant shards. @@ -980,18 +968,14 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // write the results to the output builder, and return immediately. if (expCtx->explain) { uassertAllShardsSupportExplain(shardDispatchResults.remoteExplainOutput); - return appendExplainResults(std::move(shardDispatchResults.remoteExplainOutput), - expCtx, - shardDispatchResults.pipelineForTargetedShards, - shardDispatchResults.pipelineForMerging, - result); + return appendExplainResults(std::move(shardDispatchResults), expCtx, result); } // If this isn't an explain, then we must have established cursors on at least one shard. invariant(shardDispatchResults.remoteCursors.size() > 0); // If we sent the entire pipeline to a single shard, store the remote cursor and return. - if (!shardDispatchResults.pipelineForTargetedShards->isSplitForShards()) { + if (!shardDispatchResults.splitPipeline) { invariant(shardDispatchResults.remoteCursors.size() == 1); auto& remoteCursor = shardDispatchResults.remoteCursors.front(); const auto reply = uassertStatusOK(storePossibleCursor( @@ -1001,8 +985,14 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } // If we reach here, we have a merge pipeline to dispatch. - return dispatchMergingPipeline( - expCtx, namespaces, request, cmdObj, litePipe, routingInfo, shardDispatchResults, result); + return dispatchMergingPipeline(expCtx, + namespaces, + request, + cmdObj, + litePipe, + routingInfo, + std::move(shardDispatchResults), + result); } void ClusterAggregate::uassertAllShardsSupportExplain( diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index f153381d12c..ae0c24ad21e 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -40,7 +40,6 @@ env.Library( "cluster_client_cursor_impl.cpp", ], LIBDEPS=[ - "$BUILD_DIR/mongo/db/pipeline/pipeline", "router_exec_stage", ], ) @@ -48,20 +47,14 @@ env.Library( env.Library( target="router_exec_stage", source=[ - "document_source_router_adapter.cpp", "router_stage_limit.cpp", - "router_stage_merge.cpp", "router_stage_mock.cpp", "router_stage_pipeline.cpp", "router_stage_remove_metadata_fields.cpp", "router_stage_skip.cpp", - "router_stage_update_on_add_shard.cpp", ], LIBDEPS=[ - "$BUILD_DIR/mongo/db/query/query_common", "async_results_merger", - ], - LIBDEPS_PRIVATE=[ "$BUILD_DIR/mongo/db/pipeline/pipeline", ], ) @@ -83,11 +76,13 @@ env.Library( target="async_results_merger", source=[ "async_results_merger.cpp", + "blocking_results_merger.cpp", "establish_cursors.cpp", env.Idlc('async_results_merger_params.idl')[0], ], LIBDEPS=[ "$BUILD_DIR/mongo/db/query/command_request_response", + "$BUILD_DIR/mongo/db/query/query_common", "$BUILD_DIR/mongo/executor/task_executor_interface", "$BUILD_DIR/mongo/s/async_requests_sender", "$BUILD_DIR/mongo/s/client/sharding_client", @@ -106,9 +101,11 @@ env.Library( ) env.CppUnitTest( - target="async_results_merger_test", + target="results_merger_test", source=[ + "blocking_results_merger_test.cpp", "async_results_merger_test.cpp", + "results_merger_test_fixture.cpp", ], LIBDEPS=[ 'async_results_merger', diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index f5268ac3408..3cc6756c843 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -88,8 +88,7 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, // This strange initialization is to work around the fact that the IDL does not currently // support a default value for an enum. The default tailable mode should be 'kNormal', but // since that is not supported we treat boost::none (unspecified) to mean 'kNormal'. - _tailableMode(params.getTailableMode() ? *params.getTailableMode() - : TailableModeEnum::kNormal), + _tailableMode(params.getTailableMode().value_or(TailableModeEnum::kNormal)), _params(std::move(params)), _mergeQueue(MergingComparator(_remotes, _params.getSort() ? *_params.getSort() : BSONObj(), @@ -116,12 +115,12 @@ AsyncResultsMerger::~AsyncResultsMerger() { invariant(_remotesExhausted(lk) || _lifecycleState == kKillComplete); } -bool AsyncResultsMerger::remotesExhausted() { +bool AsyncResultsMerger::remotesExhausted() const { stdx::lock_guard<stdx::mutex> lk(_mutex); return _remotesExhausted(lk); } -bool AsyncResultsMerger::_remotesExhausted(WithLock) { +bool AsyncResultsMerger::_remotesExhausted(WithLock) const { for (const auto& remote : _remotes) { if (!remote.exhausted()) { return false; @@ -769,36 +768,4 @@ bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const _sort) > 0; } -void AsyncResultsMerger::blockingKill(OperationContext* opCtx) { - auto killEvent = kill(opCtx); - if (!killEvent) { - // We are shutting down. - return; - } - _executor->waitForEvent(killEvent); -} - -StatusWith<ClusterQueryResult> AsyncResultsMerger::blockingNext() { - while (!ready()) { - auto nextEventStatus = nextEvent(); - if (!nextEventStatus.isOK()) { - return nextEventStatus.getStatus(); - } - auto event = nextEventStatus.getValue(); - - // Block until there are further results to return. - auto status = _executor->waitForEvent(_opCtx, event); - - if (!status.isOK()) { - return status.getStatus(); - } - - // We have not provided a deadline, so if the wait returns without interruption, we do not - // expect to have timed out. - invariant(status.getValue() == stdx::cv_status::no_timeout); - } - - return nextReady(); -} - } // namespace mongo diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 5f8a18194d2..488e03d2ee5 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -109,7 +109,7 @@ public: /** * Returns true if all of the remote cursors are exhausted. */ - bool remotesExhausted(); + bool remotesExhausted() const; /** * Sets the maxTimeMS value that the ARM should forward with any internally issued getMore @@ -167,12 +167,6 @@ public: StatusWith<ClusterQueryResult> nextReady(); /** - * Blocks until the next result is ready, all remote cursors are exhausted, or there is an - * error. - */ - StatusWith<ClusterQueryResult> blockingNext(); - - /** * Schedules remote work as required in order to make further results available. If there is an * error in scheduling this work, returns a non-ok status. On success, returns an event handle. * The caller can pass this event handle to 'executor' in order to be blocked until further @@ -238,11 +232,6 @@ public: */ executor::TaskExecutor::EventHandle kill(OperationContext* opCtx); - /** - * A blocking version of kill() that will not return until this is safe to destroy. - */ - void blockingKill(OperationContext*); - private: /** * We instantiate one of these per remote host. It contains the buffer of results we've @@ -346,7 +335,7 @@ private: /** * Checks whether or not the remote cursors are all exhausted. */ - bool _remotesExhausted(WithLock); + bool _remotesExhausted(WithLock) const; // // Helpers for ready(). @@ -433,7 +422,7 @@ private: AsyncResultsMergerParams _params; // Must be acquired before accessing any data members (other than _params, which is read-only). - stdx::mutex _mutex; + mutable stdx::mutex _mutex; // Data tracking the state of our communication with each of the remote nodes. std::vector<RemoteCursorData> _remotes; diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 7960d22f018..b852cf33f79 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -30,18 +30,13 @@ #include "mongo/s/query/async_results_merger.h" -#include "mongo/client/remote_command_targeter_factory_mock.h" -#include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/json.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/query_request.h" -#include "mongo/executor/network_interface_mock.h" #include "mongo/executor/task_executor.h" -#include "mongo/executor/thread_pool_task_executor_test_fixture.h" -#include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/sharding_router_test_fixture.h" +#include "mongo/s/query/results_merger_test_fixture.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" @@ -50,212 +45,11 @@ namespace mongo { namespace { -using executor::NetworkInterfaceMock; -using executor::RemoteCommandRequest; -using executor::RemoteCommandResponse; - -using ResponseStatus = executor::TaskExecutor::ResponseStatus; - -const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345); -const std::vector<ShardId> kTestShardIds = { - ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")}; -const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", 12345), - HostAndPort("FakeShard2Host", 12345), - HostAndPort("FakeShard3Host", 12345)}; - -const NamespaceString kTestNss("testdb.testcoll"); - LogicalSessionId parseSessionIdFromCmd(BSONObj cmdObj) { return LogicalSessionId::parse(IDLParserErrorContext("lsid"), cmdObj["lsid"].Obj()); } -class AsyncResultsMergerTest : public ShardingTestFixture { -public: - AsyncResultsMergerTest() {} - - void setUp() override { - setRemote(HostAndPort("ClientHost", 12345)); - - configTargeter()->setFindHostReturnValue(kTestConfigShardHost); - - std::vector<ShardType> shards; - - for (size_t i = 0; i < kTestShardIds.size(); i++) { - ShardType shardType; - shardType.setName(kTestShardIds[i].toString()); - shardType.setHost(kTestShardHosts[i].toString()); - - shards.push_back(shardType); - - std::unique_ptr<RemoteCommandTargeterMock> targeter( - stdx::make_unique<RemoteCommandTargeterMock>()); - targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i])); - targeter->setFindHostReturnValue(kTestShardHosts[i]); - - targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]), - std::move(targeter)); - } - - setupShards(shards); - } - -protected: - /** - * Constructs an ARM with the given vector of existing cursors. - * - * If 'findCmd' is not set, the default AsyncResultsMergerParams are used. - * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams. - * - * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the - * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.' - */ - std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors( - std::vector<RemoteCursor> remoteCursors, - boost::optional<BSONObj> findCmd = boost::none, - boost::optional<std::int64_t> getMoreBatchSize = boost::none) { - AsyncResultsMergerParams params; - params.setNss(kTestNss); - params.setRemotes(std::move(remoteCursors)); - - - if (findCmd) { - const auto qr = unittest::assertGet( - QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */)); - if (!qr->getSort().isEmpty()) { - params.setSort(qr->getSort().getOwned()); - } - - if (getMoreBatchSize) { - params.setBatchSize(getMoreBatchSize); - } else { - params.setBatchSize(qr->getBatchSize() - ? boost::optional<std::int64_t>( - static_cast<std::int64_t>(*qr->getBatchSize())) - : boost::none); - } - params.setTailableMode(qr->getTailableMode()); - params.setAllowPartialResults(qr->isAllowPartialResults()); - } - - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(operationContext()->getLogicalSessionId()); - sessionInfo.setTxnNumber(operationContext()->getTxnNumber()); - params.setOperationSessionInfo(sessionInfo); - - return stdx::make_unique<AsyncResultsMerger>( - operationContext(), executor(), std::move(params)); - } - - /** - * Schedules a "CommandOnShardedViewNotSupportedOnMongod" error response w/ view definition. - */ - void scheduleNetworkViewResponse(const std::string& ns, const std::string& pipelineJsonArr) { - BSONObjBuilder viewDefBob; - viewDefBob.append("ns", ns); - viewDefBob.append("pipeline", fromjson(pipelineJsonArr)); - - BSONObjBuilder bob; - bob.append("resolvedView", viewDefBob.obj()); - bob.append("ok", 0.0); - bob.append("errmsg", "Command on view must be executed by mongos"); - bob.append("code", 169); - - std::vector<BSONObj> batch = {bob.obj()}; - scheduleNetworkResponseObjs(batch); - } - - /** - * Schedules a list of cursor responses to be returned by the mock network. - */ - void scheduleNetworkResponses(std::vector<CursorResponse> responses) { - std::vector<BSONObj> objs; - for (const auto& cursorResponse : responses) { - // For tests of the AsyncResultsMerger, all CursorRepsonses scheduled by the tests are - // subsequent responses, since the AsyncResultsMerger will only ever run getMores. - objs.push_back(cursorResponse.toBSON(CursorResponse::ResponseType::SubsequentResponse)); - } - scheduleNetworkResponseObjs(objs); - } - - /** - * Schedules a list of raw BSON command responses to be returned by the mock network. - */ - void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) { - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - for (const auto& obj : objs) { - ASSERT_TRUE(net->hasReadyRequests()); - Milliseconds millis(0); - RemoteCommandResponse response(obj, millis); - executor::TaskExecutor::ResponseStatus responseStatus(response); - net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); - } - net->runReadyNetworkOperations(); - net->exitNetwork(); - } - - RemoteCommandRequest getNthPendingRequest(size_t n) { - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - ASSERT_TRUE(net->hasReadyRequests()); - NetworkInterfaceMock::NetworkOperationIterator noi = net->getNthUnscheduledRequest(n); - RemoteCommandRequest retRequest = noi->getRequest(); - net->exitNetwork(); - return retRequest; - } - - bool networkHasReadyRequests() { - NetworkInterfaceMock::InNetworkGuard guard(network()); - return guard->hasReadyRequests(); - } - - void scheduleErrorResponse(ResponseStatus rs) { - invariant(!rs.isOK()); - rs.elapsedMillis = Milliseconds(0); - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - ASSERT_TRUE(net->hasReadyRequests()); - net->scheduleResponse(net->getNextReadyRequest(), net->now(), rs); - net->runReadyNetworkOperations(); - net->exitNetwork(); - } - - void runReadyCallbacks() { - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - net->runReadyNetworkOperations(); - net->exitNetwork(); - } - - void blackHoleNextRequest() { - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - ASSERT_TRUE(net->hasReadyRequests()); - net->blackHole(net->getNextReadyRequest()); - net->exitNetwork(); - } -}; - -void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) { - ASSERT_TRUE(killCmd.hasElement("killCursors")); - ASSERT_EQ(killCmd["cursors"].type(), BSONType::Array); - - size_t numCursors = 0; - for (auto&& cursor : killCmd["cursors"].Obj()) { - ASSERT_EQ(cursor.type(), BSONType::NumberLong); - ASSERT_EQ(cursor.numberLong(), cursorId); - ++numCursors; - } - ASSERT_EQ(numCursors, 1u); -} - -RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) { - RemoteCursor remoteCursor; - remoteCursor.setShardId(std::move(shardId)); - remoteCursor.setHostAndPort(std::move(host)); - remoteCursor.setCursorResponse(std::move(response)); - return remoteCursor; -} +using AsyncResultsMergerTest = ResultsMergerTestFixture; TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) { std::vector<RemoteCursor> cursors; @@ -1888,76 +1682,6 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin executor()->waitForEvent(killEvent); } -TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) { - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); - - // Before any requests are scheduled, ARM is not ready to return results. - ASSERT_FALSE(arm->ready()); - ASSERT_FALSE(arm->remotesExhausted()); - - // Issue a blocking wait for the next result asynchronously on a different thread. - auto future = launchAsync([&]() { - auto next = unittest::assertGet(arm->blockingNext()); - ASSERT_FALSE(next.isEOF()); - ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1)); - next = unittest::assertGet(arm->blockingNext()); - ASSERT_TRUE(next.isEOF()); - }); - - // Schedule the response to the getMore which will return the next result and mark the cursor as - // exhausted. - onCommand([&](const auto& request) { - ASSERT(request.cmdObj["getMore"]); - return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)}) - .toBSON(CursorResponse::ResponseType::SubsequentResponse); - }); - - future.timed_get(kFutureTimeout); -} - -TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) { - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); - - // Issue a blocking wait for the next result asynchronously on a different thread. - auto future = launchAsync([&]() { - auto nextStatus = arm->blockingNext(); - ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted); - }); - - // Now mark the OperationContext as killed from this thread. - { - stdx::lock_guard<Client> lk(*operationContext()->getClient()); - operationContext()->markKilled(ErrorCodes::Interrupted); - } - future.timed_get(kFutureTimeout); - // Be careful not to use a blocking kill here, since the main thread is in charge of running the - // callbacks, and we'd block on ourselves. - auto killEvent = arm->kill(operationContext()); - - assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1); - runReadyCallbacks(); - executor()->waitForEvent(killEvent); -} - -TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilKilled) { - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); - - // Before any requests are scheduled, ARM is not ready to return results. - ASSERT_FALSE(arm->ready()); - ASSERT_FALSE(arm->remotesExhausted()); - - arm->blockingKill(operationContext()); -} - TEST_F(AsyncResultsMergerTest, GetMoresShouldNotIncludeLSIDOrTxnNumberIfNoneSpecified) { std::vector<RemoteCursor> cursors; cursors.emplace_back( diff --git a/src/mongo/s/query/blocking_results_merger.cpp b/src/mongo/s/query/blocking_results_merger.cpp new file mode 100644 index 00000000000..f5ba2af0bf6 --- /dev/null +++ b/src/mongo/s/query/blocking_results_merger.cpp @@ -0,0 +1,140 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/query/find_common.h" +#include "mongo/s/query/blocking_results_merger.h" + +namespace mongo { + +BlockingResultsMerger::BlockingResultsMerger(OperationContext* opCtx, + AsyncResultsMergerParams&& armParams, + executor::TaskExecutor* executor) + : _tailableMode(armParams.getTailableMode().value_or(TailableModeEnum::kNormal)), + _executor(executor), + _arm(opCtx, executor, std::move(armParams)) {} + +StatusWith<ClusterQueryResult> BlockingResultsMerger::awaitNextWithTimeout( + OperationContext* opCtx, RouterExecStage::ExecContext execCtx) { + invariant(_tailableMode == TailableModeEnum::kTailableAndAwaitData); + // If we are in kInitialFind or kGetMoreWithAtLeastOneResultInBatch context and the ARM is not + // ready, we don't block. Fall straight through to the return statement. + while (!_arm.ready() && execCtx == RouterExecStage::ExecContext::kGetMoreNoResultsYet) { + auto nextEventStatus = getNextEvent(); + if (!nextEventStatus.isOK()) { + return nextEventStatus.getStatus(); + } + auto event = nextEventStatus.getValue(); + + // Block until there are further results to return, or our time limit is exceeded. + auto waitStatus = + _executor->waitForEvent(opCtx, event, awaitDataState(opCtx).waitForInsertsDeadline); + + if (!waitStatus.isOK()) { + return waitStatus.getStatus(); + } + // Swallow timeout errors for tailable awaitData cursors, stash the event that we were + // waiting on, and return EOF. + if (waitStatus == stdx::cv_status::timeout) { + _leftoverEventFromLastTimeout = std::move(event); + return ClusterQueryResult{}; + } + } + + // We reach this point either if the ARM is ready, or if the ARM is !ready and we are in + // kInitialFind or kGetMoreWithAtLeastOneResultInBatch ExecContext. In the latter case, we + // return EOF immediately rather than blocking for further results. + return _arm.ready() ? _arm.nextReady() : ClusterQueryResult{}; +} + +StatusWith<ClusterQueryResult> BlockingResultsMerger::blockUntilNext(OperationContext* opCtx) { + while (!_arm.ready()) { + auto nextEventStatus = _arm.nextEvent(); + if (!nextEventStatus.isOK()) { + return nextEventStatus.getStatus(); + } + auto event = nextEventStatus.getValue(); + + // Block until there are further results to return. + auto status = _executor->waitForEvent(opCtx, event); + + if (!status.isOK()) { + return status.getStatus(); + } + + // We have not provided a deadline, so if the wait returns without interruption, we do not + // expect to have timed out. + invariant(status.getValue() == stdx::cv_status::no_timeout); + } + + return _arm.nextReady(); +} +StatusWith<ClusterQueryResult> BlockingResultsMerger::next(OperationContext* opCtx, + RouterExecStage::ExecContext execCtx) { + // Non-tailable and tailable non-awaitData cursors always block until ready(). AwaitData + // cursors wait for ready() only until a specified time limit is exceeded. + return (_tailableMode == TailableModeEnum::kTailableAndAwaitData + ? awaitNextWithTimeout(opCtx, execCtx) + : blockUntilNext(opCtx)); +} + +StatusWith<executor::TaskExecutor::EventHandle> BlockingResultsMerger::getNextEvent() { + // If we abandoned a previous event due to a mongoS-side timeout, wait for it first. + if (_leftoverEventFromLastTimeout) { + invariant(_tailableMode == TailableModeEnum::kTailableAndAwaitData); + // If we have an outstanding event from last time, then we might have to manually schedule + // some getMores for the cursors. If a remote response came back while we were between + // getMores (from the user to mongos), the response may have been an empty batch, and the + // ARM would not be able to ask for the next batch immediately since it is not attached to + // an OperationContext. Now that we have a valid OperationContext, we schedule the getMores + // ourselves. + Status getMoreStatus = _arm.scheduleGetMores(); + if (!getMoreStatus.isOK()) { + return getMoreStatus; + } + + // Return the leftover event and clear '_leftoverEventFromLastTimeout'. + auto event = _leftoverEventFromLastTimeout; + _leftoverEventFromLastTimeout = executor::TaskExecutor::EventHandle(); + return event; + } + + return _arm.nextEvent(); +} + +void BlockingResultsMerger::kill(OperationContext* opCtx) { + auto killEvent = _arm.kill(opCtx); + if (!killEvent) { + // We are shutting down. + return; + } + _executor->waitForEvent(killEvent); +} + +} // namespace mongo diff --git a/src/mongo/s/query/blocking_results_merger.h b/src/mongo/s/query/blocking_results_merger.h new file mode 100644 index 00000000000..cbc96cbbfc0 --- /dev/null +++ b/src/mongo/s/query/blocking_results_merger.h @@ -0,0 +1,113 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/s/query/async_results_merger.h" +#include "mongo/s/query/router_exec_stage.h" + +namespace mongo { + +/** + * Layers a simpler blocking interface on top of the AsyncResultsMerger from which this + * BlockingResultsMerger is constructed. + */ +class BlockingResultsMerger { +public: + BlockingResultsMerger(OperationContext* opCtx, + AsyncResultsMergerParams&& arm, + executor::TaskExecutor*); + + /** + * Blocks until the next result is available or an error is detected. + */ + StatusWith<ClusterQueryResult> next(OperationContext*, RouterExecStage::ExecContext); + + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + return _arm.setAwaitDataTimeout(awaitDataTimeout); + } + + void reattachToOperationContext(OperationContext* opCtx) { + _arm.reattachToOperationContext(opCtx); + } + + void detachFromOperationContext() { + _arm.detachFromOperationContext(); + } + + bool remotesExhausted() const { + return _arm.remotesExhausted(); + } + + std::size_t getNumRemotes() const { + return _arm.getNumRemotes(); + } + + void addNewShardCursors(std::vector<RemoteCursor>&& newCursors) { + _arm.addNewShardCursors(std::move(newCursors)); + } + + /** + * Blocks until '_arm' has been killed, which involves cleaning up any remote cursors managed + * by this results merger. + */ + void kill(OperationContext* opCtx); + +private: + /** + * Awaits the next result from the ARM with no time limit. + */ + StatusWith<ClusterQueryResult> blockUntilNext(OperationContext* opCtx); + + /** + * Awaits the next result from the ARM up to the time limit specified on 'opCtx'. If this is the + * user's initial find or we have already obtained at least one result for this batch, this + * method returns EOF immediately rather than blocking. + */ + StatusWith<ClusterQueryResult> awaitNextWithTimeout(OperationContext* opCtx, + RouterExecStage::ExecContext execCtx); + + /** + * Returns the next event to wait upon - either a new event from the ARM, or a valid preceding + * event which we scheduled during the previous call to next(). + */ + StatusWith<executor::TaskExecutor::EventHandle> getNextEvent(); + + TailableModeEnum _tailableMode; + executor::TaskExecutor* _executor; + + // In a case where we have a tailable, awaitData cursor, a call to 'next()' will block waiting + // for an event generated by '_arm', but may time out waiting for this event to be triggered. + // While it's waiting, the time limit for the 'awaitData' piece of the cursor may have been + // exceeded. When this happens, we use '_leftoverEventFromLastTimeout' to remember the old event + // and pick back up waiting for it on the next call to 'next()'. + executor::TaskExecutor::EventHandle _leftoverEventFromLastTimeout; + AsyncResultsMerger _arm; +}; + +} // namespace mongo diff --git a/src/mongo/s/query/blocking_results_merger_test.cpp b/src/mongo/s/query/blocking_results_merger_test.cpp new file mode 100644 index 00000000000..821eda4d8ad --- /dev/null +++ b/src/mongo/s/query/blocking_results_merger_test.cpp @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/query/blocking_results_merger.h" +#include "mongo/s/query/results_merger_test_fixture.h" + +namespace mongo { + +namespace { + +using BlockingResultsMergerTest = ResultsMergerTestFixture; + +TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilKilled) { + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + BlockingResultsMerger blockingMerger( + operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor()); + + blockingMerger.kill(operationContext()); +} + +TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReady) { + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + BlockingResultsMerger blockingMerger( + operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor()); + + // Issue a blocking wait for the next result asynchronously on a different thread. + auto future = launchAsync([&]() { + auto next = unittest::assertGet( + blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind)); + ASSERT_FALSE(next.isEOF()); + ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1)); + next = unittest::assertGet( + blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind)); + ASSERT_TRUE(next.isEOF()); + }); + + // Schedule the response to the getMore which will return the next result and mark the cursor as + // exhausted. + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["getMore"]); + return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)}) + .toBSON(CursorResponse::ResponseType::SubsequentResponse); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(ResultsMergerTestFixture, ShouldBeInterruptableDuringBlockingNext) { + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto params = makeARMParamsFromExistingCursors(std::move(cursors)); + params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); + BlockingResultsMerger blockingMerger(operationContext(), std::move(params), executor()); + + // Issue a blocking wait for the next result asynchronously on a different thread. + auto future = launchAsync([&]() { + auto nextStatus = blockingMerger.next(operationContext(), + RouterExecStage::ExecContext::kGetMoreNoResultsYet); + ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted); + }); + + // Now mark the OperationContext as killed from this thread. + { + stdx::lock_guard<Client> lk(*operationContext()->getClient()); + operationContext()->markKilled(ErrorCodes::Interrupted); + } + // Wait for the merger to be interrupted. + future.timed_get(kFutureTimeout); + + // Now that we've seen it interrupted, kill it. We have to do this in another thread because + // killing a BlockingResultsMerger involves running a killCursors, and this main thread is in + // charge of scheduling the response to that request. + future = launchAsync([&]() { blockingMerger.kill(operationContext()); }); + while (!networkHasReadyRequests() || !getNthPendingRequest(0u).cmdObj["killCursors"]) { + // Wait for the kill to schedule it's killCursors. It may schedule a getMore first before + // cancelling it, so wait until the pending request is actually a killCursors. + } + assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1); + + // Run the callback for the killCursors. We don't actually inspect the value so we don't have to + // schedule a response. + runReadyCallbacks(); + future.timed_get(kFutureTimeout); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 1b3a665df5e..acda45f66f0 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -30,14 +30,8 @@ #include "mongo/s/query/cluster_client_cursor_impl.h" -#include "mongo/db/pipeline/cluster_aggregation_planner.h" -#include "mongo/db/pipeline/document_source_limit.h" -#include "mongo/db/pipeline/document_source_skip.h" -#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/s/query/router_stage_limit.h" #include "mongo/s/query/router_stage_merge.h" -#include "mongo/s/query/router_stage_mock.h" -#include "mongo/s/query/router_stage_pipeline.h" #include "mongo/s/query/router_stage_remove_metadata_fields.h" #include "mongo/s/query/router_stage_skip.h" #include "mongo/stdx/memory.h" @@ -70,6 +64,14 @@ ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx, return ClusterClientCursorGuard(opCtx, std::move(cursor)); } +ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx, + std::unique_ptr<RouterExecStage> root, + ClusterClientCursorParams&& params) { + std::unique_ptr<ClusterClientCursor> cursor(new ClusterClientCursorImpl( + opCtx, std::move(root), std::move(params), opCtx->getLogicalSessionId())); + return ClusterClientCursorGuard(opCtx, std::move(cursor)); +} + ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams&& params, @@ -84,7 +86,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, } ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, - std::unique_ptr<RouterStageMock> root, + std::unique_ptr<RouterExecStage> root, ClusterClientCursorParams&& params, boost::optional<LogicalSessionId> lsid) : _params(std::move(params)), _root(std::move(root)), _lsid(lsid), _opCtx(opCtx) { @@ -183,81 +185,13 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc return _params.readPreference; } -namespace { - -bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) { - return (dynamic_cast<DocumentSourceLimit*>(stage.get()) || - dynamic_cast<DocumentSourceSkip*>(stage.get())); -} - -bool isAllLimitsAndSkips(Pipeline* pipeline) { - const auto stages = pipeline->getSources(); - return std::all_of( - stages.begin(), stages.end(), [&](const auto& stage) { return isSkipOrLimit(stage); }); -} - -/** - * Creates the initial stage to feed data into the execution plan. By default, a RouterExecMerge - * stage, or a custom stage if specified in 'params->creatCustomMerge'. - */ -std::unique_ptr<RouterExecStage> createInitialStage(OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params) { - if (params->createCustomCursorSource) { - return params->createCustomCursorSource(opCtx, executor, params); - } else { - return stdx::make_unique<RouterStageMerge>(opCtx, executor, params); - } -} - -std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* executor, - ClusterClientCursorParams* params) { - invariant(params->mergePipeline); - invariant(!params->skip); - invariant(!params->limit); - auto* pipeline = params->mergePipeline.get(); - auto* opCtx = pipeline->getContext()->opCtx; - - std::unique_ptr<RouterExecStage> root = createInitialStage(opCtx, executor, params); - if (!isAllLimitsAndSkips(pipeline)) { - return stdx::make_unique<RouterStagePipeline>(std::move(root), - std::move(params->mergePipeline)); - } - - // After extracting an optional leading $sort, the pipeline consisted entirely of $skip and - // $limit stages. Avoid creating a RouterStagePipeline (which will go through an expensive - // conversion from BSONObj -> Document for each result), and create a RouterExecStage tree - // instead. - while (!pipeline->getSources().empty()) { - invariant(isSkipOrLimit(pipeline->getSources().front())); - if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) { - root = stdx::make_unique<RouterStageSkip>( - opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip()); - } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) { - root = stdx::make_unique<RouterStageLimit>( - opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit()); - } - } - // We are executing the pipeline without using an actual Pipeline, so we need to strip out any - // Document metadata ourselves. - return stdx::make_unique<RouterStageRemoveMetadataFields>( - opCtx, std::move(root), Document::allMetadataFieldNames); -} -} // namespace - std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) { const auto skip = params->skip; const auto limit = params->limit; - if (params->mergePipeline) { - if (auto sort = - cluster_aggregation_planner::popLeadingMergeSort(params->mergePipeline.get())) { - params->sort = *sort; - } - return buildPipelinePlan(executor, params); - } - std::unique_ptr<RouterExecStage> root = createInitialStage(opCtx, executor, params); + std::unique_ptr<RouterExecStage> root = + std::make_unique<RouterStageMerge>(opCtx, executor, params->extractARMParams()); if (skip) { root = stdx::make_unique<RouterStageSkip>(opCtx, std::move(root), *skip); diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 36f9d3995c8..04e97cad3d9 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -83,12 +83,21 @@ class ClusterClientCursorImpl final : public ClusterClientCursor { public: /** - * Constructs a CCC whose safe cleanup is ensured by an RAII object. + * Constructs a cluster query plan and CCC from the given parameters whose safe cleanup is + * ensured by an RAII object. */ static ClusterClientCursorGuard make(OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams&& params); + /** + * Constructs a CCC from the given execution tree 'root'. The CCC's safe cleanup is ensured by + * an RAII object. + */ + static ClusterClientCursorGuard make(OperationContext* opCtx, + std::unique_ptr<RouterExecStage> root, + ClusterClientCursorParams&& params); + StatusWith<ClusterQueryResult> next(RouterExecStage::ExecContext) final; void kill(OperationContext* opCtx) final; @@ -122,12 +131,11 @@ public: boost::optional<ReadPreferenceSetting> getReadPreference() const final; public: - /** private for tests */ /** * Constructs a CCC whose result set is generated by a mock execution stage. */ ClusterClientCursorImpl(OperationContext* opCtx, - std::unique_ptr<RouterStageMock> root, + std::unique_ptr<RouterExecStage> root, ClusterClientCursorParams&& params, boost::optional<LogicalSessionId> lsid); diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index c2d300ee19e..a853d26a99f 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -123,9 +123,6 @@ struct ClusterClientCursorParams { // Should be forwarded to the remote hosts in 'cmdObj'. boost::optional<long long> limit; - // If set, we use this pipeline to merge the output of aggregations on each remote. - std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline; - // Whether this cursor is tailing a capped collection, and whether it has the awaitData option // set. TailableModeEnum tailableMode = TailableModeEnum::kNormal; @@ -133,12 +130,6 @@ struct ClusterClientCursorParams { // Set if a readPreference must be respected throughout the lifetime of the cursor. boost::optional<ReadPreferenceSetting> readPreference; - // If valid, is called to return the RouterExecStage which becomes the initial source in this - // cursor's execution plan. Otherwise, a RouterStageMerge is used. - stdx::function<std::unique_ptr<RouterExecStage>( - OperationContext*, executor::TaskExecutor*, ClusterClientCursorParams*)> - createCustomCursorSource; - // Whether the client indicated that it is willing to receive partial results in the case of an // unreachable host. bool isAllowPartialResults = false; diff --git a/src/mongo/s/query/document_source_router_adapter.cpp b/src/mongo/s/query/document_source_router_adapter.cpp deleted file mode 100644 index 26a944ed5cc..00000000000 --- a/src/mongo/s/query/document_source_router_adapter.cpp +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/s/query/document_source_router_adapter.h" - -#include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/expression_context.h" - -namespace mongo { - -boost::intrusive_ptr<DocumentSourceRouterAdapter> DocumentSourceRouterAdapter::create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<RouterExecStage> childStage) { - return new DocumentSourceRouterAdapter(expCtx, std::move(childStage)); -} - -DocumentSource::GetNextResult DocumentSourceRouterAdapter::getNext() { - auto next = uassertStatusOK(_child->next(_execContext)); - if (auto nextObj = next.getResult()) { - return Document::fromBsonWithMetaData(*nextObj); - } - return GetNextResult::makeEOF(); -} - -void DocumentSourceRouterAdapter::doDispose() { - _child->kill(pExpCtx->opCtx); -} - -void DocumentSourceRouterAdapter::reattachToOperationContext(OperationContext* opCtx) { - _child->reattachToOperationContext(opCtx); -} - -void DocumentSourceRouterAdapter::detachFromOperationContext() { - _child->detachFromOperationContext(); -} - -Value DocumentSourceRouterAdapter::serialize( - boost::optional<ExplainOptions::Verbosity> explain) const { - invariant(explain); // We shouldn't need to serialize this stage to send it anywhere. - return Value(); // Return the empty value to hide this stage from explain output. -} - -std::size_t DocumentSourceRouterAdapter::getNumRemotes() const { - return _child->getNumRemotes(); -} - -bool DocumentSourceRouterAdapter::remotesExhausted() { - return _child->remotesExhausted(); -} - -DocumentSourceRouterAdapter::DocumentSourceRouterAdapter( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<RouterExecStage> childStage) - : DocumentSource(expCtx), _child(std::move(childStage)) {} - -} // namespace mongo diff --git a/src/mongo/s/query/document_source_router_adapter.h b/src/mongo/s/query/document_source_router_adapter.h deleted file mode 100644 index a7db7734539..00000000000 --- a/src/mongo/s/query/document_source_router_adapter.h +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - -#include "mongo/s/query/router_exec_stage.h" - -#include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/pipeline.h" - -namespace mongo { -/** - * A class that acts as an adapter between the RouterExecStage and DocumentSource interfaces, - * translating results from an input RouterExecStage into DocumentSource::GetNextResults. - */ -class DocumentSourceRouterAdapter final : public DocumentSource { -public: - static boost::intrusive_ptr<DocumentSourceRouterAdapter> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<RouterExecStage> childStage); - - StageConstraints constraints(Pipeline::SplitState pipeState) const final { - return {StreamType::kStreaming, - PositionRequirement::kFirst, - HostTypeRequirement::kMongoS, - DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed, - TransactionRequirement::kAllowed}; - } - - GetNextResult getNext() final; - void doDispose() final; - void reattachToOperationContext(OperationContext* opCtx) final; - void detachFromOperationContext() final; - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; - bool remotesExhausted(); - std::size_t getNumRemotes() const; - - void setExecContext(RouterExecStage::ExecContext execContext) { - _execContext = execContext; - } - - Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) const { - return _child->setAwaitDataTimeout(awaitDataTimeout); - } - -private: - DocumentSourceRouterAdapter(const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<RouterExecStage> childStage); - - std::unique_ptr<RouterExecStage> _child; - RouterExecStage::ExecContext _execContext; -}; -} // namespace mongo diff --git a/src/mongo/s/query/results_merger_test_fixture.cpp b/src/mongo/s/query/results_merger_test_fixture.cpp new file mode 100644 index 00000000000..57033523c68 --- /dev/null +++ b/src/mongo/s/query/results_merger_test_fixture.cpp @@ -0,0 +1,76 @@ +/** + * Copyright 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/remote_command_targeter_factory_mock.h" +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/query/results_merger_test_fixture.h" + +namespace mongo { +const HostAndPort ResultsMergerTestFixture::kTestConfigShardHost = + HostAndPort("FakeConfigHost", 12345); +const std::vector<ShardId> ResultsMergerTestFixture::kTestShardIds = { + ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")}; +const std::vector<HostAndPort> ResultsMergerTestFixture::kTestShardHosts = { + HostAndPort("FakeShard1Host", 12345), + HostAndPort("FakeShard2Host", 12345), + HostAndPort("FakeShard3Host", 12345)}; + +const NamespaceString ResultsMergerTestFixture::kTestNss = NamespaceString{"testdb.testcoll"}; + +void ResultsMergerTestFixture::setUp() { + setRemote(HostAndPort("ClientHost", 12345)); + + configTargeter()->setFindHostReturnValue(kTestConfigShardHost); + + std::vector<ShardType> shards; + + for (size_t i = 0; i < kTestShardIds.size(); i++) { + ShardType shardType; + shardType.setName(kTestShardIds[i].toString()); + shardType.setHost(kTestShardHosts[i].toString()); + + shards.push_back(shardType); + + std::unique_ptr<RemoteCommandTargeterMock> targeter( + stdx::make_unique<RemoteCommandTargeterMock>()); + targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i])); + targeter->setFindHostReturnValue(kTestShardHosts[i]); + + targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]), + std::move(targeter)); + } + + setupShards(shards); +} + +} // namespace mongo diff --git a/src/mongo/s/query/results_merger_test_fixture.h b/src/mongo/s/query/results_merger_test_fixture.h new file mode 100644 index 00000000000..1252f22b793 --- /dev/null +++ b/src/mongo/s/query/results_merger_test_fixture.h @@ -0,0 +1,228 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/s/query/async_results_merger.h" +#include "mongo/s/sharding_router_test_fixture.h" + +namespace mongo { + +/** + * Test fixture which is useful to both the tests for AsyncResultsMerger and BlockingResultsMerger. + */ +class ResultsMergerTestFixture : public ShardingTestFixture { +public: + static const HostAndPort kTestConfigShardHost; + static const std::vector<ShardId> kTestShardIds; + static const std::vector<HostAndPort> kTestShardHosts; + + static const NamespaceString kTestNss; + + ResultsMergerTestFixture() {} + + void setUp() override; + +protected: + /** + * Constructs an AsyncResultsMergerParams object with the given vector of existing cursors. + * + * If 'findCmd' is not set, the default AsyncResultsMergerParams are used. Otherwise, the + * 'findCmd' is used to construct the AsyncResultsMergerParams. + * + * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the + * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.' + */ + AsyncResultsMergerParams makeARMParamsFromExistingCursors( + std::vector<RemoteCursor> remoteCursors, + boost::optional<BSONObj> findCmd = boost::none, + boost::optional<std::int64_t> getMoreBatchSize = boost::none) { + AsyncResultsMergerParams params; + params.setNss(kTestNss); + params.setRemotes(std::move(remoteCursors)); + + + if (findCmd) { + const auto qr = unittest::assertGet( + QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */)); + if (!qr->getSort().isEmpty()) { + params.setSort(qr->getSort().getOwned()); + } + + if (getMoreBatchSize) { + params.setBatchSize(getMoreBatchSize); + } else { + params.setBatchSize(qr->getBatchSize() + ? boost::optional<std::int64_t>( + static_cast<std::int64_t>(*qr->getBatchSize())) + : boost::none); + } + params.setTailableMode(qr->getTailableMode()); + params.setAllowPartialResults(qr->isAllowPartialResults()); + } + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(operationContext()->getLogicalSessionId()); + sessionInfo.setTxnNumber(operationContext()->getTxnNumber()); + params.setOperationSessionInfo(sessionInfo); + return params; + } + /** + * Constructs an ARM with the given vector of existing cursors. + * + * If 'findCmd' is not set, the default AsyncResultsMergerParams are used. + * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams. + * + * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the + * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.' + */ + std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors( + std::vector<RemoteCursor> remoteCursors, + boost::optional<BSONObj> findCmd = boost::none, + boost::optional<std::int64_t> getMoreBatchSize = boost::none) { + + return stdx::make_unique<AsyncResultsMerger>( + operationContext(), + executor(), + makeARMParamsFromExistingCursors(std::move(remoteCursors), findCmd, getMoreBatchSize)); + } + + /** + * Schedules a "CommandOnShardedViewNotSupportedOnMongod" error response w/ view definition. + */ + void scheduleNetworkViewResponse(const std::string& ns, const std::string& pipelineJsonArr) { + BSONObjBuilder viewDefBob; + viewDefBob.append("ns", ns); + viewDefBob.append("pipeline", fromjson(pipelineJsonArr)); + + BSONObjBuilder bob; + bob.append("resolvedView", viewDefBob.obj()); + bob.append("ok", 0.0); + bob.append("errmsg", "Command on view must be executed by mongos"); + bob.append("code", 169); + + std::vector<BSONObj> batch = {bob.obj()}; + scheduleNetworkResponseObjs(batch); + } + + /** + * Schedules a list of cursor responses to be returned by the mock network. + */ + void scheduleNetworkResponses(std::vector<CursorResponse> responses) { + std::vector<BSONObj> objs; + for (const auto& cursorResponse : responses) { + // For tests of the AsyncResultsMerger, all CursorRepsonses scheduled by the tests are + // subsequent responses, since the AsyncResultsMerger will only ever run getMores. + objs.push_back(cursorResponse.toBSON(CursorResponse::ResponseType::SubsequentResponse)); + } + scheduleNetworkResponseObjs(objs); + } + + /** + * Schedules a list of raw BSON command responses to be returned by the mock network. + */ + void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) { + executor::NetworkInterfaceMock* net = network(); + net->enterNetwork(); + for (const auto& obj : objs) { + ASSERT_TRUE(net->hasReadyRequests()); + Milliseconds millis(0); + executor::RemoteCommandResponse response(obj, millis); + executor::TaskExecutor::ResponseStatus responseStatus(response); + net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); + } + net->runReadyNetworkOperations(); + net->exitNetwork(); + } + + executor::RemoteCommandRequest getNthPendingRequest(size_t n) { + executor::NetworkInterfaceMock* net = network(); + net->enterNetwork(); + ASSERT_TRUE(net->hasReadyRequests()); + executor::NetworkInterfaceMock::NetworkOperationIterator noi = + net->getNthUnscheduledRequest(n); + executor::RemoteCommandRequest retRequest = noi->getRequest(); + net->exitNetwork(); + return retRequest; + } + + bool networkHasReadyRequests() { + executor::NetworkInterfaceMock::InNetworkGuard guard(network()); + return guard->hasReadyRequests(); + } + + void scheduleErrorResponse(executor::ResponseStatus rs) { + invariant(!rs.isOK()); + rs.elapsedMillis = Milliseconds(0); + executor::NetworkInterfaceMock* net = network(); + net->enterNetwork(); + ASSERT_TRUE(net->hasReadyRequests()); + net->scheduleResponse(net->getNextReadyRequest(), net->now(), rs); + net->runReadyNetworkOperations(); + net->exitNetwork(); + } + + void runReadyCallbacks() { + executor::NetworkInterfaceMock* net = network(); + net->enterNetwork(); + net->runReadyNetworkOperations(); + net->exitNetwork(); + } + + void blackHoleNextRequest() { + executor::NetworkInterfaceMock* net = network(); + net->enterNetwork(); + ASSERT_TRUE(net->hasReadyRequests()); + net->blackHole(net->getNextReadyRequest()); + net->exitNetwork(); + } + + void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) { + std::cout << "CHARLIE: " << killCmd; + ASSERT_TRUE(killCmd.hasElement("killCursors")); + ASSERT_EQ(killCmd["cursors"].type(), BSONType::Array); + + size_t numCursors = 0; + for (auto&& cursor : killCmd["cursors"].Obj()) { + ASSERT_EQ(cursor.type(), BSONType::NumberLong); + ASSERT_EQ(cursor.numberLong(), cursorId); + ++numCursors; + } + ASSERT_EQ(numCursors, 1u); + } + + RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) { + RemoteCursor remoteCursor; + remoteCursor.setShardId(std::move(shardId)); + remoteCursor.setHostAndPort(std::move(host)); + remoteCursor.setCursorResponse(std::move(response)); + return remoteCursor; + } +}; + +} // namespace mongo
\ No newline at end of file diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp deleted file mode 100644 index 967c9f60b35..00000000000 --- a/src/mongo/s/query/router_stage_merge.cpp +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery - -#include "mongo/platform/basic.h" - -#include "mongo/s/query/router_stage_merge.h" - -#include "mongo/db/query/find_common.h" -#include "mongo/util/scopeguard.h" - -namespace mongo { - -RouterStageMerge::RouterStageMerge(OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params) - : RouterExecStage(opCtx), - _executor(executor), - _params(params), - _arm(opCtx, executor, params->extractARMParams()) {} - -StatusWith<ClusterQueryResult> RouterStageMerge::next(ExecContext execCtx) { - // Non-tailable and tailable non-awaitData cursors always block until ready(). AwaitData - // cursors wait for ready() only until a specified time limit is exceeded. - return (_params->tailableMode == TailableModeEnum::kTailableAndAwaitData - ? awaitNextWithTimeout(execCtx) - : _arm.blockingNext()); -} - -StatusWith<ClusterQueryResult> RouterStageMerge::awaitNextWithTimeout(ExecContext execCtx) { - invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData); - // If we are in kInitialFind or kGetMoreWithAtLeastOneResultInBatch context and the ARM is not - // ready, we don't block. Fall straight through to the return statement. - while (!_arm.ready() && execCtx == ExecContext::kGetMoreNoResultsYet) { - auto nextEventStatus = getNextEvent(); - if (!nextEventStatus.isOK()) { - return nextEventStatus.getStatus(); - } - auto event = nextEventStatus.getValue(); - - // Block until there are further results to return, or our time limit is exceeded. - auto waitStatus = _executor->waitForEvent( - getOpCtx(), event, awaitDataState(getOpCtx()).waitForInsertsDeadline); - - if (!waitStatus.isOK()) { - return waitStatus.getStatus(); - } - // Swallow timeout errors for tailable awaitData cursors, stash the event that we were - // waiting on, and return EOF. - if (waitStatus == stdx::cv_status::timeout) { - _leftoverEventFromLastTimeout = std::move(event); - return ClusterQueryResult{}; - } - } - - // We reach this point either if the ARM is ready, or if the ARM is !ready and we are in - // kInitialFind or kGetMoreWithAtLeastOneResultInBatch ExecContext. In the latter case, we - // return EOF immediately rather than blocking for further results. - return _arm.ready() ? _arm.nextReady() : ClusterQueryResult{}; -} - -StatusWith<EventHandle> RouterStageMerge::getNextEvent() { - // If we abandoned a previous event due to a mongoS-side timeout, wait for it first. - if (_leftoverEventFromLastTimeout) { - invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData); - // If we have an outstanding event from last time, then we might have to manually schedule - // some getMores for the cursors. If a remote response came back while we were between - // getMores (from the user to mongos), the response may have been an empty batch, and the - // ARM would not be able to ask for the next batch immediately since it is not attached to - // an OperationContext. Now that we have a valid OperationContext, we schedule the getMores - // ourselves. - Status getMoreStatus = _arm.scheduleGetMores(); - if (!getMoreStatus.isOK()) { - return getMoreStatus; - } - - // Return the leftover event and clear '_leftoverEventFromLastTimeout'. - auto event = _leftoverEventFromLastTimeout; - _leftoverEventFromLastTimeout = EventHandle(); - return event; - } - - return _arm.nextEvent(); -} - -void RouterStageMerge::kill(OperationContext* opCtx) { - _arm.blockingKill(opCtx); -} - -bool RouterStageMerge::remotesExhausted() { - return _arm.remotesExhausted(); -} - -std::size_t RouterStageMerge::getNumRemotes() const { - return _arm.getNumRemotes(); -} - -Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { - return _arm.setAwaitDataTimeout(awaitDataTimeout); -} - -void RouterStageMerge::addNewShardCursors(std::vector<RemoteCursor>&& newShards) { - _arm.addNewShardCursors(std::move(newShards)); -} - -} // namespace mongo diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index b6bfee146b6..c0a847f7bd2 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -29,74 +29,56 @@ #pragma once #include "mongo/executor/task_executor.h" -#include "mongo/s/query/async_results_merger.h" +#include "mongo/s/query/blocking_results_merger.h" #include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/s/query/router_exec_stage.h" #include "mongo/util/net/hostandport.h" namespace mongo { -namespace { -using EventHandle = executor::TaskExecutor::EventHandle; -} // namespace - /** - * Draws results from the AsyncResultsMerger, which is the underlying source of the stream of merged - * documents manipulated by the RouterExecStage pipeline. Used to present a stream of documents - * merged from the shards to the stages later in the pipeline. + * Serves as an adapter between the RouterExecStage interface and the BlockingResultsMerger + * interface, providing a single stream of results populated from many remote streams. */ class RouterStageMerge final : public RouterExecStage { public: RouterStageMerge(OperationContext* opCtx, executor::TaskExecutor* executor, - ClusterClientCursorParams* params); - - StatusWith<ClusterQueryResult> next(ExecContext) final; + AsyncResultsMergerParams&& armParams) + : RouterExecStage(opCtx), _resultsMerger(opCtx, std::move(armParams), executor) {} - void kill(OperationContext* opCtx) final; + StatusWith<ClusterQueryResult> next(ExecContext execCtx) final { + return _resultsMerger.next(getOpCtx(), execCtx); + } - bool remotesExhausted() final; + void kill(OperationContext* opCtx) final { + _resultsMerger.kill(opCtx); + } - std::size_t getNumRemotes() const final; + bool remotesExhausted() final { + return _resultsMerger.remotesExhausted(); + } - /** - * Adds the cursors in 'newShards' to those being merged by the ARM. - */ - void addNewShardCursors(std::vector<RemoteCursor>&& newShards); + std::size_t getNumRemotes() const final { + return _resultsMerger.getNumRemotes(); + } protected: - Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final { + return _resultsMerger.setAwaitDataTimeout(awaitDataTimeout); + } void doReattachToOperationContext() override { - _arm.reattachToOperationContext(getOpCtx()); + _resultsMerger.reattachToOperationContext(getOpCtx()); } virtual void doDetachFromOperationContext() { - _arm.detachFromOperationContext(); + _resultsMerger.detachFromOperationContext(); } private: - /** - * Awaits the next result from the ARM up to a specified time limit. If this is the user's - * initial find or we have already obtained at least one result for this batch, this method - * returns EOF immediately rather than blocking. - */ - StatusWith<ClusterQueryResult> awaitNextWithTimeout(ExecContext execCtx); - - /** - * Returns the next event to wait upon - either a new event from the ARM, or a valid preceding - * event which we scheduled during the previous call to next(). - */ - StatusWith<EventHandle> getNextEvent(); - - // Not owned here. - executor::TaskExecutor* _executor; - EventHandle _leftoverEventFromLastTimeout; - - ClusterClientCursorParams* _params; - // Schedules remote work and merges results from 'remotes'. - AsyncResultsMerger _arm; + BlockingResultsMerger _resultsMerger; }; } // namespace mongo diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index 5e94274b9ac..a5a97bdbdbc 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -35,26 +35,20 @@ #include "mongo/db/pipeline/document_source_list_local_sessions.h" #include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/expression_context.h" -#include "mongo/s/query/document_source_router_adapter.h" namespace mongo { -RouterStagePipeline::RouterStagePipeline(std::unique_ptr<RouterExecStage> child, - std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline) +RouterStagePipeline::RouterStagePipeline(std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline) : RouterExecStage(mergePipeline->getContext()->opCtx), - _mergePipeline(std::move(mergePipeline)), - _mongosOnlyPipeline(!_mergePipeline->isSplitForMerge()) { - if (!_mongosOnlyPipeline) { - // Add an adapter to the front of the pipeline to draw results from 'child'. - _routerAdapter = - DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child)), - _mergePipeline->addInitialSource(_routerAdapter); - } + _mergePipeline(std::move(mergePipeline)) { + invariant(!_mergePipeline->getSources().empty()); + _mergeCursorsStage = + dynamic_cast<DocumentSourceMergeCursors*>(_mergePipeline->getSources().front().get()); } StatusWith<ClusterQueryResult> RouterStagePipeline::next(RouterExecStage::ExecContext execContext) { - if (_routerAdapter) { - _routerAdapter->setExecContext(execContext); + if (_mergeCursorsStage) { + _mergeCursorsStage->setExecContext(execContext); } // Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF. @@ -85,15 +79,20 @@ void RouterStagePipeline::kill(OperationContext* opCtx) { } std::size_t RouterStagePipeline::getNumRemotes() const { - return _mongosOnlyPipeline ? 0 : _routerAdapter->getNumRemotes(); + if (_mergeCursorsStage) { + return _mergeCursorsStage->getNumRemotes(); + } + return 0; } bool RouterStagePipeline::remotesExhausted() { - return _mongosOnlyPipeline || _routerAdapter->remotesExhausted(); + return !_mergeCursorsStage || _mergeCursorsStage->remotesExhausted(); } Status RouterStagePipeline::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { - return _routerAdapter->setAwaitDataTimeout(awaitDataTimeout); + invariant(_mergeCursorsStage, + "The only cursors which should be tailable are those with remote cursors."); + return _mergeCursorsStage->setAwaitDataTimeout(awaitDataTimeout); } } // namespace mongo diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index c14ddf9f80b..43706b42cd9 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -31,8 +31,8 @@ #include "mongo/s/query/router_exec_stage.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/pipeline.h" -#include "mongo/s/query/document_source_router_adapter.h" namespace mongo { @@ -42,8 +42,7 @@ namespace mongo { */ class RouterStagePipeline final : public RouterExecStage { public: - RouterStagePipeline(std::unique_ptr<RouterExecStage> child, - std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline); + RouterStagePipeline(std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline); StatusWith<ClusterQueryResult> next(RouterExecStage::ExecContext execContext) final; @@ -61,8 +60,10 @@ protected: void doDetachFromOperationContext() final; private: - boost::intrusive_ptr<DocumentSourceRouterAdapter> _routerAdapter; std::unique_ptr<Pipeline, PipelineDeleter> _mergePipeline; - bool _mongosOnlyPipeline; + + // May be null if this pipeline is executing exclusively on mongos and will not contact the + // shards at all. + boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursorsStage; }; } // namespace mongo diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.cpp b/src/mongo/s/query/router_stage_update_on_add_shard.cpp deleted file mode 100644 index 61fa2a9176d..00000000000 --- a/src/mongo/s/query/router_stage_update_on_add_shard.cpp +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery -#include "mongo/s/query/router_stage_update_on_add_shard.h" - -#include <algorithm> - -#include "mongo/base/checked_cast.h" -#include "mongo/db/pipeline/document_source_change_stream.h" -#include "mongo/executor/task_executor_pool.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/grid.h" -#include "mongo/s/query/establish_cursors.h" -#include "mongo/s/query/router_stage_merge.h" -#include "mongo/util/log.h" - -namespace mongo { -namespace { - -// Returns true if the change stream document has an 'operationType' of 'newShardDetected'. -bool needsUpdate(const StatusWith<ClusterQueryResult>& childResult) { - if (!childResult.isOK() || childResult.getValue().isEOF()) { - return false; - } - return ((*childResult.getValue().getResult())[DocumentSourceChangeStream::kOperationTypeField] - .str() == DocumentSourceChangeStream::kNewShardDetectedOpType); -} -} - -RouterStageUpdateOnAddShard::RouterStageUpdateOnAddShard(OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params, - std::vector<ShardId> shardIds, - BSONObj cmdToRunOnNewShards) - : RouterExecStage(opCtx, stdx::make_unique<RouterStageMerge>(opCtx, executor, params)), - _params(params), - _shardIds(std::move(shardIds)), - _cmdToRunOnNewShards(cmdToRunOnNewShards) {} - -StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next( - RouterExecStage::ExecContext execContext) { - auto childStage = getChildStage(); - auto childResult = childStage->next(execContext); - while (needsUpdate(childResult)) { - addNewShardCursors(*childResult.getValue().getResult()); - childResult = childStage->next(execContext); - } - return childResult; -} - -void RouterStageUpdateOnAddShard::addNewShardCursors(BSONObj newShardDetectedObj) { - checked_cast<RouterStageMerge*>(getChildStage()) - ->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj)); -} - -std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards( - const BSONObj& newShardDetectedObj) { - auto* opCtx = getOpCtx(); - // Reload the shard registry. We need to ensure a reload initiated after calling this method - // caused the reload, otherwise we aren't guaranteed to get all the new shards. - auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); - if (!shardRegistry->reload(opCtx)) { - // A 'false' return from shardRegistry.reload() means a reload was already in progress and - // it completed before reload() returned. So another reload(), regardless of return - // value, will ensure a reload started after the first call to reload(). - shardRegistry->reload(opCtx); - } - - std::vector<ShardId> shardIds, newShardIds; - shardRegistry->getAllShardIdsNoReload(&shardIds); - std::sort(_shardIds.begin(), _shardIds.end()); - std::sort(shardIds.begin(), shardIds.end()); - std::set_difference(shardIds.begin(), - shardIds.end(), - _shardIds.begin(), - _shardIds.end(), - std::back_inserter(newShardIds)); - - auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand( - _cmdToRunOnNewShards, - newShardDetectedObj[DocumentSourceChangeStream::kIdField].embeddedObject()); - std::vector<std::pair<ShardId, BSONObj>> requests; - for (const auto& shardId : newShardIds) { - requests.emplace_back(shardId, cmdObj); - _shardIds.push_back(shardId); - } - const bool allowPartialResults = false; // partial results are not allowed - return establishCursors(opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - _params->nsString, - _params->readPreference.value_or(ReadPreferenceSetting()), - requests, - allowPartialResults); -} - -} // namespace mongo diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.h b/src/mongo/s/query/router_stage_update_on_add_shard.h deleted file mode 100644 index 00ee921e2af..00000000000 --- a/src/mongo/s/query/router_stage_update_on_add_shard.h +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ -#pragma once - -#include "mongo/executor/task_executor.h" -#include "mongo/s/query/cluster_client_cursor_params.h" -#include "mongo/s/query/router_exec_stage.h" - -namespace mongo { -/** - * Uses a RouterStageMerge to merge results, and monitors the merged stream for special - * sentinel documents which indicate the the set of cursors needs to be updated. When the - * sentinel is detected, removes it from the stream and updates the set of cursors. - * - * cmdToRunOnNewShards: Command to execute on the new shard to open the cursor. - */ -class RouterStageUpdateOnAddShard final : public RouterExecStage { -public: - RouterStageUpdateOnAddShard(OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params, - std::vector<ShardId> shardIds, - BSONObj cmdToRunOnNewShards); - - StatusWith<ClusterQueryResult> next(ExecContext) final; - -private: - /** - * Establish the new cursors and tell the RouterStageMerge about them. - * obj: The BSONObj which triggered the establishment of the new cursors - */ - void addNewShardCursors(BSONObj obj); - - /** - * Open the cursors on the new shards. - */ - std::vector<RemoteCursor> establishShardCursorsOnNewShards(const BSONObj& newShardDetectedObj); - - ClusterClientCursorParams* _params; - std::vector<ShardId> _shardIds; - BSONObj _cmdToRunOnNewShards; -}; -} |