author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2020-08-25 01:42:28 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2020-09-01 10:50:45 +0000
commit     9cb82d2e8680717d3002459ba5fdb16036183a17 (patch)
tree       6d49e2f6a2bba23707285e90ec1e8b3beba41400 /src/mongo/s/write_ops
parent     ca4df25002a60910b38bfdd8d71eb5bff5a79b49 (diff)
SERVER-50505 Make the CatalogCache return ChunkManager(s) directly

Instead of returning the intermediate CachedCollectionRoutingInfo class, the
CatalogCache now returns ChunkManager(s) directly. The ChunkManager should be
the only class used for routing.
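The shape of the change, as a minimal before/after sketch distilled from the
hunks below (opCtx, nss, and shardId are placeholders for whatever the call
site has in scope):

    // Before: routing went through CachedCollectionRoutingInfo, whose inner
    // ChunkManager pointer was null for unsharded collections.
    auto routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
    if (routingInfo.cm()) {
        ChunkVersion shardVersion = routingInfo.cm()->getVersion(shardId);
    }
    DatabaseVersion dbVersion = routingInfo.db().databaseVersion();

    // After: the CatalogCache returns a ChunkManager directly; it always
    // exists and reports its own sharded-ness.
    auto cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
    if (cm.isSharded()) {
        ChunkVersion shardVersion = cm.getVersion(shardId);
    }
    DatabaseVersion dbVersion = cm.dbVersion();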
Diffstat (limited to 'src/mongo/s/write_ops')
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.cpp            1
-rw-r--r--  src/mongo/s/write_ops/chunk_manager_targeter.cpp  128
-rw-r--r--  src/mongo/s/write_ops/chunk_manager_targeter.h      4
3 files changed, 61 insertions(+), 72 deletions(-)
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index b9bd33d9782..4a492481be4 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -39,7 +39,6 @@
#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/s/client/num_hosts_targeted_metrics.h"
#include "mongo/s/cluster_commands_helpers.h"
-#include "mongo/s/grid.h"
#include "mongo/s/transaction_router.h"
#include "mongo/util/transitional_tools_do_not_use/vector_spooling.h"
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
index ec79a4ec1e4..f7189efdfe9 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
@@ -171,7 +171,7 @@ BSONObj getUpdateExprForTargeting(const boost::intrusive_ptr<ExpressionContext>
* { _id : { $lt : 30 } } => false
* { foo : <anything> } => false
*/
-bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, const ChunkManager* cm) {
+bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, const ChunkManager& cm) {
auto shardKey = kVirtualIdShardKey.extractShardKeyFromQuery(query);
BSONElement idElt = shardKey["_id"];
@@ -179,9 +179,9 @@ bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, const
return false;
}
- if (CollationIndexKey::isCollatableType(idElt.type()) && cm &&
+ if (CollationIndexKey::isCollatableType(idElt.type()) && cm.isSharded() &&
!query.getQueryRequest().getCollation().isEmpty() &&
- !CollatorInterface::collatorsMatch(query.getCollator(), cm->getDefaultCollator())) {
+ !CollatorInterface::collatorsMatch(query.getCollator(), cm.getDefaultCollator())) {
// The collation applies to the _id field, but the user specified a collation which doesn't
// match the collection default.
@@ -195,7 +195,7 @@ bool isExactIdQuery(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj query,
const BSONObj collation,
- const ChunkManager* cm) {
+ const ChunkManager& cm) {
auto qr = std::make_unique<QueryRequest>(nss);
qr->setFilter(query);
if (!collation.isEmpty()) {
@@ -242,10 +242,9 @@ CompareResult compareShardVersions(const ChunkVersion& shardVersionA,
return CompareResult_GTE;
}
-ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& routingInfo,
- const ShardId& shardId) {
- if (routingInfo.cm()) {
- return routingInfo.cm()->getVersion(shardId);
+ChunkVersion getShardVersion(const ChunkManager& cm, const ShardId& shardId) {
+ if (cm.isSharded()) {
+ return cm.getVersion(shardId);
}
return ChunkVersion::UNSHARDED();
@@ -262,7 +261,7 @@ ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& routingInfo,
* Note that the signature here is weird since our cached map of chunk versions is stored in a
* ChunkManager or is implicit in the primary shard of the collection.
*/
-CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routingInfo,
+CompareResult compareAllShardVersions(const ChunkManager& cm,
const StaleShardVersionMap& remoteShardVersions) {
CompareResult finalResult = CompareResult_GTE;
@@ -274,7 +273,7 @@ CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routing
try {
// Throws b/c shard constructor throws
- cachedShardVersion = getShardVersion(routingInfo, shardId);
+ cachedShardVersion = getShardVersion(cm, shardId);
} catch (const DBException& ex) {
LOGV2_WARNING(22915,
"could not lookup shard {shardId} in local cache, shard metadata may "
@@ -301,9 +300,8 @@ CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routing
return finalResult;
}
-CompareResult compareDbVersions(const CachedCollectionRoutingInfo& routingInfo,
- const DatabaseVersion& remoteDbVersion) {
- DatabaseVersion cachedDbVersion = routingInfo.db().databaseVersion();
+CompareResult compareDbVersions(const ChunkManager& cm, const DatabaseVersion& remoteDbVersion) {
+ DatabaseVersion cachedDbVersion = cm.dbVersion();
// Db may have been dropped
if (cachedDbVersion.getUuid() != remoteDbVersion.getUuid()) {
@@ -321,15 +319,16 @@ CompareResult compareDbVersions(const CachedCollectionRoutingInfo& routingInfo,
/**
* Whether or not the manager/primary pair is different from the other manager/primary pair.
*/
-bool isMetadataDifferent(const ChunkManager* managerA,
+bool isMetadataDifferent(const ChunkManager& managerA,
const DatabaseVersion dbVersionA,
- const ChunkManager* managerB,
+ const ChunkManager& managerB,
const DatabaseVersion dbVersionB) {
- if ((managerA && !managerB) || (!managerA && managerB))
+ if ((managerA.isSharded() && !managerB.isSharded()) ||
+ (!managerA.isSharded() && managerB.isSharded()))
return true;
- if (managerA) {
- return managerA->getVersion() != managerB->getVersion();
+ if (managerA.isSharded()) {
+ return managerA.getVersion() != managerB.getVersion();
}
return !databaseVersion::equal(dbVersionA, dbVersionB);
@@ -346,13 +345,13 @@ ChunkManagerTargeter::ChunkManagerTargeter(OperationContext* opCtx,
void ChunkManagerTargeter::_init(OperationContext* opCtx) {
createShardDatabase(opCtx, _nss.db());
- _routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss));
+ _cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss));
if (_targetEpoch) {
- uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _routingInfo->cm());
+ uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _cm->isSharded());
uassert(ErrorCodes::StaleEpoch,
"Collection epoch has changed",
- _routingInfo->cm()->getVersion().epoch() == *_targetEpoch);
+ _cm->getVersion().epoch() == *_targetEpoch);
}
}
@@ -364,8 +363,8 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
const BSONObj& doc) const {
BSONObj shardKey;
- if (_routingInfo->cm()) {
- shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(doc);
+ if (_cm->isSharded()) {
+ shardKey = _cm->getShardKeyPattern().extractShardKeyFromDoc(doc);
// The shard key would only be empty after extraction if we encountered an error case, such
// as the shard key possessing an array value or array descendants. If the shard key
// presented to the targeter was empty, we would emplace the missing fields, and the
@@ -381,9 +380,7 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
_targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize()));
}
- return ShardEndpoint(_routingInfo->db().primary()->getId(),
- ChunkVersion::UNSHARDED(),
- _routingInfo->db().databaseVersion());
+ return ShardEndpoint(_cm->dbPrimary(), ChunkVersion::UNSHARDED(), _cm->dbVersion());
}
std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* opCtx,
@@ -405,13 +402,12 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext*
const auto updateType = getUpdateExprType(updateOp);
// If the collection is not sharded, forward the update to the primary shard.
- if (!_routingInfo->cm()) {
- return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(),
- ChunkVersion::UNSHARDED(),
- _routingInfo->db().databaseVersion()}};
+ if (!_cm->isSharded()) {
+ return std::vector<ShardEndpoint>{
+ {_cm->dbPrimary(), ChunkVersion::UNSHARDED(), _cm->dbVersion()}};
}
- const auto& shardKeyPattern = _routingInfo->cm()->getShardKeyPattern();
+ const auto& shardKeyPattern = _cm->getShardKeyPattern();
const auto collation = write_ops::collationOf(updateOp);
auto expCtx = makeExpressionContextWithDefaultsForTargeter(opCtx,
@@ -468,7 +464,7 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext*
"a single shard (and have the simple collation), but this update targeted "
<< endPoints.size() << " shards. Update request: " << updateOp.toBSON()
<< ", shard key pattern: " << shardKeyPattern.toString(),
- updateOp.getMulti() || isExactIdQuery(opCtx, _nss, query, collation, _routingInfo->cm()));
+ updateOp.getMulti() || isExactIdQuery(opCtx, _nss, query, collation, *_cm));
// If the request is {multi:false}, then this is a single op-style update which we are
// broadcasting to multiple shards by exact _id. Record this event in our serverStatus metrics.
@@ -492,13 +488,12 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext*
itemRef.getRuntimeConstants());
BSONObj shardKey;
- if (_routingInfo->cm()) {
+ if (_cm->isSharded()) {
// Sharded collections have the following further requirements for targeting:
//
// Limit-1 deletes must be targeted exactly by shard key *or* exact _id
- shardKey =
- uassertStatusOK(_routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(
- expCtx, deleteOp.getQ()));
+ shardKey = uassertStatusOK(
+ _cm->getShardKeyPattern().extractShardKeyFromQuery(expCtx, deleteOp.getQ()));
}
// Target the shard key or delete query
@@ -530,10 +525,9 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext*
str::stream() << "A single delete on a sharded collection must contain an exact match "
"on _id (and have the collection default collation) or contain the "
"shard key (and have the simple collation). Delete request: "
- << deleteOp.toBSON() << ", shard key pattern: "
- << _routingInfo->cm()->getShardKeyPattern().toString(),
- !_routingInfo->cm() || deleteOp.getMulti() ||
- isExactIdQuery(opCtx, *cq, _routingInfo->cm()));
+ << deleteOp.toBSON()
+ << ", shard key pattern: " << _cm->getShardKeyPattern().toString(),
+ !_cm->isSharded() || deleteOp.getMulti() || isExactIdQuery(opCtx, *cq, *_cm));
return uassertStatusOK(_targetQuery(expCtx, deleteOp.getQ(), collation));
}
@@ -542,22 +536,21 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery(
boost::intrusive_ptr<ExpressionContext> expCtx,
const BSONObj& query,
const BSONObj& collation) const {
- if (!_routingInfo->cm()) {
- return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(),
- ChunkVersion::UNSHARDED(),
- _routingInfo->db().databaseVersion()}};
+ if (!_cm->isSharded()) {
+ return std::vector<ShardEndpoint>{
+ {_cm->dbPrimary(), ChunkVersion::UNSHARDED(), _cm->dbVersion()}};
}
std::set<ShardId> shardIds;
try {
- _routingInfo->cm()->getShardIdsForQuery(expCtx, query, collation, &shardIds);
+ _cm->getShardIdsForQuery(expCtx, query, collation, &shardIds);
} catch (const DBException& ex) {
return ex.toStatus();
}
std::vector<ShardEndpoint> endpoints;
for (auto&& shardId : shardIds) {
- endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId));
+ endpoints.emplace_back(std::move(shardId), _cm->getVersion(shardId));
}
return endpoints;
@@ -567,8 +560,8 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::_targetShardKey(const BSONObj& s
const BSONObj& collation,
long long estDataSize) const {
try {
- auto chunk = _routingInfo->cm()->findIntersectingChunk(shardKey, collation);
- return {{chunk.getShardId(), _routingInfo->cm()->getVersion(chunk.getShardId())}};
+ auto chunk = _cm->findIntersectingChunk(shardKey, collation);
+ return {{chunk.getShardId(), _cm->getVersion(chunk.getShardId())}};
} catch (const DBException& ex) {
return ex.toStatus();
}
@@ -578,14 +571,14 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::_targetShardKey(const BSONObj& s
std::vector<ShardEndpoint> ChunkManagerTargeter::targetAllShards(OperationContext* opCtx) const {
// This function is only called if doing a multi write that targets more than one shard. This
// implies the collection is sharded, so we should always have a chunk manager.
- invariant(_routingInfo->cm());
+ invariant(_cm->isSharded());
std::vector<ShardId> shardIds;
Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
std::vector<ShardEndpoint> endpoints;
for (auto&& shardId : shardIds) {
- endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId));
+ endpoints.emplace_back(std::move(shardId), _cm->getVersion(shardId));
}
return endpoints;
@@ -605,7 +598,7 @@ void ChunkManagerTargeter::noteStaleShardResponse(const ShardEndpoint& endpoint,
ChunkVersion remoteShardVersion;
if (!staleInfo.getVersionWanted()) {
// If we don't have a vWanted sent, assume the version is higher than our current version.
- remoteShardVersion = getShardVersion(*_routingInfo, endpoint.shardName);
+ remoteShardVersion = getShardVersion(*_cm, endpoint.shardName);
remoteShardVersion.incMajor();
} else {
remoteShardVersion = *staleInfo.getVersionWanted();
@@ -637,7 +630,7 @@ void ChunkManagerTargeter::noteStaleDbResponse(const ShardEndpoint& endpoint,
DatabaseVersion remoteDbVersion;
if (!staleInfo.getVersionWanted()) {
// If the vWanted is not set, assume the wanted version is higher than our current version.
- remoteDbVersion = _routingInfo->db().databaseVersion();
+ remoteDbVersion = _cm->dbVersion();
remoteDbVersion = databaseVersion::makeIncremented(remoteDbVersion);
} else {
remoteDbVersion = *staleInfo.getVersionWanted();
@@ -690,8 +683,8 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha
// Get the latest metadata information from the cache if there were issues
//
- auto lastManager = _routingInfo->cm();
- auto lastDbVersion = _routingInfo->db().databaseVersion();
+ auto lastManager = *_cm;
+ auto lastDbVersion = _cm->dbVersion();
_init(opCtx);
@@ -710,8 +703,8 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha
// If we couldn't target, we might need to refresh if we haven't remotely refreshed
// the metadata since we last got it from the cache.
- bool alreadyRefreshed = isMetadataDifferent(
- lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
+ bool alreadyRefreshed =
+ isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion());
// If didn't already refresh the targeting information, refresh it
if (!alreadyRefreshed) {
@@ -720,13 +713,12 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha
return;
}
- *wasChanged = isMetadataDifferent(
- lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
+ *wasChanged = isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion());
} else if (!_remoteShardVersions.empty()) {
// If we got stale shard versions from remote shards, we may need to refresh
// NOTE: Not sure yet if this can happen simultaneously with targeting issues
- CompareResult result = compareAllShardVersions(*_routingInfo, _remoteShardVersions);
+ CompareResult result = compareAllShardVersions(*_cm, _remoteShardVersions);
LOGV2_DEBUG(22913,
4,
@@ -743,13 +735,12 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha
return;
}
- *wasChanged = isMetadataDifferent(
- lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
+ *wasChanged = isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion());
} else if (_remoteDbVersion) {
// If we got stale database versions from the remote shard, we may need to refresh
// NOTE: Not sure yet if this can happen simultaneously with targeting issues
- CompareResult result = compareDbVersions(*_routingInfo, *_remoteDbVersion);
+ CompareResult result = compareDbVersions(*_cm, *_remoteDbVersion);
LOGV2_DEBUG(22914,
4,
@@ -767,18 +758,17 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha
return;
}
- *wasChanged = isMetadataDifferent(
- lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
+ *wasChanged = isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion());
}
}
bool ChunkManagerTargeter::endpointIsConfigServer() const {
- if (!_routingInfo->cm()) {
- return _routingInfo->db().primaryId() == ShardRegistry::kConfigServerShardId;
+ if (!_cm->isSharded()) {
+ return _cm->dbPrimary() == ShardRegistry::kConfigServerShardId;
}
std::set<ShardId> shardIds;
- _routingInfo->cm()->getAllShardIds(&shardIds);
+ _cm->getAllShardIds(&shardIds);
if (std::any_of(shardIds.begin(), shardIds.end(), [](const auto& shardId) {
return shardId == ShardRegistry::kConfigServerShardId;
@@ -792,8 +782,8 @@ bool ChunkManagerTargeter::endpointIsConfigServer() const {
}
int ChunkManagerTargeter::getNShardsOwningChunks() const {
- if (_routingInfo->cm()) {
- return _routingInfo->cm()->getNShardsOwningChunks();
+ if (_cm->isSharded()) {
+ return _cm->getNShardsOwningChunks();
}
return 0;
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h
index 1e62bc2eeb5..caaa8884399 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.h
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.h
@@ -35,7 +35,7 @@
#include "mongo/bson/bsonobj_comparator_interface.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/s/catalog_cache.h"
+#include "mongo/s/chunk_manager.h"
#include "mongo/s/ns_targeter.h"
#include "mongo/s/write_ops/batched_command_request.h"
@@ -151,7 +151,7 @@ private:
bool _needsTargetingRefresh;
// The latest loaded routing cache entry
- boost::optional<CachedCollectionRoutingInfo> _routingInfo;
+ boost::optional<ChunkManager> _cm;
// Set to the epoch of the namespace we are targeting. If we ever refresh the catalog cache and
// find a new epoch, we immediately throw a StaleEpoch exception.
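For context, a hypothetical caller sketch (not part of this commit) of how
routing code reads once the CatalogCache hands back a ChunkManager; nss,
shardKey, and collation are placeholders:

    auto cm = uassertStatusOK(
        Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
    if (cm.isSharded()) {
        // Sharded: ask the ChunkManager which chunk (and thus shard) owns the key.
        auto chunk = cm.findIntersectingChunk(shardKey, collation);
        ShardEndpoint endpoint(chunk.getShardId(), cm.getVersion(chunk.getShardId()));
    } else {
        // Unsharded: route to the database's primary shard at UNSHARDED version.
        ShardEndpoint endpoint(cm.dbPrimary(), ChunkVersion::UNSHARDED(), cm.dbVersion());
    }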