// mr.cpp /** * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "stdafx.h" #include "db.h" #include "instance.h" #include "commands.h" #include "../scripting/engine.h" #include "../client/dbclient.h" #include "../client/connpool.h" #include "../client/parallel.h" namespace mongo { namespace mr { class MyCmp { public: MyCmp(){} bool operator()( const BSONObj &l, const BSONObj &r ) const { return l.firstElement().woCompare( r.firstElement() ) < 0; } }; typedef pair Data; //typedef list< Data > InMemory; typedef map< BSONObj,list,MyCmp > InMemory; BSONObj reduceValues( list& values , Scope * s , ScriptingFunction reduce , bool final , ScriptingFunction finalize ){ uassert( "need values" , values.size() ); int sizeEstimate = ( values.size() * values.begin()->getField( "value" ).size() ) + 128; BSONObj key; BSONObjBuilder reduceArgs( sizeEstimate ); BSONObjBuilder valueBuilder( sizeEstimate ); int n = 0; for ( list::iterator i=values.begin(); i!=values.end(); i++){ BSONObj o = *i; BSONObjIterator j(o); BSONElement keyE = j.next(); if ( n == 0 ){ reduceArgs.append( keyE ); BSONObjBuilder temp; temp.append( keyE ); key = temp.obj(); } valueBuilder.appendAs( j.next() , BSONObjBuilder::numStr( n++ ).c_str() ); } reduceArgs.appendArray( "values" , valueBuilder.obj() ); BSONObj args = reduceArgs.obj(); s->invokeSafe( reduce , args ); if ( s->type( "return" ) == Array ){ uassert("reduce -> multiple not supported yet",0); return BSONObj(); } if ( finalize ){ BSONObjBuilder b; b.appendAs( key.firstElement() , "_id" ); s->append( b , "value" , "return" ); s->invokeSafe( finalize , b.obj() ); } BSONObjBuilder b; b.appendAs( key.firstElement() , final ? "_id" : "0" ); s->append( b , final ? "value" : "1" , "return" ); return b.obj(); } class MRSetup { public: MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp = true ){ static int jobNumber = 1; dbname = _dbname; ns = dbname + "." + cmdObj.firstElement().valuestr(); verbose = cmdObj["verbose"].trueValue(); keeptemp = cmdObj["keeptemp"].trueValue(); { // setup names stringstream ss; if ( ! keeptemp ) ss << "tmp."; ss << "mr." << cmdObj.firstElement().fieldName() << "_" << time(0) << "_" << jobNumber++; tempShort = ss.str(); tempLong = dbname + "." + tempShort; incLong = tempLong + "_inc"; if ( ! keeptemp && markAsTemp ) cc().addTempCollection( tempLong ); if ( cmdObj["out"].type() == String ) finalShort = cmdObj["out"].valuestr(); else finalShort = tempShort; finalLong = dbname + "." + finalShort; } { // code mapCode = cmdObj["map"].ascode(); reduceCode = cmdObj["reduce"].ascode(); if ( cmdObj["finalize"].type() ){ finalizeCode = cmdObj["finalize"].ascode(); } if ( cmdObj["mapparams"].type() == Array ){ mapparams = cmdObj["mapparams"].embeddedObjectUserCheck(); } if ( cmdObj["scope"].type() == Object ){ scopeSetup = cmdObj["scope"].embeddedObjectUserCheck(); } } { // query options if ( cmdObj["query"].type() == Object ){ filter = cmdObj["query"].embeddedObjectUserCheck(); q = filter; } if ( cmdObj["sort"].type() == Object ) q.sort( cmdObj["sort"].embeddedObjectUserCheck() ); if ( cmdObj["limit"].isNumber() ) limit = cmdObj["limit"].numberLong(); else limit = 0; } } /** @return number objects in collection */ long long renameIfNeeded( DBDirectClient& db ){ if ( finalLong != tempLong ){ db.dropCollection( finalLong ); if ( db.count( tempLong ) ){ BSONObj info; uassert( "rename failed" , db.runCommand( "admin" , BSON( "renameCollection" << tempLong << "to" << finalLong ) , info ) ); } } return db.count( finalLong ); } string dbname; string ns; // options bool verbose; bool keeptemp; // query options BSONObj filter; Query q; long long limit; // functions string mapCode; string reduceCode; string finalizeCode; BSONObj mapparams; BSONObj scopeSetup; // output tables string incLong; string tempShort; string tempLong; string finalShort; string finalLong; }; // end MRsetup class MRState { public: MRState( MRSetup& s ) : setup(s){ scope = globalScriptEngine->getPooledScope( setup.dbname ); scope->localConnect( setup.dbname.c_str() ); map = scope->createFunction( setup.mapCode.c_str() ); if ( ! map ) throw UserException( (string)"map compile failed: " + scope->getError() ); reduce = scope->createFunction( setup.reduceCode.c_str() ); if ( ! reduce ) throw UserException( (string)"reduce compile failed: " + scope->getError() ); if ( setup.finalizeCode.size() ) finalize = scope->createFunction( setup.finalizeCode.c_str() ); else finalize = 0; if ( ! setup.scopeSetup.isEmpty() ) scope->init( &setup.scopeSetup ); db.dropCollection( setup.tempLong ); db.dropCollection( setup.incLong ); writelock l( setup.incLong ); string err; assert( userCreateNS( setup.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) ); } void finalReduce( list& values ){ if ( values.size() == 0 ) return; BSONObj key = values.begin()->firstElement().wrap( "_id" ); BSONObj res = reduceValues( values , scope.get() , reduce , 1 , finalize ); writelock l( setup.tempLong ); theDataFileMgr.insertAndLog( setup.tempLong.c_str() , res , false ); } MRSetup& setup; auto_ptr scope; DBDirectClient db; ScriptingFunction map; ScriptingFunction reduce; ScriptingFunction finalize; }; class MRTL { public: MRTL( MRState& state ) : _state( state ){ _temp = new InMemory(); _size = 0; numEmits = 0; } ~MRTL(){ delete _temp; } void reduceInMemory(){ InMemory * old = _temp; InMemory * n = new InMemory(); _temp = n; _size = 0; for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){ BSONObj key = i->first; list& all = i->second; if ( all.size() == 1 ){ // this key has low cardinality, so just write to db writelock l(_state.setup.incLong); write( *(all.begin()) ); } else if ( all.size() > 1 ){ BSONObj res = reduceValues( all , _state.scope.get() , _state.reduce , false , 0 ); insert( res ); } } delete( old ); } void dump(){ writelock l(_state.setup.incLong); for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){ list& all = i->second; if ( all.size() < 1 ) continue; for ( list::iterator j=all.begin(); j!=all.end(); j++ ) write( *j ); } _temp->clear(); _size = 0; } void insert( const BSONObj& a ){ list& all = (*_temp)[a]; all.push_back( a ); _size += a.objsize() + 16; } void checkSize(){ if ( _size < 1024 * 5 ) return; long before = _size; reduceInMemory(); log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl; if ( _size < 1024 * 15 ) return; dump(); log(1) << " mr: dumping to db" << endl; } private: void write( BSONObj& o ){ theDataFileMgr.insert( _state.setup.incLong.c_str() , o , true ); } MRState& _state; InMemory * _temp; long _size; public: long long numEmits; }; boost::thread_specific_ptr _tlmr; BSONObj fast_emit( const BSONObj& args ){ uassert( "fast_emit takes 2 args" , args.nFields() == 2 ); _tlmr->insert( args ); _tlmr->numEmits++; return BSONObj(); } class MapReduceCommand : public Command { public: MapReduceCommand() : Command("mapreduce"){} virtual bool slaveOk() { return true; } virtual void help( stringstream &help ) const { help << "see http://www.mongodb.org/display/DOCS/MapReduce"; } bool run(const char *dbname, BSONObj& cmd, string& errmsg, BSONObjBuilder& result, bool fromRepl ){ Timer t; Client::GodScope cg; MRSetup mr( cc().database()->name , cmd ); log(1) << "mr ns: " << mr.ns << endl; if ( ! db.exists( mr.ns ) ){ errmsg = "ns doesn't exist"; return false; } bool shouldHaveData = false; long long num = 0; long long inReduce = 0; BSONObjBuilder countsBuilder; BSONObjBuilder timingBuilder; try { dbtemprelease temprlease; MRState state( mr ); state.scope->injectNative( "emit" , fast_emit ); MRTL * mrtl = new MRTL( state ); _tlmr.reset( mrtl ); ProgressMeter pm( db.count( mr.ns , mr.filter ) ); auto_ptr cursor = db.query( mr.ns , mr.q ); long long mapTime = 0; Timer mt; while ( cursor->more() ){ BSONObj o = cursor->next(); if ( mr.verbose ) mt.reset(); state.scope->setThis( &o ); if ( state.scope->invoke( state.map , state.setup.mapparams , 0 , true ) ) throw UserException( (string)"map invoke failed: " + state.scope->getError() ); if ( mr.verbose ) mapTime += mt.micros(); num++; if ( num % 100 == 0 ){ Timer t; mrtl->checkSize(); inReduce += t.micros(); } pm.hit(); if ( mr.limit && num >= mr.limit ) break; } countsBuilder.append( "input" , num ); countsBuilder.append( "emit" , mrtl->numEmits ); if ( mrtl->numEmits ) shouldHaveData = true; timingBuilder.append( "mapTime" , mapTime / 1000 ); timingBuilder.append( "emitLoop" , t.millis() ); // final reduce mrtl->reduceInMemory(); mrtl->dump(); BSONObj sortKey = BSON( "0" << 1 ); db.ensureIndex( mr.incLong , sortKey ); BSONObj prev; list all; ProgressMeter fpm( db.count( mr.incLong ) ); cursor = db.query( mr.incLong, Query().sort( sortKey ) ); while ( cursor->more() ){ BSONObj o = cursor->next().getOwned(); if ( o.woSortOrder( prev , sortKey ) == 0 ){ all.push_back( o ); continue; } state.finalReduce( all ); all.clear(); prev = o; all.push_back( o ); fpm.hit(); } state.finalReduce( all ); _tlmr.reset( 0 ); } catch ( ... ){ log() << "mr failed, removing collection" << endl; db.dropCollection( mr.tempLong ); db.dropCollection( mr.incLong ); throw; } db.dropCollection( mr.incLong ); long long finalCount = mr.renameIfNeeded( db ); if ( finalCount == 0 && shouldHaveData ){ errmsg = "there were emits but no data!"; return false; } timingBuilder.append( "total" , t.millis() ); result.append( "result" , mr.finalShort ); result.append( "timeMillis" , t.millis() ); countsBuilder.append( "output" , finalCount ); if ( mr.verbose ) result.append( "timing" , timingBuilder.obj() ); result.append( "counts" , countsBuilder.obj() ); return true; } private: DBDirectClient db; } mapReduceCommand; class MapReduceFinishCommand : public Command { public: MapReduceFinishCommand() : Command( "mapreduce.shardedfinish" ){} virtual bool slaveOk() { return true; } bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ dbtemprelease temprlease; // we don't touch the db directly string dbname = cc().database()->name; string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe(); MRSetup mr( dbname , cmdObj.firstElement().embeddedObjectUserCheck() , false ); set servers; BSONObjBuilder shardCounts; map counts; BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck(); vector< auto_ptr > shardCursors; BSONObjIterator i( shards ); while ( i.more() ){ BSONElement e = i.next(); string shard = e.fieldName(); BSONObj res = e.embeddedObjectUserCheck(); uassert( "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() ); servers.insert( shard ); shardCounts.appendAs( res["counts"] , shard.c_str() ); BSONObjIterator j( res["counts"].embeddedObjectUserCheck() ); while ( j.more() ){ BSONElement temp = j.next(); counts[temp.fieldName()] += temp.numberLong(); } } BSONObj sortKey = BSON( "_id" << 1 ); ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection , Query().sort( sortKey ) ); auto_ptr s = globalScriptEngine->getPooledScope( ns ); ScriptingFunction reduceFunction = s->createFunction( mr.reduceCode.c_str() ); ScriptingFunction finalizeFunction = 0; if ( mr.finalizeCode.size() ) finalizeFunction = s->createFunction( mr.finalizeCode.c_str() ); list values; result.append( "result" , mr.finalShort ); DBDirectClient db; while ( cursor.more() ){ BSONObj t = cursor.next(); if ( values.size() == 0 ){ values.push_back( t ); continue; } if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ){ values.push_back( t ); continue; } db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) ); values.clear(); values.push_back( t ); } if ( values.size() ) db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) ); long long finalCount = mr.renameIfNeeded( db ); log(0) << " mapreducefinishcommand " << mr.finalLong << " " << finalCount << endl; for ( set::iterator i=servers.begin(); i!=servers.end(); i++ ){ ScopedDbConnection conn( i->_server ); conn->dropCollection( dbname + "." + shardedOutputCollection ); } result.append( "shardCounts" , shardCounts.obj() ); { BSONObjBuilder c; for ( map::iterator i=counts.begin(); i!=counts.end(); i++ ){ c.append( i->first , i->second ); } result.append( "counts" , c.obj() ); } return 1; } } mapReduceFinishCommand; } }