diff options
author | Eliot Horowitz <eliot@10gen.com> | 2010-11-15 15:10:31 -0500 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2010-11-15 15:10:31 -0500 |
commit | 94ba794b852279ac73c40b4c924f8b4487b2da8b (patch) | |
tree | 6f5bc7dad12436a85a42557d6a3c5519ce1b8fcc | |
parent | 1cafbccecade5457bb9a637619a34055c2936fbf (diff) | |
download | mongo-94ba794b852279ac73c40b4c924f8b4487b2da8b.tar.gz |
map/reduce into old collections merge or reduce SERVER-647
-rw-r--r-- | db/commands/mr.cpp | 88 | ||||
-rw-r--r-- | db/commands/mr.h | 24 | ||||
-rw-r--r-- | jstests/mr_merge.js | 51 | ||||
-rw-r--r-- | jstests/mr_outreduce.js | 41 |
4 files changed, 168 insertions, 36 deletions
diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp index 722ee513e3f..6398ed7bc7f 100644 --- a/db/commands/mr.cpp +++ b/db/commands/mr.cpp @@ -35,7 +35,7 @@ namespace mongo { AtomicUInt MRSetup::JOB_NUMBER; - BSONObj reduceValues( BSONList& values , Scope * s , ScriptingFunction reduce , bool final , ScriptingFunction finalize ){ + BSONObj reduceValues( BSONList& values , MRReduceState * state , bool final ){ uassert( 10074 , "need values" , values.size() ); int sizeEstimate = ( values.size() * values.begin()->getField( "value" ).size() ) + 128; @@ -72,8 +72,8 @@ namespace mongo { valueBuilder->done(); BSONObj args = reduceArgs.obj(); - s->invokeSafe( reduce , args ); - if ( s->type( "return" ) == Array ){ + state->scope->invokeSafe( state->reduce , args ); + if ( state->scope->type( "return" ) == Array ){ uassert( 10075 , "reduce -> multiple not supported yet",0); return BSONObj(); } @@ -87,24 +87,24 @@ namespace mongo { } BSONObjBuilder temp( endSizeEstimate ); temp.append( key.firstElement() ); - s->append( temp , "1" , "return" ); + state->scope->append( temp , "1" , "return" ); x.push_back( temp.obj() ); - return reduceValues( x , s , reduce , final , finalize ); + return reduceValues( x , state , final ); } - if ( finalize ){ - Scope::NoDBAccess no = s->disableDBAccess( "can't access db inside finalize" ); + if ( state->finalize ){ + Scope::NoDBAccess no = state->scope->disableDBAccess( "can't access db inside finalize" ); BSONObjBuilder b(endSizeEstimate); b.appendAs( key.firstElement() , "_id" ); - s->append( b , "value" , "return" ); - s->invokeSafe( finalize , b.obj() ); + state->scope->append( b , "value" , "return" ); + state->scope->invokeSafe( state->finalize , b.obj() ); } BSONObjBuilder b(endSizeEstimate); b.appendAs( key.firstElement() , final ? "_id" : "0" ); - s->append( b , final ? "value" : "1" , "return" ); + state->scope->append( b , final ? "value" : "1" , "return" ); return b.obj(); } @@ -201,7 +201,7 @@ namespace mongo { return field._asCode(); } - long long MRSetup::renameIfNeeded( DBDirectClient& db ){ + long long MRSetup::renameIfNeeded( DBDirectClient& db , MRReduceState * state ){ assertInWriteLock(); if ( finalLong != tempLong ){ @@ -222,7 +222,31 @@ namespace mongo { db.dropCollection( tempLong ); } else if ( outType == REDUCE ){ - assert(0); + BSONList values; + + auto_ptr<DBClientCursor> cursor = db.query( tempLong , BSONObj() ); + while ( cursor->more() ){ + BSONObj temp = cursor->next(); + BSONObj old; + + bool found; + { + Client::Context tx( finalLong ); + found = Helpers::findOne( finalLong.c_str() , temp["_id"].wrap() , old , true ); + } + + if ( found ){ + // need to reduce + values.clear(); + values.push_back( temp ); + values.push_back( old ); + Helpers::upsert( finalLong , reduceValues( values , state , true ) ); + } + else { + Helpers::upsert( finalLong , temp ); + } + } + db.dropCollection( tempLong ); } else { assert(0); @@ -230,7 +254,7 @@ namespace mongo { } return db.count( finalLong ); } - + void MRSetup::insert( const string& ns , BSONObj& o ){ writelock l( ns ); Client::Context ctx( ns ); @@ -243,8 +267,12 @@ namespace mongo { } - MRState::MRState( MRSetup& s ) : setup(s){ - scope = globalScriptEngine->getPooledScope( setup.dbname ); + MRState::MRState( MRSetup& s ) + : setup(s){ + } + + void MRState::init(){ + scope.reset(globalScriptEngine->getPooledScope( setup.dbname ).release() ); scope->localConnect( setup.dbname.c_str() ); map = scope->createFunction( setup.mapCode.c_str() ); @@ -278,7 +306,7 @@ namespace mongo { return; BSONObj key = values.begin()->firstElement().wrap( "_id" ); - BSONObj res = reduceValues( values , scope.get() , reduce , 1 , finalize ); + BSONObj res = reduceValues( values , this , true ); setup.insert( setup.tempLong , res ); } @@ -307,7 +335,7 @@ namespace mongo { write( *(all.begin()) ); } else if ( all.size() > 1 ){ - BSONObj res = reduceValues( all , _state.scope.get() , _state.reduce , false , 0 ); + BSONObj res = reduceValues( all , &_state , false ); insert( res ); } } @@ -398,9 +426,11 @@ namespace mongo { BSONObjBuilder countsBuilder; BSONObjBuilder timingBuilder; + MRState state( mr ); + try { - - MRState state( mr ); + state.init(); + state.scope->injectNative( "emit" , fast_emit ); MRTL * mrtl = new MRTL( state ); @@ -578,7 +608,7 @@ namespace mongo { dblock lock; db.dropCollection( mr.incLong ); - finalCount = mr.renameIfNeeded( db ); + finalCount = mr.renameIfNeeded( db , &state ); } timingBuilder.append( "total" , t.millis() ); @@ -644,6 +674,8 @@ namespace mongo { } + MRReduceState state; + DBDirectClient db; { // reduce from each stream @@ -654,12 +686,11 @@ namespace mongo { Query().sort( sortKey ) ); cursor.init(); - auto_ptr<Scope> s = globalScriptEngine->getPooledScope( dbname ); - s->localConnect( dbname.c_str() ); - ScriptingFunction reduceFunction = s->createFunction( mr.reduceCode.c_str() ); - ScriptingFunction finalizeFunction = 0; + state.scope.reset( globalScriptEngine->getPooledScope( dbname ).release() ); + state.scope->localConnect( dbname.c_str() ); + state.reduce = state.scope->createFunction( mr.reduceCode.c_str() ); if ( mr.finalizeCode.size() ) - finalizeFunction = s->createFunction( mr.finalizeCode.c_str() ); + state.finalize = state.scope->createFunction( mr.finalizeCode.c_str() ); BSONList values; @@ -679,16 +710,17 @@ namespace mongo { } - db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) ); + db.insert( mr.tempLong , reduceValues( values , &state , true ) ); values.clear(); values.push_back( t ); } if ( values.size() ) - db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) ); + db.insert( mr.tempLong , reduceValues( values , &state , true ) ); } - long long finalCount = mr.renameIfNeeded( db ); + + long long finalCount = mr.renameIfNeeded( db , &state ); log(0) << " mapreducefinishcommand " << mr.finalLong << " " << finalCount << endl; for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ){ diff --git a/db/commands/mr.h b/db/commands/mr.h index 7eec40078bc..9f8907b1de3 100644 --- a/db/commands/mr.h +++ b/db/commands/mr.h @@ -23,6 +23,18 @@ namespace mongo { namespace mr { + class MRReduceState { + public: + + MRReduceState() : reduce(), finalize(){} + + scoped_ptr<Scope> scope; + + ScriptingFunction reduce; + ScriptingFunction finalize; + }; + + typedef vector<BSONObj> BSONList; class MyCmp { @@ -50,7 +62,7 @@ namespace mongo { /** @return number objects in collection */ - long long renameIfNeeded( DBDirectClient& db ); + long long renameIfNeeded( DBDirectClient& db , MRReduceState * state ); void insert( const string& ns , BSONObj& o ); @@ -91,24 +103,20 @@ namespace mongo { static AtomicUInt JOB_NUMBER; }; // end MRsetup - /** * container for all stack based map/reduce state */ - class MRState { + class MRState : public MRReduceState { public: MRState( MRSetup& s ); - + void init(); + void finalReduce( BSONList& values ); MRSetup& setup; - auto_ptr<Scope> scope; DBDirectClient db; ScriptingFunction map; - ScriptingFunction reduce; - ScriptingFunction finalize; - }; /** diff --git a/jstests/mr_merge.js b/jstests/mr_merge.js new file mode 100644 index 00000000000..6393f188bf7 --- /dev/null +++ b/jstests/mr_merge.js @@ -0,0 +1,51 @@ + +t = db.mr_merge; +t.drop(); + +t.insert( { a : [ 1 , 2 ] } ) +t.insert( { a : [ 2 , 3 ] } ) +t.insert( { a : [ 3 , 4 ] } ) + +outName = "mr_merge_out"; +out = db[outName]; +out.drop(); + +m = function(){ for (i=0; i<this.a.length; i++ ) emit( this.a[i] , 1 ); } +r = function(k,vs){ return Array.sum( vs ); } + +function tos( o ){ + var s = ""; + for ( var i=0; i<100; i++ ){ + if ( o[i] ) + s += i + "_" + o[i]; + } + return s; +} + + +res = t.mapReduce( m , r , { out : outName } ) + + +expected = { "1" : 1 , "2" : 2 , "3" : 2 , "4" : 1 } +assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "A" ); + +t.insert( { a : [ 4 , 5 ] } ) +out.insert( { _id : 10 , value : "5" } ) +res = t.mapReduce( m , r , { out : outName } ) + +expected["4"]++; +expected["5"] = 1 +assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "B" ); + +t.insert( { a : [ 5 , 6 ] } ) +out.insert( { _id : 10 , value : "5" } ) +res = t.mapReduce( m , r , { out : outName , outType : "merge" } ) + +expected["5"]++; +expected["10"] = 5 +expected["6"] = 1 + +assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "C" ); + + + diff --git a/jstests/mr_outreduce.js b/jstests/mr_outreduce.js new file mode 100644 index 00000000000..e5878366fe4 --- /dev/null +++ b/jstests/mr_outreduce.js @@ -0,0 +1,41 @@ + +t = db.mr_outreduce; +t.drop(); + +t.insert( { _id : 1 , a : [ 1 , 2 ] } ) +t.insert( { _id : 2 , a : [ 2 , 3 ] } ) +t.insert( { _id : 3 , a : [ 3 , 4 ] } ) + +outName = "mr_outreduce_out"; +out = db[outName]; +out.drop(); + +m = function(){ for (i=0; i<this.a.length; i++ ) emit( this.a[i] , 1 ); } +r = function(k,vs){ return Array.sum( vs ); } + +function tos( o ){ + var s = ""; + for ( var i=0; i<100; i++ ){ + if ( o[i] ) + s += i + "_" + o[i] + "|" + } + return s; +} + + +res = t.mapReduce( m , r , { out : outName } ) + + +expected = { "1" : 1 , "2" : 2 , "3" : 2 , "4" : 1 } +assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "A" ); + +t.insert( { _id : 4 , a : [ 4 , 5 ] } ) +out.insert( { _id : 10 , value : "5" } ) // this is a sentinal to make sure it wasn't killed +res = t.mapReduce( m , r , { out : outName , outType : "reduce" , query : { _id : { $gt : 3 } } } ) + +expected["4"]++; +expected["5"] = 1 +expected["10"] = 5 +assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "B" ); + + |