diff options
author | Eliot Horowitz <eliot@10gen.com> | 2009-11-04 15:25:45 -0500 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2009-11-04 15:25:45 -0500 |
commit | d5048b8e6c972899180b57e716e57d887c7dcfab (patch) | |
tree | 4654d2f3dfad496ff398497551b59d91150bf706 /db/mr.cpp | |
parent | a5b8dc4e0def5566de33b70335bdc46b1e030f08 (diff) | |
download | mongo-d5048b8e6c972899180b57e716e57d887c7dcfab.tar.gz |
m/r refactor check point 1
Diffstat (limited to 'db/mr.cpp')
-rw-r--r-- | db/mr.cpp | 195 |
1 files changed, 123 insertions, 72 deletions
diff --git a/db/mr.cpp b/db/mr.cpp index 4e28bac9fb5..75d650eb0df 100644 --- a/db/mr.cpp +++ b/db/mr.cpp @@ -33,6 +33,96 @@ namespace mongo { typedef map< BSONObj,list<BSONObj>,BSONObjCmp > InMemory; typedef map< BSONObj,int,BSONObjCmp > KeyNums; + class MRSetup { + public: + MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp = true ){ + static int jobNumber = 1; + + dbname = _dbname; + ns = dbname + "." + cmdObj.firstElement().valuestr(); + + verbose = cmdObj["verbose"].trueValue(); + keeptemp = cmdObj["keeptemp"].trueValue(); + + { // setup names + stringstream ss; + if ( ! keeptemp ) + ss << "tmp."; + ss << "mr." << cmdObj.firstElement().fieldName() << "_" << time(0) << "_" << jobNumber++; + tempShort = ss.str(); + tempLong = dbname + "." + tempShort; + + if ( ! keeptemp && markAsTemp ) + cc().addTempCollection( tempLong ); + + if ( cmdObj["out"].type() == String ) + finalShort = cmdObj["out"].valuestr(); + else + finalShort = tempShort; + + finalLong = dbname + "." + finalShort; + + } + + { // code + mapCode = cmdObj["map"].ascode(); + reduceCode = cmdObj["reduce"].ascode(); + if ( cmdObj["finalize"].type() ){ + finalizeCode = cmdObj["finalize"].ascode(); + } + + } + + { // query options + if ( cmdObj["query"].type() == Object ){ + filter = cmdObj["query"].embeddedObjectUserCheck(); + q = filter; + } + + if ( cmdObj["sort"].type() == Object ) + q.sort( cmdObj["sort"].embeddedObjectUserCheck() ); + } + } + + /** + @return number objects in collection + */ + long long renameIfNeeded( DBDirectClient& db ){ + if ( finalLong != tempLong ){ + dblock l; + db.dropCollection( finalLong ); + BSONObj info; + uassert( "rename failed" , db.runCommand( "admin" , BSON( "renameCollection" << tempLong << "to" << finalLong ) , info ) ); + } + return db.count( finalLong ); + } + + string dbname; + string ns; + + // options + bool verbose; + bool keeptemp; + + // query options + + BSONObj filter; + Query q; + + // functions + + string mapCode; + string reduceCode; + string finalizeCode; + + // output tables + string tempShort; + string tempLong; + + string finalShort; + string finalLong; + + }; class MyCmp { public: @@ -214,16 +304,6 @@ namespace mongo { return BSONObj(); } - string tempCollectionName( const BSONObj& cmd ){ - static int inc = 1; - stringstream ss; - ss << cc().database()->name << "."; - if ( ! cmd["keeptemp"].trueValue() ) - ss << "tmp."; - ss << "mr." << cmd.firstElement().fieldName() << "_" << time(0) << "_" << inc++; - return ss.str(); - } - class MapReduceCommand : public Command { public: MapReduceCommand() : Command("mapreduce"){} @@ -256,35 +336,22 @@ namespace mongo { doReduce( resultColl , values , s , reduce ); } - bool run(const char *dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl ){ + bool run(const char *dbname, BSONObj& cmd, string& errmsg, BSONObjBuilder& result, bool fromRepl ){ Timer t; Client::GodScope cg; - bool verboseOutput = cmdObj["verbose"].trueValue(); + MRSetup mr( cc().database()->name , cmd ); - string ns = cc().database()->name + '.' + cmdObj.firstElement().valuestr(); - log(1) << "mr ns: " << ns << endl; + log(1) << "mr ns: " << mr.ns << endl; - if ( ! db.exists( ns ) ){ + if ( ! db.exists( mr.ns ) ){ errmsg = "ns doesn't exist"; return false; } - - auto_ptr<Scope> s = globalScriptEngine->getPooledScope( ns ); - s->localConnect( cc().database()->name.c_str() ); + auto_ptr<Scope> s = globalScriptEngine->getPooledScope( mr.dbname ); + s->localConnect( mr.dbname.c_str() ); - bool istemp = ! cmdObj["keeptemp"].trueValue(); - string resultColl = tempCollectionName( cmdObj ); - if ( istemp ) - currentClient->addTempCollection( resultColl ); - string finalOutput = resultColl; - if ( cmdObj["out"].type() == String ) - finalOutput = cc().database()->name + "." + cmdObj["out"].valuestr(); - - string resultCollShort = resultColl.substr( cc().database()->name.size() + 1 ); - string finalOutputShort = finalOutput.substr( cc().database()->name.size() + 1 ); - log(1) << "\t resultColl: " << resultColl << " short: " << resultCollShort << endl; - db.dropCollection( resultColl ); + db.dropCollection( mr.tempLong ); long long num = 0; long long inReduce = 0; @@ -294,43 +361,33 @@ namespace mongo { try { dbtemprelease temprlease; - s->execSetup( (string)"tempcoll = db[\"" + resultCollShort + "\"]; db.getMongo().setSlaveOk();" , "tempcoll1" ); + s->execSetup( (string)"tempcoll = db[\"" + mr.tempShort + "\"]; db.getMongo().setSlaveOk();" , "tempcoll1" ); s->execSetup( "MR.init()" ); s->injectNative( "get_num" , get_num ); s->injectNative( "reset_num" , reset_num ); - ScriptingFunction mapFunction = s->createFunction( cmdObj["map"].ascode().c_str() ); - ScriptingFunction reduceFunction = s->createFunction( cmdObj["reduce"].ascode().c_str() ); - s->execSetup( (string)"$reduce = " + cmdObj["reduce"].ascode() ); + ScriptingFunction mapFunction = s->createFunction( mr.mapCode.c_str() ); + ScriptingFunction reduceFunction = s->createFunction( mr.reduceCode.c_str() ); + s->execSetup( (string)"$reduce = " + mr.reduceCode ); - MRTL * mrtl = new MRTL( &db , resultColl , s.get() , reduceFunction ); + MRTL * mrtl = new MRTL( &db , mr.tempLong , s.get() , reduceFunction ); _tlmr.reset( mrtl ); - Query q; - BSONObj filter; - if ( cmdObj["query"].type() == Object ){ - filter = cmdObj["query"].embeddedObjectUserCheck(); - q = filter; - } - - if ( cmdObj["sort"].type() == Object ) - q.sort( cmdObj["sort"].embeddedObjectUserCheck() ); - - ProgressMeter pm( db.count( ns , filter ) ); - auto_ptr<DBClientCursor> cursor = db.query( ns , q ); + ProgressMeter pm( db.count( mr.ns , mr.filter ) ); + auto_ptr<DBClientCursor> cursor = db.query( mr.ns , mr.q ); long long mapTime = 0; Timer mt; while ( cursor->more() ){ BSONObj o = cursor->next(); - if ( verboseOutput ) mt.reset(); + if ( mr.verbose ) mt.reset(); s->setThis( &o ); if ( s->invoke( mapFunction , BSONObj() , 0 , true ) ) throw UserException( (string)"map invoke failed: " + s->getError() ); - if ( verboseOutput ) mapTime += mt.micros(); + if ( mr.verbose ) mapTime += mt.micros(); num++; if ( num % 100 == 0 ){ @@ -361,13 +418,13 @@ namespace mongo { inReduce += t.micros(); timingBuilder.append( "reduce" , inReduce / 1000 ); } - if ( verboseOutput ){ + if ( mr.verbose ){ countsBuilder.append( "reduces" , s->getNumber( "$numReduces" ) ); countsBuilder.append( "reducesToDB" , s->getNumber( "$numReducesToDB" ) ); } - if ( cmdObj["finalize"].type() ){ - s->execSetup( (string)"$finalize = " + cmdObj["finalize"].ascode() ); + if ( mr.finalizeCode.size() ){ + s->execSetup( (string)"$finalize = " + mr.finalizeCode ); s->execSetup( "MR.finalize()" ); } @@ -399,29 +456,24 @@ namespace mongo { } catch ( ... ){ log() << "mr failed, removing collection" << endl; - db.dropCollection( resultColl ); + db.dropCollection( mr.tempLong ); throw; } - if ( finalOutput != resultColl ){ - // need to do this with the full dblock, that's why its after the try/catch - db.dropCollection( finalOutput ); - BSONObj info; - uassert( "rename failed" , db.runCommand( "admin" , BSON( "renameCollection" << resultColl << "to" << finalOutput ) , info ) ); - } + long long finalCount = mr.renameIfNeeded( db ); - if ( db.count( finalOutput ) == 0 && numEmits > 0 ){ + if ( finalCount == 0 && numEmits > 0 ){ errmsg = "there were emits but no data!"; return false; } timingBuilder.append( "total" , t.millis() ); - result.append( "result" , finalOutputShort ); + result.append( "result" , mr.finalShort ); result.append( "timeMillis" , t.millis() ); - countsBuilder.append( "output" , (long long)(db.count( finalOutput )) ); - if ( verboseOutput ) result.append( "timing" , timingBuilder.obj() ); + countsBuilder.append( "output" , finalCount ); + if ( mr.verbose ) result.append( "timing" , timingBuilder.obj() ); result.append( "counts" , countsBuilder.obj() ); return true; @@ -443,7 +495,7 @@ namespace mongo { string dbname = cc().database()->name; string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe(); - BSONObj origCmd = cmdObj.firstElement().embeddedObjectUserCheck(); + MRSetup mr( dbname , cmdObj.firstElement().embeddedObjectUserCheck() , false ); set<ServerAndQuery> servers; @@ -478,15 +530,11 @@ namespace mongo { auto_ptr<Scope> s = globalScriptEngine->getPooledScope( ns ); - ScriptingFunction reduceFunction = s->createFunction( origCmd["reduce"].ascode().c_str() ); + ScriptingFunction reduceFunction = s->createFunction( mr.reduceCode.c_str() ); list<BSONObj> values; - string output = origCmd["out"].valuestrsafe(); - if ( output.size() == 0 ) - output = tempCollectionName( origCmd ); - string fulloutput = dbname + "." + output; - result.append( "result" , output ); + result.append( "result" , mr.finalShort ); DBDirectClient db; @@ -503,14 +551,17 @@ namespace mongo { continue; } - db.insert( fulloutput , reduceValues( values , s.get() , reduceFunction ) ); + db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction ) ); values.clear(); values.push_back( t ); } if ( values.size() ) - db.insert( fulloutput , reduceValues( values , s.get() , reduceFunction ) ); + db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction ) ); + long long finalCount = mr.renameIfNeeded( db ); + log(0) << " mapreducefinishcommand " << mr.finalLong << " " << finalCount << endl; + for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ){ ScopedDbConnection conn( i->_server ); conn->dropCollection( dbname + "." + shardedOutputCollection ); |