-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml |   2
-rw-r--r--  jstests/sharding/change_stream_read_preference.js                                  | 131
-rw-r--r--  jstests/sharding/change_stream_update_lookup_read_concern.js                       | 205
-rw-r--r--  src/mongo/db/pipeline/aggregation_request.h                                        |   2
-rw-r--r--  src/mongo/db/pipeline/document_source.h                                            |   8
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp                 |  10
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp            |   3
-rw-r--r--  src/mongo/db/pipeline/expression_context.cpp                                       |   2
-rw-r--r--  src/mongo/db/pipeline/expression_context.h                                         |   3
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp                                               |   7
-rw-r--r--  src/mongo/db/pipeline/stub_mongo_process_interface.h                               |   3
-rw-r--r--  src/mongo/s/commands/pipeline_s.cpp                                                |  10
-rw-r--r--  src/mongo/s/query/cluster_client_cursor.h                                          |   6
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.cpp                                   |   4
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.h                                     |   2
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_mock.cpp                                   |   4
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_mock.h                                     |   2
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager.cpp                                       |   6
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager.h                                         |   5
-rw-r--r--  src/mongo/s/query/cluster_find.cpp                                                 |   3
20 files changed, 406 insertions(+), 12 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 695139b998c..7558224e2af 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -38,6 +38,8 @@ selector:
- jstests/sharding/change_stream_lookup_single_shard_cluster.js
- jstests/sharding/change_stream_remove_shard.js
- jstests/sharding/change_stream_update_lookup_collation.js
+ - jstests/sharding/change_stream_update_lookup_read_concern.js
+ - jstests/sharding/change_stream_read_preference.js
- jstests/sharding/change_streams.js
- jstests/sharding/change_streams_establishment_finds_new_shards.js
- jstests/sharding/change_streams_shards_start_in_sync.js
diff --git a/jstests/sharding/change_stream_read_preference.js b/jstests/sharding/change_stream_read_preference.js
new file mode 100644
index 00000000000..572ecf82424
--- /dev/null
+++ b/jstests/sharding/change_stream_read_preference.js
@@ -0,0 +1,131 @@
+// Tests that change streams and their update lookups obey the read preference specified by the
+// user.
+(function() {
+ "use strict";
+
+ load('jstests/libs/profiler.js'); // For various profiler helpers.
+
+ // For supportsMajorityReadConcern.
+ load('jstests/multiVersion/libs/causal_consistency_helpers.js');
+
+ // This test only works on storage engines that support committed reads; skip it if the
+ // configured engine doesn't support them.
+ if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ const st = new ShardingTest({
+ name: "change_stream_read_pref",
+ shards: 2,
+ rs: {
+ nodes: 2,
+ // Use a higher frequency for periodic noops to speed up the test.
+ setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
+ },
+ });
+
+ const dbName = jsTestName();
+ const mongosDB = st.s0.getDB(dbName);
+ const mongosColl = mongosDB[jsTestName()];
+
+ // Enable sharding on the test DB and ensure its primary is shard0000.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+ // Shard the test collection on _id.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+ // Move the [0, MaxKey] chunk to shard0001.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+ // Turn on the profiler.
+ for (let rs of[st.rs0, st.rs1]) {
+ assert.commandWorked(rs.getPrimary().getDB(dbName).setProfilingLevel(2));
+ assert.commandWorked(rs.getSecondary().getDB(dbName).setProfilingLevel(2));
+ }
+
+ // Write a document to each chunk.
+ assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+ // Test that change streams go to the primary by default.
+ let changeStreamComment = "change stream against primary";
+ const primaryStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}],
+ {comment: changeStreamComment});
+
+ assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}}));
+ assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}}));
+
+ assert.soon(() => primaryStream.hasNext());
+ assert.eq(primaryStream.next().fullDocument, {_id: -1, updated: true});
+ assert.soon(() => primaryStream.hasNext());
+ assert.eq(primaryStream.next().fullDocument, {_id: 1, updated: true});
+
+ for (let rs of[st.rs0, st.rs1]) {
+ const primaryDB = rs.getPrimary().getDB(dbName);
+ // Test that the change stream itself goes to the primary. There might be more than one
+ // matching profiler entry if we needed multiple getMores to retrieve the changes.
+ // TODO SERVER-31650 We have to use 'originatingCommand' here and look for the getMore
+ // because the initial aggregate will not show up.
+ profilerHasAtLeastOneMatchingEntryOrThrow(
+ {profileDB: primaryDB, filter: {'originatingCommand.comment': changeStreamComment}});
+
+ // Test that the update lookup goes to the primary as well.
+ profilerHasSingleMatchingEntryOrThrow({
+ profileDB: primaryDB,
+ filter: {
+ op: "query",
+ ns: mongosColl.getFullName(), "command.comment": changeStreamComment
+ }
+ });
+ }
+
+ primaryStream.close();
+
+ // Test that change streams go to the secondary when the readPreference is {mode: "secondary"}.
+ changeStreamComment = 'change stream against secondary';
+ const secondaryStream =
+ mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}],
+ {comment: changeStreamComment, $readPreference: {mode: "secondary"}});
+
+ assert.writeOK(mongosColl.update({_id: -1}, {$set: {updatedCount: 2}}));
+ assert.writeOK(mongosColl.update({_id: 1}, {$set: {updatedCount: 2}}));
+
+ assert.soon(() => secondaryStream.hasNext());
+ assert.eq(secondaryStream.next().fullDocument, {_id: -1, updated: true, updatedCount: 2});
+ assert.soon(() => secondaryStream.hasNext());
+ assert.eq(secondaryStream.next().fullDocument, {_id: 1, updated: true, updatedCount: 2});
+
+ for (let rs of[st.rs0, st.rs1]) {
+ const secondaryDB = rs.getSecondary().getDB(dbName);
+ // Test that the change stream itself goes to the secondary. There might be more than one
+ // matching profiler entry if we needed multiple getMores to retrieve the changes.
+ // TODO SERVER-31650 We have to use 'originatingCommand' here and look for the getMore
+ // because the initial aggregate will not show up.
+ profilerHasAtLeastOneMatchingEntryOrThrow(
+ {profileDB: secondaryDB, filter: {'originatingCommand.comment': changeStreamComment}});
+
+ // Test that the update lookup goes to the secondary as well.
+ profilerHasSingleMatchingEntryOrThrow({
+ profileDB: secondaryDB,
+ filter: {
+ op: "query",
+ ns: mongosColl.getFullName(), "command.comment": changeStreamComment,
+ // We need to filter out any profiler entries with a stale config - this is the
+ // first read on this secondary with a readConcern specified, so it is the first
+ // read on this secondary that will enforce shard version.
+ exceptionCode: {$ne: ErrorCodes.StaleConfig}
+ }
+ });
+ }
+
+ secondaryStream.close();
+ st.stop();
+}());
diff --git a/jstests/sharding/change_stream_update_lookup_read_concern.js b/jstests/sharding/change_stream_update_lookup_read_concern.js
new file mode 100644
index 00000000000..f90e51fd17c
--- /dev/null
+++ b/jstests/sharding/change_stream_update_lookup_read_concern.js
@@ -0,0 +1,205 @@
+// Tests that a change stream's update lookup will use the appropriate read concern. In particular,
+// tests that the update lookup will return a version of the document at least as recent as the
+// change that we're doing the lookup for, and that the returned version is majority-committed.
+(function() {
+ "use strict";
+
+ load('jstests/replsets/rslib.js'); // For startSetIfSupportsReadMajority.
+ load("jstests/libs/profiler.js"); // For profilerHas*OrThrow() helpers.
+ load("jstests/replsets/rslib.js"); // For reconfig().
+
+ // For stopServerReplication() and restartServerReplication().
+ load("jstests/libs/write_concern_util.js");
+
+ // Configure a replica set to have nodes with specific tags - we will eventually add this as
+ // part of a sharded cluster.
+ const rsNodeOptions = {
+ setParameter: {
+ writePeriodicNoops: true,
+ // Note we do not configure the periodic noop writes to be more frequent, as we do to
+ // speed up other change streams tests, because we provide an array of individually
+ // configured nodes in order to know which nodes have which tags. That requires a step-up
+ // command to succeed, which requires all nodes to agree on an op time; with the periodic
+ // noop writer at a high frequency, that agreement can potentially never happen.
+ },
+ shardsvr: "",
+ };
+ const replSetName = jsTestName();
+ const rst = new ReplSetTest({
+ name: replSetName,
+ nodes: [
+ {rsConfig: {priority: 1, tags: {tag: "primary"}}},
+ {rsConfig: {priority: 0, tags: {tag: "closestSecondary"}}},
+ {rsConfig: {priority: 0, tags: {tag: "fartherSecondary"}}}
+ ],
+ nodeOptions: rsNodeOptions,
+ });
+
+ if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+ rst.initiate();
+ rst.awaitSecondaryNodes();
+
+ // Start the sharding test and add the replica set.
+ const st = new ShardingTest({manualAddShard: true});
+ assert.commandWorked(st.s.adminCommand({addShard: replSetName + "/" + rst.getPrimary().host}));
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+
+ // Shard the collection to ensure the change stream will perform its update lookup from mongos.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+ assert.writeOK(mongosColl.insert({_id: 1}));
+ rst.awaitReplication();
+
+ // Make sure reads with read preference tag 'closestSecondary' go to the tagged secondary.
+ const closestSecondary = rst.nodes[1];
+ const closestSecondaryDB = closestSecondary.getDB(mongosDB.getName());
+ assert.commandWorked(closestSecondaryDB.setProfilingLevel(2));
+
+ // We expect the tag to ensure there is only one node to choose from, so the actual read
+ // preference doesn't really matter - we use 'nearest' throughout.
+ assert.eq(mongosColl.find()
+ .readPref("nearest", [{tag: "closestSecondary"}])
+ .comment("testing targeting")
+ .itcount(),
+ 1);
+ profilerHasSingleMatchingEntryOrThrow({
+ profileDB: closestSecondaryDB,
+ filter: {ns: mongosColl.getFullName(), "command.comment": "testing targeting"}
+ });
+
+ const changeStreamComment = "change stream against closestSecondary";
+ const changeStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}], {
+ comment: changeStreamComment,
+ $readPreference: {mode: "nearest", tags: [{tag: "closestSecondary"}]}
+ });
+ assert.writeOK(mongosColl.update({_id: 1}, {$set: {updatedCount: 1}}));
+ assert.soon(() => changeStream.hasNext());
+ let latestChange = changeStream.next();
+ assert.eq(latestChange.operationType, "update");
+ assert.docEq(latestChange.fullDocument, {_id: 1, updatedCount: 1});
+
+ // Test that the change stream itself goes to the secondary. There might be more than one
+ // matching profiler entry if we needed multiple getMores to retrieve the changes.
+ // TODO SERVER-31650 We have to use 'originatingCommand' here and look for the getMore because
+ // the initial aggregate will not show up.
+ profilerHasAtLeastOneMatchingEntryOrThrow({
+ profileDB: closestSecondaryDB,
+ filter: {"originatingCommand.comment": changeStreamComment}
+ });
+
+ // Test that the update lookup goes to the secondary as well.
+ profilerHasSingleMatchingEntryOrThrow({
+ profileDB: closestSecondaryDB,
+ filter: {
+ op: "query",
+ ns: mongosColl.getFullName(),
+ "command.filter._id": 1,
+ "command.comment": changeStreamComment,
+ // We need to filter out any profiler entries with a stale config - this is the first
+ // read on this secondary with a readConcern specified, so it is the first read on this
+ // secondary that will enforce shard version.
+ exceptionCode: {$ne: ErrorCodes.StaleConfig}
+ },
+ errorMsgFilter: {ns: mongosColl.getFullName()},
+ errorMsgProj: {ns: 1, op: 1, command: 1},
+ });
+
+ // Now make a different secondary the "closest" one (add the "closestSecondary" tag to that
+ // secondary, and remove it from the old node with that tag) to force update lookups to
+ // target a different node than the change stream itself.
+ let rsConfig = rst.getReplSetConfig();
+ rsConfig.members[1].tags = {tag: "fartherSecondary"};
+ rsConfig.members[2].tags = {tag: "closestSecondary"};
+ rsConfig.version = rst.getReplSetConfigFromNode().version + 1;
+ reconfig(rst, rsConfig);
+ rst.awaitSecondaryNodes();
+ const newClosestSecondary = rst.nodes[2];
+ const newClosestSecondaryDB = newClosestSecondary.getDB(mongosDB.getName());
+ const originalClosestSecondaryDB = closestSecondaryDB;
+
+ // Wait for the mongos to acknowledge the new tags from our reconfig.
+ awaitRSClientHosts(st.s,
+ newClosestSecondary,
+ {ok: true, secondary: true, tags: {tag: "closestSecondary"}},
+ rst);
+ assert.commandWorked(newClosestSecondaryDB.setProfilingLevel(2));
+
+ // Make sure new queries with read preference tag "closestSecondary" go to the new secondary.
+ assert.eq(newClosestSecondaryDB.system.profile.count(), 0);
+ assert.eq(mongosColl.find()
+ .readPref("nearest", [{tag: "closestSecondary"}])
+ .comment("testing targeting")
+ .itcount(),
+ 1);
+ assert.gt(newClosestSecondaryDB.system.profile.count(
+ {ns: mongosColl.getFullName(), "command.comment": "testing targeting"}),
+ 0);
+
+ // Test that the change stream continues on the original host, but the update lookup now targets
+ // the new, lagged secondary. Even though it's lagged, the lookup should use 'afterClusterTime'
+ // to ensure it does not return until the node can see the change it's looking up.
+ stopServerReplication(newClosestSecondary);
+ assert.writeOK(mongosColl.update({_id: 1}, {$set: {updatedCount: 2}}));
+
+ // Since we stopped replication, we expect the update lookup to block until replication is
+ // resumed, so we resume it in a parallel shell while this thread is blocked getting the
+ // next change from the stream.
+ const noConnect = true; // This shell creates its own connection to the host.
+ const joinResumeReplicationShell =
+ startParallelShell(`load('jstests/libs/write_concern_util.js');
+
+ const pausedSecondary = new Mongo("${newClosestSecondary.host}");
+
+ // Wait for the update lookup to appear in currentOp.
+ const changeStreamDB = pausedSecondary.getDB("${mongosDB.getName()}");
+ assert.soon(
+ function() {
+ return changeStreamDB
+ .currentOp({
+ op: "query",
+ // Note the namespace here happens to be the database.$cmd,
+ // because we're blocked waiting for the read concern, which
+ // happens before we get to the command processing level and
+ // adjust the currentOp namespace to include the collection name.
+ ns: "${mongosDB.getName()}.$cmd",
+ "command.comment": "${changeStreamComment}",
+ })
+ .inprog.length === 1;
+ },
+ () => "Failed to find update lookup in currentOp(): " +
+ tojson(changeStreamDB.currentOp().inprog));
+
+ // Then restart replication - this should eventually unblock the lookup.
+ restartServerReplication(pausedSecondary);`,
+ undefined,
+ noConnect);
+ assert.soon(() => changeStream.hasNext());
+ latestChange = changeStream.next();
+ assert.eq(latestChange.operationType, "update");
+ assert.docEq(latestChange.fullDocument, {_id: 1, updatedCount: 2});
+ joinResumeReplicationShell();
+
+ // Test that the update lookup goes to the new closest secondary.
+ profilerHasSingleMatchingEntryOrThrow({
+ profileDB: newClosestSecondaryDB,
+ filter: {
+ op: "query",
+ ns: mongosColl.getFullName(), "command.comment": changeStreamComment,
+ // We need to filter out any profiler entries with a stale config - this is the first
+ // read on this secondary with a readConcern specified, so it is the first read on this
+ // secondary that will enforce shard version.
+ exceptionCode: {$ne: ErrorCodes.StaleConfig}
+ }
+ });
+
+ changeStream.close();
+ st.stop();
+ rst.stopSet();
+}());
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index b34fc383d20..d6b87a5f183 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -274,7 +274,7 @@ private:
// {$hint: <String>}, where <String> is the index name hinted.
BSONObj _hint;
- // The comment parameter attached to this aggregation.
+ // The comment parameter attached to this aggregation, empty if not set.
std::string _comment;
BSONObj _readConcern;
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 0ecd0fcf92a..4585c68637c 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -847,9 +847,11 @@ public:
* no matching documents were found, including cases where the given namespace does not
* exist.
*/
- virtual boost::optional<Document> lookupSingleDocument(const NamespaceString& nss,
- UUID collectionUUID,
- const Document& documentKey) = 0;
+ virtual boost::optional<Document> lookupSingleDocument(
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) = 0;
/**
* Returns a vector of all local cursors.
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
index 7752fbfd751..b616b5ec659 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
@@ -105,9 +105,15 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat
auto resumeToken =
ResumeToken::parse(updateOp[DocumentSourceChangeStream::kIdField].getDocument());
+ const auto readConcern = pExpCtx->inMongos
+ ? boost::optional<BSONObj>(BSON("level"
+ << "majority"
+ << "afterClusterTime"
+ << resumeToken.getData().clusterTime))
+ : boost::none;
invariant(resumeToken.getData().uuid);
- auto lookedUpDoc =
- _mongoProcessInterface->lookupSingleDocument(nss, *resumeToken.getData().uuid, documentKey);
+ auto lookedUpDoc = _mongoProcessInterface->lookupSingleDocument(
+ nss, *resumeToken.getData().uuid, documentKey, readConcern);
// Check whether the lookup returned any documents. Even if the lookup itself succeeded, it may
// not have returned any results if the document was deleted in the time since the update op.
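
For reference, when the stage runs in mongos, the BSON built above yields a read concern of
roughly the following shape, with 'afterClusterTime' taken from the change's resume token (the
timestamp value here is illustrative):

    {level: "majority", afterClusterTime: Timestamp(1510100000, 1)}
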
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index 3d35f875619..1d003c9c09e 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -115,7 +115,8 @@ public:
boost::optional<Document> lookupSingleDocument(const NamespaceString& nss,
UUID collectionUUID,
- const Document& documentKey) {
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) {
boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
auto swPipeline = makePipeline({BSON("$match" << documentKey)}, expCtx);
if (swPipeline == ErrorCodes::NamespaceNotFound) {
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index e2b2e1e758d..471a66626e1 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -46,6 +46,7 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx,
StringMap<ResolvedNamespace> resolvedNamespaces)
: ExpressionContext(opCtx, collator.get()) {
explain = request.getExplain();
+ comment = request.getComment();
fromMongos = request.isFromMongos();
needsMerge = request.needsMerge();
allowDiskUse = request.shouldAllowDiskUse();
@@ -131,6 +132,7 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(
expCtx->uuid = std::move(uuid);
expCtx->explain = explain;
+ expCtx->comment = comment;
expCtx->needsMerge = needsMerge;
expCtx->fromMongos = fromMongos;
expCtx->from34Mongos = from34Mongos;
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index a2e16163a73..e949e068f2b 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -165,6 +165,9 @@ public:
// The explain verbosity requested by the user, or boost::none if no explain was requested.
boost::optional<ExplainOptions::Verbosity> explain;
+ // The comment provided by the user, or the empty string if no comment was provided.
+ std::string comment;
+
bool fromMongos = false;
bool needsMerge = false;
bool inMongos = false;
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index b32f18828d0..f19c9aa3abf 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -386,12 +386,15 @@ public:
boost::optional<Document> lookupSingleDocument(const NamespaceString& nss,
UUID collectionUUID,
- const Document& documentKey) final {
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) final {
+ // We don't currently support a read concern on mongod - it's only expected to be
+ // necessary on mongos.
+ invariant(!readConcern);
// Be sure to do the lookup using the collection default collation.
auto foreignExpCtx =
_ctx->copyWith(nss, collectionUUID, _getCollectionDefaultCollator(nss, collectionUUID));
auto swPipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
-
if (swPipeline == ErrorCodes::NamespaceNotFound) {
return boost::none;
}
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index c0beb0504ba..6b0032b1fd3 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -120,7 +120,8 @@ public:
boost::optional<Document> lookupSingleDocument(const NamespaceString& nss,
UUID collectionUUID,
- const Document& documentKey) {
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp
index b9fef455894..ef75292be37 100644
--- a/src/mongo/s/commands/pipeline_s.cpp
+++ b/src/mongo/s/commands/pipeline_s.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/query/collation/collation_spec.h"
+#include "mongo/db/repl/read_concern_args.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
@@ -175,7 +176,8 @@ public:
boost::optional<Document> lookupSingleDocument(const NamespaceString& nss,
UUID collectionUUID,
- const Document& filter) final {
+ const Document& filter,
+ boost::optional<BSONObj> readConcern) final {
auto foreignExpCtx = _expCtx->copyWith(nss, collectionUUID);
// Create the find command to be dispatched to the shard in order to return the post-change
@@ -189,12 +191,16 @@ public:
cmdBuilder.append("find", nss.coll());
}
cmdBuilder.append("filter", filterObj);
+ cmdBuilder.append("comment", _expCtx->comment);
+ if (readConcern) {
+ cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern);
+ }
auto swShardResult = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>();
auto findCmd = cmdBuilder.obj();
size_t numAttempts = 0;
do {
- // Verify that the collection exists, with the UUID passed in the expCtx.
+ // Verify that the collection exists, with the correct UUID.
auto catalogCache = Grid::get(_expCtx->opCtx)->catalogCache();
auto swRoutingInfo = getCollectionRoutingInfo(foreignExpCtx);
if (swRoutingInfo == ErrorCodes::NamespaceNotFound) {
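
Putting these pieces together, the find command that mongos dispatches for an update lookup now
carries the stream's comment and, when provided, the read concern. A sketch of the resulting
command as sent to the shard (collection name, filter, and timestamp are illustrative):

    {
        find: "coll",
        filter: {_id: 1},
        comment: "change stream against closestSecondary",
        readConcern: {level: "majority", afterClusterTime: Timestamp(1510100000, 1)}
    }
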
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index a263a77f56e..742d294f107 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -30,6 +30,7 @@
#include <boost/optional.hpp>
+#include "mongo/client/read_preference.h"
#include "mongo/db/auth/user_name.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/logical_session_id.h"
@@ -137,6 +138,11 @@ public:
* Returns the logical session id for this cursor.
*/
virtual boost::optional<LogicalSessionId> getLsid() const = 0;
+
+ /**
+ * Returns the readPreference for this cursor.
+ */
+ virtual boost::optional<ReadPreferenceSetting> getReadPreference() const = 0;
};
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 1cad6ef0830..9a2d5de4444 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -146,6 +146,10 @@ boost::optional<LogicalSessionId> ClusterClientCursorImpl::getLsid() const {
return _lsid;
}
+boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreference() const {
+ return _params.readPreference;
+}
+
namespace {
/**
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index f325be6ca4d..67e334a7f47 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -113,6 +113,8 @@ public:
boost::optional<LogicalSessionId> getLsid() const final;
+ boost::optional<ReadPreferenceSetting> getReadPreference() const final;
+
public:
/** private for tests */
/**
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 4899e0f3056..f2327f861fa 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -115,4 +115,8 @@ boost::optional<LogicalSessionId> ClusterClientCursorMock::getLsid() const {
return _lsid;
}
+boost::optional<ReadPreferenceSetting> ClusterClientCursorMock::getReadPreference() const {
+ return boost::none;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index 5cc6b2149a8..c63a2654cda 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -69,6 +69,8 @@ public:
boost::optional<LogicalSessionId> getLsid() const final;
+ boost::optional<ReadPreferenceSetting> getReadPreference() const final;
+
/**
* Returns true unless marked as having non-exhausted remote cursors via
* markRemotesNotExhausted().
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index ed34635a8fc..fd7927e90ea 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -139,6 +139,12 @@ bool ClusterCursorManager::PinnedCursor::isTailableAndAwaitData() const {
return _cursor->isTailableAndAwaitData();
}
+boost::optional<ReadPreferenceSetting> ClusterCursorManager::PinnedCursor::getReadPreference()
+ const {
+ invariant(_cursor);
+ return _cursor->getReadPreference();
+}
+
UserNameIterator ClusterCursorManager::PinnedCursor::getAuthenticatedUsers() const {
invariant(_cursor);
return _cursor->getAuthenticatedUsers();
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index 6bd9e20c3a6..d82e1727e15 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -206,6 +206,11 @@ public:
CursorId getCursorId() const;
/**
+ * Returns the read preference setting for this cursor.
+ */
+ boost::optional<ReadPreferenceSetting> getReadPreference() const;
+
+ /**
* Returns the number of result documents returned so far by this cursor via the next()
* method.
*/
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 589917cd46b..ac055b83fd4 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -422,6 +422,9 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
<< " was not created by the authenticated user"};
}
+ if (auto readPref = pinnedCursor.getValue().getReadPreference()) {
+ ReadPreferenceSetting::get(opCtx) = *readPref;
+ }
if (pinnedCursor.getValue().isTailableAndAwaitData()) {
// Default to 1-second timeout for tailable awaitData cursors. If an explicit maxTimeMS has
// been specified, do not apply it to the opCtx, since its deadline will already have been
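
With the cursor's read preference restored on each getMore, a change stream opened against a
secondary stays on that secondary for the life of the cursor, as the new tests above exercise.
A minimal shell sketch (collection name and comment are illustrative):

    const stream = db.coll.aggregate(
        [{$changeStream: {fullDocument: "updateLookup"}}],
        {comment: "secondary stream", $readPreference: {mode: "secondary"}});
    // Each hasNext()/next() call may issue a getMore, which is now routed with the same
    // {mode: "secondary"} preference, as are any update lookups the stream performs.
    assert.soon(() => stream.hasNext());
    printjson(stream.next().fullDocument);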