summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/read_concern_snapshot_aggregation.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/noPassthrough/read_concern_snapshot_aggregation.js')
-rw-r--r--jstests/noPassthrough/read_concern_snapshot_aggregation.js385
1 files changed, 191 insertions, 194 deletions
diff --git a/jstests/noPassthrough/read_concern_snapshot_aggregation.js b/jstests/noPassthrough/read_concern_snapshot_aggregation.js
index 9c36b6ebf2e..2cb5fe26fd6 100644
--- a/jstests/noPassthrough/read_concern_snapshot_aggregation.js
+++ b/jstests/noPassthrough/read_concern_snapshot_aggregation.js
@@ -4,129 +4,129 @@
* @tags: [uses_transactions]
*/
(function() {
- "use strict";
+"use strict";
- const kAdminDB = "admin";
- const kCollName = "coll";
- const kConfigDB = "config";
- const kDBName = "test";
- const kWCMajority = {writeConcern: {w: "majority"}};
+const kAdminDB = "admin";
+const kCollName = "coll";
+const kConfigDB = "config";
+const kDBName = "test";
+const kWCMajority = {
+ writeConcern: {w: "majority"}
+};
- let rst = new ReplSetTest({nodes: 1});
- rst.startSet();
- rst.initiate();
- let session =
- rst.getPrimary().getDB(kDBName).getMongo().startSession({causalConsistency: false});
- let sessionDB = session.getDatabase(kDBName);
+let rst = new ReplSetTest({nodes: 1});
+rst.startSet();
+rst.initiate();
+let session = rst.getPrimary().getDB(kDBName).getMongo().startSession({causalConsistency: false});
+let sessionDB = session.getDatabase(kDBName);
- let txnNumber = NumberLong(0);
- assert.commandWorked(sessionDB.runCommand({create: kCollName, writeConcern: {w: "majority"}}));
+let txnNumber = NumberLong(0);
+assert.commandWorked(sessionDB.runCommand({create: kCollName, writeConcern: {w: "majority"}}));
- function testSnapshotAggFailsWithCode(coll, pipeline, code) {
- let cmd = {aggregate: coll, pipeline: pipeline, cursor: {}};
+function testSnapshotAggFailsWithCode(coll, pipeline, code) {
+ let cmd = {aggregate: coll, pipeline: pipeline, cursor: {}};
- let cmdAsSnapshotRead = Object.extend({}, cmd);
- cmdAsSnapshotRead.txnNumber = NumberLong(++txnNumber);
- cmdAsSnapshotRead.readConcern = {level: "snapshot"};
- cmdAsSnapshotRead.autocommit = false;
- cmdAsSnapshotRead.startTransaction = true;
- assert.commandFailedWithCode(sessionDB.runCommand(cmdAsSnapshotRead), code);
+ let cmdAsSnapshotRead = Object.extend({}, cmd);
+ cmdAsSnapshotRead.txnNumber = NumberLong(++txnNumber);
+ cmdAsSnapshotRead.readConcern = {level: "snapshot"};
+ cmdAsSnapshotRead.autocommit = false;
+ cmdAsSnapshotRead.startTransaction = true;
+ assert.commandFailedWithCode(sessionDB.runCommand(cmdAsSnapshotRead), code);
- // As a sanity check, also make sure that the command succeeds when run without a txn number
- // and without a readConcern.
- assert.commandWorked(sessionDB.runCommand(cmd));
- }
+ // As a sanity check, also make sure that the command succeeds when run without a txn number
+ // and without a readConcern.
+ assert.commandWorked(sessionDB.runCommand(cmd));
+}
- // Test that $changeStream is disallowed with transactions.
- // TODO SERVER-37221: Remove the check for 'supportsCommittedReads'.
- if (sessionDB.serverStatus().storageEngine.supportsCommittedReads) {
- testSnapshotAggFailsWithCode(
- kCollName, [{$changeStream: {}}], ErrorCodes.OperationNotSupportedInTransaction);
- }
-
- // Test that $collStats is disallowed with transactions.
- testSnapshotAggFailsWithCode(
- kCollName, [{$collStats: {}}], ErrorCodes.OperationNotSupportedInTransaction);
-
- // Test that $indexStats is disallowed with transactions.
+// Test that $changeStream is disallowed with transactions.
+// TODO SERVER-37221: Remove the check for 'supportsCommittedReads'.
+if (sessionDB.serverStatus().storageEngine.supportsCommittedReads) {
testSnapshotAggFailsWithCode(
- kCollName, [{$indexStats: {}}], ErrorCodes.OperationNotSupportedInTransaction);
+ kCollName, [{$changeStream: {}}], ErrorCodes.OperationNotSupportedInTransaction);
+}
- // Test that $listLocalSessions is disallowed with transactions.
- testSnapshotAggFailsWithCode(
- 1, [{$listLocalSessions: {}}], ErrorCodes.OperationNotSupportedInTransaction);
+// Test that $collStats is disallowed with transactions.
+testSnapshotAggFailsWithCode(
+ kCollName, [{$collStats: {}}], ErrorCodes.OperationNotSupportedInTransaction);
- // Test that $out is disallowed with transactions.
- testSnapshotAggFailsWithCode(
- kCollName, [{$out: "out"}], ErrorCodes.OperationNotSupportedInTransaction);
+// Test that $indexStats is disallowed with transactions.
+testSnapshotAggFailsWithCode(
+ kCollName, [{$indexStats: {}}], ErrorCodes.OperationNotSupportedInTransaction);
- // Test that $listSessions is disallowed with transactions. This stage must be run against
- // 'system.sessions' in the config database, which cannot be queried in a transaction.
- sessionDB = session.getDatabase(kConfigDB);
- testSnapshotAggFailsWithCode(
- "system.sessions", [{$listSessions: {}}], ErrorCodes.OperationNotSupportedInTransaction);
+// Test that $listLocalSessions is disallowed with transactions.
+testSnapshotAggFailsWithCode(
+ 1, [{$listLocalSessions: {}}], ErrorCodes.OperationNotSupportedInTransaction);
- // Test that $currentOp is disallowed with transactions. We have to reassign 'sessionDB' to
- // refer to the admin database, because $currentOp pipelines are required to run against
- // 'admin'. Queries against 'admin' are not permitted in a transaction.
- sessionDB = session.getDatabase(kAdminDB);
- testSnapshotAggFailsWithCode(
- 1, [{$currentOp: {}}], ErrorCodes.OperationNotSupportedInTransaction);
- sessionDB = session.getDatabase(kDBName);
+// Test that $out is disallowed with transactions.
+testSnapshotAggFailsWithCode(
+ kCollName, [{$out: "out"}], ErrorCodes.OperationNotSupportedInTransaction);
- // Helper for testing that aggregation stages which involve a local and foreign collection
- // ($lookup and $graphLookup) obey the expected readConcern "snapshot" isolation semantics.
- //
- // Inserts 'localDocsPre' into the 'local' collection and 'foreignDocsPre' into the 'foreign'
- // collection. Then runs the first batch of 'pipeline', before inserting 'localDocsPost' into
- // 'local' and 'foreignDocsPost' into 'foreign'. Iterates the remainder of the aggregation
- // cursor and verifies that the result set matches 'expectedResults'.
- function testLookupReadConcernSnapshotIsolation(
- {localDocsPre, foreignDocsPre, localDocsPost, foreignDocsPost, pipeline, expectedResults}) {
- sessionDB.runCommand({drop: "local", writeConcern: {w: "majority"}});
- sessionDB.runCommand({drop: "foreign", writeConcern: {w: "majority"}});
- let localColl = sessionDB.local;
- let foreignColl = sessionDB.foreign;
- assert.commandWorked(localColl.insert(localDocsPre, kWCMajority));
- assert.commandWorked(foreignColl.insert(foreignDocsPre, kWCMajority));
- let cmdRes = sessionDB.runCommand({
- aggregate: localColl.getName(),
- pipeline: pipeline,
- cursor: {batchSize: 0},
- readConcern: {level: "snapshot"},
- txnNumber: NumberLong(++txnNumber),
- startTransaction: true,
- autocommit: false
- });
- assert.commandWorked(cmdRes);
- assert.neq(0, cmdRes.cursor.id);
- assert.eq(0, cmdRes.cursor.firstBatch.length);
+// Test that $listSessions is disallowed with transactions. This stage must be run against
+// 'system.sessions' in the config database, which cannot be queried in a transaction.
+sessionDB = session.getDatabase(kConfigDB);
+testSnapshotAggFailsWithCode(
+ "system.sessions", [{$listSessions: {}}], ErrorCodes.OperationNotSupportedInTransaction);
- assert.commandWorked(localColl.insert(localDocsPost, kWCMajority));
- assert.commandWorked(foreignColl.insert(foreignDocsPost, kWCMajority));
- let results =
- new DBCommandCursor(sessionDB, cmdRes, undefined, undefined, NumberLong(txnNumber))
- .toArray();
- assert.eq(results, expectedResults);
- assert.commandWorked(sessionDB.adminCommand(
- {commitTransaction: 1, txnNumber: NumberLong(txnNumber), autocommit: false}));
- }
+// Test that $currentOp is disallowed with transactions. We have to reassign 'sessionDB' to
+// refer to the admin database, because $currentOp pipelines are required to run against
+// 'admin'. Queries against 'admin' are not permitted in a transaction.
+sessionDB = session.getDatabase(kAdminDB);
+testSnapshotAggFailsWithCode(1, [{$currentOp: {}}], ErrorCodes.OperationNotSupportedInTransaction);
+sessionDB = session.getDatabase(kDBName);
- // Test that snapshot isolation works with $lookup using localField/foreignField syntax.
- testLookupReadConcernSnapshotIsolation({
- localDocsPre: [{_id: 0}, {_id: 1}, {_id: 2}],
- foreignDocsPre: [{_id: 1}],
- localDocsPost: [{_id: 3}],
- foreignDocsPost: [{_id: 2}, {_id: 3}],
- pipeline: [
- {$lookup: {from: "foreign", localField: "_id", foreignField: "_id", as: "as"}},
- {$sort: {_id: 1}}
- ],
- expectedResults: [{_id: 0, as: []}, {_id: 1, as: [{_id: 1}]}, {_id: 2, as: []}]
+// Helper for testing that aggregation stages which involve a local and foreign collection
+// ($lookup and $graphLookup) obey the expected readConcern "snapshot" isolation semantics.
+//
+// Inserts 'localDocsPre' into the 'local' collection and 'foreignDocsPre' into the 'foreign'
+// collection. Then runs the first batch of 'pipeline', before inserting 'localDocsPost' into
+// 'local' and 'foreignDocsPost' into 'foreign'. Iterates the remainder of the aggregation
+// cursor and verifies that the result set matches 'expectedResults'.
+function testLookupReadConcernSnapshotIsolation(
+ {localDocsPre, foreignDocsPre, localDocsPost, foreignDocsPost, pipeline, expectedResults}) {
+ sessionDB.runCommand({drop: "local", writeConcern: {w: "majority"}});
+ sessionDB.runCommand({drop: "foreign", writeConcern: {w: "majority"}});
+ let localColl = sessionDB.local;
+ let foreignColl = sessionDB.foreign;
+ assert.commandWorked(localColl.insert(localDocsPre, kWCMajority));
+ assert.commandWorked(foreignColl.insert(foreignDocsPre, kWCMajority));
+ let cmdRes = sessionDB.runCommand({
+ aggregate: localColl.getName(),
+ pipeline: pipeline,
+ cursor: {batchSize: 0},
+ readConcern: {level: "snapshot"},
+ txnNumber: NumberLong(++txnNumber),
+ startTransaction: true,
+ autocommit: false
});
+ assert.commandWorked(cmdRes);
+ assert.neq(0, cmdRes.cursor.id);
+ assert.eq(0, cmdRes.cursor.firstBatch.length);
+
+ assert.commandWorked(localColl.insert(localDocsPost, kWCMajority));
+ assert.commandWorked(foreignColl.insert(foreignDocsPost, kWCMajority));
+ let results =
+ new DBCommandCursor(sessionDB, cmdRes, undefined, undefined, NumberLong(txnNumber))
+ .toArray();
+ assert.eq(results, expectedResults);
+ assert.commandWorked(sessionDB.adminCommand(
+ {commitTransaction: 1, txnNumber: NumberLong(txnNumber), autocommit: false}));
+}
- // Test that snapshot isolation works with $lookup into a nested pipeline.
- testLookupReadConcernSnapshotIsolation({
+// Test that snapshot isolation works with $lookup using localField/foreignField syntax.
+testLookupReadConcernSnapshotIsolation({
+ localDocsPre: [{_id: 0}, {_id: 1}, {_id: 2}],
+ foreignDocsPre: [{_id: 1}],
+ localDocsPost: [{_id: 3}],
+ foreignDocsPost: [{_id: 2}, {_id: 3}],
+ pipeline: [
+ {$lookup: {from: "foreign", localField: "_id", foreignField: "_id", as: "as"}},
+ {$sort: {_id: 1}}
+ ],
+ expectedResults: [{_id: 0, as: []}, {_id: 1, as: [{_id: 1}]}, {_id: 2, as: []}]
+});
+
+// Test that snapshot isolation works with $lookup into a nested pipeline.
+testLookupReadConcernSnapshotIsolation({
localDocsPre: [{_id: 0}, {_id: 1}, {_id: 2}],
foreignDocsPre: [{_id: 1}],
localDocsPost: [{_id: 3}],
@@ -145,8 +145,8 @@
expectedResults: [{_id: 0, as: []}, {_id: 1, as: [{_id: 1}]}, {_id: 2, as: []}]
});
- // Test that snapshot isolation works with $graphLookup.
- testLookupReadConcernSnapshotIsolation({
+// Test that snapshot isolation works with $graphLookup.
+testLookupReadConcernSnapshotIsolation({
localDocsPre: [{_id: 0}, {_id: 1}, {_id: 2}],
foreignDocsPre: [{_id: 1, linkTo: 2}],
localDocsPost: [{_id: 3}],
@@ -167,97 +167,94 @@
[{_id: 0, as: []}, {_id: 1, as: [{_id: 1, linkTo: 2}]}, {_id: 2, as: []}]
});
- // Test that snapshot isolation works for $geoNear. Special care is taken to test snapshot
- // isolation across getMore for $geoNear as it is an initial document source.
- assert.commandWorked(sessionDB.runCommand({drop: kCollName, writeConcern: {w: "majority"}}));
- assert.commandWorked(sessionDB.runCommand({
- createIndexes: kCollName,
- indexes: [{key: {geo: "2dsphere"}, name: "geo_2dsphere"}],
- writeConcern: {w: "majority"}
- }));
+// Test that snapshot isolation works for $geoNear. Special care is taken to test snapshot
+// isolation across getMore for $geoNear as it is an initial document source.
+assert.commandWorked(sessionDB.runCommand({drop: kCollName, writeConcern: {w: "majority"}}));
+assert.commandWorked(sessionDB.runCommand({
+ createIndexes: kCollName,
+ indexes: [{key: {geo: "2dsphere"}, name: "geo_2dsphere"}],
+ writeConcern: {w: "majority"}
+}));
- const coll = sessionDB.getCollection(kCollName);
- let bulk = coll.initializeUnorderedBulkOp();
- const numInitialGeoInsert = 4;
- for (let i = 0; i < numInitialGeoInsert; ++i) {
- bulk.insert({_id: i, geo: {type: "Point", coordinates: [0, 0]}});
- }
- assert.commandWorked(bulk.execute({w: "majority"}));
+const coll = sessionDB.getCollection(kCollName);
+let bulk = coll.initializeUnorderedBulkOp();
+const numInitialGeoInsert = 4;
+for (let i = 0; i < numInitialGeoInsert; ++i) {
+ bulk.insert({_id: i, geo: {type: "Point", coordinates: [0, 0]}});
+}
+assert.commandWorked(bulk.execute({w: "majority"}));
- let cmdRes = assert.commandWorked(sessionDB.runCommand({
- aggregate: kCollName,
- pipeline: [{
- $geoNear: {
- spherical: true,
- near: {type: "Point", coordinates: [0, 0]},
- distanceField: "distance"
- }
- }],
- txnNumber: NumberLong(++txnNumber),
- readConcern: {level: "snapshot"},
- autocommit: false,
- startTransaction: true,
- cursor: {batchSize: 0}
- }));
- assert(cmdRes.hasOwnProperty("cursor"));
- const cursorId = cmdRes.cursor.id;
- assert.neq(cursorId, 0);
+let cmdRes = assert.commandWorked(sessionDB.runCommand({
+ aggregate: kCollName,
+ pipeline: [{
+ $geoNear:
+ {spherical: true, near: {type: "Point", coordinates: [0, 0]}, distanceField: "distance"}
+ }],
+ txnNumber: NumberLong(++txnNumber),
+ readConcern: {level: "snapshot"},
+ autocommit: false,
+ startTransaction: true,
+ cursor: {batchSize: 0}
+}));
+assert(cmdRes.hasOwnProperty("cursor"));
+const cursorId = cmdRes.cursor.id;
+assert.neq(cursorId, 0);
- assert.commandWorked(
- coll.insert({_id: numInitialGeoInsert, geo: {type: "Point", coordinates: [0, 0]}},
- {writeConcern: {w: "majority"}}));
+assert.commandWorked(
+ coll.insert({_id: numInitialGeoInsert, geo: {type: "Point", coordinates: [0, 0]}},
+ {writeConcern: {w: "majority"}}));
- cmdRes = assert.commandWorked(sessionDB.runCommand({
- getMore: NumberLong(cursorId),
- collection: kCollName,
- autocommit: false,
- txnNumber: NumberLong(txnNumber)
- }));
- assert.commandWorked(sessionDB.adminCommand(
- {commitTransaction: 1, txnNumber: NumberLong(txnNumber), autocommit: false}));
- assert(cmdRes.hasOwnProperty("cursor"));
- assert(cmdRes.cursor.hasOwnProperty("nextBatch"));
- assert.eq(cmdRes.cursor.nextBatch.length, numInitialGeoInsert);
+cmdRes = assert.commandWorked(sessionDB.runCommand({
+ getMore: NumberLong(cursorId),
+ collection: kCollName,
+ autocommit: false,
+ txnNumber: NumberLong(txnNumber)
+}));
+assert.commandWorked(sessionDB.adminCommand(
+ {commitTransaction: 1, txnNumber: NumberLong(txnNumber), autocommit: false}));
+assert(cmdRes.hasOwnProperty("cursor"));
+assert(cmdRes.cursor.hasOwnProperty("nextBatch"));
+assert.eq(cmdRes.cursor.nextBatch.length, numInitialGeoInsert);
- // Test that snapshot reads are legal for $facet.
- assert.commandWorked(sessionDB.runCommand({drop: kCollName, writeConcern: {w: "majority"}}));
- assert.commandWorked(coll.insert(
- [
- {group1: 1, group2: 1, val: 1},
- {group1: 1, group2: 2, val: 2},
- {group1: 2, group2: 2, val: 8}
- ],
- kWCMajority));
+// Test that snapshot reads are legal for $facet.
+assert.commandWorked(sessionDB.runCommand({drop: kCollName, writeConcern: {w: "majority"}}));
+assert.commandWorked(coll.insert(
+ [
+ {group1: 1, group2: 1, val: 1},
+ {group1: 1, group2: 2, val: 2},
+ {group1: 2, group2: 2, val: 8}
+ ],
+ kWCMajority));
- cmdRes = sessionDB.runCommand({
- aggregate: kCollName,
- pipeline: [
- {
- $facet: {
- g1: [{$group: {_id: "$group1", sum: {$sum: "$val"}}}, {$sort: {_id: 1}}],
- g2: [{$group: {_id: "$group2", sum: {$sum: "$val"}}}, {$sort: {_id: 1}}]
- }
- },
- {$unwind: "$g1"},
- {$unwind: "$g2"},
- {$sort: {"g1._id": 1, "g2._id": 1}}
- ],
- cursor: {},
- readConcern: {level: "snapshot"},
- txnNumber: NumberLong(++txnNumber),
- startTransaction: true,
- autocommit: false
- });
- assert.commandWorked(sessionDB.adminCommand(
- {commitTransaction: 1, txnNumber: NumberLong(txnNumber), autocommit: false}));
- assert.commandWorked(cmdRes);
- assert.eq(0, cmdRes.cursor.id);
- assert.eq(cmdRes.cursor.firstBatch, [
- {g1: {_id: 1, sum: 3}, g2: {_id: 1, sum: 1}},
- {g1: {_id: 1, sum: 3}, g2: {_id: 2, sum: 10}},
- {g1: {_id: 2, sum: 8}, g2: {_id: 1, sum: 1}},
- {g1: {_id: 2, sum: 8}, g2: {_id: 2, sum: 10}}
- ]);
+cmdRes = sessionDB.runCommand({
+ aggregate: kCollName,
+ pipeline: [
+ {
+ $facet: {
+ g1: [{$group: {_id: "$group1", sum: {$sum: "$val"}}}, {$sort: {_id: 1}}],
+ g2: [{$group: {_id: "$group2", sum: {$sum: "$val"}}}, {$sort: {_id: 1}}]
+ }
+ },
+ {$unwind: "$g1"},
+ {$unwind: "$g2"},
+ {$sort: {"g1._id": 1, "g2._id": 1}}
+ ],
+ cursor: {},
+ readConcern: {level: "snapshot"},
+ txnNumber: NumberLong(++txnNumber),
+ startTransaction: true,
+ autocommit: false
+});
+assert.commandWorked(sessionDB.adminCommand(
+ {commitTransaction: 1, txnNumber: NumberLong(txnNumber), autocommit: false}));
+assert.commandWorked(cmdRes);
+assert.eq(0, cmdRes.cursor.id);
+assert.eq(cmdRes.cursor.firstBatch, [
+ {g1: {_id: 1, sum: 3}, g2: {_id: 1, sum: 1}},
+ {g1: {_id: 1, sum: 3}, g2: {_id: 2, sum: 10}},
+ {g1: {_id: 2, sum: 8}, g2: {_id: 1, sum: 1}},
+ {g1: {_id: 2, sum: 8}, g2: {_id: 2, sum: 10}}
+]);
- rst.stopSet();
+rst.stopSet();
}());