diff options
author | agirbal <antoine@10gen.com> | 2011-06-27 13:29:30 -0700 |
---|---|---|
committer | agirbal <antoine@10gen.com> | 2011-06-27 13:30:19 -0700 |
commit | fd3938aba48ae1f61a67c1e379bb1ea1774bf70f (patch) | |
tree | 02581331e041e8eaace16e6d4f2b906f93e37e27 /s | |
parent | 466da42da2dec2727d4523da5702f41f42fee499 (diff) | |
download | mongo-fd3938aba48ae1f61a67c1e379bb1ea1774bf70f.tar.gz |
SERVER-2531: REPLACE mode now uses the finishMapReduce cmd on each shard, so that the replace is atomic.
Diffstat (limited to 's')
-rw-r--r-- | s/commands_public.cpp | 49 |
1 files changed, 24 insertions, 25 deletions
diff --git a/s/commands_public.cpp b/s/commands_public.cpp index 15ddf988341..3afb0621c6f 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -1117,22 +1117,23 @@ namespace mongo { string outns = config.finalLong; string tempns; - if (config.outType == mr_shard::Config::REDUCE) { - // result will be inserted into a temp collection to post process - const string postProcessCollection = getTmpName( collection ); - cout << "post process collection is " << postProcessCollection << endl; - tempns = dbName + "." + postProcessCollection; - } else if (config.outType == mr_shard::Config::REPLACE) { - // drop previous collection - BSONObj dropColCmd = BSON("drop" << config.finalShort); - BSONObjBuilder dropColResult(32); - string outdbCmd = outdb + ".$cmd"; - bool res = Command::runAgainstRegistered(outdbCmd.c_str(), dropColCmd, dropColResult); - if (!res) { - errmsg = str::stream() << "Could not drop sharded output collection " << outns << ": " << dropColResult.obj().toString(); - return false; - } - } + + // result will be inserted into a temp collection to post process + const string postProcessCollection = getTmpName( collection ); + finalCmd.append("postProcessCollection", postProcessCollection); + tempns = dbName + "." + postProcessCollection; + +// if (config.outType == mr_shard::Config::REPLACE) { +// // drop previous collection +// BSONObj dropColCmd = BSON("drop" << config.finalShort); +// BSONObjBuilder dropColResult(32); +// string outdbCmd = outdb + ".$cmd"; +// bool res = Command::runAgainstRegistered(outdbCmd.c_str(), dropColCmd, dropColResult); +// if (!res) { +// errmsg = str::stream() << "Could not drop sharded output collection " << outns << ": " << dropColResult.obj().toString(); +// return false; +// } +// } BSONObj sortKey = BSON( "_id" << 1 ); if (!conf->isSharded(outns)) { @@ -1145,9 +1146,6 @@ namespace mongo { errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString(); return false; } - - // since it's new collection, use replace mode always - config.outType = mr_shard::Config::REPLACE; } ParallelSortClusteredCursor cursor( servers , dbName + "." + shardedOutputCollection , @@ -1178,11 +1176,9 @@ namespace mongo { if (config.outType == mr_shard::Config::MERGE) { BSONObj id = final["_id"].wrap(); s->updateSharded(conf, outns.c_str(), id, final, UpdateOption_Upsert, true); - } else if (config.outType == mr_shard::Config::REDUCE) { - // insert into temp collection, but final collection's sharding - s->insertSharded(conf, tempns.c_str(), final, 0, true, outns.c_str()); } else { - s->insertSharded(conf, outns.c_str(), final, 0, true); + // insert into temp collection, but using final collection's shard chunks + s->insertSharded(conf, tempns.c_str(), final, 0, true, outns.c_str()); } ++finalCount; values.clear(); @@ -1190,8 +1186,8 @@ namespace mongo { values.push_back( t ); } - if (config.outType == mr_shard::Config::REDUCE) { - // results were written to temp collection, need final reduce + if (config.outType == mr_shard::Config::REDUCE || config.outType == mr_shard::Config::REPLACE) { + // results were written to temp collection, need post processing vector< shared_ptr<ShardConnection> > shardConns; list< shared_ptr<Future::CommandResult> > futures; BSONObj finalCmdObj = finalCmd.obj(); @@ -1218,6 +1214,9 @@ namespace mongo { for ( unsigned i=0; i<shardConns.size(); i++ ) shardConns[i]->done(); + + if (failed) + return 0; } for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) { |