diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2020-08-25 01:42:28 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-01 10:50:45 +0000 |
commit | 9cb82d2e8680717d3002459ba5fdb16036183a17 (patch) | |
tree | 6d49e2f6a2bba23707285e90ec1e8b3beba41400 /src/mongo/s/write_ops | |
parent | ca4df25002a60910b38bfdd8d71eb5bff5a79b49 (diff) | |
download | mongo-9cb82d2e8680717d3002459ba5fdb16036183a17.tar.gz |
SERVER-50505 Make the CatalogCache return ChunkManager(s) directly
... instead of returning the intermediate CachedCollectionRoutingInfo
class. The ChunkManager should be the only class used for routing.
Diffstat (limited to 'src/mongo/s/write_ops')
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.cpp | 1 | ||||
-rw-r--r-- | src/mongo/s/write_ops/chunk_manager_targeter.cpp | 128 | ||||
-rw-r--r-- | src/mongo/s/write_ops/chunk_manager_targeter.h | 4 |
3 files changed, 61 insertions, 72 deletions
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index b9bd33d9782..4a492481be4 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -39,7 +39,6 @@ #include "mongo/db/ops/write_ops_parsers.h" #include "mongo/s/client/num_hosts_targeted_metrics.h" #include "mongo/s/cluster_commands_helpers.h" -#include "mongo/s/grid.h" #include "mongo/s/transaction_router.h" #include "mongo/util/transitional_tools_do_not_use/vector_spooling.h" diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp index ec79a4ec1e4..f7189efdfe9 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp +++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp @@ -171,7 +171,7 @@ BSONObj getUpdateExprForTargeting(const boost::intrusive_ptr<ExpressionContext> * { _id : { $lt : 30 } } => false * { foo : <anything> } => false */ -bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, const ChunkManager* cm) { +bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, const ChunkManager& cm) { auto shardKey = kVirtualIdShardKey.extractShardKeyFromQuery(query); BSONElement idElt = shardKey["_id"]; @@ -179,9 +179,9 @@ bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, const return false; } - if (CollationIndexKey::isCollatableType(idElt.type()) && cm && + if (CollationIndexKey::isCollatableType(idElt.type()) && cm.isSharded() && !query.getQueryRequest().getCollation().isEmpty() && - !CollatorInterface::collatorsMatch(query.getCollator(), cm->getDefaultCollator())) { + !CollatorInterface::collatorsMatch(query.getCollator(), cm.getDefaultCollator())) { // The collation applies to the _id field, but the user specified a collation which doesn't // match the collection default. @@ -195,7 +195,7 @@ bool isExactIdQuery(OperationContext* opCtx, const NamespaceString& nss, const BSONObj query, const BSONObj collation, - const ChunkManager* cm) { + const ChunkManager& cm) { auto qr = std::make_unique<QueryRequest>(nss); qr->setFilter(query); if (!collation.isEmpty()) { @@ -242,10 +242,9 @@ CompareResult compareShardVersions(const ChunkVersion& shardVersionA, return CompareResult_GTE; } -ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& routingInfo, - const ShardId& shardId) { - if (routingInfo.cm()) { - return routingInfo.cm()->getVersion(shardId); +ChunkVersion getShardVersion(const ChunkManager& cm, const ShardId& shardId) { + if (cm.isSharded()) { + return cm.getVersion(shardId); } return ChunkVersion::UNSHARDED(); @@ -262,7 +261,7 @@ ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& routingInfo, * Note that the signature here is weird since our cached map of chunk versions is stored in a * ChunkManager or is implicit in the primary shard of the collection. */ -CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routingInfo, +CompareResult compareAllShardVersions(const ChunkManager& cm, const StaleShardVersionMap& remoteShardVersions) { CompareResult finalResult = CompareResult_GTE; @@ -274,7 +273,7 @@ CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routing try { // Throws b/c shard constructor throws - cachedShardVersion = getShardVersion(routingInfo, shardId); + cachedShardVersion = getShardVersion(cm, shardId); } catch (const DBException& ex) { LOGV2_WARNING(22915, "could not lookup shard {shardId} in local cache, shard metadata may " @@ -301,9 +300,8 @@ CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routing return finalResult; } -CompareResult compareDbVersions(const CachedCollectionRoutingInfo& routingInfo, - const DatabaseVersion& remoteDbVersion) { - DatabaseVersion cachedDbVersion = routingInfo.db().databaseVersion(); +CompareResult compareDbVersions(const ChunkManager& cm, const DatabaseVersion& remoteDbVersion) { + DatabaseVersion cachedDbVersion = cm.dbVersion(); // Db may have been dropped if (cachedDbVersion.getUuid() != remoteDbVersion.getUuid()) { @@ -321,15 +319,16 @@ CompareResult compareDbVersions(const CachedCollectionRoutingInfo& routingInfo, /** * Whether or not the manager/primary pair is different from the other manager/primary pair. */ -bool isMetadataDifferent(const ChunkManager* managerA, +bool isMetadataDifferent(const ChunkManager& managerA, const DatabaseVersion dbVersionA, - const ChunkManager* managerB, + const ChunkManager& managerB, const DatabaseVersion dbVersionB) { - if ((managerA && !managerB) || (!managerA && managerB)) + if ((managerA.isSharded() && !managerB.isSharded()) || + (!managerA.isSharded() && managerB.isSharded())) return true; - if (managerA) { - return managerA->getVersion() != managerB->getVersion(); + if (managerA.isSharded()) { + return managerA.getVersion() != managerB.getVersion(); } return !databaseVersion::equal(dbVersionA, dbVersionB); @@ -346,13 +345,13 @@ ChunkManagerTargeter::ChunkManagerTargeter(OperationContext* opCtx, void ChunkManagerTargeter::_init(OperationContext* opCtx) { createShardDatabase(opCtx, _nss.db()); - _routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss)); + _cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss)); if (_targetEpoch) { - uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _routingInfo->cm()); + uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _cm->isSharded()); uassert(ErrorCodes::StaleEpoch, "Collection epoch has changed", - _routingInfo->cm()->getVersion().epoch() == *_targetEpoch); + _cm->getVersion().epoch() == *_targetEpoch); } } @@ -364,8 +363,8 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx, const BSONObj& doc) const { BSONObj shardKey; - if (_routingInfo->cm()) { - shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(doc); + if (_cm->isSharded()) { + shardKey = _cm->getShardKeyPattern().extractShardKeyFromDoc(doc); // The shard key would only be empty after extraction if we encountered an error case, such // as the shard key possessing an array value or array descendants. If the shard key // presented to the targeter was empty, we would emplace the missing fields, and the @@ -381,9 +380,7 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx, _targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize())); } - return ShardEndpoint(_routingInfo->db().primary()->getId(), - ChunkVersion::UNSHARDED(), - _routingInfo->db().databaseVersion()); + return ShardEndpoint(_cm->dbPrimary(), ChunkVersion::UNSHARDED(), _cm->dbVersion()); } std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* opCtx, @@ -405,13 +402,12 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* const auto updateType = getUpdateExprType(updateOp); // If the collection is not sharded, forward the update to the primary shard. - if (!_routingInfo->cm()) { - return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(), - ChunkVersion::UNSHARDED(), - _routingInfo->db().databaseVersion()}}; + if (!_cm->isSharded()) { + return std::vector<ShardEndpoint>{ + {_cm->dbPrimary(), ChunkVersion::UNSHARDED(), _cm->dbVersion()}}; } - const auto& shardKeyPattern = _routingInfo->cm()->getShardKeyPattern(); + const auto& shardKeyPattern = _cm->getShardKeyPattern(); const auto collation = write_ops::collationOf(updateOp); auto expCtx = makeExpressionContextWithDefaultsForTargeter(opCtx, @@ -468,7 +464,7 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* "a single shard (and have the simple collation), but this update targeted " << endPoints.size() << " shards. Update request: " << updateOp.toBSON() << ", shard key pattern: " << shardKeyPattern.toString(), - updateOp.getMulti() || isExactIdQuery(opCtx, _nss, query, collation, _routingInfo->cm())); + updateOp.getMulti() || isExactIdQuery(opCtx, _nss, query, collation, *_cm)); // If the request is {multi:false}, then this is a single op-style update which we are // broadcasting to multiple shards by exact _id. Record this event in our serverStatus metrics. @@ -492,13 +488,12 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext* itemRef.getRuntimeConstants()); BSONObj shardKey; - if (_routingInfo->cm()) { + if (_cm->isSharded()) { // Sharded collections have the following further requirements for targeting: // // Limit-1 deletes must be targeted exactly by shard key *or* exact _id - shardKey = - uassertStatusOK(_routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery( - expCtx, deleteOp.getQ())); + shardKey = uassertStatusOK( + _cm->getShardKeyPattern().extractShardKeyFromQuery(expCtx, deleteOp.getQ())); } // Target the shard key or delete query @@ -530,10 +525,9 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext* str::stream() << "A single delete on a sharded collection must contain an exact match " "on _id (and have the collection default collation) or contain the " "shard key (and have the simple collation). Delete request: " - << deleteOp.toBSON() << ", shard key pattern: " - << _routingInfo->cm()->getShardKeyPattern().toString(), - !_routingInfo->cm() || deleteOp.getMulti() || - isExactIdQuery(opCtx, *cq, _routingInfo->cm())); + << deleteOp.toBSON() + << ", shard key pattern: " << _cm->getShardKeyPattern().toString(), + !_cm->isSharded() || deleteOp.getMulti() || isExactIdQuery(opCtx, *cq, *_cm)); return uassertStatusOK(_targetQuery(expCtx, deleteOp.getQ(), collation)); } @@ -542,22 +536,21 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery( boost::intrusive_ptr<ExpressionContext> expCtx, const BSONObj& query, const BSONObj& collation) const { - if (!_routingInfo->cm()) { - return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(), - ChunkVersion::UNSHARDED(), - _routingInfo->db().databaseVersion()}}; + if (!_cm->isSharded()) { + return std::vector<ShardEndpoint>{ + {_cm->dbPrimary(), ChunkVersion::UNSHARDED(), _cm->dbVersion()}}; } std::set<ShardId> shardIds; try { - _routingInfo->cm()->getShardIdsForQuery(expCtx, query, collation, &shardIds); + _cm->getShardIdsForQuery(expCtx, query, collation, &shardIds); } catch (const DBException& ex) { return ex.toStatus(); } std::vector<ShardEndpoint> endpoints; for (auto&& shardId : shardIds) { - endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId)); + endpoints.emplace_back(std::move(shardId), _cm->getVersion(shardId)); } return endpoints; @@ -567,8 +560,8 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::_targetShardKey(const BSONObj& s const BSONObj& collation, long long estDataSize) const { try { - auto chunk = _routingInfo->cm()->findIntersectingChunk(shardKey, collation); - return {{chunk.getShardId(), _routingInfo->cm()->getVersion(chunk.getShardId())}}; + auto chunk = _cm->findIntersectingChunk(shardKey, collation); + return {{chunk.getShardId(), _cm->getVersion(chunk.getShardId())}}; } catch (const DBException& ex) { return ex.toStatus(); } @@ -578,14 +571,14 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::_targetShardKey(const BSONObj& s std::vector<ShardEndpoint> ChunkManagerTargeter::targetAllShards(OperationContext* opCtx) const { // This function is only called if doing a multi write that targets more than one shard. This // implies the collection is sharded, so we should always have a chunk manager. - invariant(_routingInfo->cm()); + invariant(_cm->isSharded()); std::vector<ShardId> shardIds; Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds); std::vector<ShardEndpoint> endpoints; for (auto&& shardId : shardIds) { - endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId)); + endpoints.emplace_back(std::move(shardId), _cm->getVersion(shardId)); } return endpoints; @@ -605,7 +598,7 @@ void ChunkManagerTargeter::noteStaleShardResponse(const ShardEndpoint& endpoint, ChunkVersion remoteShardVersion; if (!staleInfo.getVersionWanted()) { // If we don't have a vWanted sent, assume the version is higher than our current version. - remoteShardVersion = getShardVersion(*_routingInfo, endpoint.shardName); + remoteShardVersion = getShardVersion(*_cm, endpoint.shardName); remoteShardVersion.incMajor(); } else { remoteShardVersion = *staleInfo.getVersionWanted(); @@ -637,7 +630,7 @@ void ChunkManagerTargeter::noteStaleDbResponse(const ShardEndpoint& endpoint, DatabaseVersion remoteDbVersion; if (!staleInfo.getVersionWanted()) { // If the vWanted is not set, assume the wanted version is higher than our current version. - remoteDbVersion = _routingInfo->db().databaseVersion(); + remoteDbVersion = _cm->dbVersion(); remoteDbVersion = databaseVersion::makeIncremented(remoteDbVersion); } else { remoteDbVersion = *staleInfo.getVersionWanted(); @@ -690,8 +683,8 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha // Get the latest metadata information from the cache if there were issues // - auto lastManager = _routingInfo->cm(); - auto lastDbVersion = _routingInfo->db().databaseVersion(); + auto lastManager = *_cm; + auto lastDbVersion = _cm->dbVersion(); _init(opCtx); @@ -710,8 +703,8 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha // If we couldn't target, we might need to refresh if we haven't remotely refreshed // the metadata since we last got it from the cache. - bool alreadyRefreshed = isMetadataDifferent( - lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); + bool alreadyRefreshed = + isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion()); // If didn't already refresh the targeting information, refresh it if (!alreadyRefreshed) { @@ -720,13 +713,12 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha return; } - *wasChanged = isMetadataDifferent( - lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); + *wasChanged = isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion()); } else if (!_remoteShardVersions.empty()) { // If we got stale shard versions from remote shards, we may need to refresh // NOTE: Not sure yet if this can happen simultaneously with targeting issues - CompareResult result = compareAllShardVersions(*_routingInfo, _remoteShardVersions); + CompareResult result = compareAllShardVersions(*_cm, _remoteShardVersions); LOGV2_DEBUG(22913, 4, @@ -743,13 +735,12 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha return; } - *wasChanged = isMetadataDifferent( - lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); + *wasChanged = isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion()); } else if (_remoteDbVersion) { // If we got stale database versions from the remote shard, we may need to refresh // NOTE: Not sure yet if this can happen simultaneously with targeting issues - CompareResult result = compareDbVersions(*_routingInfo, *_remoteDbVersion); + CompareResult result = compareDbVersions(*_cm, *_remoteDbVersion); LOGV2_DEBUG(22914, 4, @@ -767,18 +758,17 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha return; } - *wasChanged = isMetadataDifferent( - lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); + *wasChanged = isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion()); } } bool ChunkManagerTargeter::endpointIsConfigServer() const { - if (!_routingInfo->cm()) { - return _routingInfo->db().primaryId() == ShardRegistry::kConfigServerShardId; + if (!_cm->isSharded()) { + return _cm->dbPrimary() == ShardRegistry::kConfigServerShardId; } std::set<ShardId> shardIds; - _routingInfo->cm()->getAllShardIds(&shardIds); + _cm->getAllShardIds(&shardIds); if (std::any_of(shardIds.begin(), shardIds.end(), [](const auto& shardId) { return shardId == ShardRegistry::kConfigServerShardId; @@ -792,8 +782,8 @@ bool ChunkManagerTargeter::endpointIsConfigServer() const { } int ChunkManagerTargeter::getNShardsOwningChunks() const { - if (_routingInfo->cm()) { - return _routingInfo->cm()->getNShardsOwningChunks(); + if (_cm->isSharded()) { + return _cm->getNShardsOwningChunks(); } return 0; diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h index 1e62bc2eeb5..caaa8884399 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.h +++ b/src/mongo/s/write_ops/chunk_manager_targeter.h @@ -35,7 +35,7 @@ #include "mongo/bson/bsonobj_comparator_interface.h" #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/namespace_string.h" -#include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/ns_targeter.h" #include "mongo/s/write_ops/batched_command_request.h" @@ -151,7 +151,7 @@ private: bool _needsTargetingRefresh; // The latest loaded routing cache entry - boost::optional<CachedCollectionRoutingInfo> _routingInfo; + boost::optional<ChunkManager> _cm; // Set to the epoch of the namespace we are targeting. If we ever refresh the catalog cache and // find a new epoch, we immediately throw a StaleEpoch exception. |