summaryrefslogtreecommitdiff
path: root/jstests/sharding/txn_agg.js
blob: dea07830dff9481dd14f85b815fcb318362bfc3c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// @tags: [uses_transactions, requires_find_command, uses_multi_shard_transaction]
/**
 * Exercises the aggregate command inside multi-statement transactions run through mongos against
 * a two-shard cluster. The scenarios, in order:
 *   - an aggregate on an unsharded collection (passthrough),
 *   - an aggregate on a sharded collection merged on mongos,
 *   - an aggregate on a sharded collection merged on a shard,
 *   - an aggregate that supplies its own readConcern in the middle of a transaction (must fail),
 *   - an aggregate as the first statement of a transaction with the primary shard as merger,
 *     both before and after all chunks are moved off the primary shard.
 */
(function() {

"use strict";

const st = new ShardingTest({shards: 2});

assert.commandWorked(st.s.adminCommand({enableSharding: 'test'}));
st.ensurePrimaryShard("test", st.shard0.shardName);

// Shard 'test.user' on _id and split at 0 so that each shard owns one chunk:
// negative _ids stay on shard0, non-negative _ids go to shard1.
assert.commandWorked(st.s.adminCommand({shardCollection: 'test.user', key: {_id: 1}}));
assert.commandWorked(st.s.adminCommand({split: 'test.user', middle: {_id: 0}}));
assert.commandWorked(
    st.s.adminCommand({moveChunk: 'test.user', find: {_id: 0}, to: st.shard1.shardName}));

// Preemptively create the collections in the shard since it is not allowed in transactions.
// Every write result is checked so that a failed setup write surfaces here instead of as a
// confusing failure later in the test.
const coll = st.s.getDB('test').user;
assert.commandWorked(coll.insert({_id: 1}));
assert.commandWorked(coll.insert({_id: -1}));
assert.commandWorked(coll.remove({}));

const unshardedColl = st.s.getDB('test').foo;
assert.commandWorked(unshardedColl.insert({_id: 0}));
assert.commandWorked(unshardedColl.remove({}));

const session = st.s.startSession();
const sessionDB = session.getDatabase('test');
const sessionColl = sessionDB.getCollection('user');
const sessionUnsharded = sessionDB.getCollection('foo');

// Transactions do not internally retry on StaleDbVersion errors, so we
// ensure the primary shard's cached databaseVersion is fresh before running commands through
// mongos on the unsharded collections.
assert.commandWorked(st.shard0.adminCommand({_flushDatabaseCacheUpdates: "test"}));

// passthrough

session.startTransaction();

assert.commandWorked(sessionUnsharded.insert({_id: -1}));
assert.commandWorked(sessionUnsharded.insert({_id: 1}));
assert.eq(2, sessionUnsharded.find().itcount());

let res = sessionUnsharded.aggregate([{$match: {_id: {$gte: -200}}}]).toArray();
assert.eq(2, res.length, tojson(res));

assert.commandWorked(session.abortTransaction_forTesting());

// merge on mongos

session.startTransaction();

assert.commandWorked(sessionColl.insert({_id: -1}));
assert.commandWorked(sessionColl.insert({_id: 1}));
assert.eq(2, sessionColl.find().itcount());

res = sessionColl.aggregate([{$match: {_id: {$gte: -200}}}], {allowDiskUse: false}).toArray();
assert.eq(2, res.length, tojson(res));

assert.commandWorked(session.abortTransaction_forTesting());

// merge on shard. This will require the merging shard to open a cursor on itself.
session.startTransaction();

assert.commandWorked(sessionColl.insert({_id: -1}));
assert.commandWorked(sessionColl.insert({_id: 1}));
assert.eq(2, sessionColl.find().itcount());

res = sessionColl
          .aggregate(
              [{$match: {_id: {$gte: -200}}}, {$_internalSplitPipeline: {mergeType: "anyShard"}}])
          .toArray();
assert.eq(2, res.length, tojson(res));

assert.commandWorked(session.abortTransaction_forTesting());

// Error case: provide a readConcern on an operation which comes in the middle of a transaction.
session.startTransaction();

assert.commandWorked(sessionColl.insert({_id: -1}));
assert.eq(1, sessionColl.find().itcount());

const err = assert.throws(
    () => sessionColl.aggregate(
        [{$match: {_id: {$gte: -200}}}, {$_internalSplitPipeline: {mergeType: "anyShard"}}],
        {readConcern: {level: "majority"}}));
assert.eq(err.code, ErrorCodes.InvalidOptions, err);

assert.commandWorked(session.abortTransaction_forTesting());

// Insert some data outside of a transaction.
assert.commandWorked(sessionColl.insert([{_id: -1}, {_id: 0}, {_id: 1}]));

// Run an aggregation which requires merging on a shard as the first operation in a transaction.
session.startTransaction();
assert.eq(
    [{_id: -1}, {_id: 0}, {_id: 1}],
    sessionColl
        .aggregate([{$_internalSplitPipeline: {mergeType: "primaryShard"}}, {$sort: {_id: 1}}])
        .toArray());
assert.commandWorked(session.commitTransaction_forTesting());

// Move all of the data to shard 1.
assert.commandWorked(
    st.s.adminCommand({moveChunk: 'test.user', find: {_id: -1}, to: st.shard1.shardName}));

// Be sure that only one shard will be targeted after the moveChunk.
const pipeline = [{$_internalSplitPipeline: {mergeType: "primaryShard"}}, {$sort: {_id: 1}}];
const explain = sessionColl.explain().aggregate(pipeline);
assert.eq(Object.keys(explain.shards), [st.shard1.shardName], explain);

// Now run the same aggregation, but again, force shard 0 to be the merger even though it has no
// chunks for the collection.
session.startTransaction();
assert.eq([{_id: -1}, {_id: 0}, {_id: 1}], sessionColl.aggregate(pipeline).toArray());
assert.commandWorked(session.commitTransaction_forTesting());

st.stop();
})();