From fd3938aba48ae1f61a67c1e379bb1ea1774bf70f Mon Sep 17 00:00:00 2001 From: agirbal Date: Mon, 27 Jun 2011 13:29:30 -0700 Subject: SERVER-2531: REPLACE mode now uses the finishMapReduce cmd on each shard, so that the replace is atomic. --- s/commands_public.cpp | 49 ++++++++++++++++++++++++------------------------- 1 file changed, 24 insertions(+), 25 deletions(-) (limited to 's') diff --git a/s/commands_public.cpp b/s/commands_public.cpp index 15ddf988341..3afb0621c6f 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -1117,22 +1117,23 @@ namespace mongo { string outns = config.finalLong; string tempns; - if (config.outType == mr_shard::Config::REDUCE) { - // result will be inserted into a temp collection to post process - const string postProcessCollection = getTmpName( collection ); - cout << "post process collection is " << postProcessCollection << endl; - tempns = dbName + "." + postProcessCollection; - } else if (config.outType == mr_shard::Config::REPLACE) { - // drop previous collection - BSONObj dropColCmd = BSON("drop" << config.finalShort); - BSONObjBuilder dropColResult(32); - string outdbCmd = outdb + ".$cmd"; - bool res = Command::runAgainstRegistered(outdbCmd.c_str(), dropColCmd, dropColResult); - if (!res) { - errmsg = str::stream() << "Could not drop sharded output collection " << outns << ": " << dropColResult.obj().toString(); - return false; - } - } + + // result will be inserted into a temp collection to post process + const string postProcessCollection = getTmpName( collection ); + finalCmd.append("postProcessCollection", postProcessCollection); + tempns = dbName + "." + postProcessCollection; + +// if (config.outType == mr_shard::Config::REPLACE) { +// // drop previous collection +// BSONObj dropColCmd = BSON("drop" << config.finalShort); +// BSONObjBuilder dropColResult(32); +// string outdbCmd = outdb + ".$cmd"; +// bool res = Command::runAgainstRegistered(outdbCmd.c_str(), dropColCmd, dropColResult); +// if (!res) { +// errmsg = str::stream() << "Could not drop sharded output collection " << outns << ": " << dropColResult.obj().toString(); +// return false; +// } +// } BSONObj sortKey = BSON( "_id" << 1 ); if (!conf->isSharded(outns)) { @@ -1145,9 +1146,6 @@ namespace mongo { errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString(); return false; } - - // since it's new collection, use replace mode always - config.outType = mr_shard::Config::REPLACE; } ParallelSortClusteredCursor cursor( servers , dbName + "." + shardedOutputCollection , @@ -1178,11 +1176,9 @@ namespace mongo { if (config.outType == mr_shard::Config::MERGE) { BSONObj id = final["_id"].wrap(); s->updateSharded(conf, outns.c_str(), id, final, UpdateOption_Upsert, true); - } else if (config.outType == mr_shard::Config::REDUCE) { - // insert into temp collection, but final collection's sharding - s->insertSharded(conf, tempns.c_str(), final, 0, true, outns.c_str()); } else { - s->insertSharded(conf, outns.c_str(), final, 0, true); + // insert into temp collection, but using final collection's shard chunks + s->insertSharded(conf, tempns.c_str(), final, 0, true, outns.c_str()); } ++finalCount; values.clear(); @@ -1190,8 +1186,8 @@ namespace mongo { values.push_back( t ); } - if (config.outType == mr_shard::Config::REDUCE) { - // results were written to temp collection, need final reduce + if (config.outType == mr_shard::Config::REDUCE || config.outType == mr_shard::Config::REPLACE) { + // results were written to temp collection, need post processing vector< shared_ptr > shardConns; list< shared_ptr > futures; BSONObj finalCmdObj = finalCmd.obj(); @@ -1218,6 +1214,9 @@ namespace mongo { for ( unsigned i=0; idone(); + + if (failed) + return 0; } for ( set::iterator i=servers.begin(); i!=servers.end(); i++ ) { -- cgit v1.2.1