summaryrefslogtreecommitdiff
path: root/jstests/slow1/mr_during_migrate.js
blob: c5d7b772efbe5fcee698e6b4fcbc0a35257b926f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Do parallel ops with migrates occurring
// @tags: [requires_sharding]

(function() {
    'use strict';

    var st = new ShardingTest({shards: 10, mongos: 2, verbose: 2});

    var mongos = st.s0;
    var admin = mongos.getDB("admin");
    var coll = st.s.getCollection(jsTest.name() + ".coll");

    var numDocs = 1024 * 1024;
    var dataSize = 1024;  // bytes, must be power of 2

    var data = "x";
    while (data.length < dataSize)
        data += data;

    var bulk = coll.initializeUnorderedBulkOp();
    for (var i = 0; i < numDocs; i++) {
        bulk.insert({_id: i, data: data});
    }
    assert.writeOK(bulk.execute());

    // Make sure everything got inserted
    assert.eq(numDocs, coll.find().itcount());

    jsTest.log("Inserted " + sh._dataFormat(dataSize * numDocs) + " of data.");

    // Shard collection
    st.shardColl(coll, {_id: 1}, false);

    st.printShardingStatus();

    jsTest.log("Sharded collection now initialized, starting migrations...");

    var checkMigrate = function() {
        print("Result of migrate : ");
        printjson(this);
    };

    // Creates a number of migrations of random chunks to diff shard servers
    var ops = [];
    for (var i = 0; i < st._connections.length; i++) {
        ops.push({
            op: "command",
            ns: "admin",
            command: {
                moveChunk: "" + coll,
                find: {_id: {"#RAND_INT": [0, numDocs]}},
                to: st._connections[i].shardName,
                _waitForDelete: true
            },
            showResult: true
        });
    }

    // TODO: Also migrate output collection

    jsTest.log("Starting migrations now...");

    var bid = benchStart({ops: ops, host: st.s.host, parallel: 1, handleErrors: false});

    //#######################
    // Tests during migration

    var numTests = 5;

    for (var t = 0; t < numTests; t++) {
        jsTest.log("Test #" + t);

        var mongos = st.s1;  // use other mongos so we get stale shard versions
        var coll = mongos.getCollection(coll + "");
        var outputColl = mongos.getCollection(coll + "_output");

        var numTypes = 32;
        var map = function() {
            emit(this._id % 32 /* must be hardcoded */, {c: 1});
        };

        var reduce = function(k, vals) {
            var total = 0;
            for (var i = 0; i < vals.length; i++)
                total += vals[i].c;
            return {c: total};
        };

        printjson(coll.find({_id: 0}).itcount());

        jsTest.log("Starting new mapReduce run #" + t);

        // assert.eq( coll.find().itcount(), numDocs )

        coll.getMongo().getDB("admin").runCommand({setParameter: 1, traceExceptions: true});

        printjson(coll.mapReduce(
            map, reduce, {out: {replace: outputColl.getName(), db: outputColl.getDB() + ""}}));

        jsTest.log("MapReduce run #" + t + " finished.");

        assert.eq(outputColl.find().itcount(), numTypes);

        outputColl.find().forEach(function(x) {
            assert.eq(x.value.c, numDocs / numTypes);
        });
    }

    printjson(benchFinish(bid));

    st.stop();
})();