path: root/jstests/serverless/tenant_migration_concurrent_bulk_writes_against_mongoq.js
/**
 * Tests bulk writes through mongoq while a tenant migration is in the blocking state and after a
 * migration has aborted.
 * @tags: [requires_fcv_52, serverless]
 */

(function() {
"use strict";

load("jstests/libs/fail_point_util.js");
load("jstests/serverless/serverlesstest.js");
load("jstests/concurrency/fsm_libs/worker_thread.js");
load("jstests/libs/parallelTester.js");  // for Thread

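// Builds a donorStartMigration command for the given tenant, targeting the recipient at
// 'realConnUrl'.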
function donorStartMigrationCmd(tenantID, realConnUrl) {
    return {
        donorStartMigration: 1,
        tenantId: tenantID.str,
        migrationId: UUID(),
        recipientConnectionString: realConnUrl,
        readPreference: {mode: "primary"}
    };
}

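// Runs an ordered or unordered bulk insert of 'numDocs' documents into 'dbName.collName' through
// the given host. Returns the raw bulk response together with the attempted operations; a
// BulkWriteError thrown by execute() is captured so its raw response can be inspected as well.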
function bulkInsertDocs(primaryHost, dbName, collName, numDocs, isOrdered) {
    const primary = new Mongo(primaryHost);
    let primaryDB = primary.getDB(dbName);
    let bulk = isOrdered ? primaryDB[collName].initializeOrderedBulkOp()
                         : primaryDB[collName].initializeUnorderedBulkOp();
    for (let i = 0; i < numDocs; ++i) {
        bulk.insert({x: i});
    }

    let res;
    try {
        res = bulk.execute();
    } catch (e) {
        res = e;
    }
    return {res: res.getRawResponse(), ops: bulk.getOperations()};
}

/*
 * Tests running a migration and attempting an unordered/ordered bulk insert while the migration is
 * in the blocking state, before it aborts. Since mongoq retries internally on
 * TenantMigrationAborted, the bulk insert should complete with no errors once the writes are
 * retried.
 */
function orderedBulkInsertDuringBlockingState(st, isBulkWriteOrdered) {
    let titleOrderedStr = "Starting test - " + (isBulkWriteOrdered ? "ordered" : "unordered");
    jsTest.log(titleOrderedStr + " bulk insert during migration blocking state then aborts.");
    const tenantID = ObjectId();
    const kDbName = tenantID.str + "_test";
    let adminDB = st.rs0.getPrimary().getDB('admin');

    const kCollName = 'foo';

    assert.commandWorked(st.q0.adminCommand({enableSharding: kDbName}));
    st.ensurePrimaryShard(kDbName, st.shard0.shardName);

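    // Pause the donor in the blocking state so that incoming writes queue up, and force the
    // migration to abort once the blocking state is released.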
    let blockingFp = configureFailPoint(adminDB, "pauseTenantMigrationBeforeLeavingBlockingState");

    let abortFailPoint =
        configureFailPoint(adminDB, "abortTenantMigrationBeforeLeavingBlockingState");

    let cmdObj = donorStartMigrationCmd(tenantID, st.rs1.getURL());
    assert.commandWorked(adminDB.runCommand(cmdObj));
    blockingFp.wait();

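    // Issue the bulk insert from a separate thread; it blocks behind the migration until the
    // failpoints are released, after which mongoq retries the writes on TenantMigrationAborted.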
    const kNumWriteOps = 6;
    const bulkWriteThread = new Thread(
        (bulkInsertDocsOrderedFunc, primaryHost, dbName, collName, numDocs, isOrdered) => {
            const res =
                bulkInsertDocsOrderedFunc(primaryHost, dbName, collName, numDocs, isOrdered);
            assert.eq(res.res.nInserted, numDocs);
            assert.eq(res.res.writeErrors.length, 0);
        },
        bulkInsertDocs,
        st.q0.host,
        kDbName,
        kCollName,
        kNumWriteOps,
        isBulkWriteOrdered);
    bulkWriteThread.start();

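    // Wait until the donor's tenant migration access blocker reports that the bulk writes are
    // actually being blocked before letting the migration proceed.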
    assert.soon(function() {
        let mtab = st.rs0.getPrimary()
                       .getDB('admin')
                       .adminCommand({serverStatus: 1})
                       .tenantMigrationAccessBlocker[tenantID.str]
                       .donor;
        return mtab.numBlockedWrites > 0;
    }, "no blocked writes found", 1 * 5000, 1 * 1000);

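    // Let the migration leave the blocking state; the abort failpoint then forces it to abort.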
    blockingFp.off();
    abortFailPoint.off();

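    // Wait for donorStartMigration to report that the migration reached the "aborted" state.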
    assert.soon(function() {
        let res = assert.commandWorked(adminDB.runCommand(cmdObj));
        return res['state'] == "aborted";
    }, "migration not in aborted state", 1 * 10000, 1 * 1000);

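    // The blocked writes should have been retried and succeeded, so the thread's assertions pass.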
    bulkWriteThread.join();
}

/*
 * Tests attempting an unordered/ordered bulk insert once the migration has aborted. Since the
 * migration aborted and no tenant data was moved, the bulk insert should succeed.
 */
function orderedBulkInsertAfterTenantMigrationAborted(st, isBulkWriteOrdered) {
    let titleOrderedStr = "Starting test - " + (isBulkWriteOrdered ? "ordered" : "unordered");
    jsTest.log(titleOrderedStr + " bulk insert after the migration has aborted.");
    const tenantID = ObjectId();
    const kDbName = tenantID.str + "_test";
    let adminDB = st.rs0.getPrimary().getDB('admin');

    const kCollName = 'foo';

    assert.commandWorked(st.q0.adminCommand({enableSharding: kDbName}));
    st.ensurePrimaryShard(kDbName, st.shard0.shardName);

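    // Force the migration to abort once it reaches the blocking state.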
    configureFailPoint(adminDB, "abortTenantMigrationBeforeLeavingBlockingState");

    let cmdObj = donorStartMigrationCmd(tenantID, st.rs1.getURL());
    assert.commandWorked(adminDB.runCommand(cmdObj));

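    // Wait for the migration to reach the "aborted" state before issuing any writes.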
    assert.soon(function() {
        let res = assert.commandWorked(adminDB.runCommand(cmdObj));
        return res['state'] == "aborted";
    }, "migration not in aborted state", 1 * 10000, 1 * 1000);

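    // With the migration aborted, the bulk insert through mongoq should succeed without blocking.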
    const kNumWriteOps = 6;
    const bulkRes =
        bulkInsertDocs(st.q0.host, kDbName, kCollName, kNumWriteOps, isBulkWriteOrdered);
    assert.eq(bulkRes.res.nInserted, kNumWriteOps);
    assert.eq(bulkRes.res.writeErrors.length, 0);
}

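// Run the ordered and unordered variants of each scenario against a single ServerlessTest fixture.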
let st = new ServerlessTest();

orderedBulkInsertDuringBlockingState(st, true);
orderedBulkInsertDuringBlockingState(st, false);
orderedBulkInsertAfterTenantMigrationAborted(st, true);
orderedBulkInsertAfterTenantMigrationAborted(st, false);

st.stop();
})();