summaryrefslogtreecommitdiff
path: root/jstests/sharding/ddl_commits_with_two_phase_oplog_notification.js
blob: 9502c94b312da31e9199c5acd991a58419d16221 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/**
 * Verifies that the successful commit of Sharding DDL operations implementing the "2-phase oplog"
 * notification generates the expected op entries.
 * @tags: [
 *   does_not_support_stepdowns,
 *   requires_fcv_70,
 * ]
 */
(function() {
load('jstests/libs/fail_point_util.js');
load('jstests/libs/parallel_shell_helpers.js');

// Values of the 'phase' field found in the 'o2' section of the op entries produced by the
// two-phase notification; verifyOpEntriesForDatabaseOnRS() compares them against the first
// (prepare) and second (commit) entry it retrieves from the oplog.
const kPrepareCommit = 0;
const kCommitSuccessful = 1;

// Cluster fixture used by the test cases of this file; it is created with two shards.
const st = new ShardingTest({shards: 2, chunkSize: 1});

/**
 * Asserts that the oplog of the primary node of 'replicaSet' holds exactly two entries whose
 * 'o.msg.createDatabase' field equals 'dbName', and that their 'o2' sections describe, in order,
 * the "prepare commit" phase and the "commit successful" phase of the notification.
 *
 * @param {string} dbName - Name of the database the op entries must refer to.
 * @param {boolean} isImported - Expected value of 'o2.isImported' in both entries.
 * @param {string} dbPrimaryShard - Expected value of 'o2.primaryShard' in the first entry; the
 *     second entry is expected not to carry that field.
 * @param {Object} replicaSet - Fixture exposing getPrimary(); its 'local.oplog.rs' is queried.
 */
function verifyOpEntriesForDatabaseOnRS(dbName, isImported, dbPrimaryShard, replicaSet) {
    const oplog = replicaSet.getPrimary().getDB('local').oplog.rs;
    const notifications = oplog.find({'o.msg.createDatabase': dbName}).toArray();
    assert.eq(2, notifications.length);

    // One row per expected op entry, listed in the order in which the entries must appear.
    const expectedByPosition = [
        {phase: kPrepareCommit, primaryShard: dbPrimaryShard},
        {phase: kCommitSuccessful, primaryShard: undefined},
    ];

    expectedByPosition.forEach((expected, position) => {
        const details = notifications[position].o2;
        assert.eq(dbName, details.createDatabase);
        assert.eq(expected.phase, details.phase);
        assert.eq(isImported, details.isImported);
        assert.eq(expected.primaryShard, details.primaryShard);
    });
}

/**
 * Test case: runs enableSharding for a new database while forcing a stepdown of the config server
 * primary in the middle of the commit, then checks the op entries generated on the replica set
 * backing the primary shard of the new database.
 */
function testCreateDatabase() {
    jsTest.log('test createDatabase');
    const dbName = 'createDatabaseTestDB';
    const primaryShardId = st.shard0.shardName;
    const primaryShardRS = st.rs0;

    // Make the config server primary hang between the write into the sharding catalog and the
    // remote notification of the "commitSuccessful" event.
    const hangBeforeCommitNotification =
        configureFailPoint(st.configRS.getPrimary(), 'hangBeforeNotifyingCreateDatabaseCommitted');

    // Issue enableSharding from a parallel shell (connected to st.s) so that this shell remains
    // free to drive the failpoint and the stepdown. The command is expected to eventually succeed.
    const awaitEnableSharding = startParallelShell(
        funWithArgs(function(nameOfDb, nameOfPrimaryShard) {
            assert.commandWorked(
                db.adminCommand({enableSharding: nameOfDb, primaryShard: nameOfPrimaryShard}));
        }, dbName, primaryShardId), st.s.port);

    // Once the command is parked on the failpoint, force the stepdown and then release it.
    hangBeforeCommitNotification.wait();
    const stepDownCmd = {replSetStepDown: 10 /* stepDownSecs */, force: true};
    assert.commandWorked(st.configRS.getPrimary().adminCommand(stepDownCmd));
    hangBeforeCommitNotification.off();

    // Block until the parallel shell running enableSharding has returned.
    awaitEnableSharding();

    // Even with the CSRS stepdown, the notification of each phase is expected to have reached the
    // primary shard of the new database, leaving there exactly one op entry per phase.
    verifyOpEntriesForDatabaseOnRS(dbName, false /*isImported*/, primaryShardId, primaryShardRS);
}

/**
 * Test case: adds a replica set holding two pre-existing databases as a new shard while forcing a
 * stepdown of the config server primary in the middle of the commit, then checks that each
 * pre-existing shard contains the op entries expected for every imported database.
 */
function testAddShard() {
    jsTest.log('Test addShard');

    // Create a new replica set and populate it with two DBs
    const newShardName = 'addedShard';
    const preExistingCollName = 'preExistingColl';
    const newReplicaSet = new ReplSetTest({name: 'addedShard', nodes: 1});
    newReplicaSet.startSet({shardsvr: ""});
    newReplicaSet.initiate();
    const dbsOnNewReplicaSet = ['addShardTestDB1', 'addShardTestDB2'];
    for (const dbName of dbsOnNewReplicaSet) {
        const db = newReplicaSet.getPrimary().getDB(dbName);
        assert.commandWorked(db[preExistingCollName].save({value: 1}));
    }

    // Execute addShard, injecting a stepdown of the config server between the write into the
    // sharding catalog and the remote notification of the "commitSuccessful" event. The command is
    // expected to eventually succeed.
    const failpointHandle =
        configureFailPoint(st.configRS.getPrimary(), 'hangBeforeNotifyingaddShardCommitted');

    // addShard is issued from a parallel shell (connected to st.s) so that this shell remains free
    // to drive the failpoint and the stepdown.
    const joinAddShard = startParallelShell(
        funWithArgs(function(newShardUrl, newShardName) {
            assert.commandWorked(db.adminCommand({addShard: newShardUrl, name: newShardName}));
        }, newReplicaSet.getURL(), newShardName), st.s.port);

    failpointHandle.wait();
    assert.commandWorked(st.configRS.getPrimary().adminCommand(
        {replSetStepDown: 10 /* stepDownSecs */, force: true}));
    failpointHandle.off();

    // Allow addShard to finish.
    joinAddShard();

    // Despite the CSRS stepdown, the remote notification of each phase should have reached each
    // pre-existing shard of the cluster. As a consequence of this, each shard should contain 2 op
    // entries for each database imported from the new RS as part of addShard.
    for (const existingShard of [st.rs0, st.rs1]) {
        for (const importedDB of dbsOnNewReplicaSet) {
            verifyOpEntriesForDatabaseOnRS(
                importedDB, true /*isImported*/, newShardName, existingShard);
        }
    }

    // Execute the test case teardown. The outcome of removeShard is checked so that a rejected
    // command fails the test instead of going unnoticed.
    // NOTE(review): this only verifies that the removal request was accepted; it does not wait for
    // the shard to be fully drained before its replica set is stopped - confirm this is intended.
    assert.commandWorked(st.s.adminCommand({removeShard: newShardName}));
    newReplicaSet.stopSet();
}

// Run the test cases one after the other against the 'st' cluster fixture.
testCreateDatabase();

testAddShard();

// Stop the cluster fixture once both test cases have run.
st.stop();
}());