diff options
author | Eliot Horowitz <eliot@10gen.com> | 2009-11-03 11:40:00 -0500 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2009-11-03 11:40:00 -0500 |
commit | 6d5e43755d739b526d5b56c7a6c457e8f019a74b (patch) | |
tree | cd929539e73fb9f46129ad9bd1c4a9fa016a41cd /db/mr.cpp | |
parent | 3a1c9831c135fbef4bf02ac3458772196bd88038 (diff) | |
download | mongo-6d5e43755d739b526d5b56c7a6c457e8f019a74b.tar.gz |
sharded map-reduce basic test works SHARDING-37
Diffstat (limited to 'db/mr.cpp')
-rw-r--r-- | db/mr.cpp | 88 |
1 files changed, 64 insertions, 24 deletions
diff --git a/db/mr.cpp b/db/mr.cpp index a3aa178ddec..dcf994074fb 100644 --- a/db/mr.cpp +++ b/db/mr.cpp @@ -22,6 +22,7 @@ #include "../scripting/engine.h" #include "../client/dbclient.h" #include "../client/connpool.h" +#include "../client/parallel.h" namespace mongo { @@ -211,7 +212,17 @@ namespace mongo { _tlmr->resetNum(); return BSONObj(); } - + + string tempCollectionName( const BSONObj& cmd ){ + static int inc = 1; + stringstream ss; + ss << cc().database()->name << "."; + if ( ! cmd["keeptemp"].trueValue() ) + ss << "tmp."; + ss << "mr." << cmd.firstElement().fieldName() << "_" << time(0) << "_" << inc++; + return ss.str(); + } + class MapReduceCommand : public Command { public: MapReduceCommand() : Command("mapreduce"){} @@ -221,16 +232,6 @@ namespace mongo { help << "see http://www.mongodb.org/display/DOCS/MapReduce"; } - string tempCollectionName( string coll , bool tmp ){ - static int inc = 1; - stringstream ss; - ss << cc().database()->name << "."; - if ( tmp ) - ss << "tmp."; - ss << "mr." << coll << "_" << time(0) << "_" << inc++; - return ss.str(); - } - void doReduce( const string& resultColl , list<BSONObj>& values , Scope * s , ScriptingFunction reduce ){ if ( values.size() == 0 ) return; @@ -272,7 +273,7 @@ namespace mongo { s->localConnect( cc().database()->name.c_str() ); bool istemp = ! cmdObj["keeptemp"].trueValue(); - string resultColl = tempCollectionName( cmdObj.firstElement().valuestr() , istemp ); + string resultColl = tempCollectionName( cmdObj ); if ( istemp ) currentClient->addTempCollection( resultColl ); string finalOutput = resultColl; @@ -436,12 +437,14 @@ namespace mongo { virtual bool slaveOk() { return true; } bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - + dbtemprelease temprlease; // we don't touch the db directly + string dbname = cc().database()->name; - + string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe(); + BSONObj origCmd = cmdObj.firstElement().embeddedObjectUserCheck(); - cout << origCmd << endl; - errmsg = "eliot was here"; + + set<ServerAndQuery> servers; BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck(); vector< auto_ptr<DBClientCursor> > shardCursors; @@ -449,18 +452,55 @@ namespace mongo { while ( i.more() ){ BSONElement e = i.next(); string shard = e.fieldName(); - BSONObj res = e.embeddedObjectUserCheck(); - cout << "\t" << shard << "\t" << res << endl; - ScopedDbConnection conn( shard ); - //shardCursors.push_back( conn->query( dbname + "." + res["result"].valuestrsafe() , Query().sort( BSON( "_id" << 1 ) ) ) ); + BSONObj res = e.embeddedObjectUserCheck(); + + uassert( "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() ); + servers.insert( shard ); } + + BSONObj sortKey = BSON( "_id" << 1 ); + + ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection , + Query().sort( sortKey ) ); - //while ( true ){ - - //} + + auto_ptr<Scope> s = globalScriptEngine->getPooledScope( ns ); + ScriptingFunction reduceFunction = s->createFunction( origCmd["reduce"].ascode().c_str() ); + + list<BSONObj> values; + + string output = origCmd["out"].valuestrsafe(); + if ( output.size() == 0 ) + output = tempCollectionName( origCmd ); + string fulloutput = dbname + "." + output; + result.append( "result" , output ); - return 0; + DBDirectClient db; + + while ( cursor.more() ){ + BSONObj t = cursor.next(); + + if ( values.size() == 0 ){ + values.push_back( t ); + continue; + } + + if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ){ + values.push_back( t ); + continue; + } + + db.insert( fulloutput , reduceValues( values , s.get() , reduceFunction ) ); + values.clear(); + values.push_back( t ); + } + + if ( values.size() ) + db.insert( fulloutput , reduceValues( values , s.get() , reduceFunction ) ); + + + return 1; } } mapReduceFinishCommand; |