diff options
Diffstat (limited to 'src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp')
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp | 187 |
1 files changed, 166 insertions, 21 deletions
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index d3d63098cae..cfcf9dd3f07 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -164,7 +164,7 @@ BSONArray buildMergeChunksTransactionPrecond(const std::vector<ChunkType>& chunk return preCond.arr(); } -/* +/** * Check that the chunk still exists and return its metadata. */ StatusWith<ChunkType> getCurrentChunk(OperationContext* opCtx, @@ -297,7 +297,9 @@ boost::optional<ChunkType> getControlChunkForMigrate(OperationContext* opCtx, return uassertStatusOK(ChunkType::fromConfigBSON(response.docs.front(), epoch, timestamp)); } -// Helper function to find collection version and shard version. +/** + * Helper function to find collection version and shard version. + */ StatusWith<ChunkVersion> getMaxChunkVersionFromQueryResponse( const CollectionType& coll, const StatusWith<Shard::QueryResponse>& queryResponse) { @@ -318,7 +320,9 @@ StatusWith<ChunkVersion> getMaxChunkVersionFromQueryResponse( return chunk.getVersion(); } -// Helper function to get the collection version for nss. Always uses kLocalReadConcern. +/** + * Helper function to get the collection version for nss. Always uses kLocalReadConcern. + */ StatusWith<ChunkVersion> getCollectionVersion(OperationContext* opCtx, const NamespaceString& nss) { auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); auto findCollResponse = @@ -353,7 +357,9 @@ StatusWith<ChunkVersion> getCollectionVersion(OperationContext* opCtx, const Nam 1)); // Limit 1. } -// Helper function to get collection version and donor shard version following a merge/move/split +/** + * Helper function to get collection version and donor shard version following a move/split/merge + */ BSONObj getShardAndCollectionVersion(OperationContext* opCtx, const CollectionType& coll, const ShardId& fromShard) { @@ -540,7 +546,6 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkSplit( // Get the max chunk version for this namespace. auto swCollVersion = getCollectionVersion(opCtx, nss); - if (!swCollVersion.isOK()) { return swCollVersion.getStatus().withContext( str::stream() << "splitChunk cannot split chunk " << range.toString() << "."); @@ -758,16 +763,16 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMerge( // This method must never be called with empty chunks to merge invariant(!chunkBoundaries.empty()); + if (!validAfter) { + return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"}; + } + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and // migrations // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ // move chunks on different collections to proceed in parallel Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); - if (!validAfter) { - return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"}; - } - // Get the max chunk version for this namespace. auto swCollVersion = getCollectionVersion(opCtx, nss); if (!swCollVersion.isOK()) { @@ -882,8 +887,6 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge( return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"}; } - const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); - // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and // migrations // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ @@ -898,6 +901,7 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge( } // 2. Retrieve the list of chunks belonging to the requested shard + key range. + const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); auto findCollResponse = uassertStatusOK( configShard->exhaustiveFindOnConfig(opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, @@ -906,7 +910,6 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge( BSON(CollectionType::kNssFieldName << nss.ns()), {}, 1)); - if (findCollResponse.docs.empty()) { return {ErrorCodes::Error(5678601), str::stream() << "Collection '" << nss.ns() << "' no longer either exists"}; @@ -1034,6 +1037,9 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( const ShardId& fromShard, const ShardId& toShard, const boost::optional<Timestamp>& validAfter) { + if (!validAfter) { + return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"}; + } // TODO(SERVER-53283): Remove the logic around fcvRegion to re-enable // the concurrent execution of moveChunk() and setFCV(). @@ -1043,11 +1049,10 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( "while the cluster is being upgraded or downgraded", !fcvRegion->isUpgradingOrDowngrading()); - - auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); - // Must hold the shard lock until the entire commit finishes to serialize with removeShard. Lock::SharedLock shardLock(opCtx->lockState(), _kShardMembershipLock); + + auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); auto shardResult = uassertStatusOK( configShard->exhaustiveFindOnConfig(opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly), @@ -1056,7 +1061,6 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( BSON(ShardType::name(toShard.toString())), {}, boost::none)); - uassert(ErrorCodes::ShardNotFound, str::stream() << "Shard " << toShard << " does not exist", !shardResult.docs.empty()); @@ -1078,10 +1082,6 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( // (Note: This is not needed while we have a global lock, taken here only for consistency.) Lock::ExclusiveLock lk(opCtx, opCtx->lockState(), _kChunkOpLock); - if (!validAfter) { - return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"}; - } - auto findCollResponse = uassertStatusOK( configShard->exhaustiveFindOnConfig(opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, @@ -1093,6 +1093,7 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( uassert(ErrorCodes::ConflictingOperationInProgress, "Collection does not exist", !findCollResponse.docs.empty()); + const CollectionType coll(findCollResponse.docs[0]); uassert(ErrorCodes::ConflictingOperationInProgress, "Collection is undergoing changes and chunks cannot be moved", @@ -1144,7 +1145,6 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( const auto collNsOrUUID = getNsOrUUIDForChunkTargeting(coll); auto swCurrentChunk = getCurrentChunk(opCtx, collNsOrUUID, coll.getEpoch(), coll.getTimestamp(), migratedChunk); - if (!swCurrentChunk.isOK()) { return swCurrentChunk.getStatus(); } @@ -1309,6 +1309,151 @@ StatusWith<ChunkType> ShardingCatalogManager::_findChunkOnConfig( return ChunkType::fromConfigBSON(origChunks.front(), epoch, timestamp); } +void ShardingCatalogManager::upgradeChunksHistory(OperationContext* opCtx, + const NamespaceString& nss, + bool force, + const Timestamp validAfter) { + auto const catalogClient = Grid::get(opCtx)->catalogClient(); + const auto shardRegistry = Grid::get(opCtx)->shardRegistry(); + + FixedFCVRegion fcvRegion(opCtx); + uassert(ErrorCodes::ConflictingOperationInProgress, + "Cannot upgrade the chunks history while the cluster is being upgraded or downgraded", + !fcvRegion->isUpgradingOrDowngrading()); + + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and + // migrations. + Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); + + auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + const auto coll = [&] { + auto collDocs = uassertStatusOK(configShard->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + CollectionType::ConfigNS, + BSON(CollectionType::kNssFieldName << nss.ns()), + {}, + 1)) + .docs; + uassert(ErrorCodes::NamespaceNotFound, "Collection does not exist", !collDocs.empty()); + + return CollectionType(collDocs[0].getOwned()); + }(); + + const auto nsOrUUID = getNsOrUUIDForChunkTargeting(coll); + const auto allChunksQuery = nsOrUUID.uuid() + ? BSON(ChunkType::collectionUUID << *nsOrUUID.uuid()) + : BSON(ChunkType::ns << nsOrUUID.nss()->ns()); + + if (force) { + LOGV2(620650, + "Resetting the 'historyIsAt40' field for all chunks in collection {namespace} in " + "order to force all chunks' history to get recreated", + "namespace"_attr = nss.ns()); + + BatchedCommandRequest request([&] { + write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(allChunksQuery); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( + BSON("$unset" << BSON(ChunkType::historyIsAt40() << "")))); + entry.setUpsert(false); + entry.setMulti(true); + return entry; + }()}); + return updateOp; + }()); + request.setWriteConcern(ShardingCatalogClient::kLocalWriteConcern.toBSON()); + + auto response = configShard->runBatchWriteCommand( + opCtx, Shard::kDefaultConfigCommandTimeout, request, Shard::RetryPolicy::kIdempotent); + uassertStatusOK(response.toStatus()); + + uassert(ErrorCodes::Error(5760502), + str::stream() << "No chunks found for collection " << nss.ns(), + response.getN() > 0); + } + + // Find the collection version + const auto collVersion = uassertStatusOK(getCollectionVersion(opCtx, nss)); + + // Find the chunk history + const auto allChunksVector = [&] { + auto findChunksResponse = uassertStatusOK( + configShard->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + allChunksQuery, + BSONObj(), + boost::none)); + uassert(ErrorCodes::Error(5760503), + str::stream() << "No chunks found for collection " << nss.ns(), + !findChunksResponse.docs.empty()); + return std::move(findChunksResponse.docs); + }(); + + // Bump the major version in order to be guaranteed to trigger refresh on every shard + ChunkVersion newCollectionVersion( + collVersion.majorVersion() + 1, 0, collVersion.epoch(), collVersion.getTimestamp()); + std::set<ShardId> changedShardIds; + for (const auto& chunk : allChunksVector) { + auto upgradeChunk = uassertStatusOK( + ChunkType::fromConfigBSON(chunk, collVersion.epoch(), collVersion.getTimestamp())); + bool historyIsAt40 = chunk[ChunkType::historyIsAt40()].booleanSafe(); + if (historyIsAt40) { + uassert( + ErrorCodes::Error(5760504), + str::stream() << "Chunk " << upgradeChunk.getName() << " in collection " << nss.ns() + << " indicates that it has been upgraded to version 4.0, but is " + "missing the history field. This indicates a corrupted routing " + "table and requires a manual intervention to be fixed.", + !upgradeChunk.getHistory().empty()); + continue; + } + + upgradeChunk.setVersion(newCollectionVersion); + newCollectionVersion.incMinor(); + changedShardIds.emplace(upgradeChunk.getShard()); + + // Construct the fresh history. + upgradeChunk.setHistory({ChunkHistory{validAfter, upgradeChunk.getShard()}}); + + // Set the 'historyIsAt40' field so that it gets skipped if the command is re-run + BSONObjBuilder chunkObjBuilder(upgradeChunk.toConfigBSON()); + chunkObjBuilder.appendBool(ChunkType::historyIsAt40(), true); + + // Run the update + uassertStatusOK( + catalogClient->updateConfigDocument(opCtx, + ChunkType::ConfigNS, + BSON(ChunkType::name(upgradeChunk.getName())), + chunkObjBuilder.obj(), + false, + ShardingCatalogClient::kLocalWriteConcern)); + } + + // Wait for the writes to become majority committed so that the subsequent shard refreshes can + // see them + const auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + WriteConcernResult unusedWCResult; + uassertStatusOK(waitForWriteConcern( + opCtx, clientOpTime, ShardingCatalogClient::kMajorityWriteConcern, &unusedWCResult)); + + for (const auto& shardId : changedShardIds) { + auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId)); + uassertStatusOK( + Shard::CommandResponse::getEffectiveStatus(shard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + BSON("_flushRoutingTableCacheUpdates" << nss.ns()), + Shard::RetryPolicy::kIdempotent))); + } +} + void ShardingCatalogManager::clearJumboFlag(OperationContext* opCtx, const NamespaceString& nss, const OID& collectionEpoch, |