1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
// mr.js
// MR is the global namespace object; the emit/reduce/check/finalize helpers
// in this file are attached to it as properties.
MR = {};
/**
 * Resets all global bookkeeping used by the MR helpers: the per-key buffer
 * ($arr), the largest buffered count ($max) and the emit/reduce counters.
 * Also assigns MR.emit to the unqualified global name `emit`.
 */
MR.init = function() {
    // Make MR.emit reachable through the bare name `emit`.
    emit = MR.emit;

    // Empty the in-memory buffers.
    $arr = [];
    $max = 0;

    // Zero the statistics counters.
    $numEmits = 0;
    $numReduces = 0;
    $numReducesToDB = 0;

    // Run a garbage-collection pass so memory use stays reasonable.
    gc();
};
/**
 * Returns the MR globals to their initial state by calling MR.init(), then
 * invokes gc() once more.
 */
MR.cleanup = function() {
    // Drop all buffered state and counters.
    MR.init();

    // Explicit collection after the reset.
    gc();
};
/**
 * Buffers one emitted (key, value) pair in memory.
 *
 * The key is mapped to an index into $arr by calling nativeHelper with
 * get_num_ as `this` and the key as its only argument. Each index holds a
 * record { key, values, count }; `count` is the number of used entries at
 * the front of `values`.
 *
 * @param k emitted key
 * @param v emitted value
 */
MR.emit = function(k, v) {
    $numEmits++;

    var slot = nativeHelper.apply(get_num_, [k]);

    // Reuse the existing record for this index, or create one on first use.
    var bucket = $arr[slot] || ($arr[slot] = {
        key: k,
        values: new Array(1000), // room for 1000 values up front
        count: 0
    });

    // Append the value behind the ones already buffered.
    bucket.values[bucket.count] = v;
    bucket.count++;

    // Keep track of the largest per-key buffer seen so far.
    $max = Math.max($max, bucket.count);
};
/**
 * Runs $reduce once over every populated entry of $arr, collapsing the
 * values buffered for that key, and recomputes $max from the new counts.
 *
 * When `useDB` is truthy, the value previously saved in tempcoll under the
 * key (if any) is appended to the buffer before reducing, and the reduced
 * result is saved back to tempcoll afterwards.
 *
 * @param useDB truthy to merge with and persist to tempcoll
 */
MR.doReduce = function(useDB) {
    $numReduces++;
    if (useDB) {
        $numReducesToDB++;
    }

    // Rebuilt below from the post-reduce counts.
    $max = 0;

    for (var slot = 0; slot < $arr.length; slot++) {
        var bucket = $arr[slot];
        if (!bucket) {
            continue; // nothing stored at this index
        }

        if (useDB) {
            // Fold any previously persisted value for this key back in.
            var stored = tempcoll.findOne({_id: bucket.key});
            if (stored) {
                bucket.values[bucket.count] = stored.value;
                bucket.count++;
            }
        }

        // Only the first `count` entries of the buffer are live.
        var reduced = $reduce(bucket.key, bucket.values.slice(0, bucket.count));

        // A truthy result with a truthy length and truthy first element
        // replaces the whole buffer; anything else becomes the single value.
        // NOTE(review): this test also matches non-empty strings and rejects
        // arrays whose first element is falsy -- confirm that is intended.
        var replacesBuffer = reduced && reduced.length && reduced[0];
        if (replacesBuffer) {
            bucket.values = reduced;
            bucket.count = reduced.length;
        } else {
            bucket.values[0] = reduced;
            bucket.count = 1;
        }

        $max = Math.max($max, bucket.count);

        if (useDB) {
            // A lone value is stored as-is, several values as an array.
            tempcoll.save({
                _id: bucket.key,
                value: bucket.count == 1
                    ? bucket.values[0]
                    : bucket.values.slice(0, bucket.count)
            });
        }
    }
};
/**
 * Keeps the in-memory buffers within bounds, escalating only as needed.
 *
 * The buffers count as within bounds while $max < 2000 and
 * $arr.length < 1000.
 *
 * @returns 0 if nothing had to be done,
 *          1 if an in-memory MR.doReduce() brought the buffers within bounds,
 *          2 if the data was reduced into tempcoll via MR.doReduce(true) and
 *            the in-memory state was reset.
 */
MR.check = function() {
    var withinLimits = function() {
        return $max < 2000 && $arr.length < 1000;
    };

    if (withinLimits()) {
        return 0;
    }

    // First try: reduce in memory only.
    MR.doReduce();
    if (withinLimits()) {
        return 1;
    }

    // Still too large: reduce into the collection and start over in memory.
    MR.doReduce(true);
    $arr = [];
    $max = 0;
    reset_num();
    gc();
    return 2;
};
/**
 * Applies $finalize to every document returned by tempcoll.find(),
 * replacing the document's `value` with the result and saving the
 * document back to tempcoll.
 */
MR.finalize = function() {
    var finalizeAndSave = function(doc) {
        doc.value = $finalize(doc._id, doc.value);
        tempcoll.save(doc);
    };

    tempcoll.find().forEach(finalizeAndSave);
};
|