summaryrefslogtreecommitdiff
path: root/jstests/sharding/resharding_critical_section_metrics.js
blob: 15192f49b37c8f842bc6f0703fca17437ffbd85a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/**
 * Tests that resharding reads and writes during critical section metrics are incremented.
 *
 * @tags: [
 *   requires_fcv_61
 * ]
 */
(function() {
'use strict';

load("jstests/libs/fail_point_util.js");
load("jstests/sharding/libs/resharding_test_fixture.js");
load('jstests/libs/parallel_shell_helpers.js');

const reshardingTest = new ReshardingTest();
reshardingTest.setup();

const donorName = reshardingTest.donorShardNames[0];
const recipientName = reshardingTest.recipientShardNames[0];
const donorShard = reshardingTest.getReplSetForShard(donorName).getPrimary();
const sourceCollection = reshardingTest.createShardedCollection({
    ns: 'reshardingDb.coll',
    shardKeyPattern: {oldKey: 1},
    chunks: [
        {min: {oldKey: MinKey}, max: {oldKey: MaxKey}, shard: donorName},
    ]
});
const mongos = sourceCollection.getMongo();

const kWritesDuringCriticalSection = 'countWritesDuringCriticalSection';
const kReadsDuringCriticalSection = 'countReadsDuringCriticalSection';

function attemptFromParallelShell(fn) {
    const join = startParallelShell(funWithArgs((fn) => {
                                        db = db.getSiblingDB('reshardingDb');
                                        fn(db.coll);
                                    }, fn), mongos.port);
    return join;
}

function attemptWriteFromParallelShell() {
    return attemptFromParallelShell((coll) => {
        assert.commandWorked(coll.insert({_id: 0, oldKey: 0, newKey: 0}));
    });
}

function attemptReadFromParallelShell() {
    return attemptFromParallelShell((coll) => {
        coll.find({}).toArray();
    });
}

function getActiveSectionMetric(fieldName) {
    const stats = donorShard.getDB('admin').serverStatus({});
    const active = stats.shardingStatistics.resharding.active;
    return active[fieldName];
}

function assertIncrementsActiveSectionMetricSoon(fn, metricFieldName) {
    const before = getActiveSectionMetric(metricFieldName);
    fn();
    assert.soon(() => {
        const after = getActiveSectionMetric(metricFieldName);
        return after > before;
    });
}

const hangWhileBlockingReads =
    configureFailPoint(donorShard, "reshardingPauseDonorAfterBlockingReads");

let waitForWrite;
let waitForRead;

reshardingTest.withReshardingInBackground({
    newShardKeyPattern: {newKey: 1},
    newChunks: [{min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientName}],
},
                                          (tempNs) => {},
                                          {
                                              postDecisionPersistedFn: () => {
                                                  hangWhileBlockingReads.wait();
                                                  assertIncrementsActiveSectionMetricSoon(() => {
                                                      waitForWrite =
                                                          attemptWriteFromParallelShell();
                                                  }, kWritesDuringCriticalSection);
                                                  assertIncrementsActiveSectionMetricSoon(() => {
                                                      waitForRead = attemptReadFromParallelShell();
                                                  }, kReadsDuringCriticalSection);
                                                  hangWhileBlockingReads.off();
                                              }
                                          });

waitForWrite();
waitForRead();

reshardingTest.teardown();
})();