summary refs log tree commit diff
diff options
context:
space:
mode:
author Ian Boros <ian.boros@10gen.com> 2019-01-03 16:02:22 -0500
committer Ian Boros <ian.boros@10gen.com> 2019-01-29 12:20:53 -0500
commit da3c2c2dfcf0fc680a4f49f8f29ab0671f345d61 (patch)
tree 2fea8318b64d66fbd14847d01e94a3bc12886b01
parent cf47aee946c42c246a9176e1df1cd27b12dde685 (diff)
download mongo-da3c2c2dfcf0fc680a4f49f8f29ab0671f345d61.tar.gz
SERVER-38728 allow pipeline with lookup stage on sharded collection to run on mongod
-rw-r--r-- buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml 1
-rw-r--r-- jstests/core/txns/aggregation_in_transaction.js 56
-rw-r--r-- jstests/sharding/lookup_mongod_unaware.js 16
-rw-r--r-- jstests/sharding/lookup_on_shard.js 141
-rw-r--r-- src/mongo/db/pipeline/SConscript 15
-rw-r--r-- src/mongo/db/pipeline/document_source_lookup.cpp 5
-rw-r--r-- src/mongo/db/pipeline/mongos_process_interface.cpp 443
-rw-r--r-- src/mongo/db/pipeline/mongos_process_interface.h 48
-rw-r--r-- src/mongo/db/pipeline/process_interface_shardsvr.cpp 47
-rw-r--r-- src/mongo/db/pipeline/process_interface_shardsvr.h 3
-rw-r--r-- src/mongo/db/pipeline/process_interface_standalone.cpp 34
-rw-r--r-- src/mongo/db/pipeline/process_interface_standalone.h 2
-rw-r--r-- src/mongo/db/pipeline/sharded_agg_helpers.cpp 488
-rw-r--r-- src/mongo/db/pipeline/sharded_agg_helpers.h 137
-rw-r--r-- src/mongo/s/cluster_commands_helpers.cpp 1
-rw-r--r-- src/mongo/s/query/SConscript 1
-rw-r--r-- src/mongo/s/query/cluster_aggregate.cpp 48
17 files changed, 915 insertions, 571 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 96c0ba7845a..7b811248e13 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -57,6 +57,7 @@ selector:
- jstests/sharding/restart_transactions.js
- jstests/sharding/shard7.js
- jstests/sharding/shard_config_db_collections.js
+ - jstests/sharding/lookup_on_shard.js
# TODO: SERVER-38541 remove from blacklist
- jstests/sharding/shard_collection_existing_zones.js
- jstests/sharding/single_shard_transaction_with_arbiter.js
diff --git a/jstests/core/txns/aggregation_in_transaction.js b/jstests/core/txns/aggregation_in_transaction.js
index acc30c81873..5cd5285a6b5 100644
--- a/jstests/core/txns/aggregation_in_transaction.js
+++ b/jstests/core/txns/aggregation_in_transaction.js
@@ -3,6 +3,8 @@
(function() {
"use strict";
+ load("jstests/libs/fixture_helpers.js"); // For isSharded.
+
const session = db.getMongo().startSession({causalConsistency: false});
const testDB = session.getDatabase("test");
const coll = testDB.getCollection("aggregation_in_transaction");
@@ -21,6 +23,8 @@
const foreignDoc = {_id: "orange", val: 9};
assert.commandWorked(foreignColl.insert(foreignDoc, {writeConcern: {w: "majority"}}));
+ const isForeignSharded = FixtureHelpers.isSharded(foreignColl);
+
// Run a dummy find to start the transaction.
jsTestLog("Starting transaction.");
session.startTransaction({readConcern: {level: "snapshot"}});
@@ -43,29 +47,35 @@
assert(!cursor.hasNext());
// Perform aggregations that look at other collections.
- const lookupDoc = Object.extend(testDoc, {lookup: [foreignDoc]});
- cursor = coll.aggregate({
- $lookup: {
- from: foreignColl.getName(),
- localField: "foreignKey",
- foreignField: "_id",
- as: "lookup",
- }
- });
- assert.docEq(cursor.next(), lookupDoc);
- assert(!cursor.hasNext());
-
- cursor = coll.aggregate({
- $graphLookup: {
- from: foreignColl.getName(),
- startWith: "$foreignKey",
- connectFromField: "foreignKey",
- connectToField: "_id",
- as: "lookup"
- }
- });
- assert.docEq(cursor.next(), lookupDoc);
- assert(!cursor.hasNext());
+ // TODO: SERVER-39162 Sharded $lookup is not supported in transactions.
+ if (!isForeignSharded) {
+ const lookupDoc = Object.extend(testDoc, {lookup: [foreignDoc]});
+ cursor = coll.aggregate({
+ $lookup: {
+ from: foreignColl.getName(),
+ localField: "foreignKey",
+ foreignField: "_id",
+ as: "lookup",
+ }
+ });
+ assert.docEq(cursor.next(), lookupDoc);
+ assert(!cursor.hasNext());
+
+ cursor = coll.aggregate({
+ $graphLookup: {
+ from: foreignColl.getName(),
+ startWith: "$foreignKey",
+ connectFromField: "foreignKey",
+ connectToField: "_id",
+ as: "lookup"
+ }
+ });
+ assert.docEq(cursor.next(), lookupDoc);
+ assert(!cursor.hasNext());
+ } else {
+ // TODO SERVER-39048: Test that $lookup on sharded collection is banned
+ // within a transaction.
+ }
jsTestLog("Testing $count within a transaction.");
diff --git a/jstests/sharding/lookup_mongod_unaware.js b/jstests/sharding/lookup_mongod_unaware.js
index 0c6072f8095..a7dfd6bc38a 100644
--- a/jstests/sharding/lookup_mongod_unaware.js
+++ b/jstests/sharding/lookup_mongod_unaware.js
@@ -94,17 +94,11 @@
restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults);
- // Verify $lookup results through mongos1, which is not aware that the local
- // collection is sharded. The results are expected to be incorrect when both the mongos and
- // primary shard incorrectly believe that a collection is unsharded.
- // TODO: This should be fixed by SERVER-32629, likewise for the other aggregates in this file
- // sent to the stale mongos.
+ // Verify $lookup results through mongos1, which is not aware that the foreign collection is
+ // sharded. In this case the results will be correct since the entire pipeline will be run on a
+ // shard, which will do a refresh before executing the foreign pipeline.
restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
- assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [
- {_id: 0, a: 1, "same": []},
- {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
- ]);
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults);
//
// Test sharded local and sharded foreign collections, with the primary shard unaware that
@@ -134,6 +128,8 @@
// Verify $lookup results through mongos1, which is not aware that the local
// collection is sharded. The results are expected to be incorrect when both the mongos and
// primary shard incorrectly believe that a collection is unsharded.
+ // TODO: This should be fixed by SERVER-32629, likewise for the other aggregates in this file
+ // sent to the stale mongos.
restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [
{_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
diff --git a/jstests/sharding/lookup_on_shard.js b/jstests/sharding/lookup_on_shard.js
new file mode 100644
index 00000000000..c7c90d7c626
--- /dev/null
+++ b/jstests/sharding/lookup_on_shard.js
@@ -0,0 +1,141 @@
+// Test that a pipeline with a $lookup stage on a sharded foreign collection may be run on a mongod.
+(function() {
+ const sharded = new ShardingTest({mongos: 1, shards: 2});
+
+ assert.commandWorked(sharded.s.adminCommand({enableSharding: "test"}));
+ sharded.ensurePrimaryShard('test', sharded.shard0.shardName);
+
+ const coll = sharded.s.getDB('test').mainColl;
+ const foreignColl = sharded.s.getDB('test').foreignColl;
+ const smallColl = sharded.s.getDB("test").smallColl;
+
+ const nDocsMainColl = 10;
+ const nDocsForeignColl = 2 * nDocsMainColl;
+
+ for (let i = 0; i < nDocsMainColl; i++) {
+ assert.commandWorked(coll.insert({_id: i, collName: "mainColl", foreignId: i}));
+
+ assert.commandWorked(
+ foreignColl.insert({_id: 2 * i, key: i, collName: "foreignColl", data: "hello-0"}));
+ assert.commandWorked(
+ foreignColl.insert({_id: 2 * i + 1, key: i, collName: "foreignColl", data: "hello-1"}));
+ }
+ assert.commandWorked(smallColl.insert({_id: 0, collName: "smallColl"}));
+
+ const runTest = function() {
+ (function testSingleLookupFromShard() {
+ // Run a pipeline which must be merged on a shard. This should force the $lookup (on
+ // the sharded collection) to be run on a mongod.
+ pipeline = [
+ {
+ $lookup: {
+ localField: "foreignId",
+ foreignField: "key",
+ from: "foreignColl",
+ as: "foreignDoc"
+ }
+ },
+ {$_internalSplitPipeline: {mergeType: "anyShard"}}
+ ];
+
+ const results = coll.aggregate(pipeline).toArray();
+ assert.eq(results.length, nDocsMainColl);
+ for (let i = 0; i < results.length; i++) {
+ assert.eq(results[i].foreignDoc.length, 2, results[i]);
+ }
+ })();
+
+ (function testMultipleLookupsFromShard() {
+ // Run two lookups in a row (both on mongod).
+ pipeline = [
+ {
+ $lookup: {
+ localField: "foreignId",
+ foreignField: "key",
+ from: "foreignColl",
+ as: "foreignDoc"
+ }
+ },
+ {
+ $lookup: {
+ from: "smallColl",
+ as: "smallCollDocs",
+ pipeline: [],
+ }
+ },
+ {$_internalSplitPipeline: {mergeType: "anyShard"}}
+ ];
+ const results = coll.aggregate(pipeline).toArray();
+ assert.eq(results.length, nDocsMainColl);
+ for (let i = 0; i < results.length; i++) {
+ assert.eq(results[i].foreignDoc.length, 2, results[i]);
+ assert.eq(results[i].smallCollDocs.length, 1, results[i]);
+ }
+ })();
+
+ (function testUnshardedLookupWithinShardedLookup() {
+ // Pipeline with unsharded $lookup inside a sharded $lookup.
+ pipeline = [
+ {
+ $lookup: {
+ from: "foreignColl",
+ as: "foreignDoc",
+ pipeline: [
+ {$lookup: {from: "smallColl", as: "doc", pipeline: []}},
+ ],
+ }
+ },
+ {$_internalSplitPipeline: {mergeType: "anyShard"}}
+ ];
+ const results = coll.aggregate(pipeline).toArray();
+
+ assert.eq(results.length, nDocsMainColl);
+ for (let i = 0; i < results.length; i++) {
+ assert.eq(results[i].foreignDoc.length, nDocsForeignColl);
+ for (let j = 0; j < nDocsForeignColl; j++) {
+ // Each document pulled from the foreign collection should have one document
+ // from "smallColl."
+ assert.eq(results[i].foreignDoc[j].collName, "foreignColl");
+
+ // TODO SERVER-39016: Once a mongod is able to target the primary shard when
+ // reading from a non-sharded collection this should always work. Until then,
+ // the results of the query depend on which shard is chosen as the merging
+ // shard. If the primary shard is chosen, we'll get the correct results (and
+ // correctly find a document in "smallColl"). Otherwise if the merging shard is
+ // not the primary shard, the merging shard will attempt to do a local read (on
+ // an empty/non-existent collection), which will return nothing.
+ if (results[i].foreignDoc[j].doc.length === 1) {
+ assert.eq(results[i].foreignDoc[j].doc[0].collName, "smallColl");
+ } else {
+ assert.eq(results[i].foreignDoc[j].doc.length, 0);
+ }
+ }
+ }
+ })();
+ };
+
+ jsTestLog("Running test with neither collection sharded");
+ runTest();
+
+ jsTestLog("Running test with foreign collection sharded");
+ sharded.shardColl("foreignColl",
+ {_id: 1}, // shard key
+ {_id: 5}, // split
+ {_id: 5}, // move
+ "test", // dbName
+ true // waitForDelete
+ );
+ runTest();
+
+ jsTestLog("Running test with main and foreign collection sharded");
+ sharded.shardColl("mainColl",
+ {_id: 1}, // shard key
+ {_id: 5}, // split
+ {_id: 5}, // move
+ "test", // dbName
+ true // waitForDelete
+ );
+ runTest();
+
+ sharded.stop();
+})();
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index b2037454108..f03305d8d53 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -194,6 +194,19 @@ env.Library(
]
)
+env.Library(
+ target='sharded_agg_helpers',
+ source=[
+ 'sharded_agg_helpers.cpp',
+ ],
+ LIBDEPS=[
+ 'aggregation',
+ '$BUILD_DIR/mongo/s/async_requests_sender',
+ '$BUILD_DIR/mongo/s/query/cluster_aggregation_planner',
+ '$BUILD_DIR/mongo/s/commands/shared_cluster_commands',
+ ],
+)
+
env.CppUnitTest(
target='document_source_test',
source=[
@@ -314,6 +327,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/s/sharding_api',
'process_interface_standalone',
+ 'sharded_agg_helpers',
],
)
@@ -328,6 +342,7 @@ env.Library(
'$BUILD_DIR/mongo/s/query/cluster_aggregation_planner',
'$BUILD_DIR/mongo/s/query/cluster_query',
'mongo_process_common',
+ 'sharded_agg_helpers',
]
)
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 09e7e3442ac..692717c6966 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -197,10 +197,10 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const {
}
// If executing on mongos and the foreign collection is sharded, then this stage can run on
- // mongos.
+ // mongos or any shard.
HostTypeRequirement hostRequirement =
(pExpCtx->inMongos && pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs))
- ? HostTypeRequirement::kMongoS
+ ? HostTypeRequirement::kNone
: HostTypeRequirement::kPrimaryShard;
StageConstraints constraints(StreamType::kStreaming,
@@ -330,6 +330,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
_cache.reset();
}
+ invariant(pipeline);
return pipeline;
}
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index f4c12f2cadf..896554f2dc2 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/query/collation/collation_spec.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/repl/read_concern_args.h"
@@ -49,7 +50,6 @@
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_query_knobs.h"
-#include "mongo/s/query/document_source_merge_cursors.h"
#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/query/router_exec_stage.h"
#include "mongo/s/transaction_router.h"
@@ -58,98 +58,12 @@
namespace mongo {
-MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors);
-
using boost::intrusive_ptr;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
namespace {
-// Given a document representing an aggregation command such as
-//
-// {aggregate: "myCollection", pipeline: [], ...},
-//
-// produces the corresponding explain command:
-//
-// {explain: {aggregate: "myCollection", pipeline: [], ...}, $queryOptions: {...}, verbosity: ...}
-Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) {
- MutableDocument explainCommandBuilder;
- explainCommandBuilder["explain"] = Value(aggregateCommand);
- // Downstream host targeting code expects queryOptions at the top level of the command object.
- explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] =
- Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]);
-
- // readConcern needs to be promoted to the top-level of the request.
- explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] =
- Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]);
-
- // Add explain command options.
- for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) {
- explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption);
- }
-
- return explainCommandBuilder.freeze();
-}
-
-std::vector<RemoteCursor> establishShardCursors(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const BSONObj& cmdObj,
- const AggregationRequest& request,
- const ReadPreferenceSetting& readPref,
- const BSONObj& shardQuery) {
- LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
-
- const bool mustRunOnAll = MongoSInterface::mustRunOnAllShards(nss, litePipe);
- std::set<ShardId> shardIds = MongoSInterface::getTargetedShards(
- opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation());
- std::vector<std::pair<ShardId, BSONObj>> requests;
-
- // If we don't need to run on all shards, then we should always have a valid routing table.
- invariant(routingInfo || mustRunOnAll);
-
- if (mustRunOnAll) {
- // The pipeline contains a stage which must be run on all shards. Skip versioning and
- // enqueue the raw command objects.
- for (auto&& shardId : shardIds) {
- requests.emplace_back(std::move(shardId), cmdObj);
- }
- } else if (routingInfo->cm()) {
- // The collection is sharded. Use the routing table to decide which shards to target
- // based on the query and collation, and build versioned requests for them.
- for (auto& shardId : shardIds) {
- auto versionedCmdObj =
- appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
- requests.emplace_back(std::move(shardId), std::move(versionedCmdObj));
- }
- } else {
- // The collection is unsharded. Target only the primary shard for the database.
- // Don't append shard version info when contacting the config servers.
- requests.emplace_back(routingInfo->db().primaryId(),
- !routingInfo->db().primary()->isConfig()
- ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
- : cmdObj);
- }
-
- if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
- log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
- "until fail point is disabled.";
- while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
- sleepsecs(1);
- }
- }
-
- return establishCursors(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- nss,
- readPref,
- requests,
- false /* do not allow partial results */,
- MongoSInterface::getDesiredRetryPolicy(request));
-}
/**
* Determines the single shard to which the given query will be targeted, and its associated
@@ -210,307 +124,6 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
-Shard::RetryPolicy MongoSInterface::getDesiredRetryPolicy(const AggregationRequest& req) {
- // The idempotent retry policy will retry even for writeConcern failures, so only set it if the
- // pipeline does not support writeConcern.
- if (req.getWriteConcern()) {
- return Shard::RetryPolicy::kNotIdempotent;
- }
- return Shard::RetryPolicy::kIdempotent;
-}
-
-BSONObj MongoSInterface::createPassthroughCommandForShard(OperationContext* opCtx,
- const AggregationRequest& request,
- const boost::optional<ShardId>& shardId,
- Pipeline* pipeline,
- BSONObj collationObj) {
- // Create the command for the shards.
- MutableDocument targetedCmd(request.serializeToCommandObj());
- if (pipeline) {
- targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
- }
-
- return MongoSInterface::genericTransformForShards(
- std::move(targetedCmd), opCtx, shardId, request, collationObj);
-}
-
-BSONObj MongoSInterface::genericTransformForShards(MutableDocument&& cmdForShards,
- OperationContext* opCtx,
- const boost::optional<ShardId>& shardId,
- const AggregationRequest& request,
- BSONObj collationObj) {
- cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
- // If this is a request for an aggregation explain, then we must wrap the aggregate inside an
- // explain command.
- if (auto explainVerbosity = request.getExplain()) {
- cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity));
- }
-
- if (!collationObj.isEmpty()) {
- cmdForShards[AggregationRequest::kCollationName] = Value(collationObj);
- }
-
- if (opCtx->getTxnNumber()) {
- invariant(cmdForShards.peek()[OperationSessionInfo::kTxnNumberFieldName].missing(),
- str::stream() << "Command for shards unexpectedly had the "
- << OperationSessionInfo::kTxnNumberFieldName
- << " field set: "
- << cmdForShards.peek().toString());
- cmdForShards[OperationSessionInfo::kTxnNumberFieldName] =
- Value(static_cast<long long>(*opCtx->getTxnNumber()));
- }
-
- auto aggCmd = cmdForShards.freeze().toBson();
-
- if (shardId) {
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd);
- }
- }
-
- // agg creates temp collection and should handle implicit create separately.
- return appendAllowImplicitCreate(aggCmd, true);
-}
-
-BSONObj MongoSInterface::createCommandForTargetedShards(
- OperationContext* opCtx,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
- const cluster_aggregation_planner::SplitPipeline& splitPipeline,
- const BSONObj collationObj,
- const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
- bool needsMerge) {
-
- // Create the command for the shards.
- MutableDocument targetedCmd(request.serializeToCommandObj());
- // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it
- // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may
- // have detected a logged in user and appended that user name to the $listSessions spec to
- // send to the shards.
- targetedCmd[AggregationRequest::kPipelineName] =
- Value(splitPipeline.shardsPipeline->serialize());
-
- // When running on many shards with the exchange we may not need merging.
- if (needsMerge) {
- targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
-
- // If this is a change stream aggregation, set the 'mergeByPBRT' flag on the command. This
- // notifies the shards that the mongoS is capable of merging streams based on resume token.
- // TODO SERVER-38539: the 'mergeByPBRT' flag is no longer necessary in 4.4.
- targetedCmd[AggregationRequest::kMergeByPBRTName] = Value(litePipe.hasChangeStream());
-
- // For split pipelines which need merging, do *not* propagate the writeConcern to the shards
- // part. Otherwise this is part of an exchange and in that case we should include the
- // writeConcern.
- targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
- }
-
- targetedCmd[AggregationRequest::kCursorName] =
- Value(DOC(AggregationRequest::kBatchSizeName << 0));
-
- targetedCmd[AggregationRequest::kExchangeName] =
- exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value();
-
- return genericTransformForShards(
- std::move(targetedCmd), opCtx, boost::none, request, collationObj);
-}
-
-std::set<ShardId> MongoSInterface::getTargetedShards(
- OperationContext* opCtx,
- bool mustRunOnAllShards,
- const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const BSONObj shardQuery,
- const BSONObj collation) {
- if (mustRunOnAllShards) {
- // The pipeline begins with a stage which must be run on all shards.
- std::vector<ShardId> shardIds;
- Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
- return {shardIds.begin(), shardIds.end()};
- }
-
- // If we don't need to run on all shards, then we should always have a valid routing table.
- invariant(routingInfo);
-
- return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation);
-}
-
-bool MongoSInterface::mustRunOnAllShards(const NamespaceString& nss,
- const LiteParsedPipeline& litePipe) {
- // The following aggregations must be routed to all shards:
- // - Any collectionless aggregation, such as non-localOps $currentOp.
- // - Any aggregation which begins with a $changeStream stage.
- return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream();
-}
-
-StatusWith<CachedCollectionRoutingInfo> MongoSInterface::getExecutionNsRoutingInfo(
- OperationContext* opCtx, const NamespaceString& execNss) {
- // First, verify that there are shards present in the cluster. If not, then we return the
- // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because
- // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on
- // a collection before its enclosing database is created. However, if there are no shards
- // present, then $changeStream should immediately return an empty cursor just as other
- // aggregations do when the database does not exist.
- std::vector<ShardId> shardIds;
- Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
- if (shardIds.size() == 0) {
- return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"};
- }
-
- // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not
- // exist.
- return getCollectionRoutingInfoForTxnCmd(opCtx, execNss);
-}
-
-/**
- * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
- * the pipeline that will need to be executed to merge the results from the remotes. If a stale
- * shard version is encountered, refreshes the routing table and tries again.
- */
-MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& executionNss,
- const AggregationRequest& aggRequest,
- const LiteParsedPipeline& litePipe,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- BSONObj collationObj) {
- // The process is as follows:
- // - First, determine whether we need to target more than one shard. If so, we split the
- // pipeline; if not, we retain the existing pipeline.
- // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
- // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the
-// entire aggregation command.
- auto cursors = std::vector<RemoteCursor>();
- auto shardResults = std::vector<AsyncRequestsSender::Response>();
- auto opCtx = expCtx->opCtx;
-
- const bool needsPrimaryShardMerge =
- (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
-
- const bool needsMongosMerge = pipeline->needsMongosMerger();
-
- const auto shardQuery = pipeline->getInitialQuery();
-
- auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
-
- // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
- // Otherwise, uassert on all exceptions here.
- if (!(litePipe.hasChangeStream() &&
- executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
- uassertStatusOK(executionNsRoutingInfoStatus);
- }
-
- auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
- ? std::move(executionNsRoutingInfoStatus.getValue())
- : boost::optional<CachedCollectionRoutingInfo>{};
-
- // Determine whether we can run the entire aggregation on a single shard.
- const bool mustRunOnAll = mustRunOnAllShards(executionNss, litePipe);
- std::set<ShardId> shardIds = getTargetedShards(
- opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
-
- // Don't need to split the pipeline if we are only targeting a single shard, unless:
- // - There is a stage that needs to be run on the primary shard and the single target shard
- // is not the primary.
- // - The pipeline contains one or more stages which must always merge on mongoS.
- const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
- (needsPrimaryShardMerge && executionNsRoutingInfo &&
- *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));
-
- boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
- boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
-
- if (needsSplit) {
- splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
-
- exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
- opCtx, splitPipeline->mergePipeline.get());
- }
-
- // Generate the command object for the targeted shards.
- BSONObj targetedCommand = splitPipeline
- ? createCommandForTargetedShards(
- opCtx, aggRequest, litePipe, *splitPipeline, collationObj, exchangeSpec, true)
- : createPassthroughCommandForShard(
- opCtx, aggRequest, boost::none, pipeline.get(), collationObj);
-
- // Refresh the shard registry if we're targeting all shards. We need the shard registry
- // to be at least as current as the logical time used when creating the command for
- // $changeStream to work reliably, so we do a "hard" reload.
- if (mustRunOnAll) {
- auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
- if (!shardRegistry->reload(opCtx)) {
- shardRegistry->reload(opCtx);
- }
- // Rebuild the set of shards as the shard registry might have changed.
- shardIds = getTargetedShards(
- opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
- }
-
- // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
- if (expCtx->explain) {
- if (mustRunOnAll) {
- // Some stages (such as $currentOp) need to be broadcast to all shards, and
- // should not participate in the shard version protocol.
- shardResults =
- scatterGatherUnversionedTargetAllShards(opCtx,
- executionNss.db(),
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent);
- } else {
- // Aggregations on a real namespace should use the routing table to target
- // shards, and should participate in the shard version protocol.
- invariant(executionNsRoutingInfo);
- shardResults =
- scatterGatherVersionedTargetByRoutingTable(opCtx,
- executionNss.db(),
- executionNss,
- *executionNsRoutingInfo,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- shardQuery,
- aggRequest.getCollation());
- }
- } else {
- cursors = establishShardCursors(opCtx,
- executionNss,
- litePipe,
- executionNsRoutingInfo,
- targetedCommand,
- aggRequest,
- ReadPreferenceSetting::get(opCtx),
- shardQuery);
- invariant(cursors.size() % shardIds.size() == 0,
- str::stream() << "Number of cursors (" << cursors.size()
- << ") is not a multiple of producers ("
- << shardIds.size()
- << ")");
- }
-
- // Convert remote cursors into a vector of "owned" cursors.
- std::vector<OwnedRemoteCursor> ownedCursors;
- for (auto&& cursor : cursors) {
- ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss));
- }
-
- // Record the number of shards involved in the aggregation. If we are required to merge on
- // the primary shard, but the primary shard was not in the set of targeted shards, then we
- // must increment the number of involved shards.
- CurOp::get(opCtx)->debug().nShards =
- shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
- !shardIds.count(executionNsRoutingInfo->db().primaryId()));
-
- return DispatchShardPipelineResults{needsPrimaryShardMerge,
- std::move(ownedCursors),
- std::move(shardResults),
- std::move(splitPipeline),
- std::move(pipeline),
- targetedCommand,
- shardIds.size(),
- exchangeSpec};
-}
-
std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -530,61 +143,9 @@ std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::makePipeline(
return pipeline;
}
-
std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) {
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
- PipelineDeleter(expCtx->opCtx));
-
- invariant(pipeline->getSources().empty() ||
- !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
-
- // Generate the command object for the targeted shards.
- std::vector<BSONObj> rawStages = [&pipeline]() {
- auto serialization = pipeline->serialize();
- std::vector<BSONObj> stages;
- stages.reserve(serialization.size());
-
- for (const auto& stageObj : serialization) {
- invariant(stageObj.getType() == BSONType::Object);
- stages.push_back(stageObj.getDocument().toBson());
- }
-
- return stages;
- }();
-
- AggregationRequest aggRequest(expCtx->ns, rawStages);
- LiteParsedPipeline liteParsedPipeline(aggRequest);
- auto shardDispatchResults = MongoSInterface::dispatchShardPipeline(
- expCtx, expCtx->ns, aggRequest, liteParsedPipeline, std::move(pipeline), expCtx->collation);
-
- std::vector<ShardId> targetedShards;
- targetedShards.reserve(shardDispatchResults.remoteCursors.size());
- for (auto&& remoteCursor : shardDispatchResults.remoteCursors) {
- targetedShards.emplace_back(remoteCursor->getShardId().toString());
- }
-
- std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline;
- boost::optional<BSONObj> shardCursorsSortSpec = boost::none;
- if (shardDispatchResults.splitPipeline) {
- mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline);
- shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec;
- } else {
- // We have not split the pipeline, and will execute entirely on the remote shards. Set up an
- // empty local pipeline which we will attach the merge cursors stage to.
- mergePipeline = uassertStatusOK(Pipeline::parse(std::vector<BSONObj>(), expCtx));
- }
-
- cluster_aggregation_planner::addMergeCursorsSource(
- mergePipeline.get(),
- liteParsedPipeline,
- shardDispatchResults.commandForTargetedShards,
- std::move(shardDispatchResults.remoteCursors),
- targetedShards,
- shardCursorsSortSpec,
- Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor());
-
- return mergePipeline;
+ return sharded_agg_helpers::targetShardsAndAddMergeCursors(expCtx, ownedPipeline);
}
boost::optional<Document> MongoSInterface::lookupSingleDocument(
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 1bdcae87595..dc7ec6fa955 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -45,37 +45,6 @@ namespace mongo {
*/
class MongoSInterface final : public MongoProcessCommon {
public:
- struct DispatchShardPipelineResults {
- // True if this pipeline was split, and the second half of the pipeline needs to be run on
- // the primary shard for the database.
- bool needsPrimaryShardMerge;
-
- // Populated if this *is not* an explain, this vector represents the cursors on the remote
- // shards.
- std::vector<OwnedRemoteCursor> remoteCursors;
-
- // Populated if this *is* an explain, this vector represents the results from each shard.
- std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
-
- // The split version of the pipeline if more than one shard was targeted, otherwise
- // boost::none.
- boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
-
- // If the pipeline targeted a single shard, this is the pipeline to run on that shard.
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard;
-
- // The command object to send to the targeted shards.
- BSONObj commandForTargetedShards;
-
- // How many exchange producers are running the shard part of splitPipeline.
- size_t numProducers;
-
- // The exchange specification if the query can run with the exchange otherwise boost::none.
- boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
- };
-
- static Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req);
-
static BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
const AggregationRequest& request,
const boost::optional<ShardId>& shardId,
@@ -101,26 +70,9 @@ public:
const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
bool needsMerge);
- static std::set<ShardId> getTargetedShards(
- OperationContext* opCtx,
- bool mustRunOnAllShards,
- const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const BSONObj shardQuery,
- const BSONObj collation);
-
- static bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe);
-
static StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(
OperationContext* opCtx, const NamespaceString& execNss);
- static DispatchShardPipelineResults dispatchShardPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& executionNss,
- const AggregationRequest& aggRequest,
- const LiteParsedPipeline& litePipe,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- BSONObj collationObj);
-
MongoSInterface() = default;
virtual ~MongoSInterface() = default;
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index 76949f4348d..fd005414c02 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -44,10 +44,14 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_gen.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
+#include "mongo/s/query/document_source_merge_cursors.h"
#include "mongo/s/write_ops/cluster_write.h"
#include "mongo/util/log.h"
@@ -153,4 +157,47 @@ void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionCont
uassertStatusOKWithContext(response.toStatus(), "Update failed: ");
}
+unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceShardServer::attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
+ PipelineDeleter(expCtx->opCtx));
+
+ invariant(pipeline->getSources().empty() ||
+ !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+
+ // $lookup on a sharded collection is not allowed in a transaction. We assume that if we're in
+ // a transaction, the foreign collection is unsharded. Otherwise, we may access the catalog
+ // cache, and attempt to do a network request while holding locks.
+ // TODO: SERVER-39162 allow $lookup in sharded transactions.
+ auto txnParticipant = TransactionParticipant::get(expCtx->opCtx);
+ const bool inTxn = txnParticipant && txnParticipant->inMultiDocumentTransaction();
+
+ const bool isSharded = [&]() {
+ if (inTxn || !ShardingState::get(expCtx->opCtx)->enabled()) {
+ // Sharding isn't enabled or we're in a transaction. In either case we assume it's
+ // unsharded.
+ return false;
+ } else if (expCtx->ns.db() == "local") {
+ // This may be a change stream examining the oplog. We know the oplog (or any local
+ // collections for that matter) will never be sharded.
+ return false;
+ }
+ return uassertStatusOK(getCollectionRoutingInfoForTxnCmd(expCtx->opCtx, expCtx->ns)).cm() !=
+ nullptr;
+ }();
+
+ if (isSharded) {
+ // For a sharded collection we may have to establish cursors on a remote host.
+ return sharded_agg_helpers::targetShardsAndAddMergeCursors(expCtx, pipeline.release());
+ }
+
+ // Perform a "local read", the same as if we weren't a shard server.
+
+ // TODO SERVER-39015 we should do a shard version check here after we acquire a lock within
+ // this function, to be sure the collection didn't become sharded between the time we checked
+ // whether it's sharded and the time we took the lock.
+
+ return MongoInterfaceStandalone::attachCursorSourceToPipeline(expCtx, pipeline.release());
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
index 22c54a8a6cf..75391ea7410 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.h
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -83,6 +83,9 @@ public:
bool upsert,
bool multi,
boost::optional<OID> targetEpoch) final;
+
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index f7736cd41b1..e9de70428dc 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/pipeline/document_source_cursor.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/repl/speculative_majority_read_info.h"
#include "mongo/db/s/collection_sharding_state.h"
@@ -55,6 +56,8 @@
#include "mongo/db/stats/storage_stats.h"
#include "mongo/db/storage/backup_cursor_hooks.h"
#include "mongo/db/transaction_participant.h"
+#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/s/query/document_source_merge_cursors.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -340,29 +343,14 @@ unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::attachCursorSour
!dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
boost::optional<AutoGetCollectionForReadCommand> autoColl;
- if (expCtx->uuid) {
- autoColl.emplace(expCtx->opCtx,
- NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid},
- AutoGetCollection::ViewMode::kViewsForbidden,
- Date_t::max(),
- AutoStatsTracker::LogMode::kUpdateTop);
- } else {
- autoColl.emplace(expCtx->opCtx,
- expCtx->ns,
- AutoGetCollection::ViewMode::kViewsForbidden,
- Date_t::max(),
- AutoStatsTracker::LogMode::kUpdateTop);
- }
-
- // makePipeline() is only called to perform secondary aggregation requests and expects the
- // collection representing the document source to be not-sharded. We confirm sharding state
- // here to avoid taking a collection lock elsewhere for this purpose alone.
- // TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor
- // until after we release the lock, leaving room for a collection to be sharded in-between.
- auto css = CollectionShardingState::get(expCtx->opCtx, expCtx->ns);
- uassert(4567,
- str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded",
- !css->getMetadataForOperation(expCtx->opCtx)->isSharded());
+ const NamespaceStringOrUUID nsOrUUID = expCtx->uuid
+ ? NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid}
+ : expCtx->ns;
+ autoColl.emplace(expCtx->opCtx,
+ nsOrUUID,
+ AutoGetCollection::ViewMode::kViewsForbidden,
+ Date_t::max(),
+ AutoStatsTracker::LogMode::kUpdateTop);
PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline.get());
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index b4331733639..a04b2a6859d 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -92,7 +92,7 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{}) final;
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override;
std::string getShardName(OperationContext* opCtx) const final;
std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
OperationContext* opCtx, const NamespaceString&, UUID) const override;
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
new file mode 100644
index 00000000000..cb1f44f7092
--- /dev/null
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -0,0 +1,488 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "sharded_agg_helpers.h"
+
+#include "mongo/db/curop.h"
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/s/query/cluster_query_knobs.h"
+#include "mongo/s/query/document_source_merge_cursors.h"
+#include "mongo/s/transaction_router.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace sharded_agg_helpers {
+
+MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors);
+
+/**
+ * Given a document representing an aggregation command such as
+ * {aggregate: "myCollection", pipeline: [], ...},
+ *
+ * produces the corresponding explain command:
+ * {explain: {aggregate: "myCollection", pipeline: [], ...}, $queryOptions: {...}, verbosity: ...}
+ */
+Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) {
+ MutableDocument explainCommandBuilder;
+ explainCommandBuilder["explain"] = Value(aggregateCommand);
+ // Downstream host targeting code expects queryOptions at the top level of the command object.
+ explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] =
+ Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]);
+
+ // readConcern needs to be promoted to the top-level of the request.
+ explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] =
+ Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]);
+
+ // Add explain command options.
+ for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) {
+ explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption);
+ }
+
+ return explainCommandBuilder.freeze();
+}
+
+BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
+ const AggregationRequest& request,
+ const boost::optional<ShardId>& shardId,
+ Pipeline* pipeline,
+ BSONObj collationObj) {
+ // Create the command for the shards.
+ MutableDocument targetedCmd(request.serializeToCommandObj());
+ if (pipeline) {
+ targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
+ }
+
+ return genericTransformForShards(std::move(targetedCmd), opCtx, shardId, request, collationObj);
+}
+
+BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
+ OperationContext* opCtx,
+ const boost::optional<ShardId>& shardId,
+ const AggregationRequest& request,
+ BSONObj collationObj) {
+ cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
+ // If this is a request for an aggregation explain, then we must wrap the aggregate inside an
+ // explain command.
+ if (auto explainVerbosity = request.getExplain()) {
+ cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity));
+ }
+
+ if (!collationObj.isEmpty()) {
+ cmdForShards[AggregationRequest::kCollationName] = Value(collationObj);
+ }
+
+ if (opCtx->getTxnNumber()) {
+ invariant(cmdForShards.peek()[OperationSessionInfo::kTxnNumberFieldName].missing(),
+ str::stream() << "Command for shards unexpectedly had the "
+ << OperationSessionInfo::kTxnNumberFieldName
+ << " field set: "
+ << cmdForShards.peek().toString());
+ cmdForShards[OperationSessionInfo::kTxnNumberFieldName] =
+ Value(static_cast<long long>(*opCtx->getTxnNumber()));
+ }
+
+ auto aggCmd = cmdForShards.freeze().toBson();
+
+ if (shardId) {
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd);
+ }
+ }
+
+ // agg creates temp collection and should handle implicit create separately.
+ return appendAllowImplicitCreate(aggCmd, true);
+}
+
+StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
+ const NamespaceString& execNss) {
+ // First, verify that there are shards present in the cluster. If not, then we return the
+ // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because
+ // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on
+ // a collection before its enclosing database is created. However, if there are no shards
+ // present, then $changeStream should immediately return an empty cursor just as other
+ // aggregations do when the database does not exist.
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
+ if (shardIds.size() == 0) {
+ return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"};
+ }
+
+ // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not
+ // exist.
+ return getCollectionRoutingInfoForTxnCmd(opCtx, execNss);
+}
+
+Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req) {
+ // The idempotent retry policy will retry even for writeConcern failures, so only set it if the
+ // pipeline does not support writeConcern.
+ if (req.getWriteConcern()) {
+ return Shard::RetryPolicy::kNotIdempotent;
+ }
+ return Shard::RetryPolicy::kIdempotent;
+}
+
+bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe) {
+ // The following aggregations must be routed to all shards:
+ // - Any collectionless aggregation, such as non-localOps $currentOp.
+ // - Any aggregation which begins with a $changeStream stage.
+ return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream();
+}
+BSONObj createCommandForTargetedShards(
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ const LiteParsedPipeline& litePipe,
+ const cluster_aggregation_planner::SplitPipeline& splitPipeline,
+ const BSONObj collationObj,
+ const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
+ bool needsMerge) {
+
+ // Create the command for the shards.
+ MutableDocument targetedCmd(request.serializeToCommandObj());
+ // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it
+ // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may
+ // have detected a logged in user and appended that user name to the $listSessions spec to
+ // send to the shards.
+ targetedCmd[AggregationRequest::kPipelineName] =
+ Value(splitPipeline.shardsPipeline->serialize());
+
+ // When running on many shards with the exchange we may not need merging.
+ if (needsMerge) {
+ targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
+
+ // If this is a change stream aggregation, set the 'mergeByPBRT' flag on the command. This
+ // notifies the shards that the mongoS is capable of merging streams based on resume token.
+ // TODO SERVER-38539: the 'mergeByPBRT' flag is no longer necessary in 4.4.
+ targetedCmd[AggregationRequest::kMergeByPBRTName] = Value(litePipe.hasChangeStream());
+
+ // For split pipelines which need merging, do *not* propagate the writeConcern to the shards
+ // part. Otherwise this is part of an exchange and in that case we should include the
+ // writeConcern.
+ targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
+ }
+
+ targetedCmd[AggregationRequest::kCursorName] =
+ Value(DOC(AggregationRequest::kBatchSizeName << 0));
+
+ targetedCmd[AggregationRequest::kExchangeName] =
+ exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value();
+
+ return genericTransformForShards(
+ std::move(targetedCmd), opCtx, boost::none, request, collationObj);
+}
+
+/**
+ * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
+ * the pipeline that will need to be executed to merge the results from the remotes. If a stale
+ * shard version is encountered, refreshes the routing table and tries again.
+ */
+DispatchShardPipelineResults dispatchShardPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& executionNss,
+ const AggregationRequest& aggRequest,
+ const LiteParsedPipeline& litePipe,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ BSONObj collationObj) {
+ // The process is as follows:
+ // - First, determine whether we need to target more than one shard. If so, we split the
+ // pipeline; if not, we retain the existing pipeline.
+ // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
+ // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the
+ // entire aggregation command.
+ auto cursors = std::vector<RemoteCursor>();
+ auto shardResults = std::vector<AsyncRequestsSender::Response>();
+ auto opCtx = expCtx->opCtx;
+
+ const bool needsPrimaryShardMerge =
+ (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
+
+ const bool needsMongosMerge = pipeline->needsMongosMerger();
+
+ const auto shardQuery = pipeline->getInitialQuery();
+
+ auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
+
+ // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
+ // Otherwise, uassert on all exceptions here.
+ if (!(litePipe.hasChangeStream() &&
+ executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
+ uassertStatusOK(executionNsRoutingInfoStatus);
+ }
+
+ auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
+ ? std::move(executionNsRoutingInfoStatus.getValue())
+ : boost::optional<CachedCollectionRoutingInfo>{};
+
+ // Determine whether we can run the entire aggregation on a single shard.
+ const bool mustRunOnAll = mustRunOnAllShards(executionNss, litePipe);
+ std::set<ShardId> shardIds = getTargetedShards(
+ opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
+
+ // Don't need to split the pipeline if we are only targeting a single shard, unless:
+ // - There is a stage that needs to be run on the primary shard and the single target shard
+ // is not the primary.
+ // - The pipeline contains one or more stages which must always merge on mongoS.
+ const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
+ (needsPrimaryShardMerge && executionNsRoutingInfo &&
+ *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));
+
+ boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
+ boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
+
+ if (needsSplit) {
+ splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
+
+ exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
+ opCtx, splitPipeline->mergePipeline.get());
+ }
+
+ // Generate the command object for the targeted shards.
+ BSONObj targetedCommand = splitPipeline
+ ? createCommandForTargetedShards(
+ opCtx, aggRequest, litePipe, *splitPipeline, collationObj, exchangeSpec, true)
+ : createPassthroughCommandForShard(
+ opCtx, aggRequest, boost::none, pipeline.get(), collationObj);
+
+ // Refresh the shard registry if we're targeting all shards. We need the shard registry
+ // to be at least as current as the logical time used when creating the command for
+ // $changeStream to work reliably, so we do a "hard" reload.
+ if (mustRunOnAll) {
+ auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
+ if (!shardRegistry->reload(opCtx)) {
+ shardRegistry->reload(opCtx);
+ }
+ // Rebuild the set of shards as the shard registry might have changed.
+ shardIds = getTargetedShards(
+ opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
+ }
+
+ // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
+ if (expCtx->explain) {
+ if (mustRunOnAll) {
+ // Some stages (such as $currentOp) need to be broadcast to all shards, and
+ // should not participate in the shard version protocol.
+ shardResults =
+ scatterGatherUnversionedTargetAllShards(opCtx,
+ executionNss.db(),
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent);
+ } else {
+ // Aggregations on a real namespace should use the routing table to target
+ // shards, and should participate in the shard version protocol.
+ invariant(executionNsRoutingInfo);
+ shardResults =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ executionNss.db(),
+ executionNss,
+ *executionNsRoutingInfo,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ shardQuery,
+ aggRequest.getCollation());
+ }
+ } else {
+ cursors = establishShardCursors(opCtx,
+ executionNss,
+ litePipe,
+ executionNsRoutingInfo,
+ targetedCommand,
+ aggRequest,
+ ReadPreferenceSetting::get(opCtx),
+ shardQuery);
+ invariant(cursors.size() % shardIds.size() == 0,
+ str::stream() << "Number of cursors (" << cursors.size()
+ << ") is not a multiple of producers ("
+ << shardIds.size()
+ << ")");
+ }
+
+ // Convert remote cursors into a vector of "owned" cursors.
+ std::vector<OwnedRemoteCursor> ownedCursors;
+ for (auto&& cursor : cursors) {
+ ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss));
+ }
+
+ // Record the number of shards involved in the aggregation. If we are required to merge on
+ // the primary shard, but the primary shard was not in the set of targeted shards, then we
+ // must increment the number of involved shards.
+ CurOp::get(opCtx)->debug().nShards =
+ shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
+ !shardIds.count(executionNsRoutingInfo->db().primaryId()));
+
+ return DispatchShardPipelineResults{needsPrimaryShardMerge,
+ std::move(ownedCursors),
+ std::move(shardResults),
+ std::move(splitPipeline),
+ std::move(pipeline),
+ targetedCommand,
+ shardIds.size(),
+ exchangeSpec};
+}
+
+std::set<ShardId> getTargetedShards(OperationContext* opCtx,
+ bool mustRunOnAllShards,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj shardQuery,
+ const BSONObj collation) {
+ if (mustRunOnAllShards) {
+ // The pipeline begins with a stage which must be run on all shards.
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
+ return {shardIds.begin(), shardIds.end()};
+ }
+
+ // If we don't need to run on all shards, then we should always have a valid routing table.
+ invariant(routingInfo);
+
+ return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation);
+}
+
+std::vector<RemoteCursor> establishShardCursors(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj& cmdObj,
+ const AggregationRequest& request,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery) {
+ LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
+
+ const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe);
+ std::set<ShardId> shardIds =
+ getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation());
+ std::vector<std::pair<ShardId, BSONObj>> requests;
+
+ // If we don't need to run on all shards, then we should always have a valid routing table.
+ invariant(routingInfo || mustRunOnAll);
+
+ if (mustRunOnAll) {
+ // The pipeline contains a stage which must be run on all shards. Skip versioning and
+ // enqueue the raw command objects.
+ for (auto&& shardId : shardIds) {
+ requests.emplace_back(std::move(shardId), cmdObj);
+ }
+ } else if (routingInfo->cm()) {
+ // The collection is sharded. Use the routing table to decide which shards to target
+ // based on the query and collation, and build versioned requests for them.
+ for (auto& shardId : shardIds) {
+ auto versionedCmdObj =
+ appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
+ requests.emplace_back(std::move(shardId), std::move(versionedCmdObj));
+ }
+ } else {
+ // The collection is unsharded. Target only the primary shard for the database.
+ // Don't append shard version info when contacting the config servers.
+ requests.emplace_back(routingInfo->db().primaryId(),
+ !routingInfo->db().primary()->isConfig()
+ ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
+ : cmdObj);
+ }
+
+ if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
+ log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
+ "until fail point is disabled.";
+ while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
+ sleepsecs(1);
+ }
+ }
+
+ return establishCursors(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ nss,
+ readPref,
+ requests,
+ false /* do not allow partial results */,
+ getDesiredRetryPolicy(request));
+}
+
+std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
+ PipelineDeleter(expCtx->opCtx));
+
+ invariant(pipeline->getSources().empty() ||
+ !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+
+ // Generate the command object for the targeted shards.
+ std::vector<BSONObj> rawStages = [&pipeline]() {
+ auto serialization = pipeline->serialize();
+ std::vector<BSONObj> stages;
+ stages.reserve(serialization.size());
+
+ for (const auto& stageObj : serialization) {
+ invariant(stageObj.getType() == BSONType::Object);
+ stages.push_back(stageObj.getDocument().toBson());
+ }
+
+ return stages;
+ }();
+
+ AggregationRequest aggRequest(expCtx->ns, rawStages);
+ LiteParsedPipeline liteParsedPipeline(aggRequest);
+ auto shardDispatchResults = dispatchShardPipeline(
+ expCtx, expCtx->ns, aggRequest, liteParsedPipeline, std::move(pipeline), expCtx->collation);
+
+ std::vector<ShardId> targetedShards;
+ targetedShards.reserve(shardDispatchResults.remoteCursors.size());
+ for (auto&& remoteCursor : shardDispatchResults.remoteCursors) {
+ targetedShards.emplace_back(remoteCursor->getShardId().toString());
+ }
+
+ std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline;
+ boost::optional<BSONObj> shardCursorsSortSpec = boost::none;
+ if (shardDispatchResults.splitPipeline) {
+ mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline);
+ shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec;
+ } else {
+ // We have not split the pipeline, and will execute entirely on the remote shards. Set up an
+ // empty local pipeline which we will attach the merge cursors stage to.
+ mergePipeline = uassertStatusOK(Pipeline::parse(std::vector<BSONObj>(), expCtx));
+ }
+
+ cluster_aggregation_planner::addMergeCursorsSource(
+ mergePipeline.get(),
+ liteParsedPipeline,
+ shardDispatchResults.commandForTargetedShards,
+ std::move(shardDispatchResults.remoteCursors),
+ targetedShards,
+ shardCursorsSortSpec,
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor());
+
+ return mergePipeline;
+}
+} // namespace sharded_agg_helpers
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
new file mode 100644
index 00000000000..f1dc2c2d91e
--- /dev/null
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -0,0 +1,137 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/s/async_requests_sender.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/query/cluster_aggregation_planner.h"
+
+namespace mongo {
+class CachedCollectionRoutingInfo;
+
+namespace sharded_agg_helpers {
+struct DispatchShardPipelineResults { // Brace-initialized positionally by callers; member order is part of the interface.
+ // True if this pipeline was split and the second half of the pipeline needs to be run on
+ // the primary shard for the database. No default initializer; callers must set it.
+ bool needsPrimaryShardMerge;
+
+ // Populated only when this *is not* an explain: the cursors established on the remote
+ // shards.
+ std::vector<OwnedRemoteCursor> remoteCursors;
+
+ // Populated only when this *is* an explain: the results returned by each shard.
+ std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
+
+ // The split version of the pipeline if more than one shard was targeted; otherwise
+ // boost::none.
+ boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
+
+ // If the pipeline targeted a single shard, this is the pipeline to run on that shard.
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard;
+
+ // The command object to send to the targeted shards.
+ BSONObj commandForTargetedShards;
+
+ // How many exchange producers are running the shard part of splitPipeline. No default initializer.
+ size_t numProducers;
+
+ // The exchange specification if the query can run with an exchange; otherwise boost::none.
+ boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
+};
+
+Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req);
+
+bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe);
+
+StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
+ const NamespaceString& execNss);
+
+/**
+ * Targets shards for the pipeline and returns a struct with the remote cursors or results, and the
+ * pipeline that will need to be executed to merge the results from the remotes. If a stale shard
+ * version is encountered, refreshes the routing table and tries again.
+ */
+DispatchShardPipelineResults dispatchShardPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& executionNss,
+ const AggregationRequest& aggRequest,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ BSONObj collationObj);
+
+std::set<ShardId> getTargetedShards(OperationContext* opCtx,
+ bool mustRunOnAllShards,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj shardQuery,
+ const BSONObj collation);
+
+std::vector<RemoteCursor> establishShardCursors(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj& cmdObj,
+ const AggregationRequest& request,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery);
+
+BSONObj createCommandForTargetedShards(
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ const LiteParsedPipeline& litePipe,
+ const cluster_aggregation_planner::SplitPipeline& splitPipeline,
+ const BSONObj collationObj,
+ const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
+ bool needsMerge);
+
+BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
+ const AggregationRequest& request,
+ const boost::optional<ShardId>& shardId,
+ Pipeline* pipeline,
+ BSONObj collationObj);
+
+BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
+ OperationContext* opCtx,
+ const boost::optional<ShardId>& shardId,
+ const AggregationRequest& request,
+ BSONObj collationObj);
+
+/**
+ * For a sharded collection, establishes remote cursors on each shard that may have results, and
+ * creates a DocumentSourceMergeCursors stage to merge the remote cursors. Returns a pipeline
+ * beginning with that DocumentSourceMergeCursors stage. Note that one of the 'remote' cursors might
+ * be this node itself.
+ */
+std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline);
+
+} // namespace sharded_agg_helpers
+} // namespace mongo
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index 062e192140b..13ca158622f 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -553,6 +553,7 @@ std::set<ShardId> getTargetedShardsForQuery(OperationContext* opCtx,
StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd(
OperationContext* opCtx, const NamespaceString& nss) {
auto catalogCache = Grid::get(opCtx)->catalogCache();
+ invariant(catalogCache);
// Return the latest routing table if not running in a transaction with snapshot level read
// concern.
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index d06c2e91373..08042d1f9f9 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -32,6 +32,7 @@ env.Library(
'$BUILD_DIR/mongo/s/query/cluster_aggregation_planner',
'$BUILD_DIR/mongo/s/query/cluster_client_cursor',
'$BUILD_DIR/mongo/db/pipeline/mongos_process_interface',
+ '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers',
'$BUILD_DIR/mongo/db/views/views',
'cluster_query',
]
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 568f96021f4..d2071b01fc6 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/mongos_process_interface.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/find_common.h"
@@ -140,13 +141,13 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request,
return appendAllowImplicitCreate(aggCmd, true);
}
-MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
+sharded_agg_helpers::DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& executionNss,
const AggregationRequest& request,
const LiteParsedPipeline& litePipe,
BSONObj collationObj,
- MongoSInterface::DispatchShardPipelineResults* shardDispatchResults) {
+ sharded_agg_helpers::DispatchShardPipelineResults* shardDispatchResults) {
invariant(!litePipe.hasChangeStream());
auto opCtx = expCtx->opCtx;
@@ -183,7 +184,7 @@ MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none);
- auto consumerCmdObj = MongoSInterface::createCommandForTargetedShards(
+ auto consumerCmdObj = sharded_agg_helpers::createCommandForTargetedShards(
opCtx, request, litePipe, consumerPipelines.back(), collationObj, boost::none, false);
requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx],
@@ -216,16 +217,16 @@ MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront());
mergeCursors->dismissCursorOwnership();
}
- return MongoSInterface::DispatchShardPipelineResults{false,
- std::move(ownedCursors),
- {} /*TODO SERVER-36279*/,
- std::move(splitPipeline),
- nullptr,
- BSONObj(),
- numConsumers};
+ return sharded_agg_helpers::DispatchShardPipelineResults{false,
+ std::move(ownedCursors),
+ {} /*TODO SERVER-36279*/,
+ std::move(splitPipeline),
+ nullptr,
+ BSONObj(),
+ numConsumers};
}
-Status appendExplainResults(MongoSInterface::DispatchShardPipelineResults&& dispatchResults,
+Status appendExplainResults(sharded_agg_helpers::DispatchShardPipelineResults&& dispatchResults,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
BSONObjBuilder* result) {
if (dispatchResults.splitPipeline) {
@@ -287,7 +288,7 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
const auto mergingShard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId));
- Shard::RetryPolicy retryPolicy = MongoSInterface::getDesiredRetryPolicy(request);
+ Shard::RetryPolicy retryPolicy = sharded_agg_helpers::getDesiredRetryPolicy(request);
return uassertStatusOK(mergingShard->runCommandWithFixedRetryAttempts(
opCtx, ReadPreferenceSetting::get(opCtx), nss.db().toString(), mergeCmdObj, retryPolicy));
}
@@ -599,13 +600,14 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
return getStatusFromCommandResult(result->asTempObj());
}
-Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const ClusterAggregate::Namespaces& namespaces,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
- const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- MongoSInterface::DispatchShardPipelineResults&& shardDispatchResults,
- BSONObjBuilder* result) {
+Status dispatchMergingPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ClusterAggregate::Namespaces& namespaces,
+ const AggregationRequest& request,
+ const LiteParsedPipeline& litePipe,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ sharded_agg_helpers::DispatchShardPipelineResults&& shardDispatchResults,
+ BSONObjBuilder* result) {
// We should never be in a situation where we call this function on a non-merge pipeline.
invariant(shardDispatchResults.splitPipeline);
auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get();
@@ -689,7 +691,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
BSONObjBuilder* result) {
uassert(51028, "Cannot specify exchange option to a mongos", !request.getExchangeSpec());
auto executionNsRoutingInfoStatus =
- MongoSInterface::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
+ sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
boost::optional<CachedCollectionRoutingInfo> routingInfo;
LiteParsedPipeline litePipe(request);
const auto isSharded = [](OperationContext* opCtx, const NamespaceString& nss) -> bool {
@@ -717,7 +719,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// Determine whether this aggregation must be dispatched to all shards in the cluster.
const bool mustRunOnAll =
- MongoSInterface::mustRunOnAllShards(namespaces.executionNss, litePipe);
+ sharded_agg_helpers::mustRunOnAllShards(namespaces.executionNss, litePipe);
// If we don't have a routing table, then this is a $changeStream which must run on all shards.
invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream()));
@@ -768,7 +770,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
// If not, split the pipeline as necessary and dispatch to the relevant shards.
- auto shardDispatchResults = MongoSInterface::dispatchShardPipeline(
+ auto shardDispatchResults = sharded_agg_helpers::dispatchShardPipeline(
expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj);
// If the operation is an explain, then we verify that it succeeded on all targeted shards,
@@ -846,7 +848,7 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
- MongoSInterface::createPassthroughCommandForShard(
+ sharded_agg_helpers::createPassthroughCommandForShard(
opCtx, aggRequest, shardId, nullptr, BSONObj()));
auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(