diff options
author | Greg Studer <greg@10gen.com> | 2011-12-14 14:18:08 -0500 |
---|---|---|
committer | Greg Studer <greg@10gen.com> | 2011-12-14 14:34:53 -0500 |
commit | 082a92b92097041174a5bd32a4dcd352b5ae3a00 (patch) | |
tree | f3016900788d0e57e2882be09d372b43caf5f54a /s | |
parent | b8f86e566213a441fbf2b950a70f8daaf44c6e41 (diff) | |
download | mongo-082a92b92097041174a5bd32a4dcd352b5ae3a00.tar.gz |
SERVER-4215 use consistent cursor for map phase
Diffstat (limited to 's')
-rw-r--r-- | s/commands_public.cpp | 61 |
1 files changed, 14 insertions, 47 deletions
diff --git a/s/commands_public.cpp b/s/commands_public.cpp index dd813d5c222..278244c5a02 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -1097,57 +1097,27 @@ namespace mongo { } set<Shard> shards; - if ( shardedInput ) { - ChunkManagerPtr cm = confIn->getChunkManager( fullns ); - cm->getShardsForQuery( shards , q ); - } else { - shards.insert(confIn->getPrimary()); - } - - // we need to use our connections to the shard - // so filtering is done correctly for un-owned docs - // so we allocate them in our thread and hand off set<ServerAndQuery> servers; - vector< shared_ptr<ShardConnection> > shardConns; - list< shared_ptr<Future::CommandResult> > futures; - for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { - shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , fullns ) ); - assert( temp->get() ); - futures.push_back( Future::spawnCommand( i->getConnString() , dbName , shardedCommand , 0 , temp->get() ) ); - shardConns.push_back( temp ); + map<Shard,BSONObj> results; + try { + SHARDED->commandOp( dbName, shardedCommand, 0, fullns, q, results ); + } + catch( DBException& e ){ + e.addContext( str::stream() << "could not run map command on all shards for ns " << fullns << " and query " << q ); + throw; } - bool failed = false; BSONObjBuilder shardResultsB; BSONObjBuilder shardCountsB; BSONObjBuilder aggCountsB; map<string,long long> countsMap; set< BSONObj > splitPts; - // now wait for the result of all shards - for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) { + for ( map<Shard,BSONObj>::iterator i = results.begin(); i != results.end(); ++i ){ - BSONObj mrResult; - string server; - - try { - shared_ptr<Future::CommandResult> res = *i; - if ( ! res->join() ) { - error() << "sharded m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl; - result.append( "cause" , res->result() ); - errmsg = "mongod mr failed: "; - errmsg += res->result().toString(); - failed = true; - continue; - } - mrResult = res->result(); - server = res->getServer(); - } - catch( RecvStaleConfigException& e ){ - log() << "restarting m/r due to stale config on a shard" << causedBy( e ) << endl; - return run( dbName , cmdObj, errmsg, result, retry + 1 ); - } + BSONObj mrResult = i->second; + string server = i->first.getConnString(); shardResultsB.append( server , mrResult ); BSONObj counts = mrResult["counts"].embeddedObjectUserCheck(); @@ -1170,13 +1140,6 @@ namespace mongo { } } } - for ( unsigned i=0; i<shardConns.size(); i++ ) - shardConns[i]->done(); - shardConns.clear(); - - if ( failed ) { - return 0; - } // build the sharded finish command BSONObjBuilder finalCmd; @@ -1201,6 +1164,9 @@ namespace mongo { long long outputCount = 0; BSONObjBuilder postCountsB; + // Still need for legacy reasons + vector<shared_ptr<ShardConnection> > shardConns; + if (!shardedOutput) { LOG(1) << "MR with single shard output, NS=" << finalColLong << " primary=" << confOut->getPrimary() << endl; ShardConnection conn( confOut->getPrimary() , finalColLong ); @@ -1282,6 +1248,7 @@ namespace mongo { // spawn sharded finish jobs on each shard // command will fetch appropriate results from other shards, do final reduce and post processing + list<shared_ptr<Future::CommandResult> > futures; futures.clear(); BSONObj finalCmdObj = finalCmd.obj(); |