commit     da3c2c2dfcf0fc680a4f49f8f29ab0671f345d61 (patch)
tree       2fea8318b64d66fbd14847d01e94a3bc12886b01
parent     cf47aee946c42c246a9176e1df1cd27b12dde685 (diff)
author     Ian Boros <ian.boros@10gen.com>    2019-01-03 16:02:22 -0500
committer  Ian Boros <ian.boros@10gen.com>    2019-01-29 12:20:53 -0500
SERVER-38728 allow pipeline with lookup stage on sharded collection to run on mongod
17 files changed, 915 insertions(+), 571 deletions(-)
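Before this patch, a $lookup whose foreign collection was sharded carried HostTypeRequirement::kMongoS, so the pipeline had to merge on mongos; the change relaxes the constraint to kNone and teaches shard servers to target the foreign shards themselves (see the document_source_lookup.cpp and process_interface_shardsvr.cpp hunks below). A minimal shell sketch of the newly allowed shape, adapted from the new jstests/sharding/lookup_on_shard.js added in this commit — collection and field names are the test's own, and $_internalSplitPipeline is an internal, test-only stage used here solely to force the merge onto a shard:

```js
// Run against a sharded cluster where test.foreignColl is sharded and each
// mainColl document's "foreignId" matches two foreignColl documents' "key".
const testDB = db.getSiblingDB("test");
const results = testDB.mainColl.aggregate([
    {
        $lookup: {
            from: "foreignColl",   // the sharded foreign collection
            localField: "foreignId",
            foreignField: "key",
            as: "foreignDoc"
        }
    },
    // Internal test-only stage: force the second half of the pipeline to merge
    // on an arbitrary shard, so the $lookup above must execute on a mongod.
    {$_internalSplitPipeline: {mergeType: "anyShard"}}
]).toArray();

// Per the test's data set, each main document joins to exactly two foreign docs.
results.forEach(doc => assert.eq(doc.foreignDoc.length, 2, tojson(doc)));
```

Because the shard refreshes its routing information before dispatching the foreign pipeline, a mongos that is stale about the foreign collection now still returns correct results, which is why lookup_mongod_unaware.js below swaps its hard-coded "incorrect" expectation for expectedResults.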
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 96c0ba7845a..7b811248e13 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -57,6 +57,7 @@ selector: - jstests/sharding/restart_transactions.js - jstests/sharding/shard7.js - jstests/sharding/shard_config_db_collections.js + - jstests/sharding/lookup_on_shard.js # TODO: SERVER-38541 remove from blacklist - jstests/sharding/shard_collection_existing_zones.js - jstests/sharding/single_shard_transaction_with_arbiter.js diff --git a/jstests/core/txns/aggregation_in_transaction.js b/jstests/core/txns/aggregation_in_transaction.js index acc30c81873..5cd5285a6b5 100644 --- a/jstests/core/txns/aggregation_in_transaction.js +++ b/jstests/core/txns/aggregation_in_transaction.js @@ -3,6 +3,8 @@ (function() { "use strict"; + load("jstests/libs/fixture_helpers.js"); // For isSharded. + const session = db.getMongo().startSession({causalConsistency: false}); const testDB = session.getDatabase("test"); const coll = testDB.getCollection("aggregation_in_transaction"); @@ -21,6 +23,8 @@ const foreignDoc = {_id: "orange", val: 9}; assert.commandWorked(foreignColl.insert(foreignDoc, {writeConcern: {w: "majority"}})); + const isForeignSharded = FixtureHelpers.isSharded(foreignColl); + // Run a dummy find to start the transaction. jsTestLog("Starting transaction."); session.startTransaction({readConcern: {level: "snapshot"}}); @@ -43,29 +47,35 @@ assert(!cursor.hasNext()); // Perform aggregations that look at other collections. - const lookupDoc = Object.extend(testDoc, {lookup: [foreignDoc]}); - cursor = coll.aggregate({ - $lookup: { - from: foreignColl.getName(), - localField: "foreignKey", - foreignField: "_id", - as: "lookup", - } - }); - assert.docEq(cursor.next(), lookupDoc); - assert(!cursor.hasNext()); - - cursor = coll.aggregate({ - $graphLookup: { - from: foreignColl.getName(), - startWith: "$foreignKey", - connectFromField: "foreignKey", - connectToField: "_id", - as: "lookup" - } - }); - assert.docEq(cursor.next(), lookupDoc); - assert(!cursor.hasNext()); + // TODO: SERVER-39162 Sharded $lookup is not supported in transactions. + if (!isForeignSharded) { + const lookupDoc = Object.extend(testDoc, {lookup: [foreignDoc]}); + cursor = coll.aggregate({ + $lookup: { + from: foreignColl.getName(), + localField: "foreignKey", + foreignField: "_id", + as: "lookup", + } + }); + assert.docEq(cursor.next(), lookupDoc); + assert(!cursor.hasNext()); + + cursor = coll.aggregate({ + $graphLookup: { + from: foreignColl.getName(), + startWith: "$foreignKey", + connectFromField: "foreignKey", + connectToField: "_id", + as: "lookup" + } + }); + assert.docEq(cursor.next(), lookupDoc); + assert(!cursor.hasNext()); + } else { + // TODO SERVER-39048: Test that $lookup on sharded collection is banned + // within a transaction. 
+ } jsTestLog("Testing $count within a transaction."); diff --git a/jstests/sharding/lookup_mongod_unaware.js b/jstests/sharding/lookup_mongod_unaware.js index 0c6072f8095..a7dfd6bc38a 100644 --- a/jstests/sharding/lookup_mongod_unaware.js +++ b/jstests/sharding/lookup_mongod_unaware.js @@ -94,17 +94,11 @@ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults); - // Verify $lookup results through mongos1, which is not aware that the local - // collection is sharded. The results are expected to be incorrect when both the mongos and - // primary shard incorrectly believe that a collection is unsharded. - // TODO: This should be fixed by SERVER-32629, likewise for the other aggregates in this file - // sent to the stale mongos. + // Verify $lookup results through mongos1, which is not aware that the foreign collection is + // sharded. In this case the results will be correct since the entire pipeline will be run on a + // shard, which will do a refresh before executing the foreign pipeline. restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); - assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [ - {_id: 0, a: 1, "same": []}, - {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} - ]); + assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults); // // Test sharded local and sharded foreign collections, with the primary shard unaware that @@ -134,6 +128,8 @@ // Verify $lookup results through mongos1, which is not aware that the local // collection is sharded. The results are expected to be incorrect when both the mongos and // primary shard incorrectly believe that a collection is unsharded. + // TODO: This should be fixed by SERVER-32629, likewise for the other aggregates in this file + // sent to the stale mongos. restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, diff --git a/jstests/sharding/lookup_on_shard.js b/jstests/sharding/lookup_on_shard.js new file mode 100644 index 00000000000..c7c90d7c626 --- /dev/null +++ b/jstests/sharding/lookup_on_shard.js @@ -0,0 +1,141 @@ +// Test that a pipeline with a $lookup stage on a sharded foreign collection may be run on a mongod. +(function() { + const sharded = new ShardingTest({mongos: 1, shards: 2}); + + assert.commandWorked(sharded.s.adminCommand({enableSharding: "test"})); + sharded.ensurePrimaryShard('test', sharded.shard0.shardName); + + const coll = sharded.s.getDB('test').mainColl; + const foreignColl = sharded.s.getDB('test').foreignColl; + const smallColl = sharded.s.getDB("test").smallColl; + + const nDocsMainColl = 10; + const nDocsForeignColl = 2 * nDocsMainColl; + + for (let i = 0; i < nDocsMainColl; i++) { + assert.commandWorked(coll.insert({_id: i, collName: "mainColl", foreignId: i})); + + assert.commandWorked( + foreignColl.insert({_id: 2 * i, key: i, collName: "foreignColl", data: "hello-0"})); + assert.commandWorked( + foreignColl.insert({_id: 2 * i + 1, key: i, collName: "foreignColl", data: "hello-1"})); + } + assert.commandWorked(smallColl.insert({_id: 0, collName: "smallColl"})); + + const runTest = function() { + (function testSingleLookupFromShard() { + // Run a pipeline which must be merged on a shard. This should force the $lookup (on + // the sharded collection) to be run on a mongod. 
+ pipeline = [ + { + $lookup: { + localField: "foreignId", + foreignField: "key", + from: "foreignColl", + as: "foreignDoc" + } + }, + {$_internalSplitPipeline: {mergeType: "anyShard"}} + ]; + + const results = coll.aggregate(pipeline).toArray(); + assert.eq(results.length, nDocsMainColl); + for (let i = 0; i < results.length; i++) { + assert.eq(results[i].foreignDoc.length, 2, results[i]); + } + })(); + + (function testMultipleLookupsFromShard() { + // Run two lookups in a row (both on mongod). + pipeline = [ + { + $lookup: { + localField: "foreignId", + foreignField: "key", + from: "foreignColl", + as: "foreignDoc" + } + }, + { + $lookup: { + from: "smallColl", + as: "smallCollDocs", + pipeline: [], + } + }, + {$_internalSplitPipeline: {mergeType: "anyShard"}} + ]; + const results = coll.aggregate(pipeline).toArray(); + assert.eq(results.length, nDocsMainColl); + for (let i = 0; i < results.length; i++) { + assert.eq(results[i].foreignDoc.length, 2, results[i]); + assert.eq(results[i].smallCollDocs.length, 1, results[i]); + } + })(); + + (function testUnshardedLookupWithinShardedLookup() { + // Pipeline with unsharded $lookup inside a sharded $lookup. + pipeline = [ + { + $lookup: { + from: "foreignColl", + as: "foreignDoc", + pipeline: [ + {$lookup: {from: "smallColl", as: "doc", pipeline: []}}, + ], + } + }, + {$_internalSplitPipeline: {mergeType: "anyShard"}} + ]; + const results = coll.aggregate(pipeline).toArray(); + + assert.eq(results.length, nDocsMainColl); + for (let i = 0; i < results.length; i++) { + assert.eq(results[i].foreignDoc.length, nDocsForeignColl); + for (let j = 0; j < nDocsForeignColl; j++) { + // Each document pulled from the foreign collection should have one document + // from "smallColl." + assert.eq(results[i].foreignDoc[j].collName, "foreignColl"); + + // TODO SERVER-39016: Once a mongod is able to target the primary shard when + // reading from a non-sharded collection this should always work. Until then, + // the results of the query depend on which shard is chosen as the merging + // shard. If the primary shard is chosen, we'll get the correct results (and + // correctly find a document in "smallColl"). Otherwise if the merging shard is + // not the primary shard, the merging shard will attempt to do a local read (on + // an empty/non-existent collection), which will return nothing. 
+ if (results[i].foreignDoc[j].doc.length === 1) { + assert.eq(results[i].foreignDoc[j].doc[0].collName, "smallColl"); + } else { + assert.eq(results[i].foreignDoc[j].doc.length, 0); + } + } + } + })(); + }; + + jsTestLog("Running test with neither collection sharded"); + runTest(); + + jsTestLog("Running test with foreign collection sharded"); + sharded.shardColl("foreignColl", + {_id: 1}, // shard key + {_id: 5}, // split + {_id: 5}, // move + "test", // dbName + true // waitForDelete + ); + runTest(); + + jsTestLog("Running test with main and foreign collection sharded"); + sharded.shardColl("mainColl", + {_id: 1}, // shard key + {_id: 5}, // split + {_id: 5}, // move + "test", // dbName + true // waitForDelete + ); + runTest(); + + sharded.stop(); +})(); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index b2037454108..f03305d8d53 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -194,6 +194,19 @@ env.Library( ] ) +env.Library( + target='sharded_agg_helpers', + source=[ + 'sharded_agg_helpers.cpp', + ], + LIBDEPS=[ + 'aggregation', + '$BUILD_DIR/mongo/s/async_requests_sender', + '$BUILD_DIR/mongo/s/query/cluster_aggregation_planner', + '$BUILD_DIR/mongo/s/commands/shared_cluster_commands', + ], +) + env.CppUnitTest( target='document_source_test', source=[ @@ -314,6 +327,7 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/s/sharding_api', 'process_interface_standalone', + 'sharded_agg_helpers', ], ) @@ -328,6 +342,7 @@ env.Library( '$BUILD_DIR/mongo/s/query/cluster_aggregation_planner', '$BUILD_DIR/mongo/s/query/cluster_query', 'mongo_process_common', + 'sharded_agg_helpers', ] ) diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 09e7e3442ac..692717c6966 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -197,10 +197,10 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const { } // If executing on mongos and the foreign collection is sharded, then this stage can run on - // mongos. + // mongos or any shard. HostTypeRequirement hostRequirement = (pExpCtx->inMongos && pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs)) - ? HostTypeRequirement::kMongoS + ? 
HostTypeRequirement::kNone : HostTypeRequirement::kPrimaryShard; StageConstraints constraints(StreamType::kStreaming, @@ -330,6 +330,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( _cache.reset(); } + invariant(pipeline); return pipeline; } diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp index f4c12f2cadf..896554f2dc2 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/mongos_process_interface.cpp @@ -39,6 +39,7 @@ #include "mongo/db/curop.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/query/collation/collation_spec.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/repl/read_concern_args.h" @@ -49,7 +50,6 @@ #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_query_knobs.h" -#include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/s/query/establish_cursors.h" #include "mongo/s/query/router_exec_stage.h" #include "mongo/s/transaction_router.h" @@ -58,98 +58,12 @@ namespace mongo { -MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors); - using boost::intrusive_ptr; using std::shared_ptr; using std::string; using std::unique_ptr; namespace { -// Given a document representing an aggregation command such as -// -// {aggregate: "myCollection", pipeline: [], ...}, -// -// produces the corresponding explain command: -// -// {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...} -Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) { - MutableDocument explainCommandBuilder; - explainCommandBuilder["explain"] = Value(aggregateCommand); - // Downstream host targeting code expects queryOptions at the top level of the command object. - explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] = - Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]); - - // readConcern needs to be promoted to the top-level of the request. - explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] = - Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]); - - // Add explain command options. - for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) { - explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption); - } - - return explainCommandBuilder.freeze(); -} - -std::vector<RemoteCursor> establishShardCursors( - OperationContext* opCtx, - const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - boost::optional<CachedCollectionRoutingInfo>& routingInfo, - const BSONObj& cmdObj, - const AggregationRequest& request, - const ReadPreferenceSetting& readPref, - const BSONObj& shardQuery) { - LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; - - const bool mustRunOnAll = MongoSInterface::mustRunOnAllShards(nss, litePipe); - std::set<ShardId> shardIds = MongoSInterface::getTargetedShards( - opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation()); - std::vector<std::pair<ShardId, BSONObj>> requests; - - // If we don't need to run on all shards, then we should always have a valid routing table. - invariant(routingInfo || mustRunOnAll); - - if (mustRunOnAll) { - // The pipeline contains a stage which must be run on all shards. 
Skip versioning and - // enqueue the raw command objects. - for (auto&& shardId : shardIds) { - requests.emplace_back(std::move(shardId), cmdObj); - } - } else if (routingInfo->cm()) { - // The collection is sharded. Use the routing table to decide which shards to target - // based on the query and collation, and build versioned requests for them. - for (auto& shardId : shardIds) { - auto versionedCmdObj = - appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId)); - requests.emplace_back(std::move(shardId), std::move(versionedCmdObj)); - } - } else { - // The collection is unsharded. Target only the primary shard for the database. - // Don't append shard version info when contacting the config servers. - requests.emplace_back(routingInfo->db().primaryId(), - !routingInfo->db().primary()->isConfig() - ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) - : cmdObj); - } - - if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { - log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking " - "until fail point is disabled."; - while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { - sleepsecs(1); - } - } - - return establishCursors(opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - nss, - readPref, - requests, - false /* do not allow partial results */, - MongoSInterface::getDesiredRetryPolicy(request)); -} /** * Determines the single shard to which the given query will be targeted, and its associated @@ -210,307 +124,6 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace -Shard::RetryPolicy MongoSInterface::getDesiredRetryPolicy(const AggregationRequest& req) { - // The idempotent retry policy will retry even for writeConcern failures, so only set it if the - // pipeline does not support writeConcern. - if (req.getWriteConcern()) { - return Shard::RetryPolicy::kNotIdempotent; - } - return Shard::RetryPolicy::kIdempotent; -} - -BSONObj MongoSInterface::createPassthroughCommandForShard(OperationContext* opCtx, - const AggregationRequest& request, - const boost::optional<ShardId>& shardId, - Pipeline* pipeline, - BSONObj collationObj) { - // Create the command for the shards. - MutableDocument targetedCmd(request.serializeToCommandObj()); - if (pipeline) { - targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize()); - } - - return MongoSInterface::genericTransformForShards( - std::move(targetedCmd), opCtx, shardId, request, collationObj); -} - -BSONObj MongoSInterface::genericTransformForShards(MutableDocument&& cmdForShards, - OperationContext* opCtx, - const boost::optional<ShardId>& shardId, - const AggregationRequest& request, - BSONObj collationObj) { - cmdForShards[AggregationRequest::kFromMongosName] = Value(true); - // If this is a request for an aggregation explain, then we must wrap the aggregate inside an - // explain command. 
- if (auto explainVerbosity = request.getExplain()) { - cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity)); - } - - if (!collationObj.isEmpty()) { - cmdForShards[AggregationRequest::kCollationName] = Value(collationObj); - } - - if (opCtx->getTxnNumber()) { - invariant(cmdForShards.peek()[OperationSessionInfo::kTxnNumberFieldName].missing(), - str::stream() << "Command for shards unexpectedly had the " - << OperationSessionInfo::kTxnNumberFieldName - << " field set: " - << cmdForShards.peek().toString()); - cmdForShards[OperationSessionInfo::kTxnNumberFieldName] = - Value(static_cast<long long>(*opCtx->getTxnNumber())); - } - - auto aggCmd = cmdForShards.freeze().toBson(); - - if (shardId) { - if (auto txnRouter = TransactionRouter::get(opCtx)) { - aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd); - } - } - - // agg creates temp collection and should handle implicit create separately. - return appendAllowImplicitCreate(aggCmd, true); -} - -BSONObj MongoSInterface::createCommandForTargetedShards( - OperationContext* opCtx, - const AggregationRequest& request, - const LiteParsedPipeline& litePipe, - const cluster_aggregation_planner::SplitPipeline& splitPipeline, - const BSONObj collationObj, - const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec, - bool needsMerge) { - - // Create the command for the shards. - MutableDocument targetedCmd(request.serializeToCommandObj()); - // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it - // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may - // have detected a logged in user and appended that user name to the $listSessions spec to - // send to the shards. - targetedCmd[AggregationRequest::kPipelineName] = - Value(splitPipeline.shardsPipeline->serialize()); - - // When running on many shards with the exchange we may not need merging. - if (needsMerge) { - targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true); - - // If this is a change stream aggregation, set the 'mergeByPBRT' flag on the command. This - // notifies the shards that the mongoS is capable of merging streams based on resume token. - // TODO SERVER-38539: the 'mergeByPBRT' flag is no longer necessary in 4.4. - targetedCmd[AggregationRequest::kMergeByPBRTName] = Value(litePipe.hasChangeStream()); - - // For split pipelines which need merging, do *not* propagate the writeConcern to the shards - // part. Otherwise this is part of an exchange and in that case we should include the - // writeConcern. - targetedCmd[WriteConcernOptions::kWriteConcernField] = Value(); - } - - targetedCmd[AggregationRequest::kCursorName] = - Value(DOC(AggregationRequest::kBatchSizeName << 0)); - - targetedCmd[AggregationRequest::kExchangeName] = - exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value(); - - return genericTransformForShards( - std::move(targetedCmd), opCtx, boost::none, request, collationObj); -} - -std::set<ShardId> MongoSInterface::getTargetedShards( - OperationContext* opCtx, - bool mustRunOnAllShards, - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, - const BSONObj shardQuery, - const BSONObj collation) { - if (mustRunOnAllShards) { - // The pipeline begins with a stage which must be run on all shards. 
- std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); - return {shardIds.begin(), shardIds.end()}; - } - - // If we don't need to run on all shards, then we should always have a valid routing table. - invariant(routingInfo); - - return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation); -} - -bool MongoSInterface::mustRunOnAllShards(const NamespaceString& nss, - const LiteParsedPipeline& litePipe) { - // The following aggregations must be routed to all shards: - // - Any collectionless aggregation, such as non-localOps $currentOp. - // - Any aggregation which begins with a $changeStream stage. - return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream(); -} - -StatusWith<CachedCollectionRoutingInfo> MongoSInterface::getExecutionNsRoutingInfo( - OperationContext* opCtx, const NamespaceString& execNss) { - // First, verify that there are shards present in the cluster. If not, then we return the - // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because - // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on - // a collection before its enclosing database is created. However, if there are no shards - // present, then $changeStream should immediately return an empty cursor just as other - // aggregations do when the database does not exist. - std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); - if (shardIds.size() == 0) { - return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"}; - } - - // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not - // exist. - return getCollectionRoutingInfoForTxnCmd(opCtx, execNss); -} - -/** - * Targets shards for the pipeline and returns a struct with the remote cursors or results, and - * the pipeline that will need to be executed to merge the results from the remotes. If a stale - * shard version is encountered, refreshes the routing table and tries again. - */ -MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& executionNss, - const AggregationRequest& aggRequest, - const LiteParsedPipeline& litePipe, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline, - BSONObj collationObj) { - // The process is as follows: - // - First, determine whether we need to target more than one shard. If so, we split the - // pipeline; if not, we retain the existing pipeline. - // - Call establishShardCursors to dispatch the aggregation to the targeted shards. - // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the - // entire aggregation commmand. - auto cursors = std::vector<RemoteCursor>(); - auto shardResults = std::vector<AsyncRequestsSender::Response>(); - auto opCtx = expCtx->opCtx; - - const bool needsPrimaryShardMerge = - (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load()); - - const bool needsMongosMerge = pipeline->needsMongosMerger(); - - const auto shardQuery = pipeline->getInitialQuery(); - - auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss); - - // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue. - // Otherwise, uassert on all exceptions here. 
- if (!(litePipe.hasChangeStream() && - executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { - uassertStatusOK(executionNsRoutingInfoStatus); - } - - auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK() - ? std::move(executionNsRoutingInfoStatus.getValue()) - : boost::optional<CachedCollectionRoutingInfo>{}; - - // Determine whether we can run the entire aggregation on a single shard. - const bool mustRunOnAll = mustRunOnAllShards(executionNss, litePipe); - std::set<ShardId> shardIds = getTargetedShards( - opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); - - // Don't need to split the pipeline if we are only targeting a single shard, unless: - // - There is a stage that needs to be run on the primary shard and the single target shard - // is not the primary. - // - The pipeline contains one or more stages which must always merge on mongoS. - const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge || - (needsPrimaryShardMerge && executionNsRoutingInfo && - *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId())); - - boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec; - boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline; - - if (needsSplit) { - splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline)); - - exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( - opCtx, splitPipeline->mergePipeline.get()); - } - - // Generate the command object for the targeted shards. - BSONObj targetedCommand = splitPipeline - ? createCommandForTargetedShards( - opCtx, aggRequest, litePipe, *splitPipeline, collationObj, exchangeSpec, true) - : createPassthroughCommandForShard( - opCtx, aggRequest, boost::none, pipeline.get(), collationObj); - - // Refresh the shard registry if we're targeting all shards. We need the shard registry - // to be at least as current as the logical time used when creating the command for - // $changeStream to work reliably, so we do a "hard" reload. - if (mustRunOnAll) { - auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); - if (!shardRegistry->reload(opCtx)) { - shardRegistry->reload(opCtx); - } - // Rebuild the set of shards as the shard registry might have changed. - shardIds = getTargetedShards( - opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); - } - - // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. - if (expCtx->explain) { - if (mustRunOnAll) { - // Some stages (such as $currentOp) need to be broadcast to all shards, and - // should not participate in the shard version protocol. - shardResults = - scatterGatherUnversionedTargetAllShards(opCtx, - executionNss.db(), - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent); - } else { - // Aggregations on a real namespace should use the routing table to target - // shards, and should participate in the shard version protocol. 
- invariant(executionNsRoutingInfo); - shardResults = - scatterGatherVersionedTargetByRoutingTable(opCtx, - executionNss.db(), - executionNss, - *executionNsRoutingInfo, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - shardQuery, - aggRequest.getCollation()); - } - } else { - cursors = establishShardCursors(opCtx, - executionNss, - litePipe, - executionNsRoutingInfo, - targetedCommand, - aggRequest, - ReadPreferenceSetting::get(opCtx), - shardQuery); - invariant(cursors.size() % shardIds.size() == 0, - str::stream() << "Number of cursors (" << cursors.size() - << ") is not a multiple of producers (" - << shardIds.size() - << ")"); - } - - // Convert remote cursors into a vector of "owned" cursors. - std::vector<OwnedRemoteCursor> ownedCursors; - for (auto&& cursor : cursors) { - ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss)); - } - - // Record the number of shards involved in the aggregation. If we are required to merge on - // the primary shard, but the primary shard was not in the set of targeted shards, then we - // must increment the number of involved shards. - CurOp::get(opCtx)->debug().nShards = - shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo && - !shardIds.count(executionNsRoutingInfo->db().primaryId())); - - return DispatchShardPipelineResults{needsPrimaryShardMerge, - std::move(ownedCursors), - std::move(shardResults), - std::move(splitPipeline), - std::move(pipeline), - targetedCommand, - shardIds.size(), - exchangeSpec}; -} - std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -530,61 +143,9 @@ std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::makePipeline( return pipeline; } - std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) { - std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, - PipelineDeleter(expCtx->opCtx)); - - invariant(pipeline->getSources().empty() || - !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); - - // Generate the command object for the targeted shards. 
- std::vector<BSONObj> rawStages = [&pipeline]() { - auto serialization = pipeline->serialize(); - std::vector<BSONObj> stages; - stages.reserve(serialization.size()); - - for (const auto& stageObj : serialization) { - invariant(stageObj.getType() == BSONType::Object); - stages.push_back(stageObj.getDocument().toBson()); - } - - return stages; - }(); - - AggregationRequest aggRequest(expCtx->ns, rawStages); - LiteParsedPipeline liteParsedPipeline(aggRequest); - auto shardDispatchResults = MongoSInterface::dispatchShardPipeline( - expCtx, expCtx->ns, aggRequest, liteParsedPipeline, std::move(pipeline), expCtx->collation); - - std::vector<ShardId> targetedShards; - targetedShards.reserve(shardDispatchResults.remoteCursors.size()); - for (auto&& remoteCursor : shardDispatchResults.remoteCursors) { - targetedShards.emplace_back(remoteCursor->getShardId().toString()); - } - - std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline; - boost::optional<BSONObj> shardCursorsSortSpec = boost::none; - if (shardDispatchResults.splitPipeline) { - mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline); - shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec; - } else { - // We have not split the pipeline, and will execute entirely on the remote shards. Set up an - // empty local pipeline which we will attach the merge cursors stage to. - mergePipeline = uassertStatusOK(Pipeline::parse(std::vector<BSONObj>(), expCtx)); - } - - cluster_aggregation_planner::addMergeCursorsSource( - mergePipeline.get(), - liteParsedPipeline, - shardDispatchResults.commandForTargetedShards, - std::move(shardDispatchResults.remoteCursors), - targetedShards, - shardCursorsSortSpec, - Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor()); - - return mergePipeline; + return sharded_agg_helpers::targetShardsAndAddMergeCursors(expCtx, ownedPipeline); } boost::optional<Document> MongoSInterface::lookupSingleDocument( diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index 1bdcae87595..dc7ec6fa955 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -45,37 +45,6 @@ namespace mongo { */ class MongoSInterface final : public MongoProcessCommon { public: - struct DispatchShardPipelineResults { - // True if this pipeline was split, and the second half of the pipeline needs to be run on - // the primary shard for the database. - bool needsPrimaryShardMerge; - - // Populated if this *is not* an explain, this vector represents the cursors on the remote - // shards. - std::vector<OwnedRemoteCursor> remoteCursors; - - // Populated if this *is* an explain, this vector represents the results from each shard. - std::vector<AsyncRequestsSender::Response> remoteExplainOutput; - - // The split version of the pipeline if more than one shard was targeted, otherwise - // boost::none. - boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline; - - // If the pipeline targeted a single shard, this is the pipeline to run on that shard. - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard; - - // The command object to send to the targeted shards. - BSONObj commandForTargetedShards; - - // How many exchange producers are running the shard part of splitPipeline. - size_t numProducers; - - // The exchange specification if the query can run with the exchange otherwise boost::none. 
- boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec; - }; - - static Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req); - static BSONObj createPassthroughCommandForShard(OperationContext* opCtx, const AggregationRequest& request, const boost::optional<ShardId>& shardId, @@ -101,26 +70,9 @@ public: const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec, bool needsMerge); - static std::set<ShardId> getTargetedShards( - OperationContext* opCtx, - bool mustRunOnAllShards, - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, - const BSONObj shardQuery, - const BSONObj collation); - - static bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe); - static StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo( OperationContext* opCtx, const NamespaceString& execNss); - static DispatchShardPipelineResults dispatchShardPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& executionNss, - const AggregationRequest& aggRequest, - const LiteParsedPipeline& litePipe, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline, - BSONObj collationObj); - MongoSInterface() = default; virtual ~MongoSInterface() = default; diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp index 76949f4348d..fd005414c02 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp +++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp @@ -44,10 +44,14 @@ #include "mongo/db/db_raii.h" #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/transaction_participant.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" +#include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/s/write_ops/cluster_write.h" #include "mongo/util/log.h" @@ -153,4 +157,47 @@ void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionCont uassertStatusOKWithContext(response.toStatus(), "Update failed: "); } +unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceShardServer::attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) { + std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, + PipelineDeleter(expCtx->opCtx)); + + invariant(pipeline->getSources().empty() || + !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); + + // $lookup on a sharded collection is not allowed in a transaction. We assume that if we're in + // a transaction, the foreign collection is unsharded. Otherwise, we may access the catalog + // cache, and attempt to do a network request while holding locks. + // TODO: SERVER-39162 allow $lookup in sharded transactions. + auto txnParticipant = TransactionParticipant::get(expCtx->opCtx); + const bool inTxn = txnParticipant && txnParticipant->inMultiDocumentTransaction(); + + const bool isSharded = [&]() { + if (inTxn || !ShardingState::get(expCtx->opCtx)->enabled()) { + // Sharding isn't enabled or we're in a transaction. In either case we assume it's + // unsharded. + return false; + } else if (expCtx->ns.db() == "local") { + // This may be a change stream examining the oplog. 
We know the oplog (or any local + // collections for that matter) will never be sharded. + return false; + } + return uassertStatusOK(getCollectionRoutingInfoForTxnCmd(expCtx->opCtx, expCtx->ns)).cm() != + nullptr; + }(); + + if (isSharded) { + // For a sharded collection we may have to establish cursors on a remote host. + return sharded_agg_helpers::targetShardsAndAddMergeCursors(expCtx, pipeline.release()); + } + + // Perform a "local read", the same as if we weren't a shard server. + + // TODO SERVER-39015 we should do a shard version check here after we acquire a lock within + // this function, to be sure the collection didn't become sharded between the time we checked + // whether it's sharded and the time we took the lock. + + return MongoInterfaceStandalone::attachCursorSourceToPipeline(expCtx, pipeline.release()); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h index 22c54a8a6cf..75391ea7410 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.h +++ b/src/mongo/db/pipeline/process_interface_shardsvr.h @@ -83,6 +83,9 @@ public: bool upsert, bool multi, boost::optional<OID> targetEpoch) final; + + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index f7736cd41b1..e9de70428dc 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -45,6 +45,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/pipeline/document_source_cursor.h" +#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/db/repl/speculative_majority_read_info.h" #include "mongo/db/s/collection_sharding_state.h" @@ -55,6 +56,8 @@ #include "mongo/db/stats/storage_stats.h" #include "mongo/db/storage/backup_cursor_hooks.h" #include "mongo/db/transaction_participant.h" +#include "mongo/s/cluster_commands_helpers.h" +#include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/util/log.h" namespace mongo { @@ -340,29 +343,14 @@ unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::attachCursorSour !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); boost::optional<AutoGetCollectionForReadCommand> autoColl; - if (expCtx->uuid) { - autoColl.emplace(expCtx->opCtx, - NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid}, - AutoGetCollection::ViewMode::kViewsForbidden, - Date_t::max(), - AutoStatsTracker::LogMode::kUpdateTop); - } else { - autoColl.emplace(expCtx->opCtx, - expCtx->ns, - AutoGetCollection::ViewMode::kViewsForbidden, - Date_t::max(), - AutoStatsTracker::LogMode::kUpdateTop); - } - - // makePipeline() is only called to perform secondary aggregation requests and expects the - // collection representing the document source to be not-sharded. We confirm sharding state - // here to avoid taking a collection lock elsewhere for this purpose alone. - // TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor - // until after we release the lock, leaving room for a collection to be sharded in-between. 
- auto css = CollectionShardingState::get(expCtx->opCtx, expCtx->ns); - uassert(4567, - str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded", - !css->getMetadataForOperation(expCtx->opCtx)->isSharded()); + const NamespaceStringOrUUID nsOrUUID = expCtx->uuid + ? NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid} + : expCtx->ns; + autoColl.emplace(expCtx->opCtx, + nsOrUUID, + AutoGetCollection::ViewMode::kViewsForbidden, + Date_t::max(), + AutoStatsTracker::LogMode::kUpdateTop); PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline.get()); diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index b4331733639..a04b2a6859d 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -92,7 +92,7 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts = MakePipelineOptions{}) final; std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override; std::string getShardName(OperationContext* opCtx) const final; std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection( OperationContext* opCtx, const NamespaceString&, UUID) const override; diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp new file mode 100644 index 00000000000..cb1f44f7092 --- /dev/null +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -0,0 +1,488 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "sharded_agg_helpers.h" + +#include "mongo/db/curop.h" +#include "mongo/db/pipeline/document_source.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/cluster_commands_helpers.h" +#include "mongo/s/query/cluster_query_knobs.h" +#include "mongo/s/query/document_source_merge_cursors.h" +#include "mongo/s/transaction_router.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace sharded_agg_helpers { + +MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors); + +/** + * Given a document representing an aggregation command such as + * {aggregate: "myCollection", pipeline: [], ...}, + * + * produces the corresponding explain command: + * {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...} + */ +Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) { + MutableDocument explainCommandBuilder; + explainCommandBuilder["explain"] = Value(aggregateCommand); + // Downstream host targeting code expects queryOptions at the top level of the command object. + explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] = + Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]); + + // readConcern needs to be promoted to the top-level of the request. + explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] = + Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]); + + // Add explain command options. + for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) { + explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption); + } + + return explainCommandBuilder.freeze(); +} + +BSONObj createPassthroughCommandForShard(OperationContext* opCtx, + const AggregationRequest& request, + const boost::optional<ShardId>& shardId, + Pipeline* pipeline, + BSONObj collationObj) { + // Create the command for the shards. + MutableDocument targetedCmd(request.serializeToCommandObj()); + if (pipeline) { + targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize()); + } + + return genericTransformForShards(std::move(targetedCmd), opCtx, shardId, request, collationObj); +} + +BSONObj genericTransformForShards(MutableDocument&& cmdForShards, + OperationContext* opCtx, + const boost::optional<ShardId>& shardId, + const AggregationRequest& request, + BSONObj collationObj) { + cmdForShards[AggregationRequest::kFromMongosName] = Value(true); + // If this is a request for an aggregation explain, then we must wrap the aggregate inside an + // explain command. 
+ if (auto explainVerbosity = request.getExplain()) { + cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity)); + } + + if (!collationObj.isEmpty()) { + cmdForShards[AggregationRequest::kCollationName] = Value(collationObj); + } + + if (opCtx->getTxnNumber()) { + invariant(cmdForShards.peek()[OperationSessionInfo::kTxnNumberFieldName].missing(), + str::stream() << "Command for shards unexpectedly had the " + << OperationSessionInfo::kTxnNumberFieldName + << " field set: " + << cmdForShards.peek().toString()); + cmdForShards[OperationSessionInfo::kTxnNumberFieldName] = + Value(static_cast<long long>(*opCtx->getTxnNumber())); + } + + auto aggCmd = cmdForShards.freeze().toBson(); + + if (shardId) { + if (auto txnRouter = TransactionRouter::get(opCtx)) { + aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd); + } + } + + // agg creates temp collection and should handle implicit create separately. + return appendAllowImplicitCreate(aggCmd, true); +} + +StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx, + const NamespaceString& execNss) { + // First, verify that there are shards present in the cluster. If not, then we return the + // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because + // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on + // a collection before its enclosing database is created. However, if there are no shards + // present, then $changeStream should immediately return an empty cursor just as other + // aggregations do when the database does not exist. + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); + if (shardIds.size() == 0) { + return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"}; + } + + // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not + // exist. + return getCollectionRoutingInfoForTxnCmd(opCtx, execNss); +} + +Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req) { + // The idempotent retry policy will retry even for writeConcern failures, so only set it if the + // pipeline does not support writeConcern. + if (req.getWriteConcern()) { + return Shard::RetryPolicy::kNotIdempotent; + } + return Shard::RetryPolicy::kIdempotent; +} + +bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe) { + // The following aggregations must be routed to all shards: + // - Any collectionless aggregation, such as non-localOps $currentOp. + // - Any aggregation which begins with a $changeStream stage. + return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream(); +} +BSONObj createCommandForTargetedShards( + OperationContext* opCtx, + const AggregationRequest& request, + const LiteParsedPipeline& litePipe, + const cluster_aggregation_planner::SplitPipeline& splitPipeline, + const BSONObj collationObj, + const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec, + bool needsMerge) { + + // Create the command for the shards. + MutableDocument targetedCmd(request.serializeToCommandObj()); + // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it + // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may + // have detected a logged in user and appended that user name to the $listSessions spec to + // send to the shards. 
+ targetedCmd[AggregationRequest::kPipelineName] = + Value(splitPipeline.shardsPipeline->serialize()); + + // When running on many shards with the exchange we may not need merging. + if (needsMerge) { + targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true); + + // If this is a change stream aggregation, set the 'mergeByPBRT' flag on the command. This + // notifies the shards that the mongoS is capable of merging streams based on resume token. + // TODO SERVER-38539: the 'mergeByPBRT' flag is no longer necessary in 4.4. + targetedCmd[AggregationRequest::kMergeByPBRTName] = Value(litePipe.hasChangeStream()); + + // For split pipelines which need merging, do *not* propagate the writeConcern to the shards + // part. Otherwise this is part of an exchange and in that case we should include the + // writeConcern. + targetedCmd[WriteConcernOptions::kWriteConcernField] = Value(); + } + + targetedCmd[AggregationRequest::kCursorName] = + Value(DOC(AggregationRequest::kBatchSizeName << 0)); + + targetedCmd[AggregationRequest::kExchangeName] = + exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value(); + + return genericTransformForShards( + std::move(targetedCmd), opCtx, boost::none, request, collationObj); +} + +/** + * Targets shards for the pipeline and returns a struct with the remote cursors or results, and + * the pipeline that will need to be executed to merge the results from the remotes. If a stale + * shard version is encountered, refreshes the routing table and tries again. + */ +DispatchShardPipelineResults dispatchShardPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& executionNss, + const AggregationRequest& aggRequest, + const LiteParsedPipeline& litePipe, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + BSONObj collationObj) { + // The process is as follows: + // - First, determine whether we need to target more than one shard. If so, we split the + // pipeline; if not, we retain the existing pipeline. + // - Call establishShardCursors to dispatch the aggregation to the targeted shards. + // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the + // entire aggregation commmand. + auto cursors = std::vector<RemoteCursor>(); + auto shardResults = std::vector<AsyncRequestsSender::Response>(); + auto opCtx = expCtx->opCtx; + + const bool needsPrimaryShardMerge = + (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load()); + + const bool needsMongosMerge = pipeline->needsMongosMerger(); + + const auto shardQuery = pipeline->getInitialQuery(); + + auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss); + + // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue. + // Otherwise, uassert on all exceptions here. + if (!(litePipe.hasChangeStream() && + executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { + uassertStatusOK(executionNsRoutingInfoStatus); + } + + auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK() + ? std::move(executionNsRoutingInfoStatus.getValue()) + : boost::optional<CachedCollectionRoutingInfo>{}; + + // Determine whether we can run the entire aggregation on a single shard. 
+ const bool mustRunOnAll = mustRunOnAllShards(executionNss, litePipe); + std::set<ShardId> shardIds = getTargetedShards( + opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); + + // Don't need to split the pipeline if we are only targeting a single shard, unless: + // - There is a stage that needs to be run on the primary shard and the single target shard + // is not the primary. + // - The pipeline contains one or more stages which must always merge on mongoS. + const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge || + (needsPrimaryShardMerge && executionNsRoutingInfo && + *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId())); + + boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec; + boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline; + + if (needsSplit) { + splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline)); + + exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( + opCtx, splitPipeline->mergePipeline.get()); + } + + // Generate the command object for the targeted shards. + BSONObj targetedCommand = splitPipeline + ? createCommandForTargetedShards( + opCtx, aggRequest, litePipe, *splitPipeline, collationObj, exchangeSpec, true) + : createPassthroughCommandForShard( + opCtx, aggRequest, boost::none, pipeline.get(), collationObj); + + // Refresh the shard registry if we're targeting all shards. We need the shard registry + // to be at least as current as the logical time used when creating the command for + // $changeStream to work reliably, so we do a "hard" reload. + if (mustRunOnAll) { + auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); + if (!shardRegistry->reload(opCtx)) { + shardRegistry->reload(opCtx); + } + // Rebuild the set of shards as the shard registry might have changed. + shardIds = getTargetedShards( + opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); + } + + // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. + if (expCtx->explain) { + if (mustRunOnAll) { + // Some stages (such as $currentOp) need to be broadcast to all shards, and + // should not participate in the shard version protocol. + shardResults = + scatterGatherUnversionedTargetAllShards(opCtx, + executionNss.db(), + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent); + } else { + // Aggregations on a real namespace should use the routing table to target + // shards, and should participate in the shard version protocol. + invariant(executionNsRoutingInfo); + shardResults = + scatterGatherVersionedTargetByRoutingTable(opCtx, + executionNss.db(), + executionNss, + *executionNsRoutingInfo, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + shardQuery, + aggRequest.getCollation()); + } + } else { + cursors = establishShardCursors(opCtx, + executionNss, + litePipe, + executionNsRoutingInfo, + targetedCommand, + aggRequest, + ReadPreferenceSetting::get(opCtx), + shardQuery); + invariant(cursors.size() % shardIds.size() == 0, + str::stream() << "Number of cursors (" << cursors.size() + << ") is not a multiple of producers (" + << shardIds.size() + << ")"); + } + + // Convert remote cursors into a vector of "owned" cursors. 
+ std::vector<OwnedRemoteCursor> ownedCursors; + for (auto&& cursor : cursors) { + ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss)); + } + + // Record the number of shards involved in the aggregation. If we are required to merge on + // the primary shard, but the primary shard was not in the set of targeted shards, then we + // must increment the number of involved shards. + CurOp::get(opCtx)->debug().nShards = + shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo && + !shardIds.count(executionNsRoutingInfo->db().primaryId())); + + return DispatchShardPipelineResults{needsPrimaryShardMerge, + std::move(ownedCursors), + std::move(shardResults), + std::move(splitPipeline), + std::move(pipeline), + targetedCommand, + shardIds.size(), + exchangeSpec}; +} + +std::set<ShardId> getTargetedShards(OperationContext* opCtx, + bool mustRunOnAllShards, + const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + const BSONObj shardQuery, + const BSONObj collation) { + if (mustRunOnAllShards) { + // The pipeline begins with a stage which must be run on all shards. + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); + return {shardIds.begin(), shardIds.end()}; + } + + // If we don't need to run on all shards, then we should always have a valid routing table. + invariant(routingInfo); + + return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation); +} + +std::vector<RemoteCursor> establishShardCursors( + OperationContext* opCtx, + const NamespaceString& nss, + const LiteParsedPipeline& litePipe, + boost::optional<CachedCollectionRoutingInfo>& routingInfo, + const BSONObj& cmdObj, + const AggregationRequest& request, + const ReadPreferenceSetting& readPref, + const BSONObj& shardQuery) { + LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; + + const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe); + std::set<ShardId> shardIds = + getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation()); + std::vector<std::pair<ShardId, BSONObj>> requests; + + // If we don't need to run on all shards, then we should always have a valid routing table. + invariant(routingInfo || mustRunOnAll); + + if (mustRunOnAll) { + // The pipeline contains a stage which must be run on all shards. Skip versioning and + // enqueue the raw command objects. + for (auto&& shardId : shardIds) { + requests.emplace_back(std::move(shardId), cmdObj); + } + } else if (routingInfo->cm()) { + // The collection is sharded. Use the routing table to decide which shards to target + // based on the query and collation, and build versioned requests for them. + for (auto& shardId : shardIds) { + auto versionedCmdObj = + appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId)); + requests.emplace_back(std::move(shardId), std::move(versionedCmdObj)); + } + } else { + // The collection is unsharded. Target only the primary shard for the database. + // Don't append shard version info when contacting the config servers. + requests.emplace_back(routingInfo->db().primaryId(), + !routingInfo->db().primary()->isConfig() + ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) + : cmdObj); + } + + if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { + log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. 
Blocking " + "until fail point is disabled."; + while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { + sleepsecs(1); + } + } + + return establishCursors(opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + nss, + readPref, + requests, + false /* do not allow partial results */, + getDesiredRetryPolicy(request)); +} + +std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) { + std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, + PipelineDeleter(expCtx->opCtx)); + + invariant(pipeline->getSources().empty() || + !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); + + // Generate the command object for the targeted shards. + std::vector<BSONObj> rawStages = [&pipeline]() { + auto serialization = pipeline->serialize(); + std::vector<BSONObj> stages; + stages.reserve(serialization.size()); + + for (const auto& stageObj : serialization) { + invariant(stageObj.getType() == BSONType::Object); + stages.push_back(stageObj.getDocument().toBson()); + } + + return stages; + }(); + + AggregationRequest aggRequest(expCtx->ns, rawStages); + LiteParsedPipeline liteParsedPipeline(aggRequest); + auto shardDispatchResults = dispatchShardPipeline( + expCtx, expCtx->ns, aggRequest, liteParsedPipeline, std::move(pipeline), expCtx->collation); + + std::vector<ShardId> targetedShards; + targetedShards.reserve(shardDispatchResults.remoteCursors.size()); + for (auto&& remoteCursor : shardDispatchResults.remoteCursors) { + targetedShards.emplace_back(remoteCursor->getShardId().toString()); + } + + std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline; + boost::optional<BSONObj> shardCursorsSortSpec = boost::none; + if (shardDispatchResults.splitPipeline) { + mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline); + shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec; + } else { + // We have not split the pipeline, and will execute entirely on the remote shards. Set up an + // empty local pipeline which we will attach the merge cursors stage to. + mergePipeline = uassertStatusOK(Pipeline::parse(std::vector<BSONObj>(), expCtx)); + } + + cluster_aggregation_planner::addMergeCursorsSource( + mergePipeline.get(), + liteParsedPipeline, + shardDispatchResults.commandForTargetedShards, + std::move(shardDispatchResults.remoteCursors), + targetedShards, + shardCursorsSortSpec, + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor()); + + return mergePipeline; +} +} // namespace sharded_agg_helpers +} // namespace mongo diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h new file mode 100644 index 00000000000..f1dc2c2d91e --- /dev/null +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -0,0 +1,137 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. 
If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/s/async_requests_sender.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/query/cluster_aggregation_planner.h"
+
+namespace mongo {
+class CachedCollectionRoutingInfo;
+
+namespace sharded_agg_helpers {
+struct DispatchShardPipelineResults {
+ // True if this pipeline was split, and the second half of the pipeline needs to be run on
+ // the primary shard for the database.
+ bool needsPrimaryShardMerge;
+
+ // Populated if this is *not* an explain; this vector represents the cursors on the remote
+ // shards.
+ std::vector<OwnedRemoteCursor> remoteCursors;
+
+ // Populated if this *is* an explain; this vector represents the results from each shard.
+ std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
+
+ // The split version of the pipeline if more than one shard was targeted, otherwise
+ // boost::none.
+ boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
+
+ // If the pipeline targeted a single shard, this is the pipeline to run on that shard.
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard;
+
+ // The command object to send to the targeted shards.
+ BSONObj commandForTargetedShards;
+
+ // How many exchange producers are running the shard part of splitPipeline.
+ size_t numProducers;
+
+ // The exchange specification if the query can run with the exchange, otherwise boost::none.
+ boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
+};
+
+Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req);
+
+bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe);
+
+StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
+ const NamespaceString& execNss);
+
+/**
+ * Targets shards for the pipeline and returns a struct with the remote cursors or results, and the
+ * pipeline that will need to be executed to merge the results from the remotes. If a stale shard
+ * version is encountered, refreshes the routing table and tries again.
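+ *
+ * (Illustrative example, not part of this patch: for a pipeline split into a shards half such
+ * as [{$match: ...}, {$sort: ...}] and a merging half, the returned struct carries one remote
+ * cursor per targeted shard, or several per shard when an exchange is set up, plus the merge
+ * half that the caller must still execute.)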
+ */
+DispatchShardPipelineResults dispatchShardPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& executionNss,
+ const AggregationRequest& aggRequest,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ BSONObj collationObj);
+
+std::set<ShardId> getTargetedShards(OperationContext* opCtx,
+ bool mustRunOnAllShards,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj shardQuery,
+ const BSONObj collation);
+
+std::vector<RemoteCursor> establishShardCursors(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj& cmdObj,
+ const AggregationRequest& request,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery);
+
+BSONObj createCommandForTargetedShards(
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ const LiteParsedPipeline& litePipe,
+ const cluster_aggregation_planner::SplitPipeline& splitPipeline,
+ const BSONObj collationObj,
+ const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
+ bool needsMerge);
+
+BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
+ const AggregationRequest& request,
+ const boost::optional<ShardId>& shardId,
+ Pipeline* pipeline,
+ BSONObj collationObj);
+
+BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
+ OperationContext* opCtx,
+ const boost::optional<ShardId>& shardId,
+ const AggregationRequest& request,
+ BSONObj collationObj);
+
+/**
+ * For a sharded collection, establishes remote cursors on each shard that may have results, and
+ * creates a DocumentSourceMergeCursors stage to merge the remote cursors. Returns a pipeline
+ * beginning with that DocumentSourceMergeCursors stage. Note that one of the 'remote' cursors might
+ * be this node itself.
+ */
+std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline);
+
+} // namespace sharded_agg_helpers
+} // namespace mongo
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index 062e192140b..13ca158622f 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -553,6 +553,7 @@ std::set<ShardId> getTargetedShardsForQuery(OperationContext* opCtx,
 StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd(
 OperationContext* opCtx, const NamespaceString& nss) {
 auto catalogCache = Grid::get(opCtx)->catalogCache();
+ invariant(catalogCache);

 // Return the latest routing table if not running in a transaction with snapshot level read
 // concern.
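As a minimal sketch of how a mongod-side caller might use the helper declared in sharded_agg_helpers.h above (the wrapper function name here is hypothetical; only targetShardsAndAddMergeCursors and its signature come from this patch, and the header above is assumed to be included):

    // Sketch only; 'attachShardedCursorSource' is a hypothetical caller.
    std::unique_ptr<Pipeline, PipelineDeleter> attachShardedCursorSource(
        const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) {
        // Ownership of 'ownedPipeline' passes to the helper, which establishes remote
        // cursors on every shard that may have results and returns a pipeline beginning
        // with a DocumentSourceMergeCursors stage; one of the "remote" cursors may be
        // this node itself.
        return sharded_agg_helpers::targetShardsAndAddMergeCursors(expCtx, ownedPipeline);
    }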
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index d06c2e91373..08042d1f9f9 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -32,6 +32,7 @@ env.Library( '$BUILD_DIR/mongo/s/query/cluster_aggregation_planner', '$BUILD_DIR/mongo/s/query/cluster_client_cursor', '$BUILD_DIR/mongo/db/pipeline/mongos_process_interface', + '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers', '$BUILD_DIR/mongo/db/views/views', 'cluster_query', ] diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 568f96021f4..d2071b01fc6 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -49,6 +49,7 @@ #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/mongos_process_interface.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/find_common.h" @@ -140,13 +141,13 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request, return appendAllowImplicitCreate(aggCmd, true); } -MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline( +sharded_agg_helpers::DispatchShardPipelineResults dispatchExchangeConsumerPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& executionNss, const AggregationRequest& request, const LiteParsedPipeline& litePipe, BSONObj collationObj, - MongoSInterface::DispatchShardPipelineResults* shardDispatchResults) { + sharded_agg_helpers::DispatchShardPipelineResults* shardDispatchResults) { invariant(!litePipe.hasChangeStream()); auto opCtx = expCtx->opCtx; @@ -183,7 +184,7 @@ MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline( consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none); - auto consumerCmdObj = MongoSInterface::createCommandForTargetedShards( + auto consumerCmdObj = sharded_agg_helpers::createCommandForTargetedShards( opCtx, request, litePipe, consumerPipelines.back(), collationObj, boost::none, false); requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx], @@ -216,16 +217,16 @@ MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline( static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront()); mergeCursors->dismissCursorOwnership(); } - return MongoSInterface::DispatchShardPipelineResults{false, - std::move(ownedCursors), - {} /*TODO SERVER-36279*/, - std::move(splitPipeline), - nullptr, - BSONObj(), - numConsumers}; + return sharded_agg_helpers::DispatchShardPipelineResults{false, + std::move(ownedCursors), + {} /*TODO SERVER-36279*/, + std::move(splitPipeline), + nullptr, + BSONObj(), + numConsumers}; } -Status appendExplainResults(MongoSInterface::DispatchShardPipelineResults&& dispatchResults, +Status appendExplainResults(sharded_agg_helpers::DispatchShardPipelineResults&& dispatchResults, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, BSONObjBuilder* result) { if (dispatchResults.splitPipeline) { @@ -287,7 +288,7 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx, const auto mergingShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId)); - Shard::RetryPolicy retryPolicy = MongoSInterface::getDesiredRetryPolicy(request); + Shard::RetryPolicy retryPolicy = 
sharded_agg_helpers::getDesiredRetryPolicy(request);
 return uassertStatusOK(mergingShard->runCommandWithFixedRetryAttempts(
 opCtx, ReadPreferenceSetting::get(opCtx), nss.db().toString(), mergeCmdObj, retryPolicy));
 }
@@ -599,13 +600,14 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
 return getStatusFromCommandResult(result->asTempObj());
 }
-Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const ClusterAggregate::Namespaces& namespaces,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
- const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- MongoSInterface::DispatchShardPipelineResults&& shardDispatchResults,
- BSONObjBuilder* result) {
+Status dispatchMergingPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ClusterAggregate::Namespaces& namespaces,
+ const AggregationRequest& request,
+ const LiteParsedPipeline& litePipe,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ sharded_agg_helpers::DispatchShardPipelineResults&& shardDispatchResults,
+ BSONObjBuilder* result) {
 // We should never be in a situation where we call this function on a non-merge pipeline.
 invariant(shardDispatchResults.splitPipeline);
 auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get();
@@ -689,7 +691,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
 BSONObjBuilder* result) {
 uassert(51028, "Cannot specify exchange option to a mongos", !request.getExchangeSpec());
 auto executionNsRoutingInfoStatus =
- MongoSInterface::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
+ sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
 boost::optional<CachedCollectionRoutingInfo> routingInfo;
 LiteParsedPipeline litePipe(request);
 const auto isSharded = [](OperationContext* opCtx, const NamespaceString& nss) -> bool {
@@ -717,7 +719,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,

 // Determine whether this aggregation must be dispatched to all shards in the cluster.
 const bool mustRunOnAll =
- MongoSInterface::mustRunOnAllShards(namespaces.executionNss, litePipe);
+ sharded_agg_helpers::mustRunOnAllShards(namespaces.executionNss, litePipe);

 // If we don't have a routing table, then this is a $changeStream which must run on all shards.
 invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream()));
@@ -768,7 +770,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
 }

 // If not, split the pipeline as necessary and dispatch to the relevant shards.
- auto shardDispatchResults = MongoSInterface::dispatchShardPipeline(
+ auto shardDispatchResults = sharded_agg_helpers::dispatchShardPipeline(
 expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj);

 // If the operation is an explain, then we verify that it succeeded on all targeted shards,
@@ -846,7 +848,7 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
 // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
 // explain if necessary, and rewrites the result into a format safe to forward to shards.
 BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
- MongoSInterface::createPassthroughCommandForShard(
+ sharded_agg_helpers::createPassthroughCommandForShard(
 opCtx, aggRequest, shardId, nullptr, BSONObj()));

 auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(