1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
|
/**
 * Test that stepdown during collection cloning and oplog fetching does not interrupt initial sync.
 */
(function() {
"use strict";
load("jstests/libs/curop_helpers.js"); // for waitForCurOpByFailPoint().
load("jstests/libs/fail_point_util.js");
const testName = "initialSyncDuringStepDown";
const dbName = testName;
const collName = "testcoll";
// Start a 3 node replica set to avoid primary step down after secondary restart.
const rst = new ReplSetTest(
    {nodes: [{}, {rsConfig: {priority: 0}}, {arbiter: true}], settings: {chainingAllowed: false}});
rst.startSet();
rst.initiate();
// NOTE: these handles are deliberately 'var' — setupTest() reassigns the
// secondary handles after restarting that node with a clean data directory.
var primary = rst.getPrimary();
var primaryDB = primary.getDB(dbName);
var primaryAdmin = primary.getDB("admin");
var primaryColl = primaryDB[collName];
var secondary = rst.getSecondary();
var secondaryDB = secondary.getDB(dbName);
var secondaryColl = secondaryDB[collName];
// Namespace strings used to scope sync-source failpoints to specific operations.
var dbNss = primaryDB.getName();
var collNss = primaryColl.getFullName();
/**
 * Prepares one stepdown scenario: seeds the collection, restarts the secondary
 * with a clean data directory so it re-runs initial sync, and arms 'failPoint'
 * on the primary (the sync source) so initial sync parks at the desired stage.
 * The failpoint is configured to check for interrupt without dropping locks.
 * Reassigns the module-level 'secondary' handles to the restarted node.
 */
function setupTest({failPoint, nss = '', nssSuffix = '', secondaryStartupParams = {}}) {
    // Start from a clean server log so later checkLog calls only see this scenario.
    assert.commandWorked(primary.adminCommand({clearLog: "global"}));

    jsTestLog("Writing data to collection.");
    assert.commandWorked(primaryColl.insert([{_id: 1}, {_id: 2}]));

    jsTestLog("Stopping secondary.");
    rst.stop(secondary);

    jsTestLog(`Enabling failpoint '${failPoint}' on primary (sync source).`);
    assert.commandWorked(primary.adminCommand({
        configureFailPoint: failPoint,
        data: {nss: nss + nssSuffix, shouldCheckForInterrupt: true, shouldNotdropLock: true},
        mode: "alwaysOn"
    }));

    jsTestLog("Starting secondary.");
    Object.assign(secondaryStartupParams, {
        // A single attempt makes any interruption of initial sync fatal, so the
        // test fails loudly if the stepdown kills the sync.
        numInitialSyncAttempts: 1,
        // Skip clearing initial sync progress after a successful initial sync attempt so
        // that we can check initialSyncStatus fields after initial sync is complete.
        'failpoint.skipClearInitialSyncState': tojson({mode: 'alwaysOn'}),
    });
    secondary = rst.start(secondary, {startClean: true, setParameter: secondaryStartupParams});
    secondaryDB = secondary.getDB(dbName);
    secondaryColl = secondaryDB[collName];

    // Block until the restarted node reaches STARTUP2, i.e. initial sync has begun.
    rst.waitForState(secondary, ReplSetTest.State.STARTUP_2);
}
/**
 * Completes one stepdown scenario: waits for the sync source to hit
 * 'failPoint', steps the primary down while initial sync is parked there, then
 * releases the failpoint and verifies initial sync still succeeds on its first
 * and only attempt. 'DocsCopiedByOplogFetcher' is the count of documents the
 * scenario expects to arrive via oplog application on top of the 2 cloned docs.
 */
function finishTest({failPoint, nss = '', DocsCopiedByOplogFetcher = 0}) {
    jsTestLog(`Waiting for primary to reach failPoint '${failPoint}'.`);
    waitForCurOpByFailPoint(primaryAdmin, new RegExp('^' + nss), failPoint);

    jsTestLog("Making primary step down");
    const waitForStepDown = startParallelShell(() => {
        assert.commandWorked(db.adminCommand({"replSetStepDown": 30 * 60, "force": true}));
    }, primary.port);

    // Wait until the step down has started to kill user operations, so the
    // parked sync operation is guaranteed to be interrupt-checked.
    checkLog.contains(primary, "Starting to kill user operations");

    jsTestLog("Allowing initial sync to continue.");
    assert.commandWorked(primaryAdmin.adminCommand({configureFailPoint: failPoint, mode: 'off'}));

    jsTestLog("Waiting for initial sync to complete.");
    rst.waitForState(secondary, ReplSetTest.State.SECONDARY);

    // Join the stepdown shell, then confirm the old primary reached SECONDARY.
    waitForStepDown();
    rst.waitForState(primary, ReplSetTest.State.SECONDARY);

    jsTestLog("Validating initial sync data.");
    const status = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
    assert.eq(0, status.initialSyncStatus.failedInitialSyncAttempts);
    assert.eq(2 + DocsCopiedByOplogFetcher, secondaryColl.find().itcount());

    // As checkReplicatedDataHashes requires primary to validate the cloned data, we need to
    // unfreeze the old primary and make it re-elected.
    assert.commandWorked(primaryAdmin.adminCommand({replSetFreeze: 0}));
    rst.getPrimary();
    rst.checkReplicatedDataHashes();

    jsTestLog(`Dropping collection '${collName}'.`);
    assert(primaryColl.drop());
}
/**
 * Drives one stepdown scenario end to end: arms the failpoint and restarts the
 * secondary via setupTest(), then performs the stepdown and validates the
 * completed initial sync via finishTest(). 'params' is forwarded to both.
 */
function runStepDownTest(params) {
    setupTest(params);
    finishTest(params);
}
// Each scenario below parks initial sync on the primary (the sync source) at a
// different cloning stage, forces a stepdown while it is parked, and verifies
// the sync completes without a retry.
jsTestLog("Testing stepdown while 'databases' cloner lists databases.");
runStepDownTest({failPoint: "hangBeforeListDatabases"});
jsTestLog("Testing stepdown while 'database' cloner lists collections.");
runStepDownTest(
    {failPoint: "hangBeforeListCollections", nss: dbNss, nssSuffix: ".$cmd.listCollections"});
jsTestLog("Testing stepdown while 'collection' cloner performs collection count.");
runStepDownTest({failPoint: "hangBeforeCollectionCount", nss: collNss});
jsTestLog("Testing stepdown while 'collection' cloner list indexes for a collection.");
runStepDownTest({failPoint: "hangBeforeListIndexes", nss: collNss});
jsTestLog("Testing stepdown while 'collection' cloner clones collection data.");
runStepDownTest({failPoint: "waitInFindBeforeMakingBatch", nss: collNss});
jsTestLog("Testing stepdown between collection data batches.");
// Batch size 1 forces multiple getMores so the stepdown lands between batches.
runStepDownTest({
    failPoint: "waitWithPinnedCursorDuringGetMoreBatch",
    nss: collNss,
    secondaryStartupParams: {collectionClonerBatchSize: 1}
});
// Final scenario: stepdown during oplog fetching. This one cannot use
// runStepDownTest() directly because the failpoint must be re-armed between
// the cloning phase and the oplog-fetching phase.
//
// Restart secondary with "oplogFetcherInitialSyncMaxFetcherRestarts"
// set to zero to avoid masking the oplog fetcher error and an increased oplog fetcher network
// timeout to avoid spurious failures. Enable fail point "waitAfterPinningCursorBeforeGetMoreBatch"
// which drops and reacquires read lock to prevent deadlock between getmore and insert thread for
// ephemeral storage engine.
jsTestLog("Testing stepdown during oplog fetching");
const oplogNss = "local.oplog.rs";
setupTest({
    failPoint: "waitAfterPinningCursorBeforeGetMoreBatch",
    nss: oplogNss,
    secondaryStartupParams: {
        initialSyncOplogFetcherBatchSize: 1,
        oplogFetcherInitialSyncMaxFetcherRestarts: 0,
        oplogNetworkTimeoutBufferSeconds: ReplSetTest.kDefaultTimeoutMS / 1000,
        // Pause the secondary after data cloning so the inserts below must
        // arrive via the oplog fetcher rather than the collection cloner.
        "failpoint.initialSyncHangAfterDataCloning": tojson({mode: 'alwaysOn'})
    }
});
jsTestLog("Waiting for collection cloning to complete.");
assert.commandWorked(secondary.adminCommand({
    waitForFailPoint: "initialSyncHangAfterDataCloning",
    timesEntered: 1,
    maxTimeMS: kDefaultWaitForFailPointTimeout
}));
// Insert more data so that these are replicated to secondary node via oplog fetcher.
jsTestLog("Inserting more data on primary.");
assert.commandWorked(primaryColl.insert([{_id: 3}, {_id: 4}]));
// Insert is successful. So, enable fail point "waitWithPinnedCursorDuringGetMoreBatch"
// such that it doesn't drop locks when getmore cmd waits inside the fail point block.
assert.commandWorked(primary.adminCommand({
    configureFailPoint: "waitWithPinnedCursorDuringGetMoreBatch",
    data: {nss: oplogNss, shouldCheckForInterrupt: true, shouldNotdropLock: true},
    mode: "alwaysOn"
}));
// Now, disable fail point "waitAfterPinningCursorBeforeGetMoreBatch" to allow getmore to
// continue and hang on "waitWithPinnedCursorDuringGetMoreBatch" fail point.
assert.commandWorked(primary.adminCommand(
    {configureFailPoint: "waitAfterPinningCursorBeforeGetMoreBatch", mode: "off"}));
// Disable fail point on secondary to allow initial sync to continue.
assert.commandWorked(
    secondary.adminCommand({configureFailPoint: "initialSyncHangAfterDataCloning", mode: "off"}));
// Step down while the oplog fetcher's getmore is pinned on the failpoint; the
// two extra documents must be applied from the oplog on top of the cloned data.
finishTest({
    failPoint: "waitWithPinnedCursorDuringGetMoreBatch",
    nss: "local.oplog.rs",
    DocsCopiedByOplogFetcher: 2
});
rst.stopSet();
})();
|