diff options
author | U-tellus\cwestin <cwestin@10gen.com> | 2011-12-07 10:13:14 -0800 |
---|---|---|
committer | U-tellus\cwestin <cwestin@10gen.com> | 2011-12-07 10:13:14 -0800 |
commit | 5d443029c1d7c18b0d75f5a1b4a5de7bdf230fe6 (patch) | |
tree | 6294223da0144e62388b408d32fd27bf01d7c909 /s | |
parent | 67839bc7e83203027cb7dca299191eb17c0db5c5 (diff) | |
parent | ac4da93fa146a927bd13dc19bb92e9d329463fbc (diff) | |
download | mongo-5d443029c1d7c18b0d75f5a1b4a5de7bdf230fe6.tar.gz |
merge from main
Diffstat (limited to 's')
-rw-r--r-- | s/chunk.cpp | 10 | ||||
-rw-r--r-- | s/chunk.h | 4 | ||||
-rw-r--r-- | s/client.cpp | 1 | ||||
-rw-r--r-- | s/commands_public.cpp | 544 | ||||
-rw-r--r-- | s/config.cpp | 36 | ||||
-rw-r--r-- | s/config.h | 4 | ||||
-rw-r--r-- | s/d_logic.h | 5 | ||||
-rw-r--r-- | s/d_state.cpp | 30 | ||||
-rw-r--r-- | s/dbgrid.vcxproj | 18 | ||||
-rwxr-xr-x | s/dbgrid.vcxproj.filters | 2 | ||||
-rw-r--r-- | s/grid.cpp | 3 | ||||
-rw-r--r-- | s/shard_version.cpp | 8 | ||||
-rw-r--r-- | s/shardconnection.cpp | 8 | ||||
-rw-r--r-- | s/strategy_shard.cpp | 12 | ||||
-rw-r--r-- | s/strategy_single.cpp | 4 |
15 files changed, 412 insertions, 277 deletions
diff --git a/s/chunk.cpp b/s/chunk.cpp index 8fd4f5449a0..49dd84bebf0 100644 --- a/s/chunk.cpp +++ b/s/chunk.cpp @@ -344,6 +344,12 @@ namespace mongo { if ( _dataWritten < splitThreshold / 5 ) return false; + + if ( ! getManager()->_splitTickets.tryAcquire() ) { + LOG(1) << "won't auto split becaue not enough tickets: " << getManager()->getns() << endl; + return false; + } + TicketHolderReleaser releaser( &getManager()->_splitTickets ); // this is a bit ugly // we need it so that mongos blocks for the writes to actually be committed @@ -542,7 +548,9 @@ namespace mongo { // The shard versioning mechanism hinges on keeping track of the number of times we reloaded ChunkManager's. // Increasing this number here will prompt checkShardVersion() to refresh the connection-level versions to // the most up to date value. - _sequenceNumber(++NextSequenceNumber) + _sequenceNumber(++NextSequenceNumber), + + _splitTickets( 5 ) { int tries = 3; diff --git a/s/chunk.h b/s/chunk.h index 24961f1bfcf..3a312575326 100644 --- a/s/chunk.h +++ b/s/chunk.h @@ -317,6 +317,8 @@ namespace mongo { void getAllShards( set<Shard>& all ) const; void getShardsForRange(set<Shard>& shards, const BSONObj& min, const BSONObj& max, bool fullKeyReq = true) const; // [min, max) + ChunkMap getChunkMap() const { return _chunkMap; } + /** * Returns true if, for this shard, the chunks are identical in both chunk managers */ @@ -373,6 +375,8 @@ namespace mongo { const unsigned long long _sequenceNumber; + mutable TicketHolder _splitTickets; // number of concurrent splitVector we can do from a splitIfShould per collection + friend class Chunk; friend class ChunkRangeManager; // only needed for CRM::assertValid() static AtomicUInt NextSequenceNumber; diff --git a/s/client.cpp b/s/client.cpp index a8eb30bb5d6..36063347d85 100644 --- a/s/client.cpp +++ b/s/client.cpp @@ -288,6 +288,7 @@ namespace mongo { catch( std::exception &e ){ warning() << "could not clear last error from a shard " << temp << causedBy( e ) << endl; } + conn.done(); } clearSinceLastGetError(); diff --git a/s/commands_public.cpp b/s/commands_public.cpp index b9f0847cf0a..807eb24e63c 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -946,7 +946,7 @@ namespace mongo { return ss.str(); } - BSONObj fixForShards( const BSONObj& orig , const string& output, BSONObj& customOut , string& badShardedField ) { + BSONObj fixForShards( const BSONObj& orig , const string& output , string& badShardedField , int maxChunkSizeBytes ) { BSONObjBuilder b; BSONObjIterator i( orig ); while ( i.more() ) { @@ -965,12 +965,6 @@ namespace mongo { else if ( fn == "out" || fn == "finalize" ) { // we don't want to copy these - if (fn == "out" && e.type() == Object) { - // check if there is a custom output - BSONObj out = e.embeddedObject(); -// if (out.hasField("db")) - customOut = out; - } } else { badShardedField = fn; @@ -978,6 +972,12 @@ namespace mongo { } } b.append( "out" , output ); + + if ( maxChunkSizeBytes > 0 ) { + // will need to figure out chunks, ask shards for points + b.append("splitInfo", maxChunkSizeBytes); + } + return b.obj(); } @@ -1000,14 +1000,6 @@ namespace mongo { string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; - const string shardedOutputCollection = getTmpName( collection ); - - string badShardedField; - BSONObj customOut; - BSONObj shardedCommand = fixForShards( cmdObj , shardedOutputCollection, customOut , badShardedField ); - - bool customOutDB = customOut.hasField( "db" ); - // Abort after two retries, m/r is an expensive operation if( retry > 2 ){ errmsg = "shard version errors preventing parallel mapreduce, check logs for further info"; @@ -1018,14 +1010,57 @@ namespace mongo { forceRemoteCheckShardVersionCB( fullns ); } - DBConfigPtr conf = grid.getDBConfig( dbName , false ); + const string shardResultCollection = getTmpName( collection ); - if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { - if ( customOutDB ) { - errmsg = "can't use out 'db' with non-sharded db"; - return false; + BSONObj customOut; + string finalColShort; + string finalColLong; + bool customOutDB = false; + string outDB = dbName; + BSONElement outElmt = cmdObj.getField("out"); + if (outElmt.type() == Object) { + // check if there is a custom output + BSONObj out = outElmt.embeddedObject(); + customOut = out; + // mode must be 1st element + finalColShort = out.firstElement().str(); + if (customOut.hasField( "db" )) { + customOutDB = true; + outDB = customOut.getField("db").str(); } - return passthrough( conf , cmdObj , result ); + finalColLong = outDB + "." + finalColShort; + } + + DBConfigPtr confIn = grid.getDBConfig( dbName , false ); + DBConfigPtr confOut = confIn; + if (customOutDB) { + confOut = grid.getDBConfig( outDB , true ); + } + + bool shardedInput = confIn && confIn->isShardingEnabled() && confIn->isSharded( fullns ); + bool shardedOutput = customOut.getBoolField("sharded"); + + if (!shardedOutput) + uassert( 15920 , "Cannot output to a non-sharded collection, a sharded collection exists" , !confOut->isSharded(finalColLong) ); + // should we also prevent going from non-sharded to sharded? during the transition client may see partial data + + long long maxChunkSizeBytes = 0; + if (shardedOutput) { + // will need to figure out chunks, ask shards for points + maxChunkSizeBytes = cmdObj["maxChunkSizeBytes"].numberLong(); + if ( maxChunkSizeBytes == 0 ) { + maxChunkSizeBytes = Chunk::MaxChunkSize; + } + } + + // modify command to run on shards with output to tmp collection + string badShardedField; + assert( maxChunkSizeBytes < 0x7fffffff ); + BSONObj shardedCommand = fixForShards( cmdObj , shardResultCollection , badShardedField, static_cast<int>(maxChunkSizeBytes) ); + + if ( ! shardedInput && ! shardedOutput && ! customOutDB ) { + LOG(1) << "simple MR, just passthrough" << endl; + return passthrough( confIn , cmdObj , result ); } if ( badShardedField.size() ) { @@ -1034,312 +1069,307 @@ namespace mongo { } BSONObjBuilder timingBuilder; - - ChunkManagerPtr cm = conf->getChunkManager( fullns ); - BSONObj q; if ( cmdObj["query"].type() == Object ) { q = cmdObj["query"].embeddedObjectUserCheck(); } set<Shard> shards; - cm->getShardsForQuery( shards , q ); - - - BSONObjBuilder finalCmd; - finalCmd.append( "mapreduce.shardedfinish" , cmdObj ); - finalCmd.append( "shardedOutputCollection" , shardedOutputCollection ); - + if ( shardedInput ) { + ChunkManagerPtr cm = confIn->getChunkManager( fullns ); + cm->getShardsForQuery( shards , q ); + } else { + shards.insert(confIn->getPrimary()); + } + // we need to use our connections to the shard + // so filtering is done correctly for un-owned docs + // so we allocate them in our thread and hand off set<ServerAndQuery> servers; - BSONObj shardCounts; - BSONObj aggCounts; - map<string,long long> countsMap; - { - // we need to use our connections to the shard - // so filtering is done correctly for un-owned docs - // so we allocate them in our thread - // and hand off - // Note: why not use pooled connections? This has been reported to create too many connections - vector< shared_ptr<ShardConnection> > shardConns; - list< shared_ptr<Future::CommandResult> > futures; - - for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { - shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , fullns ) ); - assert( temp->get() ); - futures.push_back( Future::spawnCommand( i->getConnString() , dbName , shardedCommand , 0 , temp->get() ) ); - shardConns.push_back( temp ); - } - - bool failed = false; - - // now wait for the result of all shards - BSONObjBuilder shardResultsB; - BSONObjBuilder shardCountsB; - BSONObjBuilder aggCountsB; - for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) { + vector< shared_ptr<ShardConnection> > shardConns; + list< shared_ptr<Future::CommandResult> > futures; - BSONObj mrResult; - string server; + for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { + shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , fullns ) ); + assert( temp->get() ); + futures.push_back( Future::spawnCommand( i->getConnString() , dbName , shardedCommand , 0 , temp->get() ) ); + shardConns.push_back( temp ); + } - try { - shared_ptr<Future::CommandResult> res = *i; - if ( ! res->join() ) { - error() << "sharded m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl; - result.append( "cause" , res->result() ); - errmsg = "mongod mr failed: "; - errmsg += res->result().toString(); - failed = true; - continue; - } - mrResult = res->result(); - server = res->getServer(); - } - catch( RecvStaleConfigException& e ){ + bool failed = false; + BSONObjBuilder shardResultsB; + BSONObjBuilder shardCountsB; + BSONObjBuilder aggCountsB; + map<string,long long> countsMap; + set< BSONObj > splitPts; - // TODO: really should kill all the MR ops we sent if possible... + // now wait for the result of all shards + for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) { - log() << "restarting m/r due to stale config on a shard" << causedBy( e ) << endl; + BSONObj mrResult; + string server; + + try { + shared_ptr<Future::CommandResult> res = *i; + if ( ! res->join() ) { + error() << "sharded m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl; + result.append( "cause" , res->result() ); + errmsg = "mongod mr failed: "; + errmsg += res->result().toString(); + failed = true; + continue; + } + mrResult = res->result(); + server = res->getServer(); + } + catch( RecvStaleConfigException& e ){ + log() << "restarting m/r due to stale config on a shard" << causedBy( e ) << endl; + return run( dbName , cmdObj, errmsg, result, retry + 1 ); + } - return run( dbName , cmdObj, errmsg, result, retry + 1 ); + shardResultsB.append( server , mrResult ); + BSONObj counts = mrResult["counts"].embeddedObjectUserCheck(); + shardCountsB.append( server , counts ); + servers.insert( server ); - } + // add up the counts for each shard + // some of them will be fixed later like output and reduce + BSONObjIterator j( counts ); + while ( j.more() ) { + BSONElement temp = j.next(); + countsMap[temp.fieldName()] += temp.numberLong(); + } - shardResultsB.append( server , mrResult ); - BSONObj counts = mrResult["counts"].embeddedObjectUserCheck(); - shardCountsB.append( server , counts ); - servers.insert( server ); - - // add up the counts for each shard - // some of them will be fixed later like output and reduce - BSONObjIterator j( counts ); - while ( j.more() ) { - BSONElement temp = j.next(); - countsMap[temp.fieldName()] += temp.numberLong(); + if (mrResult.hasField("splitKeys")) { + BSONElement splitKeys = mrResult.getField("splitKeys"); + vector<BSONElement> pts = splitKeys.Array(); + for (vector<BSONElement>::iterator it = pts.begin(); it != pts.end(); ++it) { + splitPts.insert(it->Obj()); } } + } + for ( unsigned i=0; i<shardConns.size(); i++ ) + shardConns[i]->done(); + shardConns.clear(); - for ( unsigned i=0; i<shardConns.size(); i++ ) - shardConns[i]->done(); + if ( failed ) { + return 0; + } - if ( failed ) - return 0; + // build the sharded finish command + BSONObjBuilder finalCmd; + finalCmd.append( "mapreduce.shardedfinish" , cmdObj ); + finalCmd.append( "inputNS" , dbName + "." + shardResultCollection ); - finalCmd.append( "shards" , shardResultsB.obj() ); - shardCounts = shardCountsB.obj(); - finalCmd.append( "shardCounts" , shardCounts ); - timingBuilder.append( "shards" , t.millis() ); + finalCmd.append( "shards" , shardResultsB.obj() ); + BSONObj shardCounts = shardCountsB.obj(); + finalCmd.append( "shardCounts" , shardCounts ); + timingBuilder.append( "shardProcessing" , t.millis() ); - for ( map<string,long long>::iterator i=countsMap.begin(); i!=countsMap.end(); i++ ) { - aggCountsB.append( i->first , i->second ); - } - aggCounts = aggCountsB.obj(); - finalCmd.append( "counts" , aggCounts ); + for ( map<string,long long>::iterator i=countsMap.begin(); i!=countsMap.end(); i++ ) { + aggCountsB.append( i->first , i->second ); } + BSONObj aggCounts = aggCountsB.obj(); + finalCmd.append( "counts" , aggCounts ); Timer t2; - BSONObj finalResult; + BSONObj singleResult; bool ok = false; - string outdb = dbName; - if (customOutDB) { - BSONElement elmt = customOut.getField("db"); - outdb = elmt.valuestrsafe(); - } + long long reduceCount = 0; + long long outputCount = 0; + BSONObjBuilder postCountsB; - if (!customOut.getBoolField("sharded")) { - // non-sharded, use the MRFinish command on target server - // This will save some data transfer - - // by default the target database is same as input - Shard outServer = conf->getPrimary(); - string outns = fullns; - if ( customOutDB ) { - // have to figure out shard for the output DB - DBConfigPtr conf2 = grid.getDBConfig( outdb , true ); - outServer = conf2->getPrimary(); - outns = outdb + "." + collection; - } - log() << "customOut: " << customOut << " outServer: " << outServer << endl; + if (!shardedOutput) { + LOG(1) << "MR with single shard output, NS=" << finalColLong << " primary=" << confOut->getPrimary() << endl; + ShardConnection conn( confOut->getPrimary() , finalColLong ); + ok = conn->runCommand( outDB , finalCmd.obj() , singleResult ); + + BSONObj counts = singleResult.getObjectField("counts"); + postCountsB.append(conn->getServerAddress(), counts); + reduceCount = counts.getIntField("reduce"); + outputCount = counts.getIntField("output"); - ShardConnection conn( outServer , outns ); - ok = conn->runCommand( dbName , finalCmd.obj() , finalResult ); conn.done(); } else { - // grab records from each shard and insert back in correct shard in "temp" collection - // we do the final reduce in mongos since records are ordered and already reduced on each shard -// string shardedIncLong = str::stream() << outdb << ".tmp.mr." << collection << "_" << "shardedTemp" << "_" << time(0) << "_" << JOB_NUMBER++; - - mr_shard::Config config( dbName , cmdObj ); - mr_shard::State state(config); - LOG(1) << "mr sharded output ns: " << config.ns << endl; - if (config.outType == mr_shard::Config::INMEMORY) { - errmsg = "This Map Reduce mode is not supported with sharded output"; - return false; - } - - if (!config.outDB.empty()) { - BSONObjBuilder loc; - if ( !config.outDB.empty()) - loc.append( "db" , config.outDB ); - loc.append( "collection" , config.finalShort ); - result.append("result", loc.obj()); - } - else { - if ( !config.finalShort.empty() ) - result.append( "result" , config.finalShort ); - } - - string outns = config.finalLong; - string tempns; - - // result will be inserted into a temp collection to post process - const string postProcessCollection = getTmpName( collection ); - finalCmd.append("postProcessCollection", postProcessCollection); - tempns = dbName + "." + postProcessCollection; + LOG(1) << "MR with sharded output, NS=" << finalColLong << endl; + // create the sharded collection if needed BSONObj sortKey = BSON( "_id" << 1 ); - if (!conf->isSharded(outns)) { - // create the sharded collection + if (!confOut->isSharded(finalColLong)) { + // make sure db is sharded + if (!confOut->isShardingEnabled()) { + BSONObj shardDBCmd = BSON("enableSharding" << outDB); + BSONObjBuilder shardDBResult(32); + bool res = Command::runAgainstRegistered("admin.$cmd", shardDBCmd, shardDBResult); + if (!res) { + errmsg = str::stream() << "Could not enable sharding on db " << outDB << ": " << shardDBResult.obj().toString(); + return false; + } + } - BSONObj shardColCmd = BSON("shardCollection" << outns << "key" << sortKey); + BSONObj shardColCmd = BSON("shardCollection" << finalColLong << "key" << sortKey); BSONObjBuilder shardColResult(32); bool res = Command::runAgainstRegistered("admin.$cmd", shardColCmd, shardColResult); if (!res) { - errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString(); + errmsg = str::stream() << "Could not create sharded output collection " << finalColLong << ": " << shardColResult.obj().toString(); return false; } - } - ParallelSortClusteredCursor cursor( servers , dbName + "." + shardedOutputCollection , - Query().sort( sortKey ) ); - cursor.init(); - state.init(); - - mr_shard::BSONList values; - long long finalCount = 0; - int currentSize = 0; - map<ChunkPtr, long int> sizePerChunk; - ChunkManagerPtr manager = conf->getChunkManager(outns.c_str()); - - while ( cursor.more() || !values.empty() ) { - BSONObj t; - if ( cursor.more() ) { - t = cursor.next().getOwned(); - - if ( values.size() == 0 || t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) { - values.push_back( t ); - currentSize += t.objsize(); - - // check size and potentially reduce - if (currentSize > config.maxInMemSize && values.size() > config.reduceTriggerRatio) { - BSONObj reduced = config.reducer->finalReduce(values, 0); - values.clear(); - values.push_back( reduced ); - currentSize = reduced.objsize(); + // create initial chunks + // the 1st chunk from MinKey will belong to primary for shard + bool skipPrimary = true; + vector<Shard> allShards; + Shard::getAllShards(allShards); + if ( !splitPts.empty() ) { + int sidx = 0; + int numShards = allShards.size(); + for (set<BSONObj>::iterator it = splitPts.begin(); it != splitPts.end(); ++it) { + BSONObj splitCmd = BSON("split" << finalColLong << "middle" << *it); + BSONObjBuilder splitResult(32); + bool res = Command::runAgainstRegistered("admin.$cmd", splitCmd, splitResult); + if (!res) { + errmsg = str::stream() << "Could not split sharded output collection " << finalColLong << ": " << splitResult.obj().toString(); + return false; } - continue; + + // move to a shard + Shard s = allShards[sidx]; + if (skipPrimary && s == confOut->getPrimary()) { + skipPrimary = false; + sidx = (sidx + 1) % numShards; + s = allShards[sidx]; + } + BSONObj mvCmd = BSON("moveChunk" << finalColLong << "find" << *it << "to" << s.getName()); + BSONObjBuilder mvResult(32); + res = Command::runAgainstRegistered("admin.$cmd", mvCmd, mvResult); + if (!res) { + errmsg = str::stream() << "Could not move chunk for sharded output collection " << finalColLong << ": " << mvResult.obj().toString(); + return false; + } + + sidx = (sidx + 1) % numShards; } } + } - BSONObj final = config.reducer->finalReduce(values, config.finalizer.get()); - // as a future optimization we can have a mode where record goes right into the final collection - // this would be for MERGE mode and could be much more efficient -// if (config.outNonAtomic && config.outType == mr_shard::Config::MERGE) { -// BSONObj id = final["_id"].wrap(); -// s->updateSharded(conf, outns.c_str(), id, final, UpdateOption_Upsert, true); -// } - - // insert into temp collection, but using final collection's shard chunks - ChunkPtr chunk = insertSharded(manager, tempns.c_str(), final, 0, true); - if (chunk) { - sizePerChunk[chunk] += final.objsize(); - } - ++finalCount; - values.clear(); - if (!t.isEmpty()) { - values.push_back( t ); - currentSize = t.objsize(); - } + // group chunks per shard + ChunkManagerPtr cm = confOut->getChunkManager( finalColLong ); + map<Shard, vector<ChunkPtr> > rangesList; + const ChunkMap& chunkMap = cm->getChunkMap(); + for ( ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it ) { + ChunkPtr chunk = it->second; + rangesList[chunk->getShard()].push_back(chunk); } - { - // results were written to temp collection, need post processing - vector< shared_ptr<ShardConnection> > shardConns; - list< shared_ptr<Future::CommandResult> > futures; - BSONObj finalCmdObj = finalCmd.obj(); - for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { - shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , outns ) ); - futures.push_back( Future::spawnCommand( i->getConnString() , dbName , finalCmdObj , 0 , temp->get() ) ); - shardConns.push_back( temp ); + // spawn sharded finish jobs on each shard + // command will fetch appropriate results from other shards, do final reduce and post processing + futures.clear(); + BSONObj finalCmdObj = finalCmd.obj(); + + for ( map<Shard, vector<ChunkPtr> >::iterator i=rangesList.begin(), end=rangesList.end() ; i != end ; i++ ) { + Shard shard = i->first; + BSONObjBuilder b; + b.appendElements(finalCmdObj); + BSONArrayBuilder ranges; + for (vector<ChunkPtr>::iterator it = i->second.begin(); it != i->second.end(); ++it) { + ranges.append((*it)->getMin().firstElement()); + ranges.append((*it)->getMax().firstElement()); } + b.append("ranges", ranges.arr()); + shared_ptr<ShardConnection> temp( new ShardConnection( shard.getConnString() , finalColLong ) ); + assert( temp->get() ); + futures.push_back( Future::spawnCommand( shard.getConnString() , outDB , b.obj() , 0 , temp->get() ) ); + shardConns.push_back(temp); + } - // now wait for the result of all shards - bool failed = false; - for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) { + // now wait for the result of all shards + ok = true; + map<Shard, vector<ChunkPtr> >::iterator rangeIt=rangesList.begin(); + for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); ++i, ++rangeIt ) { + string server; + try { shared_ptr<Future::CommandResult> res = *i; if ( ! res->join() ) { - error() << "final reduce on sharded output m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl; + error() << "final reduce failed on shard: " << res->getServer() << " error: " << res->result() << endl; result.append( "cause" , res->result() ); - errmsg = "mongod mr failed: "; + errmsg = "final reduce failed: "; errmsg += res->result().toString(); - failed = true; + ok = false; continue; } - BSONObj result = res->result(); + singleResult = res->result(); + BSONObj counts = singleResult.getObjectField("counts"); + reduceCount += counts.getIntField("reduce"); + outputCount += counts.getIntField("output"); + server = res->getServer(); + postCountsB.append(server, counts); + + // check on splitting, now that results are in the final collection + if (singleResult.hasField("chunkSizes")) { + vector<BSONElement> chunkSizes = singleResult.getField("chunkSizes").Array(); + vector<ChunkPtr> chunks = rangeIt->second; + for (unsigned int i = 0; i < chunkSizes.size(); ++i) { + long long size = chunkSizes[i].numberLong(); + ChunkPtr c = chunks[i]; + assert( size < 0x7fffffff ); + c->splitIfShould(static_cast<int>(size)); + } + } + } + catch( RecvStaleConfigException& e ){ + log() << "final reduce error due to stale config on a shard" << causedBy( e ) << endl; + ok = false; + continue; } - - for ( unsigned i=0; i<shardConns.size(); i++ ) - shardConns[i]->done(); - - if (failed) - return 0; - } - - // check on splitting, now that results are in the final collection - for ( map< ChunkPtr, long int >::iterator it = sizePerChunk.begin(); it != sizePerChunk.end(); ++it ) { - ChunkPtr c = it->first; - long int size = it->second; - cout << "Splitting for chunk " << c << " with size " << size; - c->splitIfShould(size); } + } + try { // drop collections with tmp results on each shard for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) { ScopedDbConnection conn( i->_server ); - conn->dropCollection( dbName + "." + shardedOutputCollection ); + conn->dropCollection( dbName + "." + shardResultCollection ); conn.done(); } - - result.append("shardCounts", shardCounts); - - // fix the global counts - BSONObjBuilder countsB(32); - BSONObjIterator j(aggCounts); - while (j.more()) { - BSONElement elmt = j.next(); - if (!strcmp(elmt.fieldName(), "reduce")) - countsB.append("reduce", elmt.numberLong() + state.numReduces()); - else if (!strcmp(elmt.fieldName(), "output")) - countsB.append("output", finalCount); - else - countsB.append(elmt); - } - result.append( "counts" , countsB.obj() ); - ok = true; + } catch ( std::exception e ) { + log() << "Cannot cleanup shard results" << causedBy( e ) << endl; } + for ( unsigned i=0; i<shardConns.size(); i++ ) + shardConns[i]->done(); + if ( ! ok ) { errmsg = "final reduce failed: "; - errmsg += finalResult.toString(); + errmsg += singleResult.toString(); return 0; } - timingBuilder.append( "final" , t2.millis() ); - result.appendElements( finalResult ); + // copy some elements from a single result + // annoying that we have to copy all results for inline, but no way around it + if (singleResult.hasField("result")) + result.append(singleResult.getField("result")); + else if (singleResult.hasField("results")) + result.append(singleResult.getField("results")); + + BSONObjBuilder countsB(32); + // input stat is determined by aggregate MR job + countsB.append("input", aggCounts.getField("input").numberLong()); + countsB.append("emit", aggCounts.getField("emit").numberLong()); + + // reduce count is sum of all reduces that happened + countsB.append("reduce", aggCounts.getField("reduce").numberLong() + reduceCount); + + // ouput is determined by post processing on each shard + countsB.append("output", outputCount); + result.append( "counts" , countsB.obj() ); + + timingBuilder.append( "postProcessing" , t2.millis() ); + result.append( "timeMillis" , t.millis() ); result.append( "timing" , timingBuilder.obj() ); - + result.append("shardCounts", shardCounts); + result.append("postProcessCounts", postCountsB.obj()); return 1; } } mrCmd; diff --git a/s/config.cpp b/s/config.cpp index 24d16aadb69..22516eb46a4 100644 --- a/s/config.cpp +++ b/s/config.cpp @@ -214,10 +214,11 @@ namespace mongo { assert( ! key.isEmpty() ); + BSONObj newest; if ( oldVersion > 0 && ! forceReload ) { ScopedDbConnection conn( configServer.modelServer() , 30.0 ); - BSONObj newest = conn->findOne( ShardNS::chunk , - Query( BSON( "ns" << ns ) ).sort( "lastmod" , -1 ) ); + newest = conn->findOne( ShardNS::chunk , + Query( BSON( "ns" << ns ) ).sort( "lastmod" , -1 ) ); conn.done(); if ( ! newest.isEmpty() ) { @@ -238,11 +239,32 @@ namespace mongo { // we are not locked now, and want to load a new ChunkManager - auto_ptr<ChunkManager> temp( new ChunkManager( ns , key , unique ) ); - if ( temp->numChunks() == 0 ) { - // maybe we're not sharded any more - reload(); // this is a full reload - return getChunkManager( ns , false ); + auto_ptr<ChunkManager> temp; + + { + scoped_lock lll ( _hitConfigServerLock ); + + if ( ! newest.isEmpty() && ! forceReload ) { + // if we have a target we're going for + // see if we've hit already + + scoped_lock lk( _lock ); + CollectionInfo& ci = _collections[ns]; + if ( ci.isSharded() && ci.getCM() ) { + ShardChunkVersion currentVersion = newest["lastmod"]; + if ( currentVersion == ci.getCM()->getVersion() ) { + return ci.getCM(); + } + } + + } + + temp.reset( new ChunkManager( ns , key , unique ) ); + if ( temp->numChunks() == 0 ) { + // maybe we're not sharded any more + reload(); // this is a full reload + return getChunkManager( ns , false ); + } } scoped_lock lk( _lock ); diff --git a/s/config.h b/s/config.h index 3b7eb9570ba..0374bcb52be 100644 --- a/s/config.h +++ b/s/config.h @@ -115,7 +115,8 @@ namespace mongo { : _name( name ) , _primary("config","") , _shardingEnabled(false), - _lock("DBConfig") { + _lock("DBConfig") , + _hitConfigServerLock( "DBConfig::_hitConfigServerLock" ) { assert( name.size() ); } virtual ~DBConfig() {} @@ -195,6 +196,7 @@ namespace mongo { Collections _collections; mutable mongo::mutex _lock; // TODO: change to r/w lock ?? + mutable mongo::mutex _hitConfigServerLock; }; class ConfigServer : public DBConfig { diff --git a/s/d_logic.h b/s/d_logic.h index d96f937756f..ade02b21e80 100644 --- a/s/d_logic.h +++ b/s/d_logic.h @@ -31,7 +31,6 @@ namespace mongo { class DiskLoc; typedef ShardChunkVersion ConfigVersion; - typedef map<string,ConfigVersion> NSVersionMap; // -------------- // --- global state --- @@ -185,9 +184,11 @@ namespace mongo { private: OID _id; - NSVersionMap _versions; bool _forceVersionOk; // if this is true, then chunk version #s aren't check, and all ops are allowed + typedef map<string,ConfigVersion> NSVersionMap; + NSVersionMap _versions; + static boost::thread_specific_ptr<ShardedConnectionInfo> _tl; }; diff --git a/s/d_state.cpp b/s/d_state.cpp index 393df986533..39d84b6ff88 100644 --- a/s/d_state.cpp +++ b/s/d_state.cpp @@ -29,7 +29,7 @@ #include "../db/commands.h" #include "../db/jsobj.h" #include "../db/db.h" - +#include "../db/replutil.h" #include "../client/connpool.h" #include "../util/queue.h" @@ -396,6 +396,7 @@ namespace mongo { help << " example: { setShardVersion : 'alleyinsider.foo' , version : 1 , configdb : '' } "; } + virtual bool slaveOk() const { return true; } virtual LockType locktype() const { return NONE; } bool checkConfigOrInit( const string& configdb , bool authoritative , string& errmsg , BSONObjBuilder& result , bool locked=false ) const { @@ -492,6 +493,13 @@ namespace mongo { return true; } + // we can run on a slave up to here + if ( ! isMaster( "admin" ) ) { + result.append( "errmsg" , "not master" ); + result.append( "note" , "from post init in setShardVersion" ); + return false; + } + // step 2 string ns = cmdObj["setShardVersion"].valuestrsafe(); @@ -518,8 +526,19 @@ namespace mongo { if ( version == globalVersion ) { // mongos and mongod agree! if ( oldVersion != version ) { - assert( oldVersion < globalVersion ); - info->setVersion( ns , version ); + if ( oldVersion < globalVersion ) { + info->setVersion( ns , version ); + } + else if ( authoritative ) { + // this means there was a drop and our version is reset + info->setVersion( ns , version ); + } + else { + result.append( "ns" , ns ); + result.appendBool( "need_authoritative" , true ); + errmsg = "verifying drop on '" + ns + "'"; + return false; + } } return true; } @@ -668,6 +687,11 @@ namespace mongo { if ( ! shardingState.enabled() ) return true; + if ( ! isMasterNs( ns.c_str() ) ) { + // right now connections to secondaries aren't versioned at all + return true; + } + ShardedConnectionInfo* info = ShardedConnectionInfo::get( false ); if ( ! info ) { diff --git a/s/dbgrid.vcxproj b/s/dbgrid.vcxproj index 584757abbf3..a576dc7a6a7 100644 --- a/s/dbgrid.vcxproj +++ b/s/dbgrid.vcxproj @@ -112,6 +112,10 @@ <SubSystem>Console</SubSystem>
<TargetMachine>MachineX86</TargetMachine>
</Link>
+ <PreBuildEvent>
+ <Command>cscript //Nologo ..\shell\msvc\createCPPfromJavaScriptFiles.js "$(ProjectDir).."</Command>
+ <Message>Create mongo.cpp and mongo-server.cpp from JavaScript source files</Message>
+ </PreBuildEvent>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<ClCompile>
@@ -134,6 +138,10 @@ <GenerateDebugInformation>true</GenerateDebugInformation>
<SubSystem>Console</SubSystem>
</Link>
+ <PreBuildEvent>
+ <Command>cscript //Nologo ..\shell\msvc\createCPPfromJavaScriptFiles.js "$(ProjectDir).."</Command>
+ <Message>Create mongo.cpp and mongo-server.cpp from JavaScript source files</Message>
+ </PreBuildEvent>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<ClCompile>
@@ -160,6 +168,10 @@ <EnableCOMDATFolding>true</EnableCOMDATFolding>
<TargetMachine>MachineX86</TargetMachine>
</Link>
+ <PreBuildEvent>
+ <Command>cscript //Nologo ..\shell\msvc\createCPPfromJavaScriptFiles.js "$(ProjectDir).."</Command>
+ <Message>Create mongo.cpp and mongo-server.cpp from JavaScript source files</Message>
+ </PreBuildEvent>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
@@ -185,6 +197,10 @@ <OptimizeReferences>true</OptimizeReferences>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
</Link>
+ <PreBuildEvent>
+ <Command>cscript //Nologo ..\shell\msvc\createCPPfromJavaScriptFiles.js "$(ProjectDir).."</Command>
+ <Message>Create mongo.cpp and mongo-server.cpp from JavaScript source files</Message>
+ </PreBuildEvent>
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="..\bson\oid.cpp" />
@@ -533,7 +549,7 @@ <ClCompile Include="..\util\net\message_server_port.cpp" />
<ClCompile Include="..\util\mmap.cpp" />
<ClCompile Include="..\util\mmap_win.cpp" />
- <ClCompile Include="..\shell\mongo_vstudio.cpp">
+ <ClCompile Include="..\shell\mongo.cpp">
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
</PrecompiledHeader>
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
diff --git a/s/dbgrid.vcxproj.filters b/s/dbgrid.vcxproj.filters index 1e204805d74..1f823d53f09 100755 --- a/s/dbgrid.vcxproj.filters +++ b/s/dbgrid.vcxproj.filters @@ -143,7 +143,7 @@ <ClCompile Include="..\util\mmap_win.cpp">
<Filter>Shared Source Files</Filter>
</ClCompile>
- <ClCompile Include="..\shell\mongo_vstudio.cpp">
+ <ClCompile Include="..\shell\mongo.cpp">
<Filter>Shared Source Files</Filter>
</ClCompile>
<ClCompile Include="..\db\nonce.cpp">
diff --git a/s/grid.cpp b/s/grid.cpp index 8dd5bd411af..c14feb50a03 100644 --- a/s/grid.cpp +++ b/s/grid.cpp @@ -22,6 +22,7 @@ #include "../client/connpool.h" #include "../util/stringutils.h" #include "../util/unittest.h" +#include "../db/namespace_common.h" #include "grid.h" #include "shard.h" @@ -38,6 +39,8 @@ namespace mongo { if ( database == "config" ) return configServerPtr; + uassert( 15918 , str::stream() << "invalid database name: " << database , NamespaceString::validDBName( database ) ); + scoped_lock l( _lock ); DBConfigPtr& cc = _databases[database]; diff --git a/s/shard_version.cpp b/s/shard_version.cpp index 8e51b2aa104..0d669fe216c 100644 --- a/s/shard_version.cpp +++ b/s/shard_version.cpp @@ -136,6 +136,14 @@ namespace mongo { bool ok = conn->runCommand( "admin" , cmd , result ); + // HACK for backwards compatibility with v1.8.x, v2.0.0 and v2.0.1 + // Result is false, but will still initialize serverID and configdb + if( ! ok && ! result["errmsg"].eoo() && ( result["errmsg"].String() == "need to specify namespace"/* 2.0.1/2 */ || + result["errmsg"].String() == "need to speciy namespace" /* 1.8 */ )) + { + ok = true; + } + LOG(3) << "initial sharding result : " << result << endl; return ok; diff --git a/s/shardconnection.cpp b/s/shardconnection.cpp index dff828a0d39..b759284f94a 100644 --- a/s/shardconnection.cpp +++ b/s/shardconnection.cpp @@ -105,7 +105,13 @@ namespace mongo { if ( s->avail ) { DBClientBase* c = s->avail; s->avail = 0; - shardConnectionPool.onHandedOut( c ); + try { + shardConnectionPool.onHandedOut( c ); + } + catch ( std::exception& ) { + delete c; + throw; + } return c; } diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp index d5a65b4a703..6a19b990458 100644 --- a/s/strategy_shard.cpp +++ b/s/strategy_shard.cpp @@ -73,14 +73,22 @@ namespace mongo { assert( cursor ); try { + long long start_millis = 0; + if ( query.isExplain() ) start_millis = curTimeMillis64(); cursor->init(); LOG(5) << " cursor type: " << cursor->type() << endl; shardedCursorTypes.hit( cursor->type() ); if ( query.isExplain() ) { - BSONObj explain = cursor->explain(); - replyToQuery( 0 , r.p() , r.m() , explain ); + // fetch elapsed time for the query + long long elapsed_millis = curTimeMillis64() - start_millis; + BSONObjBuilder explain_builder; + cursor->explain( explain_builder ); + explain_builder.appendNumber( "millis", elapsed_millis ); + BSONObj b = explain_builder.obj(); + + replyToQuery( 0 , r.p() , r.m() , b ); delete( cursor ); return; } diff --git a/s/strategy_single.cpp b/s/strategy_single.cpp index 1e3e894d2a8..d9c2b03fae0 100644 --- a/s/strategy_single.cpp +++ b/s/strategy_single.cpp @@ -178,6 +178,8 @@ namespace mongo { return false; ns += 10; + r.checkAuth( Auth::WRITE ); + BSONObjBuilder b; vector<Shard> shards; @@ -220,7 +222,7 @@ namespace mongo { } else if ( strcmp( ns , "killop" ) == 0 ) { BSONElement e = q.query["op"]; - if ( strstr( r.getns() , "admin." ) != 0 ) { + if ( strstr( r.getns() , "admin." ) == 0 ) { b.append( "err" , "unauthorized" ); } else if ( e.type() != String ) { |