summaryrefslogtreecommitdiff
path: root/jstests/sharding/transactions_multi_writes.js
blob: 2dbb9b3624e89f62b72665cf4af2a39e80f0b67d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/**
 * Verifies multi-writes in transactions are sent with shard versions to only the targeted shards.
 *
 * @tags: [
 *   requires_sharding,
 *   uses_multi_shard_transaction,
 *   uses_transactions,
 * ]
 */
(function() {
"use strict";

load("jstests/libs/collection_drop_recreate.js");
load("jstests/sharding/libs/sharded_transactions_helpers.js");

const dbName = "test";
const collName = "foo";
const ns = dbName + "." + collName;

const st = new ShardingTest({shards: 3, mongos: 2});

enableCoordinateCommitReturnImmediatelyAfterPersistingDecision(st);
enableStaleVersionAndSnapshotRetriesWithinTransactions(st);

// Set up a sharded collection with 3 chunks, [min, 0), [0, 10), [10, max), one on each shard,
// with one document in each.

assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
st.ensurePrimaryShard(dbName, st.shard0.shardName);

function setupCollection() {
    assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {skey: 1}}));
    assert.commandWorked(st.s.adminCommand({split: ns, middle: {skey: 0}}));
    assert.commandWorked(st.s.adminCommand({split: ns, middle: {skey: 10}}));
    assert.commandWorked(
        st.s.adminCommand({moveChunk: ns, find: {skey: 5}, to: st.shard1.shardName}));
    assert.commandWorked(
        st.s.adminCommand({moveChunk: ns, find: {skey: 15}, to: st.shard2.shardName}));

    assert.commandWorked(st.s.getDB(dbName)[collName].insert({_id: 1, counter: 0, skey: -5}));
    assert.commandWorked(st.s.getDB(dbName)[collName].insert({_id: 2, counter: 0, skey: 5}));
    assert.commandWorked(st.s.getDB(dbName)[collName].insert({_id: 3, counter: 0, skey: 15}));
}

// Runs the given multi-write and asserts a manually inserted orphan document is not affected.
// The write is assumed to target chunks [min, 0) and [0, 10), which begin on shard0 and shard1,
// respectively.
function runTest(st, session, writeCmd, staleRouter) {
    const isUpdate = writeCmd.hasOwnProperty("update");
    const sessionDB = session.getDatabase(dbName);

    setupCollection();

    let orphanShardName;
    let orphanDoc = {_id: 2, counter: 0, skey: 5};
    if (staleRouter) {
        // Using a separate router, move a chunk that will be targeted by the write to a shard
        // that would not be targeted by a stale router. Put the orphan on the shard that
        // previously owned the chunk to verify the multi-write obeys the shard versioning
        // protocol.
        assert.commandWorked(st.s1.adminCommand(
            {moveChunk: ns, find: {skey: 5}, to: st.shard2.shardName, _waitForDelete: true}));
        orphanShardName = "rs1";
    } else {
        // Otherwise put the orphan on a shard that should not be targeted by a fresh router to
        // verify the multi-write is not broadcast to all shards.
        orphanShardName = "rs2";
    }

    const orphanShardDB = st[orphanShardName].getPrimary().getDB(dbName);
    assert.commandWorked(
        orphanShardDB[collName].insert(orphanDoc, {writeConcern: {w: "majority"}}));

    // Start a transaction with majority read concern to ensure the orphan will be visible if
    // its shard is targeted and send the multi-write.
    session.startTransaction({readConcern: {level: "majority"}});
    assert.commandWorked(sessionDB.runCommand(writeCmd));

    // The write shouldn't be visible until the transaction commits.
    assert.sameMembers(st.getDB(dbName)[collName].find().toArray(), [
        {_id: 1, counter: 0, skey: -5},
        {_id: 2, counter: 0, skey: 5},
        {_id: 3, counter: 0, skey: 15}
    ]);

    // Commit the transaction and verify the write was successful.
    assert.commandWorked(session.commitTransaction_forTesting());
    if (isUpdate) {
        assert.sameMembers(
            sessionDB[collName].find().toArray(),
            [
                {_id: 1, counter: 1, skey: -5},
                {_id: 2, counter: 1, skey: 5},
                {_id: 3, counter: 0, skey: 15}
            ],
            "document mismatch for update, stale: " + staleRouter + ", cmd: " + tojson(writeCmd));
    } else {  // isDelete
        assert.sameMembers(
            sessionDB[collName].find().toArray(),
            [{_id: 3, counter: 0, skey: 15}],
            "document mismatch for delete, stale: " + staleRouter + ", cmd: " + tojson(writeCmd));
    }

    // The orphaned document should not have been affected.
    assert.docEq(
        orphanDoc,
        orphanShardDB[collName].findOne({skey: orphanDoc.skey}),
        "document mismatch for orphaned doc, stale: " + staleRouter + ", cmd: " + tojson(writeCmd));

    assertDropCollection(st.getDB(dbName), collName);
}

const session = st.s.startSession();

let multiUpdate = {
    update: collName,
    updates: [{q: {skey: {$lte: 5}}, u: {$inc: {counter: 1}}, multi: true}]
};

multiUpdate.ordered = false;
runTest(st, session, multiUpdate, false /*staleRouter*/);
// TODO: SERVER-39704 uncomment when mongos can internally retry txn on stale errors for real.
// runTest(st, session, multiUpdate, true /*staleRouter*/);

multiUpdate.ordered = true;
runTest(st, session, multiUpdate, false /*staleRouter*/);
// TODO: SERVER-39704 uncomment when mongos can internally retry txn on stale errors for real.
// runTest(st, session, multiUpdate, true /*staleRouter*/);

let multiDelete = {delete: collName, deletes: [{q: {skey: {$lte: 5}}, limit: 0}]};

multiDelete.ordered = false;
runTest(st, session, multiDelete, false /*staleRouter*/);
// TODO: SERVER-39704 uncomment when mongos can internally retry txn on stale errors for real.
// runTest(st, session, multiDelete, true /*staleRouter*/);

multiDelete.ordered = true;
runTest(st, session, multiDelete, false /*staleRouter*/);
// TODO: SERVER-39704 uncomment when mongos can internally retry txn on stale errors for real.
// runTest(st, session, multiDelete, true /*staleRouter*/);

disableStaleVersionAndSnapshotRetriesWithinTransactions(st);

st.stop();
})();