summaryrefslogtreecommitdiff
path: root/db
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2010-11-15 14:39:42 -0500
committerEliot Horowitz <eliot@10gen.com>2010-11-15 14:39:56 -0500
commit1cafbccecade5457bb9a637619a34055c2936fbf (patch)
treefcc85ee8d76e536c43d78a065274ef172e3a6687 /db
parent1bb47c4a9289e24cb9593b274540b1d7640f53f6 (diff)
downloadmongo-1cafbccecade5457bb9a637619a34055c2936fbf.tar.gz
cleaning m/r code
Diffstat (limited to 'db')
-rw-r--r--db/commands/mr.cpp475
-rw-r--r--db/commands/mr.h145
2 files changed, 343 insertions, 277 deletions
diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp
index fd7bca7d04e..722ee513e3f 100644
--- a/db/commands/mr.cpp
+++ b/db/commands/mr.cpp
@@ -27,23 +27,13 @@
#include "../matcher.h"
#include "../clientcursor.h"
+#include "mr.h"
+
namespace mongo {
namespace mr {
- typedef vector<BSONObj> BSONList;
-
- class MyCmp {
- public:
- MyCmp(){}
- bool operator()( const BSONObj &l, const BSONObj &r ) const {
- return l.firstElement().woCompare( r.firstElement() ) < 0;
- }
- };
-
- typedef pair<BSONObj,BSONObj> Data;
- //typedef list< Data > InMemory;
- typedef map< BSONObj,BSONList,MyCmp > InMemory;
+ AtomicUInt MRSetup::JOB_NUMBER;
BSONObj reduceValues( BSONList& values , Scope * s , ScriptingFunction reduce , bool final , ScriptingFunction finalize ){
uassert( 10074 , "need values" , values.size() );
@@ -118,321 +108,252 @@ namespace mongo {
return b.obj();
}
- class MRSetup {
- public:
- MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp = true ){
- static int jobNumber = 1;
+ MRSetup::MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp ){
+
+ dbname = _dbname;
+ ns = dbname + "." + cmdObj.firstElement().valuestr();
+
+ verbose = cmdObj["verbose"].trueValue();
+ keeptemp = cmdObj["keeptemp"].trueValue();
+
+ { // setup names
+ stringstream ss;
+ if ( ! keeptemp )
+ ss << "tmp.";
+ ss << "mr." << cmdObj.firstElement().String() << "_" << time(0) << "_" << JOB_NUMBER++;
+ tempShort = ss.str();
+ tempLong = dbname + "." + tempShort;
+ incLong = tempLong + "_inc";
- dbname = _dbname;
- ns = dbname + "." + cmdObj.firstElement().valuestr();
+ if ( ! keeptemp && markAsTemp )
+ cc().addTempCollection( tempLong );
- verbose = cmdObj["verbose"].trueValue();
- keeptemp = cmdObj["keeptemp"].trueValue();
-
- { // setup names
- stringstream ss;
- if ( ! keeptemp )
- ss << "tmp.";
- ss << "mr." << cmdObj.firstElement().String() << "_" << time(0) << "_" << jobNumber++;
- tempShort = ss.str();
- tempLong = dbname + "." + tempShort;
- incLong = tempLong + "_inc";
-
- if ( ! keeptemp && markAsTemp )
- cc().addTempCollection( tempLong );
-
- replicate = keeptemp;
-
- if ( cmdObj["out"].type() == String ){
- finalShort = cmdObj["out"].valuestr();
- replicate = true;
- }
- else
- finalShort = tempShort;
+ replicate = keeptemp;
+
+ if ( cmdObj["out"].type() == String ){
+ finalShort = cmdObj["out"].valuestr();
+ replicate = true;
+ }
+ else
+ finalShort = tempShort;
- finalLong = dbname + "." + finalShort;
+ finalLong = dbname + "." + finalShort;
- }
+ }
- if ( cmdObj["outType"].type() == String ){
- uassert( 13521 , "need 'out' if using 'outType'" , cmdObj["out"].type() == String );
- string t = cmdObj["outType"].String();
- if ( t == "normal" )
- outType = NORMAL;
- else if ( t == "merge" )
- outType = MERGE;
- else if ( t == "reduce" )
- outType = REDUCE;
- else
- uasserted( 13522 , str::stream() << "unknown outType [" << t << "]" );
- }
- else {
+ if ( cmdObj["outType"].type() == String ){
+ uassert( 13521 , "need 'out' if using 'outType'" , cmdObj["out"].type() == String );
+ string t = cmdObj["outType"].String();
+ if ( t == "normal" )
outType = NORMAL;
- }
+ else if ( t == "merge" )
+ outType = MERGE;
+ else if ( t == "reduce" )
+ outType = REDUCE;
+ else
+ uasserted( 13522 , str::stream() << "unknown outType [" << t << "]" );
+ }
+ else {
+ outType = NORMAL;
+ }
- { // scope and code
- // NOTE: function scopes are merged with m/r scope, not nested like they should be
- BSONObjBuilder scopeBuilder;
+ { // scope and code
+ // NOTE: function scopes are merged with m/r scope, not nested like they should be
+ BSONObjBuilder scopeBuilder;
- if ( cmdObj["scope"].type() == Object ){
- scopeBuilder.appendElements( cmdObj["scope"].embeddedObjectUserCheck() );
- }
+ if ( cmdObj["scope"].type() == Object ){
+ scopeBuilder.appendElements( cmdObj["scope"].embeddedObjectUserCheck() );
+ }
- mapCode = scopeAndCode( scopeBuilder, cmdObj["map"] );
- reduceCode = scopeAndCode( scopeBuilder, cmdObj["reduce"] );
- if ( cmdObj["finalize"].type() ){
- finalizeCode = scopeAndCode( scopeBuilder, cmdObj["finalize"] );
- }
+ mapCode = scopeAndCode( scopeBuilder, cmdObj["map"] );
+ reduceCode = scopeAndCode( scopeBuilder, cmdObj["reduce"] );
+ if ( cmdObj["finalize"].type() ){
+ finalizeCode = scopeAndCode( scopeBuilder, cmdObj["finalize"] );
+ }
- scopeSetup = scopeBuilder.obj();
+ scopeSetup = scopeBuilder.obj();
- if ( cmdObj["mapparams"].type() == Array ){
- mapparams = cmdObj["mapparams"].embeddedObjectUserCheck();
- }
-
+ if ( cmdObj["mapparams"].type() == Array ){
+ mapparams = cmdObj["mapparams"].embeddedObjectUserCheck();
}
+
+ }
- { // query options
- if ( cmdObj["query"].type() == Object ){
- filter = cmdObj["query"].embeddedObjectUserCheck();
- }
+ { // query options
+ if ( cmdObj["query"].type() == Object ){
+ filter = cmdObj["query"].embeddedObjectUserCheck();
+ }
- if ( cmdObj["sort"].type() == Object ){
- sort = cmdObj["sort"].embeddedObjectUserCheck();
- }
-
- if ( cmdObj["limit"].isNumber() )
- limit = cmdObj["limit"].numberLong();
- else
- limit = 0;
+ if ( cmdObj["sort"].type() == Object ){
+ sort = cmdObj["sort"].embeddedObjectUserCheck();
}
- }
- /** Field expected to be a Code or CodeWScope.
- * Add its scope, if any, to scopeBuilder, and return its code.
- * Scopes added later will shadow those added earlier. */
- static string scopeAndCode (BSONObjBuilder& scopeBuilder, const BSONElement& field) {
- if ( field.type() == CodeWScope )
- scopeBuilder.appendElements( field.codeWScopeObject() );
- return field._asCode();
+ if ( cmdObj["limit"].isNumber() )
+ limit = cmdObj["limit"].numberLong();
+ else
+ limit = 0;
}
+ }
- /**
- @return number objects in collection
- */
- long long renameIfNeeded( DBDirectClient& db ){
- assertInWriteLock();
- if ( finalLong != tempLong ){
+ string MRSetup::scopeAndCode (BSONObjBuilder& scopeBuilder, const BSONElement& field) {
+ if ( field.type() == CodeWScope )
+ scopeBuilder.appendElements( field.codeWScopeObject() );
+ return field._asCode();
+ }
+
+ long long MRSetup::renameIfNeeded( DBDirectClient& db ){
+ assertInWriteLock();
+ if ( finalLong != tempLong ){
- if ( outType == NORMAL ){
- db.dropCollection( finalLong );
- if ( db.count( tempLong ) ){
- BSONObj info;
- uassert( 10076 , "rename failed" ,
- db.runCommand( "admin" , BSON( "renameCollection" << tempLong << "to" << finalLong ) , info ) );
- }
+ if ( outType == NORMAL ){
+ db.dropCollection( finalLong );
+ if ( db.count( tempLong ) ){
+ BSONObj info;
+ uassert( 10076 , "rename failed" ,
+ db.runCommand( "admin" , BSON( "renameCollection" << tempLong << "to" << finalLong ) , info ) );
}
- else if ( outType == MERGE ){
- auto_ptr<DBClientCursor> cursor = db.query( tempLong , BSONObj() );
- while ( cursor->more() ){
- BSONObj o = cursor->next();
- Helpers::upsert( finalLong , o );
- }
- db.dropCollection( tempLong );
- }
- else if ( outType == REDUCE ){
- assert(0);
- }
- else {
- assert(0);
+ }
+ else if ( outType == MERGE ){
+ auto_ptr<DBClientCursor> cursor = db.query( tempLong , BSONObj() );
+ while ( cursor->more() ){
+ BSONObj o = cursor->next();
+ Helpers::upsert( finalLong , o );
}
+ db.dropCollection( tempLong );
+ }
+ else if ( outType == REDUCE ){
+ assert(0);
+ }
+ else {
+ assert(0);
}
- return db.count( finalLong );
}
+ return db.count( finalLong );
+ }
- void insert( const string& ns , BSONObj& o ){
- writelock l( ns );
- Client::Context ctx( ns );
+ void MRSetup::insert( const string& ns , BSONObj& o ){
+ writelock l( ns );
+ Client::Context ctx( ns );
- if ( replicate )
- theDataFileMgr.insertAndLog( ns.c_str() , o , false );
- else
- theDataFileMgr.insertWithObjMod( ns.c_str() , o , false );
+ if ( replicate )
+ theDataFileMgr.insertAndLog( ns.c_str() , o , false );
+ else
+ theDataFileMgr.insertWithObjMod( ns.c_str() , o , false );
- }
+ }
- string dbname;
- string ns;
-
- // options
- bool verbose;
- bool keeptemp;
- bool replicate;
- // query options
+ MRState::MRState( MRSetup& s ) : setup(s){
+ scope = globalScriptEngine->getPooledScope( setup.dbname );
+ scope->localConnect( setup.dbname.c_str() );
- BSONObj filter;
- BSONObj sort;
- long long limit;
-
- // functions
+ map = scope->createFunction( setup.mapCode.c_str() );
+ if ( ! map )
+ throw UserException( 9012, (string)"map compile failed: " + scope->getError() );
- string mapCode;
- string reduceCode;
- string finalizeCode;
+ reduce = scope->createFunction( setup.reduceCode.c_str() );
+ if ( ! reduce )
+ throw UserException( 9013, (string)"reduce compile failed: " + scope->getError() );
- BSONObj mapparams;
- BSONObj scopeSetup;
+ if ( setup.finalizeCode.size() )
+ finalize = scope->createFunction( setup.finalizeCode.c_str() );
+ else
+ finalize = 0;
- // output tables
- string incLong;
+ if ( ! setup.scopeSetup.isEmpty() )
+ scope->init( &setup.scopeSetup );
- string tempShort;
- string tempLong;
+ db.dropCollection( setup.tempLong );
+ db.dropCollection( setup.incLong );
- string finalShort;
- string finalLong;
-
- enum { NORMAL , MERGE , REDUCE } outType;
+ writelock l( setup.incLong );
+ Client::Context ctx( setup.incLong );
+ string err;
+ assert( userCreateNS( setup.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) );
- }; // end MRsetup
-
- class MRState {
- public:
- MRState( MRSetup& s ) : setup(s){
- scope = globalScriptEngine->getPooledScope( setup.dbname );
- scope->localConnect( setup.dbname.c_str() );
-
- map = scope->createFunction( setup.mapCode.c_str() );
- if ( ! map )
- throw UserException( 9012, (string)"map compile failed: " + scope->getError() );
-
- reduce = scope->createFunction( setup.reduceCode.c_str() );
- if ( ! reduce )
- throw UserException( 9013, (string)"reduce compile failed: " + scope->getError() );
-
- if ( setup.finalizeCode.size() )
- finalize = scope->createFunction( setup.finalizeCode.c_str() );
- else
- finalize = 0;
-
- if ( ! setup.scopeSetup.isEmpty() )
- scope->init( &setup.scopeSetup );
-
- db.dropCollection( setup.tempLong );
- db.dropCollection( setup.incLong );
-
- writelock l( setup.incLong );
- Client::Context ctx( setup.incLong );
- string err;
- assert( userCreateNS( setup.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) );
-
- }
-
- void finalReduce( BSONList& values ){
- if ( values.size() == 0 )
- return;
-
- BSONObj key = values.begin()->firstElement().wrap( "_id" );
- BSONObj res = reduceValues( values , scope.get() , reduce , 1 , finalize );
-
- setup.insert( setup.tempLong , res );
- }
-
+ }
+
+ void MRState::finalReduce( BSONList& values ){
+ if ( values.size() == 0 )
+ return;
- MRSetup& setup;
- auto_ptr<Scope> scope;
- DBDirectClient db;
+ BSONObj key = values.begin()->firstElement().wrap( "_id" );
+ BSONObj res = reduceValues( values , scope.get() , reduce , 1 , finalize );
+
+ setup.insert( setup.tempLong , res );
+ }
- ScriptingFunction map;
- ScriptingFunction reduce;
- ScriptingFunction finalize;
+ MRTL::MRTL( MRState& state )
+ : _state( state )
+ , _temp(new InMemory())
+ {
+ _size = 0;
+ numEmits = 0;
+ }
- };
-
- class MRTL {
- public:
- MRTL( MRState& state )
- : _state( state )
- , _temp(new InMemory())
- {
- _size = 0;
- numEmits = 0;
- }
+ void MRTL::reduceInMemory(){
+ boost::shared_ptr<InMemory> old = _temp;
+ _temp.reset(new InMemory());
+ _size = 0;
- void reduceInMemory(){
- boost::shared_ptr<InMemory> old = _temp;
- _temp.reset(new InMemory());
- _size = 0;
+ for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){
+ BSONObj key = i->first;
+ BSONList& all = i->second;
- for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){
- BSONObj key = i->first;
- BSONList& all = i->second;
-
- if ( all.size() == 1 ){
- // this key has low cardinality, so just write to db
- writelock l(_state.setup.incLong);
- Client::Context ctx(_state.setup.incLong.c_str());
- write( *(all.begin()) );
- }
- else if ( all.size() > 1 ){
- BSONObj res = reduceValues( all , _state.scope.get() , _state.reduce , false , 0 );
- insert( res );
- }
+ if ( all.size() == 1 ){
+ // this key has low cardinality, so just write to db
+ writelock l(_state.setup.incLong);
+ Client::Context ctx(_state.setup.incLong.c_str());
+ write( *(all.begin()) );
+ }
+ else if ( all.size() > 1 ){
+ BSONObj res = reduceValues( all , _state.scope.get() , _state.reduce , false , 0 );
+ insert( res );
}
}
-
- void dump(){
- writelock l(_state.setup.incLong);
- Client::Context ctx(_state.setup.incLong);
+ }
+
+ void MRTL::dump(){
+ writelock l(_state.setup.incLong);
+ Client::Context ctx(_state.setup.incLong);
- for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){
- BSONList& all = i->second;
- if ( all.size() < 1 )
- continue;
+ for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){
+ BSONList& all = i->second;
+ if ( all.size() < 1 )
+ continue;
- for ( BSONList::iterator j=all.begin(); j!=all.end(); j++ )
- write( *j );
- }
- _temp->clear();
- _size = 0;
-
+ for ( BSONList::iterator j=all.begin(); j!=all.end(); j++ )
+ write( *j );
}
+ _temp->clear();
+ _size = 0;
+
+ }
- void insert( const BSONObj& a ){
- BSONList& all = (*_temp)[a];
- all.push_back( a );
- _size += a.objsize() + 16;
- }
+ void MRTL::insert( const BSONObj& a ){
+ BSONList& all = (*_temp)[a];
+ all.push_back( a );
+ _size += a.objsize() + 16;
+ }
- void checkSize(){
- if ( _size < 1024 * 5 )
- return;
+ void MRTL::checkSize(){
+ if ( _size < 1024 * 5 )
+ return;
- long before = _size;
- reduceInMemory();
- log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl;
+ long before = _size;
+ reduceInMemory();
+ log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl;
- if ( _size < 1024 * 15 )
- return;
+ if ( _size < 1024 * 15 )
+ return;
- dump();
- log(1) << " mr: dumping to db" << endl;
- }
+ dump();
+ log(1) << " mr: dumping to db" << endl;
+ }
- private:
- void write( BSONObj& o ){
- theDataFileMgr.insertWithObjMod( _state.setup.incLong.c_str() , o , true );
- }
-
- MRState& _state;
-
- boost::shared_ptr<InMemory> _temp;
- long _size;
-
- public:
- long long numEmits;
- };
+ void MRTL::write( BSONObj& o ){
+ theDataFileMgr.insertWithObjMod( _state.setup.incLong.c_str() , o , true );
+ }
boost::thread_specific_ptr<MRTL> _tlmr;
diff --git a/db/commands/mr.h b/db/commands/mr.h
new file mode 100644
index 00000000000..7eec40078bc
--- /dev/null
+++ b/db/commands/mr.h
@@ -0,0 +1,145 @@
+// mr.h
+
+/**
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "pch.h"
+
+namespace mongo {
+
+ namespace mr {
+
+ typedef vector<BSONObj> BSONList;
+
+ class MyCmp {
+ public:
+ MyCmp(){}
+ bool operator()( const BSONObj &l, const BSONObj &r ) const {
+ return l.firstElement().woCompare( r.firstElement() ) < 0;
+ }
+ };
+
+ typedef map< BSONObj,BSONList,MyCmp > InMemory;
+
+ /**
+ * holds map/reduce config information
+ */
+ class MRSetup {
+ public:
+ MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp = true );
+
+ /** Field expected to be a Code or CodeWScope.
+ * Add its scope, if any, to scopeBuilder, and return its code.
+ * Scopes added later will shadow those added earlier. */
+ static string scopeAndCode (BSONObjBuilder& scopeBuilder, const BSONElement& field);
+
+ /**
+ @return number objects in collection
+ */
+ long long renameIfNeeded( DBDirectClient& db );
+
+ void insert( const string& ns , BSONObj& o );
+
+ string dbname;
+ string ns;
+
+ // options
+ bool verbose;
+ bool keeptemp;
+ bool replicate;
+
+ // query options
+
+ BSONObj filter;
+ BSONObj sort;
+ long long limit;
+
+ // functions
+
+ string mapCode;
+ string reduceCode;
+ string finalizeCode;
+
+ BSONObj mapparams;
+ BSONObj scopeSetup;
+
+ // output tables
+ string incLong;
+
+ string tempShort;
+ string tempLong;
+
+ string finalShort;
+ string finalLong;
+
+ enum { NORMAL , MERGE , REDUCE } outType;
+
+ static AtomicUInt JOB_NUMBER;
+ }; // end MRsetup
+
+
+ /**
+ * container for all stack based map/reduce state
+ */
+ class MRState {
+ public:
+ MRState( MRSetup& s );
+
+ void finalReduce( BSONList& values );
+
+ MRSetup& setup;
+ auto_ptr<Scope> scope;
+ DBDirectClient db;
+
+ ScriptingFunction map;
+ ScriptingFunction reduce;
+ ScriptingFunction finalize;
+
+ };
+
+ /**
+ * thread local map/reduce state
+ * used for access map/reduce state from inside javascript calls
+ */
+ class MRTL {
+ public:
+ MRTL( MRState& state );
+
+ void reduceInMemory();
+
+ void dump();
+
+ void insert( const BSONObj& a );
+
+ void checkSize();
+
+ private:
+ void write( BSONObj& o );
+
+ MRState& _state;
+
+ boost::shared_ptr<InMemory> _temp;
+ long _size;
+
+ public:
+ long long numEmits;
+ };
+
+ } // end mr namespace
+}
+
+