summaryrefslogtreecommitdiff
path: root/jstests/replsets/tenant_migration_donor_unblock_reads_and_writes_on_completion.js
blob: 186d4298123d628d6d9979d72b8fd23ab3c129ff (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
/**
 * Tests that the tenant migration donor reliably unblocks blocked reads and writes when the
 * migration completes, and interrupts them when the state doc collection is dropped.
 *
 * @tags: [
 *   incompatible_with_eft,
 *   incompatible_with_macos,
 *   incompatible_with_windows_tls,
 *   requires_majority_read_concern,
 *   requires_persistence,
 * ]
 */

(function() {
"use strict";

load("jstests/libs/parallelTester.js");
load("jstests/libs/fail_point_util.js");
load("jstests/libs/uuid_util.js");
load("jstests/libs/write_concern_util.js");
load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/replsets/libs/tenant_migration_util.js");

/**
 * Starts a background thread that runs a `find` on `dbName.collName` against `node` with a
 * read concern of `afterClusterTime`, and returns the already-started Thread.
 *
 * Only `node.host` is handed to the thread; the thread opens its own connection to that host and
 * enables secondary reads on it before running the command. The thread's result is the raw
 * command response.
 */
function startReadThread(node, dbName, collName, afterClusterTime) {
    // The thread body relies solely on its own parameters and never on this function's scope.
    const runFind = (hostName, targetDbName, targetCollName, clusterTime) => {
        const conn = new Mongo(hostName);
        conn.setSecondaryOk();
        const targetDb = conn.getDB(targetDbName);
        // Rebuild a Timestamp from the t/i fields of the value that was passed in.
        const findCmd = {
            find: targetCollName,
            readConcern: {afterClusterTime: Timestamp(clusterTime.t, clusterTime.i)}
        };
        return targetDb.runCommand(findCmd);
    };

    const thread = new Thread(runFind, node.host, dbName, collName, afterClusterTime);
    thread.start();
    return thread;
}

/**
 * Starts a background thread that inserts the document {_id: 1} into `dbName.collName` on
 * `node`, and returns the already-started Thread.
 *
 * Only `node.host` is handed to the thread, which opens its own connection to that host. The
 * thread's result is the raw command response.
 */
function startWriteThread(node, dbName, collName) {
    // The thread body relies solely on its own parameters and never on this function's scope.
    const runInsert = (hostName, targetDbName, targetCollName) => {
        const conn = new Mongo(hostName);
        const targetDb = conn.getDB(targetDbName);
        const insertCmd = {insert: targetCollName, documents: [{_id: 1}]};
        return targetDb.runCommand(insertCmd);
    };

    const thread = new Thread(runInsert, node.host, dbName, collName);
    thread.start();
    return thread;
}

// Per-node options for the donor set: the donor x509 options for tests, extended with server
// parameters that set the migration garbage collection delay and the TTL monitor sleep to 1
// (presumably so that migration state is garbage collected quickly -- confirm against the
// parameter definitions).
const donorNodeOptions = Object.assign(TenantMigrationUtil.makeX509OptionsForTest().donor, {
    setParameter: {
        tenantMigrationGarbageCollectionDelayMS: 1,
        ttlMonitorSleepSecs: 1,
    }
});

// Three-node donor replica set used by every test case in this file.
const donorRst = new ReplSetTest({
    nodes: 3,
    name: "donorRst",
    nodeOptions: donorNodeOptions,
    // Chaining is disallowed so that both secondaries sync directly from the primary. One of the
    // test cases below disables replication on one secondary; with chaining that could
    // effectively disable replication on both secondaries, and the migration would hang because
    // majority write concern would be unsatisfiable.
    settings: {chainingAllowed: false}
});
donorRst.startSet();
donorRst.initiate();

// Migration test fixture, constructed with the donor replica set created by this file.
const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst});

const donorPrimary = tenantMigrationTest.getDonorPrimary();
// Collection holding the donor's migration state documents; the test cases below query it,
// attempt to remove from and update it, and drop it.
const donorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);

// Each test case appends its own suffix to this prefix to form a distinct tenant id.
const kTenantIdPrefix = "testTenantId";
// Database names are built as tenantId + "_" + kDbName.
const kDbName = "testDb";
const kCollName = "testColl";

