author	Greg Studer <greg@10gen.com>	2011-12-14 14:18:08 -0500
committer	Greg Studer <greg@10gen.com>	2011-12-14 14:34:53 -0500
commit	082a92b92097041174a5bd32a4dcd352b5ae3a00 (patch)
tree	f3016900788d0e57e2882be09d372b43caf5f54a /s
parent	b8f86e566213a441fbf2b950a70f8daaf44c6e41 (diff)
download	mongo-082a92b92097041174a5bd32a4dcd352b5ae3a00.tar.gz
SERVER-4215 use consistent cursor for map phase
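Rather than building a ShardConnection and spawning a Future::CommandResult per target shard, then joining the futures one at a time with per-shard failure and stale-config retry handled inline, the map phase now issues a single commandOp() call that scatters the command and gathers the per-shard results into a map<Shard,BSONObj>. The caller just iterates that map; a failure on any shard surfaces as a DBException annotated with the namespace and query.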
Diffstat (limited to 's')
-rw-r--r--	s/commands_public.cpp	61
1 file changed, 14 insertions(+), 47 deletions(-)
diff --git a/s/commands_public.cpp b/s/commands_public.cpp
index dd813d5c222..278244c5a02 100644
--- a/s/commands_public.cpp
+++ b/s/commands_public.cpp
@@ -1097,57 +1097,27 @@ namespace mongo {
}
set<Shard> shards;
- if ( shardedInput ) {
- ChunkManagerPtr cm = confIn->getChunkManager( fullns );
- cm->getShardsForQuery( shards , q );
- } else {
- shards.insert(confIn->getPrimary());
- }
-
- // we need to use our connections to the shard
- // so filtering is done correctly for un-owned docs
- // so we allocate them in our thread and hand off
set<ServerAndQuery> servers;
- vector< shared_ptr<ShardConnection> > shardConns;
- list< shared_ptr<Future::CommandResult> > futures;
- for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
- shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , fullns ) );
- assert( temp->get() );
- futures.push_back( Future::spawnCommand( i->getConnString() , dbName , shardedCommand , 0 , temp->get() ) );
- shardConns.push_back( temp );
+ map<Shard,BSONObj> results;
+ try {
+ SHARDED->commandOp( dbName, shardedCommand, 0, fullns, q, results );
+ }
+ catch( DBException& e ){
+ e.addContext( str::stream() << "could not run map command on all shards for ns " << fullns << " and query " << q );
+ throw;
}
- bool failed = false;
BSONObjBuilder shardResultsB;
BSONObjBuilder shardCountsB;
BSONObjBuilder aggCountsB;
map<string,long long> countsMap;
set< BSONObj > splitPts;
- // now wait for the result of all shards
- for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) {
+ for ( map<Shard,BSONObj>::iterator i = results.begin(); i != results.end(); ++i ){
- BSONObj mrResult;
- string server;
-
- try {
- shared_ptr<Future::CommandResult> res = *i;
- if ( ! res->join() ) {
- error() << "sharded m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl;
- result.append( "cause" , res->result() );
- errmsg = "mongod mr failed: ";
- errmsg += res->result().toString();
- failed = true;
- continue;
- }
- mrResult = res->result();
- server = res->getServer();
- }
- catch( RecvStaleConfigException& e ){
- log() << "restarting m/r due to stale config on a shard" << causedBy( e ) << endl;
- return run( dbName , cmdObj, errmsg, result, retry + 1 );
- }
+ BSONObj mrResult = i->second;
+ string server = i->first.getConnString();
shardResultsB.append( server , mrResult );
BSONObj counts = mrResult["counts"].embeddedObjectUserCheck();
@@ -1170,13 +1140,6 @@ namespace mongo {
}
}
}
- for ( unsigned i=0; i<shardConns.size(); i++ )
- shardConns[i]->done();
- shardConns.clear();
-
- if ( failed ) {
- return 0;
- }
// build the sharded finish command
BSONObjBuilder finalCmd;
@@ -1201,6 +1164,9 @@ namespace mongo {
long long outputCount = 0;
BSONObjBuilder postCountsB;
+ // Still needed for legacy reasons
+ vector<shared_ptr<ShardConnection> > shardConns;
+
if (!shardedOutput) {
LOG(1) << "MR with single shard output, NS=" << finalColLong << " primary=" << confOut->getPrimary() << endl;
ShardConnection conn( confOut->getPrimary() , finalColLong );
@@ -1282,6 +1248,7 @@ namespace mongo {
// spawn sharded finish jobs on each shard
// command will fetch appropriate results from other shards, do final reduce and post processing
+ list<shared_ptr<Future::CommandResult> > futures;
futures.clear();
BSONObj finalCmdObj = finalCmd.obj();
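For readers skimming the diff, the shape of the change is easy to miss: the call site used to own the whole scatter/gather (connections, futures, join loop, error paths), and afterward it only consumes a results map. Below is a minimal, self-contained C++ sketch of that consolidation. It is an analogy, not MongoDB source: std::async stands in for Future::spawnCommand, and the Shard struct and commandOp() signature here are simplified placeholders rather than the real mongos types.

#include <future>
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for mongos's Shard type.
struct Shard {
    std::string connString;
    bool operator<(const Shard& other) const { return connString < other.connString; }
};

// Stand-in for dispatching one command to one shard.
std::string runOnShard(const Shard& s, const std::string& cmd) {
    return "result of '" + cmd + "' from " + s.connString;
}

// The consolidated helper: owns the scatter, the gather, and error
// propagation, and returns one result per shard -- the role commandOp()
// plays for the map phase after this commit.
std::map<Shard, std::string> commandOp(const std::vector<Shard>& shards,
                                       const std::string& cmd) {
    std::vector<std::pair<Shard, std::future<std::string>>> futures;
    futures.reserve(shards.size());
    for (const Shard& s : shards)
        futures.emplace_back(s, std::async(std::launch::async, runOnShard, s, cmd));

    std::map<Shard, std::string> results;
    for (auto& f : futures)
        results[f.first] = f.second.get();  // get() rethrows a shard's failure
    return results;
}

int main() {
    std::vector<Shard> shards{{"shard0:27018"}, {"shard1:27018"}};
    try {
        // The call site shrinks to "run it, then iterate the results",
        // mirroring the new for ( map<Shard,BSONObj>::iterator ... ) loop.
        for (const auto& r : commandOp(shards, "mapreduce")) {
            std::cout << r.first.connString << " -> " << r.second << "\n";
        }
    } catch (const std::exception& e) {
        // Mirrors the diff's catch( DBException& e ): annotate and report.
        std::cerr << "could not run map command on all shards: " << e.what() << "\n";
        return 1;
    }
    return 0;
}

One design consequence visible in the diff itself: once the helper owns the gather, the "failed" flag, the manual shardConns cleanup loop, and the inline RecvStaleConfigException retry all disappear from this call site, leaving only the vestigial shardConns vector kept "for legacy reasons" for the output phase.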