// parallel.cpp /* * Copyright 2010 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. 
*/ #include "mongo/platform/basic.h" #include "mongo/client/parallel.h" #include "mongo/client/connpool.h" #include "mongo/client/dbclientcursor.h" #include "mongo/db/dbmessage.h" #include "mongo/db/query/lite_parsed_query.h" #include "mongo/s/chunk.h" #include "mongo/s/chunk_version.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/shard.h" #include "mongo/s/version_manager.h" #include "mongo/util/log.h" namespace mongo { MONGO_LOG_DEFAULT_COMPONENT_FILE(::mongo::logger::LogComponent::kNetworking); LabeledLevel pc( "pcursor", 2 ); void ParallelSortClusteredCursor::init() { if ( _didInit ) return; _didInit = true; if( ! _qSpec.isEmpty() ) { fullInit(); } else { // You can only get here by using the legacy constructor // TODO: Eliminate this _oldInit(); } } string ParallelSortClusteredCursor::getNS() { if( ! _qSpec.isEmpty() ) return _qSpec.ns(); return _ns; } static void _checkCursor( DBClientCursor * cursor ) { verify( cursor ); if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) { BSONObj error; cursor->peekError( &error ); throw RecvStaleConfigException( "_checkCursor", error ); } if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) { BSONObj o = cursor->next(); throw UserException( o["code"].numberInt() , o["$err"].String() ); } if ( NamespaceString( cursor->getns() ).isCommand() ) { // For backwards compatibility with v2.0 mongods because in 2.0 commands that care about // versioning (like the count command) will return with the stale config error code, but // don't set the ShardConfigStale result flag on the cursor. // TODO: This should probably be removed for 2.3, as we'll no longer need to support // running with a 2.0 mongod. 
BSONObj res = cursor->peekFirst(); if ( res.hasField( "code" ) && res["code"].Number() == SendStaleConfigCode ) { throw RecvStaleConfigException( "_checkCursor", res ); } } } void ParallelSortClusteredCursor::explain(BSONObjBuilder& b) { // Note: by default we filter out allPlans and oldPlan in the shell's // explain() function. If you add any recursive structures, make sure to // edit the JS to make sure everything gets filtered. // Return single shard output if we're versioned but not sharded, or // if we specified only a single shard // TODO: We should really make this simpler - all queries via mongos // *always* get the same explain format if (!isSharded()) { map > out; _explain( out ); verify( out.size() == 1 ); list& l = out.begin()->second; verify( l.size() == 1 ); b.appendElements( *(l.begin()) ); return; } b.append( "clusteredType" , type() ); string cursorType; BSONObj indexBounds; BSONObj oldPlan; long long millis = 0; double numExplains = 0; map counters; map > out; { _explain( out ); BSONObjBuilder x( b.subobjStart( "shards" ) ); for ( map >::iterator i=out.begin(); i!=out.end(); ++i ) { string shard = i->first; list l = i->second; BSONArrayBuilder y( x.subarrayStart( shard ) ); for ( list::iterator j=l.begin(); j!=l.end(); ++j ) { BSONObj temp = *j; // If appending the next output from the shard is going to make the BSON // too large, then don't add it. We make sure the BSON doesn't get bigger // than the allowable "user size" for a BSONObj. This leaves a little bit // of extra space which mongos can use to add extra data. 
if ((x.len() + temp.objsize()) > BSONObjMaxUserSize) { y.append(BSON("warning" << "shard output omitted due to nearing 16 MB limit")); break; } y.append( temp ); BSONObjIterator k( temp ); while ( k.more() ) { BSONElement z = k.next(); if ( z.fieldName()[0] != 'n' ) continue; long long& c = counters[z.fieldName()]; c += z.numberLong(); } millis += temp["millis"].numberLong(); numExplains++; if ( temp["cursor"].type() == String ) { if ( cursorType.size() == 0 ) cursorType = temp["cursor"].String(); else if ( cursorType != temp["cursor"].String() ) cursorType = "multiple"; } if ( temp["indexBounds"].type() == Object ) indexBounds = temp["indexBounds"].Obj(); if ( temp["oldPlan"].type() == Object ) oldPlan = temp["oldPlan"].Obj(); } y.done(); } x.done(); } b.append( "cursor" , cursorType ); for ( map::iterator i=counters.begin(); i!=counters.end(); ++i ) b.appendNumber( i->first , i->second ); b.appendNumber( "millisShardTotal" , millis ); b.append( "millisShardAvg" , numExplains ? static_cast( static_cast(millis) / numExplains ) : 0 ); b.append( "numQueries" , (int)numExplains ); b.append( "numShards" , (int)out.size() ); if ( out.size() == 1 ) { b.append( "indexBounds" , indexBounds ); if ( ! oldPlan.isEmpty() ) { // this is to stay in compliance with mongod behavior // we should make this cleaner, i.e. {} == nothing b.append( "oldPlan" , oldPlan ); } } else { // TODO: this is lame... 
} } // -------- FilteringClientCursor ----------- FilteringClientCursor::FilteringClientCursor() : _pcmData( NULL ), _done( true ) { } FilteringClientCursor::~FilteringClientCursor() { // Don't use _pcmData _pcmData = NULL; } void FilteringClientCursor::reset( auto_ptr cursor ) { _cursor = cursor; _next = BSONObj(); _done = _cursor.get() == 0; _pcmData = NULL; } void FilteringClientCursor::reset( DBClientCursor* cursor, ParallelConnectionMetadata* pcmData ) { _cursor.reset( cursor ); _pcmData = pcmData; _next = BSONObj(); _done = cursor == 0; } bool FilteringClientCursor::more() { if ( ! _next.isEmpty() ) return true; if ( _done ) return false; _advance(); return ! _next.isEmpty(); } BSONObj FilteringClientCursor::next() { verify( ! _next.isEmpty() ); verify( ! _done ); BSONObj ret = _next; _next = BSONObj(); return ret; } BSONObj FilteringClientCursor::peek() { if ( _next.isEmpty() ) _advance(); return _next; } void FilteringClientCursor::_advance() { verify( _next.isEmpty() ); if ( ! _cursor.get() || _done ) return; while ( _cursor->more() ) { _next = _cursor->next(); if (!_cursor->moreInCurrentBatch()) { _next = _next.getOwned(); } return; } _done = true; } // -------- ParallelSortClusteredCursor ----------- ParallelSortClusteredCursor::ParallelSortClusteredCursor( const QuerySpec& qSpec, const CommandInfo& cInfo ) : _qSpec( qSpec ), _cInfo( cInfo ), _totalTries( 0 ) { _done = false; _didInit = false; _finishCons(); } // LEGACY Constructor ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set& servers , const string& ns , const Query& q , int options , const BSONObj& fields ) : _servers( servers ) { _sortKey = q.getSort().copy(); _needToSkip = 0; _done = false; _didInit = false; // Populate legacy fields _ns = ns; _query = q.obj.getOwned(); _options = options; _fields = fields.getOwned(); _batchSize = 0; _finishCons(); } void ParallelSortClusteredCursor::_finishCons() { _numServers = _servers.size(); _lastFrom = 0; _cursors = 0; if( ! 
_qSpec.isEmpty() ){ _needToSkip = _qSpec.ntoskip(); _cursors = 0; _sortKey = _qSpec.sort(); _fields = _qSpec.fields(); } // Partition sort key fields into (a) text meta fields and (b) all other fields. set textMetaSortKeyFields; set normalSortKeyFields; // Transform _sortKey fields {a:{$meta:"textScore"}} into {a:-1}, in order to apply the // merge sort for text metadata in the correct direction. BSONObjBuilder transformedSortKeyBuilder; BSONObjIterator sortKeyIt( _sortKey ); while ( sortKeyIt.more() ) { BSONElement e = sortKeyIt.next(); if ( LiteParsedQuery::isTextScoreMeta( e ) ) { textMetaSortKeyFields.insert( e.fieldName() ); transformedSortKeyBuilder.append( e.fieldName(), -1 ); } else { normalSortKeyFields.insert( e.fieldName() ); transformedSortKeyBuilder.append( e ); } } _sortKey = transformedSortKeyBuilder.obj(); // Verify that that all text metadata sort fields are in the projection. For all other sort // fields, copy them into the projection if they are missing (and if projection is // negative). if ( ! _sortKey.isEmpty() && ! _fields.isEmpty() ) { BSONObjBuilder b; bool isNegative = false; { BSONObjIterator i( _fields ); while ( i.more() ) { BSONElement e = i.next(); b.append( e ); string fieldName = e.fieldName(); if ( LiteParsedQuery::isTextScoreMeta( e ) ) { textMetaSortKeyFields.erase( fieldName ); } else { // exact field bool found = normalSortKeyFields.erase( fieldName ); // subfields set::const_iterator begin = normalSortKeyFields.lower_bound( fieldName + ".\x00" ); set::const_iterator end = normalSortKeyFields.lower_bound( fieldName + ".\xFF" ); normalSortKeyFields.erase( begin, end ); if ( ! e.trueValue() ) { uassert( 13431, "have to have sort key in projection and removing it", !found && begin == end ); } else if ( !e.isABSONObj() ) { isNegative = true; } } } } if ( isNegative ) { for ( set::const_iterator it( normalSortKeyFields.begin() ), end( normalSortKeyFields.end() ); it != end; ++it ) { b.append( *it, 1 ); } } _fields = b.obj(); } if( ! 
_qSpec.isEmpty() ){ _qSpec.setFields( _fields ); } uassert( 17306, "have to have all text meta sort keys in projection", textMetaSortKeyFields.empty() ); } void ParallelConnectionMetadata::cleanup( bool full ){ if( full || errored ) retryNext = false; if( ! retryNext && pcState ){ if( errored && pcState->conn ){ // Don't return this conn to the pool if it's bad pcState->conn->kill(); pcState->conn.reset(); } else if( initialized ){ verify( pcState->cursor ); verify( pcState->conn ); if( ! finished && pcState->conn->ok() ){ try{ // Complete the call if only halfway done bool retry = false; pcState->cursor->initLazyFinish( retry ); } catch( std::exception& ){ warning() << "exception closing cursor" << endl; } catch( ... ){ warning() << "unknown exception closing cursor" << endl; } } } // Double-check conn is closed if( pcState->conn ){ pcState->conn->done(); } pcState.reset(); } else verify( finished || ! initialized ); initialized = false; finished = false; completed = false; errored = false; } BSONObj ParallelConnectionState::toBSON() const { BSONObj cursorPeek = BSON( "no cursor" << "" ); if( cursor ){ vector v; cursor->peek( v, 1 ); if( v.size() == 0 ) cursorPeek = BSON( "no data" << "" ); else cursorPeek = BSON( "" << v[0] ); } BSONObj stateObj = BSON( "conn" << ( conn ? ( conn->ok() ? conn->conn().toString() : "(done)" ) : "" ) << "vinfo" << ( manager ? ( str::stream() << manager->getns() << " @ " << manager->getVersion().toString() ) : primary->toString() ) ); // Append cursor data if exists BSONObjBuilder stateB; stateB.appendElements( stateObj ); if( ! cursor ) stateB.append( "cursor", "(none)" ); else { vector v; cursor->peek( v, 1 ); if( v.size() == 0 ) stateB.append( "cursor", "(empty)" ); else stateB.append( "cursor", v[0] ); } stateB.append( "count", count ); stateB.append( "done", done ); return stateB.obj().getOwned(); } BSONObj ParallelConnectionMetadata::toBSON() const { return BSON( "state" << ( pcState ? 
pcState->toBSON() : BSONObj() ) << "retryNext" << retryNext << "init" << initialized << "finish" << finished << "errored" << errored ); } BSONObj ParallelSortClusteredCursor::toBSON() const { BSONObjBuilder b; b.append( "tries", _totalTries ); { BSONObjBuilder bb; for( map< Shard, PCMData >::const_iterator i = _cursorMap.begin(), end = _cursorMap.end(); i != end; ++i ){ bb.append( i->first.toString(), i->second.toBSON() ); } b.append( "cursors", bb.obj().getOwned() ); } { BSONObjBuilder bb; for( map< string, int >::const_iterator i = _staleNSMap.begin(), end = _staleNSMap.end(); i != end; ++i ){ bb.append( i->first, i->second ); } b.append( "staleTries", bb.obj().getOwned() ); } return b.obj().getOwned(); } string ParallelSortClusteredCursor::toString() const { return str::stream() << "PCursor : " << toBSON(); } void ParallelSortClusteredCursor::fullInit(){ startInit(); finishInit(); } void ParallelSortClusteredCursor::_markStaleNS( const NamespaceString& staleNS, const StaleConfigException& e, bool& forceReload, bool& fullReload ){ fullReload = e.requiresFullReload(); if( _staleNSMap.find( staleNS ) == _staleNSMap.end() ) _staleNSMap[ staleNS ] = 1; int tries = ++_staleNSMap[ staleNS ]; if( tries >= 5 ) throw SendStaleConfigException( staleNS, str::stream() << "too many retries of stale version info", e.getVersionReceived(), e.getVersionWanted() ); forceReload = tries > 2; } void ParallelSortClusteredCursor::_handleStaleNS( const NamespaceString& staleNS, bool forceReload, bool fullReload ){ DBConfigPtr config = grid.getDBConfig( staleNS.db() ); // Reload db if needed, make sure it works if( config && fullReload && ! config->reload() ){ // We didn't find the db after the reload, the db may have been dropped, // reset this ptr config.reset(); } if( ! 
config ){ warning() << "cannot reload database info for stale namespace " << staleNS << endl; } else { // Reload chunk manager, potentially forcing the namespace config->getChunkManagerIfExists( staleNS, true, forceReload ); } } void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk( PCStatePtr state, const Shard& shard, ShardPtr primary, const NamespaceString& ns, const string& vinfo, ChunkManagerPtr manager ) { if ( manager ) { state->manager = manager; } else if ( primary ) { state->primary = primary; } verify( ! primary || shard == *primary ); // Setup conn if ( !state->conn ){ state->conn.reset( new ShardConnection( shard, ns, manager ) ); } const DBClientBase* rawConn = state->conn->getRawConn(); bool allowShardVersionFailure = rawConn->type() == ConnectionString::SET && DBClientReplicaSet::isSecondaryQuery( _qSpec.ns(), _qSpec.query(), _qSpec.options() ); if ( allowShardVersionFailure && rawConn->isFailed() ) { state->conn->donotCheckVersion(); // A side effect of this short circuiting is the mongos will not be able figure out that // the primary is now up on it's own and has to rely on other threads to refresh node // states. OCCASIONALLY { const DBClientReplicaSet* repl = dynamic_cast( rawConn ); dassert(repl); warning() << "Primary for " << repl->getServerAddress() << " was down before, bypassing setShardVersion." << " The local replica set view and targeting may be stale." << endl; } } else { try { if ( state->conn->setVersion() ) { // It's actually okay if we set the version here, since either the // manager will be verified as compatible, or if the manager doesn't // exist, we don't care about version consistency LOG( pc ) << "needed to set remote version on connection to value " << "compatible with " << vinfo << endl; } } catch ( const DBException& ) { if ( allowShardVersionFailure ) { // It's okay if we don't set the version when talking to a secondary, we can // be stale in any case. 
OCCASIONALLY { const DBClientReplicaSet* repl = dynamic_cast( state->conn->getRawConn() ); dassert(repl); warning() << "Cannot contact primary for " << repl->getServerAddress() << " to check shard version." << " The local replica set view and targeting may be stale." << endl; } } else { throw; } } } } void ParallelSortClusteredCursor::startInit() { const bool returnPartial = ( _qSpec.options() & QueryOption_PartialResults ); NamespaceString ns( !_cInfo.isEmpty() ? _cInfo.versionedNS : _qSpec.ns() ); ChunkManagerPtr manager; ShardPtr primary; string prefix; if (MONGO_unlikely(logger::globalLogDomain()->shouldLog(pc))) { if( _totalTries > 0 ) { prefix = str::stream() << "retrying (" << _totalTries << " tries)"; } else { prefix = "creating"; } } LOG( pc ) << prefix << " pcursor over " << _qSpec << " and " << _cInfo << endl; set todoStorage; set& todo = todoStorage; string vinfo; DBConfigPtr config = grid.getDBConfig( ns.db() ); // Gets or loads the config uassert( 15989, "database not found for parallel cursor request", config ); // Try to get either the chunk manager or the primary shard config->getChunkManagerOrPrimary( ns, manager, primary ); if (MONGO_unlikely(logger::globalLogDomain()->shouldLog(pc))) { if (manager) { vinfo = str::stream() << "[" << manager->getns() << " @ " << manager->getVersion().toString() << "]"; } else { vinfo = str::stream() << "[unsharded @ " << primary->toString() << "]"; } } if( manager ) manager->getShardsForQuery( todo, !_cInfo.isEmpty() ? 
_cInfo.cmdFilter : _qSpec.filter() ); else if( primary ) todo.insert( *primary ); // Close all cursors on extra shards first, as these will be invalid for( map< Shard, PCMData >::iterator i = _cursorMap.begin(), end = _cursorMap.end(); i != end; ++i ){ LOG( pc ) << "closing cursor on shard " << i->first << " as the connection is no longer required by " << vinfo << endl; // Force total cleanup of these connections if( todo.find( i->first ) == todo.end() ) i->second.cleanup(); } verify( todo.size() ); LOG( pc ) << "initializing over " << todo.size() << " shards required by " << vinfo << endl; // Don't retry indefinitely for whatever reason _totalTries++; uassert( 15986, "too many retries in total", _totalTries < 10 ); for( set::iterator i = todo.begin(), end = todo.end(); i != end; ++i ){ const Shard& shard = *i; PCMData& mdata = _cursorMap[ shard ]; LOG( pc ) << "initializing on shard " << shard << ", current connection state is " << mdata.toBSON() << endl; // This may be the first time connecting to this shard, if so we can get an error here try { if( mdata.initialized ){ verify( mdata.pcState ); PCStatePtr state = mdata.pcState; bool compatiblePrimary = true; bool compatibleManager = true; if( primary && ! state->primary ) warning() << "Collection becoming unsharded detected" << endl; if( manager && ! state->manager ) warning() << "Collection becoming sharded detected" << endl; if( primary && state->primary && primary != state->primary ) warning() << "Weird shift of primary detected" << endl; compatiblePrimary = primary && state->primary && primary == state->primary; compatibleManager = manager && state->manager && manager->compatibleWith( state->manager, shard ); if( compatiblePrimary || compatibleManager ){ // If we're compatible, don't need to retry unless forced if( ! 
mdata.retryNext ) continue; // Do partial cleanup mdata.cleanup( false ); } else { // Force total cleanup of connection if no longer compatible mdata.cleanup(); } } else { // Cleanup connection if we're not yet initialized mdata.cleanup( false ); } mdata.pcState.reset( new PCState() ); PCStatePtr state = mdata.pcState; setupVersionAndHandleSlaveOk( state, shard, primary, ns, vinfo, manager ); const string& ns = _qSpec.ns(); // Setup cursor if( ! state->cursor ){ // // Here we decide whether to split the query limits up for multiple shards. // NOTE: There's a subtle issue here, in that it's possible we target a single // shard first, but are stale, and then target multiple shards, or vice-versa. // In both these cases, we won't re-use the old cursor created here, since the // shard version must have changed on the single shard between queries. // if (todo.size() > 1) { // Query limits split for multiple shards state->cursor.reset( new DBClientCursor( state->conn->get(), ns, _qSpec.query(), isCommand() ? 1 : 0, // nToReturn (0 if query indicates multi) 0, // nToSkip // Does this need to be a ptr? _qSpec.fields().isEmpty() ? 0 : _qSpec.fieldsData(), // fieldsToReturn _qSpec.options(), // options // NtoReturn is weird. // If zero, it means use default size, so we do that for all cursors // If positive, it's the batch size (we don't want this cursor limiting results), that's // done at a higher level // If negative, it's the batch size, but we don't create a cursor - so we don't want // to create a child cursor either. // Either way, if non-zero, we want to pull back the batch size + the skip amount as // quickly as possible. Potentially, for a cursor on a single shard or if we keep better track of // chunks, we can actually add the skip value into the cursor and/or make some assumptions about the // return value size ( (batch size + skip amount) / num_servers ). _qSpec.ntoreturn() == 0 ? 0 : ( _qSpec.ntoreturn() > 0 ? 
_qSpec.ntoreturn() + _qSpec.ntoskip() : _qSpec.ntoreturn() - _qSpec.ntoskip() ) ) ); // batchSize } else { // Single shard query state->cursor.reset( new DBClientCursor( state->conn->get(), ns, _qSpec.query(), _qSpec.ntoreturn(), // nToReturn _qSpec.ntoskip(), // nToSkip // Does this need to be a ptr? _qSpec.fields().isEmpty() ? 0 : _qSpec.fieldsData(), // fieldsToReturn _qSpec.options(), // options 0 ) ); // batchSize } } bool lazyInit = state->conn->get()->lazySupported(); if( lazyInit ){ // Need to keep track if this is a second or third try for replica sets state->cursor->initLazy( mdata.retryNext ); mdata.retryNext = false; mdata.initialized = true; } else { bool success = false; if( nsGetCollection( ns ) == "$cmd" ){ /* TODO: remove this when config servers don't use * SyncClusterConnection anymore. This is needed * because SyncConn doesn't allow the call() method * to be used for commands. */ success = state->cursor->initCommand(); } else { success = state->cursor->init(); } // Without full initialization, throw an exception uassert( 15987, str::stream() << "could not fully initialize cursor on shard " << shard.toString() << ", current connection state is " << mdata.toBSON().toString(), success ); mdata.retryNext = false; mdata.initialized = true; mdata.finished = true; } LOG( pc ) << "initialized " << ( isCommand() ? "command " : "query " ) << ( lazyInit ? "(lazily) " : "(full) " ) << "on shard " << shard << ", current connection state is " << mdata.toBSON() << endl; } catch( StaleConfigException& e ){ // Our version isn't compatible with the current version anymore on at least one shard, need to retry immediately NamespaceString staleNS( e.getns() ); // For legacy reasons, this may not be set in the exception :-( if( staleNS.size() == 0 ) staleNS = ns; // ns is the *versioned* namespace, be careful of this // Probably need to retry fully bool forceReload, fullReload; _markStaleNS( staleNS, e, forceReload, fullReload ); int logLevel = fullReload ? 
0 : 1; LOG( pc + logLevel ) << "stale config of ns " << staleNS << " during initialization, will retry with forced : " << forceReload << ", full : " << fullReload << causedBy( e ) << endl; // This is somewhat strange if( staleNS != ns ) warning() << "versioned ns " << ns << " doesn't match stale config namespace " << staleNS << endl; _handleStaleNS( staleNS, forceReload, fullReload ); // Restart with new chunk manager startInit(); return; } catch( SocketException& e ){ warning() << "socket exception when initializing on " << shard << ", current connection state is " << mdata.toBSON() << causedBy( e ) << endl; e._shard = shard.getName(); mdata.errored = true; if( returnPartial ){ mdata.cleanup(); continue; } throw; } catch( DBException& e ){ warning() << "db exception when initializing on " << shard << ", current connection state is " << mdata.toBSON() << causedBy( e ) << endl; e._shard = shard.getName(); mdata.errored = true; if( returnPartial && e.getCode() == 15925 /* From above! */ ){ mdata.cleanup(); continue; } throw; } catch( std::exception& e){ warning() << "exception when initializing on " << shard << ", current connection state is " << mdata.toBSON() << causedBy( e ) << endl; mdata.errored = true; throw; } catch( ... ){ warning() << "unknown exception when initializing on " << shard << ", current connection state is " << mdata.toBSON() << endl; mdata.errored = true; throw; } } // Sanity check final init'ed connections for( map< Shard, PCMData >::iterator i = _cursorMap.begin(), end = _cursorMap.end(); i != end; ++i ){ const Shard& shard = i->first; PCMData& mdata = i->second; if( ! mdata.pcState ) continue; // Make sure all state is in shards verify( todo.find( shard ) != todo.end() ); verify( mdata.initialized == true ); if( ! mdata.completed ) verify( mdata.pcState->conn->ok() ); verify( mdata.pcState->cursor ); verify( mdata.pcState->primary || mdata.pcState->manager ); verify( ! 
mdata.retryNext ); if( mdata.completed ) verify( mdata.finished ); if( mdata.finished ) verify( mdata.initialized ); if( ! returnPartial ) verify( mdata.initialized ); } } void ParallelSortClusteredCursor::finishInit(){ bool returnPartial = ( _qSpec.options() & QueryOption_PartialResults ); bool specialVersion = _cInfo.versionedNS.size() > 0; string ns = specialVersion ? _cInfo.versionedNS : _qSpec.ns(); bool retry = false; map< string, StaleConfigException > staleNSExceptions; LOG( pc ) << "finishing over " << _cursorMap.size() << " shards" << endl; for( map< Shard, PCMData >::iterator i = _cursorMap.begin(), end = _cursorMap.end(); i != end; ++i ){ const Shard& shard = i->first; PCMData& mdata = i->second; LOG( pc ) << "finishing on shard " << shard << ", current connection state is " << mdata.toBSON() << endl; // Ignore empty conns for now if( ! mdata.pcState ) continue; PCStatePtr state = mdata.pcState; try { // Sanity checks if( ! mdata.completed ) verify( state->conn && state->conn->ok() ); verify( state->cursor ); verify( state->manager || state->primary ); verify( ! state->manager || ! state->primary ); // If we weren't init'ing lazily, ignore this if( ! mdata.finished ){ mdata.finished = true; // Mark the cursor as non-retry by default mdata.retryNext = false; if( ! state->cursor->initLazyFinish( mdata.retryNext ) ){ if( ! mdata.retryNext ){ uassert( 15988, "error querying server", false ); } else{ retry = true; continue; } } mdata.completed = false; } if( ! 
mdata.completed ){ mdata.completed = true; // Make sure we didn't get an error we should rethrow // TODO : Rename/refactor this to something better _checkCursor( state->cursor.get() ); // Finalize state state->cursor->attach( state->conn.get() ); // Closes connection for us LOG( pc ) << "finished on shard " << shard << ", current connection state is " << mdata.toBSON() << endl; } } catch( RecvStaleConfigException& e ){ retry = true; string staleNS = e.getns(); // For legacy reasons, ns may not always be set in exception :-( if( staleNS.size() == 0 ) staleNS = ns; // ns is versioned namespace, be careful of this // Will retry all at once staleNSExceptions[ staleNS ] = e; // Fully clear this cursor, as it needs to be re-established mdata.cleanup(); continue; } catch( SocketException& e ){ warning() << "socket exception when finishing on " << shard << ", current connection state is " << mdata.toBSON() << causedBy( e ) << endl; mdata.errored = true; if( returnPartial ){ mdata.cleanup(); continue; } throw; } catch( DBException& e ){ // NOTE: RECV() WILL NOT THROW A SOCKET EXCEPTION - WE GET THIS AS ERROR 15988 FROM // ABOVE if (e.getCode() == 15988) { warning() << "exception when receiving data from " << shard << ", current connection state is " << mdata.toBSON() << causedBy( e ) << endl; mdata.errored = true; if (returnPartial) { mdata.cleanup(); continue; } throw; } else { warning() << "db exception when finishing on " << shard << ", current connection state is " << mdata.toBSON() << causedBy( e ) << endl; mdata.errored = true; throw; } } catch( std::exception& e){ warning() << "exception when finishing on " << shard << ", current connection state is " << mdata.toBSON() << causedBy( e ) << endl; mdata.errored = true; throw; } catch( ... 
){ warning() << "unknown exception when finishing on " << shard << ", current connection state is " << mdata.toBSON() << endl; mdata.errored = true; throw; } } // Retry logic for single refresh of namespaces / retry init'ing connections if( retry ){ // Refresh stale namespaces if( staleNSExceptions.size() ){ for( map::iterator i = staleNSExceptions.begin(), end = staleNSExceptions.end(); i != end; ++i ){ NamespaceString staleNS( i->first ); const StaleConfigException& exception = i->second; bool forceReload, fullReload; _markStaleNS( staleNS, exception, forceReload, fullReload ); int logLevel = fullReload ? 0 : 1; LOG( pc + logLevel ) << "stale config of ns " << staleNS << " on finishing query, will retry with forced : " << forceReload << ", full : " << fullReload << causedBy( exception ) << endl; // This is somewhat strange if( staleNS != ns ) warning() << "versioned ns " << ns << " doesn't match stale config namespace " << staleNS << endl; _handleStaleNS( staleNS, forceReload, fullReload ); } } // Re-establish connections we need to startInit(); finishInit(); return; } // Sanity check and clean final connections map< Shard, PCMData >::iterator i = _cursorMap.begin(); while( i != _cursorMap.end() ){ // const Shard& shard = i->first; PCMData& mdata = i->second; // Erase empty stuff if( ! mdata.pcState ){ log() << "PCursor erasing empty state " << mdata.toBSON() << endl; _cursorMap.erase( i++ ); continue; } else ++i; // Make sure all state is in shards verify( mdata.initialized == true ); verify( mdata.finished == true ); verify( mdata.completed == true ); verify( ! mdata.pcState->conn->ok() ); verify( mdata.pcState->cursor ); verify( mdata.pcState->primary || mdata.pcState->manager ); } // TODO : More cleanup of metadata? 
// LEGACY STUFF NOW _cursors = new FilteringClientCursor[ _cursorMap.size() ]; // Put the cursors in the legacy format int index = 0; for( map< Shard, PCMData >::iterator i = _cursorMap.begin(), end = _cursorMap.end(); i != end; ++i ){ PCMData& mdata = i->second; _cursors[ index ].reset( mdata.pcState->cursor.get(), &mdata ); _servers.insert( ServerAndQuery( i->first.getConnString(), BSONObj() ) ); index++; } _numServers = _cursorMap.size(); } bool ParallelSortClusteredCursor::isSharded() { // LEGACY is always unsharded if( _qSpec.isEmpty() ) return false; // We're always sharded if the number of cursors != 1 // TODO: Kept this way for compatibility with getPrimary(), but revisit if( _cursorMap.size() != 1 ) return true; // Return if the single cursor is sharded return NULL != _cursorMap.begin()->second.pcState->manager; } int ParallelSortClusteredCursor::getNumQueryShards() { return _cursorMap.size(); } ShardPtr ParallelSortClusteredCursor::getQueryShard() { return ShardPtr(new Shard(_cursorMap.begin()->first)); } void ParallelSortClusteredCursor::getQueryShards(set& shards) { for (map::iterator i = _cursorMap.begin(), end = _cursorMap.end(); i != end; ++i) { shards.insert(i->first); } } ShardPtr ParallelSortClusteredCursor::getPrimary() { if (isSharded()) return ShardPtr(); return _cursorMap.begin()->second.pcState->primary; } ChunkManagerPtr ParallelSortClusteredCursor::getChunkManager( const Shard& shard ) { if( ! 
// Continuation of getChunkManager(): no manager for unsharded cursors or
// shards we never queried.
isSharded() ) return ChunkManagerPtr();

    // NOTE(review): template args stripped — presumably map<Shard, PCMData>.
    map::iterator i = _cursorMap.find( shard );
    if( i == _cursorMap.end() ) return ChunkManagerPtr();
    else return i->second.pcState->manager;
}

