summaryrefslogtreecommitdiff
path: root/s
diff options
context:
space:
mode:
authoragirbal <antoine@10gen.com>2011-06-27 13:29:30 -0700
committeragirbal <antoine@10gen.com>2011-06-27 13:30:19 -0700
commitfd3938aba48ae1f61a67c1e379bb1ea1774bf70f (patch)
tree02581331e041e8eaace16e6d4f2b906f93e37e27 /s
parent466da42da2dec2727d4523da5702f41f42fee499 (diff)
downloadmongo-fd3938aba48ae1f61a67c1e379bb1ea1774bf70f.tar.gz
SERVER-2531: REPLACE mode now uses the finishMapReduce cmd on each shard, so that the replace is atomic.
Diffstat (limited to 's')
-rw-r--r--s/commands_public.cpp49
1 files changed, 24 insertions, 25 deletions
diff --git a/s/commands_public.cpp b/s/commands_public.cpp
index 15ddf988341..3afb0621c6f 100644
--- a/s/commands_public.cpp
+++ b/s/commands_public.cpp
@@ -1117,22 +1117,23 @@ namespace mongo {
string outns = config.finalLong;
string tempns;
- if (config.outType == mr_shard::Config::REDUCE) {
- // result will be inserted into a temp collection to post process
- const string postProcessCollection = getTmpName( collection );
- cout << "post process collection is " << postProcessCollection << endl;
- tempns = dbName + "." + postProcessCollection;
- } else if (config.outType == mr_shard::Config::REPLACE) {
- // drop previous collection
- BSONObj dropColCmd = BSON("drop" << config.finalShort);
- BSONObjBuilder dropColResult(32);
- string outdbCmd = outdb + ".$cmd";
- bool res = Command::runAgainstRegistered(outdbCmd.c_str(), dropColCmd, dropColResult);
- if (!res) {
- errmsg = str::stream() << "Could not drop sharded output collection " << outns << ": " << dropColResult.obj().toString();
- return false;
- }
- }
+
+ // result will be inserted into a temp collection to post process
+ const string postProcessCollection = getTmpName( collection );
+ finalCmd.append("postProcessCollection", postProcessCollection);
+ tempns = dbName + "." + postProcessCollection;
+
+// if (config.outType == mr_shard::Config::REPLACE) {
+// // drop previous collection
+// BSONObj dropColCmd = BSON("drop" << config.finalShort);
+// BSONObjBuilder dropColResult(32);
+// string outdbCmd = outdb + ".$cmd";
+// bool res = Command::runAgainstRegistered(outdbCmd.c_str(), dropColCmd, dropColResult);
+// if (!res) {
+// errmsg = str::stream() << "Could not drop sharded output collection " << outns << ": " << dropColResult.obj().toString();
+// return false;
+// }
+// }
BSONObj sortKey = BSON( "_id" << 1 );
if (!conf->isSharded(outns)) {
@@ -1145,9 +1146,6 @@ namespace mongo {
errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString();
return false;
}
-
- // since it's new collection, use replace mode always
- config.outType = mr_shard::Config::REPLACE;
}
ParallelSortClusteredCursor cursor( servers , dbName + "." + shardedOutputCollection ,
@@ -1178,11 +1176,9 @@ namespace mongo {
if (config.outType == mr_shard::Config::MERGE) {
BSONObj id = final["_id"].wrap();
s->updateSharded(conf, outns.c_str(), id, final, UpdateOption_Upsert, true);
- } else if (config.outType == mr_shard::Config::REDUCE) {
- // insert into temp collection, but final collection's sharding
- s->insertSharded(conf, tempns.c_str(), final, 0, true, outns.c_str());
} else {
- s->insertSharded(conf, outns.c_str(), final, 0, true);
+ // insert into temp collection, but using final collection's shard chunks
+ s->insertSharded(conf, tempns.c_str(), final, 0, true, outns.c_str());
}
++finalCount;
values.clear();
@@ -1190,8 +1186,8 @@ namespace mongo {
values.push_back( t );
}
- if (config.outType == mr_shard::Config::REDUCE) {
- // results were written to temp collection, need final reduce
+ if (config.outType == mr_shard::Config::REDUCE || config.outType == mr_shard::Config::REPLACE) {
+ // results were written to temp collection, need post processing
vector< shared_ptr<ShardConnection> > shardConns;
list< shared_ptr<Future::CommandResult> > futures;
BSONObj finalCmdObj = finalCmd.obj();
@@ -1218,6 +1214,9 @@ namespace mongo {
for ( unsigned i=0; i<shardConns.size(); i++ )
shardConns[i]->done();
+
+ if (failed)
+ return 0;
}
for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {