summaryrefslogtreecommitdiff
path: root/jstests/sharding/transactions_causal_consistency.js
blob: e2f6a9aed587a4b2a7d8451b66bc6b3587c29a3f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Verifies basic sharded transaction behavior with causal consistency.
//
// @tags: [
//   requires_find_command,
//   requires_sharding,
//   uses_multi_shard_transaction,
//   uses_transactions,
// ]
(function() {
"use strict";

load("jstests/sharding/libs/sharded_transactions_helpers.js");

const dbName = "test";
const collName = "foo";
const ns = dbName + "." + collName;

const st = new ShardingTest({shards: 2, mongos: 2});

enableStaleVersionAndSnapshotRetriesWithinTransactions(st);

// Set up a sharded collection with 2 chunks, [min, 0) and [0, max), one on each shard, with one
// document in each.

assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
st.ensurePrimaryShard(dbName, st.shard0.shardName);

assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 1}, to: st.shard1.shardName}));

assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: -1}, {writeConcern: {w: "majority"}}));
assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: 1}, {writeConcern: {w: "majority"}}));

// Verifies transactions using causal consistency read all causally prior operations.
function runTest(st, readConcern) {
    jsTestLog("Testing readConcern: " + tojson(readConcern));

    const session = st.s.startSession({causalConsistency: true});
    const sessionDB = session.getDatabase(dbName);

    // Insert data to one shard in a causally consistent session.
    const docToInsert = {_id: 5};
    assert.commandWorked(sessionDB.runCommand({insert: collName, documents: [docToInsert]}));

    // Through a separate router move the chunk that was inserted to, so the original router is
    // stale when it starts its transaction.
    const otherRouter = st.s1;
    assert.commandWorked(
        otherRouter.adminCommand({moveChunk: ns, find: docToInsert, to: st.shard0.shardName}));

    session.startTransaction({readConcern: readConcern});

    // The transaction should always see the document written earlier through its session,
    // regardless of the move.
    //
    // Note: until transactions can read from secondaries and/or disabling speculative snapshot
    // is allowed, read concerns that do not require global snapshots (i.e. local and majority)
    // will always read the inserted document here because the local snapshot established on
    // this shard will include all currently applied operations, which must include all earlier
    // acknowledged writes.
    assert.docEq(docToInsert,
                 sessionDB[collName].findOne(docToInsert),
                 "sharded transaction with read concern " + tojson(readConcern) +
                     " did not see expected document");

    assert.commandWorked(session.commitTransaction_forTesting());

    // Clean up for the next iteration.
    assert.commandWorked(
        st.s.adminCommand({moveChunk: ns, find: docToInsert, to: st.shard1.shardName}));
    assert.writeOK(sessionDB[collName].remove(docToInsert));
}

const kAllowedReadConcernLevels = ["local", "majority", "snapshot"];
for (let readConcernLevel of kAllowedReadConcernLevels) {
    runTest(st, {level: readConcernLevel});
}

disableStaleVersionAndSnapshotRetriesWithinTransactions(st);

st.stop();
})();