summaryrefslogtreecommitdiff
path: root/s
diff options
context:
space:
mode:
authorU-tellus\cwestin <cwestin@10gen.com>2011-12-07 10:13:14 -0800
committerU-tellus\cwestin <cwestin@10gen.com>2011-12-07 10:13:14 -0800
commit5d443029c1d7c18b0d75f5a1b4a5de7bdf230fe6 (patch)
tree6294223da0144e62388b408d32fd27bf01d7c909 /s
parent67839bc7e83203027cb7dca299191eb17c0db5c5 (diff)
parentac4da93fa146a927bd13dc19bb92e9d329463fbc (diff)
downloadmongo-5d443029c1d7c18b0d75f5a1b4a5de7bdf230fe6.tar.gz
merge from main
Diffstat (limited to 's')
-rw-r--r--s/chunk.cpp10
-rw-r--r--s/chunk.h4
-rw-r--r--s/client.cpp1
-rw-r--r--s/commands_public.cpp544
-rw-r--r--s/config.cpp36
-rw-r--r--s/config.h4
-rw-r--r--s/d_logic.h5
-rw-r--r--s/d_state.cpp30
-rw-r--r--s/dbgrid.vcxproj18
-rwxr-xr-xs/dbgrid.vcxproj.filters2
-rw-r--r--s/grid.cpp3
-rw-r--r--s/shard_version.cpp8
-rw-r--r--s/shardconnection.cpp8
-rw-r--r--s/strategy_shard.cpp12
-rw-r--r--s/strategy_single.cpp4
15 files changed, 412 insertions, 277 deletions
diff --git a/s/chunk.cpp b/s/chunk.cpp
index 8fd4f5449a0..49dd84bebf0 100644
--- a/s/chunk.cpp
+++ b/s/chunk.cpp
@@ -344,6 +344,12 @@ namespace mongo {
if ( _dataWritten < splitThreshold / 5 )
return false;
+
+ if ( ! getManager()->_splitTickets.tryAcquire() ) {
+ LOG(1) << "won't auto split becaue not enough tickets: " << getManager()->getns() << endl;
+ return false;
+ }
+ TicketHolderReleaser releaser( &getManager()->_splitTickets );
// this is a bit ugly
// we need it so that mongos blocks for the writes to actually be committed
@@ -542,7 +548,9 @@ namespace mongo {
// The shard versioning mechanism hinges on keeping track of the number of times we reloaded ChunkManager's.
// Increasing this number here will prompt checkShardVersion() to refresh the connection-level versions to
// the most up to date value.
- _sequenceNumber(++NextSequenceNumber)
+ _sequenceNumber(++NextSequenceNumber),
+
+ _splitTickets( 5 )
{
int tries = 3;
diff --git a/s/chunk.h b/s/chunk.h
index 24961f1bfcf..3a312575326 100644
--- a/s/chunk.h
+++ b/s/chunk.h
@@ -317,6 +317,8 @@ namespace mongo {
void getAllShards( set<Shard>& all ) const;
void getShardsForRange(set<Shard>& shards, const BSONObj& min, const BSONObj& max, bool fullKeyReq = true) const; // [min, max)
+ ChunkMap getChunkMap() const { return _chunkMap; }
+
/**
* Returns true if, for this shard, the chunks are identical in both chunk managers
*/
@@ -373,6 +375,8 @@ namespace mongo {
const unsigned long long _sequenceNumber;
+ mutable TicketHolder _splitTickets; // number of concurrent splitVector we can do from a splitIfShould per collection
+
friend class Chunk;
friend class ChunkRangeManager; // only needed for CRM::assertValid()
static AtomicUInt NextSequenceNumber;
diff --git a/s/client.cpp b/s/client.cpp
index a8eb30bb5d6..36063347d85 100644
--- a/s/client.cpp
+++ b/s/client.cpp
@@ -288,6 +288,7 @@ namespace mongo {
catch( std::exception &e ){
warning() << "could not clear last error from a shard " << temp << causedBy( e ) << endl;
}
+ conn.done();
}
clearSinceLastGetError();
diff --git a/s/commands_public.cpp b/s/commands_public.cpp
index b9f0847cf0a..807eb24e63c 100644
--- a/s/commands_public.cpp
+++ b/s/commands_public.cpp
@@ -946,7 +946,7 @@ namespace mongo {
return ss.str();
}
- BSONObj fixForShards( const BSONObj& orig , const string& output, BSONObj& customOut , string& badShardedField ) {
+ BSONObj fixForShards( const BSONObj& orig , const string& output , string& badShardedField , int maxChunkSizeBytes ) {
BSONObjBuilder b;
BSONObjIterator i( orig );
while ( i.more() ) {
@@ -965,12 +965,6 @@ namespace mongo {
else if ( fn == "out" ||
fn == "finalize" ) {
// we don't want to copy these
- if (fn == "out" && e.type() == Object) {
- // check if there is a custom output
- BSONObj out = e.embeddedObject();
-// if (out.hasField("db"))
- customOut = out;
- }
}
else {
badShardedField = fn;
@@ -978,6 +972,12 @@ namespace mongo {
}
}
b.append( "out" , output );
+
+ if ( maxChunkSizeBytes > 0 ) {
+ // will need to figure out chunks, ask shards for points
+ b.append("splitInfo", maxChunkSizeBytes);
+ }
+
return b.obj();
}
@@ -1000,14 +1000,6 @@ namespace mongo {
string collection = cmdObj.firstElement().valuestrsafe();
string fullns = dbName + "." + collection;
- const string shardedOutputCollection = getTmpName( collection );
-
- string badShardedField;
- BSONObj customOut;
- BSONObj shardedCommand = fixForShards( cmdObj , shardedOutputCollection, customOut , badShardedField );
-
- bool customOutDB = customOut.hasField( "db" );
-
// Abort after two retries, m/r is an expensive operation
if( retry > 2 ){
errmsg = "shard version errors preventing parallel mapreduce, check logs for further info";
@@ -1018,14 +1010,57 @@ namespace mongo {
forceRemoteCheckShardVersionCB( fullns );
}
- DBConfigPtr conf = grid.getDBConfig( dbName , false );
+ const string shardResultCollection = getTmpName( collection );
- if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
- if ( customOutDB ) {
- errmsg = "can't use out 'db' with non-sharded db";
- return false;
+ BSONObj customOut;
+ string finalColShort;
+ string finalColLong;
+ bool customOutDB = false;
+ string outDB = dbName;
+ BSONElement outElmt = cmdObj.getField("out");
+ if (outElmt.type() == Object) {
+ // check if there is a custom output
+ BSONObj out = outElmt.embeddedObject();
+ customOut = out;
+ // mode must be 1st element
+ finalColShort = out.firstElement().str();
+ if (customOut.hasField( "db" )) {
+ customOutDB = true;
+ outDB = customOut.getField("db").str();
}
- return passthrough( conf , cmdObj , result );
+ finalColLong = outDB + "." + finalColShort;
+ }
+
+ DBConfigPtr confIn = grid.getDBConfig( dbName , false );
+ DBConfigPtr confOut = confIn;
+ if (customOutDB) {
+ confOut = grid.getDBConfig( outDB , true );
+ }
+
+ bool shardedInput = confIn && confIn->isShardingEnabled() && confIn->isSharded( fullns );
+ bool shardedOutput = customOut.getBoolField("sharded");
+
+ if (!shardedOutput)
+ uassert( 15920 , "Cannot output to a non-sharded collection, a sharded collection exists" , !confOut->isSharded(finalColLong) );
+ // should we also prevent going from non-sharded to sharded? during the transition client may see partial data
+
+ long long maxChunkSizeBytes = 0;
+ if (shardedOutput) {
+ // will need to figure out chunks, ask shards for points
+ maxChunkSizeBytes = cmdObj["maxChunkSizeBytes"].numberLong();
+ if ( maxChunkSizeBytes == 0 ) {
+ maxChunkSizeBytes = Chunk::MaxChunkSize;
+ }
+ }
+
+ // modify command to run on shards with output to tmp collection
+ string badShardedField;
+ assert( maxChunkSizeBytes < 0x7fffffff );
+ BSONObj shardedCommand = fixForShards( cmdObj , shardResultCollection , badShardedField, static_cast<int>(maxChunkSizeBytes) );
+
+ if ( ! shardedInput && ! shardedOutput && ! customOutDB ) {
+ LOG(1) << "simple MR, just passthrough" << endl;
+ return passthrough( confIn , cmdObj , result );
}
if ( badShardedField.size() ) {
@@ -1034,312 +1069,307 @@ namespace mongo {
}
BSONObjBuilder timingBuilder;
-
- ChunkManagerPtr cm = conf->getChunkManager( fullns );
-
BSONObj q;
if ( cmdObj["query"].type() == Object ) {
q = cmdObj["query"].embeddedObjectUserCheck();
}
set<Shard> shards;
- cm->getShardsForQuery( shards , q );
-
-
- BSONObjBuilder finalCmd;
- finalCmd.append( "mapreduce.shardedfinish" , cmdObj );
- finalCmd.append( "shardedOutputCollection" , shardedOutputCollection );
-
+ if ( shardedInput ) {
+ ChunkManagerPtr cm = confIn->getChunkManager( fullns );
+ cm->getShardsForQuery( shards , q );
+ } else {
+ shards.insert(confIn->getPrimary());
+ }
+ // we need to use our connections to the shard
+ // so filtering is done correctly for un-owned docs
+ // so we allocate them in our thread and hand off
set<ServerAndQuery> servers;
- BSONObj shardCounts;
- BSONObj aggCounts;
- map<string,long long> countsMap;
- {
- // we need to use our connections to the shard
- // so filtering is done correctly for un-owned docs
- // so we allocate them in our thread
- // and hand off
- // Note: why not use pooled connections? This has been reported to create too many connections
- vector< shared_ptr<ShardConnection> > shardConns;
- list< shared_ptr<Future::CommandResult> > futures;
-
- for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
- shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , fullns ) );
- assert( temp->get() );
- futures.push_back( Future::spawnCommand( i->getConnString() , dbName , shardedCommand , 0 , temp->get() ) );
- shardConns.push_back( temp );
- }
-
- bool failed = false;
-
- // now wait for the result of all shards
- BSONObjBuilder shardResultsB;
- BSONObjBuilder shardCountsB;
- BSONObjBuilder aggCountsB;
- for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) {
+ vector< shared_ptr<ShardConnection> > shardConns;
+ list< shared_ptr<Future::CommandResult> > futures;
- BSONObj mrResult;
- string server;
+ for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
+ shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , fullns ) );
+ assert( temp->get() );
+ futures.push_back( Future::spawnCommand( i->getConnString() , dbName , shardedCommand , 0 , temp->get() ) );
+ shardConns.push_back( temp );
+ }
- try {
- shared_ptr<Future::CommandResult> res = *i;
- if ( ! res->join() ) {
- error() << "sharded m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl;
- result.append( "cause" , res->result() );
- errmsg = "mongod mr failed: ";
- errmsg += res->result().toString();
- failed = true;
- continue;
- }
- mrResult = res->result();
- server = res->getServer();
- }
- catch( RecvStaleConfigException& e ){
+ bool failed = false;
+ BSONObjBuilder shardResultsB;
+ BSONObjBuilder shardCountsB;
+ BSONObjBuilder aggCountsB;
+ map<string,long long> countsMap;
+ set< BSONObj > splitPts;
- // TODO: really should kill all the MR ops we sent if possible...
+ // now wait for the result of all shards
+ for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) {
- log() << "restarting m/r due to stale config on a shard" << causedBy( e ) << endl;
+ BSONObj mrResult;
+ string server;
+
+ try {
+ shared_ptr<Future::CommandResult> res = *i;
+ if ( ! res->join() ) {
+ error() << "sharded m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl;
+ result.append( "cause" , res->result() );
+ errmsg = "mongod mr failed: ";
+ errmsg += res->result().toString();
+ failed = true;
+ continue;
+ }
+ mrResult = res->result();
+ server = res->getServer();
+ }
+ catch( RecvStaleConfigException& e ){
+ log() << "restarting m/r due to stale config on a shard" << causedBy( e ) << endl;
+ return run( dbName , cmdObj, errmsg, result, retry + 1 );
+ }
- return run( dbName , cmdObj, errmsg, result, retry + 1 );
+ shardResultsB.append( server , mrResult );
+ BSONObj counts = mrResult["counts"].embeddedObjectUserCheck();
+ shardCountsB.append( server , counts );
+ servers.insert( server );
- }
+ // add up the counts for each shard
+ // some of them will be fixed later like output and reduce
+ BSONObjIterator j( counts );
+ while ( j.more() ) {
+ BSONElement temp = j.next();
+ countsMap[temp.fieldName()] += temp.numberLong();
+ }
- shardResultsB.append( server , mrResult );
- BSONObj counts = mrResult["counts"].embeddedObjectUserCheck();
- shardCountsB.append( server , counts );
- servers.insert( server );
-
- // add up the counts for each shard
- // some of them will be fixed later like output and reduce
- BSONObjIterator j( counts );
- while ( j.more() ) {
- BSONElement temp = j.next();
- countsMap[temp.fieldName()] += temp.numberLong();
+ if (mrResult.hasField("splitKeys")) {
+ BSONElement splitKeys = mrResult.getField("splitKeys");
+ vector<BSONElement> pts = splitKeys.Array();
+ for (vector<BSONElement>::iterator it = pts.begin(); it != pts.end(); ++it) {
+ splitPts.insert(it->Obj());
}
}
+ }
+ for ( unsigned i=0; i<shardConns.size(); i++ )
+ shardConns[i]->done();
+ shardConns.clear();
- for ( unsigned i=0; i<shardConns.size(); i++ )
- shardConns[i]->done();
+ if ( failed ) {
+ return 0;
+ }
- if ( failed )
- return 0;
+ // build the sharded finish command
+ BSONObjBuilder finalCmd;
+ finalCmd.append( "mapreduce.shardedfinish" , cmdObj );
+ finalCmd.append( "inputNS" , dbName + "." + shardResultCollection );
- finalCmd.append( "shards" , shardResultsB.obj() );
- shardCounts = shardCountsB.obj();
- finalCmd.append( "shardCounts" , shardCounts );
- timingBuilder.append( "shards" , t.millis() );
+ finalCmd.append( "shards" , shardResultsB.obj() );
+ BSONObj shardCounts = shardCountsB.obj();
+ finalCmd.append( "shardCounts" , shardCounts );
+ timingBuilder.append( "shardProcessing" , t.millis() );
- for ( map<string,long long>::iterator i=countsMap.begin(); i!=countsMap.end(); i++ ) {
- aggCountsB.append( i->first , i->second );
- }
- aggCounts = aggCountsB.obj();
- finalCmd.append( "counts" , aggCounts );
+ for ( map<string,long long>::iterator i=countsMap.begin(); i!=countsMap.end(); i++ ) {
+ aggCountsB.append( i->first , i->second );
}
+ BSONObj aggCounts = aggCountsB.obj();
+ finalCmd.append( "counts" , aggCounts );
Timer t2;
- BSONObj finalResult;
+ BSONObj singleResult;
bool ok = false;
- string outdb = dbName;
- if (customOutDB) {
- BSONElement elmt = customOut.getField("db");
- outdb = elmt.valuestrsafe();
- }
+ long long reduceCount = 0;
+ long long outputCount = 0;
+ BSONObjBuilder postCountsB;
- if (!customOut.getBoolField("sharded")) {
- // non-sharded, use the MRFinish command on target server
- // This will save some data transfer
-
- // by default the target database is same as input
- Shard outServer = conf->getPrimary();
- string outns = fullns;
- if ( customOutDB ) {
- // have to figure out shard for the output DB
- DBConfigPtr conf2 = grid.getDBConfig( outdb , true );
- outServer = conf2->getPrimary();
- outns = outdb + "." + collection;
- }
- log() << "customOut: " << customOut << " outServer: " << outServer << endl;
+ if (!shardedOutput) {
+ LOG(1) << "MR with single shard output, NS=" << finalColLong << " primary=" << confOut->getPrimary() << endl;
+ ShardConnection conn( confOut->getPrimary() , finalColLong );
+ ok = conn->runCommand( outDB , finalCmd.obj() , singleResult );
+
+ BSONObj counts = singleResult.getObjectField("counts");
+ postCountsB.append(conn->getServerAddress(), counts);
+ reduceCount = counts.getIntField("reduce");
+ outputCount = counts.getIntField("output");
- ShardConnection conn( outServer , outns );
- ok = conn->runCommand( dbName , finalCmd.obj() , finalResult );
conn.done();
} else {
- // grab records from each shard and insert back in correct shard in "temp" collection
- // we do the final reduce in mongos since records are ordered and already reduced on each shard
-// string shardedIncLong = str::stream() << outdb << ".tmp.mr." << collection << "_" << "shardedTemp" << "_" << time(0) << "_" << JOB_NUMBER++;
-
- mr_shard::Config config( dbName , cmdObj );
- mr_shard::State state(config);
- LOG(1) << "mr sharded output ns: " << config.ns << endl;
- if (config.outType == mr_shard::Config::INMEMORY) {
- errmsg = "This Map Reduce mode is not supported with sharded output";
- return false;
- }
-
- if (!config.outDB.empty()) {
- BSONObjBuilder loc;
- if ( !config.outDB.empty())
- loc.append( "db" , config.outDB );
- loc.append( "collection" , config.finalShort );
- result.append("result", loc.obj());
- }
- else {
- if ( !config.finalShort.empty() )
- result.append( "result" , config.finalShort );
- }
-
- string outns = config.finalLong;
- string tempns;
-
- // result will be inserted into a temp collection to post process
- const string postProcessCollection = getTmpName( collection );
- finalCmd.append("postProcessCollection", postProcessCollection);
- tempns = dbName + "." + postProcessCollection;
+ LOG(1) << "MR with sharded output, NS=" << finalColLong << endl;
+ // create the sharded collection if needed
BSONObj sortKey = BSON( "_id" << 1 );
- if (!conf->isSharded(outns)) {
- // create the sharded collection
+ if (!confOut->isSharded(finalColLong)) {
+ // make sure db is sharded
+ if (!confOut->isShardingEnabled()) {
+ BSONObj shardDBCmd = BSON("enableSharding" << outDB);
+ BSONObjBuilder shardDBResult(32);
+ bool res = Command::runAgainstRegistered("admin.$cmd", shardDBCmd, shardDBResult);
+ if (!res) {
+ errmsg = str::stream() << "Could not enable sharding on db " << outDB << ": " << shardDBResult.obj().toString();
+ return false;
+ }
+ }
- BSONObj shardColCmd = BSON("shardCollection" << outns << "key" << sortKey);
+ BSONObj shardColCmd = BSON("shardCollection" << finalColLong << "key" << sortKey);
BSONObjBuilder shardColResult(32);
bool res = Command::runAgainstRegistered("admin.$cmd", shardColCmd, shardColResult);
if (!res) {
- errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString();
+ errmsg = str::stream() << "Could not create sharded output collection " << finalColLong << ": " << shardColResult.obj().toString();
return false;
}
- }
- ParallelSortClusteredCursor cursor( servers , dbName + "." + shardedOutputCollection ,
- Query().sort( sortKey ) );
- cursor.init();
- state.init();
-
- mr_shard::BSONList values;
- long long finalCount = 0;
- int currentSize = 0;
- map<ChunkPtr, long int> sizePerChunk;
- ChunkManagerPtr manager = conf->getChunkManager(outns.c_str());
-
- while ( cursor.more() || !values.empty() ) {
- BSONObj t;
- if ( cursor.more() ) {
- t = cursor.next().getOwned();
-
- if ( values.size() == 0 || t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) {
- values.push_back( t );
- currentSize += t.objsize();
-
- // check size and potentially reduce
- if (currentSize > config.maxInMemSize && values.size() > config.reduceTriggerRatio) {
- BSONObj reduced = config.reducer->finalReduce(values, 0);
- values.clear();
- values.push_back( reduced );
- currentSize = reduced.objsize();
+ // create initial chunks
+ // the 1st chunk from MinKey will belong to primary for shard
+ bool skipPrimary = true;
+ vector<Shard> allShards;
+ Shard::getAllShards(allShards);
+ if ( !splitPts.empty() ) {
+ int sidx = 0;
+ int numShards = allShards.size();
+ for (set<BSONObj>::iterator it = splitPts.begin(); it != splitPts.end(); ++it) {
+ BSONObj splitCmd = BSON("split" << finalColLong << "middle" << *it);
+ BSONObjBuilder splitResult(32);
+ bool res = Command::runAgainstRegistered("admin.$cmd", splitCmd, splitResult);
+ if (!res) {
+ errmsg = str::stream() << "Could not split sharded output collection " << finalColLong << ": " << splitResult.obj().toString();
+ return false;
}
- continue;
+
+ // move to a shard
+ Shard s = allShards[sidx];
+ if (skipPrimary && s == confOut->getPrimary()) {
+ skipPrimary = false;
+ sidx = (sidx + 1) % numShards;
+ s = allShards[sidx];
+ }
+ BSONObj mvCmd = BSON("moveChunk" << finalColLong << "find" << *it << "to" << s.getName());
+ BSONObjBuilder mvResult(32);
+ res = Command::runAgainstRegistered("admin.$cmd", mvCmd, mvResult);
+ if (!res) {
+ errmsg = str::stream() << "Could not move chunk for sharded output collection " << finalColLong << ": " << mvResult.obj().toString();
+ return false;
+ }
+
+ sidx = (sidx + 1) % numShards;
}
}
+ }
- BSONObj final = config.reducer->finalReduce(values, config.finalizer.get());
- // as a future optimization we can have a mode where record goes right into the final collection
- // this would be for MERGE mode and could be much more efficient
-// if (config.outNonAtomic && config.outType == mr_shard::Config::MERGE) {
-// BSONObj id = final["_id"].wrap();
-// s->updateSharded(conf, outns.c_str(), id, final, UpdateOption_Upsert, true);
-// }
-
- // insert into temp collection, but using final collection's shard chunks
- ChunkPtr chunk = insertSharded(manager, tempns.c_str(), final, 0, true);
- if (chunk) {
- sizePerChunk[chunk] += final.objsize();
- }
- ++finalCount;
- values.clear();
- if (!t.isEmpty()) {
- values.push_back( t );
- currentSize = t.objsize();
- }
+ // group chunks per shard
+ ChunkManagerPtr cm = confOut->getChunkManager( finalColLong );
+ map<Shard, vector<ChunkPtr> > rangesList;
+ const ChunkMap& chunkMap = cm->getChunkMap();
+ for ( ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it ) {
+ ChunkPtr chunk = it->second;
+ rangesList[chunk->getShard()].push_back(chunk);
}
- {
- // results were written to temp collection, need post processing
- vector< shared_ptr<ShardConnection> > shardConns;
- list< shared_ptr<Future::CommandResult> > futures;
- BSONObj finalCmdObj = finalCmd.obj();
- for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
- shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , outns ) );
- futures.push_back( Future::spawnCommand( i->getConnString() , dbName , finalCmdObj , 0 , temp->get() ) );
- shardConns.push_back( temp );
+ // spawn sharded finish jobs on each shard
+ // command will fetch appropriate results from other shards, do final reduce and post processing
+ futures.clear();
+ BSONObj finalCmdObj = finalCmd.obj();
+
+ for ( map<Shard, vector<ChunkPtr> >::iterator i=rangesList.begin(), end=rangesList.end() ; i != end ; i++ ) {
+ Shard shard = i->first;
+ BSONObjBuilder b;
+ b.appendElements(finalCmdObj);
+ BSONArrayBuilder ranges;
+ for (vector<ChunkPtr>::iterator it = i->second.begin(); it != i->second.end(); ++it) {
+ ranges.append((*it)->getMin().firstElement());
+ ranges.append((*it)->getMax().firstElement());
}
+ b.append("ranges", ranges.arr());
+ shared_ptr<ShardConnection> temp( new ShardConnection( shard.getConnString() , finalColLong ) );
+ assert( temp->get() );
+ futures.push_back( Future::spawnCommand( shard.getConnString() , outDB , b.obj() , 0 , temp->get() ) );
+ shardConns.push_back(temp);
+ }
- // now wait for the result of all shards
- bool failed = false;
- for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) {
+ // now wait for the result of all shards
+ ok = true;
+ map<Shard, vector<ChunkPtr> >::iterator rangeIt=rangesList.begin();
+ for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); ++i, ++rangeIt ) {
+ string server;
+ try {
shared_ptr<Future::CommandResult> res = *i;
if ( ! res->join() ) {
- error() << "final reduce on sharded output m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl;
+ error() << "final reduce failed on shard: " << res->getServer() << " error: " << res->result() << endl;
result.append( "cause" , res->result() );
- errmsg = "mongod mr failed: ";
+ errmsg = "final reduce failed: ";
errmsg += res->result().toString();
- failed = true;
+ ok = false;
continue;
}
- BSONObj result = res->result();
+ singleResult = res->result();
+ BSONObj counts = singleResult.getObjectField("counts");
+ reduceCount += counts.getIntField("reduce");
+ outputCount += counts.getIntField("output");
+ server = res->getServer();
+ postCountsB.append(server, counts);
+
+ // check on splitting, now that results are in the final collection
+ if (singleResult.hasField("chunkSizes")) {
+ vector<BSONElement> chunkSizes = singleResult.getField("chunkSizes").Array();
+ vector<ChunkPtr> chunks = rangeIt->second;
+ for (unsigned int i = 0; i < chunkSizes.size(); ++i) {
+ long long size = chunkSizes[i].numberLong();
+ ChunkPtr c = chunks[i];
+ assert( size < 0x7fffffff );
+ c->splitIfShould(static_cast<int>(size));
+ }
+ }
+ }
+ catch( RecvStaleConfigException& e ){
+ log() << "final reduce error due to stale config on a shard" << causedBy( e ) << endl;
+ ok = false;
+ continue;
}
-
- for ( unsigned i=0; i<shardConns.size(); i++ )
- shardConns[i]->done();
-
- if (failed)
- return 0;
- }
-
- // check on splitting, now that results are in the final collection
- for ( map< ChunkPtr, long int >::iterator it = sizePerChunk.begin(); it != sizePerChunk.end(); ++it ) {
- ChunkPtr c = it->first;
- long int size = it->second;
- cout << "Splitting for chunk " << c << " with size " << size;
- c->splitIfShould(size);
}
+ }
+ try {
// drop collections with tmp results on each shard
for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
ScopedDbConnection conn( i->_server );
- conn->dropCollection( dbName + "." + shardedOutputCollection );
+ conn->dropCollection( dbName + "." + shardResultCollection );
conn.done();
}
-
- result.append("shardCounts", shardCounts);
-
- // fix the global counts
- BSONObjBuilder countsB(32);
- BSONObjIterator j(aggCounts);
- while (j.more()) {
- BSONElement elmt = j.next();
- if (!strcmp(elmt.fieldName(), "reduce"))
- countsB.append("reduce", elmt.numberLong() + state.numReduces());
- else if (!strcmp(elmt.fieldName(), "output"))
- countsB.append("output", finalCount);
- else
- countsB.append(elmt);
- }
- result.append( "counts" , countsB.obj() );
- ok = true;
+ } catch ( std::exception e ) {
+ log() << "Cannot cleanup shard results" << causedBy( e ) << endl;
}
+ for ( unsigned i=0; i<shardConns.size(); i++ )
+ shardConns[i]->done();
+
if ( ! ok ) {
errmsg = "final reduce failed: ";
- errmsg += finalResult.toString();
+ errmsg += singleResult.toString();
return 0;
}
- timingBuilder.append( "final" , t2.millis() );
- result.appendElements( finalResult );
+ // copy some elements from a single result
+ // annoying that we have to copy all results for inline, but no way around it
+ if (singleResult.hasField("result"))
+ result.append(singleResult.getField("result"));
+ else if (singleResult.hasField("results"))
+ result.append(singleResult.getField("results"));
+
+ BSONObjBuilder countsB(32);
+ // input stat is determined by aggregate MR job
+ countsB.append("input", aggCounts.getField("input").numberLong());
+ countsB.append("emit", aggCounts.getField("emit").numberLong());
+
+ // reduce count is sum of all reduces that happened
+ countsB.append("reduce", aggCounts.getField("reduce").numberLong() + reduceCount);
+
+ // ouput is determined by post processing on each shard
+ countsB.append("output", outputCount);
+ result.append( "counts" , countsB.obj() );
+
+ timingBuilder.append( "postProcessing" , t2.millis() );
+
result.append( "timeMillis" , t.millis() );
result.append( "timing" , timingBuilder.obj() );
-
+ result.append("shardCounts", shardCounts);
+ result.append("postProcessCounts", postCountsB.obj());
return 1;
}
} mrCmd;
diff --git a/s/config.cpp b/s/config.cpp
index 24d16aadb69..22516eb46a4 100644
--- a/s/config.cpp
+++ b/s/config.cpp
@@ -214,10 +214,11 @@ namespace mongo {
assert( ! key.isEmpty() );
+ BSONObj newest;
if ( oldVersion > 0 && ! forceReload ) {
ScopedDbConnection conn( configServer.modelServer() , 30.0 );
- BSONObj newest = conn->findOne( ShardNS::chunk ,
- Query( BSON( "ns" << ns ) ).sort( "lastmod" , -1 ) );
+ newest = conn->findOne( ShardNS::chunk ,
+ Query( BSON( "ns" << ns ) ).sort( "lastmod" , -1 ) );
conn.done();
if ( ! newest.isEmpty() ) {
@@ -238,11 +239,32 @@ namespace mongo {
// we are not locked now, and want to load a new ChunkManager
- auto_ptr<ChunkManager> temp( new ChunkManager( ns , key , unique ) );
- if ( temp->numChunks() == 0 ) {
- // maybe we're not sharded any more
- reload(); // this is a full reload
- return getChunkManager( ns , false );
+ auto_ptr<ChunkManager> temp;
+
+ {
+ scoped_lock lll ( _hitConfigServerLock );
+
+ if ( ! newest.isEmpty() && ! forceReload ) {
+ // if we have a target we're going for
+ // see if we've hit already
+
+ scoped_lock lk( _lock );
+ CollectionInfo& ci = _collections[ns];
+ if ( ci.isSharded() && ci.getCM() ) {
+ ShardChunkVersion currentVersion = newest["lastmod"];
+ if ( currentVersion == ci.getCM()->getVersion() ) {
+ return ci.getCM();
+ }
+ }
+
+ }
+
+ temp.reset( new ChunkManager( ns , key , unique ) );
+ if ( temp->numChunks() == 0 ) {
+ // maybe we're not sharded any more
+ reload(); // this is a full reload
+ return getChunkManager( ns , false );
+ }
}
scoped_lock lk( _lock );
diff --git a/s/config.h b/s/config.h
index 3b7eb9570ba..0374bcb52be 100644
--- a/s/config.h
+++ b/s/config.h
@@ -115,7 +115,8 @@ namespace mongo {
: _name( name ) ,
_primary("config","") ,
_shardingEnabled(false),
- _lock("DBConfig") {
+ _lock("DBConfig") ,
+ _hitConfigServerLock( "DBConfig::_hitConfigServerLock" ) {
assert( name.size() );
}
virtual ~DBConfig() {}
@@ -195,6 +196,7 @@ namespace mongo {
Collections _collections;
mutable mongo::mutex _lock; // TODO: change to r/w lock ??
+ mutable mongo::mutex _hitConfigServerLock;
};
class ConfigServer : public DBConfig {
diff --git a/s/d_logic.h b/s/d_logic.h
index d96f937756f..ade02b21e80 100644
--- a/s/d_logic.h
+++ b/s/d_logic.h
@@ -31,7 +31,6 @@ namespace mongo {
class DiskLoc;
typedef ShardChunkVersion ConfigVersion;
- typedef map<string,ConfigVersion> NSVersionMap;
// --------------
// --- global state ---
@@ -185,9 +184,11 @@ namespace mongo {
private:
OID _id;
- NSVersionMap _versions;
bool _forceVersionOk; // if this is true, then chunk version #s aren't check, and all ops are allowed
+ typedef map<string,ConfigVersion> NSVersionMap;
+ NSVersionMap _versions;
+
static boost::thread_specific_ptr<ShardedConnectionInfo> _tl;
};
diff --git a/s/d_state.cpp b/s/d_state.cpp
index 393df986533..39d84b6ff88 100644
--- a/s/d_state.cpp
+++ b/s/d_state.cpp
@@ -29,7 +29,7 @@
#include "../db/commands.h"
#include "../db/jsobj.h"
#include "../db/db.h"
-
+#include "../db/replutil.h"
#include "../client/connpool.h"
#include "../util/queue.h"
@@ -396,6 +396,7 @@ namespace mongo {
help << " example: { setShardVersion : 'alleyinsider.foo' , version : 1 , configdb : '' } ";
}
+ virtual bool slaveOk() const { return true; }
virtual LockType locktype() const { return NONE; }
bool checkConfigOrInit( const string& configdb , bool authoritative , string& errmsg , BSONObjBuilder& result , bool locked=false ) const {
@@ -492,6 +493,13 @@ namespace mongo {
return true;
}
+ // we can run on a slave up to here
+ if ( ! isMaster( "admin" ) ) {
+ result.append( "errmsg" , "not master" );
+ result.append( "note" , "from post init in setShardVersion" );
+ return false;
+ }
+
// step 2
string ns = cmdObj["setShardVersion"].valuestrsafe();
@@ -518,8 +526,19 @@ namespace mongo {
if ( version == globalVersion ) {
// mongos and mongod agree!
if ( oldVersion != version ) {
- assert( oldVersion < globalVersion );
- info->setVersion( ns , version );
+ if ( oldVersion < globalVersion ) {
+ info->setVersion( ns , version );
+ }
+ else if ( authoritative ) {
+ // this means there was a drop and our version is reset
+ info->setVersion( ns , version );
+ }
+ else {
+ result.append( "ns" , ns );
+ result.appendBool( "need_authoritative" , true );
+ errmsg = "verifying drop on '" + ns + "'";
+ return false;
+ }
}
return true;
}
@@ -668,6 +687,11 @@ namespace mongo {
if ( ! shardingState.enabled() )
return true;
+ if ( ! isMasterNs( ns.c_str() ) ) {
+ // right now connections to secondaries aren't versioned at all
+ return true;
+ }
+
ShardedConnectionInfo* info = ShardedConnectionInfo::get( false );
if ( ! info ) {
diff --git a/s/dbgrid.vcxproj b/s/dbgrid.vcxproj
index 584757abbf3..a576dc7a6a7 100644
--- a/s/dbgrid.vcxproj
+++ b/s/dbgrid.vcxproj
@@ -112,6 +112,10 @@
<SubSystem>Console</SubSystem>
<TargetMachine>MachineX86</TargetMachine>
</Link>
+ <PreBuildEvent>
+ <Command>cscript //Nologo ..\shell\msvc\createCPPfromJavaScriptFiles.js "$(ProjectDir).."</Command>
+ <Message>Create mongo.cpp and mongo-server.cpp from JavaScript source files</Message>
+ </PreBuildEvent>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<ClCompile>
@@ -134,6 +138,10 @@
<GenerateDebugInformation>true</GenerateDebugInformation>
<SubSystem>Console</SubSystem>
</Link>
+ <PreBuildEvent>
+ <Command>cscript //Nologo ..\shell\msvc\createCPPfromJavaScriptFiles.js "$(ProjectDir).."</Command>
+ <Message>Create mongo.cpp and mongo-server.cpp from JavaScript source files</Message>
+ </PreBuildEvent>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<ClCompile>
@@ -160,6 +168,10 @@
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<TargetMachine>MachineX86</TargetMachine>
</Link>
+ <PreBuildEvent>
+ <Command>cscript //Nologo ..\shell\msvc\createCPPfromJavaScriptFiles.js "$(ProjectDir).."</Command>
+ <Message>Create mongo.cpp and mongo-server.cpp from JavaScript source files</Message>
+ </PreBuildEvent>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
@@ -185,6 +197,10 @@
<OptimizeReferences>true</OptimizeReferences>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
</Link>
+ <PreBuildEvent>
+ <Command>cscript //Nologo ..\shell\msvc\createCPPfromJavaScriptFiles.js "$(ProjectDir).."</Command>
+ <Message>Create mongo.cpp and mongo-server.cpp from JavaScript source files</Message>
+ </PreBuildEvent>
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="..\bson\oid.cpp" />
@@ -533,7 +549,7 @@
<ClCompile Include="..\util\net\message_server_port.cpp" />
<ClCompile Include="..\util\mmap.cpp" />
<ClCompile Include="..\util\mmap_win.cpp" />
- <ClCompile Include="..\shell\mongo_vstudio.cpp">
+ <ClCompile Include="..\shell\mongo.cpp">
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
</PrecompiledHeader>
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
diff --git a/s/dbgrid.vcxproj.filters b/s/dbgrid.vcxproj.filters
index 1e204805d74..1f823d53f09 100755
--- a/s/dbgrid.vcxproj.filters
+++ b/s/dbgrid.vcxproj.filters
@@ -143,7 +143,7 @@
<ClCompile Include="..\util\mmap_win.cpp">
<Filter>Shared Source Files</Filter>
</ClCompile>
- <ClCompile Include="..\shell\mongo_vstudio.cpp">
+ <ClCompile Include="..\shell\mongo.cpp">
<Filter>Shared Source Files</Filter>
</ClCompile>
<ClCompile Include="..\db\nonce.cpp">
diff --git a/s/grid.cpp b/s/grid.cpp
index 8dd5bd411af..c14feb50a03 100644
--- a/s/grid.cpp
+++ b/s/grid.cpp
@@ -22,6 +22,7 @@
#include "../client/connpool.h"
#include "../util/stringutils.h"
#include "../util/unittest.h"
+#include "../db/namespace_common.h"
#include "grid.h"
#include "shard.h"
@@ -38,6 +39,8 @@ namespace mongo {
if ( database == "config" )
return configServerPtr;
+ uassert( 15918 , str::stream() << "invalid database name: " << database , NamespaceString::validDBName( database ) );
+
scoped_lock l( _lock );
DBConfigPtr& cc = _databases[database];
diff --git a/s/shard_version.cpp b/s/shard_version.cpp
index 8e51b2aa104..0d669fe216c 100644
--- a/s/shard_version.cpp
+++ b/s/shard_version.cpp
@@ -136,6 +136,14 @@ namespace mongo {
bool ok = conn->runCommand( "admin" , cmd , result );
+ // HACK for backwards compatibility with v1.8.x, v2.0.0 and v2.0.1
+ // Result is false, but will still initialize serverID and configdb
+ if( ! ok && ! result["errmsg"].eoo() && ( result["errmsg"].String() == "need to specify namespace"/* 2.0.1/2 */ ||
+ result["errmsg"].String() == "need to speciy namespace" /* 1.8 */ ))
+ {
+ ok = true;
+ }
+
LOG(3) << "initial sharding result : " << result << endl;
return ok;
diff --git a/s/shardconnection.cpp b/s/shardconnection.cpp
index dff828a0d39..b759284f94a 100644
--- a/s/shardconnection.cpp
+++ b/s/shardconnection.cpp
@@ -105,7 +105,13 @@ namespace mongo {
if ( s->avail ) {
DBClientBase* c = s->avail;
s->avail = 0;
- shardConnectionPool.onHandedOut( c );
+ try {
+ shardConnectionPool.onHandedOut( c );
+ }
+ catch ( std::exception& ) {
+ delete c;
+ throw;
+ }
return c;
}
diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp
index d5a65b4a703..6a19b990458 100644
--- a/s/strategy_shard.cpp
+++ b/s/strategy_shard.cpp
@@ -73,14 +73,22 @@ namespace mongo {
assert( cursor );
try {
+ long long start_millis = 0;
+ if ( query.isExplain() ) start_millis = curTimeMillis64();
cursor->init();
LOG(5) << " cursor type: " << cursor->type() << endl;
shardedCursorTypes.hit( cursor->type() );
if ( query.isExplain() ) {
- BSONObj explain = cursor->explain();
- replyToQuery( 0 , r.p() , r.m() , explain );
+ // fetch elapsed time for the query
+ long long elapsed_millis = curTimeMillis64() - start_millis;
+ BSONObjBuilder explain_builder;
+ cursor->explain( explain_builder );
+ explain_builder.appendNumber( "millis", elapsed_millis );
+ BSONObj b = explain_builder.obj();
+
+ replyToQuery( 0 , r.p() , r.m() , b );
delete( cursor );
return;
}
diff --git a/s/strategy_single.cpp b/s/strategy_single.cpp
index 1e3e894d2a8..d9c2b03fae0 100644
--- a/s/strategy_single.cpp
+++ b/s/strategy_single.cpp
@@ -178,6 +178,8 @@ namespace mongo {
return false;
ns += 10;
+ r.checkAuth( Auth::WRITE );
+
BSONObjBuilder b;
vector<Shard> shards;
@@ -220,7 +222,7 @@ namespace mongo {
}
else if ( strcmp( ns , "killop" ) == 0 ) {
BSONElement e = q.query["op"];
- if ( strstr( r.getns() , "admin." ) != 0 ) {
+ if ( strstr( r.getns() , "admin." ) == 0 ) {
b.append( "err" , "unauthorized" );
}
else if ( e.type() != String ) {