// Raw client cursor for 'shard', or empty when we hold no cursor for it.
DBClientCursorPtr ParallelSortClusteredCursor::getShardCursor( const Shard& shard ) {
    map::iterator i = _cursorMap.find( shard );
    if( i == _cursorMap.end() ) return DBClientCursorPtr();
    else return i->second.pcState->cursor;
}

// Merges two filter documents by simple field concatenation.
static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extra ) {
    BSONObjBuilder b;
    b.appendElements( filter );
    b.appendElements( extra );
    return b.obj();
    // TODO: should do some simplification here if possible ideally
}

// Adds 'extraFilter' to 'query', handling both plain filters and the
// wrapped { query: ..., orderby: ... } form.
static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter ) {
    if ( ! query.hasField( "query" ) )
        return _concatFilter( query , extraFilter );

    BSONObjBuilder b;
    BSONObjIterator i( query );
    while ( i.more() ) {
        BSONElement e = i.next();

        // Copy every field except "query" verbatim (orderby, hint, etc.).
        if ( strcmp( e.fieldName() , "query" ) ) {
            b.append( e );
            continue;
        }

        b.append( "query" , _concatFilter( e.embeddedObjectUserCheck() , extraFilter ) );
    }
    return b.obj();
}

// DEPRECATED
// Legacy initialization path: connects to each server in _servers directly,
// lazily issues the query everywhere, and retries/aggregates errors.
// (Body continues across the following excerpt lines.)
void ParallelSortClusteredCursor::_oldInit() {
    // log() << "Starting parallel search..." << endl;

    // make sure we're not already initialized
    verify( ! _cursors );
    _cursors = new FilteringClientCursor[_numServers];

    bool returnPartial = ( _options & QueryOption_PartialResults );

    // NOTE(review): template args stripped below — presumably
    // vector<ServerAndQuery>, set<int>, vector<shared_ptr<ShardConnection> >,
    // vector<string> x4; verify against full source.
    vector queries( _servers.begin(), _servers.end() );
    set retryQueries;
    int finishedQueries = 0;

    vector< shared_ptr > conns;
    vector servers;

    // Since we may get all sorts of errors, record them all as they come and throw them later if necessary
    vector staleConfigExs;
    vector socketExs;
    vector otherExs;
    bool allConfigStale = false;

    int retries = -1;

    // Loop through all the queries until we've finished or gotten a socket exception on all of them
    // We break early for non-socket exceptions, and socket exceptions if we aren't returning partial results
    do {
        retries++;

        bool firstPass = retryQueries.size() == 0;

        if( !
// Continuation of _oldInit()'s do/while retry loop: on a retry pass, log
// which servers are being retried before re-issuing their queries.
firstPass ){
    log() << "retrying " << ( returnPartial ? "(partial) " : "" ) << "parallel connection to ";
    // NOTE(review): template args stripped — presumably set<int>::iterator.
    for( set::iterator it = retryQueries.begin(); it != retryQueries.end(); ++it ){
        log() << queries[*it]._server << ", ";
    }
    log() << finishedQueries << " finished queries." << endl;
}

size_t num = 0;
// Phase 1: (re)connect and lazily send the query to each server.
for ( vector::iterator it = queries.begin(); it != queries.end(); ++it ) {
    size_t i = num++;

    const ServerAndQuery& sq = *it;

    // If we're not retrying this cursor on later passes, continue
    if( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) continue;

    // log() << "Querying " << _query << " from " << _ns << " for " << sq._server << endl;

    BSONObj q = _query;
    if ( ! sq._extra.isEmpty() ) {
        q = concatQuery( q , sq._extra );
    }

    string errLoc = " @ " + sq._server;

    if( firstPass ){
        // This may be the first time connecting to this shard, if so we can get an error here
        try {
            conns.push_back( shared_ptr( new ShardConnection( sq._server , _ns ) ) );
        }
        catch( std::exception& e ){
            socketExs.push_back( e.what() + errLoc );
            if( ! returnPartial ){
                num--;
                break;
            }
            // Placeholder keeps conns[] indices aligned with queries[].
            conns.push_back( shared_ptr() );
            continue;
        }

        servers.push_back( sq._server );
    }

    if ( conns[i]->setVersion() ) {
        conns[i]->done();

        // Version is zero b/c this is deprecated codepath
        staleConfigExs.push_back(
            str::stream() << "stale config detected for "
                          << RecvStaleConfigException( _ns,
                                                       "ParallelCursor::_init",
                                                       ChunkVersion( 0, 0, OID() ),
                                                       ChunkVersion( 0, 0, OID() ),
                                                       true ).what()
                          << errLoc );
        break;
    }

    LOG(5) << "ParallelSortClusteredCursor::init server:" << sq._server << " ns:" << _ns
           << " query:" << q << " _fields:" << _fields << " options: " << _options << endl;

    if( ! _cursors[i].raw() )
        _cursors[i].reset( new DBClientCursor( conns[i]->get() , _ns , q ,
                                               0 ,                                       // nToReturn
                                               0 ,                                       // nToSkip
                                               _fields.isEmpty() ? 0 : &_fields ,        // fieldsToReturn
                                               _options ,
                                               _batchSize == 0 ? 0 : _batchSize + _needToSkip // batchSize
                                               ) );

    try{
        // On retry passes, initLazy(true) presumably re-sends over the
        // existing connection — TODO confirm against DBClientCursor docs.
        _cursors[i].raw()->initLazy( ! firstPass );
    }
    catch( SocketException& e ){
        socketExs.push_back( e.what() + errLoc );
        _cursors[i].reset( NULL );
        conns[i]->done();
        if( ! returnPartial ) break;
    }
    catch( std::exception& e){
        otherExs.push_back( e.what() + errLoc );
        _cursors[i].reset( NULL );
        conns[i]->done();
        break;
    }
}

// Go through all the potentially started cursors and finish initializing them or log any errors and
// potentially retry
// TODO: Better error classification would make this easier, errors are indicated in all sorts of ways
// here that we need to trap.
for ( size_t i = 0; i < num; i++ ) {

    // log() << "Finishing query for " << cons[i].get()->getHost() << endl;
    string errLoc = " @ " + queries[i]._server;

    // Skip cursors that never started, or that aren't part of this retry pass.
    if( ! _cursors[i].raw() || ( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) ){
        if( conns[i] ) conns[i].get()->done();
        continue;
    }

    verify( conns[i] );
    retryQueries.erase( i );

    bool retry = false;

    try {

        if( ! _cursors[i].raw()->initLazyFinish( retry ) ) {

            warning() << "invalid result from " << conns[i]->getHost()
                      << ( retry ? ", retrying" : "" ) << endl;
            _cursors[i].reset( NULL );

            if( ! retry ){
                socketExs.push_back( str::stream() << "error querying server: " << servers[i] );
                conns[i]->done();
            }
            else {
                retryQueries.insert( i );
            }

            continue;
        }
    }
    catch ( StaleConfigException& e ){
        // Our stored configuration data is actually stale, we need to reload it
        // when we throw our exception
        allConfigStale = true;

        staleConfigExs.push_back( (string)"stale config detected when receiving response for " + e.what() + errLoc );
        _cursors[i].reset( NULL );
        conns[i]->done();
        continue;
    }
    catch ( SocketException& e ) {
        socketExs.push_back( e.what() + errLoc );
        _cursors[i].reset( NULL );
        conns[i]->done();
        continue;
    }
    catch( std::exception& e ){
        otherExs.push_back( e.what() + errLoc );
        _cursors[i].reset( NULL );
        conns[i]->done();
        continue;
    }

    try {
        _cursors[i].raw()->attach( conns[i].get() ); // this calls done on conn
        _checkCursor( _cursors[i].raw() );

        finishedQueries++;
    }
    catch ( StaleConfigException& e ){
        // Our stored configuration data is actually stale, we need to reload it
        // when we throw our exception
        allConfigStale = true;

        staleConfigExs.push_back( (string)"stale config detected for " + e.what() + errLoc );
        _cursors[i].reset( NULL );
        conns[i]->done();
        continue;
    }
    catch( std::exception& e ){
        otherExs.push_back( e.what() + errLoc );
        _cursors[i].reset( NULL );
        conns[i]->done();
        continue;
    }
}

// Don't exceed our max retries, should not happen
verify( retries < 5 );
}
while( retryQueries.size() > 0 /* something to retry */ &&
       ( socketExs.size() == 0 || returnPartial ) /* no conn issues */ &&
       staleConfigExs.size() == 0 /* no config issues */ &&
       otherExs.size() == 0 /* no other issues */);

// Assert that our conns are all closed!
for( vector< shared_ptr >::iterator i = conns.begin(); i < conns.end(); ++i ){
    verify( ! (*i) || ! (*i)->ok() );
}

// Handle errors we got during initialization.
// If we're returning partial results, we can ignore socketExs, but nothing else // Log a warning in any case, so we don't lose these messages bool throwException = ( socketExs.size() > 0 && ! returnPartial ) || staleConfigExs.size() > 0 || otherExs.size() > 0; if( socketExs.size() > 0 || staleConfigExs.size() > 0 || otherExs.size() > 0 ) { vector errMsgs; errMsgs.insert( errMsgs.end(), staleConfigExs.begin(), staleConfigExs.end() ); errMsgs.insert( errMsgs.end(), otherExs.begin(), otherExs.end() ); errMsgs.insert( errMsgs.end(), socketExs.begin(), socketExs.end() ); stringstream errMsg; errMsg << "could not initialize cursor across all shards because : "; for( vector::iterator i = errMsgs.begin(); i != errMsgs.end(); i++ ){ if( i != errMsgs.begin() ) errMsg << " :: and :: "; errMsg << *i; } if( throwException && staleConfigExs.size() > 0 ){ // Version is zero b/c this is deprecated codepath throw RecvStaleConfigException( _ns, errMsg.str(), ChunkVersion( 0, 0, OID() ), ChunkVersion( 0, 0, OID() ), !allConfigStale ); } else if( throwException ) throw DBException( errMsg.str(), 14827 ); else warning() << errMsg.str() << endl; } if( retries > 0 ) log() << "successfully finished parallel query after " << retries << " retries" << endl; } ParallelSortClusteredCursor::~ParallelSortClusteredCursor() { // WARNING: Commands (in particular M/R) connect via _oldInit() directly to shards bool isDirectShardCursor = _cursorMap.empty(); // Non-direct shard cursors are owned by the _cursorMap, so we release // them in the array here. Direct shard cursors clean themselves. 
// Continuation of the destructor: release (don't delete) map-owned cursors,
// then free the legacy array and clear all metadata.
if (!isDirectShardCursor) {
    for( int i = 0; i < _numServers; i++ ) _cursors[i].release();
}

delete [] _cursors;
_cursors = 0;

// Clear out our metadata after removing legacy cursor data
_cursorMap.clear();

// Just to be sure
_done = true;
}

