diff options
author | Eliot Horowitz <eliot@10gen.com> | 2010-11-15 14:39:42 -0500 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2010-11-15 14:39:56 -0500 |
commit | 1cafbccecade5457bb9a637619a34055c2936fbf (patch) | |
tree | fcc85ee8d76e536c43d78a065274ef172e3a6687 /db | |
parent | 1bb47c4a9289e24cb9593b274540b1d7640f53f6 (diff) | |
download | mongo-1cafbccecade5457bb9a637619a34055c2936fbf.tar.gz |
cleaning m/r code
Diffstat (limited to 'db')
-rw-r--r-- | db/commands/mr.cpp | 475 | ||||
-rw-r--r-- | db/commands/mr.h | 145 |
2 files changed, 343 insertions, 277 deletions
diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp index fd7bca7d04e..722ee513e3f 100644 --- a/db/commands/mr.cpp +++ b/db/commands/mr.cpp @@ -27,23 +27,13 @@ #include "../matcher.h" #include "../clientcursor.h" +#include "mr.h" + namespace mongo { namespace mr { - typedef vector<BSONObj> BSONList; - - class MyCmp { - public: - MyCmp(){} - bool operator()( const BSONObj &l, const BSONObj &r ) const { - return l.firstElement().woCompare( r.firstElement() ) < 0; - } - }; - - typedef pair<BSONObj,BSONObj> Data; - //typedef list< Data > InMemory; - typedef map< BSONObj,BSONList,MyCmp > InMemory; + AtomicUInt MRSetup::JOB_NUMBER; BSONObj reduceValues( BSONList& values , Scope * s , ScriptingFunction reduce , bool final , ScriptingFunction finalize ){ uassert( 10074 , "need values" , values.size() ); @@ -118,321 +108,252 @@ namespace mongo { return b.obj(); } - class MRSetup { - public: - MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp = true ){ - static int jobNumber = 1; + MRSetup::MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp ){ + + dbname = _dbname; + ns = dbname + "." + cmdObj.firstElement().valuestr(); + + verbose = cmdObj["verbose"].trueValue(); + keeptemp = cmdObj["keeptemp"].trueValue(); + + { // setup names + stringstream ss; + if ( ! keeptemp ) + ss << "tmp."; + ss << "mr." << cmdObj.firstElement().String() << "_" << time(0) << "_" << JOB_NUMBER++; + tempShort = ss.str(); + tempLong = dbname + "." + tempShort; + incLong = tempLong + "_inc"; - dbname = _dbname; - ns = dbname + "." + cmdObj.firstElement().valuestr(); + if ( ! keeptemp && markAsTemp ) + cc().addTempCollection( tempLong ); - verbose = cmdObj["verbose"].trueValue(); - keeptemp = cmdObj["keeptemp"].trueValue(); - - { // setup names - stringstream ss; - if ( ! keeptemp ) - ss << "tmp."; - ss << "mr." << cmdObj.firstElement().String() << "_" << time(0) << "_" << jobNumber++; - tempShort = ss.str(); - tempLong = dbname + "." + tempShort; - incLong = tempLong + "_inc"; - - if ( ! keeptemp && markAsTemp ) - cc().addTempCollection( tempLong ); - - replicate = keeptemp; - - if ( cmdObj["out"].type() == String ){ - finalShort = cmdObj["out"].valuestr(); - replicate = true; - } - else - finalShort = tempShort; + replicate = keeptemp; + + if ( cmdObj["out"].type() == String ){ + finalShort = cmdObj["out"].valuestr(); + replicate = true; + } + else + finalShort = tempShort; - finalLong = dbname + "." + finalShort; + finalLong = dbname + "." + finalShort; - } + } - if ( cmdObj["outType"].type() == String ){ - uassert( 13521 , "need 'out' if using 'outType'" , cmdObj["out"].type() == String ); - string t = cmdObj["outType"].String(); - if ( t == "normal" ) - outType = NORMAL; - else if ( t == "merge" ) - outType = MERGE; - else if ( t == "reduce" ) - outType = REDUCE; - else - uasserted( 13522 , str::stream() << "unknown outType [" << t << "]" ); - } - else { + if ( cmdObj["outType"].type() == String ){ + uassert( 13521 , "need 'out' if using 'outType'" , cmdObj["out"].type() == String ); + string t = cmdObj["outType"].String(); + if ( t == "normal" ) outType = NORMAL; - } + else if ( t == "merge" ) + outType = MERGE; + else if ( t == "reduce" ) + outType = REDUCE; + else + uasserted( 13522 , str::stream() << "unknown outType [" << t << "]" ); + } + else { + outType = NORMAL; + } - { // scope and code - // NOTE: function scopes are merged with m/r scope, not nested like they should be - BSONObjBuilder scopeBuilder; + { // scope and code + // NOTE: function scopes are merged with m/r scope, not nested like they should be + BSONObjBuilder scopeBuilder; - if ( cmdObj["scope"].type() == Object ){ - scopeBuilder.appendElements( cmdObj["scope"].embeddedObjectUserCheck() ); - } + if ( cmdObj["scope"].type() == Object ){ + scopeBuilder.appendElements( cmdObj["scope"].embeddedObjectUserCheck() ); + } - mapCode = scopeAndCode( scopeBuilder, cmdObj["map"] ); - reduceCode = scopeAndCode( scopeBuilder, cmdObj["reduce"] ); - if ( cmdObj["finalize"].type() ){ - finalizeCode = scopeAndCode( scopeBuilder, cmdObj["finalize"] ); - } + mapCode = scopeAndCode( scopeBuilder, cmdObj["map"] ); + reduceCode = scopeAndCode( scopeBuilder, cmdObj["reduce"] ); + if ( cmdObj["finalize"].type() ){ + finalizeCode = scopeAndCode( scopeBuilder, cmdObj["finalize"] ); + } - scopeSetup = scopeBuilder.obj(); + scopeSetup = scopeBuilder.obj(); - if ( cmdObj["mapparams"].type() == Array ){ - mapparams = cmdObj["mapparams"].embeddedObjectUserCheck(); - } - + if ( cmdObj["mapparams"].type() == Array ){ + mapparams = cmdObj["mapparams"].embeddedObjectUserCheck(); } + + } - { // query options - if ( cmdObj["query"].type() == Object ){ - filter = cmdObj["query"].embeddedObjectUserCheck(); - } + { // query options + if ( cmdObj["query"].type() == Object ){ + filter = cmdObj["query"].embeddedObjectUserCheck(); + } - if ( cmdObj["sort"].type() == Object ){ - sort = cmdObj["sort"].embeddedObjectUserCheck(); - } - - if ( cmdObj["limit"].isNumber() ) - limit = cmdObj["limit"].numberLong(); - else - limit = 0; + if ( cmdObj["sort"].type() == Object ){ + sort = cmdObj["sort"].embeddedObjectUserCheck(); } - } - /** Field expected to be a Code or CodeWScope. - * Add its scope, if any, to scopeBuilder, and return its code. - * Scopes added later will shadow those added earlier. */ - static string scopeAndCode (BSONObjBuilder& scopeBuilder, const BSONElement& field) { - if ( field.type() == CodeWScope ) - scopeBuilder.appendElements( field.codeWScopeObject() ); - return field._asCode(); + if ( cmdObj["limit"].isNumber() ) + limit = cmdObj["limit"].numberLong(); + else + limit = 0; } + } - /** - @return number objects in collection - */ - long long renameIfNeeded( DBDirectClient& db ){ - assertInWriteLock(); - if ( finalLong != tempLong ){ + string MRSetup::scopeAndCode (BSONObjBuilder& scopeBuilder, const BSONElement& field) { + if ( field.type() == CodeWScope ) + scopeBuilder.appendElements( field.codeWScopeObject() ); + return field._asCode(); + } + + long long MRSetup::renameIfNeeded( DBDirectClient& db ){ + assertInWriteLock(); + if ( finalLong != tempLong ){ - if ( outType == NORMAL ){ - db.dropCollection( finalLong ); - if ( db.count( tempLong ) ){ - BSONObj info; - uassert( 10076 , "rename failed" , - db.runCommand( "admin" , BSON( "renameCollection" << tempLong << "to" << finalLong ) , info ) ); - } + if ( outType == NORMAL ){ + db.dropCollection( finalLong ); + if ( db.count( tempLong ) ){ + BSONObj info; + uassert( 10076 , "rename failed" , + db.runCommand( "admin" , BSON( "renameCollection" << tempLong << "to" << finalLong ) , info ) ); } - else if ( outType == MERGE ){ - auto_ptr<DBClientCursor> cursor = db.query( tempLong , BSONObj() ); - while ( cursor->more() ){ - BSONObj o = cursor->next(); - Helpers::upsert( finalLong , o ); - } - db.dropCollection( tempLong ); - } - else if ( outType == REDUCE ){ - assert(0); - } - else { - assert(0); + } + else if ( outType == MERGE ){ + auto_ptr<DBClientCursor> cursor = db.query( tempLong , BSONObj() ); + while ( cursor->more() ){ + BSONObj o = cursor->next(); + Helpers::upsert( finalLong , o ); } + db.dropCollection( tempLong ); + } + else if ( outType == REDUCE ){ + assert(0); + } + else { + assert(0); } - return db.count( finalLong ); } + return db.count( finalLong ); + } - void insert( const string& ns , BSONObj& o ){ - writelock l( ns ); - Client::Context ctx( ns ); + void MRSetup::insert( const string& ns , BSONObj& o ){ + writelock l( ns ); + Client::Context ctx( ns ); - if ( replicate ) - theDataFileMgr.insertAndLog( ns.c_str() , o , false ); - else - theDataFileMgr.insertWithObjMod( ns.c_str() , o , false ); + if ( replicate ) + theDataFileMgr.insertAndLog( ns.c_str() , o , false ); + else + theDataFileMgr.insertWithObjMod( ns.c_str() , o , false ); - } + } - string dbname; - string ns; - - // options - bool verbose; - bool keeptemp; - bool replicate; - // query options + MRState::MRState( MRSetup& s ) : setup(s){ + scope = globalScriptEngine->getPooledScope( setup.dbname ); + scope->localConnect( setup.dbname.c_str() ); - BSONObj filter; - BSONObj sort; - long long limit; - - // functions + map = scope->createFunction( setup.mapCode.c_str() ); + if ( ! map ) + throw UserException( 9012, (string)"map compile failed: " + scope->getError() ); - string mapCode; - string reduceCode; - string finalizeCode; + reduce = scope->createFunction( setup.reduceCode.c_str() ); + if ( ! reduce ) + throw UserException( 9013, (string)"reduce compile failed: " + scope->getError() ); - BSONObj mapparams; - BSONObj scopeSetup; + if ( setup.finalizeCode.size() ) + finalize = scope->createFunction( setup.finalizeCode.c_str() ); + else + finalize = 0; - // output tables - string incLong; + if ( ! setup.scopeSetup.isEmpty() ) + scope->init( &setup.scopeSetup ); - string tempShort; - string tempLong; + db.dropCollection( setup.tempLong ); + db.dropCollection( setup.incLong ); - string finalShort; - string finalLong; - - enum { NORMAL , MERGE , REDUCE } outType; + writelock l( setup.incLong ); + Client::Context ctx( setup.incLong ); + string err; + assert( userCreateNS( setup.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) ); - }; // end MRsetup - - class MRState { - public: - MRState( MRSetup& s ) : setup(s){ - scope = globalScriptEngine->getPooledScope( setup.dbname ); - scope->localConnect( setup.dbname.c_str() ); - - map = scope->createFunction( setup.mapCode.c_str() ); - if ( ! map ) - throw UserException( 9012, (string)"map compile failed: " + scope->getError() ); - - reduce = scope->createFunction( setup.reduceCode.c_str() ); - if ( ! reduce ) - throw UserException( 9013, (string)"reduce compile failed: " + scope->getError() ); - - if ( setup.finalizeCode.size() ) - finalize = scope->createFunction( setup.finalizeCode.c_str() ); - else - finalize = 0; - - if ( ! setup.scopeSetup.isEmpty() ) - scope->init( &setup.scopeSetup ); - - db.dropCollection( setup.tempLong ); - db.dropCollection( setup.incLong ); - - writelock l( setup.incLong ); - Client::Context ctx( setup.incLong ); - string err; - assert( userCreateNS( setup.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) ); - - } - - void finalReduce( BSONList& values ){ - if ( values.size() == 0 ) - return; - - BSONObj key = values.begin()->firstElement().wrap( "_id" ); - BSONObj res = reduceValues( values , scope.get() , reduce , 1 , finalize ); - - setup.insert( setup.tempLong , res ); - } - + } + + void MRState::finalReduce( BSONList& values ){ + if ( values.size() == 0 ) + return; - MRSetup& setup; - auto_ptr<Scope> scope; - DBDirectClient db; + BSONObj key = values.begin()->firstElement().wrap( "_id" ); + BSONObj res = reduceValues( values , scope.get() , reduce , 1 , finalize ); + + setup.insert( setup.tempLong , res ); + } - ScriptingFunction map; - ScriptingFunction reduce; - ScriptingFunction finalize; + MRTL::MRTL( MRState& state ) + : _state( state ) + , _temp(new InMemory()) + { + _size = 0; + numEmits = 0; + } - }; - - class MRTL { - public: - MRTL( MRState& state ) - : _state( state ) - , _temp(new InMemory()) - { - _size = 0; - numEmits = 0; - } + void MRTL::reduceInMemory(){ + boost::shared_ptr<InMemory> old = _temp; + _temp.reset(new InMemory()); + _size = 0; - void reduceInMemory(){ - boost::shared_ptr<InMemory> old = _temp; - _temp.reset(new InMemory()); - _size = 0; + for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){ + BSONObj key = i->first; + BSONList& all = i->second; - for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){ - BSONObj key = i->first; - BSONList& all = i->second; - - if ( all.size() == 1 ){ - // this key has low cardinality, so just write to db - writelock l(_state.setup.incLong); - Client::Context ctx(_state.setup.incLong.c_str()); - write( *(all.begin()) ); - } - else if ( all.size() > 1 ){ - BSONObj res = reduceValues( all , _state.scope.get() , _state.reduce , false , 0 ); - insert( res ); - } + if ( all.size() == 1 ){ + // this key has low cardinality, so just write to db + writelock l(_state.setup.incLong); + Client::Context ctx(_state.setup.incLong.c_str()); + write( *(all.begin()) ); + } + else if ( all.size() > 1 ){ + BSONObj res = reduceValues( all , _state.scope.get() , _state.reduce , false , 0 ); + insert( res ); } } - - void dump(){ - writelock l(_state.setup.incLong); - Client::Context ctx(_state.setup.incLong); + } + + void MRTL::dump(){ + writelock l(_state.setup.incLong); + Client::Context ctx(_state.setup.incLong); - for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){ - BSONList& all = i->second; - if ( all.size() < 1 ) - continue; + for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){ + BSONList& all = i->second; + if ( all.size() < 1 ) + continue; - for ( BSONList::iterator j=all.begin(); j!=all.end(); j++ ) - write( *j ); - } - _temp->clear(); - _size = 0; - + for ( BSONList::iterator j=all.begin(); j!=all.end(); j++ ) + write( *j ); } + _temp->clear(); + _size = 0; + + } - void insert( const BSONObj& a ){ - BSONList& all = (*_temp)[a]; - all.push_back( a ); - _size += a.objsize() + 16; - } + void MRTL::insert( const BSONObj& a ){ + BSONList& all = (*_temp)[a]; + all.push_back( a ); + _size += a.objsize() + 16; + } - void checkSize(){ - if ( _size < 1024 * 5 ) - return; + void MRTL::checkSize(){ + if ( _size < 1024 * 5 ) + return; - long before = _size; - reduceInMemory(); - log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl; + long before = _size; + reduceInMemory(); + log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl; - if ( _size < 1024 * 15 ) - return; + if ( _size < 1024 * 15 ) + return; - dump(); - log(1) << " mr: dumping to db" << endl; - } + dump(); + log(1) << " mr: dumping to db" << endl; + } - private: - void write( BSONObj& o ){ - theDataFileMgr.insertWithObjMod( _state.setup.incLong.c_str() , o , true ); - } - - MRState& _state; - - boost::shared_ptr<InMemory> _temp; - long _size; - - public: - long long numEmits; - }; + void MRTL::write( BSONObj& o ){ + theDataFileMgr.insertWithObjMod( _state.setup.incLong.c_str() , o , true ); + } boost::thread_specific_ptr<MRTL> _tlmr; diff --git a/db/commands/mr.h b/db/commands/mr.h new file mode 100644 index 00000000000..7eec40078bc --- /dev/null +++ b/db/commands/mr.h @@ -0,0 +1,145 @@ +// mr.h + +/** + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "pch.h" + +namespace mongo { + + namespace mr { + + typedef vector<BSONObj> BSONList; + + class MyCmp { + public: + MyCmp(){} + bool operator()( const BSONObj &l, const BSONObj &r ) const { + return l.firstElement().woCompare( r.firstElement() ) < 0; + } + }; + + typedef map< BSONObj,BSONList,MyCmp > InMemory; + + /** + * holds map/reduce config information + */ + class MRSetup { + public: + MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp = true ); + + /** Field expected to be a Code or CodeWScope. + * Add its scope, if any, to scopeBuilder, and return its code. + * Scopes added later will shadow those added earlier. */ + static string scopeAndCode (BSONObjBuilder& scopeBuilder, const BSONElement& field); + + /** + @return number objects in collection + */ + long long renameIfNeeded( DBDirectClient& db ); + + void insert( const string& ns , BSONObj& o ); + + string dbname; + string ns; + + // options + bool verbose; + bool keeptemp; + bool replicate; + + // query options + + BSONObj filter; + BSONObj sort; + long long limit; + + // functions + + string mapCode; + string reduceCode; + string finalizeCode; + + BSONObj mapparams; + BSONObj scopeSetup; + + // output tables + string incLong; + + string tempShort; + string tempLong; + + string finalShort; + string finalLong; + + enum { NORMAL , MERGE , REDUCE } outType; + + static AtomicUInt JOB_NUMBER; + }; // end MRsetup + + + /** + * container for all stack based map/reduce state + */ + class MRState { + public: + MRState( MRSetup& s ); + + void finalReduce( BSONList& values ); + + MRSetup& setup; + auto_ptr<Scope> scope; + DBDirectClient db; + + ScriptingFunction map; + ScriptingFunction reduce; + ScriptingFunction finalize; + + }; + + /** + * thread local map/reduce state + * used for access map/reduce state from inside javascript calls + */ + class MRTL { + public: + MRTL( MRState& state ); + + void reduceInMemory(); + + void dump(); + + void insert( const BSONObj& a ); + + void checkSize(); + + private: + void write( BSONObj& o ); + + MRState& _state; + + boost::shared_ptr<InMemory> _temp; + long _size; + + public: + long long numEmits; + }; + + } // end mr namespace +} + + |