Diffstat (limited to 'src/mongo/db/s/config')
3 files changed, 267 insertions, 121 deletions
diff --git a/src/mongo/db/s/config/configsvr_repair_sharded_collection_chunks_history_command.cpp b/src/mongo/db/s/config/configsvr_repair_sharded_collection_chunks_history_command.cpp
new file mode 100644
index 00000000000..138c66d4385
--- /dev/null
+++ b/src/mongo/db/s/config/configsvr_repair_sharded_collection_chunks_history_command.cpp
@@ -0,0 +1,163 @@
+
+/**
+ * Copyright (C) 2021 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/action_type.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/auth/privilege.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/commands/feature_compatibility_version.h"
+#include "mongo/db/commands/feature_compatibility_version_parser.h"
+#include "mongo/db/logical_clock.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/s/grid.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+class ConfigSvrRepairShardedCollectionChunksHistoryCommand : public BasicCommand {
+public:
+    ConfigSvrRepairShardedCollectionChunksHistoryCommand()
+        : BasicCommand("_configsvrRepairShardedCollectionChunksHistory") {}
+
+    std::string help() const override {
+        return "Internal command, which is exported by the sharding config server. Do not call "
+               "directly.";
+    }
+
+    AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+        return AllowedOnSecondary::kNever;
+    }
+
+    bool adminOnly() const override {
+        return true;
+    }
+
+    bool supportsWriteConcern(const BSONObj& cmd) const override {
+        return true;
+    }
+
+    std::string parseNs(const std::string& unusedDbName, const BSONObj& cmdObj) const override {
+        return CommandHelpers::parseNsFullyQualified(cmdObj);
+    }
+
+    Status checkAuthForCommand(Client* client,
+                               const std::string& dbname,
+                               const BSONObj& cmdObj) const override {
+        if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
+                ResourcePattern::forClusterResource(), ActionType::internal)) {
+            return Status(ErrorCodes::Unauthorized, "Unauthorized");
+        }
+        return Status::OK();
+    }
+
+    bool run(OperationContext* opCtx,
+             const std::string& unusedDbName,
+             const BSONObj& cmdObj,
+             BSONObjBuilder& result) override {
+        uassert(ErrorCodes::IllegalOperation,
+                "_configsvrRepairShardedCollectionChunksHistory can only be run on config servers",
+                serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
+
+        // Set the operation context read concern level to local for reads into the config
+        // database.
+        repl::ReadConcernArgs::get(opCtx) =
+            repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
+
+        uassert(ErrorCodes::InvalidOptions,
+                str::stream() << "_configsvrRepairShardedCollectionChunksHistory must be called "
+                                 "with majority writeConcern, got "
+                              << cmdObj,
+                opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority);
+
+        const NamespaceString nss{parseNs(unusedDbName, cmdObj)};
+
+        // Ensure that the feature compatibility version stays stable while the command is
+        // executing.
+        Lock::SharedLock lk(opCtx->lockState(), FeatureCompatibilityVersion::fcvLock);
+        uassert(620650,
+                str::stream() << "Collection history entries can only be repaired when the server "
+                                 "is in feature compatibility version "
+                              << FeatureCompatibilityVersionParser::kVersion40,
+                serverGlobalParams.featureCompatibility.getVersion() ==
+                    ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40);
+
+        auto const catalogClient = Grid::get(opCtx)->catalogClient();
+        auto collection =
+            uassertStatusOK(
+                catalogClient->getCollection(opCtx, nss, repl::ReadConcernLevel::kLocalReadConcern))
+                .value;
+
+        if (cmdObj["force"].booleanSafe()) {
+            LOG(0) << "Resetting the 'historyIsAt40' field for all chunks in collection "
+                   << nss.ns() << " in order to force all chunks' history to get recreated";
+
+            BatchedCommandRequest request([&] {
+                write_ops::Update updateOp(ChunkType::ConfigNS);
+                updateOp.setUpdates({[&] {
+                    write_ops::UpdateOpEntry entry;
+                    entry.setQ(BSON("ns" << nss.ns()));
+                    entry.setU(BSON("$unset" << BSON(ChunkType::historyIsAt40() << "")));
+                    entry.setUpsert(false);
+                    entry.setMulti(true);
+                    return entry;
+                }()});
+                return updateOp;
+            }());
+            request.setWriteConcern(ShardingCatalogClient::kLocalWriteConcern.toBSON());
+
+            const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+            auto response = configShard->runBatchWriteCommand(opCtx,
+                                                              Shard::kDefaultConfigCommandTimeout,
+                                                              request,
+                                                              Shard::RetryPolicy::kIdempotent);
+            uassertStatusOK(response.toStatus());
+
+            uassert(ErrorCodes::Error(620651),
+                    str::stream() << "No chunks found for collection " << nss.ns(),
+                    response.getN() > 0);
+        }
+
+        auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();
+
+        ShardingCatalogManager::get(opCtx)->upgradeChunksHistory(
+            opCtx, nss, collection.getEpoch(), validAfter);
+
+        Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss);
+
+        return true;
+    }
+
+} configSvrRepairShardedCollectionChunksHistoryCommand;
+
+}  // namespace
+}  // namespace mongo
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index b2f8b278165..0a257d4274f 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -434,17 +434,15 @@ public:
     /**
      * Upgrade the chunk metadata to include the history field.
      */
-    Status upgradeChunksHistory(OperationContext* opCtx,
-                                const NamespaceString& nss,
-                                const OID& collectionEpoch,
-                                const Timestamp validAfter);
+    void upgradeChunksHistory(OperationContext* opCtx,
+                              const NamespaceString& nss,
+                              const OID& collectionEpoch,
+                              const Timestamp validAfter);

     /**
      * Remove the history field from the chunk metadata.
      */
-    Status downgradeChunksHistory(OperationContext* opCtx,
-                                  const NamespaceString& nss,
-                                  const OID& collectionEpoch);
+    void downgradeChunksHistory(OperationContext* opCtx, const NamespaceString& nss);

 private:
     /**
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index c5e703e8129..056d676d273 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -43,7 +43,9 @@
 #include "mongo/db/dbdirectclient.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
+#include "mongo/db/repl/repl_client_info.h"
 #include "mongo/db/server_parameters.h"
+#include "mongo/db/write_concern.h"
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/s/catalog/sharding_catalog_client.h"
 #include "mongo/s/catalog/type_chunk.h"
@@ -1126,140 +1128,123 @@ StatusWith<ChunkType> ShardingCatalogManager::_findChunkOnConfig(OperationContex
     return ChunkType::fromConfigBSON(origChunks.front());
 }

-Status ShardingCatalogManager::upgradeChunksHistory(OperationContext* opCtx,
-                                                    const NamespaceString& nss,
-                                                    const OID& collectionEpoch,
-                                                    const Timestamp validAfter) {
-    auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+void ShardingCatalogManager::upgradeChunksHistory(OperationContext* opCtx,
+                                                  const NamespaceString& nss,
+                                                  const OID& collectionEpoch,
+                                                  const Timestamp validAfter) {
     auto const catalogClient = Grid::get(opCtx)->catalogClient();
+    const auto shardRegistry = Grid::get(opCtx)->shardRegistry();

     // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
     // migrations.
     Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);

-    auto findResponse =
-        configShard->exhaustiveFindOnConfig(opCtx,
-                                            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-                                            repl::ReadConcernLevel::kLocalReadConcern,
-                                            ChunkType::ConfigNS,
-                                            BSON("ns" << nss.ns()),
-                                            BSONObj(),
-                                            boost::none);
-    if (!findResponse.isOK()) {
-        return findResponse.getStatus();
-    }
-
-    const auto chunksVector = std::move(findResponse.getValue().docs);
-    if (chunksVector.empty()) {
-        return {ErrorCodes::IncompatibleShardingMetadata,
-                str::stream() << "Tried to find chunks for collection '" << nss.ns()
-                              << ", but found no chunks"};
-    }
-
-    const auto currentCollectionVersion = _findCollectionVersion(opCtx, nss, collectionEpoch);
-    if (!currentCollectionVersion.isOK()) {
-        return currentCollectionVersion.getStatus();
-    }
+    auto const configShard = shardRegistry->getConfigShard();
+    const auto chunksVector = [&] {
+        auto findChunksResponse = uassertStatusOK(
+            configShard->exhaustiveFindOnConfig(opCtx,
+                                                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                                repl::ReadConcernLevel::kLocalReadConcern,
+                                                ChunkType::ConfigNS,
+                                                BSON("ns" << nss.ns()),
+                                                BSONObj(),
+                                                boost::none));
+        uassert(ErrorCodes::Error(620652),
+                str::stream() << "No chunks found for collection " << nss.ns(),
+                !findChunksResponse.docs.empty());
+        return std::move(findChunksResponse.docs);
+    }();

-    // Bump the version.
-    auto newCollectionVersion = ChunkVersion(currentCollectionVersion.getValue().majorVersion() + 1,
-                                             0,
-                                             currentCollectionVersion.getValue().epoch());
+    const auto currentCollectionVersion =
+        uassertStatusOK(_findCollectionVersion(opCtx, nss, collectionEpoch));

-    stdx::unordered_set<ShardId, ShardId::Hasher> bumpedShards;
+    // Bump the major version in order to be guaranteed to trigger refresh on every shard
+    ChunkVersion newCollectionVersion(
+        currentCollectionVersion.majorVersion() + 1, 0, currentCollectionVersion.epoch());

+    std::set<ShardId> changedShardIds;
     for (const auto& chunk : chunksVector) {
-        auto swChunk = ChunkType::fromConfigBSON(chunk);
-        if (!swChunk.isOK()) {
-            return swChunk.getStatus();
+        auto upgradeChunk = uassertStatusOK(ChunkType::fromConfigBSON(chunk));
+        bool historyIsAt40 = chunk[ChunkType::historyIsAt40()].booleanSafe();
+        if (historyIsAt40) {
+            uassert(
+                ErrorCodes::Error(620653),
+                str::stream() << "Chunk " << upgradeChunk.getName() << " in collection " << nss.ns()
+                              << " indicates that it has been upgraded to version 4.0, but is "
+                                 "missing the history field. This indicates a corrupted routing "
+                                 "table and requires a manual intervention to be fixed.",
+                !upgradeChunk.getHistory().empty());
+            continue;
         }
-        auto& upgradeChunk = swChunk.getValue();
-
-        if (upgradeChunk.getHistory().empty()) {
-            // Bump the version for only one chunk per shard to satisfy the requirement imposed by
-            // SERVER-33356
-            const auto& shardId = upgradeChunk.getShard();
-            if (!bumpedShards.count(shardId)) {
-                upgradeChunk.setVersion(newCollectionVersion);
-                newCollectionVersion.incMajor();
-                bumpedShards.emplace(shardId);
-            }
-
-            // Construct the fresh history.
-            upgradeChunk.setHistory({ChunkHistory{validAfter, upgradeChunk.getShard()}});
-
-            // Run the update.
-            auto status =
-                catalogClient->updateConfigDocument(opCtx,
-                                                    ChunkType::ConfigNS,
-                                                    BSON(ChunkType::name(upgradeChunk.getName())),
-                                                    upgradeChunk.toConfigBSON(),
-                                                    false,
-                                                    ShardingCatalogClient::kLocalWriteConcern);
-
-            if (!status.isOK()) {
-                return status.getStatus();
-            }
-        }
-    }
-
-    return Status::OK();
-}
-
-Status ShardingCatalogManager::downgradeChunksHistory(OperationContext* opCtx,
-                                                      const NamespaceString& nss,
-                                                      const OID& collectionEpoch) {
-    auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
-    auto const catalogClient = Grid::get(opCtx)->catalogClient();
-
-    // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
-    // migrations.
-    //
-    Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
-
-    auto findResponse =
-        configShard->exhaustiveFindOnConfig(opCtx,
-                                            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-                                            repl::ReadConcernLevel::kLocalReadConcern,
-                                            ChunkType::ConfigNS,
-                                            BSON("ns" << nss.ns()),
-                                            BSONObj(),
-                                            boost::none);
-    if (!findResponse.isOK()) {
-        return findResponse.getStatus();
-    }
-    const auto chunksVector = std::move(findResponse.getValue().docs);
-    if (chunksVector.empty()) {
-        return {ErrorCodes::IncompatibleShardingMetadata,
-                str::stream() << "Tried to find chunks for collection '" << nss.ns()
-                              << ", but found no chunks"};
-    }
+        upgradeChunk.setVersion(newCollectionVersion);
+        newCollectionVersion.incMinor();
+        changedShardIds.emplace(upgradeChunk.getShard());

-    for (const auto& chunk : chunksVector) {
-        auto swChunk = ChunkType::fromConfigBSON(chunk);
-        if (!swChunk.isOK()) {
-            return swChunk.getStatus();
-        }
-        auto& downgradeChunk = swChunk.getValue();
+        // Construct the fresh history.
+        upgradeChunk.setHistory({ChunkHistory{validAfter, upgradeChunk.getShard()}});

-        // Clear the history.
-        downgradeChunk.setHistory({});
+        // Set the 'historyIsAt40' field so that it gets skipped if the command is re-run
+        BSONObjBuilder chunkObjBuilder(upgradeChunk.toConfigBSON());
+        chunkObjBuilder.appendBool(ChunkType::historyIsAt40(), true);

         // Run the update.
-        auto status =
+        uassertStatusOK(
             catalogClient->updateConfigDocument(opCtx,
                                                 ChunkType::ConfigNS,
-                                                BSON(ChunkType::name(downgradeChunk.getName())),
-                                                downgradeChunk.toConfigBSON(),
+                                                BSON(ChunkType::name(upgradeChunk.getName())),
+                                                chunkObjBuilder.obj(),
                                                 false,
-                                                ShardingCatalogClient::kLocalWriteConcern);
-
-        if (!status.isOK()) {
-            return status.getStatus();
-        }
+                                                ShardingCatalogClient::kLocalWriteConcern));
+    }
+
+    // Wait for the writes to become majority committed so that the subsequent shard refreshes can
+    // see them
+    const auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+    WriteConcernResult unusedWCResult;
+    uassertStatusOK(waitForWriteConcern(
+        opCtx, clientOpTime, ShardingCatalogClient::kMajorityWriteConcern, &unusedWCResult));
+
+    for (const auto& shardId : changedShardIds) {
+        auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId));
+        uassertStatusOK(
+            Shard::CommandResponse::getEffectiveStatus(shard->runCommandWithFixedRetryAttempts(
+                opCtx,
+                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                "admin",
+                BSON("_flushRoutingTableCacheUpdates" << nss.ns()),
+                Shard::RetryPolicy::kIdempotent)));
     }
+}

-    return Status::OK();
+void ShardingCatalogManager::downgradeChunksHistory(OperationContext* opCtx,
+                                                    const NamespaceString& nss) {
+    // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
+    // migrations.
+    Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
+
+    BatchedCommandRequest request([&] {
+        write_ops::Update updateOp(ChunkType::ConfigNS);
+        updateOp.setUpdates({[&] {
+            write_ops::UpdateOpEntry entry;
+            entry.setQ(BSON("ns" << nss.ns()));
+            entry.setU(BSON(
+                "$unset" << BSON(ChunkType::history() << "" << ChunkType::historyIsAt40() << "")));
+            entry.setUpsert(false);
+            entry.setMulti(true);
+            return entry;
+        }()});
+        return updateOp;
+    }());
+    request.setWriteConcern(ShardingCatalogClient::kLocalWriteConcern.toBSON());
+
+    auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+    auto response = configShard->runBatchWriteCommand(
+        opCtx, Shard::kDefaultConfigCommandTimeout, request, Shard::RetryPolicy::kIdempotent);
+    uassertStatusOK(response.toStatus());
+
+    uassert(ErrorCodes::Error(620654),
+            str::stream() << "No chunks found for collection " << nss.ns(),
+            response.getN() > 0);
 }

 StatusWith<ChunkVersion> ShardingCatalogManager::_findCollectionVersion(
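For reference, a hypothetical invocation sketch of the command this patch adds, pieced together from the code above: the command is admin-only, takes the fully-qualified namespace as its value, refuses anything but majority write concern, and accepts an optional "force" boolean that clears the 'historyIsAt40' markers so every chunk's history is rebuilt. The namespace "test.coll" and the shell connection to the config server primary are illustrative assumptions, not part of the commit.

    // Hypothetical: run from a mongo shell connected to the config server primary.
    // "force: true" first resets the 'historyIsAt40' markers, forcing all chunks'
    // history entries to be recreated rather than only the missing ones.
    db.adminCommand({
        _configsvrRepairShardedCollectionChunksHistory: "test.coll",
        force: true,
        writeConcern: {w: "majority"}
    });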