summaryrefslogtreecommitdiff
path: root/shell/mr.js
blob: 7b0814dd5577526a5ce4b768d969ea6ee6e64eea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// mr.js

MR = {};

MR.init = function(){
    $max = 0;
    $arr = [];
    emit = MR.emit;
    $numEmits = 0;
    $numReduces = 0;
    $numReducesToDB = 0;
    gc(); // this is just so that keep memory size sane
}

MR.cleanup = function(){
    MR.init();
    gc();
}

MR.emit = function(k,v){
    $numEmits++;
    var num = nativeHelper.apply( get_num_ , [ k ] );
    var data = $arr[num];
    if ( ! data ){
        data = { key : k , values : new Array(1000) , count : 0 };
        $arr[num] = data;
    }
    data.values[data.count++] = v;
    $max = Math.max( $max , data.count );
}

MR.doReduce = function( useDB ){
    $numReduces++;
    if ( useDB )
        $numReducesToDB++;
    $max = 0;
    for ( var i=0; i<$arr.length; i++){
        var data = $arr[i];
        if ( ! data ) 
            continue;
        
        if ( useDB ){
            var x = tempcoll.findOne( { _id : data.key } );
            if ( x ){
                data.values[data.count++] = x.value;
            }
        }

        var r = $reduce( data.key , data.values.slice( 0 , data.count ) );
        if ( r && r.length && r[0] ){ 
            data.values = r; 
            data.count = r.length;
        }
        else{ 
            data.values[0] = r;
            data.count = 1;
        }
        
        $max = Math.max( $max , data.count ); 
        
        if ( useDB ){
            if ( data.count == 1 ){
                tempcoll.save( { _id : data.key , value : data.values[0] } );
            }
            else {
                tempcoll.save( { _id : data.key , value : data.values.slice( 0 , data.count ) } );
            }
        }
    }
}

MR.check = function(){                        
    if ( $max < 2000 && $arr.length < 1000 ){ 
        return 0; 
    }
    MR.doReduce();
    if ( $max < 2000 && $arr.length < 1000 ){ 
        return 1;
    }
    MR.doReduce( true );
    $arr = []; 
    $max = 0; 
    reset_num();
    gc();
    return 2;
}

MR.finalize = function(){
    tempcoll.find().forEach( 
        function(z){
            z.value = $finalize( z._id , z.value );
            tempcoll.save( z );
        }
    );
}