diff options
Diffstat (limited to 'jstests/sharding/txn_agg.js')
-rw-r--r-- | jstests/sharding/txn_agg.js | 171 |
1 files changed, 85 insertions, 86 deletions
diff --git a/jstests/sharding/txn_agg.js b/jstests/sharding/txn_agg.js index 5e03e082c66..dea07830dff 100644 --- a/jstests/sharding/txn_agg.js +++ b/jstests/sharding/txn_agg.js @@ -1,122 +1,121 @@ // @tags: [uses_transactions, requires_find_command, uses_multi_shard_transaction] (function() { - "use strict"; +"use strict"; - const st = new ShardingTest({shards: 2}); +const st = new ShardingTest({shards: 2}); - assert.commandWorked(st.s.adminCommand({enableSharding: 'test'})); - st.ensurePrimaryShard("test", st.shard0.shardName); +assert.commandWorked(st.s.adminCommand({enableSharding: 'test'})); +st.ensurePrimaryShard("test", st.shard0.shardName); - assert.commandWorked(st.s.adminCommand({shardCollection: 'test.user', key: {_id: 1}})); - assert.commandWorked(st.s.adminCommand({split: 'test.user', middle: {_id: 0}})); - assert.commandWorked( - st.s.adminCommand({moveChunk: 'test.user', find: {_id: 0}, to: st.shard1.shardName})); +assert.commandWorked(st.s.adminCommand({shardCollection: 'test.user', key: {_id: 1}})); +assert.commandWorked(st.s.adminCommand({split: 'test.user', middle: {_id: 0}})); +assert.commandWorked( + st.s.adminCommand({moveChunk: 'test.user', find: {_id: 0}, to: st.shard1.shardName})); - // Preemptively create the collections in the shard since it is not allowed in transactions. - let coll = st.s.getDB('test').user; - coll.insert({_id: 1}); - coll.insert({_id: -1}); - coll.remove({}); +// Preemptively create the collections in the shard since it is not allowed in transactions. +let coll = st.s.getDB('test').user; +coll.insert({_id: 1}); +coll.insert({_id: -1}); +coll.remove({}); - let unshardedColl = st.s.getDB('test').foo; - unshardedColl.insert({_id: 0}); - unshardedColl.remove({}); +let unshardedColl = st.s.getDB('test').foo; +unshardedColl.insert({_id: 0}); +unshardedColl.remove({}); - let session = st.s.startSession(); - let sessionDB = session.getDatabase('test'); - let sessionColl = sessionDB.getCollection('user'); - let sessionUnsharded = sessionDB.getCollection('foo'); +let session = st.s.startSession(); +let sessionDB = session.getDatabase('test'); +let sessionColl = sessionDB.getCollection('user'); +let sessionUnsharded = sessionDB.getCollection('foo'); - // Transactions do not internally retry on StaleDbVersion errors, so we - // ensure the primary shard's cached databaseVersion is fresh before running commands through - // mongos on the unsharded collections. - assert.commandWorked(st.shard0.adminCommand({_flushDatabaseCacheUpdates: "test"})); +// Transactions do not internally retry on StaleDbVersion errors, so we +// ensure the primary shard's cached databaseVersion is fresh before running commands through +// mongos on the unsharded collections. +assert.commandWorked(st.shard0.adminCommand({_flushDatabaseCacheUpdates: "test"})); - // passthrough +// passthrough - session.startTransaction(); +session.startTransaction(); - sessionUnsharded.insert({_id: -1}); - sessionUnsharded.insert({_id: 1}); - assert.eq(2, sessionUnsharded.find().itcount()); +sessionUnsharded.insert({_id: -1}); +sessionUnsharded.insert({_id: 1}); +assert.eq(2, sessionUnsharded.find().itcount()); - let res = sessionUnsharded.aggregate([{$match: {_id: {$gte: -200}}}]).toArray(); - assert.eq(2, res.length, tojson(res)); +let res = sessionUnsharded.aggregate([{$match: {_id: {$gte: -200}}}]).toArray(); +assert.eq(2, res.length, tojson(res)); - assert.commandWorked(session.abortTransaction_forTesting()); +assert.commandWorked(session.abortTransaction_forTesting()); - // merge on mongos +// merge on mongos - session.startTransaction(); +session.startTransaction(); - sessionColl.insert({_id: -1}); - sessionColl.insert({_id: 1}); - assert.eq(2, sessionColl.find().itcount()); +sessionColl.insert({_id: -1}); +sessionColl.insert({_id: 1}); +assert.eq(2, sessionColl.find().itcount()); - res = sessionColl.aggregate([{$match: {_id: {$gte: -200}}}], {allowDiskUse: false}).toArray(); - assert.eq(2, res.length, tojson(res)); +res = sessionColl.aggregate([{$match: {_id: {$gte: -200}}}], {allowDiskUse: false}).toArray(); +assert.eq(2, res.length, tojson(res)); - assert.commandWorked(session.abortTransaction_forTesting()); +assert.commandWorked(session.abortTransaction_forTesting()); - // merge on shard. This will require the merging shard to open a cursor on itself. - session.startTransaction(); +// merge on shard. This will require the merging shard to open a cursor on itself. +session.startTransaction(); - sessionColl.insert({_id: -1}); - sessionColl.insert({_id: 1}); - assert.eq(2, sessionColl.find().itcount()); +sessionColl.insert({_id: -1}); +sessionColl.insert({_id: 1}); +assert.eq(2, sessionColl.find().itcount()); - res = - sessionColl - .aggregate( - [{$match: {_id: {$gte: -200}}}, {$_internalSplitPipeline: {mergeType: "anyShard"}}]) - .toArray(); - assert.eq(2, res.length, tojson(res)); +res = sessionColl + .aggregate( + [{$match: {_id: {$gte: -200}}}, {$_internalSplitPipeline: {mergeType: "anyShard"}}]) + .toArray(); +assert.eq(2, res.length, tojson(res)); - assert.commandWorked(session.abortTransaction_forTesting()); +assert.commandWorked(session.abortTransaction_forTesting()); - // Error case: provide a readConcern on an operation which comes in the middle of a transaction. - session.startTransaction(); +// Error case: provide a readConcern on an operation which comes in the middle of a transaction. +session.startTransaction(); - sessionColl.insert({_id: -1}); - assert.eq(1, sessionColl.find().itcount()); +sessionColl.insert({_id: -1}); +assert.eq(1, sessionColl.find().itcount()); - const err = assert.throws( - () => sessionColl.aggregate( - [{$match: {_id: {$gte: -200}}}, {$_internalSplitPipeline: {mergeType: "anyShard"}}], - {readConcern: {level: "majority"}} +const err = assert.throws( + () => sessionColl.aggregate( + [{$match: {_id: {$gte: -200}}}, {$_internalSplitPipeline: {mergeType: "anyShard"}}], + {readConcern: {level: "majority"}} - )); - assert.eq(err.code, ErrorCodes.InvalidOptions, err); + )); +assert.eq(err.code, ErrorCodes.InvalidOptions, err); - assert.commandWorked(session.abortTransaction_forTesting()); +assert.commandWorked(session.abortTransaction_forTesting()); - // Insert some data outside of a transaction. - assert.commandWorked(sessionColl.insert([{_id: -1}, {_id: 0}, {_id: 1}])); +// Insert some data outside of a transaction. +assert.commandWorked(sessionColl.insert([{_id: -1}, {_id: 0}, {_id: 1}])); - // Run an aggregation which requires merging on a shard as the first operation in a transaction. - session.startTransaction(); - assert.eq( - [{_id: -1}, {_id: 0}, {_id: 1}], - sessionColl - .aggregate([{$_internalSplitPipeline: {mergeType: "primaryShard"}}, {$sort: {_id: 1}}]) - .toArray()); - assert.commandWorked(session.commitTransaction_forTesting()); +// Run an aggregation which requires merging on a shard as the first operation in a transaction. +session.startTransaction(); +assert.eq( + [{_id: -1}, {_id: 0}, {_id: 1}], + sessionColl + .aggregate([{$_internalSplitPipeline: {mergeType: "primaryShard"}}, {$sort: {_id: 1}}]) + .toArray()); +assert.commandWorked(session.commitTransaction_forTesting()); - // Move all of the data to shard 1. - assert.commandWorked( - st.s.adminCommand({moveChunk: 'test.user', find: {_id: -1}, to: st.shard1.shardName})); +// Move all of the data to shard 1. +assert.commandWorked( + st.s.adminCommand({moveChunk: 'test.user', find: {_id: -1}, to: st.shard1.shardName})); - // Be sure that only one shard will be targeted after the moveChunk. - const pipeline = [{$_internalSplitPipeline: {mergeType: "primaryShard"}}, {$sort: {_id: 1}}]; - const explain = sessionColl.explain().aggregate(pipeline); - assert.eq(Object.keys(explain.shards), [st.shard1.shardName], explain); +// Be sure that only one shard will be targeted after the moveChunk. +const pipeline = [{$_internalSplitPipeline: {mergeType: "primaryShard"}}, {$sort: {_id: 1}}]; +const explain = sessionColl.explain().aggregate(pipeline); +assert.eq(Object.keys(explain.shards), [st.shard1.shardName], explain); - // Now run the same aggregation, but again, force shard 0 to be the merger even though it has no - // chunks for the collection. - session.startTransaction(); - assert.eq([{_id: -1}, {_id: 0}, {_id: 1}], sessionColl.aggregate(pipeline).toArray()); - assert.commandWorked(session.commitTransaction_forTesting()); +// Now run the same aggregation, but again, force shard 0 to be the merger even though it has no +// chunks for the collection. +session.startTransaction(); +assert.eq([{_id: -1}, {_id: 0}, {_id: 1}], sessionColl.aggregate(pipeline).toArray()); +assert.commandWorked(session.commitTransaction_forTesting()); - st.stop(); +st.stop(); })(); |