// True when any underlying shard cursor still has documents. Also consumes
// any pending skip (_needToSkip) by pulling and discarding documents first.
bool ParallelSortClusteredCursor::more() {

    if ( _needToSkip > 0 ) {
        int n = _needToSkip;
        // Zero the member before recursing so the nested more()/next() calls
        // don't re-enter this skip loop.
        _needToSkip = 0;

        while ( n > 0 && more() ) {
            BSONObj x = next();
            n--;
        }

        // Remember any skip we couldn't satisfy.
        _needToSkip = n;
    }

    for ( int i=0; i<_numServers; i++ ) {
        if ( _cursors[i].more() )
            return true;
    }
    return false;
}

// Returns the next document in sort order: peeks every live shard cursor and
// takes the smallest per _sortKey (or the first available when unsorted).
// (Body continues on the next excerpt line.)
BSONObj ParallelSortClusteredCursor::next() {
    BSONObj best = BSONObj();
    int bestFrom = -1;

    for( int j = 0; j < _numServers; j++ ){

        // Iterate _numServers times, starting one past the last server we used.
        // This means we actually start at server #1, not #0, but shouldn't matter
        int i = ( j + _lastFrom + 1 ) % _numServers;

        if ( ! _cursors[i].more() ){
            if( _cursors[i].rawMData() )
                _cursors[i].rawMData()->pcState->done = true;
            continue;
        }

        BSONObj me = _cursors[i].peek();

        if ( best.isEmpty() ) {
            best = me;
            bestFrom = i;
            // Unsorted queries take the first document found — no need to
            // compare the remaining cursors.
            if( _sortKey.isEmpty() ) break;
            continue;
        }

        int comp = best.woSortOrder( me , _sortKey , true );
        if ( comp < 0 )
            continue;

        best = me;
        bestFrom = i;
    }

    _lastFrom = bestFrom;

    uassert( 10019 , "no more elements" , !
// Continuation of next(): advance the winning cursor and return its document.
best.isEmpty() );
    _cursors[bestFrom].next();

    // Bookkeeping: count documents actually consumed from this shard.
    if( _cursors[bestFrom].rawMData() )
        _cursors[bestFrom].rawMData()->pcState->count++;

    return best;
}