// Test case: a read blocked on a donor secondary must be unblocked, and succeed, once the
// migration has aborted and its state has been garbage collected.
(() => {
    jsTest.log(
        "Test that a lagged donor secondary correctly unblocks blocked reads after the migration aborts");
    const tenantId = kTenantIdPrefix + "LaggedSecondaryMigrationAborted";
    const dbName = tenantId + "_" + kDbName;
    // Seed the tenant's collection with one document through the donor primary.
    assert.commandWorked(
        donorPrimary.getDB(dbName).runCommand({insert: kCollName, documents: [{_id: 0}]}));

    const migrationId = UUID();
    const migrationOpts = {
        migrationIdString: extractUUIDFromObject(migrationId),
        tenantId: tenantId,
    };

    // Going by the fail point names: hold the donor just before it leaves the blocking state, and
    // make it abort at that point once it is released.
    let blockingFp =
        configureFailPoint(donorPrimary, "pauseTenantMigrationBeforeLeavingBlockingState");
    let abortFp =
        configureFailPoint(donorPrimary, "abortTenantMigrationBeforeLeavingBlockingState");
    assert.commandWorked(
        tenantMigrationTest.startMigration(migrationOpts, false /* retryOnRetryableErrors */));
    blockingFp.wait();
    // Let the donor set catch up before issuing the read against a secondary.
    donorRst.awaitReplication();

    // Run a read command against one of the secondaries, and wait for it to block.
    const laggedSecondary = donorRst.getSecondary();
    const donorDoc = donorsColl.findOne({tenantId: tenantId});
    assert.neq(null, donorDoc);
    // The read uses the state doc's blockTimestamp as its afterClusterTime.
    const readThread = startReadThread(laggedSecondary, dbName, kCollName, donorDoc.blockTimestamp);
    // NOTE(review): loose equality is kept as-is; the counter may come back as a shell numeric
    // wrapper rather than a plain number -- confirm before tightening to ===.
    assert.soon(() => TenantMigrationUtil.getNumBlockedReads(laggedSecondary, tenantId) == 1);

    // Disable snapshotting on that secondary, and wait for the migration to abort and be garbage
    // collected. That way the secondary is guaranteed to observe the write to set expireAt before
    // learning that the abortOpTime has been majority committed.
    let snapshotFp = configureFailPoint(laggedSecondary, "disableSnapshotting");
    blockingFp.off();
    TenantMigrationTest.assertAborted(
        tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
    assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString));
    // Garbage collection is only awaited on the donor primary; no recipient nodes are checked.
    tenantMigrationTest.waitForMigrationGarbageCollection(
        migrationId, tenantId, [donorPrimary] /* donorNodes */, [] /* recipientNodes */);

    // The previously blocked read must now complete successfully.
    assert.commandWorked(readThread.returnData());
    // Turn off the remaining fail points so they do not affect the next test case.
    abortFp.off();
    snapshotFp.off();
})();

// Test case: a read blocked on a donor secondary must be unblocked, and rejected with
// TenantMigrationCommitted, once the migration has committed and been garbage collected.
(() => {
    jsTest.log(
        "Test that a lagged donor secondary correctly unblocks blocked reads after the migration commits");
    const tenantId = kTenantIdPrefix + "LaggedSecondaryMigrationCommitted";
    const dbName = tenantId + "_" + kDbName;
    // Seed the tenant's collection with one document through the donor primary.
    assert.commandWorked(
        donorPrimary.getDB(dbName).runCommand({insert: kCollName, documents: [{_id: 0}]}));

    const migrationId = UUID();
    const migrationOpts = {
        migrationIdString: extractUUIDFromObject(migrationId),
        tenantId: tenantId,
    };

    // Going by the fail point name: hold the donor just before it leaves the blocking state.
    let blockingFp =
        configureFailPoint(donorPrimary, "pauseTenantMigrationBeforeLeavingBlockingState");
    assert.commandWorked(
        tenantMigrationTest.startMigration(migrationOpts, false /* retryOnRetryableErrors */));
    blockingFp.wait();
    // Let the donor set catch up before issuing the read against a secondary.
    donorRst.awaitReplication();

    // Run a read command against one of the secondaries, and wait for it to block.
    const laggedSecondary = donorRst.getSecondary();
    const donorDoc = donorsColl.findOne({tenantId: tenantId});
    assert.neq(null, donorDoc);
    // The read uses the state doc's blockTimestamp as its afterClusterTime.
    const readThread = startReadThread(laggedSecondary, dbName, kCollName, donorDoc.blockTimestamp);
    // NOTE(review): loose equality is kept as-is; the counter may come back as a shell numeric
    // wrapper rather than a plain number -- confirm before tightening to ===.
    assert.soon(() => TenantMigrationUtil.getNumBlockedReads(laggedSecondary, tenantId) == 1);

    // Disable snapshotting on that secondary, and wait for the migration to commit and be garbage
    // collected. That way the secondary is guaranteed to observe the write to set expireAt before
    // learning that the commitOpTime has been majority committed.
    let snapshotFp = configureFailPoint(laggedSecondary, "disableSnapshotting");
    blockingFp.off();
    TenantMigrationTest.assertCommitted(
        tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
    assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString));
    // Garbage collection is only awaited on the donor primary; no recipient nodes are checked.
    tenantMigrationTest.waitForMigrationGarbageCollection(
        migrationId, tenantId, [donorPrimary] /* donorNodes */, [] /* recipientNodes */);

    // The previously blocked read must now fail because the migration committed.
    assert.commandFailedWithCode(readThread.returnData(), ErrorCodes.TenantMigrationCommitted);
    // Re-enable snapshotting so the fail point does not affect the next test case.
    snapshotFp.off();
})();

