// path: root/jstests/slow2/mr_during_migrate.js
// blob: cb439aeb241588a79c6c2f2b255643f0b64cea2e (plain)
// Run operations in parallel while chunk migrations are occurring.

// Cluster under test: 10 shards, 2 mongos, verbosity level 2.
var st = new ShardingTest({shards: 10, mongos: 2, verbose: 2});

jsTest.log("Doing parallel operations...");

// The balancer is stopped up front because it would only get in the way of the
// migrations this test issues itself.
st.stopBalancer();

// Handles obtained from the first mongos; the collection is named after the test.
var mongos = st.s0;
var admin = mongos.getDB("admin");
var coll = st.s.getCollection([jsTest.name(), "coll"].join("."));

// Size of the data set: how many documents are inserted and how large each document's
// payload string is.
var numDocs = 1024 * 1024;
var dataSize = 1024;  // bytes of payload per document; need not be a power of 2

// Build the payload by repeated doubling, then trim it so that it is exactly dataSize
// characters long. Without the trim, a dataSize that is not a power of 2 would silently
// produce a longer string, and any size computed from dataSize would be wrong.
var data = "x";
while (data.length < dataSize) {
    data += data;
}
data = data.substring(0, dataSize);

// Queue one document per _id in [0, numDocs) on an unordered bulk operation, then
// execute it and require the write to have succeeded.
var bulk = coll.initializeUnorderedBulkOp();
var i = 0;
while (i < numDocs) {
    bulk.insert({_id: i, data: data});
    i++;
}
var insertResult = bulk.execute();
assert.writeOK(insertResult);

// Make sure everything got inserted: a full scan must return exactly numDocs documents.
var insertedCount = coll.find().itcount();
assert.eq(numDocs, insertedCount);

jsTest.log("Inserted " + sh._dataFormat(dataSize * numDocs) + " of data.");

// Shard the collection on {_id: 1} and dump the resulting sharding status.
st.shardColl(coll, {_id: 1}, false);
st.printShardingStatus();

jsTest.log("Sharded collection now initialized, starting migrations...");

/**
 * Prints a header line followed by the JSON form of whatever object this function is
 * invoked on (its `this` value).
 *
 * NOTE(review): nothing in the visible script calls this helper.
 */
function checkMigrate() {
    var migrateResult = this;
    print("Result of migrate : ");
    printjson(migrateResult);
}

// Build one benchRun "command" op per entry in st._connections. Each op runs moveChunk
// against the admin namespace, selecting the chunk via a #RAND_INT-generated _id in the
// range given by [0, numDocs] and sending it to that connection's shard.
var ops = [];
st._connections.forEach(function(conn) {
    // moveChunk must remain the first key of the command document.
    var moveChunkCmd = {
        moveChunk: "" + coll,
        find: {_id: {"#RAND_INT": [0, numDocs]}},
        to: conn.shardName,
        _waitForDelete: true
    };
    ops.push({op: "command", ns: "admin", command: moveChunkCmd, showResult: true});
});

// TODO: Also migrate output collection

jsTest.log("Starting migrations now...");

// Start the migration ops in the background; `bid` identifies the run so that it can
// be finished later.
var benchOptions = {ops: ops, host: st.s.host, parallel: 1, handleErrors: false};
var bid = benchStart(benchOptions);

//#######################
// Tests during migration

var numTests = 5;

// Number of distinct keys the map function emits; every key is expected to end up with
// numDocs / numTypes documents counted against it.
var numTypes = 32;

// Emits a count of 1 under the key (_id mod 32). The modulus is written as a literal
// rather than as numTypes on purpose (it must be hardcoded).
var map = function() {
    emit(this._id % 32 /* must be hardcoded */, {c: 1});
};

// Adds up the `c` counters of all values emitted for a key.
var reduce = function(k, vals) {
    var sum = 0;
    var idx = 0;
    while (idx < vals.length) {
        sum += vals[idx].c;
        idx++;
    }
    return {c: sum};
};

for (var t = 0; t < numTests; t++) {
    jsTest.log("Test #" + t);

    // Go through the other mongos so that we get stale shard versions.
    var mongos = st.s1;
    var coll = mongos.getCollection(coll + "");
    var outputColl = mongos.getCollection(coll + "_output");

    printjson(coll.find({_id: 0}).itcount());

    jsTest.log("Starting new mapReduce run #" + t);

    // NOTE: a full-collection count check is left disabled here, as in the original:
    // assert.eq( coll.find().itcount(), numDocs )

    coll.getMongo().getDB("admin").runCommand({setParameter: 1, traceExceptions: true});

    // Replace the output collection (same database as outputColl) on every run.
    var mrOptions = {out: {replace: outputColl.getName(), db: outputColl.getDB() + ""}};
    printjson(coll.mapReduce(map, reduce, mrOptions));

    jsTest.log("MapReduce run #" + t + " finished.");

    // One output document per key, each accounting for an equal share of the input.
    assert.eq(outputColl.find().itcount(), numTypes);

    outputColl.find().forEach(function(doc) {
        assert.eq(doc.value.c, numDocs / numTypes);
    });
}

// Finish the background bench run started earlier and print what it returns, then shut
// the sharded cluster down.
var benchReport = benchFinish(bid);
printjson(benchReport);

st.stop();