summaryrefslogtreecommitdiff
path: root/jstests/sharding/sharded_data_distribution.js
blob: a4d24d0c702cc39b1129511a8af8f3561dc0809b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
 * Test to validate the $shardedDataDistribution stage.
 *
 * @tags: [
 *   requires_fcv_62,
 * ]
 */

(function() {
'use strict';

/**
 * Validates the output of the $shardedDataDistribution aggregation stage by comparing it
 * against per-shard $collStats storage statistics for the two test namespaces.
 *
 * Relies on the file-level bindings: fooColl, bazColl, ns1, ns2 and adminDb.
 * Fails via assert.* if any per-shard owned/orphaned document or byte count disagrees
 * with the values derived from $collStats.
 */
function testShardedDataAggregationStage() {
    // Get all expected results in obj format
    const fooResults = fooColl.aggregate([{$collStats: {storageStats: {}}}]).toArray();
    assert.neq(null, fooResults);
    const bazResults = bazColl.aggregate([{$collStats: {storageStats: {}}}]).toArray();
    assert.neq(null, bazResults);

    // Index the $collStats documents by shard name for direct lookup below.
    // (Was `let fooRes`; use `const` for consistency with the bazRes loop — the
    // binding is never reassigned.)
    const objFooResults = {};
    for (const fooRes of fooResults) {
        objFooResults[fooRes.shard] = fooRes;
    }

    const objBazResults = {};
    for (const bazRes of bazResults) {
        objBazResults[bazRes.shard] = bazRes;
    }

    const expectedResults = {[ns1]: objFooResults, [ns2]: objBazResults};

    // Get data to validate
    const outputData = adminDb.aggregate([{$shardedDataDistribution: {}}]).toArray();

    // The stage reports every sharded namespace in the cluster; at minimum our two.
    assert.gte(outputData.length, 2);

    // Test the data obtained by $shardedDataDistribution stage
    for (const data of outputData) {
        const ns = data.ns;

        // Check only for namespaces test.foo and bar.baz
        if (expectedResults.hasOwnProperty(ns)) {
            // The stage must report exactly one entry per shard that $collStats saw.
            assert.eq(data.shards.length, Object.keys(expectedResults[ns]).length);

            // Check for data
            for (const shard of data.shards) {
                const outputShardName = shard.shardName;
                const outputOwnedSizeBytes = shard.ownedSizeBytes;
                const outputOrphanedSizeBytes = shard.orphanedSizeBytes;
                const outputNumOwnedDocuments = shard.numOwnedDocuments;
                const outputNumOrphanedDocs = shard.numOrphanedDocs;

                assert.eq(true, expectedResults[ns].hasOwnProperty(outputShardName));

                const avgObjSize = expectedResults[ns][outputShardName].storageStats.avgObjSize;
                const numOrphanDocs =
                    expectedResults[ns][outputShardName].storageStats.numOrphanDocs;
                const storageStatsCount = expectedResults[ns][outputShardName].storageStats.count;

                // Derive the expected per-shard figures from $collStats: owned docs are
                // total count minus orphans; byte sizes are estimated via avgObjSize.
                const expectedOwnedSizeBytes = (storageStatsCount - numOrphanDocs) * avgObjSize;
                const expectedOrphanedSizeBytes = numOrphanDocs * avgObjSize;
                const expectedNumOwnedDocuments = storageStatsCount - numOrphanDocs;
                const expectedNumOrphanedDocs = numOrphanDocs;

                assert.eq(outputOwnedSizeBytes, expectedOwnedSizeBytes);
                assert.eq(outputOrphanedSizeBytes, expectedOrphanedSizeBytes);
                assert.eq(outputNumOwnedDocuments, expectedNumOwnedDocuments);
                assert.eq(outputNumOrphanedDocs, expectedNumOrphanedDocs);
            }
        }
    }
}

// Configure initial sharding cluster
const st = new ShardingTest({shards: 2});
const mongos = st.s;

const ns1 = "test.foo";
const ns2 = "bar.baz";

const adminDb = mongos.getDB("admin");
const testDb = mongos.getDB("test");
const barDb = mongos.getDB("bar");
const fooColl = testDb.getCollection("foo");
const bazColl = barDb.getCollection("baz");

// Shard both collections on {skey: 1} with shard1 as the primary shard, so all
// data initially resides on shard1.
st.adminCommand({enablesharding: testDb.getName(), primaryShard: st.shard1.shardName});
st.adminCommand({shardcollection: ns1, key: {skey: 1}});
st.adminCommand({enablesharding: barDb.getName(), primaryShard: st.shard1.shardName});
st.adminCommand({shardcollection: ns2, key: {skey: 1}});

// Insert data to validate the aggregation stage
for (let i = 0; i < 6; i++) {
    assert.commandWorked(fooColl.insert({skey: i}));
    assert.commandWorked(bazColl.insert({skey: (i + 5)}));
}

// Test before chunk migration
testShardedDataAggregationStage();

// Split each namespace and migrate one chunk to shard0 so both shards own data.
// _waitForDelete makes the migration wait for range deletion to complete, keeping
// the orphan-document counts deterministic for the validation that follows.
st.adminCommand({split: ns1, middle: {skey: 2}});
st.adminCommand({moveChunk: ns1, find: {skey: 2}, to: st.shard0.name, _waitForDelete: true});
st.adminCommand({split: ns2, middle: {skey: 7}});
st.adminCommand({moveChunk: ns2, find: {skey: 7}, to: st.shard0.name, _waitForDelete: true});

// Test after chunk migration
testShardedDataAggregationStage();

// Test invalid queries/values.
// A non-object stage specification must be rejected (code 6789100).
assert.commandFailedWithCode(
    adminDb.runCommand({aggregate: 1, pipeline: [{$shardedDataDistribution: 3}], cursor: {}}),
    6789100);

// Running the stage outside the admin database must fail (code 6789102) with an
// error message naming the stage and the required database.
const response = assert.commandFailedWithCode(
    testDb.runCommand({aggregate: "foo", pipeline: [{$shardedDataDistribution: {}}], cursor: {}}),
    6789102);
assert.neq(-1, response.errmsg.indexOf("$shardedDataDistribution"), response.errmsg);
assert.neq(-1, response.errmsg.indexOf("admin database"), response.errmsg);

st.stop();

// Test that verifies the behavior in unsharded deployments
const rsTest = new ReplSetTest({name: 'replicaSetTest', nodes: 2});
rsTest.startSet();
rsTest.initiate();

const primary = rsTest.getPrimary();
const admin = primary.getDB('admin');

// On a plain replica set (no mongoS) the stage must fail (code 6789101) with an
// error message stating it can only be run on mongoS.
const response2 = assert.commandFailedWithCode(
    admin.runCommand({aggregate: 1, pipeline: [{$shardedDataDistribution: {}}], cursor: {}}),
    6789101);
assert.neq(
    -1, response2.errmsg.indexOf("The $shardedDataDistribution stage can only be run on mongoS"));

rsTest.stopSet();
})();