Diffstat (limited to 'src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp')
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp  187
1 file changed, 166 insertions(+), 21 deletions(-)
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index d3d63098cae..cfcf9dd3f07 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -164,7 +164,7 @@ BSONArray buildMergeChunksTransactionPrecond(const std::vector<ChunkType>& chunk
return preCond.arr();
}
-/*
+/**
* Check that the chunk still exists and return its metadata.
*/
StatusWith<ChunkType> getCurrentChunk(OperationContext* opCtx,
@@ -297,7 +297,9 @@ boost::optional<ChunkType> getControlChunkForMigrate(OperationContext* opCtx,
return uassertStatusOK(ChunkType::fromConfigBSON(response.docs.front(), epoch, timestamp));
}
-// Helper function to find collection version and shard version.
+/**
+ * Helper function to find collection version and shard version.
+ */
StatusWith<ChunkVersion> getMaxChunkVersionFromQueryResponse(
const CollectionType& coll, const StatusWith<Shard::QueryResponse>& queryResponse) {
@@ -318,7 +320,9 @@ StatusWith<ChunkVersion> getMaxChunkVersionFromQueryResponse(
return chunk.getVersion();
}
-// Helper function to get the collection version for nss. Always uses kLocalReadConcern.
+/**
+ * Helper function to get the collection version for nss. Always uses kLocalReadConcern.
+ */
StatusWith<ChunkVersion> getCollectionVersion(OperationContext* opCtx, const NamespaceString& nss) {
auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto findCollResponse =
@@ -353,7 +357,9 @@ StatusWith<ChunkVersion> getCollectionVersion(OperationContext* opCtx, const Nam
1)); // Limit 1.
}
-// Helper function to get collection version and donor shard version following a merge/move/split
+/**
+ * Helper function to get collection version and donor shard version following a move/split/merge
+ */
BSONObj getShardAndCollectionVersion(OperationContext* opCtx,
const CollectionType& coll,
const ShardId& fromShard) {
@@ -540,7 +546,6 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkSplit(
// Get the max chunk version for this namespace.
auto swCollVersion = getCollectionVersion(opCtx, nss);
-
if (!swCollVersion.isOK()) {
return swCollVersion.getStatus().withContext(
str::stream() << "splitChunk cannot split chunk " << range.toString() << ".");
@@ -758,16 +763,16 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMerge(
// This method must never be called with empty chunks to merge
invariant(!chunkBoundaries.empty());
+ if (!validAfter) {
+ return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"};
+ }
+
// Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
// migrations
// TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/
// move chunks on different collections to proceed in parallel
Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
- if (!validAfter) {
- return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"};
- }
-
// Get the max chunk version for this namespace.
auto swCollVersion = getCollectionVersion(opCtx, nss);
if (!swCollVersion.isOK()) {
@@ -882,8 +887,6 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge(
return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"};
}
- const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
-
// Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
// migrations
// TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/
@@ -898,6 +901,7 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge(
}
// 2. Retrieve the list of chunks belonging to the requested shard + key range.
+ const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto findCollResponse = uassertStatusOK(
configShard->exhaustiveFindOnConfig(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
@@ -906,7 +910,6 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge(
BSON(CollectionType::kNssFieldName << nss.ns()),
{},
1));
-
if (findCollResponse.docs.empty()) {
return {ErrorCodes::Error(5678601),
str::stream() << "Collection '" << nss.ns() << "' no longer either exists"};
@@ -1034,6 +1037,9 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
const ShardId& fromShard,
const ShardId& toShard,
const boost::optional<Timestamp>& validAfter) {
+ if (!validAfter) {
+ return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"};
+ }
// TODO(SERVER-53283): Remove the logic around fcvRegion to re-enable
// the concurrent execution of moveChunk() and setFCV().
@@ -1043,11 +1049,10 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
"while the cluster is being upgraded or downgraded",
!fcvRegion->isUpgradingOrDowngrading());
-
- auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
-
// Must hold the shard lock until the entire commit finishes to serialize with removeShard.
Lock::SharedLock shardLock(opCtx->lockState(), _kShardMembershipLock);
+
+ auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto shardResult = uassertStatusOK(
configShard->exhaustiveFindOnConfig(opCtx,
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
@@ -1056,7 +1061,6 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
BSON(ShardType::name(toShard.toString())),
{},
boost::none));
-
uassert(ErrorCodes::ShardNotFound,
str::stream() << "Shard " << toShard << " does not exist",
!shardResult.docs.empty());
@@ -1078,10 +1082,6 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
// (Note: This is not needed while we have a global lock, taken here only for consistency.)
Lock::ExclusiveLock lk(opCtx, opCtx->lockState(), _kChunkOpLock);
- if (!validAfter) {
- return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"};
- }
-
auto findCollResponse = uassertStatusOK(
configShard->exhaustiveFindOnConfig(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
@@ -1093,6 +1093,7 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
uassert(ErrorCodes::ConflictingOperationInProgress,
"Collection does not exist",
!findCollResponse.docs.empty());
+
const CollectionType coll(findCollResponse.docs[0]);
uassert(ErrorCodes::ConflictingOperationInProgress,
"Collection is undergoing changes and chunks cannot be moved",
@@ -1144,7 +1145,6 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
const auto collNsOrUUID = getNsOrUUIDForChunkTargeting(coll);
auto swCurrentChunk =
getCurrentChunk(opCtx, collNsOrUUID, coll.getEpoch(), coll.getTimestamp(), migratedChunk);
-
if (!swCurrentChunk.isOK()) {
return swCurrentChunk.getStatus();
}
@@ -1309,6 +1309,151 @@ StatusWith<ChunkType> ShardingCatalogManager::_findChunkOnConfig(
return ChunkType::fromConfigBSON(origChunks.front(), epoch, timestamp);
}
+void ShardingCatalogManager::upgradeChunksHistory(OperationContext* opCtx,
+ const NamespaceString& nss,
+ bool force,
+ const Timestamp validAfter) {
+ auto const catalogClient = Grid::get(opCtx)->catalogClient();
+ const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+
+ FixedFCVRegion fcvRegion(opCtx);
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ "Cannot upgrade the chunks history while the cluster is being upgraded or downgraded",
+ !fcvRegion->isUpgradingOrDowngrading());
+
+ // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
+ // migrations.
+ Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
+
+ auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ const auto coll = [&] {
+ auto collDocs = uassertStatusOK(configShard->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ CollectionType::ConfigNS,
+ BSON(CollectionType::kNssFieldName << nss.ns()),
+ {},
+ 1))
+ .docs;
+ uassert(ErrorCodes::NamespaceNotFound, "Collection does not exist", !collDocs.empty());
+
+ return CollectionType(collDocs[0].getOwned());
+ }();
+
+ const auto nsOrUUID = getNsOrUUIDForChunkTargeting(coll);
+ const auto allChunksQuery = nsOrUUID.uuid()
+ ? BSON(ChunkType::collectionUUID << *nsOrUUID.uuid())
+ : BSON(ChunkType::ns << nsOrUUID.nss()->ns());
+
+ if (force) {
+ LOGV2(620650,
+ "Resetting the 'historyIsAt40' field for all chunks in collection {namespace} in "
+ "order to force all chunks' history to get recreated",
+ "namespace"_attr = nss.ns());
+
+ BatchedCommandRequest request([&] {
+ write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS);
+ updateOp.setUpdates({[&] {
+ write_ops::UpdateOpEntry entry;
+ entry.setQ(allChunksQuery);
+ entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(
+ BSON("$unset" << BSON(ChunkType::historyIsAt40() << ""))));
+ entry.setUpsert(false);
+ entry.setMulti(true);
+ return entry;
+ }()});
+ return updateOp;
+ }());
+ request.setWriteConcern(ShardingCatalogClient::kLocalWriteConcern.toBSON());
+
+ auto response = configShard->runBatchWriteCommand(
+ opCtx, Shard::kDefaultConfigCommandTimeout, request, Shard::RetryPolicy::kIdempotent);
+ uassertStatusOK(response.toStatus());
+
+ uassert(ErrorCodes::Error(5760502),
+ str::stream() << "No chunks found for collection " << nss.ns(),
+ response.getN() > 0);
+ }
+
+ // Find the collection version
+ const auto collVersion = uassertStatusOK(getCollectionVersion(opCtx, nss));
+
+ // Find the chunk history
+ const auto allChunksVector = [&] {
+ auto findChunksResponse = uassertStatusOK(
+ configShard->exhaustiveFindOnConfig(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ allChunksQuery,
+ BSONObj(),
+ boost::none));
+ uassert(ErrorCodes::Error(5760503),
+ str::stream() << "No chunks found for collection " << nss.ns(),
+ !findChunksResponse.docs.empty());
+ return std::move(findChunksResponse.docs);
+ }();
+
+ // Bump the major version in order to be guaranteed to trigger refresh on every shard
+ ChunkVersion newCollectionVersion(
+ collVersion.majorVersion() + 1, 0, collVersion.epoch(), collVersion.getTimestamp());
+ std::set<ShardId> changedShardIds;
+ for (const auto& chunk : allChunksVector) {
+ auto upgradeChunk = uassertStatusOK(
+ ChunkType::fromConfigBSON(chunk, collVersion.epoch(), collVersion.getTimestamp()));
+ bool historyIsAt40 = chunk[ChunkType::historyIsAt40()].booleanSafe();
+ if (historyIsAt40) {
+ uassert(
+ ErrorCodes::Error(5760504),
+ str::stream() << "Chunk " << upgradeChunk.getName() << " in collection " << nss.ns()
+ << " indicates that it has been upgraded to version 4.0, but is "
+ "missing the history field. This indicates a corrupted routing "
+ "table and requires a manual intervention to be fixed.",
+ !upgradeChunk.getHistory().empty());
+ continue;
+ }
+
+ upgradeChunk.setVersion(newCollectionVersion);
+ newCollectionVersion.incMinor();
+ changedShardIds.emplace(upgradeChunk.getShard());
+
+ // Construct the fresh history.
+ upgradeChunk.setHistory({ChunkHistory{validAfter, upgradeChunk.getShard()}});
+
+ // Set the 'historyIsAt40' field so that it gets skipped if the command is re-run
+ BSONObjBuilder chunkObjBuilder(upgradeChunk.toConfigBSON());
+ chunkObjBuilder.appendBool(ChunkType::historyIsAt40(), true);
+
+ // Run the update
+ uassertStatusOK(
+ catalogClient->updateConfigDocument(opCtx,
+ ChunkType::ConfigNS,
+ BSON(ChunkType::name(upgradeChunk.getName())),
+ chunkObjBuilder.obj(),
+ false,
+ ShardingCatalogClient::kLocalWriteConcern));
+ }
+
+ // Wait for the writes to become majority committed so that the subsequent shard refreshes can
+ // see them
+ const auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ WriteConcernResult unusedWCResult;
+ uassertStatusOK(waitForWriteConcern(
+ opCtx, clientOpTime, ShardingCatalogClient::kMajorityWriteConcern, &unusedWCResult));
+
+ for (const auto& shardId : changedShardIds) {
+ auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId));
+ uassertStatusOK(
+ Shard::CommandResponse::getEffectiveStatus(shard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ BSON("_flushRoutingTableCacheUpdates" << nss.ns()),
+ Shard::RetryPolicy::kIdempotent)));
+ }
+}
+
void ShardingCatalogManager::clearJumboFlag(OperationContext* opCtx,
const NamespaceString& nss,
const OID& collectionEpoch,