diff options
author | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-04-10 10:57:18 -0400 |
---|---|---|
committer | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-04-13 12:53:11 -0400 |
commit | fde45e48ef4a86fb19604843fdee272a22fcd475 (patch) | |
tree | 5902e57d0275a66fdafe65a3e15ad0b34d1ee640 /src/mongo/db/s/config | |
parent | b1e0f61e6d78c56248997c26f505b4519c84833f (diff) | |
download | mongo-fde45e48ef4a86fb19604843fdee272a22fcd475.tar.gz |
SERVER-33781 upgrade/downgrade config.chunks metadata with a new history field.
Diffstat (limited to 'src/mongo/db/s/config')
3 files changed, 306 insertions, 55 deletions
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index a41c1c2ce0c..c2fe589252b 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -353,6 +353,25 @@ public: */ static void clearForTests(ServiceContext* serviceContext); + // + // Upgrade/downgrade + // + + /** + * Upgrade the chunk metadata to include the history field. + */ + Status upgradeChunksHistory(OperationContext* opCtx, + const NamespaceString& nss, + const OID& collectionEpoch, + const Timestamp validAfter); + + /** + * Remove the history field from the chunk metadata. + */ + Status downgradeChunksHistory(OperationContext* opCtx, + const NamespaceString& nss, + const OID& collectionEpoch); + private: /** * Performs the necessary checks for version compatibility and creates a new config.version @@ -455,6 +474,20 @@ private: const std::vector<BSONObj>& initPoints, const bool distributeInitialChunks); + /** + * Retrieve the full chunk description from the config. + */ + StatusWith<ChunkType> _findChunkOnConfig(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& key); + + /** + * Retrieve the latest collection version from the config. 
+ */ + StatusWith<ChunkVersion> _findCollectionVersion(OperationContext* opCtx, + const NamespaceString& nss, + const OID& collectionEpoch); + // The owning service context ServiceContext* const _serviceContext; diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index 5f32fca980a..e4a73bc6d54 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -88,12 +88,13 @@ BSONArray buildMergeChunksTransactionUpdates(const std::vector<ChunkType>& chunk // fill in additional details for sending through transaction mergedChunk.setVersion(mergeVersion); - // Clear the chunk history - std::vector<ChunkHistory> history; - if (validAfter) { - history.emplace_back(ChunkHistory(validAfter.get(), mergedChunk.getShard())); + // FCV 3.6 does not have the history field in the persisted metadata + if (serverGlobalParams.featureCompatibility.getVersion() >= + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) { + + invariant(validAfter); + mergedChunk.setHistory({ChunkHistory(validAfter.get(), mergedChunk.getShard())}); } - mergedChunk.setHistory(std::move(history)); // add the new chunk information as the update object op.append("o", mergedChunk.toConfigBSON()); @@ -286,27 +287,7 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx, } // Find the chunk history. 
- auto findHistory = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kLocalReadConcern, - ChunkType::ConfigNS, - BSON(ChunkType::name << ChunkType::genID(nss, range.getMin())), - BSONObj(), - 1); - if (!findHistory.isOK()) { - return findHistory.getStatus(); - } - - const auto origChunks = std::move(findHistory.getValue().docs); - if (origChunks.size() != 1) { - return {ErrorCodes::IncompatibleShardingMetadata, - str::stream() << "Tried to find the chunk history for '" - << ChunkType::genID(nss, range.getMin()) - << ", but found no chunks"}; - } - - const auto origChunk = ChunkType::fromConfigBSON(origChunks.front()); + const auto origChunk = _findChunkOnConfig(opCtx, nss, range.getMin()); if (!origChunk.isOK()) { return origChunk.getStatus(); } @@ -372,7 +353,13 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx, n.append(ChunkType::min(), startKey); n.append(ChunkType::max(), endKey); n.append(ChunkType::shard(), shardName); - origChunk.getValue().addHistoryToBSON(n); + + // FCV 3.6 does not have the history field in the persisted metadata + if (serverGlobalParams.featureCompatibility.getVersion() >= + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) { + origChunk.getValue().addHistoryToBSON(n); + } + n.done(); // add the chunk's _id as the query part of the update statement @@ -479,6 +466,11 @@ Status ShardingCatalogManager::commitChunkMerge(OperationContext* opCtx, // move chunks on different collections to proceed in parallel Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); + if (serverGlobalParams.featureCompatibility.getVersion() >= + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40 && + !validAfter) { + return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"}; + } // Get the chunk with the highest version for this namespace auto findStatus = 
Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( opCtx, @@ -597,6 +589,12 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( // (Note: This is not needed while we have a global lock, taken here only for consistency.) Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); + if (serverGlobalParams.featureCompatibility.getVersion() >= + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40 && + !validAfter) { + return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"}; + } + // Must use local read concern because we will perform subsequent writes. auto findResponse = configShard->exhaustiveFindOnConfig(opCtx, @@ -664,27 +662,7 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( } // Find the chunk history. - auto findHistory = configShard->exhaustiveFindOnConfig( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kLocalReadConcern, - ChunkType::ConfigNS, - BSON(ChunkType::name << ChunkType::genID(nss, migratedChunk.getMin())), - BSONObj(), - 1); - if (!findHistory.isOK()) { - return findHistory.getStatus(); - } - - const auto origChunks = std::move(findHistory.getValue().docs); - if (origChunks.size() != 1) { - return {ErrorCodes::IncompatibleShardingMetadata, - str::stream() << "Tried to find the chunk history for '" - << ChunkType::genID(nss, migratedChunk.getMin()) - << ", but found no chunks"}; - } - - const auto origChunk = ChunkType::fromConfigBSON(origChunks.front()); + const auto origChunk = _findChunkOnConfig(opCtx, nss, migratedChunk.getMin()); if (!origChunk.isOK()) { return origChunk.getStatus(); } @@ -701,7 +679,10 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( const int kHistorySecs = 10; // Update the history of the migrated chunk. 
- if (validAfter) { + if (serverGlobalParams.featureCompatibility.getVersion() >= + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) { + invariant(validAfter); + // Drop the history that is too old (10 seconds of history for now). // TODO SERVER-33831 to update the old history removal policy. while (!newHistory.empty() && @@ -721,7 +702,7 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( } newHistory.emplace(newHistory.begin(), ChunkHistory(validAfter.get(), toShard)); } else { - // TODO: SERVER-33781 FCV 3.6 should not have any history + // FCV 3.6 does not have the history field in the persisted metadata newHistory.clear(); } newMigratedChunk.setHistory(std::move(newHistory)); @@ -729,9 +710,22 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( // Control chunk's minor version will be 1 (if control chunk is present). boost::optional<ChunkType> newControlChunk = boost::none; if (controlChunk) { - newControlChunk = controlChunk.get(); + // Find the chunk history. + const auto origControlChunk = _findChunkOnConfig(opCtx, nss, controlChunk->getMin()); + if (!origControlChunk.isOK()) { + return origControlChunk.getStatus(); + } + + newControlChunk = origControlChunk.getValue(); newControlChunk->setVersion(ChunkVersion( currentCollectionVersion.majorVersion() + 1, 1, currentCollectionVersion.epoch())); + + // Copy the history of the control chunk. 
+ if (serverGlobalParams.featureCompatibility.getVersion() < + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) { + // FCV 3.6 does not have the history field in the persisted metadata + newControlChunk->setHistory({}); + } } auto command = makeCommitChunkTransactionCommand( @@ -762,4 +756,226 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( return result.obj(); } +StatusWith<ChunkType> ShardingCatalogManager::_findChunkOnConfig(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& key) { + auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + + auto findResponse = + configShard->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + BSON(ChunkType::name << ChunkType::genID(nss, key)), + BSONObj(), + 1); + + if (!findResponse.isOK()) { + return findResponse.getStatus(); + } + + const auto origChunks = std::move(findResponse.getValue().docs); + if (origChunks.size() != 1) { + return {ErrorCodes::IncompatibleShardingMetadata, + str::stream() << "Tried to find the chunk for '" << ChunkType::genID(nss, key) + << ", but found no chunks"}; + } + + return ChunkType::fromConfigBSON(origChunks.front()); +} + +Status ShardingCatalogManager::upgradeChunksHistory(OperationContext* opCtx, + const NamespaceString& nss, + const OID& collectionEpoch, + const Timestamp validAfter) { + auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + auto const catalogClient = Grid::get(opCtx)->catalogClient(); + + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and + // migrations. 
+ Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); + + auto findResponse = + configShard->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + BSON("ns" << nss.ns()), + BSONObj(), + boost::none); + if (!findResponse.isOK()) { + return findResponse.getStatus(); + } + + const auto chunksVector = std::move(findResponse.getValue().docs); + if (chunksVector.empty()) { + return {ErrorCodes::IncompatibleShardingMetadata, + str::stream() << "Tried to find chunks for collection '" << nss.ns() + << ", but found no chunks"}; + } + + const auto currentCollectionVersion = _findCollectionVersion(opCtx, nss, collectionEpoch); + if (!currentCollectionVersion.isOK()) { + return currentCollectionVersion.getStatus(); + } + + // Bump the version. + auto newCollectionVersion = ChunkVersion(currentCollectionVersion.getValue().majorVersion() + 1, + 0, + currentCollectionVersion.getValue().epoch()); + + for (const auto& chunk : chunksVector) { + auto swChunk = ChunkType::fromConfigBSON(chunk); + if (!swChunk.isOK()) { + return swChunk.getStatus(); + } + auto& upgradeChunk = swChunk.getValue(); + + if (upgradeChunk.getHistory().empty()) { + + // Bump the version. + upgradeChunk.setVersion(newCollectionVersion); + newCollectionVersion.incMajor(); + + // Construct the fresh history. + upgradeChunk.setHistory({ChunkHistory{validAfter, upgradeChunk.getShard()}}); + + // Run the update. 
+ auto status = + catalogClient->updateConfigDocument(opCtx, + ChunkType::ConfigNS, + BSON(ChunkType::name(upgradeChunk.getName())), + upgradeChunk.toConfigBSON(), + false, + ShardingCatalogClient::kLocalWriteConcern); + + if (!status.isOK()) { + return status.getStatus(); + } + } + } + + return Status::OK(); +} + +Status ShardingCatalogManager::downgradeChunksHistory(OperationContext* opCtx, + const NamespaceString& nss, + const OID& collectionEpoch) { + auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + auto const catalogClient = Grid::get(opCtx)->catalogClient(); + + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and + // migrations. + // + Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); + + auto findResponse = + configShard->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + BSON("ns" << nss.ns()), + BSONObj(), + boost::none); + if (!findResponse.isOK()) { + return findResponse.getStatus(); + } + + const auto chunksVector = std::move(findResponse.getValue().docs); + if (chunksVector.empty()) { + return {ErrorCodes::IncompatibleShardingMetadata, + str::stream() << "Tried to find chunks for collection '" << nss.ns() + << ", but found no chunks"}; + } + + const auto currentCollectionVersion = _findCollectionVersion(opCtx, nss, collectionEpoch); + if (!currentCollectionVersion.isOK()) { + return currentCollectionVersion.getStatus(); + } + + // Bump the version. + auto newCollectionVersion = ChunkVersion(currentCollectionVersion.getValue().majorVersion() + 1, + 0, + currentCollectionVersion.getValue().epoch()); + + for (const auto& chunk : chunksVector) { + auto swChunk = ChunkType::fromConfigBSON(chunk); + if (!swChunk.isOK()) { + return swChunk.getStatus(); + } + auto& downgradeChunk = swChunk.getValue(); + + // Bump the version. 
+ downgradeChunk.setVersion(newCollectionVersion); + newCollectionVersion.incMajor(); + + // Clear the history. + downgradeChunk.setHistory({}); + + // Run the update. + auto status = + catalogClient->updateConfigDocument(opCtx, + ChunkType::ConfigNS, + BSON(ChunkType::name(downgradeChunk.getName())), + downgradeChunk.toConfigBSON(), + false, + ShardingCatalogClient::kLocalWriteConcern); + + if (!status.isOK()) { + return status.getStatus(); + } + } + + return Status::OK(); +} + +StatusWith<ChunkVersion> ShardingCatalogManager::_findCollectionVersion( + OperationContext* opCtx, const NamespaceString& nss, const OID& collectionEpoch) { + auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + + // Must use local read concern because we will perform subsequent writes. + auto findResponse = + configShard->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + BSON("ns" << nss.ns()), + BSON(ChunkType::lastmod << -1), + 1); + if (!findResponse.isOK()) { + return findResponse.getStatus(); + } + + const auto chunksVector = std::move(findResponse.getValue().docs); + if (chunksVector.empty()) { + return {ErrorCodes::IncompatibleShardingMetadata, + str::stream() << "Tried to find max chunk version for collection '" << nss.ns() + << ", but found no chunks"}; + } + + const auto swChunk = ChunkType::fromConfigBSON(chunksVector.front()); + if (!swChunk.isOK()) { + return swChunk.getStatus(); + } + + const auto currentCollectionVersion = swChunk.getValue().getVersion(); + + // It is possible for a migration to end up running partly without the protection of the + // distributed lock if the config primary stepped down since the start of the migration and + // failed to recover the migration. Check that the collection has not been dropped and recreated + // since the migration began, unbeknown to the shard when the command was sent. 
+ if (currentCollectionVersion.epoch() != collectionEpoch) { + return {ErrorCodes::StaleEpoch, + str::stream() << "The collection '" << nss.ns() + << "' has been dropped and recreated since the migration began." + " The config server's collection version epoch is now '" + << currentCollectionVersion.epoch().toString() + << "', but the shard's is " + << collectionEpoch.toString() + << "'."}; + } + + return currentCollectionVersion; +} + } // namespace mongo diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp index 73ee3e1c938..59d2511d671 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp @@ -293,10 +293,12 @@ ChunkVersion ShardingCatalogManager::_createFirstChunks(OperationContext* opCtx, chunk.setMax(max); chunk.setShard(shardIds[i % shardIds.size()]); chunk.setVersion(version); - // TODO SERVER-33781 write history only when FCV4.0 config. - std::vector<ChunkHistory> initialHistory; - initialHistory.emplace_back(ChunkHistory(validAfter, shardIds[i % shardIds.size()])); - chunk.setHistory(std::move(initialHistory)); + if (serverGlobalParams.featureCompatibility.getVersion() >= + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) { + std::vector<ChunkHistory> initialHistory; + initialHistory.emplace_back(ChunkHistory(validAfter, shardIds[i % shardIds.size()])); + chunk.setHistory(std::move(initialHistory)); + } uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument( opCtx, |