path: root/jstests/sharding/data_size_aware_balancing_sessions_collection.js
/*
 * Tests that the balancer splits the sessions collection and uniformly distributes the chunks
 * across shards in the cluster.
 * @tags: [
 * featureFlagBalanceAccordingToDataSize,
 * requires_fcv_61,
 * resource_intensive,
 * ]
 */
(function() {
"use strict";

load("jstests/libs/feature_flag_util.js");
load("jstests/sharding/libs/find_chunks_util.js");

// TODO SERVER-50144 Remove this and allow orphan checking.
// This test calls removeShard which can leave docs in config.rangeDeletions in state "pending",
// therefore preventing orphans from being cleaned up.
TestData.skipCheckOrphans = true;

/*
 * Returns the number of chunks for the sessions collection.
 */
function getNumTotalChunks() {
    return findChunksUtil.countChunksForNs(configDB, kSessionsNs);
}

/*
 * Returns the number of chunks for the sessions collection that are on the given shard.
 */
function getNumChunksOnShard(shardName) {
    return findChunksUtil.countChunksForNs(configDB, kSessionsNs, {shard: shardName});
}

/*
 * Returns the number of docs in the sessions collection on the given host.
 */
function getNumSessionDocs(conn) {
    return conn.getCollection(kSessionsNs).find().itcount();
}

/*
 * Starts 'shardsToAdd' single-node replica-set shards, adds each one to the cluster, and
 * increments numShards accordingly. Returns an array of the ReplSetTest objects for the added
 * shards.
 */
function addShardsToCluster(shardsToAdd) {
    let addedReplicaSets = [];
    for (let i = 0; i < shardsToAdd; ++i) {
        const shardName = clusterName + "-rs" + numShards;
        const replTest = new ReplSetTest({name: shardName, nodes: 1});
        replTest.startSet({shardsvr: ""});
        replTest.initiate();

        assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: shardName}));
        numShards++;
        addedReplicaSets.push(replTest);
    }
    return addedReplicaSets;
}

/*
 * Removes the given shard from the cluster, waits until the removal has completed, and
 * decrements numShards.
 */
function removeShardFromCluster(shardName) {
    assert.commandWorked(st.s.adminCommand({removeShard: shardName}));
    assert.soon(function() {
        const res = st.s.adminCommand({removeShard: shardName});
        if (!res.ok && res.code === ErrorCodes.ShardNotFound) {
            // If the config server primary steps down right after removing the config.shards doc
            // for the shard but before responding with "state": "completed", the mongos would retry
            // the _configsvrRemoveShard command against the new config server primary, which would
            // not find the removed shard in its ShardRegistry if it has done a ShardRegistry reload
            // after the config.shards doc for the shard was removed. This would cause the command
            // to fail with ShardNotFound.
            return true;
        }
        assert.commandWorked(res);
        return ("completed" == res.state);
    }, "failed to remove shard " + shardName, kBalancerTimeoutMS);
    numShards--;
}

/*
 * Returns the estimated size (in bytes) of the sessions collection data owned by the shard,
 * excluding orphaned documents.
 */
function getSessionsCollSizeInShard(shardStats) {
    const orphansSize =
        shardStats['storageStats']['numOrphanDocs'] * shardStats['storageStats']['avgObjSize'];
    return shardStats['storageStats']['size'] - orphansSize;
}

/*
 * Logs the per-shard distribution of the sessions collection: number of documents, number of
 * chunks, estimated data size, and the configured maximum chunk size.
 */
function printSessionsCollectionDistribution(shards) {
    const numDocsOnShards = shards.map(shard => getNumSessionDocs(shard));
    const collStatsPipeline = [
        {'$collStats': {'storageStats': {}}},
        {
            '$project': {
                'shard': true,
                'storageStats':
                    {'count': true, 'size': true, 'avgObjSize': true, 'numOrphanDocs': true}
            }
        },
        {'$sort': {'shard': 1}}
    ];
    const collectionStorageStats =
        st.s.getCollection(kSessionsNs).aggregate(collStatsPipeline).toArray();
    const collSizeDistribution =
        collectionStorageStats.map(shardStats => getSessionsCollSizeInShard(shardStats));
    const numChunksOnShard = shards.map(shard => getNumChunksOnShard(shard.shardName));
    const kMaxChunkSizeBytes = st.config.collections.findOne({_id: kSessionsNs}).maxChunkSizeBytes;

    jsTest.log(`Sessions distribution across shards ${tojson(shards)}: #docs = ${
        tojson(numDocsOnShards)}, #chunks = ${tojson(numChunksOnShard)}, size = ${
        tojson(collSizeDistribution)}, #maxChunkSize: ${tojson(kMaxChunkSizeBytes)}`);
}

/*
 * Waits for the balancer to balance the sessions collection, then logs the distribution across
 * the given shards and verifies that the collection is balanced.
 */
function waitUntilBalancedAndVerify(shards) {
    const coll = st.s.getCollection(kSessionsNs);
    st.awaitBalance(
        kSessionsCollName, kConfigDbName, 9 * 60000 /* 9min timeout */, 1000 /* 1s interval */);
    printSessionsCollectionDistribution(shards);
    st.verifyCollectionIsBalanced(coll);
}

const kMinNumChunks = 100;
const kExpectedNumChunks = 128;  // the balancer rounds kMinNumChunks up to the next power of 2.
const kNumSessions = 2000;
const kBalancerTimeoutMS = 5 * 60 * 1000;

let numShards = 2;
const clusterName = jsTest.name();
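// minNumChunksForSessionsCollection controls how many chunks the sessions collection gets
// pre-split into; kMinNumChunks keeps the test's chunk count predictable.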
const st = new ShardingTest({
    name: clusterName,
    shards: numShards,
    other: {configOptions: {setParameter: {minNumChunksForSessionsCollection: kMinNumChunks}}}
});

const kConfigDbName = "config";
const kSessionsCollName = "system.sessions";
const kSessionsNs = `${kConfigDbName}.${kSessionsCollName}`;
const configDB = st.s.getDB(kConfigDbName);

// There is only one chunk initially.
assert.eq(1, getNumTotalChunks());

st.startBalancer();

jsTest.log(
    `Verify that the balancer generates the expected initial set of chunks for ${kSessionsNs}`);

assert.soon(() => getNumTotalChunks() == kExpectedNumChunks,
            "balancer did not split the initial chunk for the sessions collection");

jsTest.log(`Verify that no chunks are moved from the primary shard of ${
    kSessionsNs} if there are no open sessions`);
{
    st.awaitBalance(kSessionsCollName, kConfigDbName);
    const numChunksInShard0 = getNumChunksOnShard(st.shard0.shardName);
    const numChunksInShard1 = getNumChunksOnShard(st.shard1.shardName);
    assert((numChunksInShard0 === kExpectedNumChunks && numChunksInShard1 === 0) ||
           (numChunksInShard1 === kExpectedNumChunks && numChunksInShard0 === 0));
}

jsTest.log(`Creating ${kNumSessions} sessions`);
for (let i = 0; i < kNumSessions; i++) {
    assert.commandWorked(st.s.adminCommand({startSession: 1}));
}
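// Flush the mongos logical session cache so the newly started sessions are written to
// config.system.sessions before counting them.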
assert.commandWorked(st.s.adminCommand({refreshLogicalSessionCacheNow: 1}));
assert.lte(kNumSessions, getNumSessionDocs(st.s));
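// Track the connections of the shards currently in the cluster; this list is updated below as
// shards are added and removed.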
let shards = [st.shard0, st.shard1];
jsTest.log(`Verify that the chunks of ${kSessionsNs} get distributed across the original cluster`);
waitUntilBalancedAndVerify(shards);

jsTest.log(
    "Verify that the balancer redistributes chunks when more shards are added to the cluster");
const addedReplicaSets = addShardsToCluster(3);
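// Attach a 'shardName' property to each new shard's primary connection, mirroring st.shard0 and
// st.shard1, so the per-shard helpers above work for the added shards as well.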
shards = shards.concat(addedReplicaSets.map(rs => {
    const primaryNode = rs.getPrimary();
    primaryNode.shardName = rs.name;
    return primaryNode;
}));
waitUntilBalancedAndVerify(shards);

jsTest.log(
    "Verify that the balancer redistributes chunks when shards are removed from the cluster");
removeShardFromCluster(shards[2].shardName);
shards.splice(2, 1);
waitUntilBalancedAndVerify(shards);

st.stopBalancer();

st.stop();
addedReplicaSets.forEach(rs => {
    rs.stopSet();
});
}());