author     Charlie Swanson <charlie.swanson@mongodb.com>    2017-11-07 16:58:37 +0000
committer  Charlie Swanson <charlie.swanson@mongodb.com>    2017-11-17 17:20:45 -0500
commit     f7122973bd8001bb8dd393b7ad7851493b8b7743 (patch)
tree       13614d9c57256c1b374e45d5a2a676ac518813a5 /jstests/sharding/change_stream_update_lookup_read_concern.js
parent     a40d277a1c7a735e4d7ed5cf394e23181f8620fb (diff)
download   mongo-f7122973bd8001bb8dd393b7ad7851493b8b7743.tar.gz
SERVER-31665 Use correct read concern/preference during update lookup
Diffstat (limited to 'jstests/sharding/change_stream_update_lookup_read_concern.js')
-rw-r--r--  jstests/sharding/change_stream_update_lookup_read_concern.js | 205
1 file changed, 205 insertions(+), 0 deletions(-)
diff --git a/jstests/sharding/change_stream_update_lookup_read_concern.js b/jstests/sharding/change_stream_update_lookup_read_concern.js
new file mode 100644
index 00000000000..f90e51fd17c
--- /dev/null
+++ b/jstests/sharding/change_stream_update_lookup_read_concern.js
@@ -0,0 +1,205 @@
+// Tests that a change stream's update lookup will use the appropriate read concern. In particular,
+// tests that the update lookup will return a version of the document at least as recent as the
+// change that we're doing the lookup for, and that change will be majority-committed.
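+//
+// For reference, the update lookup is expected to be issued as a find command against the
+// shard, shaped roughly like the following sketch (for illustration only, not the exact
+// server-internal command):
+//
+//     {
+//         find: "<collName>",
+//         filter: {_id: <documentKey of the change>},
+//         readConcern: {level: "majority", afterClusterTime: <clusterTime of the change>}
+//     }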
+(function() {
+ "use strict";
+
+ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority(), reconfig(),
+ // and awaitRSClientHosts().
+ load("jstests/libs/profiler.js"); // For profilerHas*OrThrow() helpers.
+
+ // For stopServerReplication() and restartServerReplication().
+ load("jstests/libs/write_concern_util.js");
+
+ // Configure a replica set to have nodes with specific tags - we will eventually add this as
+ // part of a sharded cluster.
+ const rsNodeOptions = {
+ setParameter: {
+ writePeriodicNoops: true,
+ // Note that we do not configure the periodic noop writes to be more frequent, as we do to
+ // speed up other change streams tests, because this test supplies an array of individually
+ // configured nodes in order to control which node gets which tag. That configuration
+ // requires a step-up command, which in turn requires all nodes to agree on an optime; with
+ // the periodic noop writer running at a high frequency, that agreement may never be reached.
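+ // (Other change streams tests typically pass something like
+ // {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}; here the interval is
+ // deliberately left at its default.)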
+ },
+ shardsvr: "",
+ };
+ const replSetName = jsTestName();
+ const rst = new ReplSetTest({
+ name: replSetName,
+ nodes: [
+ {rsConfig: {priority: 1, tags: {tag: "primary"}}},
+ {rsConfig: {priority: 0, tags: {tag: "closestSecondary"}}},
+ {rsConfig: {priority: 0, tags: {tag: "fartherSecondary"}}}
+ ],
+ nodeOptions: rsNodeOptions,
+ });
+
+ if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+ rst.initiate();
+ rst.awaitSecondaryNodes();
+
+ // Start the sharding test and add the replica set.
+ const st = new ShardingTest({manualAddShard: true});
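+ // addShard accepts a replica set seed string of the form "<replSetName>/<host:port>".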
+ assert.commandWorked(st.s.adminCommand({addShard: replSetName + "/" + rst.getPrimary().host}));
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+
+ // Shard the collection to ensure the change stream will perform update lookup from mongos.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+ assert.writeOK(mongosColl.insert({_id: 1}));
+ rst.awaitReplication();
+
+ // Make sure reads with read preference tag 'closestSecondary' go to the tagged secondary.
+ const closestSecondary = rst.nodes[1];
+ const closestSecondaryDB = closestSecondary.getDB(mongosDB.getName());
+ assert.commandWorked(closestSecondaryDB.setProfilingLevel(2));
+
+ // We expect the tag to ensure there is only one node to choose from, so the actual read
+ // preference doesn't really matter - we use 'nearest' throughout.
+ assert.eq(mongosColl.find()
+ .readPref("nearest", [{tag: "closestSecondary"}])
+ .comment("testing targeting")
+ .itcount(),
+ 1);
+ profilerHasSingleMatchingEntryOrThrow({
+ profileDB: closestSecondaryDB,
+ filter: {ns: mongosColl.getFullName(), "command.comment": "testing targeting"}
+ });
+
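+ // Open a change stream targeted at the closest secondary. The comment will let us identify
+ // the stream's operations in the profiler and in currentOp output below.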
+ const changeStreamComment = "change stream against closestSecondary";
+ const changeStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}], {
+ comment: changeStreamComment,
+ $readPreference: {mode: "nearest", tags: [{tag: "closestSecondary"}]}
+ });
+ assert.writeOK(mongosColl.update({_id: 1}, {$set: {updatedCount: 1}}));
+ assert.soon(() => changeStream.hasNext());
+ let latestChange = changeStream.next();
+ assert.eq(latestChange.operationType, "update");
+ assert.docEq(latestChange.fullDocument, {_id: 1, updatedCount: 1});
+
+ // Test that the change stream itself goes to the secondary. There might be more than one
+ // matching profiler entry if we needed multiple getMores to retrieve the changes.
+ // TODO SERVER-31650 We have to use 'originatingCommand' here and look for the getMore because
+ // the initial aggregate will not show up.
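+ // A matching entry should look roughly like the following sketch (most fields elided):
+ //     {op: "getmore", ns: "<db>.<coll>",
+ //      originatingCommand: {aggregate: "<coll>", pipeline: [{$changeStream: {...}}],
+ //                           comment: "change stream against closestSecondary", ...}, ...}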
+ profilerHasAtLeastOneMatchingEntryOrThrow({
+ profileDB: closestSecondaryDB,
+ filter: {"originatingCommand.comment": changeStreamComment}
+ });
+
+ // Test that the update lookup goes to the secondary as well.
+ profilerHasSingleMatchingEntryOrThrow({
+ profileDB: closestSecondaryDB,
+ filter: {
+ op: "query",
+ ns: mongosColl.getFullName(),
+ "command.filter._id": 1,
+ "command.comment": changeStreamComment,
+ // We need to filter out any profiler entries with a stale config - this is the first
+ // read on this secondary with a readConcern specified, so it is the first read on this
+ // secondary that will enforce shard version.
+ exceptionCode: {$ne: ErrorCodes.StaleConfig}
+ },
+ errorMsgFilter: {ns: mongosColl.getFullName()},
+ errorMsgProj: {ns: 1, op: 1, command: 1},
+ });
+
+ // Now make a different secondary the "closest" one (move the "closestSecondary" tag from
+ // the old node to another secondary) to force update lookups to target a different node
+ // than the change stream itself.
+ let rsConfig = rst.getReplSetConfig();
+ rsConfig.members[1].tags = {tag: "fartherSecondary"};
+ rsConfig.members[2].tags = {tag: "closestSecondary"};
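+ // replSetReconfig requires the new config version to be greater than the current one.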
+ rsConfig.version = rst.getReplSetConfigFromNode().version + 1;
+ reconfig(rst, rsConfig);
+ rst.awaitSecondaryNodes();
+ const newClosestSecondary = rst.nodes[2];
+ const newClosestSecondaryDB = newClosestSecondary.getDB(mongosDB.getName());
+ const originalClosestSecondaryDB = closestSecondaryDB;
+
+ // Wait for the mongos to acknowledge the new tags from our reconfig.
+ awaitRSClientHosts(st.s,
+ newClosestSecondary,
+ {ok: true, secondary: true, tags: {tag: "closestSecondary"}},
+ rst);
+ assert.commandWorked(newClosestSecondaryDB.setProfilingLevel(2));
+
+ // Make sure new queries with read preference tag "closestSecondary" go to the new secondary.
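+ // (Profiling was only just enabled on this node, so it should have no entries yet.)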
+ assert.eq(newClosestSecondaryDB.system.profile.count(), 0);
+ assert.eq(mongosColl.find()
+ .readPref("nearest", [{tag: "closestSecondary"}])
+ .comment("testing targeting")
+ .itcount(),
+ 1);
+ assert.gt(newClosestSecondaryDB.system.profile.count(
+ {ns: mongosColl.getFullName(), "command.comment": "testing targeting"}),
+ 0);
+
+ // Test that the change stream continues on the original host, but the update lookup now targets
+ // the new, lagged secondary. Even though it's lagged, the lookup should use 'afterClusterTime'
+ // to ensure it does not return until the node can see the change it's looking up.
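+ // That is, the lookup should carry something like readConcern: {level: "majority",
+ // afterClusterTime: <cluster time of the update>}, so a node that has not yet replicated
+ // to that cluster time blocks the read rather than returning stale data.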
+ stopServerReplication(newClosestSecondary);
+ assert.writeOK(mongosColl.update({_id: 1}, {$set: {updatedCount: 2}}));
+
+ // Since we stopped replication, we expect the update lookup to block indefinitely until we
+ // resume replication, so we resume replication in a parallel shell while this thread is blocked
+ // getting the next change from the stream.
+ const noConnect = true; // This shell creates its own connection to the host.
+ const joinResumeReplicationShell =
+ startParallelShell(`load('jstests/libs/write_concern_util.js');
+
+ const pausedSecondary = new Mongo("${newClosestSecondary.host}");
+
+ // Wait for the update lookup to appear in currentOp.
+ const changeStreamDB = pausedSecondary.getDB("${mongosDB.getName()}");
+ assert.soon(
+ function() {
+ return changeStreamDB
+ .currentOp({
+ op: "query",
+ // Note the namespace here happens to be the database.$cmd,
+ // because we're blocked waiting for the read concern, which
+ // happens before we get to the command processing level and
+ // adjust the currentOp namespace to include the collection name.
+ ns: "${mongosDB.getName()}.$cmd",
+ "command.comment": "${changeStreamComment}",
+ })
+ .inprog.length === 1;
+ },
+ () => "Failed to find update lookup in currentOp(): " +
+ tojson(changeStreamDB.currentOp().inprog));
+
+ // Then restart replication - this should eventually unblock the lookup.
+ restartServerReplication(pausedSecondary);`,
+ undefined,
+ noConnect);
+ assert.soon(() => changeStream.hasNext());
+ latestChange = changeStream.next();
+ assert.eq(latestChange.operationType, "update");
+ assert.docEq(latestChange.fullDocument, {_id: 1, updatedCount: 2});
+ joinResumeReplicationShell();
+
+ // Test that the update lookup goes to the new closest secondary.
+ profilerHasSingleMatchingEntryOrThrow({
+ profileDB: newClosestSecondaryDB,
+ filter: {
+ op: "query",
+ ns: mongosColl.getFullName(),
+ "command.comment": changeStreamComment,
+ // We need to filter out any profiler entries with a stale config - this is the first
+ // read on this secondary with a readConcern specified, so it is the first read on this
+ // secondary that will enforce shard version.
+ exceptionCode: {$ne: ErrorCodes.StaleConfig}
+ }
+ });
+
+ changeStream.close();
+ st.stop();
+ rst.stopSet();
+}());