Diffstat (limited to 'src/mongo/s/chunk_manager_targeter.cpp')
-rw-r--r-- | src/mongo/s/chunk_manager_targeter.cpp | 1177
1 file changed, 560 insertions, 617 deletions
diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp index 7ae106245e6..fce380ef8e9 100644 --- a/src/mongo/s/chunk_manager_targeter.cpp +++ b/src/mongo/s/chunk_manager_targeter.cpp @@ -37,759 +37,702 @@ namespace mongo { - using std::endl; - using std::map; - using std::set; - using std::string; - using std::vector; - - using mongoutils::str::stream; - - /** - * Helper to get the DBConfigPtr object in an exception-safe way. - */ - static bool getDBConfigSafe( const StringData& db, DBConfigPtr& config, string* errMsg ) { - try { - config = grid.getDBConfig( db, true ); - if ( !config ) *errMsg = stream() << "could not load or create database " << db; - } - catch ( const DBException& ex ) { - *errMsg = ex.toString(); - } - - return config.get(); - } - - ChunkManagerTargeter::ChunkManagerTargeter() : - _needsTargetingRefresh( false ), _stats( new TargeterStats ) { - } - - Status ChunkManagerTargeter::init( const NamespaceString& nss ) { - - _nss = nss; - - // - // Get the latest metadata information from the cache - // +using std::endl; +using std::map; +using std::set; +using std::string; +using std::vector; - DBConfigPtr config; +using mongoutils::str::stream; - string errMsg; - if ( !getDBConfigSafe( _nss.db(), config, &errMsg ) ) { - return Status( ErrorCodes::DatabaseNotFound, errMsg ); - } - - // Get either the chunk manager or primary shard - config->getChunkManagerOrPrimary( _nss.ns(), _manager, _primary ); - - return Status::OK(); - } - - const NamespaceString& ChunkManagerTargeter::getNS() const { - return _nss; +/** + * Helper to get the DBConfigPtr object in an exception-safe way. + */ +static bool getDBConfigSafe(const StringData& db, DBConfigPtr& config, string* errMsg) { + try { + config = grid.getDBConfig(db, true); + if (!config) + *errMsg = stream() << "could not load or create database " << db; + } catch (const DBException& ex) { + *errMsg = ex.toString(); } - Status ChunkManagerTargeter::targetInsert( const BSONObj& doc, - ShardEndpoint** endpoint ) const { - - BSONObj shardKey; - - if ( _manager ) { - - // - // Sharded collections have the following requirements for targeting: - // - // Inserts must contain the exact shard key. 
- // + return config.get(); +} - shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(doc); +ChunkManagerTargeter::ChunkManagerTargeter() + : _needsTargetingRefresh(false), _stats(new TargeterStats) {} - // Check shard key exists - if (shardKey.isEmpty()) { - return Status(ErrorCodes::ShardKeyNotFound, - stream() << "document " << doc - << " does not contain shard key for pattern " - << _manager->getShardKeyPattern().toString()); - } - - // Check shard key size on insert - Status status = ShardKeyPattern::checkShardKeySize(shardKey); - if (!status.isOK()) - return status; - } +Status ChunkManagerTargeter::init(const NamespaceString& nss) { + _nss = nss; - // Target the shard key or database primary - if (!shardKey.isEmpty()) { - return targetShardKey(shardKey, doc.objsize(), endpoint); - } - else { + // + // Get the latest metadata information from the cache + // - if (!_primary) { - return Status(ErrorCodes::NamespaceNotFound, - str::stream() << "could not target insert in collection " - << getNS().ns() << "; no metadata found"); - } + DBConfigPtr config; - *endpoint = new ShardEndpoint(_primary->getName(), ChunkVersion::UNSHARDED()); - return Status::OK(); - } + string errMsg; + if (!getDBConfigSafe(_nss.db(), config, &errMsg)) { + return Status(ErrorCodes::DatabaseNotFound, errMsg); } - namespace { - - // TODO: Expose these for unit testing via dbtests - - enum UpdateType { - UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown - }; - - /** - * There are two styles of update expressions: - * coll.update({ x : 1 }, { y : 2 }) // Replacement style - * coll.update({ x : 1 }, { $set : { y : 2 } }) // OpStyle - */ - UpdateType getUpdateExprType( const BSONObj& updateExpr ) { - - UpdateType updateType = UpdateType_Unknown; - - // Empty update is replacement-style, by default - if ( updateExpr.isEmpty() ) return UpdateType_Replacement; - - BSONObjIterator it( updateExpr ); - while ( it.more() ) { - BSONElement next = it.next(); - - if ( next.fieldName()[0] == '$' ) { - if ( updateType == UpdateType_Unknown ) { - updateType = UpdateType_OpStyle; - } - else if ( updateType == UpdateType_Replacement ) { - return UpdateType_Unknown; - } - } - else { - if ( updateType == UpdateType_Unknown ) { - updateType = UpdateType_Replacement; - } - else if ( updateType == UpdateType_OpStyle ) { - return UpdateType_Unknown; - } - } - } + // Get either the chunk manager or primary shard + config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary); - return updateType; - } + return Status::OK(); +} - /** - * This returns "does the query have an _id field" and "is the _id field - * querying for a direct value like _id : 3 and not _id : { $gt : 3 }" - * - * Ex: { _id : 1 } => true - * { foo : <anything>, _id : 1 } => true - * { _id : { $lt : 30 } } => false - * { foo : <anything> } => false - */ - bool isExactIdQuery(const BSONObj& query) { - static const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1)); - StatusWith<BSONObj> status = virtualIdShardKey.extractShardKeyFromQuery(query); - if (!status.isOK()) - return false; - return !status.getValue()["_id"].eoo(); - } - } +const NamespaceString& ChunkManagerTargeter::getNS() const { + return _nss; +} - Status ChunkManagerTargeter::targetUpdate( const BatchedUpdateDocument& updateDoc, - vector<ShardEndpoint*>* endpoints ) const { +Status ChunkManagerTargeter::targetInsert(const BSONObj& doc, ShardEndpoint** endpoint) const { + BSONObj shardKey; + if (_manager) { // - // Update targeting may use either the query or the update. 
This is to support save-style - // updates, of the form: + // Sharded collections have the following requirements for targeting: // - // coll.update({ _id : xxx }, { _id : xxx, shardKey : 1, foo : bar }, { upsert : true }) - // - // Because drivers do not know the shard key, they can't pull the shard key automatically - // into the query doc, and to correctly support upsert we must target a single shard. - // - // The rule is simple - If the update is replacement style (no '$set'), we target using the - // update. If the update is replacement style, we target using the query. - // - // If we have the exact shard key in either the query or replacement doc, we target using - // that extracted key. + // Inserts must contain the exact shard key. // - BSONObj query = updateDoc.getQuery(); - BSONObj updateExpr = updateDoc.getUpdateExpr(); + shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(doc); - UpdateType updateType = getUpdateExprType( updateDoc.getUpdateExpr() ); - - if ( updateType == UpdateType_Unknown ) { - return Status( ErrorCodes::UnsupportedFormat, - stream() << "update document " << updateExpr - << " has mixed $operator and non-$operator style fields" ); + // Check shard key exists + if (shardKey.isEmpty()) { + return Status(ErrorCodes::ShardKeyNotFound, + stream() << "document " << doc + << " does not contain shard key for pattern " + << _manager->getShardKeyPattern().toString()); } - BSONObj shardKey; - - if ( _manager ) { - - // - // Sharded collections have the following futher requirements for targeting: - // - // Upserts must be targeted exactly by shard key. - // Non-multi updates must be targeted exactly by shard key *or* exact _id. - // - - // Get the shard key - if (updateType == UpdateType_OpStyle) { - - // Target using the query - StatusWith<BSONObj> status = - _manager->getShardKeyPattern().extractShardKeyFromQuery(query); + // Check shard key size on insert + Status status = ShardKeyPattern::checkShardKeySize(shardKey); + if (!status.isOK()) + return status; + } - // Bad query - if (!status.isOK()) - return status.getStatus(); + // Target the shard key or database primary + if (!shardKey.isEmpty()) { + return targetShardKey(shardKey, doc.objsize(), endpoint); + } else { + if (!_primary) { + return Status(ErrorCodes::NamespaceNotFound, + str::stream() << "could not target insert in collection " << getNS().ns() + << "; no metadata found"); + } - shardKey = status.getValue(); - } - else { - // Target using the replacement document - shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(updateExpr); - } + *endpoint = new ShardEndpoint(_primary->getName(), ChunkVersion::UNSHARDED()); + return Status::OK(); + } +} - // - // Extra sharded update validation - // +namespace { - if (updateDoc.getUpsert()) { +// TODO: Expose these for unit testing via dbtests - // Sharded upserts *always* need to be exactly targeted by shard key - if (shardKey.isEmpty()) { - return Status(ErrorCodes::ShardKeyNotFound, - stream() << "upsert " << updateDoc.toBSON() - << " does not contain shard key for pattern " - << _manager->getShardKeyPattern().toString()); - } +enum UpdateType { UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown }; - // Also check shard key size on upsert - Status status = ShardKeyPattern::checkShardKeySize(shardKey); - if (!status.isOK()) - return status; +/** + * There are two styles of update expressions: + * coll.update({ x : 1 }, { y : 2 }) // Replacement style + * coll.update({ x : 1 }, { $set : { y : 2 } }) // OpStyle + */ 
+UpdateType getUpdateExprType(const BSONObj& updateExpr) { + UpdateType updateType = UpdateType_Unknown; + + // Empty update is replacement-style, by default + if (updateExpr.isEmpty()) + return UpdateType_Replacement; + + BSONObjIterator it(updateExpr); + while (it.more()) { + BSONElement next = it.next(); + + if (next.fieldName()[0] == '$') { + if (updateType == UpdateType_Unknown) { + updateType = UpdateType_OpStyle; + } else if (updateType == UpdateType_Replacement) { + return UpdateType_Unknown; } - - // Validate that single (non-multi) sharded updates are targeted by shard key or _id - if (!updateDoc.getMulti() && shardKey.isEmpty() - && !isExactIdQuery(updateDoc.getQuery())) { - return Status(ErrorCodes::ShardKeyNotFound, - stream() << "update " << updateDoc.toBSON() - << " does not contain _id or shard key for pattern " - << _manager->getShardKeyPattern().toString()); + } else { + if (updateType == UpdateType_Unknown) { + updateType = UpdateType_Replacement; + } else if (updateType == UpdateType_OpStyle) { + return UpdateType_Unknown; } } - - // Target the shard key, query, or replacement doc - if (!shardKey.isEmpty()) { - // We can't rely on our query targeting to be exact - ShardEndpoint* endpoint = NULL; - Status result = targetShardKey(shardKey, - (query.objsize() + updateExpr.objsize()), - &endpoint); - endpoints->push_back(endpoint); - return result; - } - else if (updateType == UpdateType_OpStyle) { - return targetQuery(query, endpoints); - } - else { - return targetDoc(updateExpr, endpoints); - } } - Status ChunkManagerTargeter::targetDelete( const BatchedDeleteDocument& deleteDoc, - vector<ShardEndpoint*>* endpoints ) const { + return updateType; +} - BSONObj shardKey; +/** + * This returns "does the query have an _id field" and "is the _id field + * querying for a direct value like _id : 3 and not _id : { $gt : 3 }" + * + * Ex: { _id : 1 } => true + * { foo : <anything>, _id : 1 } => true + * { _id : { $lt : 30 } } => false + * { foo : <anything> } => false + */ +bool isExactIdQuery(const BSONObj& query) { + static const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1)); + StatusWith<BSONObj> status = virtualIdShardKey.extractShardKeyFromQuery(query); + if (!status.isOK()) + return false; + return !status.getValue()["_id"].eoo(); +} +} + +Status ChunkManagerTargeter::targetUpdate(const BatchedUpdateDocument& updateDoc, + vector<ShardEndpoint*>* endpoints) const { + // + // Update targeting may use either the query or the update. This is to support save-style + // updates, of the form: + // + // coll.update({ _id : xxx }, { _id : xxx, shardKey : 1, foo : bar }, { upsert : true }) + // + // Because drivers do not know the shard key, they can't pull the shard key automatically + // into the query doc, and to correctly support upsert we must target a single shard. + // + // The rule is simple - If the update is replacement style (no '$set'), we target using the + // update. If the update is replacement style, we target using the query. + // + // If we have the exact shard key in either the query or replacement doc, we target using + // that extracted key. 
+ // + + BSONObj query = updateDoc.getQuery(); + BSONObj updateExpr = updateDoc.getUpdateExpr(); + + UpdateType updateType = getUpdateExprType(updateDoc.getUpdateExpr()); + + if (updateType == UpdateType_Unknown) { + return Status(ErrorCodes::UnsupportedFormat, + stream() << "update document " << updateExpr + << " has mixed $operator and non-$operator style fields"); + } - if ( _manager ) { + BSONObj shardKey; - // - // Sharded collections have the following further requirements for targeting: - // - // Limit-1 deletes must be targeted exactly by shard key *or* exact _id - // + if (_manager) { + // + // Sharded collections have the following futher requirements for targeting: + // + // Upserts must be targeted exactly by shard key. + // Non-multi updates must be targeted exactly by shard key *or* exact _id. + // - // Get the shard key + // Get the shard key + if (updateType == UpdateType_OpStyle) { + // Target using the query StatusWith<BSONObj> status = - _manager->getShardKeyPattern().extractShardKeyFromQuery(deleteDoc.getQuery()); + _manager->getShardKeyPattern().extractShardKeyFromQuery(query); // Bad query if (!status.isOK()) return status.getStatus(); shardKey = status.getValue(); + } else { + // Target using the replacement document + shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(updateExpr); + } - // Validate that single (limit-1) sharded deletes are targeted by shard key or _id - if (deleteDoc.getLimit() == 1 && shardKey.isEmpty() - && !isExactIdQuery(deleteDoc.getQuery())) { + // + // Extra sharded update validation + // + + if (updateDoc.getUpsert()) { + // Sharded upserts *always* need to be exactly targeted by shard key + if (shardKey.isEmpty()) { return Status(ErrorCodes::ShardKeyNotFound, - stream() << "delete " << deleteDoc.toBSON() - << " does not contain _id or shard key for pattern " + stream() << "upsert " << updateDoc.toBSON() + << " does not contain shard key for pattern " << _manager->getShardKeyPattern().toString()); } - } - // Target the shard key or delete query - if (!shardKey.isEmpty()) { - // We can't rely on our query targeting to be exact - ShardEndpoint* endpoint = NULL; - Status result = targetShardKey(shardKey, 0, &endpoint); - endpoints->push_back(endpoint); - return result; + // Also check shard key size on upsert + Status status = ShardKeyPattern::checkShardKeySize(shardKey); + if (!status.isOK()) + return status; } - else { - return targetQuery(deleteDoc.getQuery(), endpoints); + + // Validate that single (non-multi) sharded updates are targeted by shard key or _id + if (!updateDoc.getMulti() && shardKey.isEmpty() && !isExactIdQuery(updateDoc.getQuery())) { + return Status(ErrorCodes::ShardKeyNotFound, + stream() << "update " << updateDoc.toBSON() + << " does not contain _id or shard key for pattern " + << _manager->getShardKeyPattern().toString()); } } - Status ChunkManagerTargeter::targetDoc(const BSONObj& doc, - vector<ShardEndpoint*>* endpoints) const { - // NOTE: This is weird and fragile, but it's the way our language works right now - - // documents are either A) invalid or B) valid equality queries over themselves. 
- return targetQuery(doc, endpoints); + // Target the shard key, query, or replacement doc + if (!shardKey.isEmpty()) { + // We can't rely on our query targeting to be exact + ShardEndpoint* endpoint = NULL; + Status result = + targetShardKey(shardKey, (query.objsize() + updateExpr.objsize()), &endpoint); + endpoints->push_back(endpoint); + return result; + } else if (updateType == UpdateType_OpStyle) { + return targetQuery(query, endpoints); + } else { + return targetDoc(updateExpr, endpoints); } +} - Status ChunkManagerTargeter::targetQuery( const BSONObj& query, - vector<ShardEndpoint*>* endpoints ) const { +Status ChunkManagerTargeter::targetDelete(const BatchedDeleteDocument& deleteDoc, + vector<ShardEndpoint*>* endpoints) const { + BSONObj shardKey; - if ( !_primary && !_manager ) { - return Status( ErrorCodes::NamespaceNotFound, - str::stream() << "could not target query in " - << getNS().ns() - << "; no metadata found" ); - } + if (_manager) { + // + // Sharded collections have the following further requirements for targeting: + // + // Limit-1 deletes must be targeted exactly by shard key *or* exact _id + // - set<Shard> shards; - if ( _manager ) { - try { - _manager->getShardsForQuery( shards, query ); - } catch ( const DBException& ex ) { - return ex.toStatus(); - } - } - else { - shards.insert( *_primary ); - } + // Get the shard key + StatusWith<BSONObj> status = + _manager->getShardKeyPattern().extractShardKeyFromQuery(deleteDoc.getQuery()); - for ( set<Shard>::iterator it = shards.begin(); it != shards.end(); ++it ) { - endpoints->push_back(new ShardEndpoint(it->getName(), - _manager ? - _manager->getVersion(it->getName()) : - ChunkVersion::UNSHARDED())); + // Bad query + if (!status.isOK()) + return status.getStatus(); + + shardKey = status.getValue(); + + // Validate that single (limit-1) sharded deletes are targeted by shard key or _id + if (deleteDoc.getLimit() == 1 && shardKey.isEmpty() && + !isExactIdQuery(deleteDoc.getQuery())) { + return Status(ErrorCodes::ShardKeyNotFound, + stream() << "delete " << deleteDoc.toBSON() + << " does not contain _id or shard key for pattern " + << _manager->getShardKeyPattern().toString()); } + } - return Status::OK(); + // Target the shard key or delete query + if (!shardKey.isEmpty()) { + // We can't rely on our query targeting to be exact + ShardEndpoint* endpoint = NULL; + Status result = targetShardKey(shardKey, 0, &endpoint); + endpoints->push_back(endpoint); + return result; + } else { + return targetQuery(deleteDoc.getQuery(), endpoints); + } +} + +Status ChunkManagerTargeter::targetDoc(const BSONObj& doc, + vector<ShardEndpoint*>* endpoints) const { + // NOTE: This is weird and fragile, but it's the way our language works right now - + // documents are either A) invalid or B) valid equality queries over themselves. 
+ return targetQuery(doc, endpoints); +} + +Status ChunkManagerTargeter::targetQuery(const BSONObj& query, + vector<ShardEndpoint*>* endpoints) const { + if (!_primary && !_manager) { + return Status(ErrorCodes::NamespaceNotFound, + str::stream() << "could not target query in " << getNS().ns() + << "; no metadata found"); } - Status ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey, - long long estDataSize, - ShardEndpoint** endpoint) const { - invariant(NULL != _manager); + set<Shard> shards; + if (_manager) { + try { + _manager->getShardsForQuery(shards, query); + } catch (const DBException& ex) { + return ex.toStatus(); + } + } else { + shards.insert(*_primary); + } - ChunkPtr chunk = _manager->findIntersectingChunk(shardKey); + for (set<Shard>::iterator it = shards.begin(); it != shards.end(); ++it) { + endpoints->push_back(new ShardEndpoint(it->getName(), + _manager ? _manager->getVersion(it->getName()) + : ChunkVersion::UNSHARDED())); + } - // Track autosplit stats for sharded collections - // Note: this is only best effort accounting and is not accurate. - if (estDataSize > 0) - _stats->chunkSizeDelta[chunk->getMin()] += estDataSize; + return Status::OK(); +} - Shard shard = chunk->getShard(); - *endpoint = new ShardEndpoint(shard.getName(), - _manager->getVersion(shard.getName())); +Status ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey, + long long estDataSize, + ShardEndpoint** endpoint) const { + invariant(NULL != _manager); - return Status::OK(); - } + ChunkPtr chunk = _manager->findIntersectingChunk(shardKey); - Status ChunkManagerTargeter::targetCollection( vector<ShardEndpoint*>* endpoints ) const { + // Track autosplit stats for sharded collections + // Note: this is only best effort accounting and is not accurate. + if (estDataSize > 0) + _stats->chunkSizeDelta[chunk->getMin()] += estDataSize; - if ( !_primary && !_manager ) { - return Status( ErrorCodes::NamespaceNotFound, - str::stream() << "could not target full range of " - << getNS().ns() - << "; metadata not found" ); - } + Shard shard = chunk->getShard(); + *endpoint = new ShardEndpoint(shard.getName(), _manager->getVersion(shard.getName())); - set<Shard> shards; - if ( _manager ) { - _manager->getAllShards( shards ); - } - else { - shards.insert( *_primary ); - } + return Status::OK(); +} - for ( set<Shard>::iterator it = shards.begin(); it != shards.end(); ++it ) { - endpoints->push_back(new ShardEndpoint(it->getName(), - _manager ? - _manager->getVersion(it->getName()) : - ChunkVersion::UNSHARDED())); - } +Status ChunkManagerTargeter::targetCollection(vector<ShardEndpoint*>* endpoints) const { + if (!_primary && !_manager) { + return Status(ErrorCodes::NamespaceNotFound, + str::stream() << "could not target full range of " << getNS().ns() + << "; metadata not found"); + } - return Status::OK(); + set<Shard> shards; + if (_manager) { + _manager->getAllShards(shards); + } else { + shards.insert(*_primary); } - Status ChunkManagerTargeter::targetAllShards( vector<ShardEndpoint*>* endpoints ) const { + for (set<Shard>::iterator it = shards.begin(); it != shards.end(); ++it) { + endpoints->push_back(new ShardEndpoint(it->getName(), + _manager ? 
_manager->getVersion(it->getName()) + : ChunkVersion::UNSHARDED())); + } - if ( !_primary && !_manager ) { - return Status( ErrorCodes::NamespaceNotFound, - str::stream() << "could not target every shard with versions for " - << getNS().ns() - << "; metadata not found" ); - } + return Status::OK(); +} - vector<Shard> shards; - Shard::getAllShards( shards ); +Status ChunkManagerTargeter::targetAllShards(vector<ShardEndpoint*>* endpoints) const { + if (!_primary && !_manager) { + return Status(ErrorCodes::NamespaceNotFound, + str::stream() << "could not target every shard with versions for " + << getNS().ns() << "; metadata not found"); + } - for ( vector<Shard>::iterator it = shards.begin(); it != shards.end(); ++it ) { - endpoints->push_back(new ShardEndpoint(it->getName(), - _manager ? - _manager->getVersion(it->getName()) : - ChunkVersion::UNSHARDED())); - } + vector<Shard> shards; + Shard::getAllShards(shards); - return Status::OK(); + for (vector<Shard>::iterator it = shards.begin(); it != shards.end(); ++it) { + endpoints->push_back(new ShardEndpoint(it->getName(), + _manager ? _manager->getVersion(it->getName()) + : ChunkVersion::UNSHARDED())); } - namespace { - - // - // Utilities to compare shard versions - // + return Status::OK(); +} - enum CompareResult { - CompareResult_Unknown, CompareResult_GTE, CompareResult_LT - }; - - /** - * Returns the relationship of two shard versions. Shard versions of a collection that has - * not been dropped and recreated and where there is at least one chunk on a shard are - * comparable, otherwise the result is ambiguous. - */ - CompareResult compareShardVersions( const ChunkVersion& shardVersionA, - const ChunkVersion& shardVersionB ) { - - // Collection may have been dropped - if ( !shardVersionA.hasEqualEpoch( shardVersionB ) ) return CompareResult_Unknown; - - // Zero shard versions are only comparable to themselves - if ( !shardVersionA.isSet() || !shardVersionB.isSet() ) { - // If both are zero... - if ( !shardVersionA.isSet() && !shardVersionB.isSet() ) return CompareResult_GTE; - // Otherwise... - return CompareResult_Unknown; - } +namespace { - if ( shardVersionA < shardVersionB ) return CompareResult_LT; - else return CompareResult_GTE; - } +// +// Utilities to compare shard versions +// - ChunkVersion getShardVersion( const StringData& shardName, - const ChunkManagerPtr& manager, - const ShardPtr& primary ) { +enum CompareResult { CompareResult_Unknown, CompareResult_GTE, CompareResult_LT }; - dassert( !( manager && primary ) ); - dassert( manager || primary ); +/** + * Returns the relationship of two shard versions. Shard versions of a collection that has + * not been dropped and recreated and where there is at least one chunk on a shard are + * comparable, otherwise the result is ambiguous. + */ +CompareResult compareShardVersions(const ChunkVersion& shardVersionA, + const ChunkVersion& shardVersionB) { + // Collection may have been dropped + if (!shardVersionA.hasEqualEpoch(shardVersionB)) + return CompareResult_Unknown; + + // Zero shard versions are only comparable to themselves + if (!shardVersionA.isSet() || !shardVersionB.isSet()) { + // If both are zero... + if (!shardVersionA.isSet() && !shardVersionB.isSet()) + return CompareResult_GTE; + // Otherwise... 
+ return CompareResult_Unknown; + } - if ( primary ) return ChunkVersion::UNSHARDED(); + if (shardVersionA < shardVersionB) + return CompareResult_LT; + else + return CompareResult_GTE; +} - return manager->getVersion(shardName.toString()); - } +ChunkVersion getShardVersion(const StringData& shardName, + const ChunkManagerPtr& manager, + const ShardPtr& primary) { + dassert(!(manager && primary)); + dassert(manager || primary); - /** - * Returns the relationship between two maps of shard versions. As above, these maps are - * often comparable when the collection has not been dropped and there is at least one - * chunk on the shards. - * - * If any versions in the maps are not comparable, the result is _Unknown. - * - * If any versions in the first map (cached) are _LT the versions in the second map - * (remote), the first (cached) versions are _LT the second (remote) versions. - * - * Note that the signature here is weird since our cached map of chunk versions is - * stored in a ChunkManager or is implicit in the primary shard of the collection. - */ - CompareResult // - compareAllShardVersions( const ChunkManagerPtr& cachedShardVersions, - const ShardPtr& cachedPrimary, - const map<string, ChunkVersion>& remoteShardVersions ) { - - CompareResult finalResult = CompareResult_GTE; - - for ( map<string, ChunkVersion>::const_iterator it = remoteShardVersions.begin(); - it != remoteShardVersions.end(); ++it ) { - - // - // Get the remote and cached version for the next shard - // - - const string& shardName = it->first; - const ChunkVersion& remoteShardVersion = it->second; - ChunkVersion cachedShardVersion; - - try { - // Throws b/c shard constructor throws - cachedShardVersion = getShardVersion( shardName, - cachedShardVersions, - cachedPrimary ); - } - catch ( const DBException& ex ) { - - warning() << "could not lookup shard " << shardName - << " in local cache, shard metadata may have changed" - << " or be unavailable" << causedBy( ex ) << endl; - - return CompareResult_Unknown; - } - - // - // Compare the remote and cached versions - // - - CompareResult result = compareShardVersions( cachedShardVersion, - remoteShardVersion ); - - if ( result == CompareResult_Unknown ) return result; - if ( result == CompareResult_LT ) finalResult = CompareResult_LT; - - // Note that we keep going after _LT b/c there could be more _Unknowns. - } + if (primary) + return ChunkVersion::UNSHARDED(); - return finalResult; - } + return manager->getVersion(shardName.toString()); +} - /** - * Whether or not the manager/primary pair is different from the other manager/primary pair - */ - bool isMetadataDifferent( const ChunkManagerPtr& managerA, - const ShardPtr& primaryA, - const ChunkManagerPtr& managerB, - const ShardPtr& primaryB ) { +/** + * Returns the relationship between two maps of shard versions. As above, these maps are + * often comparable when the collection has not been dropped and there is at least one + * chunk on the shards. + * + * If any versions in the maps are not comparable, the result is _Unknown. + * + * If any versions in the first map (cached) are _LT the versions in the second map + * (remote), the first (cached) versions are _LT the second (remote) versions. + * + * Note that the signature here is weird since our cached map of chunk versions is + * stored in a ChunkManager or is implicit in the primary shard of the collection. 
+ */ +CompareResult // + compareAllShardVersions(const ChunkManagerPtr& cachedShardVersions, + const ShardPtr& cachedPrimary, + const map<string, ChunkVersion>& remoteShardVersions) { + CompareResult finalResult = CompareResult_GTE; + + for (map<string, ChunkVersion>::const_iterator it = remoteShardVersions.begin(); + it != remoteShardVersions.end(); + ++it) { + // + // Get the remote and cached version for the next shard + // - if ( ( managerA && !managerB ) || ( !managerA && managerB ) || ( primaryA && !primaryB ) - || ( !primaryA && primaryB ) ) return true; + const string& shardName = it->first; + const ChunkVersion& remoteShardVersion = it->second; + ChunkVersion cachedShardVersion; - if ( managerA ) { - return !managerA->getVersion().isStrictlyEqualTo( managerB->getVersion() ); - } + try { + // Throws b/c shard constructor throws + cachedShardVersion = getShardVersion(shardName, cachedShardVersions, cachedPrimary); + } catch (const DBException& ex) { + warning() << "could not lookup shard " << shardName + << " in local cache, shard metadata may have changed" + << " or be unavailable" << causedBy(ex) << endl; - dassert( NULL != primaryA.get() ); - return primaryA->getName() != primaryB->getName(); + return CompareResult_Unknown; } - /** - * Whether or not the manager/primary pair was changed or refreshed from a previous version - * of the metadata. - */ - bool wasMetadataRefreshed( const ChunkManagerPtr& managerA, - const ShardPtr& primaryA, - const ChunkManagerPtr& managerB, - const ShardPtr& primaryB ) { - - if ( isMetadataDifferent( managerA, primaryA, managerB, primaryB ) ) - return true; - - if ( managerA ) { - dassert( managerB.get() ); // otherwise metadata would be different - return managerA->getSequenceNumber() != managerB->getSequenceNumber(); - } + // + // Compare the remote and cached versions + // - return false; - } + CompareResult result = compareShardVersions(cachedShardVersion, remoteShardVersion); + + if (result == CompareResult_Unknown) + return result; + if (result == CompareResult_LT) + finalResult = CompareResult_LT; + + // Note that we keep going after _LT b/c there could be more _Unknowns. } - void ChunkManagerTargeter::noteStaleResponse( const ShardEndpoint& endpoint, - const BSONObj& staleInfo ) { - dassert( !_needsTargetingRefresh ); + return finalResult; +} - ChunkVersion remoteShardVersion; - if ( staleInfo["vWanted"].eoo() ) { - // If we don't have a vWanted sent, assume the version is higher than our current - // version. 
- remoteShardVersion = getShardVersion( endpoint.shardName, _manager, _primary ); - remoteShardVersion.incMajor(); - } - else { - remoteShardVersion = ChunkVersion::fromBSON( staleInfo, "vWanted" ); - } +/** + * Whether or not the manager/primary pair is different from the other manager/primary pair + */ +bool isMetadataDifferent(const ChunkManagerPtr& managerA, + const ShardPtr& primaryA, + const ChunkManagerPtr& managerB, + const ShardPtr& primaryB) { + if ((managerA && !managerB) || (!managerA && managerB) || (primaryA && !primaryB) || + (!primaryA && primaryB)) + return true; + + if (managerA) { + return !managerA->getVersion().isStrictlyEqualTo(managerB->getVersion()); + } - ShardVersionMap::iterator it = _remoteShardVersions.find( endpoint.shardName ); - if ( it == _remoteShardVersions.end() ) { - _remoteShardVersions.insert( make_pair( endpoint.shardName, remoteShardVersion ) ); - } - else { - ChunkVersion& previouslyNotedVersion = it->second; - if ( previouslyNotedVersion.hasEqualEpoch( remoteShardVersion )) { - if ( previouslyNotedVersion.isOlderThan( remoteShardVersion )) { - remoteShardVersion.cloneTo( &previouslyNotedVersion ); - } - } - else { - // Epoch changed midway while applying the batch so set the version to - // something unique and non-existent to force a reload when - // refreshIsNeeded is called. - ChunkVersion::IGNORED().cloneTo( &previouslyNotedVersion ); - } - } + dassert(NULL != primaryA.get()); + return primaryA->getName() != primaryB->getName(); +} + +/** + * Whether or not the manager/primary pair was changed or refreshed from a previous version + * of the metadata. + */ +bool wasMetadataRefreshed(const ChunkManagerPtr& managerA, + const ShardPtr& primaryA, + const ChunkManagerPtr& managerB, + const ShardPtr& primaryB) { + if (isMetadataDifferent(managerA, primaryA, managerB, primaryB)) + return true; + + if (managerA) { + dassert(managerB.get()); // otherwise metadata would be different + return managerA->getSequenceNumber() != managerB->getSequenceNumber(); } - void ChunkManagerTargeter::noteCouldNotTarget() { - dassert( _remoteShardVersions.empty() ); - _needsTargetingRefresh = true; + return false; +} +} + +void ChunkManagerTargeter::noteStaleResponse(const ShardEndpoint& endpoint, + const BSONObj& staleInfo) { + dassert(!_needsTargetingRefresh); + + ChunkVersion remoteShardVersion; + if (staleInfo["vWanted"].eoo()) { + // If we don't have a vWanted sent, assume the version is higher than our current + // version. + remoteShardVersion = getShardVersion(endpoint.shardName, _manager, _primary); + remoteShardVersion.incMajor(); + } else { + remoteShardVersion = ChunkVersion::fromBSON(staleInfo, "vWanted"); } - const TargeterStats* ChunkManagerTargeter::getStats() const { - return _stats.get(); + ShardVersionMap::iterator it = _remoteShardVersions.find(endpoint.shardName); + if (it == _remoteShardVersions.end()) { + _remoteShardVersions.insert(make_pair(endpoint.shardName, remoteShardVersion)); + } else { + ChunkVersion& previouslyNotedVersion = it->second; + if (previouslyNotedVersion.hasEqualEpoch(remoteShardVersion)) { + if (previouslyNotedVersion.isOlderThan(remoteShardVersion)) { + remoteShardVersion.cloneTo(&previouslyNotedVersion); + } + } else { + // Epoch changed midway while applying the batch so set the version to + // something unique and non-existent to force a reload when + // refreshIsNeeded is called. 
+ ChunkVersion::IGNORED().cloneTo(&previouslyNotedVersion); + } } +} - Status ChunkManagerTargeter::refreshIfNeeded( bool *wasChanged ) { +void ChunkManagerTargeter::noteCouldNotTarget() { + dassert(_remoteShardVersions.empty()); + _needsTargetingRefresh = true; +} - bool dummy; - if ( !wasChanged ) - wasChanged = &dummy; +const TargeterStats* ChunkManagerTargeter::getStats() const { + return _stats.get(); +} - *wasChanged = false; +Status ChunkManagerTargeter::refreshIfNeeded(bool* wasChanged) { + bool dummy; + if (!wasChanged) + wasChanged = &dummy; - // - // Did we have any stale config or targeting errors at all? - // + *wasChanged = false; - if ( !_needsTargetingRefresh && _remoteShardVersions.empty() ) return Status::OK(); + // + // Did we have any stale config or targeting errors at all? + // - // - // Get the latest metadata information from the cache if there were issues - // + if (!_needsTargetingRefresh && _remoteShardVersions.empty()) + return Status::OK(); - ChunkManagerPtr lastManager = _manager; - ShardPtr lastPrimary = _primary; + // + // Get the latest metadata information from the cache if there were issues + // - DBConfigPtr config; + ChunkManagerPtr lastManager = _manager; + ShardPtr lastPrimary = _primary; - string errMsg; - if ( !getDBConfigSafe( _nss.db(), config, &errMsg ) ) { - return Status( ErrorCodes::DatabaseNotFound, errMsg ); - } + DBConfigPtr config; - // Get either the chunk manager or primary shard - config->getChunkManagerOrPrimary( _nss.ns(), _manager, _primary ); - - // We now have the latest metadata from the cache. + string errMsg; + if (!getDBConfigSafe(_nss.db(), config, &errMsg)) { + return Status(ErrorCodes::DatabaseNotFound, errMsg); + } - // - // See if and how we need to do a remote refresh. - // Either we couldn't target at all, or we have stale versions, but not both. - // + // Get either the chunk manager or primary shard + config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary); - dassert( !( _needsTargetingRefresh && !_remoteShardVersions.empty() ) ); + // We now have the latest metadata from the cache. - if ( _needsTargetingRefresh ) { + // + // See if and how we need to do a remote refresh. + // Either we couldn't target at all, or we have stale versions, but not both. + // - // Reset the field - _needsTargetingRefresh = false; + dassert(!(_needsTargetingRefresh && !_remoteShardVersions.empty())); - // If we couldn't target, we might need to refresh if we haven't remotely refreshed the - // metadata since we last got it from the cache. + if (_needsTargetingRefresh) { + // Reset the field + _needsTargetingRefresh = false; - bool alreadyRefreshed = wasMetadataRefreshed( lastManager, - lastPrimary, - _manager, - _primary ); + // If we couldn't target, we might need to refresh if we haven't remotely refreshed the + // metadata since we last got it from the cache. 
- // If didn't already refresh the targeting information, refresh it - if ( !alreadyRefreshed ) { - // To match previous behavior, we just need an incremental refresh here - return refreshNow( RefreshType_RefreshChunkManager ); - } + bool alreadyRefreshed = wasMetadataRefreshed(lastManager, lastPrimary, _manager, _primary); - *wasChanged = isMetadataDifferent( lastManager, lastPrimary, _manager, _primary ); - return Status::OK(); + // If didn't already refresh the targeting information, refresh it + if (!alreadyRefreshed) { + // To match previous behavior, we just need an incremental refresh here + return refreshNow(RefreshType_RefreshChunkManager); } - else if ( !_remoteShardVersions.empty() ) { - - // If we got stale shard versions from remote shards, we may need to refresh - // NOTE: Not sure yet if this can happen simultaneously with targeting issues - CompareResult result = compareAllShardVersions( _manager, - _primary, - _remoteShardVersions ); - // Reset the versions - _remoteShardVersions.clear(); + *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary); + return Status::OK(); + } else if (!_remoteShardVersions.empty()) { + // If we got stale shard versions from remote shards, we may need to refresh + // NOTE: Not sure yet if this can happen simultaneously with targeting issues - if ( result == CompareResult_Unknown ) { - // Our current shard versions aren't all comparable to the old versions, maybe drop - return refreshNow( RefreshType_ReloadDatabase ); - } - else if ( result == CompareResult_LT ) { - // Our current shard versions are less than the remote versions, but no drop - return refreshNow( RefreshType_RefreshChunkManager ); - } + CompareResult result = compareAllShardVersions(_manager, _primary, _remoteShardVersions); + // Reset the versions + _remoteShardVersions.clear(); - *wasChanged = isMetadataDifferent( lastManager, lastPrimary, _manager, _primary ); - return Status::OK(); + if (result == CompareResult_Unknown) { + // Our current shard versions aren't all comparable to the old versions, maybe drop + return refreshNow(RefreshType_ReloadDatabase); + } else if (result == CompareResult_LT) { + // Our current shard versions are less than the remote versions, but no drop + return refreshNow(RefreshType_RefreshChunkManager); } - // unreachable - dassert( false ); + *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary); return Status::OK(); } - // To match legacy reload behavior, we have to backoff on config reload per-thread - // TODO: Centralize this behavior better by refactoring config reload in mongos - static const int maxWaitMillis = 500; - static boost::thread_specific_ptr<Backoff> perThreadBackoff; - - static void refreshBackoff() { - if ( !perThreadBackoff.get() ) - perThreadBackoff.reset( new Backoff( maxWaitMillis, maxWaitMillis * 2 ) ); - perThreadBackoff.get()->nextSleepMillis(); - } + // unreachable + dassert(false); + return Status::OK(); +} - Status ChunkManagerTargeter::refreshNow( RefreshType refreshType ) { +// To match legacy reload behavior, we have to backoff on config reload per-thread +// TODO: Centralize this behavior better by refactoring config reload in mongos +static const int maxWaitMillis = 500; +static boost::thread_specific_ptr<Backoff> perThreadBackoff; - DBConfigPtr config; +static void refreshBackoff() { + if (!perThreadBackoff.get()) + perThreadBackoff.reset(new Backoff(maxWaitMillis, maxWaitMillis * 2)); + perThreadBackoff.get()->nextSleepMillis(); +} - string errMsg; - if ( 
!getDBConfigSafe( _nss.db(), config, &errMsg ) ) { - return Status( ErrorCodes::DatabaseNotFound, errMsg ); - } +Status ChunkManagerTargeter::refreshNow(RefreshType refreshType) { + DBConfigPtr config; - // Try not to spam the configs - refreshBackoff(); + string errMsg; + if (!getDBConfigSafe(_nss.db(), config, &errMsg)) { + return Status(ErrorCodes::DatabaseNotFound, errMsg); + } - // TODO: Improve synchronization and make more explicit - if ( refreshType == RefreshType_RefreshChunkManager ) { - try { - // Forces a remote check of the collection info, synchronization between threads - // happens internally. - config->getChunkManagerIfExists( _nss.ns(), true ); - } - catch ( const DBException& ex ) { - return Status( ErrorCodes::UnknownError, ex.toString() ); - } - config->getChunkManagerOrPrimary( _nss.ns(), _manager, _primary ); - } - else if ( refreshType == RefreshType_ReloadDatabase ) { - try { - // Dumps the db info, reloads it all, synchronization between threads happens - // internally. - config->reload(); - config->getChunkManagerIfExists( _nss.ns(), true, true ); - } - catch ( const DBException& ex ) { - return Status( ErrorCodes::UnknownError, ex.toString() ); - } - config->getChunkManagerOrPrimary( _nss.ns(), _manager, _primary ); - } + // Try not to spam the configs + refreshBackoff(); - return Status::OK(); + // TODO: Improve synchronization and make more explicit + if (refreshType == RefreshType_RefreshChunkManager) { + try { + // Forces a remote check of the collection info, synchronization between threads + // happens internally. + config->getChunkManagerIfExists(_nss.ns(), true); + } catch (const DBException& ex) { + return Status(ErrorCodes::UnknownError, ex.toString()); + } + config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary); + } else if (refreshType == RefreshType_ReloadDatabase) { + try { + // Dumps the db info, reloads it all, synchronization between threads happens + // internally. + config->reload(); + config->getChunkManagerIfExists(_nss.ns(), true, true); + } catch (const DBException& ex) { + return Status(ErrorCodes::UnknownError, ex.toString()); + } + config->getChunkManagerOrPrimary(_nss.ns(), _manager, _primary); } -} // namespace mongo + return Status::OK(); +} +} // namespace mongo |
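The change above appears to be a pure reformatting pass (indentation and brace placement; the logic is the same on both sides), but a few of the targeting algorithms it touches read better in isolation. Below is a minimal standalone sketch of the getUpdateExprType() classification, with BSON field iteration replaced by a std::vector of top-level field names; the vector-based signature is illustrative, not MongoDB's API.

    #include <string>
    #include <vector>

    enum UpdateType { UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown };

    // Classify an update by its top-level field names: all $-prefixed fields
    // make it op-style, all plain fields make it replacement-style, and any
    // mix is rejected as Unknown.
    UpdateType classifyUpdate(const std::vector<std::string>& topLevelFields) {
        // An empty update document is replacement-style by default, as in the diff.
        if (topLevelFields.empty())
            return UpdateType_Replacement;

        UpdateType updateType = UpdateType_Unknown;
        for (const std::string& field : topLevelFields) {
            const bool isOperator = !field.empty() && field[0] == '$';
            const UpdateType fieldType =
                isOperator ? UpdateType_OpStyle : UpdateType_Replacement;
            if (updateType == UpdateType_Unknown) {
                updateType = fieldType;  // the first field decides the style
            } else if (updateType != fieldType) {
                return UpdateType_Unknown;  // mixed $-operator and plain fields
            }
        }
        return updateType;
    }

Under this sketch, { "$set", "$inc" } classifies as op-style, { "x", "y" } as replacement-style, and a mix such as { "$set", "x" } yields UpdateType_Unknown, which targetUpdate() rejects with ErrorCodes::UnsupportedFormat. The classification then drives key extraction in targetUpdate(): op-style updates pull the shard key from the query, while replacement-style updates pull it from the replacement document itself, which is what makes save-style upserts targetable.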
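isExactIdQuery() decides whether a single (non-multi or limit-1) write can still be targeted when the shard key is absent. The real implementation reuses ShardKeyPattern with a virtual { _id : 1 } key, as shown in the diff; the tiny query model below is an assumption made purely for illustration.

    #include <map>
    #include <string>

    // Stand-in for a BSON value: all we track is whether the value is a direct
    // literal (e.g. _id : 3) or an operator expression (e.g. _id : { $gt : 3 }).
    struct QueryValue {
        bool isOperatorExpression;
    };

    // True only when the query pins _id to a concrete value.
    bool isExactIdQuery(const std::map<std::string, QueryValue>& query) {
        const auto it = query.find("_id");
        return it != query.end() && !it->second.isOperatorExpression;
    }

This reproduces the truth table in the comment: { _id : 1 } and { foo : ..., _id : 1 } are exact, while { _id : { $lt : 30 } } and queries without _id are not.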
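The stale-version machinery near the end of the file reduces to two comparisons. The sketch below models ChunkVersion as a plain struct (MiniChunkVersion is an illustrative stand-in: combined packs the major/minor version, with 0 meaning "not set", and epoch changes when the collection is dropped and recreated) and mirrors compareShardVersions() plus the compareAllShardVersions() fold; a failed cache lookup stands in for the exception path in the real code.

    #include <cstdint>
    #include <map>
    #include <string>

    enum CompareResult { CompareResult_Unknown, CompareResult_GTE, CompareResult_LT };

    struct MiniChunkVersion {
        uint64_t combined;  // 0 == not set, as with ChunkVersion::isSet()
        uint64_t epoch;     // changes on drop/recreate
    };

    CompareResult compareShardVersions(const MiniChunkVersion& a, const MiniChunkVersion& b) {
        // Different epochs: the collection may have been dropped, so the
        // versions are not comparable.
        if (a.epoch != b.epoch)
            return CompareResult_Unknown;

        // Zero versions are only comparable to themselves.
        if (a.combined == 0 || b.combined == 0) {
            return (a.combined == 0 && b.combined == 0) ? CompareResult_GTE
                                                        : CompareResult_Unknown;
        }

        return (a.combined < b.combined) ? CompareResult_LT : CompareResult_GTE;
    }

    // Fold over all remote versions: start at GTE, downgrade to LT when any
    // cached version lags, and return Unknown the moment any pair is
    // incomparable. Iteration continues after an LT because a later shard
    // could still produce an Unknown, which takes precedence.
    CompareResult compareAllShardVersions(
        const std::map<std::string, MiniChunkVersion>& cached,
        const std::map<std::string, MiniChunkVersion>& remote) {
        CompareResult finalResult = CompareResult_GTE;
        for (const auto& entry : remote) {
            const auto cachedIt = cached.find(entry.first);
            if (cachedIt == cached.end())
                return CompareResult_Unknown;  // shard missing from the local cache
            const CompareResult result = compareShardVersions(cachedIt->second, entry.second);
            if (result == CompareResult_Unknown)
                return result;
            if (result == CompareResult_LT)
                finalResult = CompareResult_LT;
        }
        return finalResult;
    }

refreshIfNeeded() then acts on the fold's result: CompareResult_Unknown triggers RefreshType_ReloadDatabase (a drop may have happened), while CompareResult_LT only needs the cheaper RefreshType_RefreshChunkManager.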
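Finally, refreshNow() rate-limits itself with a per-thread Backoff before touching the config servers, so one retrying thread cannot spam them. Below is a self-contained approximation using C++11 thread_local in place of boost::thread_specific_ptr; the double-up-to-a-cap sleep policy is an assumption for the sketch, not the verified behavior of mongo's Backoff class.

    #include <algorithm>
    #include <chrono>
    #include <thread>

    class Backoff {
    public:
        Backoff(int baseMillis, int maxMillis)
            : _nextMillis(baseMillis), _maxMillis(maxMillis) {}

        // Sleep for the current interval, then grow it up to the cap.
        void nextSleepMillis() {
            std::this_thread::sleep_for(std::chrono::milliseconds(_nextMillis));
            _nextMillis = std::min(_nextMillis * 2, _maxMillis);
        }

    private:
        int _nextMillis;
        int _maxMillis;
    };

    static void refreshBackoff() {
        static const int maxWaitMillis = 500;
        // One Backoff per thread, constructed lazily on first use, mirroring
        // the boost::thread_specific_ptr in the diff.
        thread_local Backoff perThreadBackoff(maxWaitMillis, maxWaitMillis * 2);
        perThreadBackoff.nextSleepMillis();
    }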