summaryrefslogtreecommitdiff
path: root/db/mr.cpp
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2009-11-04 15:25:45 -0500
committerEliot Horowitz <eliot@10gen.com>2009-11-04 15:25:45 -0500
commitd5048b8e6c972899180b57e716e57d887c7dcfab (patch)
tree4654d2f3dfad496ff398497551b59d91150bf706 /db/mr.cpp
parenta5b8dc4e0def5566de33b70335bdc46b1e030f08 (diff)
downloadmongo-d5048b8e6c972899180b57e716e57d887c7dcfab.tar.gz
m/r refactor check point 1
Diffstat (limited to 'db/mr.cpp')
-rw-r--r--db/mr.cpp195
1 files changed, 123 insertions, 72 deletions
diff --git a/db/mr.cpp b/db/mr.cpp
index 4e28bac9fb5..75d650eb0df 100644
--- a/db/mr.cpp
+++ b/db/mr.cpp
@@ -33,6 +33,96 @@ namespace mongo {
typedef map< BSONObj,list<BSONObj>,BSONObjCmp > InMemory;
typedef map< BSONObj,int,BSONObjCmp > KeyNums;
+ class MRSetup {
+ public:
+ MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp = true ){
+ static int jobNumber = 1;
+
+ dbname = _dbname;
+ ns = dbname + "." + cmdObj.firstElement().valuestr();
+
+ verbose = cmdObj["verbose"].trueValue();
+ keeptemp = cmdObj["keeptemp"].trueValue();
+
+ { // setup names
+ stringstream ss;
+ if ( ! keeptemp )
+ ss << "tmp.";
+ ss << "mr." << cmdObj.firstElement().fieldName() << "_" << time(0) << "_" << jobNumber++;
+ tempShort = ss.str();
+ tempLong = dbname + "." + tempShort;
+
+ if ( ! keeptemp && markAsTemp )
+ cc().addTempCollection( tempLong );
+
+ if ( cmdObj["out"].type() == String )
+ finalShort = cmdObj["out"].valuestr();
+ else
+ finalShort = tempShort;
+
+ finalLong = dbname + "." + finalShort;
+
+ }
+
+ { // code
+ mapCode = cmdObj["map"].ascode();
+ reduceCode = cmdObj["reduce"].ascode();
+ if ( cmdObj["finalize"].type() ){
+ finalizeCode = cmdObj["finalize"].ascode();
+ }
+
+ }
+
+ { // query options
+ if ( cmdObj["query"].type() == Object ){
+ filter = cmdObj["query"].embeddedObjectUserCheck();
+ q = filter;
+ }
+
+ if ( cmdObj["sort"].type() == Object )
+ q.sort( cmdObj["sort"].embeddedObjectUserCheck() );
+ }
+ }
+
+ /**
+ @return number objects in collection
+ */
+ long long renameIfNeeded( DBDirectClient& db ){
+ if ( finalLong != tempLong ){
+ dblock l;
+ db.dropCollection( finalLong );
+ BSONObj info;
+ uassert( "rename failed" , db.runCommand( "admin" , BSON( "renameCollection" << tempLong << "to" << finalLong ) , info ) );
+ }
+ return db.count( finalLong );
+ }
+
+ string dbname;
+ string ns;
+
+ // options
+ bool verbose;
+ bool keeptemp;
+
+ // query options
+
+ BSONObj filter;
+ Query q;
+
+ // functions
+
+ string mapCode;
+ string reduceCode;
+ string finalizeCode;
+
+ // output tables
+ string tempShort;
+ string tempLong;
+
+ string finalShort;
+ string finalLong;
+
+ };
class MyCmp {
public:
@@ -214,16 +304,6 @@ namespace mongo {
return BSONObj();
}
- string tempCollectionName( const BSONObj& cmd ){
- static int inc = 1;
- stringstream ss;
- ss << cc().database()->name << ".";
- if ( ! cmd["keeptemp"].trueValue() )
- ss << "tmp.";
- ss << "mr." << cmd.firstElement().fieldName() << "_" << time(0) << "_" << inc++;
- return ss.str();
- }
-
class MapReduceCommand : public Command {
public:
MapReduceCommand() : Command("mapreduce"){}
@@ -256,35 +336,22 @@ namespace mongo {
doReduce( resultColl , values , s , reduce );
}
- bool run(const char *dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl ){
+ bool run(const char *dbname, BSONObj& cmd, string& errmsg, BSONObjBuilder& result, bool fromRepl ){
Timer t;
Client::GodScope cg;
- bool verboseOutput = cmdObj["verbose"].trueValue();
+ MRSetup mr( cc().database()->name , cmd );
- string ns = cc().database()->name + '.' + cmdObj.firstElement().valuestr();
- log(1) << "mr ns: " << ns << endl;
+ log(1) << "mr ns: " << mr.ns << endl;
- if ( ! db.exists( ns ) ){
+ if ( ! db.exists( mr.ns ) ){
errmsg = "ns doesn't exist";
return false;
}
-
- auto_ptr<Scope> s = globalScriptEngine->getPooledScope( ns );
- s->localConnect( cc().database()->name.c_str() );
+ auto_ptr<Scope> s = globalScriptEngine->getPooledScope( mr.dbname );
+ s->localConnect( mr.dbname.c_str() );
- bool istemp = ! cmdObj["keeptemp"].trueValue();
- string resultColl = tempCollectionName( cmdObj );
- if ( istemp )
- currentClient->addTempCollection( resultColl );
- string finalOutput = resultColl;
- if ( cmdObj["out"].type() == String )
- finalOutput = cc().database()->name + "." + cmdObj["out"].valuestr();
-
- string resultCollShort = resultColl.substr( cc().database()->name.size() + 1 );
- string finalOutputShort = finalOutput.substr( cc().database()->name.size() + 1 );
- log(1) << "\t resultColl: " << resultColl << " short: " << resultCollShort << endl;
- db.dropCollection( resultColl );
+ db.dropCollection( mr.tempLong );
long long num = 0;
long long inReduce = 0;
@@ -294,43 +361,33 @@ namespace mongo {
try {
dbtemprelease temprlease;
- s->execSetup( (string)"tempcoll = db[\"" + resultCollShort + "\"]; db.getMongo().setSlaveOk();" , "tempcoll1" );
+ s->execSetup( (string)"tempcoll = db[\"" + mr.tempShort + "\"]; db.getMongo().setSlaveOk();" , "tempcoll1" );
s->execSetup( "MR.init()" );
s->injectNative( "get_num" , get_num );
s->injectNative( "reset_num" , reset_num );
- ScriptingFunction mapFunction = s->createFunction( cmdObj["map"].ascode().c_str() );
- ScriptingFunction reduceFunction = s->createFunction( cmdObj["reduce"].ascode().c_str() );
- s->execSetup( (string)"$reduce = " + cmdObj["reduce"].ascode() );
+ ScriptingFunction mapFunction = s->createFunction( mr.mapCode.c_str() );
+ ScriptingFunction reduceFunction = s->createFunction( mr.reduceCode.c_str() );
+ s->execSetup( (string)"$reduce = " + mr.reduceCode );
- MRTL * mrtl = new MRTL( &db , resultColl , s.get() , reduceFunction );
+ MRTL * mrtl = new MRTL( &db , mr.tempLong , s.get() , reduceFunction );
_tlmr.reset( mrtl );
- Query q;
- BSONObj filter;
- if ( cmdObj["query"].type() == Object ){
- filter = cmdObj["query"].embeddedObjectUserCheck();
- q = filter;
- }
-
- if ( cmdObj["sort"].type() == Object )
- q.sort( cmdObj["sort"].embeddedObjectUserCheck() );
-
- ProgressMeter pm( db.count( ns , filter ) );
- auto_ptr<DBClientCursor> cursor = db.query( ns , q );
+ ProgressMeter pm( db.count( mr.ns , mr.filter ) );
+ auto_ptr<DBClientCursor> cursor = db.query( mr.ns , mr.q );
long long mapTime = 0;
Timer mt;
while ( cursor->more() ){
BSONObj o = cursor->next();
- if ( verboseOutput ) mt.reset();
+ if ( mr.verbose ) mt.reset();
s->setThis( &o );
if ( s->invoke( mapFunction , BSONObj() , 0 , true ) )
throw UserException( (string)"map invoke failed: " + s->getError() );
- if ( verboseOutput ) mapTime += mt.micros();
+ if ( mr.verbose ) mapTime += mt.micros();
num++;
if ( num % 100 == 0 ){
@@ -361,13 +418,13 @@ namespace mongo {
inReduce += t.micros();
timingBuilder.append( "reduce" , inReduce / 1000 );
}
- if ( verboseOutput ){
+ if ( mr.verbose ){
countsBuilder.append( "reduces" , s->getNumber( "$numReduces" ) );
countsBuilder.append( "reducesToDB" , s->getNumber( "$numReducesToDB" ) );
}
- if ( cmdObj["finalize"].type() ){
- s->execSetup( (string)"$finalize = " + cmdObj["finalize"].ascode() );
+ if ( mr.finalizeCode.size() ){
+ s->execSetup( (string)"$finalize = " + mr.finalizeCode );
s->execSetup( "MR.finalize()" );
}
@@ -399,29 +456,24 @@ namespace mongo {
}
catch ( ... ){
log() << "mr failed, removing collection" << endl;
- db.dropCollection( resultColl );
+ db.dropCollection( mr.tempLong );
throw;
}
- if ( finalOutput != resultColl ){
- // need to do this with the full dblock, that's why its after the try/catch
- db.dropCollection( finalOutput );
- BSONObj info;
- uassert( "rename failed" , db.runCommand( "admin" , BSON( "renameCollection" << resultColl << "to" << finalOutput ) , info ) );
- }
+ long long finalCount = mr.renameIfNeeded( db );
- if ( db.count( finalOutput ) == 0 && numEmits > 0 ){
+ if ( finalCount == 0 && numEmits > 0 ){
errmsg = "there were emits but no data!";
return false;
}
timingBuilder.append( "total" , t.millis() );
- result.append( "result" , finalOutputShort );
+ result.append( "result" , mr.finalShort );
result.append( "timeMillis" , t.millis() );
- countsBuilder.append( "output" , (long long)(db.count( finalOutput )) );
- if ( verboseOutput ) result.append( "timing" , timingBuilder.obj() );
+ countsBuilder.append( "output" , finalCount );
+ if ( mr.verbose ) result.append( "timing" , timingBuilder.obj() );
result.append( "counts" , countsBuilder.obj() );
return true;
@@ -443,7 +495,7 @@ namespace mongo {
string dbname = cc().database()->name;
string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe();
- BSONObj origCmd = cmdObj.firstElement().embeddedObjectUserCheck();
+ MRSetup mr( dbname , cmdObj.firstElement().embeddedObjectUserCheck() , false );
set<ServerAndQuery> servers;
@@ -478,15 +530,11 @@ namespace mongo {
auto_ptr<Scope> s = globalScriptEngine->getPooledScope( ns );
- ScriptingFunction reduceFunction = s->createFunction( origCmd["reduce"].ascode().c_str() );
+ ScriptingFunction reduceFunction = s->createFunction( mr.reduceCode.c_str() );
list<BSONObj> values;
- string output = origCmd["out"].valuestrsafe();
- if ( output.size() == 0 )
- output = tempCollectionName( origCmd );
- string fulloutput = dbname + "." + output;
- result.append( "result" , output );
+ result.append( "result" , mr.finalShort );
DBDirectClient db;
@@ -503,14 +551,17 @@ namespace mongo {
continue;
}
- db.insert( fulloutput , reduceValues( values , s.get() , reduceFunction ) );
+ db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction ) );
values.clear();
values.push_back( t );
}
if ( values.size() )
- db.insert( fulloutput , reduceValues( values , s.get() , reduceFunction ) );
+ db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction ) );
+ long long finalCount = mr.renameIfNeeded( db );
+ log(0) << " mapreducefinishcommand " << mr.finalLong << " " << finalCount << endl;
+
for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ){
ScopedDbConnection conn( i->_server );
conn->dropCollection( dbname + "." + shardedOutputCollection );