diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2017-09-20 14:15:00 -0400 |
---|---|---|
committer | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2017-09-20 14:15:00 -0400 |
commit | 0195f34a7a61c75fd8fbaba6f8dae9671e2dd915 (patch) | |
tree | 63f9f41fae93f9b9c9efe5f993daf96dc7a0a0a9 | |
parent | 44a2de49607e5340efc7e84d265216723d403add (diff) | |
download | mongo-0195f34a7a61c75fd8fbaba6f8dae9671e2dd915.tar.gz |
SERVER-30679 Integrate causallyConsistentReads into SessionOptions.
20 files changed, 539 insertions, 297 deletions
diff --git a/jstests/aggregation/bugs/server18198.js b/jstests/aggregation/bugs/server18198.js index d55a890963e..a5847ad30dd 100644 --- a/jstests/aggregation/bugs/server18198.js +++ b/jstests/aggregation/bugs/server18198.js @@ -30,6 +30,9 @@ getMaxWireVersion: function() { return mongo.getMaxWireVersion(); }, + isCausalConsistency: function() { + return false; + }, }; db._mongo = mockMongo; diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index c545456ef33..5e06eae5b52 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -33,7 +33,7 @@ // these synchronization points from this test. assert.commandWorked(db.runCommand({ find: "foo", - readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()} + readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); // Waiting for replication assures no previous operations will be included. @@ -49,7 +49,7 @@ // TODO: SERVER-29126 assert.commandWorked(db.runCommand({ find: "foo", - readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()} + readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); FixtureHelpers.awaitReplication(); if (expectedBatch.length == 0) @@ -293,7 +293,7 @@ // TODO: SERVER-29126 assert.commandWorked(db.runCommand({ find: "foo", - readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()} + readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); FixtureHelpers.awaitReplication(); FixtureHelpers.runCommandOnEachPrimary({ @@ -322,7 +322,7 @@ // TODO: SERVER-29126 assert.commandWorked(db.runCommand({ find: "foo", - readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()} + readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); FixtureHelpers.awaitReplication(); assert.commandWorked(db.adminCommand( diff --git a/jstests/change_streams/change_stream_invalidation.js b/jstests/change_streams/change_stream_invalidation.js index 4d1122eeac3..8fcb5e9e527 100644 --- a/jstests/change_streams/change_stream_invalidation.js +++ b/jstests/change_streams/change_stream_invalidation.js @@ -19,7 +19,7 @@ // these synchronization points from this test. assert.commandWorked(db.runCommand({ find: "foo", - readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()} + readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); let res = assert.commandWorked( @@ -70,7 +70,7 @@ // these synchronization points from this test. assert.commandWorked(db.runCommand({ find: "foo", - readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()} + readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); res = diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js index 44c1e8de956..fa9301fa142 100644 --- a/jstests/change_streams/lookup_post_image.js +++ b/jstests/change_streams/lookup_post_image.js @@ -21,7 +21,7 @@ // these synchronization points from this test. assert.commandWorked(db.runCommand({ find: "foo", - readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()} + readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); const cmdResponse = assert.commandWorked( @@ -44,7 +44,7 @@ // TODO: SERVER-29126 assert.commandWorked(db.runCommand({ find: "foo", - readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()} + readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); FixtureHelpers.awaitReplication(); // TODO: SERVER-29126 @@ -54,7 +54,7 @@ // these synchronization points from this test. assert.commandWorked(db.runCommand({ find: "foo", - readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()} + readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); FixtureHelpers.runCommandOnEachPrimary({ dbName: "admin", @@ -285,7 +285,7 @@ // these synchronization points from this test. assert.commandWorked(db.runCommand({ find: "foo", - readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()} + readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); res = assert.commandWorked(db.runCommand({ @@ -302,7 +302,7 @@ // these synchronization points from this test. assert.commandWorked(db.runCommand({ find: "foo", - readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()} + readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); res = assert.commandWorked(db.runCommand({ diff --git a/jstests/change_streams/report_latest_observed_oplog_timestamp.js b/jstests/change_streams/report_latest_observed_oplog_timestamp.js index 7345cfc4667..4598bc6ce9a 100644 --- a/jstests/change_streams/report_latest_observed_oplog_timestamp.js +++ b/jstests/change_streams/report_latest_observed_oplog_timestamp.js @@ -51,7 +51,7 @@ // synchronization point from this test. assert.commandWorked(db.runCommand({ find: "foo", - readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()} + readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); // Look at one batch's worth. diff --git a/jstests/concurrency/fsm_all_sharded_causal_consistency.js b/jstests/concurrency/fsm_all_sharded_causal_consistency.js index 05deef3583c..1d2f8617d4c 100644 --- a/jstests/concurrency/fsm_all_sharded_causal_consistency.js +++ b/jstests/concurrency/fsm_all_sharded_causal_consistency.js @@ -114,4 +114,4 @@ runWorkloadsSerially( return !Array.contains(blacklist, file); }), {sharded: {enabled: true}, replication: {enabled: true}}, - {sessionOptions: {causallyConsistentReads: true, readPreference: {mode: "secondary"}}}); + {sessionOptions: {causalConsistency: true, readPreference: {mode: "secondary"}}}); diff --git a/jstests/concurrency/fsm_all_sharded_causal_consistency_and_balancer.js b/jstests/concurrency/fsm_all_sharded_causal_consistency_and_balancer.js index 44120d28c85..7ee65f861bf 100644 --- a/jstests/concurrency/fsm_all_sharded_causal_consistency_and_balancer.js +++ b/jstests/concurrency/fsm_all_sharded_causal_consistency_and_balancer.js @@ -120,4 +120,4 @@ runWorkloadsSerially( return !Array.contains(blacklist, file); }), {sharded: {enabled: true, enableBalancer: true}, replication: {enabled: true}}, - {sessionOptions: {causallyConsistentReads: true, readPreference: {mode: "secondary"}}}); + {sessionOptions: {causalConsistency: true, readPreference: {mode: "secondary"}}}); diff --git a/jstests/concurrency/fsm_libs/worker_thread.js b/jstests/concurrency/fsm_libs/worker_thread.js index 2f5e0a40245..44bdb870d0c 100644 --- a/jstests/concurrency/fsm_libs/worker_thread.js +++ b/jstests/concurrency/fsm_libs/worker_thread.js @@ -41,13 +41,6 @@ var workerThread = (function() { myDB = new Mongo(args.host) .startSession(args.sessionOptions) .getDatabase(args.dbName); - - if (args.sessionOptions.causallyConsistentReads) { - // TODO SERVER-30679: We manually enable causal consistency on the - // connection object so that "afterClusterTime" is injected into the - // readConcern of any command requests through this connection. - myDB.getMongo().setCausalConsistency(); - } } else { myDB = new Mongo(args.host).getDB(args.dbName); } diff --git a/jstests/multiVersion/3_upgrade_replset.js b/jstests/multiVersion/3_upgrade_replset.js index bccab5068c7..ea8826c1ad7 100644 --- a/jstests/multiVersion/3_upgrade_replset.js +++ b/jstests/multiVersion/3_upgrade_replset.js @@ -58,6 +58,11 @@ rst.upgradeSet({binVersion: "latest"}); jsTest.log("Replica set upgraded."); +// We save a reference to the old primary so that we can call reconnect() on it before +// joinFindInsert() would attempt to send the node an update operation that signals the parallel +// shell running the background operations to stop. +var oldPrimary = primary; + // Wait for primary var primary = rst.getPrimary(); @@ -73,8 +78,15 @@ jsTest.log("Replica set downgraded."); // Allow even more valid writes to go through sleep(10 * 1000); +// Since the primary from before the upgrade took place was restarted as part of the +// upgrade/downgrade process, we explicitly reconnect to it so that sending it an update operation +// silently fails with an unchecked NotMaster error rather than a network error. +reconnect(oldPrimary.getDB("admin")); joinFindInsert(); +// Since the primary from after the upgrade took place was restarted as part of the downgrade +// process, we explicitly reconnect to it. +reconnect(primary.getDB("admin")); var totalInserts = primary.getCollection(insertNS).find().sort({_id: -1}).next()._id + 1; var dataFound = primary.getCollection(insertNS).count(); diff --git a/jstests/multiVersion/downgrade_replset.js b/jstests/multiVersion/downgrade_replset.js index 3e324a69ad6..7804769f4dc 100644 --- a/jstests/multiVersion/downgrade_replset.js +++ b/jstests/multiVersion/downgrade_replset.js @@ -54,9 +54,18 @@ function runDowngradeTest(protocolVersion) { rst.upgradeSet({binVersion: oldVersion}); jsTest.log("Downgrade complete."); + // We save a reference to the old primary so that we can call reconnect() on it before + // joinFindInsert() would attempt to send the node an update operation that signals the parallel + // shell running the background operations to stop. + var oldPrimary = primary; + primary = rst.getPrimary(); printjson(rst.status()); + // Since the old primary was restarted as part of the downgrade process, we explicitly reconnect + // to it so that sending it an update operation silently fails with an unchecked NotMaster error + // rather than a network error. + reconnect(oldPrimary.getDB("admin")); joinFindInsert(); rst.stopSet(); } diff --git a/jstests/multiVersion/minor_version_upgrade_replset.js b/jstests/multiVersion/minor_version_upgrade_replset.js index 45d3a7f4844..fb98e74f70e 100644 --- a/jstests/multiVersion/minor_version_upgrade_replset.js +++ b/jstests/multiVersion/minor_version_upgrade_replset.js @@ -58,6 +58,11 @@ rst.upgradeSet({binVersion: "latest"}); jsTest.log("Replica set upgraded."); +// We save a reference to the old primary so that we can call reconnect() on it before +// joinFindInsert() would attempt to send the node an update operation that signals the parallel +// shell running the background operations to stop. +var oldPrimary = primary; + // Wait for primary var primary = rst.getPrimary(); @@ -66,6 +71,10 @@ printjson(rst.status()); // Allow more valid writes to go through sleep(10 * 1000); +// Since the old primary was restarted as part of the upgrade process, we explicitly reconnect to it +// so that sending it an update operation silently fails with an unchecked NotMaster error rather +// than a network error. +reconnect(oldPrimary.getDB("admin")); joinFindInsert(); var totalInserts = primary.getCollection(insertNS).find().sort({_id: -1}).next()._id + 1; diff --git a/jstests/noPassthrough/shell_can_use_read_concern.js b/jstests/noPassthrough/shell_can_use_read_concern.js new file mode 100644 index 00000000000..95d47c4e74e --- /dev/null +++ b/jstests/noPassthrough/shell_can_use_read_concern.js @@ -0,0 +1,314 @@ +/** + * Tests that read operations executed through the mongo shell's API are specify afterClusterTime + * when causal consistency is enabled. + */ +(function() { + "use strict"; + + const rst = new ReplSetTest({nodes: 1}); + rst.startSet(); + rst.initiate(); + + const primary = rst.getPrimary(); + + function runTests({withSession}) { + let db; + + if (withSession) { + primary.setCausalConsistency(false); + db = primary.startSession({causalConsistency: true}).getDatabase("test"); + } else { + primary.setCausalConsistency(true); + db = primary.getDB("test"); + } + + const coll = db.shell_can_use_read_concern; + coll.drop(); + + function testCommandCanBeCausallyConsistent(func, { + expectedSession: expectedSession = withSession, + expectedAfterClusterTime: expectedAfterClusterTime = true + } = {}) { + const mongoRunCommandOriginal = Mongo.prototype.runCommand; + + const sentinel = {}; + let cmdObjSeen = sentinel; + + Mongo.prototype.runCommand = function runComandSpy(dbName, cmdObj, options) { + cmdObjSeen = cmdObj; + return mongoRunCommandOriginal.apply(this, arguments); + }; + + try { + assert.doesNotThrow(func); + } finally { + Mongo.prototype.runCommand = mongoRunCommandOriginal; + } + + if (cmdObjSeen === sentinel) { + throw new Error("Mongo.prototype.runCommand() was never called: " + + func.toString()); + } + + let cmdName = Object.keys(cmdObjSeen)[0]; + + // If the command is in a wrapped form, then we look for the actual command object + // inside + // the query/$query object. + if (cmdName === "query" || cmdName === "$query") { + cmdObjSeen = cmdObjSeen[cmdName]; + cmdName = Object.keys(cmdObjSeen)[0]; + } + + if (expectedSession) { + assert(cmdObjSeen.hasOwnProperty("lsid"), + "Expected operation " + tojson(cmdObjSeen) + + " to have a logical session id: " + func.toString()); + } + + if (expectedAfterClusterTime) { + assert(cmdObjSeen.hasOwnProperty("readConcern"), + "Expected operation " + tojson(cmdObjSeen) + + " to have a readConcern object since it can be causally consistent: " + + func.toString()); + + const readConcern = cmdObjSeen.readConcern; + assert(readConcern.hasOwnProperty("afterClusterTime"), + "Expected operation " + tojson(cmdObjSeen) + + " to specify afterClusterTime since it can be causally consistent: " + + func.toString()); + } else { + assert(!cmdObjSeen.hasOwnProperty("readConcern"), + "Expected operation " + tojson(cmdObjSeen) + " to not have a readConcern" + + " object since it cannot be causally consistent: " + func.toString()); + } + } + + // + // Tests for the "find" and "getMore" commands. + // + + { + testCommandCanBeCausallyConsistent(function() { + assert.writeOK(coll.insert([{}, {}, {}, {}, {}])); + }, {expectedSession: withSession, expectedAfterClusterTime: false}); + + testCommandCanBeCausallyConsistent(function() { + assert.commandWorked( + db.runCommand({find: coll.getName(), batchSize: 5, singleBatch: true})); + }); + + const cursor = coll.find().batchSize(2); + + testCommandCanBeCausallyConsistent(function() { + cursor.next(); + cursor.next(); + }); + + testCommandCanBeCausallyConsistent(function() { + cursor.next(); + cursor.next(); + cursor.next(); + assert(!cursor.hasNext()); + }, { + // TODO SERVER-30848: Change expectedSession to `withSession` after getMore requests + // from the mongo shell use the logical session the cursor was established with. + expectedSession: false, + expectedAfterClusterTime: false, + }); + } + + // + // Tests for the "count" command. + // + + testCommandCanBeCausallyConsistent(function() { + assert.commandWorked(db.runCommand({count: coll.getName()})); + }); + + testCommandCanBeCausallyConsistent(function() { + assert.commandWorked(db.runCommand({query: {count: coll.getName()}})); + }); + + testCommandCanBeCausallyConsistent(function() { + assert.commandWorked(db.runCommand({$query: {count: coll.getName()}})); + }); + + testCommandCanBeCausallyConsistent(function() { + assert.eq(5, coll.count()); + }); + + // + // Tests for the "distinct" command. + // + + testCommandCanBeCausallyConsistent(function() { + assert.commandWorked(db.runCommand({distinct: coll.getName(), key: "_id"})); + }); + + testCommandCanBeCausallyConsistent(function() { + const values = coll.distinct("_id"); + assert.eq(5, values.length, tojson(values)); + }); + + // + // Tests for the "aggregate" command. + // + + { + testCommandCanBeCausallyConsistent(function() { + assert.commandWorked(db.runCommand( + {aggregate: coll.getName(), pipeline: [], cursor: {batchSize: 5}})); + }); + + testCommandCanBeCausallyConsistent(function() { + assert.commandWorked(db.runCommand({ + aggregate: coll.getName(), + pipeline: [], + cursor: {batchSize: 5}, + explain: true + })); + }, {expectedSession: withSession, expectedAfterClusterTime: false}); + + let cursor; + + testCommandCanBeCausallyConsistent(function() { + cursor = coll.aggregate([], {cursor: {batchSize: 2}}); + cursor.next(); + cursor.next(); + }); + + testCommandCanBeCausallyConsistent(function() { + cursor.next(); + cursor.next(); + cursor.next(); + assert(!cursor.hasNext()); + }, { + // TODO SERVER-30848: Change expectedSession to `withSession` after getMore requests + // from the mongo shell use the logical session the cursor was established with. + expectedSession: false, + expectedAfterClusterTime: false, + }); + } + + // + // Tests for the "mapReduce" command. + // + + testCommandCanBeCausallyConsistent(function() { + const res = assert.commandWorked(db.runCommand({ + mapReduce: coll.getName(), + map: function() { + emit("x", 1); + }, + reduce: function(key, values) { + return values.length; + }, + out: {inline: 1} + })); + assert.eq([{_id: "x", value: 5}], res.results, tojson(res)); + }); + + testCommandCanBeCausallyConsistent(function() { + const res = coll.mapReduce( + function() { + emit("x", 1); + }, + function(key, values) { + return values.length; + }, + {out: {inline: 1}}); + assert.eq([{_id: "x", value: 5}], res.results, tojson(res)); + }); + + // + // Tests for the "group" command. + // + + testCommandCanBeCausallyConsistent(function() { + const res = assert.commandWorked(db.runCommand({ + group: { + ns: coll.getName(), + key: {x: 1}, + $reduce: function(curr, result) { + ++result.total; + }, + initial: {total: 0} + } + })); + assert.eq([{x: null, total: 5}], res.retval, tojson(res)); + }); + + testCommandCanBeCausallyConsistent(function() { + const res = coll.group({ + key: {x: 1}, + $reduce: function(curr, result) { + ++result.total; + }, + initial: {total: 0} + }); + assert.eq([{x: null, total: 5}], res); + }); + + // + // Tests for the "geoNear" command. + // + + testCommandCanBeCausallyConsistent(function() { + assert.commandWorked(coll.createIndex({loc: "2dsphere"})); + }, {expectedSession: withSession, expectedAfterClusterTime: false}); + + testCommandCanBeCausallyConsistent(function() { + assert.commandWorked(db.runCommand({ + geoNear: coll.getName(), + near: {type: "Point", coordinates: [0, 0]}, + spherical: true + })); + }); + + // + // Tests for the "geoSearch" command. + // + + testCommandCanBeCausallyConsistent(function() { + assert.commandWorked(coll.createIndex({loc: "geoHaystack", other: 1}, {bucketSize: 1})); + }, {expectedSession: withSession, expectedAfterClusterTime: false}); + + testCommandCanBeCausallyConsistent(function() { + assert.commandWorked(db.runCommand( + {geoSearch: coll.getName(), near: [0, 0], maxDistance: 1, search: {}})); + }); + + // + // Tests for the "parallelCollectionScan" command. + // + + testCommandCanBeCausallyConsistent(function() { + assert.commandWorked( + db.runCommand({parallelCollectionScan: coll.getName(), numCursors: 1})); + }); + + // + // Tests for the "explain" command. + // + + testCommandCanBeCausallyConsistent(function() { + assert.commandWorked(db.runCommand({explain: {find: coll.getName()}})); + }); + + testCommandCanBeCausallyConsistent(function() { + coll.find().explain(); + }); + + testCommandCanBeCausallyConsistent(function() { + coll.explain().find().finish(); + }); + + db.getSession().endSession(); + } + + runTests({withSession: false}); + runTests({withSession: true}); + + rst.stopSet(); +})(); diff --git a/jstests/noPassthroughWithMongod/create_indexes_shell_helper.js b/jstests/noPassthroughWithMongod/create_indexes_shell_helper.js index ae59ec3402f..01f891b0810 100644 --- a/jstests/noPassthroughWithMongod/create_indexes_shell_helper.js +++ b/jstests/noPassthroughWithMongod/create_indexes_shell_helper.js @@ -41,6 +41,9 @@ getMaxWireVersion: function() { return mongo.getMaxWireVersion(); }, + isCausalConsistency: function() { + return false; + }, }; db._mongo = mockMongo; diff --git a/jstests/noPassthroughWithMongod/default_read_pref.js b/jstests/noPassthroughWithMongod/default_read_pref.js index dfc092da399..b5171f559ff 100644 --- a/jstests/noPassthroughWithMongod/default_read_pref.js +++ b/jstests/noPassthroughWithMongod/default_read_pref.js @@ -27,6 +27,9 @@ getMaxWireVersion: function() { return mongo.getMaxWireVersion(); }, + isCausalConsistency: function() { + return false; + }, }; db._session = new _DummyDriverSession(db._mongo); diff --git a/jstests/sharding/after_cluster_time.js b/jstests/sharding/after_cluster_time.js index 47b97a3315e..bc8b1cf462a 100644 --- a/jstests/sharding/after_cluster_time.js +++ b/jstests/sharding/after_cluster_time.js @@ -90,8 +90,7 @@ testDB.getMongo().setCausalConsistency(true); // With causal consistency enabled, the shell sets read concern to level "majority" if it is - // not - // specified. + // not specified. assertAfterClusterTimeReadSucceeds(testDB, {afterClusterTime: Timestamp(1, 1)}); testDB.getMongo().setCausalConsistency(false); }; diff --git a/jstests/sharding/causal_consistency_shell_support.js b/jstests/sharding/causal_consistency_shell_support.js index 4e3f0818f1a..b5640d2ab8f 100644 --- a/jstests/sharding/causal_consistency_shell_support.js +++ b/jstests/sharding/causal_consistency_shell_support.js @@ -8,43 +8,27 @@ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. - // Verifies causal consistency is either enabled or disabled for each given command name. - function checkCausalConsistencySupportForCommandNames(cmdObjs, isReadCommand) { - cmdObjs.forEach(function(cmdObj) { - let cmdName = Object.keys(cmdObj)[0]; - if (cmdName === "query" || cmdName === "$query") { - cmdObj = cmdObj[cmdName]; - cmdName = Object.keys(cmdObj)[0]; - } - - assert.eq(testDB.getMongo()._isReadCommand(cmdObj), - isReadCommand, - "expected causal consistency support for command, " + tojson(cmdObj) + - ", to be " + isReadCommand); - }); - } - // Verifies the command works and properly updates operation or cluster time. function runCommandAndCheckLogicalTimes(cmdObj, db, shouldAdvance) { - const mongo = db.getMongo(); + const session = db.getSession(); // Extract initial operation and cluster time. - let operationTime = mongo.getOperationTime(); - let clusterTimeObj = mongo.getClusterTime(); + let operationTime = session.getOperationTime(); + let clusterTimeObj = session.getClusterTime(); assert.commandWorked(db.runCommand(cmdObj)); // Verify cluster and operation time. if (shouldAdvance) { - assert(bsonWoCompare(mongo.getOperationTime(), operationTime) > 0, + assert(bsonWoCompare(session.getOperationTime(), operationTime) > 0, "expected the shell's operationTime to increase after running command: " + tojson(cmdObj)); assert( - bsonWoCompare(mongo.getClusterTime().clusterTime, clusterTimeObj.clusterTime) > 0, + bsonWoCompare(session.getClusterTime().clusterTime, clusterTimeObj.clusterTime) > 0, "expected the shell's clusterTime value to increase after running command: " + tojson(cmdObj)); } else { - assert(bsonWoCompare(mongo.getOperationTime(), operationTime) == 0, + assert(bsonWoCompare(session.getOperationTime(), operationTime) == 0, "expected the shell's operationTime to not change after running command: " + tojson(cmdObj)); // Don't check clusterTime, because during a slow operation clusterTime may be @@ -54,63 +38,20 @@ // Verifies the command works and its response satisfies the callback. function commandReturnsExpectedResult(cmdObj, db, resCallback) { - const mongo = db.getMongo(); + const session = db.getSession(); // Use the latest cluster time returned as a new operationTime and run command. - const clusterTimeObj = mongo.getClusterTime(); - mongo.setOperationTime(clusterTimeObj.clusterTime); + const clusterTimeObj = session.getClusterTime(); + session._operationTime = clusterTimeObj.clusterTime; const res = assert.commandWorked(testDB.runCommand(cmdObj)); // Verify the response contents and that new operation time is >= passed in time. - assert(bsonWoCompare(mongo.getOperationTime(), clusterTimeObj.clusterTime) >= 0, + assert(bsonWoCompare(session.getOperationTime(), clusterTimeObj.clusterTime) >= 0, "expected the shell's operationTime to be >= to:" + clusterTimeObj.clusterTime + " after running command: " + tojson(cmdObj)); resCallback(res); } - // All commands currently enabled to use causal consistency in the shell. - const supportedCommandNames = [ - {"query": {"aggregate": "test", "pipeline": [{"$match": {"x": 1}}]}}, - {"aggregate": "test", "pipeline": [{"$match": {"x": 1}}]}, - {"group": {"key": {"x": 1}}}, - {"query": {"group": {"key": {"x": 1}}}}, - {"query": {"explain": {"group": {"key": {"x": 1}}}}}, - {"count": "test", "query": {}}, - {"query": {"count": "test", "query": {}}}, - {"query": {"explain": {"count": "test", "query": {}}}}, - {"explain": {"count": "test", "query": {}}}, - {"distinct": "test", "query": {}}, - {"query": {"distinct": "test", "query": {}}}, - {"query": {"explain": {"distinct": "test", "query": {}}}}, - {"find": "test", "query": {}}, - {"query": {"find": "test", "query": {}}}, - {"query": {"explain": {"find": "test", "query": {}}}}, - {"geoNear": "test", "near": {}}, - {"query": {"geoNear": "test", "near": {}}}, - {"query": {"explain": {"geoNear": "test", "near": {}}}}, - {"geoSearch": "test", "near": {}}, - {"mapReduce": "test"}, - {"parallelCollectionScan": "test"}, - {"getMore": NumberLong("5888577173997830861")} - ]; - - // Omitting some commands for simplicity. Every command not listed above should be unsupported. - const unsupportedCommandNames = [ - {"aggregate": "test", "pipeline": [{"$match": {"x": 1}}], "explain": true}, - {"explain": {"aggregate": "test", "pipeline": [{"$match": {"x": 1}}]}}, - {"delete": "coll", "query": {"x": 1}}, - {"explain": {"delete": "coll", "query": {"x": 1}}}, - {"findAndModify": "coll", "query": {"x": 1}}, - {"explain": {"findAndModify": "coll", "query": {"x": 1}}}, - {"query": {"explain": {"findAndModify": "coll", "query": {"x": 1}}}}, - {"insert": "coll"}, - {"explain": {"insert": "coll"}}, - {"explain": {"update": "coll"}}, - {"update": "coll"}, - {"getLastError": {"x": 1}}, - {"getPrevError": {"x": 1}} - ]; - // Manually create a shard so tests on storage engines that don't support majority readConcern // can exit early. const rsName = "causal_consistency_shell_support_rs"; @@ -136,32 +77,32 @@ assert.commandWorked(st.s.adminCommand({addShard: rst.getURL()})); const testDB = st.s.getDB("test"); - const mongo = testDB.getMongo(); + const session = testDB.getSession(); // Verify causal consistency is disabled unless explicitly set. - assert.eq(!!mongo._isCausal, false); - mongo.setCausalConsistency(true); + assert.eq(testDB.getMongo()._causalConsistency, + false, + "causal consistency should be disabled by default"); + testDB.getMongo().setCausalConsistency(true); // Verify causal consistency is enabled for the connection and for each supported command. - assert.eq(!!mongo._isCausal, true); - checkCausalConsistencySupportForCommandNames(supportedCommandNames, true); - checkCausalConsistencySupportForCommandNames(unsupportedCommandNames, false); + assert.eq(testDB.getMongo()._causalConsistency, + true, + "calling setCausalConsistency() didn't enable causal consistency"); // Verify cluster times are tracked even before causal consistency is set (so the first // operation with causal consistency set can use valid cluster times). - mongo._operationTime = null; - mongo._clusterTime = null; + session._operationTime = undefined; assert.commandWorked(testDB.runCommand({insert: "foo", documents: [{x: 1}]})); - assert.neq(mongo.getOperationTime(), null); - assert.neq(mongo.getClusterTime(), null); + assert.neq(session.getOperationTime(), null); + assert.neq(session.getClusterTime(), null); - mongo._operationTime = null; - mongo._clusterTime = null; + session._operationTime = undefined; assert.commandWorked(testDB.runCommand({find: "foo"})); - assert.neq(mongo.getOperationTime(), null); - assert.neq(mongo.getClusterTime(), null); + assert.neq(session.getOperationTime(), null); + assert.neq(session.getClusterTime(), null); // Test that write commands advance both operation and cluster time. runCommandAndCheckLogicalTimes({insert: "foo", documents: [{x: 2}]}, testDB, true); @@ -170,7 +111,7 @@ // Test that each supported command works as expected and the shell's cluster times are properly // forwarded to the server and updated based on the response. - mongo.setCausalConsistency(true); + testDB.getMongo().setCausalConsistency(true); // Aggregate command. let aggColl = "aggColl"; @@ -244,7 +185,7 @@ // Verify that the server rejects commands when operation time is invalid by running a command // with an afterClusterTime value one day ahead. - const invalidTime = new Timestamp(mongo.getOperationTime().getTime() + (60 * 60 * 24), 0); + const invalidTime = new Timestamp(session.getOperationTime().getTime() + (60 * 60 * 24), 0); const invalidCmd = { find: "foo", readConcern: {level: "majority", afterClusterTime: invalidTime} @@ -254,7 +195,7 @@ ErrorCodes.InvalidOptions, "expected command, " + tojson(invalidCmd) + ", to fail with code, " + ErrorCodes.InvalidOptions + ", because the afterClusterTime value, " + invalidTime + - ", should not be ahead of the clusterTime, " + mongo.getClusterTime().clusterTime); + ", should not be ahead of the clusterTime, " + session.getClusterTime().clusterTime); st.stop(); })(); diff --git a/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js b/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js index d344032102a..17c36b3d80d 100644 --- a/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js +++ b/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js @@ -46,20 +46,24 @@ // Note: this query will not be registered by the profiler because it errors before reaching the // storage level. jsTest.log("Do a secondary read from stale mongos with afterClusterTime and level 'available'"); - assert.commandFailedWithCode(staleMongos.getDB(dbName).runCommand({ + const staleMongosDB = staleMongos.getDB(dbName); + assert.commandFailedWithCode(staleMongosDB.runCommand({ count: collName, query: {x: 1}, $readPreference: {mode: "secondary"}, - readConcern: {'afterClusterTime': staleMongos.getOperationTime(), 'level': 'available'} + readConcern: { + 'afterClusterTime': staleMongosDB.getSession().getOperationTime(), + 'level': 'available' + } }), ErrorCodes.InvalidOptions); jsTest.log("Do a secondary read from stale mongos with afterClusterTime and no level"); - let res = staleMongos.getDB('test').runCommand({ + let res = staleMongosDB.runCommand({ count: collName, query: {x: 1}, $readPreference: {mode: "secondary"}, - readConcern: {'afterClusterTime': staleMongos.getOperationTime()}, + readConcern: {'afterClusterTime': staleMongosDB.getSession().getOperationTime()}, }); assert(res.ok); assert.eq(1, res.n, tojson(res)); diff --git a/src/mongo/shell/db.js b/src/mongo/shell/db.js index 4930a5d46b0..7b9c1f25f62 100644 --- a/src/mongo/shell/db.js +++ b/src/mongo/shell/db.js @@ -1857,7 +1857,7 @@ var DB; (function(hasOwnProperty) { DB.prototype.getSession = function() { if (!hasOwnProperty.call(this, "_session")) { - this._session = new _DummyDriverSession(this.getMongo()); + this._session = this.getMongo()._getDefaultSession(); } return this._session; }; diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js index c5ca1a851ee..231206a9766 100644 --- a/src/mongo/shell/mongo.js +++ b/src/mongo/shell/mongo.js @@ -11,24 +11,10 @@ if (!Mongo.prototype) { throw Error("Mongo.prototype not defined"); } -(function(original) { - Mongo.prototype.find = function find(ns, query, fields, limit, skip, batchSize, options) { - const self = this; - // Causal consistency is not supported in the OP_QUERY legacy protocol. - if (this._isCausal && this.useReadCommands()) { - query = this._gossipLogicalTime(query); - } - const res = original.call(this, ns, query, fields, limit, skip, batchSize, options); - const origNext = res.next; - res.next = function next() { - const ret = origNext.call(this); - self._setLogicalTimeFromReply(ret); - return ret; - }; - return res; +if (!Mongo.prototype.find) + Mongo.prototype.find = function(ns, query, fields, limit, skip, batchSize, options) { + throw Error("find not implemented"); }; -})(Mongo.prototype.find); - if (!Mongo.prototype.insert) Mongo.prototype.insert = function(ns, obj) { throw Error("insert not implemented"); @@ -46,13 +32,6 @@ if (typeof mongoInject == "function") { mongoInject(Mongo.prototype); } -Mongo.prototype.setCausalConsistency = function(value) { - if (arguments.length === 0) { - value = true; - } - this._isCausal = value; -}; - Mongo.prototype.setSlaveOk = function(value) { if (value == undefined) value = true; @@ -77,7 +56,7 @@ Mongo.prototype.getDB = function(name) { return new DB(this, name); }; -Mongo.prototype.getDBs = function(driverSession = new _DummyDriverSession(this)) { +Mongo.prototype.getDBs = function(driverSession = this._getDefaultSession()) { var cmdObj = {listDatabases: 1}; cmdObj = driverSession._serverSession.injectSessionId(cmdObj); @@ -87,141 +66,6 @@ Mongo.prototype.getDBs = function(driverSession = new _DummyDriverSession(this)) return res; }; -Mongo.prototype._isReadCommand = function(cmdObj) { - let readCommands = [ - "count", - "distinct", - "find", - "getMore", - "geoNear", - "geoSearch", - "group", - "mapReduce", - "mapreduce", - "parallelCollectionScan", - ]; - - const cmdName = Object.keys(cmdObj)[0]; - let isReadCommand = Array.contains(readCommands, cmdName); - if (cmdName === "aggregate") { - // Aggregate can be either a read or a write depending on whether it has a $out stage. - // $out is required to be the last stage of the pipeline. - var stages = cmdObj.pipeline; - const lastStage = stages && Array.isArray(stages) && (stages.length !== 0) - ? stages[stages.length - 1] - : undefined; - const hasOut = - lastStage && (typeof lastStage === "object") && lastStage.hasOwnProperty("$out"); - const hasExplain = cmdObj.hasOwnProperty("explain") && cmdObj.explain; - - if (!hasExplain && !hasOut) { - isReadCommand = true; - } - } - - if (cmdName === "explain") { - if (Array.contains(readCommands, Object.keys(cmdObj[cmdName])[0])) { - isReadCommand = true; - } - } - - return isReadCommand; -}; - -/** - * Adds afterClusterTime to the readConcern. - */ -Mongo.prototype._injectAfterClusterTime = function(cmdObj) { - cmdObj = Object.assign({}, cmdObj); - // The operationTime returned by the current session (i.e. connection) is the - // smallest time that is needed for causal consistent read. The clusterTime is >= - // the operationTime so it's less efficient to wait on the server for the - // clusterTime. - const operationTime = this.getOperationTime(); - if (operationTime) { - let cmdName = Object.keys(cmdObj)[0]; - let cmdObjUnwrapped = cmdObj; - if (cmdName === "query" || cmdName === "$query") { - cmdObj[cmdName] = Object.assign({}, cmdObj[cmdName]); - cmdObjUnwrapped = cmdObj[cmdName]; - cmdName = Object.keys(cmdObjUnwrapped)[0]; - } - - cmdObjUnwrapped.readConcern = Object.assign({}, cmdObjUnwrapped.readConcern); - let readConcern = cmdObjUnwrapped.readConcern; - - if (!readConcern.hasOwnProperty("afterClusterTime")) { - readConcern.afterClusterTime = operationTime; - } - if (!readConcern.hasOwnProperty("level")) { - readConcern.level = "local"; - } - - const isReadCommand = this._isReadCommand(cmdObjUnwrapped); - const readPref = this.getReadPref(); - // While the readConcern must be set on the commandObject level i.e. in this case its the - // object referenced by cmdObjUnwrapped the $readPreference must be set on the top level. - if (isReadCommand && !cmdObj.hasOwnProperty("$readPreference") && readPref) { - cmdObj.$readPreference = readPref; - } - } - return cmdObj; -}; - -Mongo.prototype._gossipLogicalTime = function(obj) { - obj = Object.assign({}, obj); - const clusterTime = this.getClusterTime(); - if (clusterTime) { - obj["$clusterTime"] = clusterTime; - } - return obj; -}; - -/** - * Sets logicalTime and operationTime extracted from command reply. - * This is applicable for the protocol starting from version 3.6. - */ -Mongo.prototype._setLogicalTimeFromReply = function(res) { - if (res.hasOwnProperty("operationTime")) { - this.setOperationTime(res["operationTime"]); - } - if (res.hasOwnProperty("$clusterTime")) { - this.setClusterTime(res["$clusterTime"]); - } -}; - -/** - * Adds afterClusterTime to the readConcern if its supported and runs the command. - */ -(function(original) { - Mongo.prototype.runCommandWithMetadata = function runCommandWithMetadata( - dbName, metadata, cmdObj) { - if (this._isCausal && cmdObj) { - cmdObj = this._injectAfterClusterTime(cmdObj); - metadata = this._gossipLogicalTime(metadata); - } - const res = original.call(this, dbName, metadata, cmdObj); - - this._setLogicalTimeFromReply(res); - return res; - }; -})(Mongo.prototype.runCommandWithMetadata); - -/** - * Adds afterClusterTime to the readConcern if its supported and runs the command. - */ -(function(original) { - Mongo.prototype.runCommand = function runCommand(dbName, cmdObj, options) { - if (this._isCausal && cmdObj) { - cmdObj = this._injectAfterClusterTime(cmdObj); - cmdObj = this._gossipLogicalTime(cmdObj); - } - const res = original.call(this, dbName, cmdObj, options); - this._setLogicalTimeFromReply(res); - return res; - }; -})(Mongo.prototype.runCommand); - Mongo.prototype.adminCommand = function(cmd) { return this.getDB("admin").runCommand(cmd); }; @@ -229,7 +73,7 @@ Mongo.prototype.adminCommand = function(cmd) { /** * Returns all log components and current verbosity values */ -Mongo.prototype.getLogComponents = function(driverSession = new _DummyDriverSession(this)) { +Mongo.prototype.getLogComponents = function(driverSession = this._getDefaultSession()) { var cmdObj = {getParameter: 1, logComponentVerbosity: 1}; cmdObj = driverSession._serverSession.injectSessionId(cmdObj); @@ -244,7 +88,7 @@ Mongo.prototype.getLogComponents = function(driverSession = new _DummyDriverSess * string of form "storage.journaling" */ Mongo.prototype.setLogLevel = function( - logLevel, component, driverSession = new _DummyDriverSession(this)) { + logLevel, component, driverSession = this._getDefaultSession()) { componentNames = []; if (typeof component === "string") { componentNames = component.split("."); @@ -547,30 +391,27 @@ Mongo.prototype.unsetWriteConcern = function() { delete this._writeConcern; }; -/** - * Sets the operationTime. - */ -Mongo.prototype.setOperationTime = function(operationTime) { - if (operationTime === Timestamp(0, 0)) { - throw Error("Attempt to set an uninitiated operationTime"); - } - if (this._operationTime === undefined || this._operationTime === null || - (typeof operationTime === "object" && - bsonWoCompare(operationTime, this._operationTime) === 1)) { - this._operationTime = operationTime; +Mongo.prototype.startSession = function startSession(options) { + return new DriverSession(this, options); +}; + +Mongo.prototype._getDefaultSession = function getDefaultSession() { + // We implicitly associate a Mongo connection object with a DriverSession so that tests which + // call DB.prototype.getMongo() and then Mongo.prototype.getDB() to get a different DB instance + // are still causally consistent. + if (!this.hasOwnProperty("_defaultSession")) { + this._defaultSession = new _DummyDriverSession(this); } + return this._defaultSession; }; -/** - * Gets the operationTime or null if unset. - */ -Mongo.prototype.getOperationTime = function() { - if (this._operationTime === undefined) { - return null; +Mongo.prototype.isCausalConsistency = function isCausalConsistency() { + if (!this.hasOwnProperty("_causalConsistency")) { + this._causalConsistency = false; } - return this._operationTime; + return this._causalConsistency; }; -Mongo.prototype.startSession = function(opts) { - return new DriverSession(this, opts); +Mongo.prototype.setCausalConsistency = function setCausalConsistency(causalConsistency = true) { + this._causalConsistency = causalConsistency; }; diff --git a/src/mongo/shell/session.js b/src/mongo/shell/session.js index 90460663284..f932752555d 100644 --- a/src/mongo/shell/session.js +++ b/src/mongo/shell/session.js @@ -10,6 +10,9 @@ var { let _readPreference = rawOptions.readPreference; let _readConcern = rawOptions.readConcern; let _writeConcern = rawOptions.writeConcern; + const _initialClusterTime = rawOptions.initialClusterTime; + const _initialOperationTime = rawOptions.initialOperationTime; + let _causalConsistency = rawOptions.causalConsistency; let _retryWrites = rawOptions.retryWrites; this.getReadPreference = function getReadPreference() { @@ -39,6 +42,22 @@ var { _writeConcern = writeConcern; }; + this.getInitialClusterTime = function getInitialClusterTime() { + return _initialClusterTime; + }; + + this.getInitialOperationTime = function getInitialOperationTime() { + return _initialOperationTime; + }; + + this.isCausalConsistency = function isCausalConsistency() { + return _causalConsistency; + }; + + this.setCausalConsistency = function setCausalConsistency(causalConsistency = true) { + _causalConsistency = causalConsistency; + }; + this.shouldRetryWrites = function shouldRetryWrites() { return _retryWrites; }; @@ -50,6 +69,7 @@ var { function SessionAwareClient(client) { const kWireVersionSupportingLogicalSession = 6; + const kWireVersionSupportingCausalConsistency = 6; this.getReadPreference = function getReadPreference(driverSession) { const sessionOptions = driverSession.getOptions(); @@ -83,11 +103,90 @@ var { wireVersion <= client.getMaxWireVersion(); } + const kCommandsThatSupportReadConcern = new Set([ + "aggregate", + "count", + "distinct", + "explain", + "find", + "geoNear", + "geoSearch", + "group", + "mapReduce", + "mapreduce", + "parallelCollectionScan", + ]); + + function canUseReadConcern(cmdObj) { + let cmdName = Object.keys(cmdObj)[0]; + + // If the command is in a wrapped form, then we look for the actual command name inside + // the query/$query object. + let cmdObjUnwrapped = cmdObj; + if (cmdName === "query" || cmdName === "$query") { + cmdObjUnwrapped = cmdObj[cmdName]; + cmdName = Object.keys(cmdObjUnwrapped)[0]; + } + + if (!kCommandsThatSupportReadConcern.has(cmdName)) { + return false; + } + + if (cmdName === "aggregate" && cmdObjUnwrapped.explain) { + // TODO SERVER-30582: Aggregation's explain doesn't support the "readConcern" + // option. Note that an aggregation with a $out stage as its last stage still + // supports a read concern level of "local". + return false; + } + + if (cmdName === "explain") { + return kCommandsThatSupportReadConcern.has(Object.keys(cmdObjUnwrapped.explain)[0]); + } + + return true; + } + + function injectAfterClusterTime(cmdObj, operationTime) { + cmdObj = Object.assign({}, cmdObj); + + if (operationTime !== undefined) { + const cmdName = Object.keys(cmdObj)[0]; + + // If the command is in a wrapped form, then we look for the actual command object + // inside the query/$query object. + let cmdObjUnwrapped = cmdObj; + if (cmdName === "query" || cmdName === "$query") { + cmdObj[cmdName] = Object.assign({}, cmdObj[cmdName]); + cmdObjUnwrapped = cmdObj[cmdName]; + } + + cmdObjUnwrapped.readConcern = Object.assign({}, cmdObjUnwrapped.readConcern); + const readConcern = cmdObjUnwrapped.readConcern; + + if (!readConcern.hasOwnProperty("afterClusterTime")) { + readConcern.afterClusterTime = operationTime; + } + } + + return cmdObj; + } + function prepareCommandRequest(driverSession, cmdObj) { if (serverSupports(kWireVersionSupportingLogicalSession)) { cmdObj = driverSession._serverSession.injectSessionId(cmdObj); } + if (serverSupports(kWireVersionSupportingCausalConsistency) && + (driverSession.getOptions().isCausalConsistency() || + client.isCausalConsistency()) && + canUseReadConcern(cmdObj)) { + // `driverSession._operationTime` is the smallest time needed for performing a + // causally consistent read using the current session. Note that + // `client.getClusterTime()` is no smaller than the operation time and would + // therefore only be less efficient to wait until. + cmdObj = injectAfterClusterTime(cmdObj, driverSession._operationTime); + } + return cmdObj; } @@ -309,7 +408,11 @@ var { } this._serverSession = implMethods.createServerSession(client); - this._operationTime = null; + this._operationTime = _options.getInitialOperationTime(); + + if (_options.getInitialClusterTime() !== undefined) { + client.setClusterTime(_options.getInitialClusterTime()); + } this.getClient = function getClient() { return client; @@ -319,6 +422,14 @@ var { return _options; }; + this.getOperationTime = function getOperationTime() { + return this._operationTime; + }; + + this.getClusterTime = function getClusterTime() { + return client.getClusterTime(); + }; + this.getDatabase = function getDatabase(dbName) { const db = client.getDB(dbName); db._session = this; |