summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2017-09-20 14:15:00 -0400
committerMax Hirschhorn <max.hirschhorn@mongodb.com>2017-09-20 14:15:00 -0400
commit0195f34a7a61c75fd8fbaba6f8dae9671e2dd915 (patch)
tree63f9f41fae93f9b9c9efe5f993daf96dc7a0a0a9
parent44a2de49607e5340efc7e84d265216723d403add (diff)
downloadmongo-0195f34a7a61c75fd8fbaba6f8dae9671e2dd915.tar.gz
SERVER-30679 Integrate causallyConsistentReads into SessionOptions.
-rw-r--r--jstests/aggregation/bugs/server18198.js3
-rw-r--r--jstests/change_streams/change_stream.js8
-rw-r--r--jstests/change_streams/change_stream_invalidation.js4
-rw-r--r--jstests/change_streams/lookup_post_image.js10
-rw-r--r--jstests/change_streams/report_latest_observed_oplog_timestamp.js2
-rw-r--r--jstests/concurrency/fsm_all_sharded_causal_consistency.js2
-rw-r--r--jstests/concurrency/fsm_all_sharded_causal_consistency_and_balancer.js2
-rw-r--r--jstests/concurrency/fsm_libs/worker_thread.js7
-rw-r--r--jstests/multiVersion/3_upgrade_replset.js12
-rw-r--r--jstests/multiVersion/downgrade_replset.js9
-rw-r--r--jstests/multiVersion/minor_version_upgrade_replset.js9
-rw-r--r--jstests/noPassthrough/shell_can_use_read_concern.js314
-rw-r--r--jstests/noPassthroughWithMongod/create_indexes_shell_helper.js3
-rw-r--r--jstests/noPassthroughWithMongod/default_read_pref.js3
-rw-r--r--jstests/sharding/after_cluster_time.js3
-rw-r--r--jstests/sharding/causal_consistency_shell_support.js113
-rw-r--r--jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js12
-rw-r--r--src/mongo/shell/db.js2
-rw-r--r--src/mongo/shell/mongo.js205
-rw-r--r--src/mongo/shell/session.js113
20 files changed, 539 insertions, 297 deletions
diff --git a/jstests/aggregation/bugs/server18198.js b/jstests/aggregation/bugs/server18198.js
index d55a890963e..a5847ad30dd 100644
--- a/jstests/aggregation/bugs/server18198.js
+++ b/jstests/aggregation/bugs/server18198.js
@@ -30,6 +30,9 @@
getMaxWireVersion: function() {
return mongo.getMaxWireVersion();
},
+ isCausalConsistency: function() {
+ return false;
+ },
};
db._mongo = mockMongo;
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index c545456ef33..5e06eae5b52 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -33,7 +33,7 @@
// these synchronization points from this test.
assert.commandWorked(db.runCommand({
find: "foo",
- readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
+ readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
// Waiting for replication assures no previous operations will be included.
@@ -49,7 +49,7 @@
// TODO: SERVER-29126
assert.commandWorked(db.runCommand({
find: "foo",
- readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
+ readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
FixtureHelpers.awaitReplication();
if (expectedBatch.length == 0)
@@ -293,7 +293,7 @@
// TODO: SERVER-29126
assert.commandWorked(db.runCommand({
find: "foo",
- readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
+ readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
FixtureHelpers.awaitReplication();
FixtureHelpers.runCommandOnEachPrimary({
@@ -322,7 +322,7 @@
// TODO: SERVER-29126
assert.commandWorked(db.runCommand({
find: "foo",
- readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
+ readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
FixtureHelpers.awaitReplication();
assert.commandWorked(db.adminCommand(
diff --git a/jstests/change_streams/change_stream_invalidation.js b/jstests/change_streams/change_stream_invalidation.js
index 4d1122eeac3..8fcb5e9e527 100644
--- a/jstests/change_streams/change_stream_invalidation.js
+++ b/jstests/change_streams/change_stream_invalidation.js
@@ -19,7 +19,7 @@
// these synchronization points from this test.
assert.commandWorked(db.runCommand({
find: "foo",
- readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
+ readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
let res = assert.commandWorked(
@@ -70,7 +70,7 @@
// these synchronization points from this test.
assert.commandWorked(db.runCommand({
find: "foo",
- readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
+ readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
res =
diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js
index 44c1e8de956..fa9301fa142 100644
--- a/jstests/change_streams/lookup_post_image.js
+++ b/jstests/change_streams/lookup_post_image.js
@@ -21,7 +21,7 @@
// these synchronization points from this test.
assert.commandWorked(db.runCommand({
find: "foo",
- readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
+ readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
const cmdResponse = assert.commandWorked(
@@ -44,7 +44,7 @@
// TODO: SERVER-29126
assert.commandWorked(db.runCommand({
find: "foo",
- readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
+ readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
FixtureHelpers.awaitReplication();
// TODO: SERVER-29126
@@ -54,7 +54,7 @@
// these synchronization points from this test.
assert.commandWorked(db.runCommand({
find: "foo",
- readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
+ readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
FixtureHelpers.runCommandOnEachPrimary({
dbName: "admin",
@@ -285,7 +285,7 @@
// these synchronization points from this test.
assert.commandWorked(db.runCommand({
find: "foo",
- readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
+ readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
res = assert.commandWorked(db.runCommand({
@@ -302,7 +302,7 @@
// these synchronization points from this test.
assert.commandWorked(db.runCommand({
find: "foo",
- readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
+ readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
res = assert.commandWorked(db.runCommand({
diff --git a/jstests/change_streams/report_latest_observed_oplog_timestamp.js b/jstests/change_streams/report_latest_observed_oplog_timestamp.js
index 7345cfc4667..4598bc6ce9a 100644
--- a/jstests/change_streams/report_latest_observed_oplog_timestamp.js
+++ b/jstests/change_streams/report_latest_observed_oplog_timestamp.js
@@ -51,7 +51,7 @@
// synchronization point from this test.
assert.commandWorked(db.runCommand({
find: "foo",
- readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
+ readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
// Look at one batch's worth.
diff --git a/jstests/concurrency/fsm_all_sharded_causal_consistency.js b/jstests/concurrency/fsm_all_sharded_causal_consistency.js
index 05deef3583c..1d2f8617d4c 100644
--- a/jstests/concurrency/fsm_all_sharded_causal_consistency.js
+++ b/jstests/concurrency/fsm_all_sharded_causal_consistency.js
@@ -114,4 +114,4 @@ runWorkloadsSerially(
return !Array.contains(blacklist, file);
}),
{sharded: {enabled: true}, replication: {enabled: true}},
- {sessionOptions: {causallyConsistentReads: true, readPreference: {mode: "secondary"}}});
+ {sessionOptions: {causalConsistency: true, readPreference: {mode: "secondary"}}});
diff --git a/jstests/concurrency/fsm_all_sharded_causal_consistency_and_balancer.js b/jstests/concurrency/fsm_all_sharded_causal_consistency_and_balancer.js
index 44120d28c85..7ee65f861bf 100644
--- a/jstests/concurrency/fsm_all_sharded_causal_consistency_and_balancer.js
+++ b/jstests/concurrency/fsm_all_sharded_causal_consistency_and_balancer.js
@@ -120,4 +120,4 @@ runWorkloadsSerially(
return !Array.contains(blacklist, file);
}),
{sharded: {enabled: true, enableBalancer: true}, replication: {enabled: true}},
- {sessionOptions: {causallyConsistentReads: true, readPreference: {mode: "secondary"}}});
+ {sessionOptions: {causalConsistency: true, readPreference: {mode: "secondary"}}});
diff --git a/jstests/concurrency/fsm_libs/worker_thread.js b/jstests/concurrency/fsm_libs/worker_thread.js
index 2f5e0a40245..44bdb870d0c 100644
--- a/jstests/concurrency/fsm_libs/worker_thread.js
+++ b/jstests/concurrency/fsm_libs/worker_thread.js
@@ -41,13 +41,6 @@ var workerThread = (function() {
myDB = new Mongo(args.host)
.startSession(args.sessionOptions)
.getDatabase(args.dbName);
-
- if (args.sessionOptions.causallyConsistentReads) {
- // TODO SERVER-30679: We manually enable causal consistency on the
- // connection object so that "afterClusterTime" is injected into the
- // readConcern of any command requests through this connection.
- myDB.getMongo().setCausalConsistency();
- }
} else {
myDB = new Mongo(args.host).getDB(args.dbName);
}
diff --git a/jstests/multiVersion/3_upgrade_replset.js b/jstests/multiVersion/3_upgrade_replset.js
index bccab5068c7..ea8826c1ad7 100644
--- a/jstests/multiVersion/3_upgrade_replset.js
+++ b/jstests/multiVersion/3_upgrade_replset.js
@@ -58,6 +58,11 @@ rst.upgradeSet({binVersion: "latest"});
jsTest.log("Replica set upgraded.");
+// We save a reference to the old primary so that we can call reconnect() on it before
+// joinFindInsert() would attempt to send the node an update operation that signals the parallel
+// shell running the background operations to stop.
+var oldPrimary = primary;
+
// Wait for primary
var primary = rst.getPrimary();
@@ -73,8 +78,15 @@ jsTest.log("Replica set downgraded.");
// Allow even more valid writes to go through
sleep(10 * 1000);
+// Since the primary from before the upgrade took place was restarted as part of the
+// upgrade/downgrade process, we explicitly reconnect to it so that sending it an update operation
+// silently fails with an unchecked NotMaster error rather than a network error.
+reconnect(oldPrimary.getDB("admin"));
joinFindInsert();
+// Since the primary from after the upgrade took place was restarted as part of the downgrade
+// process, we explicitly reconnect to it.
+reconnect(primary.getDB("admin"));
var totalInserts = primary.getCollection(insertNS).find().sort({_id: -1}).next()._id + 1;
var dataFound = primary.getCollection(insertNS).count();
diff --git a/jstests/multiVersion/downgrade_replset.js b/jstests/multiVersion/downgrade_replset.js
index 3e324a69ad6..7804769f4dc 100644
--- a/jstests/multiVersion/downgrade_replset.js
+++ b/jstests/multiVersion/downgrade_replset.js
@@ -54,9 +54,18 @@ function runDowngradeTest(protocolVersion) {
rst.upgradeSet({binVersion: oldVersion});
jsTest.log("Downgrade complete.");
+ // We save a reference to the old primary so that we can call reconnect() on it before
+ // joinFindInsert() would attempt to send the node an update operation that signals the parallel
+ // shell running the background operations to stop.
+ var oldPrimary = primary;
+
primary = rst.getPrimary();
printjson(rst.status());
+ // Since the old primary was restarted as part of the downgrade process, we explicitly reconnect
+ // to it so that sending it an update operation silently fails with an unchecked NotMaster error
+ // rather than a network error.
+ reconnect(oldPrimary.getDB("admin"));
joinFindInsert();
rst.stopSet();
}
diff --git a/jstests/multiVersion/minor_version_upgrade_replset.js b/jstests/multiVersion/minor_version_upgrade_replset.js
index 45d3a7f4844..fb98e74f70e 100644
--- a/jstests/multiVersion/minor_version_upgrade_replset.js
+++ b/jstests/multiVersion/minor_version_upgrade_replset.js
@@ -58,6 +58,11 @@ rst.upgradeSet({binVersion: "latest"});
jsTest.log("Replica set upgraded.");
+// We save a reference to the old primary so that we can call reconnect() on it before
+// joinFindInsert() would attempt to send the node an update operation that signals the parallel
+// shell running the background operations to stop.
+var oldPrimary = primary;
+
// Wait for primary
var primary = rst.getPrimary();
@@ -66,6 +71,10 @@ printjson(rst.status());
// Allow more valid writes to go through
sleep(10 * 1000);
+// Since the old primary was restarted as part of the upgrade process, we explicitly reconnect to it
+// so that sending it an update operation silently fails with an unchecked NotMaster error rather
+// than a network error.
+reconnect(oldPrimary.getDB("admin"));
joinFindInsert();
var totalInserts = primary.getCollection(insertNS).find().sort({_id: -1}).next()._id + 1;
diff --git a/jstests/noPassthrough/shell_can_use_read_concern.js b/jstests/noPassthrough/shell_can_use_read_concern.js
new file mode 100644
index 00000000000..95d47c4e74e
--- /dev/null
+++ b/jstests/noPassthrough/shell_can_use_read_concern.js
@@ -0,0 +1,314 @@
+/**
+ * Tests that read operations executed through the mongo shell's API are specify afterClusterTime
+ * when causal consistency is enabled.
+ */
+(function() {
+ "use strict";
+
+ const rst = new ReplSetTest({nodes: 1});
+ rst.startSet();
+ rst.initiate();
+
+ const primary = rst.getPrimary();
+
+ function runTests({withSession}) {
+ let db;
+
+ if (withSession) {
+ primary.setCausalConsistency(false);
+ db = primary.startSession({causalConsistency: true}).getDatabase("test");
+ } else {
+ primary.setCausalConsistency(true);
+ db = primary.getDB("test");
+ }
+
+ const coll = db.shell_can_use_read_concern;
+ coll.drop();
+
+ function testCommandCanBeCausallyConsistent(func, {
+ expectedSession: expectedSession = withSession,
+ expectedAfterClusterTime: expectedAfterClusterTime = true
+ } = {}) {
+ const mongoRunCommandOriginal = Mongo.prototype.runCommand;
+
+ const sentinel = {};
+ let cmdObjSeen = sentinel;
+
+ Mongo.prototype.runCommand = function runComandSpy(dbName, cmdObj, options) {
+ cmdObjSeen = cmdObj;
+ return mongoRunCommandOriginal.apply(this, arguments);
+ };
+
+ try {
+ assert.doesNotThrow(func);
+ } finally {
+ Mongo.prototype.runCommand = mongoRunCommandOriginal;
+ }
+
+ if (cmdObjSeen === sentinel) {
+ throw new Error("Mongo.prototype.runCommand() was never called: " +
+ func.toString());
+ }
+
+ let cmdName = Object.keys(cmdObjSeen)[0];
+
+ // If the command is in a wrapped form, then we look for the actual command object
+ // inside
+ // the query/$query object.
+ if (cmdName === "query" || cmdName === "$query") {
+ cmdObjSeen = cmdObjSeen[cmdName];
+ cmdName = Object.keys(cmdObjSeen)[0];
+ }
+
+ if (expectedSession) {
+ assert(cmdObjSeen.hasOwnProperty("lsid"),
+ "Expected operation " + tojson(cmdObjSeen) +
+ " to have a logical session id: " + func.toString());
+ }
+
+ if (expectedAfterClusterTime) {
+ assert(cmdObjSeen.hasOwnProperty("readConcern"),
+ "Expected operation " + tojson(cmdObjSeen) +
+ " to have a readConcern object since it can be causally consistent: " +
+ func.toString());
+
+ const readConcern = cmdObjSeen.readConcern;
+ assert(readConcern.hasOwnProperty("afterClusterTime"),
+ "Expected operation " + tojson(cmdObjSeen) +
+ " to specify afterClusterTime since it can be causally consistent: " +
+ func.toString());
+ } else {
+ assert(!cmdObjSeen.hasOwnProperty("readConcern"),
+ "Expected operation " + tojson(cmdObjSeen) + " to not have a readConcern" +
+ " object since it cannot be causally consistent: " + func.toString());
+ }
+ }
+
+ //
+ // Tests for the "find" and "getMore" commands.
+ //
+
+ {
+ testCommandCanBeCausallyConsistent(function() {
+ assert.writeOK(coll.insert([{}, {}, {}, {}, {}]));
+ }, {expectedSession: withSession, expectedAfterClusterTime: false});
+
+ testCommandCanBeCausallyConsistent(function() {
+ assert.commandWorked(
+ db.runCommand({find: coll.getName(), batchSize: 5, singleBatch: true}));
+ });
+
+ const cursor = coll.find().batchSize(2);
+
+ testCommandCanBeCausallyConsistent(function() {
+ cursor.next();
+ cursor.next();
+ });
+
+ testCommandCanBeCausallyConsistent(function() {
+ cursor.next();
+ cursor.next();
+ cursor.next();
+ assert(!cursor.hasNext());
+ }, {
+ // TODO SERVER-30848: Change expectedSession to `withSession` after getMore requests
+ // from the mongo shell use the logical session the cursor was established with.
+ expectedSession: false,
+ expectedAfterClusterTime: false,
+ });
+ }
+
+ //
+ // Tests for the "count" command.
+ //
+
+ testCommandCanBeCausallyConsistent(function() {
+ assert.commandWorked(db.runCommand({count: coll.getName()}));
+ });
+
+ testCommandCanBeCausallyConsistent(function() {
+ assert.commandWorked(db.runCommand({query: {count: coll.getName()}}));
+ });
+
+ testCommandCanBeCausallyConsistent(function() {
+ assert.commandWorked(db.runCommand({$query: {count: coll.getName()}}));
+ });
+
+ testCommandCanBeCausallyConsistent(function() {
+ assert.eq(5, coll.count());
+ });
+
+ //
+ // Tests for the "distinct" command.
+ //
+
+ testCommandCanBeCausallyConsistent(function() {
+ assert.commandWorked(db.runCommand({distinct: coll.getName(), key: "_id"}));
+ });
+
+ testCommandCanBeCausallyConsistent(function() {
+ const values = coll.distinct("_id");
+ assert.eq(5, values.length, tojson(values));
+ });
+
+ //
+ // Tests for the "aggregate" command.
+ //
+
+ {
+ testCommandCanBeCausallyConsistent(function() {
+ assert.commandWorked(db.runCommand(
+ {aggregate: coll.getName(), pipeline: [], cursor: {batchSize: 5}}));
+ });
+
+ testCommandCanBeCausallyConsistent(function() {
+ assert.commandWorked(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [],
+ cursor: {batchSize: 5},
+ explain: true
+ }));
+ }, {expectedSession: withSession, expectedAfterClusterTime: false});
+
+ let cursor;
+
+ testCommandCanBeCausallyConsistent(function() {
+ cursor = coll.aggregate([], {cursor: {batchSize: 2}});
+ cursor.next();
+ cursor.next();
+ });
+
+ testCommandCanBeCausallyConsistent(function() {
+ cursor.next();
+ cursor.next();
+ cursor.next();
+ assert(!cursor.hasNext());
+ }, {
+ // TODO SERVER-30848: Change expectedSession to `withSession` after getMore requests
+ // from the mongo shell use the logical session the cursor was established with.
+ expectedSession: false,
+ expectedAfterClusterTime: false,
+ });
+ }
+
+ //
+ // Tests for the "mapReduce" command.
+ //
+
+ testCommandCanBeCausallyConsistent(function() {
+ const res = assert.commandWorked(db.runCommand({
+ mapReduce: coll.getName(),
+ map: function() {
+ emit("x", 1);
+ },
+ reduce: function(key, values) {
+ return values.length;
+ },
+ out: {inline: 1}
+ }));
+ assert.eq([{_id: "x", value: 5}], res.results, tojson(res));
+ });
+
+ testCommandCanBeCausallyConsistent(function() {
+ const res = coll.mapReduce(
+ function() {
+ emit("x", 1);
+ },
+ function(key, values) {
+ return values.length;
+ },
+ {out: {inline: 1}});
+ assert.eq([{_id: "x", value: 5}], res.results, tojson(res));
+ });
+
+ //
+ // Tests for the "group" command.
+ //
+
+ testCommandCanBeCausallyConsistent(function() {
+ const res = assert.commandWorked(db.runCommand({
+ group: {
+ ns: coll.getName(),
+ key: {x: 1},
+ $reduce: function(curr, result) {
+ ++result.total;
+ },
+ initial: {total: 0}
+ }
+ }));
+ assert.eq([{x: null, total: 5}], res.retval, tojson(res));
+ });
+
+ testCommandCanBeCausallyConsistent(function() {
+ const res = coll.group({
+ key: {x: 1},
+ $reduce: function(curr, result) {
+ ++result.total;
+ },
+ initial: {total: 0}
+ });
+ assert.eq([{x: null, total: 5}], res);
+ });
+
+ //
+ // Tests for the "geoNear" command.
+ //
+
+ testCommandCanBeCausallyConsistent(function() {
+ assert.commandWorked(coll.createIndex({loc: "2dsphere"}));
+ }, {expectedSession: withSession, expectedAfterClusterTime: false});
+
+ testCommandCanBeCausallyConsistent(function() {
+ assert.commandWorked(db.runCommand({
+ geoNear: coll.getName(),
+ near: {type: "Point", coordinates: [0, 0]},
+ spherical: true
+ }));
+ });
+
+ //
+ // Tests for the "geoSearch" command.
+ //
+
+ testCommandCanBeCausallyConsistent(function() {
+ assert.commandWorked(coll.createIndex({loc: "geoHaystack", other: 1}, {bucketSize: 1}));
+ }, {expectedSession: withSession, expectedAfterClusterTime: false});
+
+ testCommandCanBeCausallyConsistent(function() {
+ assert.commandWorked(db.runCommand(
+ {geoSearch: coll.getName(), near: [0, 0], maxDistance: 1, search: {}}));
+ });
+
+ //
+ // Tests for the "parallelCollectionScan" command.
+ //
+
+ testCommandCanBeCausallyConsistent(function() {
+ assert.commandWorked(
+ db.runCommand({parallelCollectionScan: coll.getName(), numCursors: 1}));
+ });
+
+ //
+ // Tests for the "explain" command.
+ //
+
+ testCommandCanBeCausallyConsistent(function() {
+ assert.commandWorked(db.runCommand({explain: {find: coll.getName()}}));
+ });
+
+ testCommandCanBeCausallyConsistent(function() {
+ coll.find().explain();
+ });
+
+ testCommandCanBeCausallyConsistent(function() {
+ coll.explain().find().finish();
+ });
+
+ db.getSession().endSession();
+ }
+
+ runTests({withSession: false});
+ runTests({withSession: true});
+
+ rst.stopSet();
+})();
diff --git a/jstests/noPassthroughWithMongod/create_indexes_shell_helper.js b/jstests/noPassthroughWithMongod/create_indexes_shell_helper.js
index ae59ec3402f..01f891b0810 100644
--- a/jstests/noPassthroughWithMongod/create_indexes_shell_helper.js
+++ b/jstests/noPassthroughWithMongod/create_indexes_shell_helper.js
@@ -41,6 +41,9 @@
getMaxWireVersion: function() {
return mongo.getMaxWireVersion();
},
+ isCausalConsistency: function() {
+ return false;
+ },
};
db._mongo = mockMongo;
diff --git a/jstests/noPassthroughWithMongod/default_read_pref.js b/jstests/noPassthroughWithMongod/default_read_pref.js
index dfc092da399..b5171f559ff 100644
--- a/jstests/noPassthroughWithMongod/default_read_pref.js
+++ b/jstests/noPassthroughWithMongod/default_read_pref.js
@@ -27,6 +27,9 @@
getMaxWireVersion: function() {
return mongo.getMaxWireVersion();
},
+ isCausalConsistency: function() {
+ return false;
+ },
};
db._session = new _DummyDriverSession(db._mongo);
diff --git a/jstests/sharding/after_cluster_time.js b/jstests/sharding/after_cluster_time.js
index 47b97a3315e..bc8b1cf462a 100644
--- a/jstests/sharding/after_cluster_time.js
+++ b/jstests/sharding/after_cluster_time.js
@@ -90,8 +90,7 @@
testDB.getMongo().setCausalConsistency(true);
// With causal consistency enabled, the shell sets read concern to level "majority" if it is
- // not
- // specified.
+ // not specified.
assertAfterClusterTimeReadSucceeds(testDB, {afterClusterTime: Timestamp(1, 1)});
testDB.getMongo().setCausalConsistency(false);
};
diff --git a/jstests/sharding/causal_consistency_shell_support.js b/jstests/sharding/causal_consistency_shell_support.js
index 4e3f0818f1a..b5640d2ab8f 100644
--- a/jstests/sharding/causal_consistency_shell_support.js
+++ b/jstests/sharding/causal_consistency_shell_support.js
@@ -8,43 +8,27 @@
load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
- // Verifies causal consistency is either enabled or disabled for each given command name.
- function checkCausalConsistencySupportForCommandNames(cmdObjs, isReadCommand) {
- cmdObjs.forEach(function(cmdObj) {
- let cmdName = Object.keys(cmdObj)[0];
- if (cmdName === "query" || cmdName === "$query") {
- cmdObj = cmdObj[cmdName];
- cmdName = Object.keys(cmdObj)[0];
- }
-
- assert.eq(testDB.getMongo()._isReadCommand(cmdObj),
- isReadCommand,
- "expected causal consistency support for command, " + tojson(cmdObj) +
- ", to be " + isReadCommand);
- });
- }
-
// Verifies the command works and properly updates operation or cluster time.
function runCommandAndCheckLogicalTimes(cmdObj, db, shouldAdvance) {
- const mongo = db.getMongo();
+ const session = db.getSession();
// Extract initial operation and cluster time.
- let operationTime = mongo.getOperationTime();
- let clusterTimeObj = mongo.getClusterTime();
+ let operationTime = session.getOperationTime();
+ let clusterTimeObj = session.getClusterTime();
assert.commandWorked(db.runCommand(cmdObj));
// Verify cluster and operation time.
if (shouldAdvance) {
- assert(bsonWoCompare(mongo.getOperationTime(), operationTime) > 0,
+ assert(bsonWoCompare(session.getOperationTime(), operationTime) > 0,
"expected the shell's operationTime to increase after running command: " +
tojson(cmdObj));
assert(
- bsonWoCompare(mongo.getClusterTime().clusterTime, clusterTimeObj.clusterTime) > 0,
+ bsonWoCompare(session.getClusterTime().clusterTime, clusterTimeObj.clusterTime) > 0,
"expected the shell's clusterTime value to increase after running command: " +
tojson(cmdObj));
} else {
- assert(bsonWoCompare(mongo.getOperationTime(), operationTime) == 0,
+ assert(bsonWoCompare(session.getOperationTime(), operationTime) == 0,
"expected the shell's operationTime to not change after running command: " +
tojson(cmdObj));
// Don't check clusterTime, because during a slow operation clusterTime may be
@@ -54,63 +38,20 @@
// Verifies the command works and its response satisfies the callback.
function commandReturnsExpectedResult(cmdObj, db, resCallback) {
- const mongo = db.getMongo();
+ const session = db.getSession();
// Use the latest cluster time returned as a new operationTime and run command.
- const clusterTimeObj = mongo.getClusterTime();
- mongo.setOperationTime(clusterTimeObj.clusterTime);
+ const clusterTimeObj = session.getClusterTime();
+ session._operationTime = clusterTimeObj.clusterTime;
const res = assert.commandWorked(testDB.runCommand(cmdObj));
// Verify the response contents and that new operation time is >= passed in time.
- assert(bsonWoCompare(mongo.getOperationTime(), clusterTimeObj.clusterTime) >= 0,
+ assert(bsonWoCompare(session.getOperationTime(), clusterTimeObj.clusterTime) >= 0,
"expected the shell's operationTime to be >= to:" + clusterTimeObj.clusterTime +
" after running command: " + tojson(cmdObj));
resCallback(res);
}
- // All commands currently enabled to use causal consistency in the shell.
- const supportedCommandNames = [
- {"query": {"aggregate": "test", "pipeline": [{"$match": {"x": 1}}]}},
- {"aggregate": "test", "pipeline": [{"$match": {"x": 1}}]},
- {"group": {"key": {"x": 1}}},
- {"query": {"group": {"key": {"x": 1}}}},
- {"query": {"explain": {"group": {"key": {"x": 1}}}}},
- {"count": "test", "query": {}},
- {"query": {"count": "test", "query": {}}},
- {"query": {"explain": {"count": "test", "query": {}}}},
- {"explain": {"count": "test", "query": {}}},
- {"distinct": "test", "query": {}},
- {"query": {"distinct": "test", "query": {}}},
- {"query": {"explain": {"distinct": "test", "query": {}}}},
- {"find": "test", "query": {}},
- {"query": {"find": "test", "query": {}}},
- {"query": {"explain": {"find": "test", "query": {}}}},
- {"geoNear": "test", "near": {}},
- {"query": {"geoNear": "test", "near": {}}},
- {"query": {"explain": {"geoNear": "test", "near": {}}}},
- {"geoSearch": "test", "near": {}},
- {"mapReduce": "test"},
- {"parallelCollectionScan": "test"},
- {"getMore": NumberLong("5888577173997830861")}
- ];
-
- // Omitting some commands for simplicity. Every command not listed above should be unsupported.
- const unsupportedCommandNames = [
- {"aggregate": "test", "pipeline": [{"$match": {"x": 1}}], "explain": true},
- {"explain": {"aggregate": "test", "pipeline": [{"$match": {"x": 1}}]}},
- {"delete": "coll", "query": {"x": 1}},
- {"explain": {"delete": "coll", "query": {"x": 1}}},
- {"findAndModify": "coll", "query": {"x": 1}},
- {"explain": {"findAndModify": "coll", "query": {"x": 1}}},
- {"query": {"explain": {"findAndModify": "coll", "query": {"x": 1}}}},
- {"insert": "coll"},
- {"explain": {"insert": "coll"}},
- {"explain": {"update": "coll"}},
- {"update": "coll"},
- {"getLastError": {"x": 1}},
- {"getPrevError": {"x": 1}}
- ];
-
// Manually create a shard so tests on storage engines that don't support majority readConcern
// can exit early.
const rsName = "causal_consistency_shell_support_rs";
@@ -136,32 +77,32 @@
assert.commandWorked(st.s.adminCommand({addShard: rst.getURL()}));
const testDB = st.s.getDB("test");
- const mongo = testDB.getMongo();
+ const session = testDB.getSession();
// Verify causal consistency is disabled unless explicitly set.
- assert.eq(!!mongo._isCausal, false);
- mongo.setCausalConsistency(true);
+ assert.eq(testDB.getMongo()._causalConsistency,
+ false,
+ "causal consistency should be disabled by default");
+ testDB.getMongo().setCausalConsistency(true);
// Verify causal consistency is enabled for the connection and for each supported command.
- assert.eq(!!mongo._isCausal, true);
- checkCausalConsistencySupportForCommandNames(supportedCommandNames, true);
- checkCausalConsistencySupportForCommandNames(unsupportedCommandNames, false);
+ assert.eq(testDB.getMongo()._causalConsistency,
+ true,
+ "calling setCausalConsistency() didn't enable causal consistency");
// Verify cluster times are tracked even before causal consistency is set (so the first
// operation with causal consistency set can use valid cluster times).
- mongo._operationTime = null;
- mongo._clusterTime = null;
+ session._operationTime = undefined;
assert.commandWorked(testDB.runCommand({insert: "foo", documents: [{x: 1}]}));
- assert.neq(mongo.getOperationTime(), null);
- assert.neq(mongo.getClusterTime(), null);
+ assert.neq(session.getOperationTime(), null);
+ assert.neq(session.getClusterTime(), null);
- mongo._operationTime = null;
- mongo._clusterTime = null;
+ session._operationTime = undefined;
assert.commandWorked(testDB.runCommand({find: "foo"}));
- assert.neq(mongo.getOperationTime(), null);
- assert.neq(mongo.getClusterTime(), null);
+ assert.neq(session.getOperationTime(), null);
+ assert.neq(session.getClusterTime(), null);
// Test that write commands advance both operation and cluster time.
runCommandAndCheckLogicalTimes({insert: "foo", documents: [{x: 2}]}, testDB, true);
@@ -170,7 +111,7 @@
// Test that each supported command works as expected and the shell's cluster times are properly
// forwarded to the server and updated based on the response.
- mongo.setCausalConsistency(true);
+ testDB.getMongo().setCausalConsistency(true);
// Aggregate command.
let aggColl = "aggColl";
@@ -244,7 +185,7 @@
// Verify that the server rejects commands when operation time is invalid by running a command
// with an afterClusterTime value one day ahead.
- const invalidTime = new Timestamp(mongo.getOperationTime().getTime() + (60 * 60 * 24), 0);
+ const invalidTime = new Timestamp(session.getOperationTime().getTime() + (60 * 60 * 24), 0);
const invalidCmd = {
find: "foo",
readConcern: {level: "majority", afterClusterTime: invalidTime}
@@ -254,7 +195,7 @@
ErrorCodes.InvalidOptions,
"expected command, " + tojson(invalidCmd) + ", to fail with code, " +
ErrorCodes.InvalidOptions + ", because the afterClusterTime value, " + invalidTime +
- ", should not be ahead of the clusterTime, " + mongo.getClusterTime().clusterTime);
+ ", should not be ahead of the clusterTime, " + session.getClusterTime().clusterTime);
st.stop();
})();
diff --git a/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js b/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js
index d344032102a..17c36b3d80d 100644
--- a/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js
+++ b/jstests/sharding/secondary_shard_version_protocol_with_causal_consistency.js
@@ -46,20 +46,24 @@
// Note: this query will not be registered by the profiler because it errors before reaching the
// storage level.
jsTest.log("Do a secondary read from stale mongos with afterClusterTime and level 'available'");
- assert.commandFailedWithCode(staleMongos.getDB(dbName).runCommand({
+ const staleMongosDB = staleMongos.getDB(dbName);
+ assert.commandFailedWithCode(staleMongosDB.runCommand({
count: collName,
query: {x: 1},
$readPreference: {mode: "secondary"},
- readConcern: {'afterClusterTime': staleMongos.getOperationTime(), 'level': 'available'}
+ readConcern: {
+ 'afterClusterTime': staleMongosDB.getSession().getOperationTime(),
+ 'level': 'available'
+ }
}),
ErrorCodes.InvalidOptions);
jsTest.log("Do a secondary read from stale mongos with afterClusterTime and no level");
- let res = staleMongos.getDB('test').runCommand({
+ let res = staleMongosDB.runCommand({
count: collName,
query: {x: 1},
$readPreference: {mode: "secondary"},
- readConcern: {'afterClusterTime': staleMongos.getOperationTime()},
+ readConcern: {'afterClusterTime': staleMongosDB.getSession().getOperationTime()},
});
assert(res.ok);
assert.eq(1, res.n, tojson(res));
diff --git a/src/mongo/shell/db.js b/src/mongo/shell/db.js
index 4930a5d46b0..7b9c1f25f62 100644
--- a/src/mongo/shell/db.js
+++ b/src/mongo/shell/db.js
@@ -1857,7 +1857,7 @@ var DB;
(function(hasOwnProperty) {
DB.prototype.getSession = function() {
if (!hasOwnProperty.call(this, "_session")) {
- this._session = new _DummyDriverSession(this.getMongo());
+ this._session = this.getMongo()._getDefaultSession();
}
return this._session;
};
diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js
index c5ca1a851ee..231206a9766 100644
--- a/src/mongo/shell/mongo.js
+++ b/src/mongo/shell/mongo.js
@@ -11,24 +11,10 @@ if (!Mongo.prototype) {
throw Error("Mongo.prototype not defined");
}
-(function(original) {
- Mongo.prototype.find = function find(ns, query, fields, limit, skip, batchSize, options) {
- const self = this;
- // Causal consistency is not supported in the OP_QUERY legacy protocol.
- if (this._isCausal && this.useReadCommands()) {
- query = this._gossipLogicalTime(query);
- }
- const res = original.call(this, ns, query, fields, limit, skip, batchSize, options);
- const origNext = res.next;
- res.next = function next() {
- const ret = origNext.call(this);
- self._setLogicalTimeFromReply(ret);
- return ret;
- };
- return res;
+if (!Mongo.prototype.find)
+ Mongo.prototype.find = function(ns, query, fields, limit, skip, batchSize, options) {
+ throw Error("find not implemented");
};
-})(Mongo.prototype.find);
-
if (!Mongo.prototype.insert)
Mongo.prototype.insert = function(ns, obj) {
throw Error("insert not implemented");
@@ -46,13 +32,6 @@ if (typeof mongoInject == "function") {
mongoInject(Mongo.prototype);
}
-Mongo.prototype.setCausalConsistency = function(value) {
- if (arguments.length === 0) {
- value = true;
- }
- this._isCausal = value;
-};
-
Mongo.prototype.setSlaveOk = function(value) {
if (value == undefined)
value = true;
@@ -77,7 +56,7 @@ Mongo.prototype.getDB = function(name) {
return new DB(this, name);
};
-Mongo.prototype.getDBs = function(driverSession = new _DummyDriverSession(this)) {
+Mongo.prototype.getDBs = function(driverSession = this._getDefaultSession()) {
var cmdObj = {listDatabases: 1};
cmdObj = driverSession._serverSession.injectSessionId(cmdObj);
@@ -87,141 +66,6 @@ Mongo.prototype.getDBs = function(driverSession = new _DummyDriverSession(this))
return res;
};
-Mongo.prototype._isReadCommand = function(cmdObj) {
- let readCommands = [
- "count",
- "distinct",
- "find",
- "getMore",
- "geoNear",
- "geoSearch",
- "group",
- "mapReduce",
- "mapreduce",
- "parallelCollectionScan",
- ];
-
- const cmdName = Object.keys(cmdObj)[0];
- let isReadCommand = Array.contains(readCommands, cmdName);
- if (cmdName === "aggregate") {
- // Aggregate can be either a read or a write depending on whether it has a $out stage.
- // $out is required to be the last stage of the pipeline.
- var stages = cmdObj.pipeline;
- const lastStage = stages && Array.isArray(stages) && (stages.length !== 0)
- ? stages[stages.length - 1]
- : undefined;
- const hasOut =
- lastStage && (typeof lastStage === "object") && lastStage.hasOwnProperty("$out");
- const hasExplain = cmdObj.hasOwnProperty("explain") && cmdObj.explain;
-
- if (!hasExplain && !hasOut) {
- isReadCommand = true;
- }
- }
-
- if (cmdName === "explain") {
- if (Array.contains(readCommands, Object.keys(cmdObj[cmdName])[0])) {
- isReadCommand = true;
- }
- }
-
- return isReadCommand;
-};
-
-/**
- * Adds afterClusterTime to the readConcern.
- */
-Mongo.prototype._injectAfterClusterTime = function(cmdObj) {
- cmdObj = Object.assign({}, cmdObj);
- // The operationTime returned by the current session (i.e. connection) is the
- // smallest time that is needed for causal consistent read. The clusterTime is >=
- // the operationTime so it's less efficient to wait on the server for the
- // clusterTime.
- const operationTime = this.getOperationTime();
- if (operationTime) {
- let cmdName = Object.keys(cmdObj)[0];
- let cmdObjUnwrapped = cmdObj;
- if (cmdName === "query" || cmdName === "$query") {
- cmdObj[cmdName] = Object.assign({}, cmdObj[cmdName]);
- cmdObjUnwrapped = cmdObj[cmdName];
- cmdName = Object.keys(cmdObjUnwrapped)[0];
- }
-
- cmdObjUnwrapped.readConcern = Object.assign({}, cmdObjUnwrapped.readConcern);
- let readConcern = cmdObjUnwrapped.readConcern;
-
- if (!readConcern.hasOwnProperty("afterClusterTime")) {
- readConcern.afterClusterTime = operationTime;
- }
- if (!readConcern.hasOwnProperty("level")) {
- readConcern.level = "local";
- }
-
- const isReadCommand = this._isReadCommand(cmdObjUnwrapped);
- const readPref = this.getReadPref();
- // While the readConcern must be set on the commandObject level i.e. in this case its the
- // object referenced by cmdObjUnwrapped the $readPreference must be set on the top level.
- if (isReadCommand && !cmdObj.hasOwnProperty("$readPreference") && readPref) {
- cmdObj.$readPreference = readPref;
- }
- }
- return cmdObj;
-};
-
-Mongo.prototype._gossipLogicalTime = function(obj) {
- obj = Object.assign({}, obj);
- const clusterTime = this.getClusterTime();
- if (clusterTime) {
- obj["$clusterTime"] = clusterTime;
- }
- return obj;
-};
-
-/**
- * Sets logicalTime and operationTime extracted from command reply.
- * This is applicable for the protocol starting from version 3.6.
- */
-Mongo.prototype._setLogicalTimeFromReply = function(res) {
- if (res.hasOwnProperty("operationTime")) {
- this.setOperationTime(res["operationTime"]);
- }
- if (res.hasOwnProperty("$clusterTime")) {
- this.setClusterTime(res["$clusterTime"]);
- }
-};
-
-/**
- * Adds afterClusterTime to the readConcern if its supported and runs the command.
- */
-(function(original) {
- Mongo.prototype.runCommandWithMetadata = function runCommandWithMetadata(
- dbName, metadata, cmdObj) {
- if (this._isCausal && cmdObj) {
- cmdObj = this._injectAfterClusterTime(cmdObj);
- metadata = this._gossipLogicalTime(metadata);
- }
- const res = original.call(this, dbName, metadata, cmdObj);
-
- this._setLogicalTimeFromReply(res);
- return res;
- };
-})(Mongo.prototype.runCommandWithMetadata);
-
-/**
- * Adds afterClusterTime to the readConcern if its supported and runs the command.
- */
-(function(original) {
- Mongo.prototype.runCommand = function runCommand(dbName, cmdObj, options) {
- if (this._isCausal && cmdObj) {
- cmdObj = this._injectAfterClusterTime(cmdObj);
- cmdObj = this._gossipLogicalTime(cmdObj);
- }
- const res = original.call(this, dbName, cmdObj, options);
- this._setLogicalTimeFromReply(res);
- return res;
- };
-})(Mongo.prototype.runCommand);
-
Mongo.prototype.adminCommand = function(cmd) {
return this.getDB("admin").runCommand(cmd);
};
@@ -229,7 +73,7 @@ Mongo.prototype.adminCommand = function(cmd) {
/**
* Returns all log components and current verbosity values
*/
-Mongo.prototype.getLogComponents = function(driverSession = new _DummyDriverSession(this)) {
+Mongo.prototype.getLogComponents = function(driverSession = this._getDefaultSession()) {
var cmdObj = {getParameter: 1, logComponentVerbosity: 1};
cmdObj = driverSession._serverSession.injectSessionId(cmdObj);
@@ -244,7 +88,7 @@ Mongo.prototype.getLogComponents = function(driverSession = new _DummyDriverSess
* string of form "storage.journaling"
*/
Mongo.prototype.setLogLevel = function(
- logLevel, component, driverSession = new _DummyDriverSession(this)) {
+ logLevel, component, driverSession = this._getDefaultSession()) {
componentNames = [];
if (typeof component === "string") {
componentNames = component.split(".");
@@ -547,30 +391,27 @@ Mongo.prototype.unsetWriteConcern = function() {
delete this._writeConcern;
};
-/**
- * Sets the operationTime.
- */
-Mongo.prototype.setOperationTime = function(operationTime) {
- if (operationTime === Timestamp(0, 0)) {
- throw Error("Attempt to set an uninitiated operationTime");
- }
- if (this._operationTime === undefined || this._operationTime === null ||
- (typeof operationTime === "object" &&
- bsonWoCompare(operationTime, this._operationTime) === 1)) {
- this._operationTime = operationTime;
+Mongo.prototype.startSession = function startSession(options) {
+ return new DriverSession(this, options);
+};
+
+Mongo.prototype._getDefaultSession = function getDefaultSession() {
+ // We implicitly associate a Mongo connection object with a DriverSession so that tests which
+ // call DB.prototype.getMongo() and then Mongo.prototype.getDB() to get a different DB instance
+ // are still causally consistent.
+ if (!this.hasOwnProperty("_defaultSession")) {
+ this._defaultSession = new _DummyDriverSession(this);
}
+ return this._defaultSession;
};
-/**
- * Gets the operationTime or null if unset.
- */
-Mongo.prototype.getOperationTime = function() {
- if (this._operationTime === undefined) {
- return null;
+Mongo.prototype.isCausalConsistency = function isCausalConsistency() {
+ if (!this.hasOwnProperty("_causalConsistency")) {
+ this._causalConsistency = false;
}
- return this._operationTime;
+ return this._causalConsistency;
};
-Mongo.prototype.startSession = function(opts) {
- return new DriverSession(this, opts);
+Mongo.prototype.setCausalConsistency = function setCausalConsistency(causalConsistency = true) {
+ this._causalConsistency = causalConsistency;
};
diff --git a/src/mongo/shell/session.js b/src/mongo/shell/session.js
index 90460663284..f932752555d 100644
--- a/src/mongo/shell/session.js
+++ b/src/mongo/shell/session.js
@@ -10,6 +10,9 @@ var {
let _readPreference = rawOptions.readPreference;
let _readConcern = rawOptions.readConcern;
let _writeConcern = rawOptions.writeConcern;
+ const _initialClusterTime = rawOptions.initialClusterTime;
+ const _initialOperationTime = rawOptions.initialOperationTime;
+ let _causalConsistency = rawOptions.causalConsistency;
let _retryWrites = rawOptions.retryWrites;
this.getReadPreference = function getReadPreference() {
@@ -39,6 +42,22 @@ var {
_writeConcern = writeConcern;
};
+ this.getInitialClusterTime = function getInitialClusterTime() {
+ return _initialClusterTime;
+ };
+
+ this.getInitialOperationTime = function getInitialOperationTime() {
+ return _initialOperationTime;
+ };
+
+ this.isCausalConsistency = function isCausalConsistency() {
+ return _causalConsistency;
+ };
+
+ this.setCausalConsistency = function setCausalConsistency(causalConsistency = true) {
+ _causalConsistency = causalConsistency;
+ };
+
this.shouldRetryWrites = function shouldRetryWrites() {
return _retryWrites;
};
@@ -50,6 +69,7 @@ var {
function SessionAwareClient(client) {
const kWireVersionSupportingLogicalSession = 6;
+ const kWireVersionSupportingCausalConsistency = 6;
this.getReadPreference = function getReadPreference(driverSession) {
const sessionOptions = driverSession.getOptions();
@@ -83,11 +103,90 @@ var {
wireVersion <= client.getMaxWireVersion();
}
+ const kCommandsThatSupportReadConcern = new Set([
+ "aggregate",
+ "count",
+ "distinct",
+ "explain",
+ "find",
+ "geoNear",
+ "geoSearch",
+ "group",
+ "mapReduce",
+ "mapreduce",
+ "parallelCollectionScan",
+ ]);
+
+ function canUseReadConcern(cmdObj) {
+ let cmdName = Object.keys(cmdObj)[0];
+
+ // If the command is in a wrapped form, then we look for the actual command name inside
+ // the query/$query object.
+ let cmdObjUnwrapped = cmdObj;
+ if (cmdName === "query" || cmdName === "$query") {
+ cmdObjUnwrapped = cmdObj[cmdName];
+ cmdName = Object.keys(cmdObjUnwrapped)[0];
+ }
+
+ if (!kCommandsThatSupportReadConcern.has(cmdName)) {
+ return false;
+ }
+
+ if (cmdName === "aggregate" && cmdObjUnwrapped.explain) {
+ // TODO SERVER-30582: Aggregation's explain doesn't support the "readConcern"
+ // option. Note that an aggregation with a $out stage as its last stage still
+ // supports a read concern level of "local".
+ return false;
+ }
+
+ if (cmdName === "explain") {
+ return kCommandsThatSupportReadConcern.has(Object.keys(cmdObjUnwrapped.explain)[0]);
+ }
+
+ return true;
+ }
+
+ function injectAfterClusterTime(cmdObj, operationTime) {
+ cmdObj = Object.assign({}, cmdObj);
+
+ if (operationTime !== undefined) {
+ const cmdName = Object.keys(cmdObj)[0];
+
+ // If the command is in a wrapped form, then we look for the actual command object
+ // inside the query/$query object.
+ let cmdObjUnwrapped = cmdObj;
+ if (cmdName === "query" || cmdName === "$query") {
+ cmdObj[cmdName] = Object.assign({}, cmdObj[cmdName]);
+ cmdObjUnwrapped = cmdObj[cmdName];
+ }
+
+ cmdObjUnwrapped.readConcern = Object.assign({}, cmdObjUnwrapped.readConcern);
+ const readConcern = cmdObjUnwrapped.readConcern;
+
+ if (!readConcern.hasOwnProperty("afterClusterTime")) {
+ readConcern.afterClusterTime = operationTime;
+ }
+ }
+
+ return cmdObj;
+ }
+
function prepareCommandRequest(driverSession, cmdObj) {
if (serverSupports(kWireVersionSupportingLogicalSession)) {
cmdObj = driverSession._serverSession.injectSessionId(cmdObj);
}
+ if (serverSupports(kWireVersionSupportingCausalConsistency) &&
+ (driverSession.getOptions().isCausalConsistency() ||
+ client.isCausalConsistency()) &&
+ canUseReadConcern(cmdObj)) {
+ // `driverSession._operationTime` is the smallest time needed for performing a
+ // causally consistent read using the current session. Note that
+ // `client.getClusterTime()` is no smaller than the operation time and would
+ // therefore only be less efficient to wait until.
+ cmdObj = injectAfterClusterTime(cmdObj, driverSession._operationTime);
+ }
+
return cmdObj;
}
@@ -309,7 +408,11 @@ var {
}
this._serverSession = implMethods.createServerSession(client);
- this._operationTime = null;
+ this._operationTime = _options.getInitialOperationTime();
+
+ if (_options.getInitialClusterTime() !== undefined) {
+ client.setClusterTime(_options.getInitialClusterTime());
+ }
this.getClient = function getClient() {
return client;
@@ -319,6 +422,14 @@ var {
return _options;
};
+ this.getOperationTime = function getOperationTime() {
+ return this._operationTime;
+ };
+
+ this.getClusterTime = function getClusterTime() {
+ return client.getClusterTime();
+ };
+
this.getDatabase = function getDatabase(dbName) {
const db = client.getDB(dbName);
db._session = this;