// Collects the first (explain) document from each shard cursor, keyed by
// shard address.
// NOTE(review): template args stripped — presumably map<string, list<BSONObj> >
// and set<Shard>; verify against full source.
void ParallelSortClusteredCursor::_explain( map< string,list >& out ) {

    set shards;
    getQueryShards( shards );

    for( set::iterator i = shards.begin(), end = shards.end(); i != end; ++i ){
        // TODO: Make this the shard name, not address
        list& l = out[ i->getAddress().toString() ];
        l.push_back( getShardCursor( *i )->peekFirst().getOwned() );
    }
}

// -----------------
// ---- Future -----
// -----------------

// Captures everything needed to run 'cmd' against 'db' on 'server' and kicks
// off the (possibly lazy) send immediately via init().
Future::CommandResult::CommandResult( const string& server,
                                      const string& db,
                                      const BSONObj& cmd,
                                      int options,
                                      DBClientBase * conn,
                                      bool useShardedConn ):
    _server(server), _db(db), _options(options), _cmd(cmd),
    _conn(conn), _useShardConn(useShardedConn), _done(false)
{
    init();
}

// Sends the command: lazily via a DBClientCursor when the connection supports
// it (completed later in join()), otherwise synchronously via runCommand.
// Never throws — failures set _ok=false/_done=true.
void Future::CommandResult::init(){
    try {
        if ( ! _conn ){
            // No caller-supplied connection: open our own, sharded or plain.
            if ( _useShardConn) {
                _connHolder.reset( new ShardConnection( _server, "" ));
            }
            else {
                _connHolder.reset( new ScopedDbConnection( _server ) );
            }
            _conn = _connHolder->get();
        }

        if ( _conn->lazySupported() ) {
            _cursor.reset( new DBClientCursor(_conn, _db + ".$cmd", _cmd,
                                              -1/*limit*/, 0, NULL, _options, 0));
            _cursor->initLazy();
        }
        else {
            _done = true; // we set _done first because even if there is an error we're done
            _ok = _conn->runCommand( _db , _cmd , _res , _options );
        }
    }
    catch ( std::exception& e ) {
        error() << "Future::spawnCommand (part 1) exception: " << e.what() << endl;
        _ok = false;
        _done = true;
    }
}

