/**
 * Tests that the recipient correctly clears its oplog buffer when the recipient primary
 * fails over while fetching retryable writes oplog entries from the donor.
 *
 * @tags: [
 *   incompatible_with_eft,
 *   incompatible_with_macos,
 *   incompatible_with_windows_tls,
 *   requires_majority_read_concern,
 *   requires_persistence,
 * ]
 */

(function() {

"use strict";
load("jstests/libs/uuid_util.js");        // For extractUUIDFromObject().
load("jstests/libs/fail_point_util.js");  // For configureFailPoint().
load("jstests/libs/retryable_writes_util.js");
load("jstests/replsets/libs/tenant_migration_test.js");

if (!RetryableWritesUtil.storageEngineSupportsRetryableWrites(jsTest.options().storageEngine)) {
    jsTestLog("Retryable writes are not supported, skipping test");
    return;
}

const tenantMigrationTest =
    new TenantMigrationTest({name: jsTestName(), sharedOptions: {nodes: 2}});
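// `sharedOptions: {nodes: 2}` gives both the donor and the recipient replica sets two nodes,
// which is what makes the recipient step-up later in this test possible.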

const kMigrationId = UUID();
const kTenantId = 'testTenantId';
const kDbName = tenantMigrationTest.tenantDB(kTenantId, "testDb");
const kCollName = "testColl";
const migrationOpts = {
    migrationIdString: extractUUIDFromObject(kMigrationId),
    tenantId: kTenantId,
};

const donorRst = tenantMigrationTest.getDonorRst();
const donorPrimary = tenantMigrationTest.getDonorPrimary();
const rsConn = new Mongo(donorRst.getURL());
const recipientPrimary = tenantMigrationTest.getRecipientPrimary();

const session = rsConn.startSession({retryWrites: true});
const sessionColl = session.getDatabase(kDbName)[kCollName];

const session2 = rsConn.startSession({retryWrites: true});
const sessionColl2 = session2.getDatabase(kDbName)[kCollName];
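// The two sessions have distinct logical session ids, so the donor keeps a separate
// config.transactions entry (and corresponding oplog chain) for each retryable write.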

jsTestLog("Run retryable writes prior to the migration.");
assert.commandWorked(sessionColl.insert({_id: "retryableWrite1"}));
assert.commandWorked(sessionColl2.insert({_id: "retryableWrite2"}));

jsTestLog("Setting up failpoints.");
// Use `pauseAfterRetrievingRetryableWritesBatch` to hang after inserting the first batch of results
// from the aggregation request into the oplog buffer.
const fpPauseAfterRetrievingRetryableWritesBatch =
    configureFailPoint(recipientPrimary, "pauseAfterRetrievingRetryableWritesBatch");

// Set the aggregation request batch size to 1 so that we can fail over between batches.
const fpSetSmallAggregationBatchSize =
    configureFailPoint(recipientPrimary, "fpSetSmallAggregationBatchSize");

jsTestLog("Starting tenant migration with migrationId: " + kMigrationId +
          ", tenantId: " + kTenantId);
assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));

jsTestLog("Waiting until the recipient primary fetches a batch of retryable writes oplog entries.");
fpSetSmallAggregationBatchSize.wait();
fpPauseAfterRetrievingRetryableWritesBatch.wait();

// Check that the oplog buffer is correctly populated.
const kOplogBufferNS = "repl.migration.oplog_" + migrationOpts.migrationIdString;
let recipientOplogBuffer = recipientPrimary.getDB("config")[kOplogBufferNS];
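// The oplog buffer is a temporary collection in the recipient's config database where fetched
// donor oplog entries are staged before being applied.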
// We expect to have only retryableWrite1 since the cursor batch size is 1 and we paused after
// inserting the first batch of results from the aggregation request.
let cursor = recipientOplogBuffer.find();
assert.eq(cursor.itcount(), 1, "Incorrect number of oplog entries in buffer: " + cursor.toArray());

// Check that we haven't completed the retryable writes fetching stage yet.
let recipientConfigColl = recipientPrimary.getCollection(TenantMigrationTest.kConfigRecipientsNS);
let recipientDoc = recipientConfigColl.find({"_id": kMigrationId}).toArray();
assert.eq(recipientDoc.length, 1);
assert.eq(recipientDoc[0].completedFetchingRetryableWritesBeforeStartOpTime, false);

jsTestLog("Stepping a new primary up.");
const recipientRst = tenantMigrationTest.getRecipientRst();
const recipientSecondary = recipientRst.getSecondary();
// Use `fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime` to hang after populating the oplog
// buffer with retryable writes entries. Set it before stepping up, rather than after, so that the
// new primary cannot get past this stage before the failpoint takes effect.
const fpAfterFetchingRetryableWritesEntries = configureFailPoint(
    recipientSecondary, "fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime", {action: "hang"});

recipientRst.awaitLastOpCommitted();
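// awaitLastOpCommitted above ensures the secondary has replicated the in-progress recipient state
// document (and the buffered oplog entries) before it is stepped up.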
assert.commandWorked(
    recipientSecondary.adminCommand({replSetStepUp: ReplSetTest.kForeverSecs, force: true}));

fpPauseAfterRetrievingRetryableWritesBatch.off();
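// Turning this failpoint off lets the old primary's (now interrupted) fetch thread resume instead
// of staying blocked; the new primary never had this failpoint enabled.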
const newRecipientPrimary = recipientRst.getPrimary();

fpAfterFetchingRetryableWritesEntries.wait();
// The new primary should have cleared its oplog buffer and refetched both retryableWrite1 and
// retryableWrite2. Otherwise, we would hit an invariant failure when trying to re-insert those
// entries into the buffer.
recipientOplogBuffer = newRecipientPrimary.getDB("config")[kOplogBufferNS];
cursor = recipientOplogBuffer.find();
assert.eq(cursor.itcount(), 2, "Incorrect number of oplog entries in buffer: " + cursor.toArray());

recipientConfigColl = newRecipientPrimary.getCollection(TenantMigrationTest.kConfigRecipientsNS);
recipientDoc = recipientConfigColl.find({"_id": kMigrationId}).toArray();
assert.eq(recipientDoc.length, 1);
assert.eq(recipientDoc[0].completedFetchingRetryableWritesBeforeStartOpTime, true);
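// `fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime` hangs only after the fetch stage has
// finished, so the flag being true confirms the new primary restarted and completed the fetch.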

fpAfterFetchingRetryableWritesEntries.off();
fpSetSmallAggregationBatchSize.off();

jsTestLog("Waiting for migration to complete.");
TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));

tenantMigrationTest.stop();
})();