diff options
Diffstat (limited to 'jstests/sharding')
-rw-r--r-- | jstests/sharding/aggregations_in_session.js | 7 | ||||
-rw-r--r-- | jstests/sharding/snapshot_aggregate_mongos.js | 116 |
2 files changed, 118 insertions, 5 deletions
diff --git a/jstests/sharding/aggregations_in_session.js b/jstests/sharding/aggregations_in_session.js index 0a59f6368b3..6c5fcdb41be 100644 --- a/jstests/sharding/aggregations_in_session.js +++ b/jstests/sharding/aggregations_in_session.js @@ -29,11 +29,8 @@ .toArray()); assert.eq(mongosColl.aggregate([{$sort: {_id: 1}}, {$out: "testing"}]).itcount(), 0); - // Test that running an aggregation within a transaction against mongos will error. - assert.commandFailedWithCode( - mongosColl.getDB().runCommand( - {aggregate: mongosColl.getName(), pipeline: [], cursor: {}, txnNumber: NumberLong(1)}), - 50732); + assert.commandWorked(mongosColl.getDB().runCommand( + {aggregate: mongosColl.getName(), pipeline: [], cursor: {}, txnNumber: NumberLong(1)})); st.stop(); }()); diff --git a/jstests/sharding/snapshot_aggregate_mongos.js b/jstests/sharding/snapshot_aggregate_mongos.js new file mode 100644 index 00000000000..0ff71218aea --- /dev/null +++ b/jstests/sharding/snapshot_aggregate_mongos.js @@ -0,0 +1,116 @@ +// Tests snapshot isolation on readConcern level snapshot reads through mongos. +// @tags: [requires_sharding] +(function() { + "use strict"; + + const dbName = "test"; + const shardedCollName = "shardedColl"; + const unshardedCollName = "unshardedColl"; + + const st = new ShardingTest({shards: 1, mongos: 1, config: 1}); + + const shardDb = st.rs0.getPrimary().getDB(dbName); + if (!shardDb.serverStatus().storageEngine.supportsSnapshotReadConcern) { + st.stop(); + return; + } + + assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); + assert.commandWorked(st.s.adminCommand( + {shardCollection: st.s.getDB(dbName)[shardedCollName] + "", key: {_id: 1}})); + + function runTest(mainDb, {useCausalConsistency, collName}) { + const session = mainDb.getMongo().startSession({causalConsistency: useCausalConsistency}); + const sessionDb = session.getDatabase(dbName); + + const bulk = mainDb[collName].initializeUnorderedBulkOp(); + for (let x = 0; x < 10; ++x) { + bulk.insert({_id: x}); + } + assert.commandWorked(bulk.execute({w: "majority"})); + + let txnNumber = 1; + + // Test snapshot reads using aggregate. + let cursorCmd = { + aggregate: collName, + pipeline: [{$sort: {_id: 1}}], + readConcern: {level: "snapshot"}, + txnNumber: NumberLong(txnNumber), + cursor: {batchSize: 5} + }; + + // Establish a snapshot cursor, fetching the first 5 documents. + + let res = assert.commandWorked(sessionDb.runCommand(cursorCmd)); + + assert(res.hasOwnProperty("cursor")); + assert(res.cursor.hasOwnProperty("firstBatch")); + assert.eq(5, res.cursor.firstBatch.length); + + assert(res.cursor.hasOwnProperty("id")); + const cursorId = res.cursor.id; + assert.neq(cursorId, 0); + + // Insert an 11th document which should not be visible to the snapshot cursor. This write is + // performed outside of the session. + assert.writeOK(mainDb[collName].insert({_id: 10}, {writeConcern: {w: "majority"}})); + + // Fetch the 6th document. This confirms that the transaction stash is preserved across + // multiple getMore invocations. + res = assert.commandWorked(sessionDb.runCommand({ + getMore: cursorId, + collection: collName, + batchSize: 1, + txnNumber: NumberLong(txnNumber) + })); + assert(res.hasOwnProperty("cursor")); + assert(res.cursor.hasOwnProperty("id")); + assert.neq(0, res.cursor.id); + + // Exhaust the cursor, retrieving the remainder of the result set. + res = assert.commandWorked(sessionDb.runCommand({ + getMore: cursorId, + collection: collName, + batchSize: 10, + txnNumber: NumberLong(txnNumber) + })); + + // The cursor has been exhausted. + assert(res.hasOwnProperty("cursor")); + assert(res.cursor.hasOwnProperty("id")); + assert.eq(0, res.cursor.id); + + // Only the remaining 4 of the initial 10 documents are returned. The 11th document is not + // part of the result set. + assert(res.cursor.hasOwnProperty("nextBatch")); + assert.eq(4, res.cursor.nextBatch.length); + + // Perform a second snapshot read under a new transaction. + res = assert.commandWorked(sessionDb.runCommand({ + aggregate: collName, + pipeline: [{$sort: {_id: 1}}], + cursor: {batchSize: 20}, + readConcern: {level: "snapshot"}, + txnNumber: NumberLong(txnNumber++) + })); + + // The cursor has been exhausted. + assert(res.hasOwnProperty("cursor")); + assert(res.cursor.hasOwnProperty("id")); + assert.eq(0, res.cursor.id); + + // All 11 documents are returned. + assert(res.cursor.hasOwnProperty("firstBatch")); + assert.eq(11, res.cursor.firstBatch.length); + + session.endSession(); + } + + jsTestLog("Running sharded"); + runTest(st.s.getDB(dbName), {useCausalConsistency: false, collName: shardedCollName}); + jsTestLog("Running unsharded"); + runTest(st.s.getDB(dbName), {useCausalConsistency: false, collName: unshardedCollName}); + + st.stop(); +})(); |