summaryrefslogtreecommitdiff
path: root/jstests/sharding/txn_agg.js
blob: 9768ef41421737e6cdd5163e2bebdeaa25ae5f43 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// @tags: [uses_transactions, requires_find_command, uses_multi_shard_transaction]
(function() {

    "use strict";

    const st = new ShardingTest({shards: 2});

    assert.commandWorked(st.s.adminCommand({enableSharding: 'test'}));
    st.ensurePrimaryShard("test", st.shard0.shardName);

    assert.commandWorked(st.s.adminCommand({shardCollection: 'test.user', key: {_id: 1}}));
    assert.commandWorked(st.s.adminCommand({split: 'test.user', middle: {_id: 0}}));
    assert.commandWorked(
        st.s.adminCommand({moveChunk: 'test.user', find: {_id: 0}, to: st.shard1.shardName}));

    // Preemptively create the collections in the shard since it is not allowed in transactions.
    let coll = st.s.getDB('test').user;
    coll.insert({_id: 1});
    coll.insert({_id: -1});
    coll.remove({});

    let unshardedColl = st.s.getDB('test').foo;
    unshardedColl.insert({_id: 0});
    unshardedColl.remove({});

    let session = st.s.startSession();
    let sessionDB = session.getDatabase('test');
    let sessionColl = sessionDB.getCollection('user');
    let sessionUnsharded = sessionDB.getCollection('foo');

    // passthrough

    session.startTransaction();

    sessionUnsharded.insert({_id: -1});
    sessionUnsharded.insert({_id: 1});
    assert.eq(2, sessionUnsharded.find().itcount());

    let res = sessionUnsharded.aggregate([{$match: {_id: {$gte: -200}}}]).toArray();
    assert.eq(2, res.length, tojson(res));

    assert.commandWorked(session.abortTransaction_forTesting());

    // merge on mongos

    session.startTransaction();

    sessionColl.insert({_id: -1});
    sessionColl.insert({_id: 1});
    assert.eq(2, sessionColl.find().itcount());

    res = sessionColl.aggregate([{$match: {_id: {$gte: -200}}}], {allowDiskUse: false}).toArray();
    assert.eq(2, res.length, tojson(res));

    assert.commandWorked(session.abortTransaction_forTesting());

    // merge on shard. This will require the merging shard to open a cursor on itself.
    session.startTransaction();

    sessionColl.insert({_id: -1});
    sessionColl.insert({_id: 1});
    assert.eq(2, sessionColl.find().itcount());

    res =
        sessionColl
            .aggregate(
                [{$match: {_id: {$gte: -200}}}, {$_internalSplitPipeline: {mergeType: "anyShard"}}])
            .toArray();
    assert.eq(2, res.length, tojson(res));

    assert.commandWorked(session.abortTransaction_forTesting());

    // Error case: provide a readConcern on an operation which comes in the middle of a transaction.
    session.startTransaction();

    sessionColl.insert({_id: -1});
    assert.eq(1, sessionColl.find().itcount());

    const err = assert.throws(
        () => sessionColl.aggregate(
            [{$match: {_id: {$gte: -200}}}, {$_internalSplitPipeline: {mergeType: "anyShard"}}],
            {readConcern: {level: "majority"}}

            ));
    assert.eq(err.code, ErrorCodes.InvalidOptions, err);

    session.abortTransaction();

    // Insert some data outside of a transaction.
    assert.commandWorked(sessionColl.insert([{_id: -1}, {_id: 0}, {_id: 1}]));

    // Run an aggregation which requires merging on a shard as the first operation in a transaction.
    session.startTransaction();
    assert.eq(
        [{_id: -1}, {_id: 0}, {_id: 1}],
        sessionColl
            .aggregate([{$_internalSplitPipeline: {mergeType: "primaryShard"}}, {$sort: {_id: 1}}])
            .toArray());
    session.commitTransaction();

    // Move all of the data to shard 1.
    assert.commandWorked(
        st.s.adminCommand({moveChunk: 'test.user', find: {_id: -1}, to: st.shard1.shardName}));

    // Be sure that only one shard will be targeted after the moveChunk.
    const pipeline = [{$_internalSplitPipeline: {mergeType: "primaryShard"}}, {$sort: {_id: 1}}];
    const explain = sessionColl.explain().aggregate(pipeline);
    assert.eq(Object.keys(explain.shards), [st.shard1.shardName], explain);

    // Now run the same aggregation, but again, force shard 0 to be the merger even though it has no
    // chunks for the collection.
    session.startTransaction();
    assert.eq([{_id: -1}, {_id: 0}, {_id: 1}], sessionColl.aggregate(pipeline).toArray());
    session.commitTransaction();

    st.stop();
})();