author     Bernard Gorman <bernard.gorman@gmail.com>    2018-04-23 23:20:31 +0100
committer  Bernard Gorman <bernard.gorman@gmail.com>    2018-04-29 16:16:16 +0100
commit     793e59f11b558db3d833a12ec23bbacc359011a1 (patch)
tree       14f832460035d1f604341677fc2790986177e0ff
parent     4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa (diff)
download   mongo-793e59f11b558db3d833a12ec23bbacc359011a1.tar.gz
SERVER-34138 Allow change stream to be opened against non-existent DB and collection in sharded cluster
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml                          5
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml                             5
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml            5
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml   5
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml                 5
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml        5
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml               1
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml          1
-rw-r--r--  jstests/change_streams/change_stream_collation.js                                                 4
-rw-r--r--  jstests/change_streams/change_stream_does_not_implicitly_create_database.js                      81
-rw-r--r--  jstests/sharding/change_stream_no_shards.js                                                      39
-rw-r--r--  jstests/sharding/change_streams_whole_db.js                                                       8
-rw-r--r--  src/mongo/db/commands/run_aggregate.cpp                                                           8
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.cpp                                                      514
-rw-r--r--  src/mongo/s/query/store_possible_cursor.cpp                                                      35
-rw-r--r--  src/mongo/s/query/store_possible_cursor.h                                                        21
16 files changed, 465 insertions(+), 277 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
index c8bab397c18..5af7b8cd63b 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
@@ -64,8 +64,3 @@ executor:
periodicNoopIntervalSecs: 1
writePeriodicNoops: true
num_rs_nodes_per_shard: 1
- # TODO SERVER-34138: This suite doesn't actually shard any collections, but enabling sharding
- # will prevent read commands against non-existent databases from unconditionally returning a
- # CursorId of 0.
- enable_sharding:
- - test
diff --git a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
index 667291fa1ef..712986a1950 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
@@ -82,8 +82,3 @@ executor:
num_rs_nodes_per_shard: 2
shard_options:
voting_secondaries: true
- # TODO SERVER-34138: This suite doesn't actually shard any collections, but enabling sharding
- # will prevent read commands against non-existent databases from unconditionally returning a
- # CursorId of 0.
- enable_sharding:
- - test
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
index a34bf9bda63..086f96021c3 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
@@ -67,8 +67,3 @@ executor:
periodicNoopIntervalSecs: 1
writePeriodicNoops: true
num_rs_nodes_per_shard: 1
- # TODO SERVER-34138: This suite doesn't actually shard any collections, but enabling sharding
- # will prevent read commands against non-existent databases from unconditionally returning a
- # CursorId of 0.
- enable_sharding:
- - test
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
index eff8a859473..651fad01bed 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
@@ -83,8 +83,3 @@ executor:
num_rs_nodes_per_shard: 2
shard_options:
voting_secondaries: true
- # TODO SERVER-34138: This suite doesn't actually shard any collections, but enabling sharding
- # will prevent read commands against non-existent databases from unconditionally returning a
- # CursorId of 0.
- enable_sharding:
- - test
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
index 9996d4b75c9..d2452f5c739 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
@@ -67,8 +67,3 @@ executor:
periodicNoopIntervalSecs: 1
writePeriodicNoops: true
num_rs_nodes_per_shard: 1
- # TODO SERVER-34138: This suite doesn't actually shard any collections, but enabling sharding
- # will prevent read commands against non-existent databases from unconditionally returning a
- # CursorId of 0.
- enable_sharding:
- - test
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
index 945c1c8ddfb..96332cfe463 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
@@ -83,8 +83,3 @@ executor:
num_rs_nodes_per_shard: 2
shard_options:
voting_secondaries: true
- # TODO SERVER-34138: This suite doesn't actually shard any collections, but enabling sharding
- # will prevent read commands against non-existent databases from unconditionally returning a
- # CursorId of 0.
- enable_sharding:
- - test
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 43928e3d11f..94664da3a44 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -13,6 +13,7 @@ selector:
- jstests/sharding/database_versioning_upgrade_downgrade.js
- jstests/sharding/shard_collection_cache_upgrade_downgrade.js
#### Enable when 4.0 becomes last-stable.
+ - jstests/sharding/change_stream_no_shards.js
- jstests/sharding/change_streams_unsharded_becomes_sharded.js
- jstests/sharding/create_database.js
- jstests/sharding/database_and_shard_versioning_all_commands.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
index 5157e0c189a..2a0741edc54 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
@@ -21,6 +21,7 @@ selector:
- jstests/sharding/database_versioning_upgrade_downgrade.js
- jstests/sharding/shard_collection_cache_upgrade_downgrade.js
#### Enable when 4.0 becomes last-stable.
+ - jstests/sharding/change_stream_no_shards.js
- jstests/sharding/change_streams_unsharded_becomes_sharded.js
- jstests/sharding/create_database.js
- jstests/sharding/database_and_shard_versioning_all_commands.js
diff --git a/jstests/change_streams/change_stream_collation.js b/jstests/change_streams/change_stream_collation.js
index 0f2c4ee22e3..ceb1eb463dc 100644
--- a/jstests/change_streams/change_stream_collation.js
+++ b/jstests/change_streams/change_stream_collation.js
@@ -21,10 +21,6 @@
const caseInsensitive = {locale: "en_US", strength: 2};
- // $changeStream cannot run on a non-existent database. Create an unrelated collection to
- // ensure that the database is present before testing.
- assertDropAndRecreateCollection(db, "change_stream_ensure_db_exists");
-
let caseInsensitiveCollection = "change_stream_case_insensitive";
assertDropCollection(db, caseInsensitiveCollection);
diff --git a/jstests/change_streams/change_stream_does_not_implicitly_create_database.js b/jstests/change_streams/change_stream_does_not_implicitly_create_database.js
new file mode 100644
index 00000000000..052a53585bd
--- /dev/null
+++ b/jstests/change_streams/change_stream_does_not_implicitly_create_database.js
@@ -0,0 +1,81 @@
+/**
+ * Tests that change streams can be opened on a namespace before the collection or database has been
+ * created, and will not implicitly create either.
+ */
+
+(function() {
+ "use strict";
+
+ load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest'.
+
+ // Ensure that the test DB does not exist.
+ const testDB = db.getSiblingDB(jsTestName());
+ assert.commandWorked(testDB.dropDatabase());
+
+ let dbList = assert.commandWorked(
+ db.adminCommand({listDatabases: 1, nameOnly: true, filter: {name: testDB.getName()}}));
+ assert.docEq(dbList.databases, []);
+
+ const collName = "test";
+
+ // Start a new $changeStream on the non-existent db.
+ const cst = new ChangeStreamTest(testDB);
+ const changeStreamCursor =
+ cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collName});
+
+ // Confirm that a $changeStream cursor has been opened on the namespace.
+ assert.gt(changeStreamCursor.id, 0);
+
+ // Confirm that the database has not been implicitly created.
+ dbList = assert.commandWorked(
+ db.adminCommand({listDatabases: 1, nameOnly: true, filter: {name: testDB.getName()}}));
+ assert.docEq(dbList.databases, []);
+
+ // Confirm that a non-$changeStream aggregation on the non-existent database returns an empty
+ // cursor.
+ const nonCsCmdRes = assert.commandWorked(
+ testDB.runCommand({aggregate: collName, pipeline: [{$match: {}}], cursor: {}}));
+ assert.docEq(nonCsCmdRes.cursor.firstBatch, []);
+ assert.eq(nonCsCmdRes.cursor.id, 0);
+
+ // Now perform some writes into the collection...
+ assert.commandWorked(testDB[collName].insert({_id: 1}));
+ assert.commandWorked(testDB[collName].insert({_id: 2}));
+ assert.commandWorked(testDB[collName].update({_id: 1}, {$set: {updated: true}}));
+ assert.commandWorked(testDB[collName].remove({_id: 2}));
+
+ // ... confirm that the database has been created...
+ dbList = assert.commandWorked(
+ db.adminCommand({listDatabases: 1, nameOnly: true, filter: {name: testDB.getName()}}));
+ assert.docEq(dbList.databases, [{name: testDB.getName()}]);
+
+ // ... and verify that the changes are observed by the stream.
+ const expectedChanges = [
+ {
+ documentKey: {_id: 1},
+ fullDocument: {_id: 1},
+ ns: {db: testDB.getName(), coll: collName},
+ operationType: "insert"
+ },
+ {
+ documentKey: {_id: 2},
+ fullDocument: {_id: 2},
+ ns: {db: testDB.getName(), coll: collName},
+ operationType: "insert"
+ },
+ {
+ documentKey: {_id: 1},
+ ns: {db: testDB.getName(), coll: collName},
+ updateDescription: {removedFields: [], updatedFields: {updated: true}},
+ operationType: "update"
+ },
+ {
+ documentKey: {_id: 2},
+ ns: {db: testDB.getName(), coll: collName},
+ operationType: "delete"
+ },
+ ];
+
+ cst.assertNextChangesEqual({cursor: changeStreamCursor, expectedChanges: expectedChanges});
+ cst.cleanUp();
+})();
\ No newline at end of file
diff --git a/jstests/sharding/change_stream_no_shards.js b/jstests/sharding/change_stream_no_shards.js
new file mode 100644
index 00000000000..e92c91d7322
--- /dev/null
+++ b/jstests/sharding/change_stream_no_shards.js
@@ -0,0 +1,39 @@
+/**
+ * Test that running a $changeStream aggregation on a cluster with no shards returns an empty result
+ * set with a cursorID of zero.
+ */
+(function() {
+ const st = new ShardingTest({shards: 0, config: 1});
+
+ const adminDB = st.s.getDB("admin");
+ const testDB = st.s.getDB("test");
+
+ // Test that attempting to open a stream on a single collection results in an empty, closed
+ // cursor response.
+ let csCmdRes = assert.commandWorked(
+ testDB.runCommand({aggregate: "testing", pipeline: [{$changeStream: {}}], cursor: {}}));
+ assert.docEq(csCmdRes.cursor.firstBatch, []);
+ assert.eq(csCmdRes.cursor.id, 0);
+
+ // Test that attempting to open a whole-db stream results in an empty, closed cursor response.
+ csCmdRes = assert.commandWorked(
+ testDB.runCommand({aggregate: 1, pipeline: [{$changeStream: {}}], cursor: {}}));
+ assert.docEq(csCmdRes.cursor.firstBatch, []);
+ assert.eq(csCmdRes.cursor.id, 0);
+
+ // Test that attempting to open a cluster-wide stream results in an empty, closed cursor
+ // response.
+ csCmdRes = assert.commandWorked(adminDB.runCommand(
+ {aggregate: 1, pipeline: [{$changeStream: {allChangesForCluster: true}}], cursor: {}}));
+ assert.docEq(csCmdRes.cursor.firstBatch, []);
+ assert.eq(csCmdRes.cursor.id, 0);
+
+ // Test that a regular, non-$changeStream aggregation also results in an empty cursor when no
+ // shards are present.
+ const nonCsCmdRes = assert.commandWorked(
+ testDB.runCommand({aggregate: "testing", pipeline: [{$match: {}}], cursor: {}}));
+ assert.docEq(nonCsCmdRes.cursor.firstBatch, []);
+ assert.eq(nonCsCmdRes.cursor.id, 0);
+
+ st.stop();
+})();
\ No newline at end of file
diff --git a/jstests/sharding/change_streams_whole_db.js b/jstests/sharding/change_streams_whole_db.js
index fc5443315a6..d43294e7772 100644
--- a/jstests/sharding/change_streams_whole_db.js
+++ b/jstests/sharding/change_streams_whole_db.js
@@ -24,15 +24,7 @@
});
const mongosDB = st.s0.getDB("test");
-
- // TODO SERVER-34138 will add support for opening a change stream before a database exists.
- assert.commandFailedWithCode(
- mongosDB.runCommand(
- {aggregate: 1, pipeline: [{$changeStream: {}}], cursor: {batchSize: 1}}),
- ErrorCodes.NamespaceNotFound);
-
const mongosColl = mongosDB[jsTestName()];
- mongosDB.createCollection(jsTestName());
let cst = new ChangeStreamTest(mongosDB);
let cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 2f068165ed9..e00f26bebca 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -339,14 +339,8 @@ Status runAggregate(OperationContext* opCtx,
uassertStatusOK(waitForReadConcern(opCtx, readConcern, true));
}
- // If the change stream is opened against a database which does not exist yet, go ahead
- // and create it. Use MODE_IX since the AutoGetOrCreateDb helper will automatically
- // reacquire as MODE_X if the database does not exist.
- AutoGetOrCreateDb dbLock(opCtx, origNss.db(), MODE_IX);
- invariant(dbLock.getDb());
if (!origNss.isCollectionlessAggregateNS()) {
- // AutoGetCollectionForReadCommand will raise an error if the given namespace is a
- // view.
+ // AutoGetCollectionForReadCommand will raise an error if 'origNss' is a view.
AutoGetCollectionForReadCommand origNssCtx(opCtx, origNss);
// Resolve the collator to either the user-specified collation or the default
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 46cb784032a..919f3318c7e 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -149,48 +149,55 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId,
}
bool mustRunOnAllShards(const NamespaceString& nss,
- const CachedCollectionRoutingInfo& routingInfo,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
const LiteParsedPipeline& litePipe) {
- // Any collectionless aggregation like a $currentOp, and a change stream on a sharded collection
- // must run on all shards.
- const bool nsIsSharded = static_cast<bool>(routingInfo.cm());
- return nss.isCollectionlessAggregateNS() || (nsIsSharded && litePipe.hasChangeStream());
+    // A non-existent routing table is only valid for $changeStream aggregations which have been
+ // opened prior to the creation of the target database.
+ invariant(routingInfo || litePipe.hasChangeStream());
+ // The following aggregations must be routed to all shards:
+ // - Any collectionless aggregation such as $currentOp
+ // - $changeStream on a non-existent database
+ // - $changeStream on a sharded collection
+ const bool dbExists = static_cast<bool>(routingInfo);
+ const bool nsIsSharded = dbExists && static_cast<bool>(routingInfo->cm());
+ return !dbExists || nss.isCollectionlessAggregateNS() ||
+ (nsIsSharded && litePipe.hasChangeStream());
}
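
For readers skimming the new tri-state logic, here is a minimal standalone sketch of the decision it encodes. RoutingInfo, LitePipeline and the flag names are hypothetical stand-ins, with std::optional in place of boost::optional:

    #include <cassert>
    #include <optional>

    // Hypothetical stand-ins for CachedCollectionRoutingInfo and LiteParsedPipeline.
    struct RoutingInfo { bool sharded; };
    struct LitePipeline { bool hasChangeStream; bool collectionless; };

    // A missing routing table (database not yet created) is only legal for
    // $changeStream, and always forces an all-shards dispatch.
    bool mustRunOnAllShards(const std::optional<RoutingInfo>& routing,
                            const LitePipeline& pipe) {
        assert(routing || pipe.hasChangeStream);
        const bool dbExists = routing.has_value();
        const bool nsIsSharded = dbExists && routing->sharded;
        return !dbExists || pipe.collectionless ||
            (nsIsSharded && pipe.hasChangeStream);
    }

    int main() {
        // $changeStream on a database that does not exist yet: all shards.
        assert(mustRunOnAllShards(std::nullopt, {true, false}));
        // Plain aggregation on an unsharded collection: targeted dispatch.
        assert(!mustRunOnAllShards(RoutingInfo{false}, {false, false}));
    }
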
StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
- const NamespaceString& execNss,
- CatalogCache* catalogCache) {
- // This call to getCollectionRoutingInfo will return !OK if the database does not exist.
- auto swRoutingInfo = catalogCache->getCollectionRoutingInfo(opCtx, execNss);
-
- // Collectionless aggregations, however, may be run on 'admin' (which should always exist) but
- // are subsequently targeted towards the shards. If getCollectionRoutingInfo is OK, we perform a
- // further check that at least one shard exists if the aggregation is collectionless.
- if (swRoutingInfo.isOK() && execNss.isCollectionlessAggregateNS()) {
- std::vector<ShardId> shardIds;
- Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
-
- if (shardIds.size() == 0) {
- return {ErrorCodes::NamespaceNotFound, "No shards are present in the cluster"};
- }
+ const NamespaceString& execNss) {
+ // First, verify that there are shards present in the cluster. If not, then we return the
+ // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because
+ // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on
+ // a collection before its enclosing database is created. However, if there are no shards
+ // present, then $changeStream should immediately return an empty cursor just as other
+ // aggregations do when the database does not exist.
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
+ if (shardIds.size() == 0) {
+ return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"};
}
- return swRoutingInfo;
+ // This call to getCollectionRoutingInfo will return !OK if the database does not exist.
+ return Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, execNss);
}
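
The ordering here, shard check first and routing lookup second, matters because the two failures are handled differently upstream. A hedged sketch with stand-in types:

    #include <cassert>
    #include <string>
    #include <vector>

    enum class ErrorCode { OK, NamespaceNotFound, ShardNotFound };

    // Hypothetical sketch: an empty cluster must surface as ShardNotFound,
    // because the $changeStream path swallows only NamespaceNotFound (database
    // not created yet) and must still return an empty cursor, as other
    // aggregations do, when there are no shards at all.
    ErrorCode getExecutionNsRoutingInfo(const std::vector<std::string>& shardIds,
                                        bool databaseExists) {
        if (shardIds.empty())
            return ErrorCode::ShardNotFound;
        return databaseExists ? ErrorCode::OK : ErrorCode::NamespaceNotFound;
    }

    int main() {
        assert(getExecutionNsRoutingInfo({}, true) == ErrorCode::ShardNotFound);
        assert(getExecutionNsRoutingInfo({"shard0"}, false) ==
               ErrorCode::NamespaceNotFound);
    }
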
std::set<ShardId> getTargetedShards(OperationContext* opCtx,
bool mustRunOnAllShards,
- const CachedCollectionRoutingInfo& routingInfo,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
const BSONObj shardQuery,
const BSONObj collation) {
if (mustRunOnAllShards) {
// The pipeline begins with a stage which must be run on all shards.
std::vector<ShardId> shardIds;
- Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
return {shardIds.begin(), shardIds.end()};
}
- return getTargetedShardsForQuery(opCtx, routingInfo, shardQuery, collation);
+ // If we don't need to run on all shards, then we should always have a valid routing table.
+ invariant(routingInfo);
+
+ return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation);
}
BSONObj createCommandForTargetedShards(
@@ -264,21 +271,25 @@ BSONObj createCommandForMergingShard(
return appendAllowImplicitCreate(mergeCmd.freeze().toBson(), true);
}
-std::vector<RemoteCursor> establishShardCursors(OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- CachedCollectionRoutingInfo* routingInfo,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref,
- const BSONObj& shardQuery,
- const BSONObj& collation) {
+std::vector<RemoteCursor> establishShardCursors(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery,
+ const BSONObj& collation) {
LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
- bool mustRunOnAll = mustRunOnAllShards(nss, *routingInfo, litePipe);
+ const bool mustRunOnAll = mustRunOnAllShards(nss, routingInfo, litePipe);
std::set<ShardId> shardIds =
- getTargetedShards(opCtx, mustRunOnAll, *routingInfo, shardQuery, collation);
+ getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, collation);
std::vector<std::pair<ShardId, BSONObj>> requests;
+ // If we don't need to run on all shards, then we should always have a valid routing table.
+ invariant(routingInfo || mustRunOnAll);
+
if (mustRunOnAll) {
// The pipeline contains a stage which must be run on all shards. Skip versioning and
// enqueue the raw command objects.
@@ -399,20 +410,25 @@ DispatchShardPipelineResults dispatchShardPipeline(
while (++numAttempts <= kMaxNumStaleVersionRetries) {
// We need to grab a new routing table at the start of each iteration, since a stale config
// exception will invalidate the previous one.
- auto executionNsRoutingInfo = uassertStatusOK(
- Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, executionNss));
+ auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
+
+ // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
+ // Otherwise, uassert on all exceptions here.
+ if (!(liteParsedPipeline.hasChangeStream() &&
+ executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
+ uassertStatusOK(executionNsRoutingInfoStatus);
+ }
+
+ auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
+ ? std::move(executionNsRoutingInfoStatus.getValue())
+ : boost::optional<CachedCollectionRoutingInfo>{};
// Determine whether we can run the entire aggregation on a single shard.
- bool mustRunOnAll =
+ const bool mustRunOnAll =
mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline);
std::set<ShardId> shardIds = getTargetedShards(
opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
- uassert(ErrorCodes::ShardNotFound,
- "No targets were found for this aggregation. All shards were removed from the "
- "cluster mid-operation",
- shardIds.size() > 0);
-
auto atClusterTime = computeAtClusterTime(
opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation());
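
The status-to-optional conversion above is the crux of the retry loop's change: only a $changeStream may proceed without a routing table. A condensed sketch, with std::variant standing in for StatusWith and all names hypothetical:

    #include <optional>
    #include <stdexcept>
    #include <variant>

    enum class ErrorCode { NamespaceNotFound, ShardNotFound };
    struct RoutingInfo {};
    using StatusWithRouting = std::variant<RoutingInfo, ErrorCode>;

    // $changeStream tolerates NamespaceNotFound (the database may not exist
    // yet) and continues with an empty optional; any other error, or any error
    // on an ordinary aggregation, is rethrown.
    std::optional<RoutingInfo> resolveRouting(const StatusWithRouting& sw,
                                              bool isChangeStream) {
        if (auto* err = std::get_if<ErrorCode>(&sw)) {
            if (!(isChangeStream && *err == ErrorCode::NamespaceNotFound))
                throw std::runtime_error("failed to resolve routing info");
            return std::nullopt;
        }
        return std::get<RoutingInfo>(sw);
    }

    int main() {
        // A change stream opened before its database exists keeps going.
        auto routing = resolveRouting(ErrorCode::NamespaceNotFound, true);
        return routing.has_value() ? 1 : 0;
    }
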
@@ -423,8 +439,8 @@ DispatchShardPipelineResults dispatchShardPipeline(
// is not the primary.
// - The pipeline contains one or more stages which must always merge on mongoS.
const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
- (needsPrimaryShardMerge &&
- *(shardIds.begin()) != executionNsRoutingInfo.db().primaryId()));
+ (needsPrimaryShardMerge && executionNsRoutingInfo &&
+ *shardIds.begin() != executionNsRoutingInfo->db().primaryId()));
const bool isSplit = pipelineForTargetedShards->isSplitForShards();
@@ -444,7 +460,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
// Refresh the shard registry if we're targeting all shards. We need the shard registry
// to be at least as current as the logical time used when creating the command for
// $changeStream to work reliably, so we do a "hard" reload.
- if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) {
+ if (mustRunOnAll) {
auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
if (!shardRegistry->reload(opCtx)) {
shardRegistry->reload(opCtx);
@@ -454,7 +470,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
// Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
try {
if (expCtx->explain) {
- if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) {
+ if (mustRunOnAll) {
// Some stages (such as $currentOp) need to be broadcast to all shards, and
// should not participate in the shard version protocol.
shardResults =
@@ -466,11 +482,12 @@ DispatchShardPipelineResults dispatchShardPipeline(
} else {
// Aggregations on a real namespace should use the routing table to target
// shards, and should participate in the shard version protocol.
+ invariant(executionNsRoutingInfo);
shardResults = scatterGatherVersionedTargetByRoutingTable(
opCtx,
executionNss.db(),
executionNss,
- executionNsRoutingInfo,
+ *executionNsRoutingInfo,
targetedCommand,
ReadPreferenceSetting::get(opCtx),
Shard::RetryPolicy::kIdempotent,
@@ -481,7 +498,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
cursors = establishShardCursors(opCtx,
executionNss,
liteParsedPipeline,
- &executionNsRoutingInfo,
+ executionNsRoutingInfo,
targetedCommand,
ReadPreferenceSetting::get(opCtx),
shardQuery,
@@ -502,8 +519,9 @@ DispatchShardPipelineResults dispatchShardPipeline(
// Record the number of shards involved in the aggregation. If we are required to merge on
// the primary shard, but the primary shard was not in the set of targeted shards, then we
// must increment the number of involved shards.
- CurOp::get(opCtx)->debug().nShards = shardIds.size() +
- (needsPrimaryShardMerge && !shardIds.count(executionNsRoutingInfo.db().primaryId()));
+ CurOp::get(opCtx)->debug().nShards =
+ shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
+ !shardIds.count(executionNsRoutingInfo->db().primaryId()));
break; // Success!
}
@@ -687,224 +705,274 @@ ShardId pickMergingShard(OperationContext* opCtx,
.toString();
}
-} // namespace
-
-Status ClusterAggregate::runAggregate(OperationContext* opCtx,
- const Namespaces& namespaces,
- const AggregationRequest& request,
- BSONObj cmdObj,
- BSONObjBuilder* result) {
- const auto catalogCache = Grid::get(opCtx)->catalogCache();
-
- auto executionNsRoutingInfoStatus =
- getExecutionNsRoutingInfo(opCtx, namespaces.executionNss, catalogCache);
-
- LiteParsedPipeline liteParsedPipeline(request);
-
- if (!executionNsRoutingInfoStatus.isOK()) {
- // Standard aggregations swallow 'NamespaceNotFound' and return an empty cursor with id 0 in
- // the event that the database does not exist. For $changeStream aggregations, however, we
- // throw the exception in all error cases, including that of a non-existent database.
- if (liteParsedPipeline.hasChangeStream()) {
- uassertStatusOKWithContext(executionNsRoutingInfoStatus.getStatus(),
- "failed to open $changeStream");
- }
- appendEmptyResultSet(
- opCtx, *result, executionNsRoutingInfoStatus.getStatus(), namespaces.requestedNss.ns());
-
- return Status::OK();
- }
-
- auto executionNsRoutingInfo = executionNsRoutingInfoStatus.getValue();
-
- // Determine the appropriate collation and 'resolve' involved namespaces to make the
- // ExpressionContext.
-
- // We won't try to execute anything on a mongos, but we still have to populate this map so that
- // any $lookups, etc. will be able to have a resolved view definition. It's okay that this is
- // incorrect, we will repopulate the real resolved namespace map on the mongod. Note that we
- // need to check if any involved collections are sharded before forwarding an aggregation
- // command on an unsharded collection.
+// "Resolve" involved namespaces and verify that none of them are sharded. We won't try to execute
+// anything on a mongos, but we still have to populate this map so that any $lookups, etc. will be
+// able to have a resolved view definition. It's okay that this is incorrect, we will repopulate the
+// real namespace map on the mongod. Note that this function must be called before forwarding an
+// aggregation command on an unsharded collection, in order to validate that none of the involved
+// collections are sharded.
+StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces(
+ OperationContext* opCtx, const LiteParsedPipeline& litePipe) {
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
-
- for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) {
+ for (auto&& nss : litePipe.getInvolvedNamespaces()) {
const auto resolvedNsRoutingInfo =
- uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
uassert(
28769, str::stream() << nss.ns() << " cannot be sharded", !resolvedNsRoutingInfo.cm());
resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
}
+ return resolvedNamespaces;
+}
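
A rough standalone sketch of the extracted helper's contract, with a std::map in place of StringMap and a predicate in place of the catalog-cache lookup (all names hypothetical):

    #include <functional>
    #include <map>
    #include <stdexcept>
    #include <string>
    #include <vector>

    // Every involved namespace is looked up, rejected if sharded, and recorded
    // with an empty view pipeline. The placeholder entries are knowingly
    // incomplete; the shard repopulates the real resolved-view map when it
    // receives the forwarded command.
    std::map<std::string, std::vector<std::string>> resolveInvolved(
        const std::vector<std::string>& namespaces,
        const std::function<bool(const std::string&)>& isSharded) {
        std::map<std::string, std::vector<std::string>> resolved;
        for (const auto& ns : namespaces) {
            if (isSharded(ns))
                throw std::runtime_error(ns + " cannot be sharded");
            resolved.emplace(ns, std::vector<std::string>{});
        }
        return resolved;
    }

    int main() {
        auto m = resolveInvolved({"test.other"},
                                 [](const std::string&) { return false; });
        return m.size() == 1 ? 0 : 1;
    }
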
- // If this pipeline is on an unsharded collection, is allowed to be forwarded to shards, does
- // not need to run on all shards, and doesn't need transformation via
- // DocumentSource::serialize(), then go ahead and pass it through to the owning shard
- // unmodified.
- if (!executionNsRoutingInfo.cm() &&
- !mustRunOnAllShards(namespaces.executionNss, executionNsRoutingInfo, liteParsedPipeline) &&
- liteParsedPipeline.allowedToForwardFromMongos() &&
- liteParsedPipeline.allowedToPassthroughFromMongos()) {
- return aggPassthrough(opCtx,
- namespaces,
- executionNsRoutingInfo.db().primary()->getId(),
- cmdObj,
- request,
- liteParsedPipeline,
- result);
- }
-
+// Build an appropriate ExpressionContext for the pipeline. This helper validates that all involved
+// namespaces are unsharded, obtains the appropriate user-defined or default collator, and creates a
+// MongoProcessInterface for use by the pipeline's stages.
+boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
+ OperationContext* opCtx,
+ const NamespaceString& executionNss,
+ const AggregationRequest& request,
+ const LiteParsedPipeline& litePipe,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo) {
+ // Determine the appropriate collation for the ExpressionContext.
+ const bool collectionIsSharded = (routingInfo && routingInfo->cm());
+ const bool collectionIsNotSharded = (routingInfo && !routingInfo->cm());
std::unique_ptr<CollatorInterface> collation;
+
if (!request.getCollation().isEmpty()) {
collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
->makeFromBSON(request.getCollation()));
- } else if (const auto chunkMgr = executionNsRoutingInfo.cm()) {
- if (chunkMgr->getDefaultCollator()) {
- collation = chunkMgr->getDefaultCollator()->clone();
+ } else if (collectionIsSharded) {
+ if (routingInfo->cm()->getDefaultCollator()) {
+ collation = routingInfo->cm()->getDefaultCollator()->clone();
}
- } else {
- // Unsharded collection. Get collection metadata from primary chunk.
+ } else if (collectionIsNotSharded) {
+ // Get collection metadata from primary chunk.
auto collationObj = getDefaultCollationForUnshardedCollection(
- executionNsRoutingInfo.db().primary().get(), namespaces.executionNss);
+ routingInfo->db().primary().get(), executionNss);
if (!collationObj.isEmpty()) {
collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
->makeFromBSON(collationObj));
}
}
- boost::intrusive_ptr<ExpressionContext> mergeCtx =
- new ExpressionContext(opCtx,
- request,
- std::move(collation),
- std::make_shared<PipelineS::MongoSInterface>(),
- std::move(resolvedNamespaces));
+ // Create the expression context, and set 'inMongos' to true before returning it. We explicitly
+ // do *not* set mergeCtx->tempDir. Note that we resolve the pipeline's involved namespaces here,
+ // validating that none of them are sharded.
+ auto mergeCtx = new ExpressionContext(opCtx,
+ request,
+ std::move(collation),
+ std::make_shared<PipelineS::MongoSInterface>(),
+ resolveInvolvedNamespaces(opCtx, litePipe));
mergeCtx->inMongos = true;
- // explicitly *not* setting mergeCtx->tempDir
-
- auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), mergeCtx));
- pipeline->optimizePipeline();
+ return mergeCtx;
+}
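
The collation resolution above has four outcomes. A simplified sketch under stated assumptions (a string stands in for the BSON collation spec, with empty meaning "not specified"; RoutingInfo is a hypothetical stand-in):

    #include <optional>
    #include <string>

    struct RoutingInfo {
        bool sharded;
        std::string defaultCollation;  // empty if the collection has none
    };

    // Precedence: (1) user-specified collation, (2) the sharded collection's
    // default, (3) the unsharded collection's default from the primary shard,
    // (4) none, i.e. the simple collation. With no routing table at all (a
    // $changeStream on a not-yet-created database) there is nothing to fetch.
    std::string resolveCollation(const std::string& userCollation,
                                 const std::optional<RoutingInfo>& routing) {
        if (!userCollation.empty())
            return userCollation;
        if (routing)
            return routing->defaultCollation;  // sharded or unsharded default
        return "";  // simple collation
    }

    int main() {
        return resolveCollation("", std::nullopt).empty() ? 0 : 1;
    }
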
- // Check whether the entire pipeline must be run on mongoS.
- if (pipeline->requiredToRunOnMongos()) {
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << "Aggregation pipeline must be run on mongoS, but "
- << pipeline->getSources().front()->getSourceName()
- << " is not capable of producing input",
+// Runs a pipeline on mongoS, having first validated that it is eligible to do so. This can be a
+// pipeline which is split for merging, or an intact pipeline which must run entirely on mongoS.
+Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ClusterAggregate::Namespaces& namespaces,
+ const AggregationRequest& request,
+ BSONObj cmdObj,
+ const LiteParsedPipeline& litePipe,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ std::vector<RemoteCursor>&& cursors,
+ BSONObjBuilder* result) {
+ // We should never receive a pipeline intended for the shards, or which cannot run on mongoS.
+ invariant(!pipeline->isSplitForShards());
+ invariant(pipeline->canRunOnMongos());
+
+ const auto& requestedNss = namespaces.requestedNss;
+ const auto opCtx = expCtx->opCtx;
+
+ // If this is an unsplit mongoS-only pipeline, verify that the first stage can produce input for
+ // the remainder of the pipeline.
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "Aggregation pipeline must be run on mongoS, but "
+ << pipeline->getSources().front()->getSourceName()
+ << " is not capable of producing input",
+ pipeline->isSplitForMerge() ||
!pipeline->getSources().front()->constraints().requiresInputDocSource);
- if (mergeCtx->explain) {
- *result << "splitPipeline" << BSONNULL << "mongos"
- << Document{{"host", getHostNameCachedAndPort()},
- {"stages", pipeline->writeExplainOps(*mergeCtx->explain)}};
- return Status::OK();
- }
-
- auto cursorResponse = establishMergingMongosCursor(opCtx,
- request,
- namespaces.requestedNss,
- cmdObj,
- liteParsedPipeline,
- std::move(pipeline),
- {});
- CommandHelpers::filterCommandReplyForPassthrough(cursorResponse, result);
- return getStatusFromCommandResult(result->asTempObj());
- }
-
- auto dispatchResults = dispatchShardPipeline(mergeCtx,
- namespaces.executionNss,
- cmdObj,
- request,
- liteParsedPipeline,
- std::move(pipeline));
-
- if (mergeCtx->explain) {
- // If we reach here, we've either succeeded in running the explain or exhausted all
- // attempts. In either case, attempt to append the explain results to the output builder.
- uassertAllShardsSupportExplain(dispatchResults.remoteExplainOutput);
-
- return appendExplainResults(std::move(dispatchResults.remoteExplainOutput),
- mergeCtx,
- dispatchResults.pipelineForTargetedShards,
- dispatchResults.pipelineForMerging,
- result);
+ // If this is an explain and the pipeline is not split, write the explain output and return.
+ if (expCtx->explain && !pipeline->isSplitForMerge()) {
+ *result << "splitPipeline" << BSONNULL << "mongos"
+ << Document{{"host", getHostNameCachedAndPort()},
+ {"stages", pipeline->writeExplainOps(*expCtx->explain)}};
+ return Status::OK();
}
+ // Register the new mongoS cursor, and retrieve the initial batch of results.
+ auto cursorResponse = establishMergingMongosCursor(
+ opCtx, request, requestedNss, cmdObj, litePipe, std::move(pipeline), std::move(cursors));
- invariant(dispatchResults.remoteCursors.size() > 0);
-
- // If we dispatched to a single shard, store the remote cursor and return immediately.
- if (!dispatchResults.pipelineForTargetedShards->isSplitForShards()) {
- invariant(dispatchResults.remoteCursors.size() == 1);
- const auto& remoteCursor = dispatchResults.remoteCursors[0];
- auto executorPool = Grid::get(opCtx)->getExecutorPool();
- const BSONObj reply = uassertStatusOK(storePossibleCursor(
- opCtx,
- remoteCursor.getShardId().toString(),
- remoteCursor.getHostAndPort(),
- remoteCursor.getCursorResponse().toBSON(CursorResponse::ResponseType::InitialResponse),
- namespaces.requestedNss,
- executorPool->getArbitraryExecutor(),
- Grid::get(opCtx)->getCursorManager(),
- mergeCtx->tailableMode));
+ // We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline
+ // can never run on mongoS. Filter the command response and return immediately.
+ CommandHelpers::filterCommandReplyForPassthrough(cursorResponse, result);
+ return getStatusFromCommandResult(result->asTempObj());
+}
- return appendCursorResponseToCommandResult(
- remoteCursor.getShardId().toString(), reply, result);
- }
+Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ClusterAggregate::Namespaces& namespaces,
+ const AggregationRequest& request,
+ BSONObj cmdObj,
+ const LiteParsedPipeline& litePipe,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ DispatchShardPipelineResults& shardDispatchResults,
+ BSONObjBuilder* result) {
+ // We should never be in a situation where we call this function on a non-merge pipeline.
+ auto& mergingPipeline = shardDispatchResults.pipelineForMerging;
+ invariant(mergingPipeline && mergingPipeline->isSplitForMerge());
- // If we reach here, we have a merge pipeline to dispatch.
- auto mergingPipeline = std::move(dispatchResults.pipelineForMerging);
- invariant(mergingPipeline);
+ const auto opCtx = expCtx->opCtx;
// First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS,
// then ignore the internalQueryProhibitMergingOnMongoS parameter.
if (mergingPipeline->requiredToRunOnMongos() ||
(!internalQueryProhibitMergingOnMongoS.load() && mergingPipeline->canRunOnMongos())) {
- // Register the new mongoS cursor, and retrieve the initial batch of results.
- auto cursorResponse =
- establishMergingMongosCursor(opCtx,
- request,
- namespaces.requestedNss,
- dispatchResults.commandForTargetedShards,
- liteParsedPipeline,
- std::move(mergingPipeline),
- std::move(dispatchResults.remoteCursors));
-
- // We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline
- // can never run on mongoS. Filter the command response and return immediately.
- CommandHelpers::filterCommandReplyForPassthrough(cursorResponse, result);
- return getStatusFromCommandResult(result->asTempObj());
+ return runPipelineOnMongoS(expCtx,
+ namespaces,
+ request,
+ shardDispatchResults.commandForTargetedShards,
+ litePipe,
+ std::move(mergingPipeline),
+ std::move(shardDispatchResults.remoteCursors),
+ result);
}
+ // If we are not merging on mongoS, then this is not a $changeStream aggregation, and we
+ // therefore must have a valid routing table.
+ invariant(routingInfo);
+
// TODO SERVER-33683 allowing an aggregation within a transaction can lead to a deadlock in the
// SessionCatalog when a pipeline with a $mergeCursors sends a getMore to itself.
uassert(50732,
"Cannot specify a transaction number in combination with an aggregation on mongos when "
- "merigng on a shard",
+ "merging on a shard",
!opCtx->getTxnNumber());
+
ShardId mergingShardId =
- pickMergingShard(opCtx, dispatchResults, executionNsRoutingInfo.db().primaryId());
+ pickMergingShard(opCtx, shardDispatchResults, routingInfo->db().primaryId());
cluster_aggregation_planner::addMergeCursorsSource(
mergingPipeline.get(),
- std::move(dispatchResults.remoteCursors),
+ std::move(shardDispatchResults.remoteCursors),
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
- auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline);
+ auto mergeCmdObj = createCommandForMergingShard(request, expCtx, cmdObj, mergingPipeline);
+
+ // Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return.
auto mergeResponse =
establishMergingShardCursor(opCtx, namespaces.executionNss, mergeCmdObj, mergingShardId);
- // The merging shard is remote, so if a response was received, a HostAndPort must have been set.
- invariant(mergeResponse.hostAndPort);
- auto mergeCursorResponse = uassertStatusOK(
- storePossibleCursor(opCtx,
- mergingShardId,
- *mergeResponse.hostAndPort,
- mergeResponse.response,
- namespaces.requestedNss,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- Grid::get(opCtx)->getCursorManager()));
+ auto mergeCursorResponse = uassertStatusOK(storePossibleCursor(
+ opCtx, namespaces.requestedNss, mergingShardId, mergeResponse, expCtx->tailableMode));
return appendCursorResponseToCommandResult(mergingShardId, mergeCursorResponse, result);
}
+void appendEmptyResultSetWithStatus(OperationContext* opCtx,
+ const NamespaceString& nss,
+ Status status,
+ BSONObjBuilder* result) {
+ // Rewrite ShardNotFound as NamespaceNotFound so that appendEmptyResultSet swallows it.
+ if (status == ErrorCodes::ShardNotFound) {
+ status = {ErrorCodes::NamespaceNotFound, status.reason()};
+ }
+ appendEmptyResultSet(opCtx, *result, status, nss.ns());
+}
+
+} // namespace
+
+Status ClusterAggregate::runAggregate(OperationContext* opCtx,
+ const Namespaces& namespaces,
+ const AggregationRequest& request,
+ BSONObj cmdObj,
+ BSONObjBuilder* result) {
+ auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
+ boost::optional<CachedCollectionRoutingInfo> routingInfo;
+ LiteParsedPipeline litePipe(request);
+
+ // If the routing table is valid, we obtain a reference to it. If the table is not valid, then
+ // either the database does not exist, or there are no shards in the cluster. In the latter
+ // case, we always return an empty cursor. In the former case, if the requested aggregation is a
+ // $changeStream, we allow the operation to continue so that stream cursors can be established
+ // on the given namespace before the database or collection is actually created. If the database
+ // does not exist and this is not a $changeStream, then we return an empty cursor.
+ if (executionNsRoutingInfoStatus.isOK()) {
+ routingInfo = std::move(executionNsRoutingInfoStatus.getValue());
+ } else if (!(litePipe.hasChangeStream() &&
+ executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
+ appendEmptyResultSetWithStatus(
+ opCtx, namespaces.requestedNss, executionNsRoutingInfoStatus.getStatus(), result);
+ return Status::OK();
+ }
+
+ // Determine whether this aggregation must be dispatched to all shards in the cluster.
+ const bool mustRunOnAll = mustRunOnAllShards(namespaces.executionNss, routingInfo, litePipe);
+
+ // If we don't have a routing table, then this is a $changeStream which must run on all shards.
+ invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream()));
+
+ // If this pipeline is not on a sharded collection, is allowed to be forwarded to shards, does
+ // not need to run on all shards, and doesn't need to go through DocumentSource::serialize(),
+ // then go ahead and pass it through to the owning shard unmodified. Note that we first call
+ // resolveInvolvedNamespaces to validate that none of the namespaces are sharded.
+ if (routingInfo && !routingInfo->cm() && !mustRunOnAll &&
+ litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos()) {
+ resolveInvolvedNamespaces(opCtx, litePipe);
+ const auto primaryShardId = routingInfo->db().primary()->getId();
+ return aggPassthrough(opCtx, namespaces, primaryShardId, cmdObj, request, litePipe, result);
+ }
+
+ // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator,
+ // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by the
+ // pipeline's stages.
+ auto expCtx =
+ makeExpressionContext(opCtx, namespaces.executionNss, request, litePipe, routingInfo);
+
+ // Parse and optimize the full pipeline.
+ auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx));
+ pipeline->optimizePipeline();
+
+ // Check whether the entire pipeline must be run on mongoS.
+ if (pipeline->requiredToRunOnMongos()) {
+ return runPipelineOnMongoS(
+ expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), {}, result);
+ }
+
+ // If not, split the pipeline as necessary and dispatch to the relevant shards.
+ auto shardDispatchResults = dispatchShardPipeline(
+ expCtx, namespaces.executionNss, cmdObj, request, litePipe, std::move(pipeline));
+
+ // If the operation is an explain, then we verify that it succeeded on all targeted shards,
+ // write the results to the output builder, and return immediately.
+ if (expCtx->explain) {
+ uassertAllShardsSupportExplain(shardDispatchResults.remoteExplainOutput);
+ return appendExplainResults(std::move(shardDispatchResults.remoteExplainOutput),
+ expCtx,
+ shardDispatchResults.pipelineForTargetedShards,
+ shardDispatchResults.pipelineForMerging,
+ result);
+ }
+
+ // If this isn't an explain, then we must have established cursors on at least one shard.
+ invariant(shardDispatchResults.remoteCursors.size() > 0);
+
+ // If we sent the entire pipeline to a single shard, store the remote cursor and return.
+ if (!shardDispatchResults.pipelineForTargetedShards->isSplitForShards()) {
+ invariant(shardDispatchResults.remoteCursors.size() == 1);
+ auto& remoteCursor = shardDispatchResults.remoteCursors.front();
+ const auto reply = uassertStatusOK(storePossibleCursor(
+ opCtx, namespaces.requestedNss, remoteCursor, expCtx->tailableMode));
+ return appendCursorResponseToCommandResult(
+ remoteCursor.getShardId().toString(), reply, result);
+ }
+
+ // If we reach here, we have a merge pipeline to dispatch.
+ return dispatchMergingPipeline(
+ expCtx, namespaces, request, cmdObj, litePipe, routingInfo, shardDispatchResults, result);
+}
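
Pulling the refactor together, the rewritten entry point now reads as a decision tree. A hypothetical condensation, with stand-in flags replacing the LiteParsedPipeline and Pipeline queries:

    #include <optional>

    enum class Dispatch { EmptyCursor, Passthrough, MongosOnly, ShardsThenMerge };
    struct Routing { bool sharded; };

    // Sketch only: the single-shard versus merge distinction after dispatch is
    // elided, as is explain handling.
    Dispatch classify(const std::optional<Routing>& routing,
                      bool isChangeStream,
                      bool mustRunOnAll,
                      bool canPassthrough,
                      bool requiredToRunOnMongos) {
        if (!routing && !isChangeStream)
            return Dispatch::EmptyCursor;  // database missing, plain aggregation
        if (routing && !routing->sharded && !mustRunOnAll && canPassthrough)
            return Dispatch::Passthrough;  // forward unmodified to the primary shard
        if (requiredToRunOnMongos)
            return Dispatch::MongosOnly;
        return Dispatch::ShardsThenMerge;  // split, target the shards, then merge
    }

    int main() {
        // $changeStream on a non-existent database still dispatches to shards.
        return classify(std::nullopt, true, true, false, false) ==
                Dispatch::ShardsThenMerge
            ? 0
            : 1;
    }
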
+
void ClusterAggregate::uassertAllShardsSupportExplain(
const std::vector<AsyncRequestsSender::Response>& shardResults) {
for (const auto& result : shardResults) {
@@ -969,19 +1037,11 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// If this was an explain, then we get back an explain result object rather than a cursor.
result = cmdResponse.response;
} else {
- // The merging shard is remote, so if a response was received, a HostAndPort must have been
- // set.
- invariant(cmdResponse.hostAndPort);
+ auto tailMode = liteParsedPipeline.hasChangeStream()
+ ? TailableModeEnum::kTailableAndAwaitData
+ : TailableModeEnum::kNormal;
result = uassertStatusOK(storePossibleCursor(
- opCtx,
- shard->getId(),
- *cmdResponse.hostAndPort,
- cmdResponse.response,
- namespaces.requestedNss,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- Grid::get(opCtx)->getCursorManager(),
- liteParsedPipeline.hasChangeStream() ? TailableModeEnum::kTailableAndAwaitData
- : TailableModeEnum::kNormal));
+ opCtx, namespaces.requestedNss, shard->getId(), cmdResponse, tailMode));
}
// First append the properly constructed writeConcernError. It will then be skipped
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index fa67241d1cd..28db3452a29 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -35,6 +35,8 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/curop.h"
#include "mongo/db/query/cursor_response.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_client_cursor_impl.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/cluster_cursor_manager.h"
@@ -43,6 +45,39 @@
namespace mongo {
StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
+ const NamespaceString& requestedNss,
+ const RemoteCursor& remoteCursor,
+ TailableModeEnum tailableMode) {
+ auto executorPool = Grid::get(opCtx)->getExecutorPool();
+ return storePossibleCursor(
+ opCtx,
+ remoteCursor.getShardId().toString(),
+ remoteCursor.getHostAndPort(),
+ remoteCursor.getCursorResponse().toBSON(CursorResponse::ResponseType::InitialResponse),
+ requestedNss,
+ executorPool->getArbitraryExecutor(),
+ Grid::get(opCtx)->getCursorManager(),
+ tailableMode);
+}
+
+StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
+ const NamespaceString& requestedNss,
+ const ShardId& shardId,
+ const Shard::CommandResponse& commandResponse,
+ TailableModeEnum tailableMode) {
+ invariant(commandResponse.hostAndPort);
+ auto executorPool = Grid::get(opCtx)->getExecutorPool();
+ return storePossibleCursor(opCtx,
+ shardId,
+ *commandResponse.hostAndPort,
+ commandResponse.response,
+ requestedNss,
+ executorPool->getArbitraryExecutor(),
+ Grid::get(opCtx)->getCursorManager(),
+ tailableMode);
+}
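
The two new overloads are thin adapters over the existing fully-parameterized function, so call sites shed the executor and cursor-manager plumbing. A sketch of the pattern with hypothetical types:

    #include <optional>
    #include <stdexcept>
    #include <string>

    // Hypothetical core function: registers the cursor and returns the reply.
    std::string storeCursorCore(const std::string& shardId,
                                const std::string& host,
                                const std::string& body) {
        return shardId + "@" + host + ": " + body;
    }

    struct CursorReply {  // stand-in for RemoteCursor
        std::string shardId, host, body;
    };

    // Adapter 1: unpack a RemoteCursor-like value.
    std::string storeCursor(const CursorReply& reply) {
        return storeCursorCore(reply.shardId, reply.host, reply.body);
    }

    // Adapter 2: unpack a CommandResponse-like value, whose host is optional;
    // a remote response must always carry one, hence the check.
    std::string storeCursor(const std::string& shardId,
                            const std::optional<std::string>& host,
                            const std::string& body) {
        if (!host)
            throw std::logic_error("remote response must carry a host");
        return storeCursorCore(shardId, *host, body);
    }

    int main() {
        return storeCursor({"shard0", "h:27017", "{}"}).empty() ? 1 : 0;
    }
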
+
+StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const ShardId& shardId,
const HostAndPort& server,
const BSONObj& cmdResult,
diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h
index b9756be44f7..a364744a386 100644
--- a/src/mongo/s/query/store_possible_cursor.h
+++ b/src/mongo/s/query/store_possible_cursor.h
@@ -31,12 +31,13 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/tailable_mode.h"
-#include "mongo/s/shard_id.h"
+#include "mongo/s/client/shard.h"
namespace mongo {
class BSONObj;
class ClusterCursorManager;
+class RemoteCursor;
template <typename T>
class StatusWith;
struct HostAndPort;
@@ -76,4 +77,22 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
ClusterCursorManager* cursorManager,
TailableModeEnum tailableMode = TailableModeEnum::kNormal);
+/**
+ * Convenience function which extracts all necessary information from the passed RemoteCursor, and
+ * stores a ClusterClientCursor based on it.
+ */
+StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
+ const NamespaceString& requestedNss,
+ const RemoteCursor& remoteCursor,
+ TailableModeEnum tailableMode);
+
+/**
+ * Convenience function which extracts all necessary information from the passed CommandResponse,
+ * and stores a ClusterClientCursor based on it.
+ */
+StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
+ const NamespaceString& requestedNss,
+ const ShardId& shardId,
+ const Shard::CommandResponse& commandResponse,
+ TailableModeEnum tailableMode);
} // namespace mongo