summaryrefslogtreecommitdiff
path: root/jstests/perf/v8_mapreduce.js
blob: 46afcf3708201e6414c0f6472dd70b1098ee6b29 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Testing parallelism of mapReduce in V8

// Both the shell (client) and the server must be running the V8 JavaScript engine, and the host
// must have at least two cores.  Update the check below if you extend this test to run more than
// three threads in parallel.
if (/V8/.test(interpreterVersion()) && db.runCommand({buildinfo: 1}).javascriptEngine == "V8" &&
    db.hostInfo().system.numCores >= 2) {
    // function timeSingleThread
    // Description: Runs a job once and records the time just before and just after the call.
    // Args: job - function to run; it is called with the thread id as its only argument
    //       tid - thread id handed to the job; a falsy value (including omission) becomes 0
    // Returns: { threadStart : <Date taken before the job ran> ,
    //            threadEnd : <Date taken after the job returned> }
    var timeSingleThread = function(job, tid) {
        var threadId = tid || 0;

        var startedAt = new Date();
        job(threadId);
        var finishedAt = new Date();

        return {threadStart: startedAt, threadEnd: finishedAt};
    };

    // function timeMultipleThreads
    // Description: Runs the same job in several threads and reports, for each thread, when it
    //              started and finished relative to a common reference time.
    // Args: job - job to run in each thread (each thread calls timeSingleThread(job, <index>))
    //       nthreads - number of threads to spawn
    //       stagger - optional delay in milliseconds between starting consecutive threads
    // Returns: Array with one entry per thread, in thread-index order, of the form:
    // [ { threadStart : <threadStart minus the reference time> ,
    //     threadEnd : <threadEnd minus the reference time> } ,
    //     ...
    //     ]
    var timeMultipleThreads = function(job, nthreads, stagger) {
        var workers = [];
        var n;

        // Construct every thread up front so that none of this is counted in the timings.
        for (n = 0; n < nthreads; n++) {
            workers.push(new Thread(timeSingleThread, job, n));
        }

        // The common origin that every thread's start/end time is measured against.
        var referenceTime = new Date();

        for (n = 0; n < nthreads; n++) {
            // Only pause between threads, never before the first one.
            if (n > 0 && stagger) {
                sleep(stagger);
            }
            workers[n].start();
        }

        var results = [];
        for (n = 0; n < nthreads; n++) {
            var timing = workers[n].returnData();
            results.push({
                threadStart: timing.threadStart - referenceTime,
                threadEnd: timing.threadEnd - referenceTime
            });
        }

        return results;
    };

    // Display and analysis helper functions

    // function getLastCompletion
    // Description: Finds when the slowest thread of a run finished.
    // Args: threadTimes - array of entries that each carry a numeric threadEnd
    // Returns: the largest threadEnd found, with a floor of 0 (so an empty array yields 0)
    var getLastCompletion = function(threadTimes) {
        return threadTimes.reduce(function(latest, timing) {
            return Math.max(latest, timing.threadEnd);
        }, 0);
    };

    // Functions we are performance testing

    // Rebuild the source collection from scratch: 100 values of j times 512 values of i gives
    // 51200 documents of the form { j : <0..99> , i : <0..511> }.
    db.v8_parallel_mr_src.drop();

    // The counters are declared here so the loops do not create implicit globals.
    for (var j = 0; j < 100; j++) {
        for (var i = 0; i < 512; i++) {
            db.v8_parallel_mr_src.save({j: j, i: i});
        }
    }

    // NOTE(review): presumably this makes sure the saves above have been applied before any
    // timing starts - confirm against the shell version in use.
    db.getLastError();

    // function mrWorkFunction
    // Description: The job being timed.  Runs one map-reduce over db.v8_parallel_mr_src, writing
    //              to the collection "v8_parallel_mr_out", and asserts on the counts reported in
    //              the result.  The helpers are defined inside the function so that it does not
    //              rely on anything from the enclosing scope other than the shell globals.
    // Args: none (any thread id passed by the caller is ignored)
    // Returns: nothing; an assertion fails if the reported counts are not the expected ones
    var mrWorkFunction = function() {
        // Checks the counts reported by mapReduce: 51200 documents read, one emit per document,
        // at least 100 reduce calls and one output document per distinct value of i.
        function verifyOutput(out) {
            assert.eq(out.counts.input, 51200, "input count is wrong");
            assert.eq(out.counts.emit, 51200, "emit count is wrong");
            assert.gt(out.counts.reduce, 99, "reduce count is wrong");
            assert.eq(out.counts.output, 512, "output count is wrong");
        }

        // Emits one value per document, keyed by i: j squared for even j, j doubled for odd j.
        function map() {
            if (this.j % 2 === 0) {
                emit(this.i, this.j * this.j);
            } else {
                emit(this.i, this.j + this.j);
            }
        }

        // Halves every value and returns the sum of the halves.  The intermediate array is a
        // declared local, so the function does not create implicit globals.
        // NOTE(review): because of the halving, re-reducing an already reduced value changes the
        // result; only the counts are asserted above, so the output values are not checked.
        function reduce(key, values) {
            var halved = values.map(function(value) {
                return value / 2;
            });
            return Array.sum(halved);
        }

        var out = db.v8_parallel_mr_src.mapReduce(map, reduce, {out: "v8_parallel_mr_out"});
        verifyOutput(out);
    };

    // Time the job with one, two and three concurrent threads.  Each figure is the time, in
    // milliseconds from the start of that run, at which its slowest thread finished.
    var oneMapReduce = getLastCompletion(timeMultipleThreads(mrWorkFunction, 1));
    var twoMapReduce = getLastCompletion(timeMultipleThreads(mrWorkFunction, 2));
    var threeMapReduce = getLastCompletion(timeMultipleThreads(mrWorkFunction, 3));

    printjson("One map reduce job: " + oneMapReduce);
    printjson("Two map reduce jobs: " + twoMapReduce);
    printjson("Three map reduce jobs: " + threeMapReduce);

    // Running N jobs concurrently must take clearly less than N times a single job.  The
    // messages carry the measured values so a failure can be diagnosed from the log alone.
    assert(oneMapReduce * 1.75 > twoMapReduce,
           "two concurrent map reduce jobs took " + twoMapReduce +
               "ms, expected less than 1.75x the single job time of " + oneMapReduce + "ms");
    assert(oneMapReduce * 2.5 > threeMapReduce,
           "three concurrent map reduce jobs took " + threeMapReduce +
               "ms, expected less than 2.5x the single job time of " + oneMapReduce + "ms");
}