diff options
author | agirbal <antoine@10gen.com> | 2012-02-17 17:09:13 -0800 |
---|---|---|
committer | agirbal <antoine@10gen.com> | 2012-02-17 17:10:04 -0800 |
commit | 49405ce14fc98462a09f94b3c0b79ebe817aa6d3 (patch) | |
tree | 50a1a8d19892a5d3783cb0f3792a504e9321042b | |
parent | f06e7dc332a694ed3b344c360338b2a18814db2b (diff) | |
download | mongo-49405ce14fc98462a09f94b3c0b79ebe817aa6d3.tar.gz |
SERVER-4989: MR: errors that happen during sharded post processing are ignored
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | src/mongo/db/commands/mr.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/commands_public.cpp | 54 |
3 files changed, 38 insertions, 19 deletions
```diff
diff --git a/.gitignore b/.gitignore
index ad78b8e0018..062c2826a42 100644
--- a/.gitignore
+++ b/.gitignore
@@ -135,3 +135,4 @@ src/mongo/shell/mongo.cpp
 src/third_party/js-1.7/jsautocfg.h
 src/third_party/js-1.7/jsautokw.h
 buildinfo.cpp
+/.settings/
\ No newline at end of file
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 64c52589006..478b101fc9b 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -613,7 +613,7 @@ namespace mongo {
             _config.reducer->init( this );
             if ( _config.finalizer )
                 _config.finalizer->init( this );
-            _scope->setBoolean("_doFinal", _config.finalizer);
+            _scope->setBoolean("_doFinal", _config.finalizer.get() != 0);
 
             // by default start in JS mode, will be faster for small jobs
             _jsMode = _config.jsMode;
diff --git a/src/mongo/s/commands_public.cpp b/src/mongo/s/commands_public.cpp
index d54e7a2d661..0069d6058fa 100644
--- a/src/mongo/s/commands_public.cpp
+++ b/src/mongo/s/commands_public.cpp
@@ -1013,6 +1013,19 @@ namespace mongo {
             return c;
         }
 
+        void cleanUp( const set<ServerAndQuery>& servers, string dbName, string shardResultCollection ) {
+            try {
+                // drop collections with tmp results on each shard
+                for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
+                    ScopedDbConnection conn( i->_server );
+                    conn->dropCollection( dbName + "." + shardResultCollection );
+                    conn.done();
+                }
+            } catch ( std::exception e ) {
+                log() << "Cannot cleanup shard results" << causedBy( e ) << endl;
+            }
+        }
+
         bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
             return run( dbName, cmdObj, errmsg, result, 0 );
         }
@@ -1105,6 +1118,8 @@ namespace mongo {
             BSONObjBuilder aggCountsB;
             map<string,long long> countsMap;
             set< BSONObj > splitPts;
+            BSONObj singleResult;
+            bool ok = true;
 
             {
                 // take distributed lock to prevent split / migration
@@ -1143,12 +1158,17 @@ namespace mongo {
 
 
                 for ( map<Shard,BSONObj>::iterator i = results.begin(); i != results.end(); ++i ){
-                    BSONObj mrResult = i->second;
+                    // need to gather list of all servers even if an error happened
                     string server = i->first.getConnString();
+                    servers.insert( server );
+                    if ( !ok ) continue;
 
-                    BSONObj counts = mrResult["counts"].embeddedObjectUserCheck();
+                    singleResult = i->second;
+                    ok = singleResult["ok"].trueValue();
+                    if ( !ok ) continue;
+
+                    BSONObj counts = singleResult["counts"].embeddedObjectUserCheck();
                     shardCountsB.append( server , counts );
-                    servers.insert( server );
 
                     // add up the counts for each shard
                     // some of them will be fixed later like output and reduce
@@ -1158,8 +1178,8 @@ namespace mongo {
                         countsMap[temp.fieldName()] += temp.numberLong();
                     }
 
-                    if (mrResult.hasField("splitKeys")) {
-                        BSONElement splitKeys = mrResult.getField("splitKeys");
+                    if (singleResult.hasField("splitKeys")) {
+                        BSONElement splitKeys = singleResult.getField("splitKeys");
                         vector<BSONElement> pts = splitKeys.Array();
                         for (vector<BSONElement>::iterator it = pts.begin(); it != pts.end(); ++it) {
                             splitPts.insert(it->Obj().getOwned());
@@ -1168,6 +1188,13 @@ namespace mongo {
                 }
             }
 
+            if ( ! ok ) {
+                cleanUp( servers, dbName, shardResultCollection );
+                errmsg = "MR parallel processing failed: ";
+                errmsg += singleResult.toString();
+                return 0;
+            }
+
             // build the sharded finish command
             BSONObjBuilder finalCmd;
             finalCmd.append( "mapreduce.shardedfinish" , cmdObj );
@@ -1184,8 +1211,6 @@ namespace mongo {
             finalCmd.append( "counts" , aggCounts );
 
             Timer t2;
-            BSONObj singleResult;
-            bool ok = false;
             long long reduceCount = 0;
             long long outputCount = 0;
             BSONObjBuilder postCountsB;
@@ -1258,6 +1283,8 @@ namespace mongo {
 
                     string server = i->first.getConnString();
                     singleResult = i->second;
+                    ok = singleResult["ok"].trueValue();
+                    if ( !ok ) break;
 
                     BSONObj counts = singleResult.getObjectField("counts");
                     reduceCount += counts.getIntField("reduce");
@@ -1295,19 +1322,10 @@ namespace mongo {
                 }
             }
 
-            try {
-                // drop collections with tmp results on each shard
-                for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
-                    ScopedDbConnection conn( i->_server );
-                    conn->dropCollection( dbName + "." + shardResultCollection );
-                    conn.done();
-                }
-            } catch ( std::exception e ) {
-                log() << "Cannot cleanup shard results" << causedBy( e ) << endl;
-            }
+            cleanUp( servers, dbName, shardResultCollection );
 
             if ( ! ok ) {
-                errmsg = "final reduce failed: ";
+                errmsg = "MR post processing failed: ";
                 errmsg += singleResult.toString();
                 return 0;
             }
```