diff options
author | agirbal <antoine@10gen.com> | 2011-06-26 18:59:37 -0700 |
---|---|---|
committer | agirbal <antoine@10gen.com> | 2011-06-26 19:00:52 -0700 |
commit | 1388ecbdb76160bbfbb2df018ed860c2c81f592a (patch) | |
tree | 5a6f8e67b02168d49573451aaaecd2066ba70403 /s | |
parent | 5a2f4ceb93b44283500f1ed346898439ca33b137 (diff) | |
download | mongo-1388ecbdb76160bbfbb2df018ed860c2c81f592a.tar.gz |
SERVER-2531: added REDUCE mode for M/R to sharded output collection
Diffstat (limited to 's')
-rw-r--r-- | s/commands_public.cpp | 81 | ||||
-rw-r--r-- | s/strategy.cpp | 8 | ||||
-rw-r--r-- | s/strategy.h | 8 | ||||
-rw-r--r-- | s/strategy_shard.cpp | 14 | ||||
-rw-r--r-- | s/strategy_single.cpp | 4 |
5 files changed, 80 insertions, 35 deletions
diff --git a/s/commands_public.cpp b/s/commands_public.cpp index 310598cc653..15ddf988341 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -1003,9 +1003,7 @@ namespace mongo { // so we allocate them in our thread // and hand off // Note: why not use pooled connections? This has been reported to create too many connections - vector< shared_ptr<ShardConnection> > shardConns; - list< shared_ptr<Future::CommandResult> > futures; for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { @@ -1100,9 +1098,7 @@ namespace mongo { mr_shard::State state(config); log(1) << "mr sharded output ns: " << config.ns << endl; - // for now we only support replace output collection in sharded mode - if (config.outType != mr_shard::Config::REPLACE && - config.outType != mr_shard::Config::MERGE) { + if (config.outType == mr_shard::Config::INMEMORY) { errmsg = "This Map Reduce mode is not supported with sharded output"; return false; } @@ -1111,18 +1107,22 @@ namespace mongo { BSONObjBuilder loc; if ( !config.outDB.empty()) loc.append( "db" , config.outDB ); - if ( !config.finalShort.empty() ) - loc.append( "collection" , config.finalShort ); + loc.append( "collection" , config.finalShort ); result.append("result", loc.obj()); } else { if ( !config.finalShort.empty() ) result.append( "result" , config.finalShort ); } - string outns = config.finalLong; - bool merge = (config.outType == mr_shard::Config::MERGE); - if (!merge) { + string outns = config.finalLong; + string tempns; + if (config.outType == mr_shard::Config::REDUCE) { + // result will be inserted into a temp collection to post process + const string postProcessCollection = getTmpName( collection ); + cout << "post process collection is " << postProcessCollection << endl; + tempns = dbName + "." + postProcessCollection; + } else if (config.outType == mr_shard::Config::REPLACE) { // drop previous collection BSONObj dropColCmd = BSON("drop" << config.finalShort); BSONObjBuilder dropColResult(32); @@ -1134,14 +1134,20 @@ namespace mongo { } } - // create the sharded collection BSONObj sortKey = BSON( "_id" << 1 ); - BSONObj shardColCmd = BSON("shardCollection" << outns << "key" << sortKey); - BSONObjBuilder shardColResult(32); - bool res = Command::runAgainstRegistered("admin.$cmd", shardColCmd, shardColResult); - if (!res) { - errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString(); - return false; + if (!conf->isSharded(outns)) { + // create the sharded collection + + BSONObj shardColCmd = BSON("shardCollection" << outns << "key" << sortKey); + BSONObjBuilder shardColResult(32); + bool res = Command::runAgainstRegistered("admin.$cmd", shardColCmd, shardColResult); + if (!res) { + errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString(); + return false; + } + + // since it's new collection, use replace mode always + config.outType = mr_shard::Config::REPLACE; } ParallelSortClusteredCursor cursor( servers , dbName + "." + shardedOutputCollection , @@ -1152,7 +1158,7 @@ namespace mongo { mr_shard::BSONList values; Strategy* s = SHARDED; long long finalCount = 0; - while ( cursor.more() || values.size() > 0 ) { + while ( cursor.more() || !values.empty() ) { BSONObj t; if ( cursor.more() ) { t = cursor.next().getOwned(); @@ -1169,11 +1175,14 @@ namespace mongo { } BSONObj final = config.reducer->finalReduce(values, config.finalizer.get()); - if (merge) { + if (config.outType == mr_shard::Config::MERGE) { BSONObj id = final["_id"].wrap(); - s->updateSharded(conf, outns.c_str(), id, final, UpdateOption_Upsert); + s->updateSharded(conf, outns.c_str(), id, final, UpdateOption_Upsert, true); + } else if (config.outType == mr_shard::Config::REDUCE) { + // insert into temp collection, but final collection's sharding + s->insertSharded(conf, tempns.c_str(), final, 0, true, outns.c_str()); } else { - s->insertSharded(conf, outns.c_str(), final, 0); + s->insertSharded(conf, outns.c_str(), final, 0, true); } ++finalCount; values.clear(); @@ -1181,6 +1190,36 @@ namespace mongo { values.push_back( t ); } + if (config.outType == mr_shard::Config::REDUCE) { + // results were written to temp collection, need final reduce + vector< shared_ptr<ShardConnection> > shardConns; + list< shared_ptr<Future::CommandResult> > futures; + BSONObj finalCmdObj = finalCmd.obj(); + for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { + shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , outns ) ); + futures.push_back( Future::spawnCommand( i->getConnString() , dbName , finalCmdObj , temp->get() ) ); + shardConns.push_back( temp ); + } + + // now wait for the result of all shards + bool failed = false; + for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) { + shared_ptr<Future::CommandResult> res = *i; + if ( ! res->join() ) { + error() << "final reduce on sharded output m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl; + result.append( "cause" , res->result() ); + errmsg = "mongod mr failed: "; + errmsg += res->result().toString(); + failed = true; + continue; + } + BSONObj result = res->result(); + } + + for ( unsigned i=0; i<shardConns.size(); i++ ) + shardConns[i]->done(); + } + for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) { ScopedDbConnection conn( i->_server ); conn->dropCollection( dbName + "." + shardedOutputCollection ); diff --git a/s/strategy.cpp b/s/strategy.cpp index 6e4ddbc0bf2..b48a718b49d 100644 --- a/s/strategy.cpp +++ b/s/strategy.cpp @@ -67,17 +67,19 @@ namespace mongo { dbcon.done(); } - void Strategy::insert( const Shard& shard , const char * ns , const BSONObj& obj , int flags ) { + void Strategy::insert( const Shard& shard , const char * ns , const BSONObj& obj , int flags, bool safe ) { ShardConnection dbcon( shard , ns ); if ( dbcon.setVersion() ) { dbcon.done(); throw StaleConfigException( ns , "for insert" ); } dbcon->insert( ns , obj , flags); + if (safe) + dbcon->getLastError(); dbcon.done(); } - void Strategy::update( const Shard& shard , const char * ns , const BSONObj& query , const BSONObj& toupdate , int flags ) { + void Strategy::update( const Shard& shard , const char * ns , const BSONObj& query , const BSONObj& toupdate , int flags, bool safe ) { bool upsert = flags & UpdateOption_Upsert; bool multi = flags & UpdateOption_Multi; @@ -87,6 +89,8 @@ namespace mongo { throw StaleConfigException( ns , "for insert" ); } dbcon->update( ns , query , toupdate, upsert, multi); + if (safe) + dbcon->getLastError(); dbcon.done(); } diff --git a/s/strategy.h b/s/strategy.h index 6218e42c530..326a5150c38 100644 --- a/s/strategy.h +++ b/s/strategy.h @@ -32,15 +32,15 @@ namespace mongo { virtual void getMore( Request& r ) = 0; virtual void writeOp( int op , Request& r ) = 0; - virtual void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) = 0; - virtual void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags ) = 0; + virtual void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags, bool safe=false, const char* nsChunkLookup=0 ) = 0; + virtual void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags, bool safe=false ) = 0; protected: void doWrite( int op , Request& r , const Shard& shard , bool checkVersion = true ); void doQuery( Request& r , const Shard& shard ); - void insert( const Shard& shard , const char * ns , const BSONObj& obj , int flags=0 ); - void update( const Shard& shard , const char * ns , const BSONObj& query , const BSONObj& toupdate , int flags=0 ); + void insert( const Shard& shard , const char * ns , const BSONObj& obj , int flags=0 , bool safe=false ); + void update( const Shard& shard , const char * ns , const BSONObj& query , const BSONObj& toupdate , int flags=0, bool safe=false ); }; diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp index 7710c00bf8f..4ff35673220 100644 --- a/s/strategy_shard.cpp +++ b/s/strategy_shard.cpp @@ -191,8 +191,10 @@ namespace mongo { } } - void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) { - ChunkManagerPtr manager = conf->getChunkManager(ns); + void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags, bool safe, const char* nsChunkLookup ) { + if (!nsChunkLookup) + nsChunkLookup = ns; + ChunkManagerPtr manager = conf->getChunkManager(nsChunkLookup); if ( ! manager->hasShardKey( o ) ) { bool bad = true; @@ -206,7 +208,7 @@ namespace mongo { } if ( bad ) { - log() << "tried to insert object without shard key: " << ns << " " << o << endl; + log() << "tried to insert object without shard key: " << nsChunkLookup << " " << o << endl; uasserted( 14842 , "tried to insert object without shard key" ); } @@ -221,7 +223,7 @@ namespace mongo { try { ChunkPtr c = manager->findChunk( o ); log(4) << " server:" << c->getShard().toString() << " " << o << endl; - insert( c->getShard() , ns , o , flags); + insert( c->getShard() , ns , o , flags, safe); // r.gotInsert(); // if ( r.getClientInfo()->autoSplitOk() ) @@ -344,7 +346,7 @@ namespace mongo { } } - void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags ) { + void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags, bool safe ) { ChunkManagerPtr manager = conf->getChunkManager(ns); BSONObj chunkFinder = query; @@ -410,7 +412,7 @@ namespace mongo { // int * x = (int*)(r.d().afterNS()); // x[0] |= UpdateOption_Broadcast; for ( set<Shard>::iterator i=shards.begin(); i!=shards.end(); i++) { - update(*i, ns, query, toupdate, flags); + update(*i, ns, query, toupdate, flags, safe); } } else { diff --git a/s/strategy_single.cpp b/s/strategy_single.cpp index 670b4070ce3..b3eef9dafa4 100644 --- a/s/strategy_single.cpp +++ b/s/strategy_single.cpp @@ -262,11 +262,11 @@ namespace mongo { return true; } - void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) { + void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags, bool safe, const char* nsChunkLookup ) { // only useful for shards } - void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags ) { + void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags, bool safe ) { // only useful for shards } |