summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoragirbal <antoine@10gen.com>2012-02-17 17:09:13 -0800
committeragirbal <antoine@10gen.com>2012-02-17 17:10:04 -0800
commit49405ce14fc98462a09f94b3c0b79ebe817aa6d3 (patch)
tree50a1a8d19892a5d3783cb0f3792a504e9321042b
parentf06e7dc332a694ed3b344c360338b2a18814db2b (diff)
downloadmongo-49405ce14fc98462a09f94b3c0b79ebe817aa6d3.tar.gz
SERVER-4989: MR: errors that happen during sharded post processing are ignored
-rw-r--r--.gitignore1
-rw-r--r--src/mongo/db/commands/mr.cpp2
-rw-r--r--src/mongo/s/commands_public.cpp54
3 files changed, 38 insertions, 19 deletions
diff --git a/.gitignore b/.gitignore
index ad78b8e0018..062c2826a42 100644
--- a/.gitignore
+++ b/.gitignore
@@ -135,3 +135,4 @@ src/mongo/shell/mongo.cpp
src/third_party/js-1.7/jsautocfg.h
src/third_party/js-1.7/jsautokw.h
buildinfo.cpp
+/.settings/ \ No newline at end of file
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 64c52589006..478b101fc9b 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -613,7 +613,7 @@ namespace mongo {
_config.reducer->init( this );
if ( _config.finalizer )
_config.finalizer->init( this );
- _scope->setBoolean("_doFinal", _config.finalizer);
+ _scope->setBoolean("_doFinal", _config.finalizer.get() != 0);
// by default start in JS mode, will be faster for small jobs
_jsMode = _config.jsMode;
diff --git a/src/mongo/s/commands_public.cpp b/src/mongo/s/commands_public.cpp
index d54e7a2d661..0069d6058fa 100644
--- a/src/mongo/s/commands_public.cpp
+++ b/src/mongo/s/commands_public.cpp
@@ -1013,6 +1013,19 @@ namespace mongo {
return c;
}
+ void cleanUp( const set<ServerAndQuery>& servers, string dbName, string shardResultCollection ) {
+ try {
+ // drop collections with tmp results on each shard
+ for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
+ ScopedDbConnection conn( i->_server );
+ conn->dropCollection( dbName + "." + shardResultCollection );
+ conn.done();
+ }
+ } catch ( std::exception e ) {
+ log() << "Cannot cleanup shard results" << causedBy( e ) << endl;
+ }
+ }
+
bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
return run( dbName, cmdObj, errmsg, result, 0 );
}
@@ -1105,6 +1118,8 @@ namespace mongo {
BSONObjBuilder aggCountsB;
map<string,long long> countsMap;
set< BSONObj > splitPts;
+ BSONObj singleResult;
+ bool ok = true;
{
// take distributed lock to prevent split / migration
@@ -1143,12 +1158,17 @@ namespace mongo {
for ( map<Shard,BSONObj>::iterator i = results.begin(); i != results.end(); ++i ){
- BSONObj mrResult = i->second;
+ // need to gather list of all servers even if an error happened
string server = i->first.getConnString();
+ servers.insert( server );
+ if ( !ok ) continue;
- BSONObj counts = mrResult["counts"].embeddedObjectUserCheck();
+ singleResult = i->second;
+ ok = singleResult["ok"].trueValue();
+ if ( !ok ) continue;
+
+ BSONObj counts = singleResult["counts"].embeddedObjectUserCheck();
shardCountsB.append( server , counts );
- servers.insert( server );
// add up the counts for each shard
// some of them will be fixed later like output and reduce
@@ -1158,8 +1178,8 @@ namespace mongo {
countsMap[temp.fieldName()] += temp.numberLong();
}
- if (mrResult.hasField("splitKeys")) {
- BSONElement splitKeys = mrResult.getField("splitKeys");
+ if (singleResult.hasField("splitKeys")) {
+ BSONElement splitKeys = singleResult.getField("splitKeys");
vector<BSONElement> pts = splitKeys.Array();
for (vector<BSONElement>::iterator it = pts.begin(); it != pts.end(); ++it) {
splitPts.insert(it->Obj().getOwned());
@@ -1168,6 +1188,13 @@ namespace mongo {
}
}
+ if ( ! ok ) {
+ cleanUp( servers, dbName, shardResultCollection );
+ errmsg = "MR parallel processing failed: ";
+ errmsg += singleResult.toString();
+ return 0;
+ }
+
// build the sharded finish command
BSONObjBuilder finalCmd;
finalCmd.append( "mapreduce.shardedfinish" , cmdObj );
@@ -1184,8 +1211,6 @@ namespace mongo {
finalCmd.append( "counts" , aggCounts );
Timer t2;
- BSONObj singleResult;
- bool ok = false;
long long reduceCount = 0;
long long outputCount = 0;
BSONObjBuilder postCountsB;
@@ -1258,6 +1283,8 @@ namespace mongo {
string server = i->first.getConnString();
singleResult = i->second;
+ ok = singleResult["ok"].trueValue();
+ if ( !ok ) break;
BSONObj counts = singleResult.getObjectField("counts");
reduceCount += counts.getIntField("reduce");
@@ -1295,19 +1322,10 @@ namespace mongo {
}
}
- try {
- // drop collections with tmp results on each shard
- for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
- ScopedDbConnection conn( i->_server );
- conn->dropCollection( dbName + "." + shardResultCollection );
- conn.done();
- }
- } catch ( std::exception e ) {
- log() << "Cannot cleanup shard results" << causedBy( e ) << endl;
- }
+ cleanUp( servers, dbName, shardResultCollection );
if ( ! ok ) {
- errmsg = "final reduce failed: ";
+ errmsg = "MR post processing failed: ";
errmsg += singleResult.toString();
return 0;
}