// Completes a lazily-sent command, retrying up to maxRetries times on stale
// shard config. Returns the command's ok status.
// (Body continues on the next excerpt line.)
bool Future::CommandResult::join( int maxRetries ) {
    if (_done)
        return _ok;

    _ok = false;
    for( int i = 1; i <= maxRetries; i++ ){

        try {
            bool retry = false;
            bool finished = _cursor->initLazyFinish( retry );

            // Shouldn't need to communicate with server any more
            if ( _connHolder )
                _connHolder->done();

            uassert(14812, str::stream() << "Error running command on server: " << _server, finished);
            massert(14813, "Command returned nothing", _cursor->more());

            _res = _cursor->nextSafe();
            _ok =
// Continuation of join(): read the command's ok field; on stale config,
// refresh shard version information and re-send via init().
_res["ok"].trueValue();
            break;
        }
        catch ( RecvStaleConfigException& e ){

            verify( versionManager.isVersionableCB( _conn ) );

            // For legacy reasons, we may not always have a namespace :-(
            string staleNS = e.getns();
            if( staleNS.size() == 0 ) staleNS = _db;

            if( i >= maxRetries ){
                error() << "Future::spawnCommand (part 2) stale config exception" << causedBy( e ) << endl;
                throw e;
            }

            // Halfway through our retry budget, escalate to a forced remote
            // config check.
            if( i >= maxRetries / 2 ){
                if( ! versionManager.forceRemoteCheckShardVersionCB( staleNS ) ){
                    error() << "Future::spawnCommand (part 2) no config detected" << causedBy( e ) << endl;
                    throw e;
                }
            }

            // We may not always have a collection, since we don't know from a generic command what collection
            // is supposed to be acted on, if any
            if( nsGetCollection( staleNS ).size() == 0 ){
                warning() << "no collection namespace in stale config exception "
                          << "for lazy command " << _cmd << ", could not refresh "
                          << staleNS << endl;
            }
            else {
                versionManager.checkShardVersionCB( _conn, staleNS, false, 1 );
            }

            LOG( i > 1 ? 0 : 1 ) << "retrying lazy command" << causedBy( e ) << endl;

            verify( _conn->lazySupported() );
            // Re-arm and re-send the command before the next loop iteration.
            _done = false;
            init();
            continue;
        }
        catch ( std::exception& e ) {
            error() << "Future::spawnCommand (part 2) exception: " << causedBy( e ) << endl;
            break;
        }

    }

    _done = true;
    return _ok;
}

// Factory: builds a CommandResult (which sends the command immediately).
// NOTE(review): template args stripped — presumably
// shared_ptr<Future::CommandResult>; verify against full source.
shared_ptr Future::spawnCommand( const string& server,
                                 const string& db,
                                 const BSONObj& cmd,
                                 int options,
                                 DBClientBase * conn,
                                 bool useShardConn ) {
    shared_ptr res ( new Future::CommandResult( server, db, cmd, options, conn, useShardConn));
    return res;
}

}