1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
/**
* Verifies that the successful commit of Sharding DDL operations implementing the "2-phase oplog"
* notification generates the expected op entries.
* @tags: [
* does_not_support_stepdowns,
* requires_fcv_70,
* ]
*/
(function() {
// Helper libraries for the fail point and parallel shell utilities used by the test cases
// (presumably the source of configureFailPoint() and funWithArgs() -- confirm against the libs).
load('jstests/libs/fail_point_util.js');
load('jstests/libs/parallel_shell_helpers.js');
// Values expected in the 'o2.phase' field of the two op entries checked for each database.
const kPrepareCommit = 0;
const kCommitSuccessful = 1;
// Two-shard cluster shared by all the test cases; it is stopped once they have run.
const st = new ShardingTest({shards: 2, chunkSize: 1});
/**
 * Asserts that the oplog of the primary node of 'replicaSet' holds exactly two op entries matching
 * {'o.msg.createDatabase': dbName} and that their 'o2' payloads describe, in order, the
 * "prepare commit" phase and the "commit successful" phase of the creation of 'dbName'.
 *
 * @param dbName Name of the database; expected in 'o2.createDatabase' of both entries.
 * @param isImported Value expected in 'o2.isImported' of both entries.
 * @param dbPrimaryShard Value expected in 'o2.primaryShard' of the first entry only; the second
 *     entry is expected to have no such field.
 * @param replicaSet Object exposing getPrimary(); the 'local.oplog.rs' collection of the returned
 *     node is the one being inspected.
 */
function verifyOpEntriesForDatabaseOnRS(dbName, isImported, dbPrimaryShard, replicaSet) {
    const oplog = replicaSet.getPrimary().getDB('local').oplog.rs;
    const matchingEntries = oplog.find({'o.msg.createDatabase': dbName}).toArray();
    assert.eq(2, matchingEntries.length);

    // Per-phase expectations, listed in the order in which the entries must have been returned.
    const expectedPhases = [
        {phase: kPrepareCommit, primaryShard: dbPrimaryShard},
        {phase: kCommitSuccessful, primaryShard: undefined},
    ];
    expectedPhases.forEach((expected, position) => {
        const payload = matchingEntries[position].o2;
        assert.eq(dbName, payload.createDatabase);
        assert.eq(expected.phase, payload.phase);
        assert.eq(isImported, payload.isImported);
        assert.eq(expected.primaryShard, payload.primaryShard);
    });
}
/**
 * Test case: creates a database through enableSharding while forcing a stepdown of the config
 * server primary in the middle of the operation, then checks the op entries generated on the
 * primary shard of the new database.
 */
function testCreateDatabase() {
    jsTest.log('test createDatabase');

    const dbName = 'createDatabaseTestDB';
    const primaryShard = st.rs0;
    const primaryShardId = st.shard0.shardName;

    // Pause the config server right before it notifies the "commitSuccessful" event, i.e. after
    // the write into the sharding catalog, so that the stepdown below lands between the two.
    const commitNotificationHang =
        configureFailPoint(st.configRS.getPrimary(), 'hangBeforeNotifyingCreateDatabaseCommitted');

    // enableSharding is issued from a parallel shell connected to the router; it is expected to
    // eventually succeed despite the stepdown.
    const awaitEnableSharding = startParallelShell(
        funWithArgs(function(dbName, primaryShardName) {
            const enableShardingCmd = {enableSharding: dbName, primaryShard: primaryShardName};
            assert.commandWorked(db.adminCommand(enableShardingCmd));
        }, dbName, primaryShardId), st.s.port);

    commitNotificationHang.wait();
    const stepDownCmd = {replSetStepDown: 10 /* stepDownSecs */, force: true};
    assert.commandWorked(st.configRS.getPrimary().adminCommand(stepDownCmd));
    commitNotificationHang.off();

    // Wait for enableSharding to complete.
    awaitEnableSharding();

    // Even with the CSRS stepdown, the notification of each phase should have reached the primary
    // shard of the new database, producing exactly one op entry per phase.
    verifyOpEntriesForDatabaseOnRS(dbName, false /*isImported*/, primaryShardId, primaryShard);
}
/**
 * Test case: adds a replica set holding two pre-existing databases to the cluster through
 * addShard while forcing a stepdown of the config server primary in the middle of the operation,
 * then checks the op entries generated on each shard that was already part of the cluster.
 */
function testAddShard() {
    jsTest.log('Test addShard');

    const newShardName = 'addedShard';
    const preExistingCollName = 'preExistingColl';
    const dbsOnNewReplicaSet = ['addShardTestDB1', 'addShardTestDB2'];

    // Start a single-node replica set and give it one document in each of the two databases.
    const newReplicaSet = new ReplSetTest({name: 'addedShard', nodes: 1});
    newReplicaSet.startSet({shardsvr: ""});
    newReplicaSet.initiate();
    dbsOnNewReplicaSet.forEach((dbToCreate) => {
        const preExistingColl = newReplicaSet.getPrimary().getDB(dbToCreate)[preExistingCollName];
        assert.commandWorked(preExistingColl.save({value: 1}));
    });

    // Pause the config server right before it notifies the "commitSuccessful" event, i.e. after
    // the write into the sharding catalog, so that the stepdown below lands between the two.
    const commitNotificationHang =
        configureFailPoint(st.configRS.getPrimary(), 'hangBeforeNotifyingaddShardCommitted');

    // addShard is issued from a parallel shell connected to the router; it is expected to
    // eventually succeed despite the stepdown.
    const awaitAddShard = startParallelShell(
        funWithArgs(function(newShardUrl, newShardName) {
            const addShardCmd = {addShard: newShardUrl, name: newShardName};
            assert.commandWorked(db.adminCommand(addShardCmd));
        }, newReplicaSet.getURL(), newShardName), st.s.port);

    commitNotificationHang.wait();
    const stepDownCmd = {replSetStepDown: 10 /* stepDownSecs */, force: true};
    assert.commandWorked(st.configRS.getPrimary().adminCommand(stepDownCmd));
    commitNotificationHang.off();

    // Wait for addShard to complete.
    awaitAddShard();

    // Even with the CSRS stepdown, the notification of each phase should have reached every shard
    // that was already in the cluster: each of them should hold 2 op entries for each database
    // imported from the new replica set.
    [st.rs0, st.rs1].forEach((existingShard) => {
        dbsOnNewReplicaSet.forEach((importedDB) => {
            verifyOpEntriesForDatabaseOnRS(
                importedDB, true /*isImported*/, newShardName, existingShard);
        });
    });

    // Teardown: request the removal of the added shard (the command response is not checked) and
    // shut down its replica set.
    st.s.adminCommand({removeShard: newShardName});
    newReplicaSet.stopSet();
}
// Run the test cases against the shared cluster, then shut the cluster down.
testCreateDatabase();
testAddShard();
st.stop();
}());
|