diff options
-rw-r--r-- | client/parallel.cpp | 19 | ||||
-rw-r--r-- | client/parallel.h | 4 | ||||
-rw-r--r-- | db/mr.cpp | 88 | ||||
-rw-r--r-- | jstests/sharding/features2.js | 4 | ||||
-rw-r--r-- | s/commands_public.cpp | 42 |
5 files changed, 127 insertions, 30 deletions
diff --git a/client/parallel.cpp b/client/parallel.cpp index e77acfccdb5..410e3e90499 100644 --- a/client/parallel.cpp +++ b/client/parallel.cpp @@ -113,16 +113,29 @@ namespace mongo { // -------- ParallelSortClusteredCursor ----------- - ParallelSortClusteredCursor::ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey ) : ClusteredCursor( q ) , _servers( servers ){ - _numServers = servers.size(); + ParallelSortClusteredCursor::ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , + const BSONObj& sortKey ) + : ClusteredCursor( q ) , _servers( servers ){ _sortKey = sortKey.getOwned(); + _init(); + } + + ParallelSortClusteredCursor::ParallelSortClusteredCursor( set<ServerAndQuery> servers , const string& ns , + const Query& q , + int options , const BSONObj& fields ) + : ClusteredCursor( ns , q.obj , options , fields ) , _servers( servers ){ + _sortKey = q.getSort().copy(); + _init(); + } + void ParallelSortClusteredCursor::_init(){ + _numServers = _servers.size(); _cursors = new auto_ptr<DBClientCursor>[_numServers]; _nexts = new BSONObj[_numServers]; // TODO: parellize int num = 0; - for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); i++ ){ + for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); i++ ){ const ServerAndQuery& sq = *i; _cursors[num++] = query( sq._server , 0 , sq._extra ); } diff --git a/client/parallel.h b/client/parallel.h index e7d39a89ba9..a2189c6c745 100644 --- a/client/parallel.h +++ b/client/parallel.h @@ -87,10 +87,14 @@ namespace mongo { class ParallelSortClusteredCursor : public ClusteredCursor { public: ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey ); + ParallelSortClusteredCursor( set<ServerAndQuery> servers , const string& ns , + const Query& q , int options=0, const BSONObj& fields=BSONObj() ); virtual ~ParallelSortClusteredCursor(); virtual bool more(); virtual BSONObj next(); private: + void _init(); + void advance(); int _numServers; diff --git a/db/mr.cpp b/db/mr.cpp index a3aa178ddec..dcf994074fb 100644 --- a/db/mr.cpp +++ b/db/mr.cpp @@ -22,6 +22,7 @@ #include "../scripting/engine.h" #include "../client/dbclient.h" #include "../client/connpool.h" +#include "../client/parallel.h" namespace mongo { @@ -211,7 +212,17 @@ namespace mongo { _tlmr->resetNum(); return BSONObj(); } - + + string tempCollectionName( const BSONObj& cmd ){ + static int inc = 1; + stringstream ss; + ss << cc().database()->name << "."; + if ( ! cmd["keeptemp"].trueValue() ) + ss << "tmp."; + ss << "mr." << cmd.firstElement().fieldName() << "_" << time(0) << "_" << inc++; + return ss.str(); + } + class MapReduceCommand : public Command { public: MapReduceCommand() : Command("mapreduce"){} @@ -221,16 +232,6 @@ namespace mongo { help << "see http://www.mongodb.org/display/DOCS/MapReduce"; } - string tempCollectionName( string coll , bool tmp ){ - static int inc = 1; - stringstream ss; - ss << cc().database()->name << "."; - if ( tmp ) - ss << "tmp."; - ss << "mr." << coll << "_" << time(0) << "_" << inc++; - return ss.str(); - } - void doReduce( const string& resultColl , list<BSONObj>& values , Scope * s , ScriptingFunction reduce ){ if ( values.size() == 0 ) return; @@ -272,7 +273,7 @@ namespace mongo { s->localConnect( cc().database()->name.c_str() ); bool istemp = ! cmdObj["keeptemp"].trueValue(); - string resultColl = tempCollectionName( cmdObj.firstElement().valuestr() , istemp ); + string resultColl = tempCollectionName( cmdObj ); if ( istemp ) currentClient->addTempCollection( resultColl ); string finalOutput = resultColl; @@ -436,12 +437,14 @@ namespace mongo { virtual bool slaveOk() { return true; } bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - + dbtemprelease temprlease; // we don't touch the db directly + string dbname = cc().database()->name; - + string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe(); + BSONObj origCmd = cmdObj.firstElement().embeddedObjectUserCheck(); - cout << origCmd << endl; - errmsg = "eliot was here"; + + set<ServerAndQuery> servers; BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck(); vector< auto_ptr<DBClientCursor> > shardCursors; @@ -449,18 +452,55 @@ namespace mongo { while ( i.more() ){ BSONElement e = i.next(); string shard = e.fieldName(); - BSONObj res = e.embeddedObjectUserCheck(); - cout << "\t" << shard << "\t" << res << endl; - ScopedDbConnection conn( shard ); - //shardCursors.push_back( conn->query( dbname + "." + res["result"].valuestrsafe() , Query().sort( BSON( "_id" << 1 ) ) ) ); + BSONObj res = e.embeddedObjectUserCheck(); + + uassert( "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() ); + servers.insert( shard ); } + + BSONObj sortKey = BSON( "_id" << 1 ); + + ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection , + Query().sort( sortKey ) ); - //while ( true ){ - - //} + + auto_ptr<Scope> s = globalScriptEngine->getPooledScope( ns ); + ScriptingFunction reduceFunction = s->createFunction( origCmd["reduce"].ascode().c_str() ); + + list<BSONObj> values; + + string output = origCmd["out"].valuestrsafe(); + if ( output.size() == 0 ) + output = tempCollectionName( origCmd ); + string fulloutput = dbname + "." + output; + result.append( "result" , output ); - return 0; + DBDirectClient db; + + while ( cursor.more() ){ + BSONObj t = cursor.next(); + + if ( values.size() == 0 ){ + values.push_back( t ); + continue; + } + + if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ){ + values.push_back( t ); + continue; + } + + db.insert( fulloutput , reduceValues( values , s.get() , reduceFunction ) ); + values.clear(); + values.push_back( t ); + } + + if ( values.size() ) + db.insert( fulloutput , reduceValues( values , s.get() , reduceFunction ) ); + + + return 1; } } mapReduceFinishCommand; diff --git a/jstests/sharding/features2.js b/jstests/sharding/features2.js index 46a398e2596..a16f5887aef 100644 --- a/jstests/sharding/features2.js +++ b/jstests/sharding/features2.js @@ -91,8 +91,8 @@ doMR = function( n ){ doMR( "before" ); assert.eq( 1 , s.onNumShards( "mr" ) , "E1" ); -//s.shardGo( "mr" , { x : 1 } , { x : 2 } , { x : 3 } ); -//assert.eq( 2 , s.onNumShards( "mr" ) , "E1" ); +s.shardGo( "mr" , { x : 1 } , { x : 2 } , { x : 3 } ); +assert.eq( 2 , s.onNumShards( "mr" ) , "E1" ); doMR( "after" ); diff --git a/s/commands_public.cpp b/s/commands_public.cpp index a9f512cf35d..aa4b6e458cd 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -204,6 +204,41 @@ namespace mongo { class MRCmd : public PublicGridCommand { public: MRCmd() : PublicGridCommand( "mapreduce" ){} + + string getTmpName( const string& coll ){ + static int inc = 1; + stringstream ss; + ss << "tmp.mrs." << coll << "_" << time(0) << "_" << inc++; + return ss.str(); + } + + BSONObj fixForShards( const BSONObj& orig , const string& output ){ + BSONObjBuilder b; + BSONObjIterator i( orig ); + while ( i.more() ){ + BSONElement e = i.next(); + string fn = e.fieldName(); + if ( fn == "map" || + fn == "mapreduce" || + fn == "reduce" || + fn == "query" || + fn == "sort" || + fn == "verbose" ){ + b.append( e ); + } + else if ( fn == "keeptemp" || + fn == "out" || + fn == "finalize" ){ + // we don't want to copy these + } + else { + uassert( (string)"don't know mr field: " + fn , 0 ); + } + } + b.append( "out" , output ); + return b.obj(); + } + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ string dbName = getDBName( ns ); @@ -226,8 +261,13 @@ namespace mongo { vector<Chunk*> chunks; cm->getChunksForQuery( chunks , q ); + const string shardedOutputCollection = getTmpName( collection ); + + BSONObj shardedCommand = fixForShards( cmdObj , shardedOutputCollection ); + BSONObjBuilder finalB; finalB.append( "mapreduce.shardedfinish" , cmdObj ); + finalB.append( "shardedOutputCollection" , shardedOutputCollection ); BSONObjBuilder shardresults; for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){ @@ -235,7 +275,7 @@ namespace mongo { ScopedDbConnection conn( c->getShard() ); BSONObj myres; - if ( ! conn->runCommand( dbName , cmdObj , myres ) ){ + if ( ! conn->runCommand( dbName , shardedCommand , myres ) ){ errmsg = "mongod mr failed: "; errmsg += myres.toString(); return 0; |