// Test case: while a migration is held in the blocking state, dropping the donor's state doc
// collection must interrupt a read and a write that are blocked on the donor primary.
(() => {
    jsTest.log(
        "Test that blocked writes and reads are interrupted when the donor's state doc collection is dropped");
    const tenantId = kTenantIdPrefix + "DropStateDocCollection";
    const dbName = tenantId + "_" + kDbName;
    // Seed the tenant's collection with one document through the donor primary.
    assert.commandWorked(
        donorPrimary.getDB(dbName).runCommand({insert: kCollName, documents: [{_id: 0}]}));

    const migrationId = UUID();
    const migrationOpts = {
        migrationIdString: extractUUIDFromObject(migrationId),
        tenantId: tenantId,
    };

    // Going by the fail point name: hold the donor just before it leaves the blocking state.
    let blockingFp =
        configureFailPoint(donorPrimary, "pauseTenantMigrationBeforeLeavingBlockingState");
    assert.commandWorked(
        tenantMigrationTest.startMigration(migrationOpts, false /* retryOnRetryableErrors */));
    blockingFp.wait();

    // Run a read command and a write command against the primary, and wait for them to block.
    const donorDoc = donorsColl.findOne({tenantId: tenantId});
    assert.neq(null, donorDoc);
    // The read uses the state doc's blockTimestamp as its afterClusterTime.
    const readThread = startReadThread(donorPrimary, dbName, kCollName, donorDoc.blockTimestamp);
    const writeThread = startWriteThread(donorPrimary, dbName, kCollName);
    // NOTE(review): loose equality is kept as-is; the counters may come back as shell numeric
    // wrappers rather than plain numbers -- confirm before tightening to ===.
    assert.soon(() => TenantMigrationUtil.getNumBlockedReads(donorPrimary, tenantId) == 1);
    assert.soon(() => TenantMigrationUtil.getNumBlockedWrites(donorPrimary, tenantId) == 1);

    // Cannot delete the donor state doc since it has not been marked as garbage collectable.
    assert.commandFailedWithCode(donorsColl.remove({}), ErrorCodes.IllegalOperation);

    // Cannot mark the state doc as garbage collectable before the migration commits or aborts.
    assert.commandFailedWithCode(
        donorsColl.update({tenantId: tenantId}, {$set: {expireAt: new Date()}}),
        ErrorCodes.BadValue);

    // Can drop the state doc collection, and doing so must interrupt all blocked reads and writes
    // rather than leave them hanging.
    assert(donorsColl.drop());
    assert.commandFailedWithCode(readThread.returnData(), ErrorCodes.Interrupted);
    assert.commandFailedWithCode(writeThread.returnData(), ErrorCodes.Interrupted);
    // Release the fail point only after both threads have been observed to fail.
    blockingFp.off();
})();

// Shut down the donor replica set created by this file, then stop the migration test fixture.
donorRst.stopSet();
tenantMigrationTest.stop();
})();