summaryrefslogtreecommitdiff
path: root/jstests
diff options
context:
space:
mode:
authorDavid Storch <david.storch@10gen.com>2018-02-28 18:08:21 -0500
committerDavid Storch <david.storch@10gen.com>2018-03-09 17:20:42 -0500
commited1e2b4d2a4987e3744484f9482fdc7a0e119e94 (patch)
tree8096db9198fb62cd62e2192a38b15faf3d5100a6 /jstests
parentb9e20190b647fea262a8f4e154bbf18d9934a3ba (diff)
downloadmongo-ed1e2b4d2a4987e3744484f9482fdc7a0e119e94.tar.gz
SERVER-33541 Add readConcern level 'snapshot' support for aggregation.
Diffstat (limited to 'jstests')
-rw-r--r--jstests/noPassthrough/readConcern_snapshot.js8
-rw-r--r--jstests/noPassthrough/read_concern_snapshot_aggregation.js216
-rw-r--r--jstests/noPassthrough/read_concern_snapshot_yielding.js13
-rw-r--r--jstests/noPassthrough/snapshot_reads.js33
4 files changed, 253 insertions, 17 deletions
diff --git a/jstests/noPassthrough/readConcern_snapshot.js b/jstests/noPassthrough/readConcern_snapshot.js
index a45bdc5a4bd..2257f6641bf 100644
--- a/jstests/noPassthrough/readConcern_snapshot.js
+++ b/jstests/noPassthrough/readConcern_snapshot.js
@@ -117,16 +117,14 @@
assert.commandWorked(sessionDb.runCommand(
{find: collName, readConcern: {level: "snapshot"}, txnNumber: NumberLong(txnNumber++)}));
- // readConcern 'snapshot' is not supported by aggregate.
- // TODO SERVER-33354: Add snapshot support for aggregate.
- assert.commandFailedWithCode(sessionDb.runCommand({
+ // readConcern 'snapshot' is supported by aggregate.
+ assert.commandWorked(sessionDb.runCommand({
aggregate: collName,
pipeline: [],
cursor: {},
readConcern: {level: "snapshot"},
txnNumber: NumberLong(txnNumber++)
- }),
- ErrorCodes.InvalidOptions);
+ }));
// readConcern 'snapshot' is supported by count.
assert.commandWorked(sessionDb.runCommand(
diff --git a/jstests/noPassthrough/read_concern_snapshot_aggregation.js b/jstests/noPassthrough/read_concern_snapshot_aggregation.js
new file mode 100644
index 00000000000..9dfa9e3d816
--- /dev/null
+++ b/jstests/noPassthrough/read_concern_snapshot_aggregation.js
@@ -0,0 +1,216 @@
+/**
+ * Tests for the aggregate command's support for readConcern level "snapshot".
+ *
+ * @tags: [requires_replication]
+ */
+(function() {
+ "use strict";
+
+ const kAdminDB = "admin";
+ const kCollName = "coll";
+ const kConfigDB = "config";
+ const kDBName = "test";
+ const kIllegalStageForSnapshotReadCode = 50742;
+ const kWCMajority = {writeConcern: {w: "majority"}};
+
+ let rst = new ReplSetTest({nodes: 1});
+ rst.startSet();
+ rst.initiate();
+ let session =
+ rst.getPrimary().getDB(kDBName).getMongo().startSession({causalConsistency: false});
+ let sessionDB = session.getDatabase(kDBName);
+ if (!sessionDB.serverStatus().storageEngine.supportsSnapshotReadConcern) {
+ rst.stopSet();
+ return;
+ }
+
+ let txnNumber = NumberLong(0);
+ assert.commandWorked(sessionDB.runCommand({create: kCollName}));
+
+ function testSnapshotAggFailsWithCode(coll, pipeline, code) {
+ let cmd = {aggregate: coll, pipeline: pipeline, cursor: {}};
+
+ let cmdAsSnapshotRead = Object.extend({}, cmd);
+ cmdAsSnapshotRead.txnNumber = NumberLong(++txnNumber);
+ cmdAsSnapshotRead.readConcern = {level: "snapshot"};
+ assert.commandFailedWithCode(sessionDB.runCommand(cmdAsSnapshotRead), code);
+
+ // As a sanity check, also make sure that the command succeeds when run without a txn number
+ // and without a readConcern.
+ assert.commandWorked(sessionDB.runCommand(cmd));
+ }
+
+ // Test that $changeStream is disallowed with snapshot reads.
+ testSnapshotAggFailsWithCode(kCollName, [{$changeStream: {}}], ErrorCodes.InvalidOptions);
+
+ // Test that $collStats is disallowed with snapshot reads.
+ testSnapshotAggFailsWithCode(kCollName, [{$collStats: {}}], kIllegalStageForSnapshotReadCode);
+
+ // Test that $geoNear is disallowed with snapshot reads.
+ assert.commandWorked(sessionDB.getCollection(kCollName).createIndex({a: "2dsphere"}));
+ testSnapshotAggFailsWithCode(kCollName,
+ [{
+ $geoNear: {
+ near: {type: "Point", coordinates: [0, 0]},
+ distanceField: "distanceField",
+ spherical: true,
+ key: "a"
+ }
+ }],
+ kIllegalStageForSnapshotReadCode);
+
+ // Test that $indexStats is disallowed with snapshot reads.
+ testSnapshotAggFailsWithCode(kCollName, [{$indexStats: {}}], kIllegalStageForSnapshotReadCode);
+
+ // Test that $listLocalCursors is disallowed with snapshot reads.
+ testSnapshotAggFailsWithCode(1, [{$listLocalCursors: {}}], ErrorCodes.InvalidOptions);
+
+ // Test that $listLocalSessions is disallowed with snapshot reads.
+ testSnapshotAggFailsWithCode(1, [{$listLocalSessions: {}}], ErrorCodes.InvalidOptions);
+
+ // Test that $out is disallowed with snapshot reads.
+ testSnapshotAggFailsWithCode(kCollName, [{$out: "out"}], ErrorCodes.InvalidOptions);
+
+ // Test that $listSessions is disallowed with snapshot reads. This stage must be run against
+ // 'system.sessions' in the config database.
+ sessionDB = session.getDatabase(kConfigDB);
+ testSnapshotAggFailsWithCode(
+ "system.sessions", [{$listSessions: {}}], kIllegalStageForSnapshotReadCode);
+
+ // Test that $currentOp is disallowed with snapshot reads. We have to reassign 'sessionDB' to
+ // refer to the admin database, because $currentOp pipelines are required to run against
+ // 'admin'.
+ sessionDB = session.getDatabase(kAdminDB);
+ testSnapshotAggFailsWithCode(1, [{$currentOp: {}}], ErrorCodes.InvalidOptions);
+ sessionDB = session.getDatabase(kDBName);
+
+ // Helper for testing that aggregation stages which involve a local and foreign collection
+ // ($lookup and $graphLookup) obey the expected readConcern "snapshot" isolation semantics.
+ //
+ // Inserts 'localDocsPre' into the 'local' collection and 'foreignDocsPre' into the 'foreign'
+ // collection. Then runs the first batch of 'pipeline', before inserting 'localDocsPost' into
+ // 'local' and 'foreignDocsPost' into 'foreign'. Iterates the remainder of the aggregation
+ // cursor and verifies that the result set matches 'expectedResults'.
+ function testLookupReadConcernSnapshotIsolation(
+ {localDocsPre, foreignDocsPre, localDocsPost, foreignDocsPost, pipeline, expectedResults}) {
+ let localColl = sessionDB.local;
+ let foreignColl = sessionDB.foreign;
+ localColl.drop();
+ foreignColl.drop();
+ assert.commandWorked(localColl.insert(localDocsPre, kWCMajority));
+ assert.commandWorked(foreignColl.insert(foreignDocsPre, kWCMajority));
+ let cmdRes = sessionDB.runCommand({
+ aggregate: localColl.getName(),
+ pipeline: pipeline,
+ // TODO SERVER-33698: Remove this workaround once cursor establishment commands with
+ // batchSize:0 correctly open a snapshot for the read transaction.
+ cursor: {batchSize: 1},
+ readConcern: {level: "snapshot"},
+ txnNumber: NumberLong(++txnNumber)
+ });
+ assert.commandWorked(cmdRes);
+ assert.neq(0, cmdRes.cursor.id);
+ assert.eq(1, cmdRes.cursor.firstBatch.length);
+
+ assert.commandWorked(localColl.insert(localDocsPost, kWCMajority));
+ assert.commandWorked(foreignColl.insert(foreignDocsPost, kWCMajority));
+ let results =
+ new DBCommandCursor(sessionDB, cmdRes, undefined, undefined, NumberLong(txnNumber))
+ .toArray();
+ assert.eq(results, expectedResults);
+ }
+
+ // Test that snapshot isolation works with $lookup using localField/foreignField syntax.
+ testLookupReadConcernSnapshotIsolation({
+ localDocsPre: [{_id: 0}, {_id: 1}, {_id: 2}],
+ foreignDocsPre: [{_id: 1}],
+ localDocsPost: [{_id: 3}],
+ foreignDocsPost: [{_id: 2}, {_id: 3}],
+ pipeline: [
+ {$lookup: {from: "foreign", localField: "_id", foreignField: "_id", as: "as"}},
+ {$sort: {_id: 1}}
+ ],
+ expectedResults: [{_id: 0, as: []}, {_id: 1, as: [{_id: 1}]}, {_id: 2, as: []}]
+ });
+
+ // Test that snapshot isolation works with $lookup into a nested pipeline.
+ testLookupReadConcernSnapshotIsolation({
+ localDocsPre: [{_id: 0}, {_id: 1}, {_id: 2}],
+ foreignDocsPre: [{_id: 1}],
+ localDocsPost: [{_id: 3}],
+ foreignDocsPost: [{_id: 2}, {_id: 3}],
+ pipeline: [
+ {
+ $lookup: {
+ from: "foreign",
+ as: "as",
+ let : {localId: "$_id"},
+ pipeline: [{$match: {$expr: {$eq: ["$_id", "$$localId"]}}}]
+ }
+ },
+ {$sort: {_id: 1}}
+ ],
+ expectedResults: [{_id: 0, as: []}, {_id: 1, as: [{_id: 1}]}, {_id: 2, as: []}]
+ });
+
+ // Test that snapshot isolation works with $graphLookup.
+ testLookupReadConcernSnapshotIsolation({
+ localDocsPre: [{_id: 0}, {_id: 1}, {_id: 2}],
+ foreignDocsPre: [{_id: 1, linkTo: 2}],
+ localDocsPost: [{_id: 3}],
+ foreignDocsPost: [{_id: 2, linkTo: 3}, {_id: 3}],
+ pipeline: [
+ {
+ $graphLookup: {
+ from: "foreign",
+ as: "as",
+ startWith: "$_id",
+ connectFromField: "linkTo",
+ connectToField: "_id"
+ }
+ },
+ {$sort: {_id: 1}}
+ ],
+ expectedResults:
+ [{_id: 0, as: []}, {_id: 1, as: [{_id: 1, linkTo: 2}]}, {_id: 2, as: []}]
+ });
+
+ // Test that snapshot reads are legal for $facet.
+ let coll = sessionDB.getCollection(kCollName);
+ coll.drop();
+ assert.commandWorked(coll.insert(
+ [
+ {group1: 1, group2: 1, val: 1},
+ {group1: 1, group2: 2, val: 2},
+ {group1: 2, group2: 2, val: 8}
+ ],
+ kWCMajority));
+
+ let cmdRes = sessionDB.runCommand({
+ aggregate: kCollName,
+ pipeline: [
+ {
+ $facet: {
+ g1: [{$group: {_id: "$group1", sum: {$sum: "$val"}}}, {$sort: {_id: 1}}],
+ g2: [{$group: {_id: "$group2", sum: {$sum: "$val"}}}, {$sort: {_id: 1}}]
+ }
+ },
+ {$unwind: "$g1"},
+ {$unwind: "$g2"},
+ {$sort: {"g1._id": 1, "g2._id": 1}}
+ ],
+ cursor: {},
+ readConcern: {level: "snapshot"},
+ txnNumber: NumberLong(++txnNumber)
+ });
+ assert.commandWorked(cmdRes);
+ assert.eq(0, cmdRes.cursor.id);
+ assert.eq(cmdRes.cursor.firstBatch, [
+ {g1: {_id: 1, sum: 3}, g2: {_id: 1, sum: 1}},
+ {g1: {_id: 1, sum: 3}, g2: {_id: 2, sum: 10}},
+ {g1: {_id: 2, sum: 8}, g2: {_id: 1, sum: 1}},
+ {g1: {_id: 2, sum: 8}, g2: {_id: 2, sum: 10}}
+ ]);
+
+ rst.stopSet();
+}());
diff --git a/jstests/noPassthrough/read_concern_snapshot_yielding.js b/jstests/noPassthrough/read_concern_snapshot_yielding.js
index aa953e11927..dc600721b59 100644
--- a/jstests/noPassthrough/read_concern_snapshot_yielding.js
+++ b/jstests/noPassthrough/read_concern_snapshot_yielding.js
@@ -237,6 +237,19 @@
res.cursor.nextBatch.length, TestData.numDocs - initialFindBatchSize, tojson(res));
}, {"originatingCommand.filter": {x: 1}}, {op: "getmore"});
+ // Test aggregate.
+ testCommand(function() {
+ const res = assert.commandWorked(db.runCommand({
+ aggregate: "coll",
+ pipeline: [{$match: {x: 1}}],
+ readConcern: {level: "snapshot"},
+ cursor: {},
+ lsid: TestData.sessionId,
+ txnNumber: NumberLong(TestData.txnNumber)
+ }));
+ assert.eq(res.cursor.firstBatch.length, TestData.numDocs, tojson(res));
+ }, {"command.pipeline": [{$match: {x: 1}}]}, {"command.pipeline": [{$match: {x: 1}}]});
+
// Test update.
// We cannot provide a 'profilerFilter' because profiling is turned off for write commands in
// transactions.
diff --git a/jstests/noPassthrough/snapshot_reads.js b/jstests/noPassthrough/snapshot_reads.js
index efa80492b78..65a37fdc4db 100644
--- a/jstests/noPassthrough/snapshot_reads.js
+++ b/jstests/noPassthrough/snapshot_reads.js
@@ -20,7 +20,7 @@
}
const secondaryDB = rst.getSecondary().getDB(dbName);
- function runTest({useCausalConsistency, readFromSecondary}) {
+ function runTest({useCausalConsistency, readFromSecondary, establishCursorCmd}) {
primaryDB.coll.drop();
let readDB = primaryDB;
@@ -43,14 +43,14 @@
let txnNumber = 0;
+ // Augment the cursor-establishing command with the proper readConcern and transaction
+ // number.
+ let cursorCmd = Object.extend({}, establishCursorCmd);
+ cursorCmd.readConcern = {level: "snapshot"};
+ cursorCmd.txnNumber = NumberLong(txnNumber);
+
// Establish a snapshot cursor, fetching the first 5 documents.
- let res = assert.commandWorked(sessionDb.runCommand({
- find: collName,
- sort: {_id: 1},
- batchSize: 5,
- readConcern: {level: "snapshot"},
- txnNumber: NumberLong(txnNumber)
- }));
+ let res = assert.commandWorked(sessionDb.runCommand(cursorCmd));
assert(res.hasOwnProperty("cursor"));
assert(res.cursor.hasOwnProperty("firstBatch"));
@@ -132,10 +132,19 @@
session.endSession();
}
- runTest({useCausalConsistency: false, readFromSecondary: false});
- runTest({useCausalConsistency: true, readFromSecondary: false});
- runTest({useCausalConsistency: false, readFromSecondary: true});
- runTest({useCausalConsistency: true, readFromSecondary: true});
+ // Test snapshot reads using find.
+ let findCmd = {find: collName, sort: {_id: 1}, batchSize: 5};
+ runTest({useCausalConsistency: false, readFromSecondary: false, establishCursorCmd: findCmd});
+ runTest({useCausalConsistency: true, readFromSecondary: false, establishCursorCmd: findCmd});
+ runTest({useCausalConsistency: false, readFromSecondary: true, establishCursorCmd: findCmd});
+ runTest({useCausalConsistency: true, readFromSecondary: true, establishCursorCmd: findCmd});
+
+ // Test snapshot reads using aggregate.
+ let aggCmd = {aggregate: collName, pipeline: [{$sort: {_id: 1}}], cursor: {batchSize: 5}};
+ runTest({useCausalConsistency: false, readFromSecondary: false, establishCursorCmd: aggCmd});
+ runTest({useCausalConsistency: true, readFromSecondary: false, establishCursorCmd: aggCmd});
+ runTest({useCausalConsistency: false, readFromSecondary: true, establishCursorCmd: aggCmd});
+ runTest({useCausalConsistency: true, readFromSecondary: true, establishCursorCmd: aggCmd});
rst.stopSet();
})();