diff options
20 files changed, 406 insertions, 12 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 695139b998c..7558224e2af 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -38,6 +38,8 @@ selector: - jstests/sharding/change_stream_lookup_single_shard_cluster.js - jstests/sharding/change_stream_remove_shard.js - jstests/sharding/change_stream_update_lookup_collation.js + - jstests/sharding/change_stream_update_lookup_read_concern.js + - jstests/sharding/change_stream_read_preference.js - jstests/sharding/change_streams.js - jstests/sharding/change_streams_establishment_finds_new_shards.js - jstests/sharding/change_streams_shards_start_in_sync.js diff --git a/jstests/sharding/change_stream_read_preference.js b/jstests/sharding/change_stream_read_preference.js new file mode 100644 index 00000000000..572ecf82424 --- /dev/null +++ b/jstests/sharding/change_stream_read_preference.js @@ -0,0 +1,131 @@ +// Tests that change streams and their update lookups obey the read preference specified by the +// user. +(function() { + "use strict"; + + load('jstests/libs/profiler.js'); // For various profiler helpers. + + // For supportsMajorityReadConcern. + load('jstests/multiVersion/libs/causal_consistency_helpers.js'); + + // This test only works on storage engines that support committed reads, skip it if the + // configured engine doesn't support it. + if (!supportsMajorityReadConcern()) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + return; + } + + const st = new ShardingTest({ + name: "change_stream_read_pref", + shards: 2, + rs: { + nodes: 2, + // Use a higher frequency for periodic noops to speed up the test. 
+ setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true} + }, + }); + + const dbName = jsTestName(); + const mongosDB = st.s0.getDB(dbName); + const mongosColl = mongosDB[jsTestName()]; + + // Enable sharding on the test DB and ensure its primary is the first shard (st.rs0). + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); + + // Shard the test collection on _id. + assert.commandWorked( + mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); + + // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey]. + assert.commandWorked( + mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}})); + + // Move the [0, MaxKey] chunk to the second shard (st.rs1). + assert.commandWorked(mongosDB.adminCommand( + {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()})); + + // Turn on the profiler. + for (let rs of[st.rs0, st.rs1]) { + assert.commandWorked(rs.getPrimary().getDB(dbName).setProfilingLevel(2)); + assert.commandWorked(rs.getSecondary().getDB(dbName).setProfilingLevel(2)); + } + + // Write a document to each chunk. + assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}})); + + // Test that change streams go to the primary by default. 
+ let changeStreamComment = "change stream against primary"; + const primaryStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}], + {comment: changeStreamComment}); + + assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}})); + assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}})); + + assert.soon(() => primaryStream.hasNext()); + assert.eq(primaryStream.next().fullDocument, {_id: -1, updated: true}); + assert.soon(() => primaryStream.hasNext()); + assert.eq(primaryStream.next().fullDocument, {_id: 1, updated: true}); + + for (let rs of[st.rs0, st.rs1]) { + const primaryDB = rs.getPrimary().getDB(dbName); + // Test that the change stream itself goes to the primary. There might be more than one if + // we needed multiple getMores to retrieve the changes. + // TODO SERVER-31650 We have to use 'originatingCommand' here and look for the getMore + // because the initial aggregate will not show up. + profilerHasAtLeastOneMatchingEntryOrThrow( + {profileDB: primaryDB, filter: {'originatingCommand.comment': changeStreamComment}}); + + // Test that the update lookup goes to the primary as well. + profilerHasSingleMatchingEntryOrThrow({ + profileDB: primaryDB, + filter: { + op: "query", + ns: mongosColl.getFullName(), "command.comment": changeStreamComment + } + }); + } + + primaryStream.close(); + + // Test that change streams go to the secondary when the readPreference is {mode: "secondary"}. 
+ changeStreamComment = 'change stream against secondary'; + const secondaryStream = + mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}], + {comment: changeStreamComment, $readPreference: {mode: "secondary"}}); + + assert.writeOK(mongosColl.update({_id: -1}, {$set: {updatedCount: 2}})); + assert.writeOK(mongosColl.update({_id: 1}, {$set: {updatedCount: 2}})); + + assert.soon(() => secondaryStream.hasNext()); + assert.eq(secondaryStream.next().fullDocument, {_id: -1, updated: true, updatedCount: 2}); + assert.soon(() => secondaryStream.hasNext()); + assert.eq(secondaryStream.next().fullDocument, {_id: 1, updated: true, updatedCount: 2}); + + for (let rs of[st.rs0, st.rs1]) { + const secondaryDB = rs.getSecondary().getDB(dbName); + // Test that the change stream itself goes to the secondary. There might be more than one if + // we needed multiple getMores to retrieve the changes. + // TODO SERVER-31650 We have to use 'originatingCommand' here and look for the getMore + // because the initial aggregate will not show up. + profilerHasAtLeastOneMatchingEntryOrThrow( + {profileDB: secondaryDB, filter: {'originatingCommand.comment': changeStreamComment}}); + + // Test that the update lookup goes to the secondary as well. + profilerHasSingleMatchingEntryOrThrow({ + profileDB: secondaryDB, + filter: { + op: "query", + ns: mongosColl.getFullName(), "command.comment": changeStreamComment, + // We need to filter out any profiler entries with a stale config - this is the + // first read on this secondary with a readConcern specified, so it is the first + // read on this secondary that will enforce shard version. 
+ exceptionCode: {$ne: ErrorCodes.StaleConfig} + } + }); + } + + secondaryStream.close(); + st.stop(); +}()); diff --git a/jstests/sharding/change_stream_update_lookup_read_concern.js b/jstests/sharding/change_stream_update_lookup_read_concern.js new file mode 100644 index 00000000000..f90e51fd17c --- /dev/null +++ b/jstests/sharding/change_stream_update_lookup_read_concern.js @@ -0,0 +1,205 @@ +// Tests that a change stream's update lookup will use the appropriate read concern. In particular, +// tests that the update lookup will return a version of the document at least as recent as the +// change that we're doing the lookup for, and that change will be majority-committed. +(function() { + "use strict"; + + load('jstests/replsets/rslib.js'); // For startSetIfSupportsReadMajority. + load("jstests/libs/profiler.js"); // For profilerHas*OrThrow() helpers. + load("jstests/replsets/rslib.js"); // For reconfig(). + + // For stopServerReplication() and restartServerReplication(). + load("jstests/libs/write_concern_util.js"); + + // Configure a replica set to have nodes with specific tags - we will eventually add this as + // part of a sharded cluster. + const rsNodeOptions = { + setParameter: { + writePeriodicNoops: true, + // Note we do not configure the periodic noop writes to be more frequent as we do to + // speed up other change streams tests, since we provide an array of individually + // configured nodes, in order to know which nodes have which tags. This requires a step + // up command to happen, which requires all nodes to agree on an op time. With the + // periodic noop writer at a high frequency, this can potentially never finish. 
+ }, + shardsvr: "", + }; + const replSetName = jsTestName(); + const rst = new ReplSetTest({ + name: replSetName, + nodes: [ + {rsConfig: {priority: 1, tags: {tag: "primary"}}}, + {rsConfig: {priority: 0, tags: {tag: "closestSecondary"}}}, + {rsConfig: {priority: 0, tags: {tag: "fartherSecondary"}}} + ], + nodeOptions: rsNodeOptions, + }); + + if (!startSetIfSupportsReadMajority(rst)) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + return; + } + rst.initiate(); + rst.awaitSecondaryNodes(); + + // Start the sharding test and add the replica set. + const st = new ShardingTest({manualAddShard: true}); + assert.commandWorked(st.s.adminCommand({addShard: replSetName + "/" + rst.getPrimary().host})); + + const mongosDB = st.s0.getDB(jsTestName()); + const mongosColl = mongosDB[jsTestName()]; + + // Shard the collection to ensure the change stream will perform update lookup from mongos. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + assert.commandWorked( + mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); + + assert.writeOK(mongosColl.insert({_id: 1})); + rst.awaitReplication(); + + // Make sure reads with read preference tag 'closestSecondary' go to the tagged secondary. + const closestSecondary = rst.nodes[1]; + const closestSecondaryDB = closestSecondary.getDB(mongosDB.getName()); + assert.commandWorked(closestSecondaryDB.setProfilingLevel(2)); + + // We expect the tag to ensure there is only one node to choose from, so the actual read + // preference doesn't really matter - we use 'nearest' throughout. 
+ assert.eq(mongosColl.find() + .readPref("nearest", [{tag: "closestSecondary"}]) + .comment("testing targeting") + .itcount(), + 1); + profilerHasSingleMatchingEntryOrThrow({ + profileDB: closestSecondaryDB, + filter: {ns: mongosColl.getFullName(), "command.comment": "testing targeting"} + }); + + const changeStreamComment = "change stream against closestSecondary"; + const changeStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}], { + comment: changeStreamComment, + $readPreference: {mode: "nearest", tags: [{tag: "closestSecondary"}]} + }); + assert.writeOK(mongosColl.update({_id: 1}, {$set: {updatedCount: 1}})); + assert.soon(() => changeStream.hasNext()); + let latestChange = changeStream.next(); + assert.eq(latestChange.operationType, "update"); + assert.docEq(latestChange.fullDocument, {_id: 1, updatedCount: 1}); + + // Test that the change stream itself goes to the secondary. There might be more than one if we + // needed multiple getMores to retrieve the changes. + // TODO SERVER-31650 We have to use 'originatingCommand' here and look for the getMore because + // the initial aggregate will not show up. + profilerHasAtLeastOneMatchingEntryOrThrow({ + profileDB: closestSecondaryDB, + filter: {"originatingCommand.comment": changeStreamComment} + }); + + // Test that the update lookup goes to the secondary as well. + profilerHasSingleMatchingEntryOrThrow({ + profileDB: closestSecondaryDB, + filter: { + op: "query", + ns: mongosColl.getFullName(), + "command.filter._id": 1, + "command.comment": changeStreamComment, + // We need to filter out any profiler entries with a stale config - this is the first + // read on this secondary with a readConcern specified, so it is the first read on this + // secondary that will enforce shard version. 
+ exceptionCode: {$ne: ErrorCodes.StaleConfig} + }, + errorMsgFilter: {ns: mongosColl.getFullName()}, + errorMsgProj: {ns: 1, op: 1, command: 1}, + }); + + // Now add a new secondary which is "closer" (add the "closestSecondary" tag to that secondary, + // and remove it from the old node with that tag) to force update lookups to target a different + // node than the change stream itself. + let rsConfig = rst.getReplSetConfig(); + rsConfig.members[1].tags = {tag: "fartherSecondary"}; + rsConfig.members[2].tags = {tag: "closestSecondary"}; + rsConfig.version = rst.getReplSetConfigFromNode().version + 1; + reconfig(rst, rsConfig); + rst.awaitSecondaryNodes(); + const newClosestSecondary = rst.nodes[2]; + const newClosestSecondaryDB = newClosestSecondary.getDB(mongosDB.getName()); + const originalClosestSecondaryDB = closestSecondaryDB; + + // Wait for the mongos to acknowledge the new tags from our reconfig. + awaitRSClientHosts(st.s, + newClosestSecondary, + {ok: true, secondary: true, tags: {tag: "closestSecondary"}}, + rst); + assert.commandWorked(newClosestSecondaryDB.setProfilingLevel(2)); + + // Make sure new queries with read preference tag "closestSecondary" go to the new secondary. + assert.eq(newClosestSecondaryDB.system.profile.count(), 0); + assert.eq(mongosColl.find() + .readPref("nearest", [{tag: "closestSecondary"}]) + .comment("testing targeting") + .itcount(), + 1); + assert.gt(newClosestSecondaryDB.system.profile.count( + {ns: mongosColl.getFullName(), "command.comment": "testing targeting"}), + 0); + + // Test that the change stream continues on the original host, but the update lookup now targets + // the new, lagged secondary. Even though it's lagged, the lookup should use 'afterClusterTime' + // to ensure it does not return until the node can see the change it's looking up. 
+ stopServerReplication(newClosestSecondary); + assert.writeOK(mongosColl.update({_id: 1}, {$set: {updatedCount: 2}})); + + // Since we stopped replication, we expect the update lookup to block indefinitely until we + // resume replication, so we resume replication in a parallel shell while this thread is blocked + // getting the next change from the stream. + const noConnect = true; // This shell creates its own connection to the host. + const joinResumeReplicationShell = + startParallelShell(`load('jstests/libs/write_concern_util.js'); + + const pausedSecondary = new Mongo("${newClosestSecondary.host}"); + + // Wait for the update lookup to appear in currentOp. + const changeStreamDB = pausedSecondary.getDB("${mongosDB.getName()}"); + assert.soon( + function() { + return changeStreamDB + .currentOp({ + op: "query", + // Note the namespace here happens to be the database.$cmd, + // because we're blocked waiting for the read concern, which + // happens before we get to the command processing level and + // adjust the currentOp namespace to include the collection name. + ns: "${mongosDB.getName()}.$cmd", + "command.comment": "${changeStreamComment}", + }) + .inprog.length === 1; + }, + () => "Failed to find update lookup in currentOp(): " + + tojson(changeStreamDB.currentOp().inprog)); + + // Then restart replication - this should eventually unblock the lookup. + restartServerReplication(pausedSecondary);`, + undefined, + noConnect); + assert.soon(() => changeStream.hasNext()); + latestChange = changeStream.next(); + assert.eq(latestChange.operationType, "update"); + assert.docEq(latestChange.fullDocument, {_id: 1, updatedCount: 2}); + joinResumeReplicationShell(); + + // Test that the update lookup goes to the new closest secondary. 
+ profilerHasSingleMatchingEntryOrThrow({ + profileDB: newClosestSecondaryDB, + filter: { + op: "query", + ns: mongosColl.getFullName(), "command.comment": changeStreamComment, + // We need to filter out any profiler entries with a stale config - this is the first + // read on this secondary with a readConcern specified, so it is the first read on this + // secondary that will enforce shard version. + exceptionCode: {$ne: ErrorCodes.StaleConfig} + } + }); + + changeStream.close(); + st.stop(); + rst.stopSet(); +}()); diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h index b34fc383d20..d6b87a5f183 100644 --- a/src/mongo/db/pipeline/aggregation_request.h +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -274,7 +274,7 @@ private: // {$hint: <String>}, where <String> is the index name hinted. BSONObj _hint; - // The comment parameter attached to this aggregation. + // The comment parameter attached to this aggregation, empty if not set. std::string _comment; BSONObj _readConcern; diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 0ecd0fcf92a..4585c68637c 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -847,9 +847,11 @@ public: * no matching documents were found, including cases where the given namespace does not * exist. */ - virtual boost::optional<Document> lookupSingleDocument(const NamespaceString& nss, - UUID collectionUUID, - const Document& documentKey) = 0; + virtual boost::optional<Document> lookupSingleDocument( + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) = 0; /** * Returns a vector of all local cursors. 
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp index 7752fbfd751..b616b5ec659 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp @@ -105,9 +105,15 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat auto resumeToken = ResumeToken::parse(updateOp[DocumentSourceChangeStream::kIdField].getDocument()); + const auto readConcern = pExpCtx->inMongos + ? boost::optional<BSONObj>(BSON("level" + << "majority" + << "afterClusterTime" + << resumeToken.getData().clusterTime)) + : boost::none; invariant(resumeToken.getData().uuid); - auto lookedUpDoc = - _mongoProcessInterface->lookupSingleDocument(nss, *resumeToken.getData().uuid, documentKey); + auto lookedUpDoc = _mongoProcessInterface->lookupSingleDocument( + nss, *resumeToken.getData().uuid, documentKey, readConcern); // Check whether the lookup returned any documents. Even if the lookup itself succeeded, it may // not have returned any results if the document was deleted in the time since the update op. 
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index 3d35f875619..1d003c9c09e 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -115,7 +115,8 @@ public: boost::optional<Document> lookupSingleDocument(const NamespaceString& nss, UUID collectionUUID, - const Document& documentKey) { + const Document& documentKey, + boost::optional<BSONObj> readConcern) { boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss)); auto swPipeline = makePipeline({BSON("$match" << documentKey)}, expCtx); if (swPipeline == ErrorCodes::NamespaceNotFound) { diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index e2b2e1e758d..471a66626e1 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -46,6 +46,7 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx, StringMap<ResolvedNamespace> resolvedNamespaces) : ExpressionContext(opCtx, collator.get()) { explain = request.getExplain(); + comment = request.getComment(); fromMongos = request.isFromMongos(); needsMerge = request.needsMerge(); allowDiskUse = request.shouldAllowDiskUse(); @@ -131,6 +132,7 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith( expCtx->uuid = std::move(uuid); expCtx->explain = explain; + expCtx->comment = comment; expCtx->needsMerge = needsMerge; expCtx->fromMongos = fromMongos; expCtx->from34Mongos = from34Mongos; diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index a2e16163a73..e949e068f2b 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -165,6 +165,9 @@ public: // The explain verbosity requested by the user, or boost::none 
if no explain was requested. boost::optional<ExplainOptions::Verbosity> explain; + // The comment provided by the user, or the empty string if no comment was provided. + std::string comment; + bool fromMongos = false; bool needsMerge = false; bool inMongos = false; diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index b32f18828d0..f19c9aa3abf 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -386,12 +386,15 @@ public: boost::optional<Document> lookupSingleDocument(const NamespaceString& nss, UUID collectionUUID, - const Document& documentKey) final { + const Document& documentKey, + boost::optional<BSONObj> readConcern) final { + invariant(!readConcern); // We don't currently support a read concern on mongod - it's only + // expected to be necessary on mongos. + // // Be sure to do the lookup using the collection default collation. auto foreignExpCtx = _ctx->copyWith(nss, collectionUUID, _getCollectionDefaultCollator(nss, collectionUUID)); auto swPipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx); - if (swPipeline == ErrorCodes::NamespaceNotFound) { return boost::none; } diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index c0beb0504ba..6b0032b1fd3 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -120,7 +120,8 @@ public: boost::optional<Document> lookupSingleDocument(const NamespaceString& nss, UUID collectionUUID, - const Document& documentKey) { + const Document& documentKey, + boost::optional<BSONObj> readConcern) { MONGO_UNREACHABLE; } diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp index b9fef455894..ef75292be37 100644 --- a/src/mongo/s/commands/pipeline_s.cpp +++ b/src/mongo/s/commands/pipeline_s.cpp @@ -32,6 +32,7 @@ #include "mongo/db/pipeline/document_source.h" 
#include "mongo/db/query/collation/collation_spec.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/commands/cluster_commands_helpers.h" @@ -175,7 +176,8 @@ public: boost::optional<Document> lookupSingleDocument(const NamespaceString& nss, UUID collectionUUID, - const Document& filter) final { + const Document& filter, + boost::optional<BSONObj> readConcern) final { auto foreignExpCtx = _expCtx->copyWith(nss, collectionUUID); // Create the find command to be dispatched to the shard in order to return the post-change @@ -189,12 +191,16 @@ public: cmdBuilder.append("find", nss.coll()); } cmdBuilder.append("filter", filterObj); + cmdBuilder.append("comment", _expCtx->comment); + if (readConcern) { + cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern); + } auto swShardResult = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>(); auto findCmd = cmdBuilder.obj(); size_t numAttempts = 0; do { - // Verify that the collection exists, with the UUID passed in the expCtx. + // Verify that the collection exists, with the correct UUID. auto catalogCache = Grid::get(_expCtx->opCtx)->catalogCache(); auto swRoutingInfo = getCollectionRoutingInfo(foreignExpCtx); if (swRoutingInfo == ErrorCodes::NamespaceNotFound) { diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index a263a77f56e..742d294f107 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -30,6 +30,7 @@ #include <boost/optional.hpp> +#include "mongo/client/read_preference.h" #include "mongo/db/auth/user_name.h" #include "mongo/db/jsobj.h" #include "mongo/db/logical_session_id.h" @@ -137,6 +138,11 @@ public: * Returns the logical session id for this cursor. */ virtual boost::optional<LogicalSessionId> getLsid() const = 0; + + /** + * Returns the readPreference for this cursor. 
+ */ + virtual boost::optional<ReadPreferenceSetting> getReadPreference() const = 0; }; } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 1cad6ef0830..9a2d5de4444 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -146,6 +146,10 @@ boost::optional<LogicalSessionId> ClusterClientCursorImpl::getLsid() const { return _lsid; } +boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreference() const { + return _params.readPreference; +} + namespace { /** diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index f325be6ca4d..67e334a7f47 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -113,6 +113,8 @@ public: boost::optional<LogicalSessionId> getLsid() const final; + boost::optional<ReadPreferenceSetting> getReadPreference() const final; + public: /** private for tests */ /** diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index 4899e0f3056..f2327f861fa 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -115,4 +115,8 @@ boost::optional<LogicalSessionId> ClusterClientCursorMock::getLsid() const { return _lsid; } +boost::optional<ReadPreferenceSetting> ClusterClientCursorMock::getReadPreference() const { + return boost::none; +} + } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index 5cc6b2149a8..c63a2654cda 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -69,6 +69,8 @@ public: boost::optional<LogicalSessionId> getLsid() const final; + boost::optional<ReadPreferenceSetting> getReadPreference() const 
final; + /** * Returns true unless marked as having non-exhausted remote cursors via * markRemotesNotExhausted(). diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index ed34635a8fc..fd7927e90ea 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -139,6 +139,12 @@ bool ClusterCursorManager::PinnedCursor::isTailableAndAwaitData() const { return _cursor->isTailableAndAwaitData(); } +boost::optional<ReadPreferenceSetting> ClusterCursorManager::PinnedCursor::getReadPreference() + const { + invariant(_cursor); + return _cursor->getReadPreference(); +} + UserNameIterator ClusterCursorManager::PinnedCursor::getAuthenticatedUsers() const { invariant(_cursor); return _cursor->getAuthenticatedUsers(); diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index 6bd9e20c3a6..d82e1727e15 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -206,6 +206,11 @@ public: CursorId getCursorId() const; /** + * Returns the read preference setting for this cursor. + */ + boost::optional<ReadPreferenceSetting> getReadPreference() const; + + /** * Returns the number of result documents returned so far by this cursor via the next() * method. */ diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 589917cd46b..ac055b83fd4 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -422,6 +422,9 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, << " was not created by the authenticated user"}; } + if (auto readPref = pinnedCursor.getValue().getReadPreference()) { + ReadPreferenceSetting::get(opCtx) = *readPref; + } if (pinnedCursor.getValue().isTailableAndAwaitData()) { // Default to 1-second timeout for tailable awaitData cursors. 
If an explicit maxTimeMS has // been specified, do not apply it to the opCtx, since its deadline will already have been |