// Source path: jstests/sharding/sharded_data_distribution.js
// Blob: 30a06a634f1804cbbf09017e96a0f6c5452e4adc
/*
 * Test to validate the $shardedDataDistribution stage.
 *
 * @tags: [
 *   requires_fcv_62,
 * ]
 */

(function() {
'use strict';

/**
 * Validates the output of the $shardedDataDistribution stage against $collStats.
 *
 * Reads 'fooColl', 'bazColl', 'adminDb', 'ns1' and 'ns2' from the enclosing scope. For each of
 * the two namespaces, every shard entry reported by $shardedDataDistribution must match the
 * values derived from that shard's $collStats 'storageStats' (count, numOrphanDocs and
 * avgObjSize), and each namespace must be reported exactly once.
 *
 * Throws (through the shell's assert helpers) on the first mismatch; returns nothing.
 */
function testShardedDataAggregationStage() {
    // Maps shard name -> that shard's $collStats document for 'coll'.
    const collStatsByShard = (coll) => {
        const statsByShard = {};
        for (const shardStats of coll.aggregate([{$collStats: {storageStats: {}}}]).toArray()) {
            statsByShard[shardStats.shard] = shardStats;
        }
        return statsByShard;
    };

    // Get all expected results in obj format, keyed by namespace and then by shard name.
    const expectedResults = {[ns1]: collStatsByShard(fooColl), [ns2]: collStatsByShard(bazColl)};

    // Get data to validate
    const outputData = adminDb.aggregate([{$shardedDataDistribution: {}}]).toArray();

    assert.gte(outputData.length, 2);

    // Number of output entries validated per expected namespace. The length check above is
    // satisfied by any two entries, so without this bookkeeping the loop below could skip
    // every entry and the function would pass without validating anything.
    const timesValidated = {[ns1]: 0, [ns2]: 0};

    // Test the data obtained by $shardedDataDistribution stage
    for (const data of outputData) {
        const ns = data.ns;

        // Check only for namespaces test.foo and bar.baz
        if (!expectedResults.hasOwnProperty(ns)) {
            continue;
        }
        timesValidated[ns]++;

        const expectedByShard = expectedResults[ns];

        // Check for length
        assert.eq(data.shards.length, Object.keys(expectedByShard).length);

        // Check for data
        for (const shard of data.shards) {
            const outputShardName = shard.shardName;

            assert.eq(true, expectedByShard.hasOwnProperty(outputShardName));

            const {avgObjSize, numOrphanDocs, count} =
                expectedByShard[outputShardName].storageStats;

            // Owned documents are the ones $collStats counted minus the orphans; sizes are
            // derived from the average object size, as in the expected-value formulas below.
            const expectedNumOwnedDocuments = count - numOrphanDocs;
            const expectedOwnedSizeBytes = expectedNumOwnedDocuments * avgObjSize;
            const expectedOrphanedSizeBytes = numOrphanDocs * avgObjSize;

            assert.eq(shard.ownedSizeBytes, expectedOwnedSizeBytes);
            assert.eq(shard.orphanedSizeBytes, expectedOrphanedSizeBytes);
            assert.eq(shard.numOwnedDocuments, expectedNumOwnedDocuments);
            assert.eq(shard.numOrphanedDocs, numOrphanDocs);
        }
    }

    // Both namespaces must have been reported, and only once each.
    for (const ns of [ns1, ns2]) {
        assert.eq(1,
                  timesValidated[ns],
                  `expected exactly one $shardedDataDistribution entry for ${ns}`);
    }
}

// Bring up a two-shard cluster; every command below is sent through its mongos.
const st = new ShardingTest({shards: 2});
const router = st.s;

const ns1 = "test.foo";
const ns2 = "bar.baz";

const [adminDb, testDb, barDb] = ["admin", "test", "bar"].map((dbName) => router.getDB(dbName));
const fooColl = testDb.getCollection("foo");
const bazColl = barDb.getCollection("baz");

// Enable sharding on each database with shard1 as its primary shard, then shard the
// collection on {skey: 1}.
for (const [database, ns] of [[testDb, ns1], [barDb, ns2]]) {
    st.adminCommand({enablesharding: database.getName(), primaryShard: st.shard1.shardName});
    st.adminCommand({shardcollection: ns, key: {skey: 1}});
}

// Insert data to validate the aggregation stage: skey 0..5 into foo and skey 5..10 into baz.
for (let offset = 0; offset < 6; offset++) {
    assert.commandWorked(fooColl.insert({skey: offset}));
    assert.commandWorked(bazColl.insert({skey: offset + 5}));
}

// Test before chunk migration
testShardedDataAggregationStage();

// Split each collection at the given skey and request a moveChunk of the chunk found at that
// key to shard0. NOTE(review): '_waitForDelete: true' presumably makes the migration wait for
// the donor's range deletion before returning -- confirm against the moveChunk implementation.
for (const [ns, splitKey] of [[ns1, 2], [ns2, 7]]) {
    st.adminCommand({split: ns, middle: {skey: splitKey}});
    st.adminCommand(
        {moveChunk: ns, find: {skey: splitKey}, to: st.shard0.name, _waitForDelete: true});
}

// Test after chunk migration
testShardedDataAggregationStage();

// Test invalid queries/values: a non-object stage specification must fail with 6789100.
const nonObjectSpecCmd = {aggregate: 1, pipeline: [{$shardedDataDistribution: 3}], cursor: {}};
assert.commandFailedWithCode(adminDb.runCommand(nonObjectSpecCmd), 6789100);

// Running the stage against a collection of the 'test' database must fail with 6789102, and
// the error message must mention both the stage name and the admin database.
const nonAdminDbCmd = {aggregate: "foo", pipeline: [{$shardedDataDistribution: {}}], cursor: {}};
const response = assert.commandFailedWithCode(testDb.runCommand(nonAdminDbCmd), 6789102);
for (const fragment of ["$shardedDataDistribution", "admin database"]) {
    assert.neq(-1, response.errmsg.indexOf(fragment), response.errmsg);
}

// Runs $shardedDataDistribution on the admin database followed by 'trailingStages' and
// returns the number of documents the pipeline produces.
const countDistributionDocs = (...trailingStages) =>
    adminDb.aggregate([{$shardedDataDistribution: {}}, ...trailingStages]).itcount();

// Test $shardedDataDistribution followed by a $match stage on the 'ns'.
assert.eq(1, countDistributionDocs({$match: {ns: ns1}}));
assert.eq(2, countDistributionDocs({$match: {ns: {$in: [ns1, ns2]}}}));
assert.eq(0, countDistributionDocs({$match: {ns: 'test.IDoNotExist'}}));

// Test $shardedDataDistribution followed by a $match stage on the 'ns' and something else.
assert.eq(1, countDistributionDocs({$match: {ns: ns1, shards: {$size: 2}}}));
assert.eq(0, countDistributionDocs({$match: {ns: ns1, shards: {$size: 50}}}));

// Test $shardedDataDistribution followed by a $match stage on the 'ns' and other match stages.
assert.eq(1, countDistributionDocs({$match: {ns: ns1}}, {$match: {shards: {$size: 2}}}));
assert.eq(0, countDistributionDocs({$match: {ns: ns1}}, {$match: {shards: {$size: 50}}}));
assert.eq(1,
          countDistributionDocs({$match: {ns: /^test/}},
                                {$match: {shards: {$size: 2}}},
                                {$match: {ns: /foo$/}}));

// Test $shardedDataDistribution followed by a $match stage unrelated to 'ns'.
assert.eq(0, countDistributionDocs({$match: {shards: {$size: 50}}}));
assert.neq(0, countDistributionDocs({$match: {shards: {$size: 2}}}));

st.stop();

// Unsharded deployment: the stage is sent directly to the primary of a two-node replica set
// and must be rejected with code 6789101 and a message saying it can only run on mongoS.
const rsTest = new ReplSetTest({name: 'replicaSetTest', nodes: 2});
rsTest.startSet();
rsTest.initiate();

const replSetAdminDb = rsTest.getPrimary().getDB('admin');
const distributionCmd = {aggregate: 1, pipeline: [{$shardedDataDistribution: {}}], cursor: {}};

const replSetResponse =
    assert.commandFailedWithCode(replSetAdminDb.runCommand(distributionCmd), 6789101);
const mongosOnlyMsgIndex = replSetResponse.errmsg.indexOf(
    "The $shardedDataDistribution stage can only be run on mongoS");
assert.neq(-1, mongosOnlyMsgIndex);

rsTest.stopSet();
})();