diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2018-04-23 23:20:31 +0100 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2018-04-29 16:16:16 +0100 |
commit | 793e59f11b558db3d833a12ec23bbacc359011a1 (patch) | |
tree | 14f832460035d1f604341677fc2790986177e0ff | |
parent | 4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa (diff) | |
download | mongo-793e59f11b558db3d833a12ec23bbacc359011a1.tar.gz |
SERVER-34138 Allow change stream to be opened against non-existent DB and collection in sharded cluster
16 files changed, 465 insertions, 277 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml index c8bab397c18..5af7b8cd63b 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml @@ -64,8 +64,3 @@ executor: periodicNoopIntervalSecs: 1 writePeriodicNoops: true num_rs_nodes_per_shard: 1 - # TODO SERVER-34138: This suite doesn't actually shard any collections, but enabling sharding - # will prevent read commands against non-existent databases from unconditionally returning a - # CursorId of 0. - enable_sharding: - - test diff --git a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml index 667291fa1ef..712986a1950 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml @@ -82,8 +82,3 @@ executor: num_rs_nodes_per_shard: 2 shard_options: voting_secondaries: true - # TODO SERVER-34138: This suite doesn't actually shard any collections, but enabling sharding - # will prevent read commands against non-existent databases from unconditionally returning a - # CursorId of 0. - enable_sharding: - - test diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml index a34bf9bda63..086f96021c3 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml @@ -67,8 +67,3 @@ executor: periodicNoopIntervalSecs: 1 writePeriodicNoops: true num_rs_nodes_per_shard: 1 - # TODO SERVER-34138: This suite doesn't actually shard any collections, but enabling sharding - # will prevent read commands against non-existent databases from unconditionally returning a - # CursorId of 0. - enable_sharding: - - test diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml index eff8a859473..651fad01bed 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml @@ -83,8 +83,3 @@ executor: num_rs_nodes_per_shard: 2 shard_options: voting_secondaries: true - # TODO SERVER-34138: This suite doesn't actually shard any collections, but enabling sharding - # will prevent read commands against non-existent databases from unconditionally returning a - # CursorId of 0. - enable_sharding: - - test diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml index 9996d4b75c9..d2452f5c739 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml @@ -67,8 +67,3 @@ executor: periodicNoopIntervalSecs: 1 writePeriodicNoops: true num_rs_nodes_per_shard: 1 - # TODO SERVER-34138: This suite doesn't actually shard any collections, but enabling sharding - # will prevent read commands against non-existent databases from unconditionally returning a - # CursorId of 0. - enable_sharding: - - test diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml index 945c1c8ddfb..96332cfe463 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml @@ -83,8 +83,3 @@ executor: num_rs_nodes_per_shard: 2 shard_options: voting_secondaries: true - # TODO SERVER-34138: This suite doesn't actually shard any collections, but enabling sharding - # will prevent read commands against non-existent databases from unconditionally returning a - # CursorId of 0. - enable_sharding: - - test diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 43928e3d11f..94664da3a44 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -13,6 +13,7 @@ selector: - jstests/sharding/database_versioning_upgrade_downgrade.js - jstests/sharding/shard_collection_cache_upgrade_downgrade.js #### Enable when 4.0 becomes last-stable. + - jstests/sharding/change_stream_no_shards.js - jstests/sharding/change_streams_unsharded_becomes_sharded.js - jstests/sharding/create_database.js - jstests/sharding/database_and_shard_versioning_all_commands.js diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml index 5157e0c189a..2a0741edc54 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml @@ -21,6 +21,7 @@ selector: - jstests/sharding/database_versioning_upgrade_downgrade.js - jstests/sharding/shard_collection_cache_upgrade_downgrade.js #### Enable when 4.0 becomes last-stable. + - jstests/sharding/change_stream_no_shards.js - jstests/sharding/change_streams_unsharded_becomes_sharded.js - jstests/sharding/create_database.js - jstests/sharding/database_and_shard_versioning_all_commands.js diff --git a/jstests/change_streams/change_stream_collation.js b/jstests/change_streams/change_stream_collation.js index 0f2c4ee22e3..ceb1eb463dc 100644 --- a/jstests/change_streams/change_stream_collation.js +++ b/jstests/change_streams/change_stream_collation.js @@ -21,10 +21,6 @@ const caseInsensitive = {locale: "en_US", strength: 2}; - // $changeStream cannot run on a non-existent database. Create an unrelated collection to - // ensure that the database is present before testing. - assertDropAndRecreateCollection(db, "change_stream_ensure_db_exists"); - let caseInsensitiveCollection = "change_stream_case_insensitive"; assertDropCollection(db, caseInsensitiveCollection); diff --git a/jstests/change_streams/change_stream_does_not_implicitly_create_database.js b/jstests/change_streams/change_stream_does_not_implicitly_create_database.js new file mode 100644 index 00000000000..052a53585bd --- /dev/null +++ b/jstests/change_streams/change_stream_does_not_implicitly_create_database.js @@ -0,0 +1,81 @@ +/** + * Tests that change streams can be opened on a namespace before the collection or database has been + * created, and will not implicitly create either. + */ + +(function() { + "use strict"; + + load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest'. + + // Ensure that the test DB does not exist. + const testDB = db.getSiblingDB(jsTestName()); + assert.commandWorked(testDB.dropDatabase()); + + let dbList = assert.commandWorked( + db.adminCommand({listDatabases: 1, nameOnly: true, filter: {name: testDB.getName()}})); + assert.docEq(dbList.databases, []); + + const collName = "test"; + + // Start a new $changeStream on the non-existent db. + const cst = new ChangeStreamTest(testDB); + const changeStreamCursor = + cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collName}); + + // Confirm that a $changeStream cursor has been opened on the namespace. + assert.gt(changeStreamCursor.id, 0); + + // Confirm that the database has not been implicitly created. + dbList = assert.commandWorked( + db.adminCommand({listDatabases: 1, nameOnly: true, filter: {name: testDB.getName()}})); + assert.docEq(dbList.databases, []); + + // Confirm that a non-$changeStream aggregation on the non-existent database returns an empty + // cursor. + const nonCsCmdRes = assert.commandWorked( + testDB.runCommand({aggregate: collName, pipeline: [{$match: {}}], cursor: {}})); + assert.docEq(nonCsCmdRes.cursor.firstBatch, []); + assert.eq(nonCsCmdRes.cursor.id, 0); + + // Now perform some writes into the collection... + assert.commandWorked(testDB[collName].insert({_id: 1})); + assert.commandWorked(testDB[collName].insert({_id: 2})); + assert.commandWorked(testDB[collName].update({_id: 1}, {$set: {updated: true}})); + assert.commandWorked(testDB[collName].remove({_id: 2})); + + // ... confirm that the database has been created... + dbList = assert.commandWorked( + db.adminCommand({listDatabases: 1, nameOnly: true, filter: {name: testDB.getName()}})); + assert.docEq(dbList.databases, [{name: testDB.getName()}]); + + // ... and verify that the changes are observed by the stream. + const expectedChanges = [ + { + documentKey: {_id: 1}, + fullDocument: {_id: 1}, + ns: {db: testDB.getName(), coll: collName}, + operationType: "insert" + }, + { + documentKey: {_id: 2}, + fullDocument: {_id: 2}, + ns: {db: testDB.getName(), coll: collName}, + operationType: "insert" + }, + { + documentKey: {_id: 1}, + ns: {db: testDB.getName(), coll: collName}, + updateDescription: {removedFields: [], updatedFields: {updated: true}}, + operationType: "update" + }, + { + documentKey: {_id: 2}, + ns: {db: testDB.getName(), coll: collName}, + operationType: "delete" + }, + ]; + + cst.assertNextChangesEqual({cursor: changeStreamCursor, expectedChanges: expectedChanges}); + cst.cleanUp(); +})();
\ No newline at end of file diff --git a/jstests/sharding/change_stream_no_shards.js b/jstests/sharding/change_stream_no_shards.js new file mode 100644 index 00000000000..e92c91d7322 --- /dev/null +++ b/jstests/sharding/change_stream_no_shards.js @@ -0,0 +1,39 @@ +/** + * Test that running a $changeStream aggregation on a cluster with no shards returns an empty result + * set with a cursorID of zero. + */ +(function() { + const st = new ShardingTest({shards: 0, config: 1}); + + const adminDB = st.s.getDB("admin"); + const testDB = st.s.getDB("test"); + + // Test that attempting to open a stream on a single collection results in an empty, closed + // cursor response. + let csCmdRes = assert.commandWorked( + testDB.runCommand({aggregate: "testing", pipeline: [{$changeStream: {}}], cursor: {}})); + assert.docEq(csCmdRes.cursor.firstBatch, []); + assert.eq(csCmdRes.cursor.id, 0); + + // Test that attempting to open a whole-db stream results in an empty, closed cursor response. + csCmdRes = assert.commandWorked( + testDB.runCommand({aggregate: 1, pipeline: [{$changeStream: {}}], cursor: {}})); + assert.docEq(csCmdRes.cursor.firstBatch, []); + assert.eq(csCmdRes.cursor.id, 0); + + // Test that attempting to open a cluster-wide stream results in an empty, closed cursor + // response. + csCmdRes = assert.commandWorked(adminDB.runCommand( + {aggregate: 1, pipeline: [{$changeStream: {allChangesForCluster: true}}], cursor: {}})); + assert.docEq(csCmdRes.cursor.firstBatch, []); + assert.eq(csCmdRes.cursor.id, 0); + + // Test that a regular, non-$changeStream aggregation also results in an empty cursor when no + // shards are present. + const nonCsCmdRes = assert.commandWorked( + testDB.runCommand({aggregate: "testing", pipeline: [{$match: {}}], cursor: {}})); + assert.docEq(nonCsCmdRes.cursor.firstBatch, []); + assert.eq(nonCsCmdRes.cursor.id, 0); + + st.stop(); +})();
\ No newline at end of file diff --git a/jstests/sharding/change_streams_whole_db.js b/jstests/sharding/change_streams_whole_db.js index fc5443315a6..d43294e7772 100644 --- a/jstests/sharding/change_streams_whole_db.js +++ b/jstests/sharding/change_streams_whole_db.js @@ -24,15 +24,7 @@ }); const mongosDB = st.s0.getDB("test"); - - // TODO SERVER-34138 will add support for opening a change stream before a database exists. - assert.commandFailedWithCode( - mongosDB.runCommand( - {aggregate: 1, pipeline: [{$changeStream: {}}], cursor: {batchSize: 1}}), - ErrorCodes.NamespaceNotFound); - const mongosColl = mongosDB[jsTestName()]; - mongosDB.createCollection(jsTestName()); let cst = new ChangeStreamTest(mongosDB); let cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 2f068165ed9..e00f26bebca 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -339,14 +339,8 @@ Status runAggregate(OperationContext* opCtx, uassertStatusOK(waitForReadConcern(opCtx, readConcern, true)); } - // If the change stream is opened against a database which does not exist yet, go ahead - // and create it. Use MODE_IX since the AutoGetOrCreateDb helper will automatically - // reacquire as MODE_X if the database does not exist. - AutoGetOrCreateDb dbLock(opCtx, origNss.db(), MODE_IX); - invariant(dbLock.getDb()); if (!origNss.isCollectionlessAggregateNS()) { - // AutoGetCollectionForReadCommand will raise an error if the given namespace is a - // view. + // AutoGetCollectionForReadCommand will raise an error if 'origNss' is a view. AutoGetCollectionForReadCommand origNssCtx(opCtx, origNss); // Resolve the collator to either the user-specified collation or the default diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 46cb784032a..919f3318c7e 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -149,48 +149,55 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId, } bool mustRunOnAllShards(const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const boost::optional<CachedCollectionRoutingInfo>& routingInfo, const LiteParsedPipeline& litePipe) { - // Any collectionless aggregation like a $currentOp, and a change stream on a sharded collection - // must run on all shards. - const bool nsIsSharded = static_cast<bool>(routingInfo.cm()); - return nss.isCollectionlessAggregateNS() || (nsIsSharded && litePipe.hasChangeStream()); + // Non-existent routing table is only valid for $changeStream aggregations which have been + // opened prior to the creation of the target database. + invariant(routingInfo || litePipe.hasChangeStream()); + // The following aggregations must be routed to all shards: + // - Any collectionless aggregation such as $currentOp + // - $changeStream on a non-existent database + // - $changeStream on a sharded collection + const bool dbExists = static_cast<bool>(routingInfo); + const bool nsIsSharded = dbExists && static_cast<bool>(routingInfo->cm()); + return !dbExists || nss.isCollectionlessAggregateNS() || + (nsIsSharded && litePipe.hasChangeStream()); } StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx, - const NamespaceString& execNss, - CatalogCache* catalogCache) { - // This call to getCollectionRoutingInfo will return !OK if the database does not exist. - auto swRoutingInfo = catalogCache->getCollectionRoutingInfo(opCtx, execNss); - - // Collectionless aggregations, however, may be run on 'admin' (which should always exist) but - // are subsequently targeted towards the shards. If getCollectionRoutingInfo is OK, we perform a - // further check that at least one shard exists if the aggregation is collectionless. - if (swRoutingInfo.isOK() && execNss.isCollectionlessAggregateNS()) { - std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds); - - if (shardIds.size() == 0) { - return {ErrorCodes::NamespaceNotFound, "No shards are present in the cluster"}; - } + const NamespaceString& execNss) { + // First, verify that there are shards present in the cluster. If not, then we return the + // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because + // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on + // a collection before its enclosing database is created. However, if there are no shards + // present, then $changeStream should immediately return an empty cursor just as other + // aggregations do when the database does not exist. + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); + if (shardIds.size() == 0) { + return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"}; } - return swRoutingInfo; + // This call to getCollectionRoutingInfo will return !OK if the database does not exist. + return Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, execNss); } std::set<ShardId> getTargetedShards(OperationContext* opCtx, bool mustRunOnAllShards, - const CachedCollectionRoutingInfo& routingInfo, + const boost::optional<CachedCollectionRoutingInfo>& routingInfo, const BSONObj shardQuery, const BSONObj collation) { if (mustRunOnAllShards) { // The pipeline begins with a stage which must be run on all shards. std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds); + Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); return {shardIds.begin(), shardIds.end()}; } - return getTargetedShardsForQuery(opCtx, routingInfo, shardQuery, collation); + // If we don't need to run on all shards, then we should always have a valid routing table. + invariant(routingInfo); + + return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation); } BSONObj createCommandForTargetedShards( @@ -264,21 +271,25 @@ BSONObj createCommandForMergingShard( return appendAllowImplicitCreate(mergeCmd.freeze().toBson(), true); } -std::vector<RemoteCursor> establishShardCursors(OperationContext* opCtx, - const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - CachedCollectionRoutingInfo* routingInfo, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref, - const BSONObj& shardQuery, - const BSONObj& collation) { +std::vector<RemoteCursor> establishShardCursors( + OperationContext* opCtx, + const NamespaceString& nss, + const LiteParsedPipeline& litePipe, + boost::optional<CachedCollectionRoutingInfo>& routingInfo, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref, + const BSONObj& shardQuery, + const BSONObj& collation) { LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; - bool mustRunOnAll = mustRunOnAllShards(nss, *routingInfo, litePipe); + const bool mustRunOnAll = mustRunOnAllShards(nss, routingInfo, litePipe); std::set<ShardId> shardIds = - getTargetedShards(opCtx, mustRunOnAll, *routingInfo, shardQuery, collation); + getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, collation); std::vector<std::pair<ShardId, BSONObj>> requests; + // If we don't need to run on all shards, then we should always have a valid routing table. + invariant(routingInfo || mustRunOnAll); + if (mustRunOnAll) { // The pipeline contains a stage which must be run on all shards. Skip versioning and // enqueue the raw command objects. @@ -399,20 +410,25 @@ DispatchShardPipelineResults dispatchShardPipeline( while (++numAttempts <= kMaxNumStaleVersionRetries) { // We need to grab a new routing table at the start of each iteration, since a stale config // exception will invalidate the previous one. - auto executionNsRoutingInfo = uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, executionNss)); + auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss); + + // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue. + // Otherwise, uassert on all exceptions here. + if (!(liteParsedPipeline.hasChangeStream() && + executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { + uassertStatusOK(executionNsRoutingInfoStatus); + } + + auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK() + ? std::move(executionNsRoutingInfoStatus.getValue()) + : boost::optional<CachedCollectionRoutingInfo>{}; // Determine whether we can run the entire aggregation on a single shard. - bool mustRunOnAll = + const bool mustRunOnAll = mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline); std::set<ShardId> shardIds = getTargetedShards( opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); - uassert(ErrorCodes::ShardNotFound, - "No targets were found for this aggregation. All shards were removed from the " - "cluster mid-operation", - shardIds.size() > 0); - auto atClusterTime = computeAtClusterTime( opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation()); @@ -423,8 +439,8 @@ DispatchShardPipelineResults dispatchShardPipeline( // is not the primary. // - The pipeline contains one or more stages which must always merge on mongoS. const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge || - (needsPrimaryShardMerge && - *(shardIds.begin()) != executionNsRoutingInfo.db().primaryId())); + (needsPrimaryShardMerge && executionNsRoutingInfo && + *shardIds.begin() != executionNsRoutingInfo->db().primaryId())); const bool isSplit = pipelineForTargetedShards->isSplitForShards(); @@ -444,7 +460,7 @@ DispatchShardPipelineResults dispatchShardPipeline( // Refresh the shard registry if we're targeting all shards. We need the shard registry // to be at least as current as the logical time used when creating the command for // $changeStream to work reliably, so we do a "hard" reload. - if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) { + if (mustRunOnAll) { auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); if (!shardRegistry->reload(opCtx)) { shardRegistry->reload(opCtx); @@ -454,7 +470,7 @@ DispatchShardPipelineResults dispatchShardPipeline( // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. try { if (expCtx->explain) { - if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) { + if (mustRunOnAll) { // Some stages (such as $currentOp) need to be broadcast to all shards, and // should not participate in the shard version protocol. shardResults = @@ -466,11 +482,12 @@ DispatchShardPipelineResults dispatchShardPipeline( } else { // Aggregations on a real namespace should use the routing table to target // shards, and should participate in the shard version protocol. + invariant(executionNsRoutingInfo); shardResults = scatterGatherVersionedTargetByRoutingTable( opCtx, executionNss.db(), executionNss, - executionNsRoutingInfo, + *executionNsRoutingInfo, targetedCommand, ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent, @@ -481,7 +498,7 @@ DispatchShardPipelineResults dispatchShardPipeline( cursors = establishShardCursors(opCtx, executionNss, liteParsedPipeline, - &executionNsRoutingInfo, + executionNsRoutingInfo, targetedCommand, ReadPreferenceSetting::get(opCtx), shardQuery, @@ -502,8 +519,9 @@ DispatchShardPipelineResults dispatchShardPipeline( // Record the number of shards involved in the aggregation. If we are required to merge on // the primary shard, but the primary shard was not in the set of targeted shards, then we // must increment the number of involved shards. - CurOp::get(opCtx)->debug().nShards = shardIds.size() + - (needsPrimaryShardMerge && !shardIds.count(executionNsRoutingInfo.db().primaryId())); + CurOp::get(opCtx)->debug().nShards = + shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo && + !shardIds.count(executionNsRoutingInfo->db().primaryId())); break; // Success! } @@ -687,224 +705,274 @@ ShardId pickMergingShard(OperationContext* opCtx, .toString(); } -} // namespace - -Status ClusterAggregate::runAggregate(OperationContext* opCtx, - const Namespaces& namespaces, - const AggregationRequest& request, - BSONObj cmdObj, - BSONObjBuilder* result) { - const auto catalogCache = Grid::get(opCtx)->catalogCache(); - - auto executionNsRoutingInfoStatus = - getExecutionNsRoutingInfo(opCtx, namespaces.executionNss, catalogCache); - - LiteParsedPipeline liteParsedPipeline(request); - - if (!executionNsRoutingInfoStatus.isOK()) { - // Standard aggregations swallow 'NamespaceNotFound' and return an empty cursor with id 0 in - // the event that the database does not exist. For $changeStream aggregations, however, we - // throw the exception in all error cases, including that of a non-existent database. - if (liteParsedPipeline.hasChangeStream()) { - uassertStatusOKWithContext(executionNsRoutingInfoStatus.getStatus(), - "failed to open $changeStream"); - } - appendEmptyResultSet( - opCtx, *result, executionNsRoutingInfoStatus.getStatus(), namespaces.requestedNss.ns()); - - return Status::OK(); - } - - auto executionNsRoutingInfo = executionNsRoutingInfoStatus.getValue(); - - // Determine the appropriate collation and 'resolve' involved namespaces to make the - // ExpressionContext. - - // We won't try to execute anything on a mongos, but we still have to populate this map so that - // any $lookups, etc. will be able to have a resolved view definition. It's okay that this is - // incorrect, we will repopulate the real resolved namespace map on the mongod. Note that we - // need to check if any involved collections are sharded before forwarding an aggregation - // command on an unsharded collection. +// "Resolve" involved namespaces and verify that none of them are sharded. We won't try to execute +// anything on a mongos, but we still have to populate this map so that any $lookups, etc. will be +// able to have a resolved view definition. It's okay that this is incorrect, we will repopulate the +// real namespace map on the mongod. Note that this function must be called before forwarding an +// aggregation command on an unsharded collection, in order to validate that none of the involved +// collections are sharded. +StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces( + OperationContext* opCtx, const LiteParsedPipeline& litePipe) { StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; - - for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) { + for (auto&& nss : litePipe.getInvolvedNamespaces()) { const auto resolvedNsRoutingInfo = - uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); uassert( 28769, str::stream() << nss.ns() << " cannot be sharded", !resolvedNsRoutingInfo.cm()); resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{}); } + return resolvedNamespaces; +} - // If this pipeline is on an unsharded collection, is allowed to be forwarded to shards, does - // not need to run on all shards, and doesn't need transformation via - // DocumentSource::serialize(), then go ahead and pass it through to the owning shard - // unmodified. - if (!executionNsRoutingInfo.cm() && - !mustRunOnAllShards(namespaces.executionNss, executionNsRoutingInfo, liteParsedPipeline) && - liteParsedPipeline.allowedToForwardFromMongos() && - liteParsedPipeline.allowedToPassthroughFromMongos()) { - return aggPassthrough(opCtx, - namespaces, - executionNsRoutingInfo.db().primary()->getId(), - cmdObj, - request, - liteParsedPipeline, - result); - } - +// Build an appropriate ExpressionContext for the pipeline. This helper validates that all involved +// namespaces are unsharded, obtains the appropriate user-defined or default collator, and creates a +// MongoProcessInterface for use by the pipeline's stages. +boost::intrusive_ptr<ExpressionContext> makeExpressionContext( + OperationContext* opCtx, + const NamespaceString& executionNss, + const AggregationRequest& request, + const LiteParsedPipeline& litePipe, + const boost::optional<CachedCollectionRoutingInfo>& routingInfo) { + // Determine the appropriate collation for the ExpressionContext. + const bool collectionIsSharded = (routingInfo && routingInfo->cm()); + const bool collectionIsNotSharded = (routingInfo && !routingInfo->cm()); std::unique_ptr<CollatorInterface> collation; + if (!request.getCollation().isEmpty()) { collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) ->makeFromBSON(request.getCollation())); - } else if (const auto chunkMgr = executionNsRoutingInfo.cm()) { - if (chunkMgr->getDefaultCollator()) { - collation = chunkMgr->getDefaultCollator()->clone(); + } else if (collectionIsSharded) { + if (routingInfo->cm()->getDefaultCollator()) { + collation = routingInfo->cm()->getDefaultCollator()->clone(); } - } else { - // Unsharded collection. Get collection metadata from primary chunk. + } else if (collectionIsNotSharded) { + // Get collection metadata from primary chunk. auto collationObj = getDefaultCollationForUnshardedCollection( - executionNsRoutingInfo.db().primary().get(), namespaces.executionNss); + routingInfo->db().primary().get(), executionNss); if (!collationObj.isEmpty()) { collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) ->makeFromBSON(collationObj)); } } - boost::intrusive_ptr<ExpressionContext> mergeCtx = - new ExpressionContext(opCtx, - request, - std::move(collation), - std::make_shared<PipelineS::MongoSInterface>(), - std::move(resolvedNamespaces)); + // Create the expression context, and set 'inMongos' to true before returning it. We explicitly + // do *not* set mergeCtx->tempDir. Note that we resolve the pipeline's involved namespaces here, + // validating that none of them are sharded. + auto mergeCtx = new ExpressionContext(opCtx, + request, + std::move(collation), + std::make_shared<PipelineS::MongoSInterface>(), + resolveInvolvedNamespaces(opCtx, litePipe)); mergeCtx->inMongos = true; - // explicitly *not* setting mergeCtx->tempDir - - auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), mergeCtx)); - pipeline->optimizePipeline(); + return mergeCtx; +} - // Check whether the entire pipeline must be run on mongoS. - if (pipeline->requiredToRunOnMongos()) { - uassert(ErrorCodes::IllegalOperation, - str::stream() << "Aggregation pipeline must be run on mongoS, but " - << pipeline->getSources().front()->getSourceName() - << " is not capable of producing input", +// Runs a pipeline on mongoS, having first validated that it is eligible to do so. This can be a +// pipeline which is split for merging, or an intact pipeline which must run entirely on mongoS. +Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const ClusterAggregate::Namespaces& namespaces, + const AggregationRequest& request, + BSONObj cmdObj, + const LiteParsedPipeline& litePipe, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + std::vector<RemoteCursor>&& cursors, + BSONObjBuilder* result) { + // We should never receive a pipeline intended for the shards, or which cannot run on mongoS. + invariant(!pipeline->isSplitForShards()); + invariant(pipeline->canRunOnMongos()); + + const auto& requestedNss = namespaces.requestedNss; + const auto opCtx = expCtx->opCtx; + + // If this is an unsplit mongoS-only pipeline, verify that the first stage can produce input for + // the remainder of the pipeline. + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Aggregation pipeline must be run on mongoS, but " + << pipeline->getSources().front()->getSourceName() + << " is not capable of producing input", + pipeline->isSplitForMerge() || !pipeline->getSources().front()->constraints().requiresInputDocSource); - if (mergeCtx->explain) { - *result << "splitPipeline" << BSONNULL << "mongos" - << Document{{"host", getHostNameCachedAndPort()}, - {"stages", pipeline->writeExplainOps(*mergeCtx->explain)}}; - return Status::OK(); - } - - auto cursorResponse = establishMergingMongosCursor(opCtx, - request, - namespaces.requestedNss, - cmdObj, - liteParsedPipeline, - std::move(pipeline), - {}); - CommandHelpers::filterCommandReplyForPassthrough(cursorResponse, result); - return getStatusFromCommandResult(result->asTempObj()); - } - - auto dispatchResults = dispatchShardPipeline(mergeCtx, - namespaces.executionNss, - cmdObj, - request, - liteParsedPipeline, - std::move(pipeline)); - - if (mergeCtx->explain) { - // If we reach here, we've either succeeded in running the explain or exhausted all - // attempts. In either case, attempt to append the explain results to the output builder. - uassertAllShardsSupportExplain(dispatchResults.remoteExplainOutput); - - return appendExplainResults(std::move(dispatchResults.remoteExplainOutput), - mergeCtx, - dispatchResults.pipelineForTargetedShards, - dispatchResults.pipelineForMerging, - result); + // If this is an explain and the pipeline is not split, write the explain output and return. + if (expCtx->explain && !pipeline->isSplitForMerge()) { + *result << "splitPipeline" << BSONNULL << "mongos" + << Document{{"host", getHostNameCachedAndPort()}, + {"stages", pipeline->writeExplainOps(*expCtx->explain)}}; + return Status::OK(); } + // Register the new mongoS cursor, and retrieve the initial batch of results. + auto cursorResponse = establishMergingMongosCursor( + opCtx, request, requestedNss, cmdObj, litePipe, std::move(pipeline), std::move(cursors)); - invariant(dispatchResults.remoteCursors.size() > 0); - - // If we dispatched to a single shard, store the remote cursor and return immediately. - if (!dispatchResults.pipelineForTargetedShards->isSplitForShards()) { - invariant(dispatchResults.remoteCursors.size() == 1); - const auto& remoteCursor = dispatchResults.remoteCursors[0]; - auto executorPool = Grid::get(opCtx)->getExecutorPool(); - const BSONObj reply = uassertStatusOK(storePossibleCursor( - opCtx, - remoteCursor.getShardId().toString(), - remoteCursor.getHostAndPort(), - remoteCursor.getCursorResponse().toBSON(CursorResponse::ResponseType::InitialResponse), - namespaces.requestedNss, - executorPool->getArbitraryExecutor(), - Grid::get(opCtx)->getCursorManager(), - mergeCtx->tailableMode)); + // We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline + // can never run on mongoS. Filter the command response and return immediately. + CommandHelpers::filterCommandReplyForPassthrough(cursorResponse, result); + return getStatusFromCommandResult(result->asTempObj()); +} - return appendCursorResponseToCommandResult( - remoteCursor.getShardId().toString(), reply, result); - } +Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const ClusterAggregate::Namespaces& namespaces, + const AggregationRequest& request, + BSONObj cmdObj, + const LiteParsedPipeline& litePipe, + const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + DispatchShardPipelineResults& shardDispatchResults, + BSONObjBuilder* result) { + // We should never be in a situation where we call this function on a non-merge pipeline. + auto& mergingPipeline = shardDispatchResults.pipelineForMerging; + invariant(mergingPipeline && mergingPipeline->isSplitForMerge()); - // If we reach here, we have a merge pipeline to dispatch. - auto mergingPipeline = std::move(dispatchResults.pipelineForMerging); - invariant(mergingPipeline); + const auto opCtx = expCtx->opCtx; // First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS, // then ignore the internalQueryProhibitMergingOnMongoS parameter. if (mergingPipeline->requiredToRunOnMongos() || (!internalQueryProhibitMergingOnMongoS.load() && mergingPipeline->canRunOnMongos())) { - // Register the new mongoS cursor, and retrieve the initial batch of results. - auto cursorResponse = - establishMergingMongosCursor(opCtx, - request, - namespaces.requestedNss, - dispatchResults.commandForTargetedShards, - liteParsedPipeline, - std::move(mergingPipeline), - std::move(dispatchResults.remoteCursors)); - - // We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline - // can never run on mongoS. Filter the command response and return immediately. - CommandHelpers::filterCommandReplyForPassthrough(cursorResponse, result); - return getStatusFromCommandResult(result->asTempObj()); + return runPipelineOnMongoS(expCtx, + namespaces, + request, + shardDispatchResults.commandForTargetedShards, + litePipe, + std::move(mergingPipeline), + std::move(shardDispatchResults.remoteCursors), + result); } + // If we are not merging on mongoS, then this is not a $changeStream aggregation, and we + // therefore must have a valid routing table. + invariant(routingInfo); + // TODO SERVER-33683 allowing an aggregation within a transaction can lead to a deadlock in the // SessionCatalog when a pipeline with a $mergeCursors sends a getMore to itself. uassert(50732, "Cannot specify a transaction number in combination with an aggregation on mongos when " - "merigng on a shard", + "merging on a shard", !opCtx->getTxnNumber()); + ShardId mergingShardId = - pickMergingShard(opCtx, dispatchResults, executionNsRoutingInfo.db().primaryId()); + pickMergingShard(opCtx, shardDispatchResults, routingInfo->db().primaryId()); cluster_aggregation_planner::addMergeCursorsSource( mergingPipeline.get(), - std::move(dispatchResults.remoteCursors), + std::move(shardDispatchResults.remoteCursors), Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); - auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline); + auto mergeCmdObj = createCommandForMergingShard(request, expCtx, cmdObj, mergingPipeline); + + // Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return. auto mergeResponse = establishMergingShardCursor(opCtx, namespaces.executionNss, mergeCmdObj, mergingShardId); - // The merging shard is remote, so if a response was received, a HostAndPort must have been set. - invariant(mergeResponse.hostAndPort); - auto mergeCursorResponse = uassertStatusOK( - storePossibleCursor(opCtx, - mergingShardId, - *mergeResponse.hostAndPort, - mergeResponse.response, - namespaces.requestedNss, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - Grid::get(opCtx)->getCursorManager())); + auto mergeCursorResponse = uassertStatusOK(storePossibleCursor( + opCtx, namespaces.requestedNss, mergingShardId, mergeResponse, expCtx->tailableMode)); return appendCursorResponseToCommandResult(mergingShardId, mergeCursorResponse, result); } +void appendEmptyResultSetWithStatus(OperationContext* opCtx, + const NamespaceString& nss, + Status status, + BSONObjBuilder* result) { + // Rewrite ShardNotFound as NamespaceNotFound so that appendEmptyResultSet swallows it. + if (status == ErrorCodes::ShardNotFound) { + status = {ErrorCodes::NamespaceNotFound, status.reason()}; + } + appendEmptyResultSet(opCtx, *result, status, nss.ns()); +} + +} // namespace + +Status ClusterAggregate::runAggregate(OperationContext* opCtx, + const Namespaces& namespaces, + const AggregationRequest& request, + BSONObj cmdObj, + BSONObjBuilder* result) { + auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, namespaces.executionNss); + boost::optional<CachedCollectionRoutingInfo> routingInfo; + LiteParsedPipeline litePipe(request); + + // If the routing table is valid, we obtain a reference to it. If the table is not valid, then + // either the database does not exist, or there are no shards in the cluster. In the latter + // case, we always return an empty cursor. In the former case, if the requested aggregation is a + // $changeStream, we allow the operation to continue so that stream cursors can be established + // on the given namespace before the database or collection is actually created. If the database + // does not exist and this is not a $changeStream, then we return an empty cursor. + if (executionNsRoutingInfoStatus.isOK()) { + routingInfo = std::move(executionNsRoutingInfoStatus.getValue()); + } else if (!(litePipe.hasChangeStream() && + executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { + appendEmptyResultSetWithStatus( + opCtx, namespaces.requestedNss, executionNsRoutingInfoStatus.getStatus(), result); + return Status::OK(); + } + + // Determine whether this aggregation must be dispatched to all shards in the cluster. + const bool mustRunOnAll = mustRunOnAllShards(namespaces.executionNss, routingInfo, litePipe); + + // If we don't have a routing table, then this is a $changeStream which must run on all shards. + invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream())); + + // If this pipeline is not on a sharded collection, is allowed to be forwarded to shards, does + // not need to run on all shards, and doesn't need to go through DocumentSource::serialize(), + // then go ahead and pass it through to the owning shard unmodified. Note that we first call + // resolveInvolvedNamespaces to validate that none of the namespaces are sharded. + if (routingInfo && !routingInfo->cm() && !mustRunOnAll && + litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos()) { + resolveInvolvedNamespaces(opCtx, litePipe); + const auto primaryShardId = routingInfo->db().primary()->getId(); + return aggPassthrough(opCtx, namespaces, primaryShardId, cmdObj, request, litePipe, result); + } + + // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator, + // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by the + // pipeline's stages. + auto expCtx = + makeExpressionContext(opCtx, namespaces.executionNss, request, litePipe, routingInfo); + + // Parse and optimize the full pipeline. + auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx)); + pipeline->optimizePipeline(); + + // Check whether the entire pipeline must be run on mongoS. + if (pipeline->requiredToRunOnMongos()) { + return runPipelineOnMongoS( + expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), {}, result); + } + + // If not, split the pipeline as necessary and dispatch to the relevant shards. + auto shardDispatchResults = dispatchShardPipeline( + expCtx, namespaces.executionNss, cmdObj, request, litePipe, std::move(pipeline)); + + // If the operation is an explain, then we verify that it succeeded on all targeted shards, + // write the results to the output builder, and return immediately. + if (expCtx->explain) { + uassertAllShardsSupportExplain(shardDispatchResults.remoteExplainOutput); + return appendExplainResults(std::move(shardDispatchResults.remoteExplainOutput), + expCtx, + shardDispatchResults.pipelineForTargetedShards, + shardDispatchResults.pipelineForMerging, + result); + } + + // If this isn't an explain, then we must have established cursors on at least one shard. + invariant(shardDispatchResults.remoteCursors.size() > 0); + + // If we sent the entire pipeline to a single shard, store the remote cursor and return. + if (!shardDispatchResults.pipelineForTargetedShards->isSplitForShards()) { + invariant(shardDispatchResults.remoteCursors.size() == 1); + auto& remoteCursor = shardDispatchResults.remoteCursors.front(); + const auto reply = uassertStatusOK(storePossibleCursor( + opCtx, namespaces.requestedNss, remoteCursor, expCtx->tailableMode)); + return appendCursorResponseToCommandResult( + remoteCursor.getShardId().toString(), reply, result); + } + + // If we reach here, we have a merge pipeline to dispatch. + return dispatchMergingPipeline( + expCtx, namespaces, request, cmdObj, litePipe, routingInfo, shardDispatchResults, result); +} + void ClusterAggregate::uassertAllShardsSupportExplain( const std::vector<AsyncRequestsSender::Response>& shardResults) { for (const auto& result : shardResults) { @@ -969,19 +1037,11 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, // If this was an explain, then we get back an explain result object rather than a cursor. result = cmdResponse.response; } else { - // The merging shard is remote, so if a response was received, a HostAndPort must have been - // set. - invariant(cmdResponse.hostAndPort); + auto tailMode = liteParsedPipeline.hasChangeStream() + ? TailableModeEnum::kTailableAndAwaitData + : TailableModeEnum::kNormal; result = uassertStatusOK(storePossibleCursor( - opCtx, - shard->getId(), - *cmdResponse.hostAndPort, - cmdResponse.response, - namespaces.requestedNss, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - Grid::get(opCtx)->getCursorManager(), - liteParsedPipeline.hasChangeStream() ? TailableModeEnum::kTailableAndAwaitData - : TailableModeEnum::kNormal)); + opCtx, namespaces.requestedNss, shard->getId(), cmdResponse, tailMode)); } // First append the properly constructed writeConcernError. It will then be skipped diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index fa67241d1cd..28db3452a29 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -35,6 +35,8 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/curop.h" #include "mongo/db/query/cursor_response.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/s/grid.h" #include "mongo/s/query/cluster_client_cursor_impl.h" #include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/s/query/cluster_cursor_manager.h" @@ -43,6 +45,39 @@ namespace mongo { StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, + const NamespaceString& requestedNss, + const RemoteCursor& remoteCursor, + TailableModeEnum tailableMode) { + auto executorPool = Grid::get(opCtx)->getExecutorPool(); + return storePossibleCursor( + opCtx, + remoteCursor.getShardId().toString(), + remoteCursor.getHostAndPort(), + remoteCursor.getCursorResponse().toBSON(CursorResponse::ResponseType::InitialResponse), + requestedNss, + executorPool->getArbitraryExecutor(), + Grid::get(opCtx)->getCursorManager(), + tailableMode); +} + +StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, + const NamespaceString& requestedNss, + const ShardId& shardId, + const Shard::CommandResponse& commandResponse, + TailableModeEnum tailableMode) { + invariant(commandResponse.hostAndPort); + auto executorPool = Grid::get(opCtx)->getExecutorPool(); + return storePossibleCursor(opCtx, + shardId, + *commandResponse.hostAndPort, + commandResponse.response, + requestedNss, + executorPool->getArbitraryExecutor(), + Grid::get(opCtx)->getCursorManager(), + tailableMode); +} + +StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, const ShardId& shardId, const HostAndPort& server, const BSONObj& cmdResult, diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h index b9756be44f7..a364744a386 100644 --- a/src/mongo/s/query/store_possible_cursor.h +++ b/src/mongo/s/query/store_possible_cursor.h @@ -31,12 +31,13 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/tailable_mode.h" -#include "mongo/s/shard_id.h" +#include "mongo/s/client/shard.h" namespace mongo { class BSONObj; class ClusterCursorManager; +class RemoteCursor; template <typename T> class StatusWith; struct HostAndPort; @@ -76,4 +77,22 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, ClusterCursorManager* cursorManager, TailableModeEnum tailableMode = TailableModeEnum::kNormal); +/** + * Convenience function which extracts all necessary information from the passed RemoteCursor, and + * stores a ClusterClientCursor based on it. + */ +StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, + const NamespaceString& requestedNss, + const RemoteCursor& remoteCursor, + TailableModeEnum tailableMode); + +/** + * Convenience function which extracts all necessary information from the passed CommandResponse, + * and stores a ClusterClientCursor based on it. + */ +StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, + const NamespaceString& requestedNss, + const ShardId& shardId, + const Shard::CommandResponse& commandResponse, + TailableModeEnum tailableMode); } // namespace mongo |