summaryrefslogtreecommitdiff
path: root/jstests/sharding/unsharded_lookup_in_txn.js
blob: f04da181f3cd3ca45321161bd8b8029ddc41bff3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
 * Test that $lookup within a sharded transaction reads from the correct snapshot.
 * @tags: [
 *   requires_sharding,
 *   uses_multi_shard_transaction,
 *   uses_transactions,
 * ]
 */
(function() {
"use strict";

load("jstests/sharding/libs/sharded_transactions_helpers.js");

const st = new ShardingTest({shards: 2, mongos: 1});
const kDBName = "unsharded_lookup_in_txn";

let session = st.s.startSession();
let sessionDB = session.getDatabase("unsharded_lookup_in_txn");

const shardedColl = sessionDB.sharded;
const unshardedColl = sessionDB.unsharded;

assert.commandWorked(st.s.adminCommand({enableSharding: sessionDB.getName()}));
st.ensurePrimaryShard(sessionDB.getName(), st.shard0.shardName);

assert.commandWorked(
    st.s.adminCommand({shardCollection: shardedColl.getFullName(), key: {_id: 1}}));

// Move all of the data to shard 1.
assert.commandWorked(st.s.adminCommand(
    {moveChunk: shardedColl.getFullName(), find: {_id: 0}, to: st.shard1.shardName}));
flushRoutersAndRefreshShardMetadata(st, {ns: shardedColl.getFullName()});

// Insert a bunch of documents, all of which reside on the same chunk (on shard 1).
for (let i = -10; i < 10; i++) {
    assert.commandWorked(shardedColl.insert({_id: i, local_always_one: 1}));
}

const pipeline = [{
        $lookup: {
            from: unshardedColl.getName(),
            localField: "local_always_one",
            foreignField: "foreign_always_one",
            as: "matches"
        }
    }];
const kBatchSize = 2;

const testLookupDoesNotSeeDocumentsOutsideSnapshot = function() {
    unshardedColl.drop();

    // Seed the unsharded (foreign) collection.
    const kNumForeignDocs = 10;
    for (let id = 0; id < kNumForeignDocs; id++) {
        assert.commandWorked(unshardedColl.insert({_id: id, foreign_always_one: 1}));
    }

    session.startTransaction();

    // Open the aggregation cursor inside the transaction with a small batch size,
    // so the cursor is still open when the out-of-session writes happen below.
    const cursor = shardedColl.aggregate(
        pipeline, {readConcern: {level: "snapshot"}, cursor: {batchSize: kBatchSize}});

    // Consume the first batch; each document should join with every foreign doc.
    for (let consumed = 0; consumed < kBatchSize; consumed++) {
        assert.eq(cursor.next().matches.length, kNumForeignDocs);
    }

    // Do writes on the unsharded collection from outside the session.
    (function() {
        const collOutsideSession =
            st.s.getDB(sessionDB.getName())[unshardedColl.getName()];
        assert.commandWorked(collOutsideSession.insert({b: 1, xyz: 1}));
        assert.commandWorked(collOutsideSession.insert({b: 1, xyz: 2}));
    })();

    // The remaining results must still reflect the snapshot taken at transaction
    // start, so none of the out-of-session inserts may show up in the join.
    assert.eq(cursor.hasNext(), true);
    while (cursor.hasNext()) {
        assert.eq(cursor.next().matches.length, kNumForeignDocs);
    }

    assert.commandWorked(session.abortTransaction_forTesting());
};

// Run the test once, with all of the data on shard 1. This means that the merging shard (shard
// 0) will not be targeted. This is interesting because in contrast to the case below, the
// merging half of the pipeline will start the transaction on the merging shard.
testLookupDoesNotSeeDocumentsOutsideSnapshot();

// Move some data to shard 0, so that the merging shard will be targeted.
// Split at {_id: 0} and move the negative-_id chunk, so documents now live on both shards.
assert.commandWorked(st.s.adminCommand({split: shardedColl.getFullName(), middle: {_id: 0}}));
assert.commandWorked(st.s.adminCommand(
    {moveChunk: shardedColl.getFullName(), find: {_id: -1}, to: st.shard0.shardName}));
// Ensure mongos and the shards observe the new routing table before re-running.
flushRoutersAndRefreshShardMetadata(st, {ns: shardedColl.getFullName()});

// Run the test again.
testLookupDoesNotSeeDocumentsOutsideSnapshot();

st.stop();
})();