1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
// Testing parallelism of mapReduce in V8
// Our server and client need to be running V8 and the host we are running on needs at least two
// cores. Update this if you are testing more than three threads in parallel.
if (/V8/.test(interpreterVersion()) && db.runCommand({buildinfo: 1}).javascriptEngine == "V8" &&
db.hostInfo().system.numCores >= 2) {
// function timeSingleThread
// Description: Invokes a job once on the calling thread and records a timestamp
//              immediately before the call and another once the call has returned.
// Args: job - function to invoke; it is called with the thread id as its only argument
//       tid - thread id handed to the job; any falsy value (including omitted) becomes 0
// Returns: { threadStart : <Date taken just before the job ran> ,
//            threadEnd : <Date taken just after the job returned> }
var timeSingleThread = function(job, tid) {
    var threadId = tid || 0;
    var startedAt = new Date();
    job(threadId);
    var finishedAt = new Date();
    return {threadStart: startedAt, threadEnd: finishedAt};
};
// function timeMultipleThreads
// Description: Runs the same job on several threads and reports, per thread, when it
//              started and finished relative to a shared reference time.
// Args: job - job to run in each thread (wrapped by timeSingleThread)
//       nthreads - number of threads to create
//       stagger - optional pause in milliseconds before starting each thread after the
//                 first; a falsy value means no pause
// Returns: Array with one entry per thread, in thread-id order, of the form:
//          [ { threadStart : <ms from the reference time until the job began> ,
//              threadEnd : <ms from the reference time until the job returned> } ,
//            ...
//          ]
var timeMultipleThreads = function(job, nthreads, stagger) {
    var workers = [];
    var idx;

    // Build every Thread object up front so construction cost is excluded from the timings.
    for (idx = 0; idx < nthreads; ++idx) {
        workers.push(new Thread(timeSingleThread, job, idx));
    }

    // Shared zero point: every reported time is an offset from this instant.
    var referenceTime = new Date();

    for (idx = 0; idx < nthreads; ++idx) {
        if (idx > 0 && stagger) {
            sleep(stagger);
        }
        workers[idx].start();
    }

    var threadTimes = [];
    for (idx = 0; idx < nthreads; ++idx) {
        // NOTE(review): returnData() is expected to block until the thread has finished
        // and to hand back timeSingleThread's result - confirm against the shell Thread API.
        var measured = workers[idx].returnData();
        // Subtracting two Dates yields the difference in milliseconds.
        threadTimes.push({
            threadStart: measured.threadStart - referenceTime,
            threadEnd: measured.threadEnd - referenceTime
        });
    }
    return threadTimes;
};
// Display and analysis helper functions

// function getLastCompletion
// Description: Finds the largest threadEnd value among a set of per-thread timings.
// Args: threadTimes - array of { threadStart, threadEnd } entries
// Returns: the maximum threadEnd found, never less than 0; 0 for an empty array
var getLastCompletion = function(threadTimes) {
    var latest = 0;
    var idx = 0;
    while (idx < threadTimes.length) {
        latest = Math.max(latest, threadTimes[idx].threadEnd);
        idx += 1;
    }
    return latest;
};
// Test fixture: rebuild the source collection from scratch with 100 * 512 = 51200
// documents, one for every (j, i) pair with 0 <= j < 100 and 0 <= i < 512.
// The loop counters are declared explicitly so the script does not rely on implicit
// globals (which are a ReferenceError in strict mode).
var j;
var i;
db.v8_parallel_mr_src.drop();
for (j = 0; j < 100; j++) {
    for (i = 0; i < 512; i++) {
        db.v8_parallel_mr_src.save({j: j, i: i});
    }
}
// The return value is not inspected.
// NOTE(review): presumably called only to wait for the preceding writes to be
// acknowledged before the timing runs start - confirm.
db.getLastError();
// function mrWorkFunction
// Description: The unit of work being timed. Runs one map-reduce over
//              db.v8_parallel_mr_src into "v8_parallel_mr_out" and asserts on the
//              counters it reports. All helpers are nested inside the function so that
//              it has no dependencies beyond the shell globals db and assert.
// Args: none (any arguments supplied by the caller are ignored)
// Returns: nothing; throws via assert if a reported count is unexpected
var mrWorkFunction = function() {
    // Checks the reported counters against the fixture size: 51200 documents read,
    // one emit per document, and 512 distinct keys in the output. Only a lower bound
    // is placed on the number of reduce calls.
    function verifyOutput(out) {
        assert.eq(out.counts.input, 51200, "input count is wrong");
        assert.eq(out.counts.emit, 51200, "emit count is wrong");
        assert.gt(out.counts.reduce, 99, "reduce count is wrong");
        assert.eq(out.counts.output, 512, "output count is wrong");
    }

    // Emits exactly one value per document, keyed by i: j squared when j is even,
    // j doubled when j is odd.
    function map() {
        if (this.j % 2 === 0) {
            emit(this.i, this.j * this.j);
        } else {
            emit(this.i, this.j + this.j);
        }
    }

    // Halves every value and returns the sum of the halves. The intermediate result is
    // a declared local so that evaluating reduce never creates implicit globals.
    // NOTE(review): because each call halves its inputs again, the result depends on
    // how the values are batched across calls; the output values themselves are never
    // checked by this test, only the counters.
    function reduce(key, values) {
        var halved = values.map(function(value) {
            return value / 2;
        });
        return Array.sum(halved);
    }

    var out = db.v8_parallel_mr_src.mapReduce(map, reduce, {out: "v8_parallel_mr_out"});
    verifyOutput(out);
};
// Time the same map-reduce job run as one, two and three concurrent threads. Each
// figure is the number of milliseconds from the shared reference time until the last
// thread of that run finished.
var oneMapReduce = getLastCompletion(timeMultipleThreads(mrWorkFunction, 1));
var twoMapReduce = getLastCompletion(timeMultipleThreads(mrWorkFunction, 2));
var threeMapReduce = getLastCompletion(timeMultipleThreads(mrWorkFunction, 3));
printjson("One map reduce job: " + oneMapReduce);
printjson("Two map reduce jobs: " + twoMapReduce);
printjson("Three map reduce jobs: " + threeMapReduce);
// Two concurrent jobs must finish in under 1.75x, and three in under 2.5x, the time of
// a single job. NOTE(review): the limits sit below 2x and 3x, presumably so that fully
// serialized execution fails the test - confirm the intended margins.
// The messages carry the measured timings so a failure can be diagnosed from the log.
assert(oneMapReduce * 1.75 > twoMapReduce,
       "two concurrent map reduce jobs took " + twoMapReduce +
           "ms, expected less than 1.75x the single job time of " + oneMapReduce + "ms");
assert(oneMapReduce * 2.5 > threeMapReduce,
       "three concurrent map reduce jobs took " + threeMapReduce +
           "ms, expected less than 2.5x the single job time of " + oneMapReduce + "ms");
}
|