diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-05-19 21:39:09 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-06-14 15:11:22 -0400 |
commit | ca77c160790082f8efd64a98c586d4fd3f4f64b4 (patch) | |
tree | 6c5c06c71cc5e806feced9e8176fcd51f1d35e71 /src/mongo/s | |
parent | 629985c5ffacf32f71b17a0e797fcd74cf7b1f9d (diff) | |
download | mongo-ca77c160790082f8efd64a98c586d4fd3f4f64b4.tar.gz |
SERVER-29369 Get rid of one extra find in _configsvrCommitChunkMigration
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager_chunk_operations_impl.cpp | 155 |
1 file changed, 58 insertions, 97 deletions
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_chunk_operations_impl.cpp b/src/mongo/s/catalog/sharding_catalog_manager_chunk_operations_impl.cpp index c1edeaad1dd..76479cbd5c3 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_chunk_operations_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_manager_chunk_operations_impl.cpp @@ -132,66 +132,6 @@ BSONArray buildMergeChunksApplyOpsPrecond(const std::vector<ChunkType>& chunksTo return preCond.arr(); } -/** - * Checks that the epoch in the version the shard sent with the command matches the epoch of the - * collection version found on the config server. It is possible for a migration to end up running - * partly without the protection of the distributed lock. This function checks that the collection - * has not been dropped and recreated since the migration began, unbeknown to the shard when the - * command was sent. - */ -Status checkCollectionVersionEpoch(OperationContext* opCtx, - const NamespaceString& nss, - const ChunkType& aChunk, - const OID& collectionEpoch) { - auto findResponseWith = - Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString(ChunkType::ConfigNS), - BSON(ChunkType::ns() << nss.ns()), - BSONObj(), - 1); - if (!findResponseWith.isOK()) { - return findResponseWith.getStatus(); - } - - if (MONGO_FAIL_POINT(migrationCommitVersionError)) { - uassert(ErrorCodes::StaleEpoch, - "failpoint 'migrationCommitVersionError' generated error", - false); - } - - if (findResponseWith.getValue().docs.empty()) { - return Status( - ErrorCodes::IncompatibleShardingMetadata, - str::stream() - << "Could not find any chunks for collection '" - << nss.ns() - << "'. The collection has been dropped since the migration began. 
Aborting" - " migration commit for chunk (" - << redact(aChunk.getRange().toString()) - << ")."); - } - - auto chunkWith = ChunkType::fromConfigBSON(findResponseWith.getValue().docs.front()); - if (!chunkWith.isOK()) { - return chunkWith.getStatus(); - } else if (chunkWith.getValue().getVersion().epoch() != collectionEpoch) { - return Status(ErrorCodes::StaleEpoch, - str::stream() << "The collection '" << nss.ns() - << "' has been dropped and recreated since the migration began." - " The config server's collection version epoch is now '" - << chunkWith.getValue().getVersion().epoch().toString() - << "', but the shard's is " - << collectionEpoch.toString() - << "'. Aborting migration commit for chunk (" - << redact(aChunk.getRange().toString()) - << ")."); - } - return Status::OK(); -} - Status checkChunkIsOnShard(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& min, @@ -592,6 +532,8 @@ StatusWith<BSONObj> ShardingCatalogManagerImpl::commitChunkMigration( const ShardId& fromShard, const ShardId& toShard) { + auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and // migrations. // @@ -604,11 +546,54 @@ StatusWith<BSONObj> ShardingCatalogManagerImpl::commitChunkMigration( // (Note: This is not needed while we have a global lock, taken here only for consistency.) Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); - // Ensure that the epoch passed in still matches the real state of the database. + // Must use local read concern because we will perform subsequent writes. 
+ auto findResponse = + configShard->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ChunkType::ConfigNS), + BSON("ns" << nss.ns()), + BSON(ChunkType::DEPRECATED_lastmod << -1), + 1); + if (!findResponse.isOK()) { + return findResponse.getStatus(); + } + + if (MONGO_FAIL_POINT(migrationCommitVersionError)) { + uassert(ErrorCodes::StaleEpoch, + "failpoint 'migrationCommitVersionError' generated error", + false); + } + + const auto chunksVector = std::move(findResponse.getValue().docs); + if (chunksVector.empty()) { + return {ErrorCodes::IncompatibleShardingMetadata, + str::stream() << "Tried to find max chunk version for collection '" << nss.ns() + << ", but found no chunks"}; + } + + const auto swChunk = ChunkType::fromConfigBSON(chunksVector.front()); + if (!swChunk.isOK()) { + return swChunk.getStatus(); + } + + const auto currentCollectionVersion = swChunk.getValue().getVersion(); - auto epochCheck = checkCollectionVersionEpoch(opCtx, nss, migratedChunk, collectionEpoch); - if (!epochCheck.isOK()) { - return epochCheck; + // It is possible for a migration to end up running partly without the protection of the + // distributed lock if the config primary stepped down since the start of the migration and + // failed to recover the migration. Check that the collection has not been dropped and recreated + // since the migration began, unbeknown to the shard when the command was sent. + if (currentCollectionVersion.epoch() != collectionEpoch) { + return {ErrorCodes::StaleEpoch, + str::stream() << "The collection '" << nss.ns() + << "' has been dropped and recreated since the migration began." + " The config server's collection version epoch is now '" + << currentCollectionVersion.epoch().toString() + << "', but the shard's is " + << collectionEpoch.toString() + << "'. 
Aborting migration commit for chunk (" + << migratedChunk.getRange().toString() + << ")."}; } // Check that migratedChunk and controlChunk are where they should be, on fromShard. @@ -627,51 +612,25 @@ StatusWith<BSONObj> ShardingCatalogManagerImpl::commitChunkMigration( } } - // Must use local read concern because we will perform subsequent writes. - auto findResponse = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString(ChunkType::ConfigNS), - BSON("ns" << nss.ns()), - BSON(ChunkType::DEPRECATED_lastmod << -1), - 1); - if (!findResponse.isOK()) { - return findResponse.getStatus(); - } - - std::vector<BSONObj> chunksVector = std::move(findResponse.getValue().docs); - if (chunksVector.empty()) { - return Status(ErrorCodes::Error(40164), - str::stream() << "Tried to find max chunk version for collection '" - << nss.ns() - << ", but found no chunks"); - } - - ChunkVersion currentMaxVersion = - ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod()); - - // Use the incremented major version of the result returned. - - // Generate the new versions of migratedChunk and controlChunk. - // Migrating chunk's minor version will be 0. + // Generate the new versions of migratedChunk and controlChunk. Migrating chunk's minor version + // will be 0. ChunkType newMigratedChunk = migratedChunk; - newMigratedChunk.setVersion( - ChunkVersion(currentMaxVersion.majorVersion() + 1, 0, currentMaxVersion.epoch())); + newMigratedChunk.setVersion(ChunkVersion( + currentCollectionVersion.majorVersion() + 1, 0, currentCollectionVersion.epoch())); // Control chunk's minor version will be 1 (if control chunk is present). 
boost::optional<ChunkType> newControlChunk = boost::none; if (controlChunk) { newControlChunk = controlChunk.get(); - newControlChunk->setVersion( - ChunkVersion(currentMaxVersion.majorVersion() + 1, 1, currentMaxVersion.epoch())); + newControlChunk->setVersion(ChunkVersion( + currentCollectionVersion.majorVersion() + 1, 1, currentCollectionVersion.epoch())); } auto command = makeCommitChunkApplyOpsCommand( nss, newMigratedChunk, newControlChunk, fromShard.toString(), toShard.toString()); StatusWith<Shard::CommandResponse> applyOpsCommandResponse = - Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( + configShard->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, nss.db().toString(), @@ -681,6 +640,7 @@ StatusWith<BSONObj> ShardingCatalogManagerImpl::commitChunkMigration( if (!applyOpsCommandResponse.isOK()) { return applyOpsCommandResponse.getStatus(); } + if (!applyOpsCommandResponse.getValue().commandStatus.isOK()) { return applyOpsCommandResponse.getValue().commandStatus; } @@ -690,6 +650,7 @@ StatusWith<BSONObj> ShardingCatalogManagerImpl::commitChunkMigration( if (controlChunk) { newControlChunk->getVersion().appendWithFieldForCommands(&result, "controlChunkVersion"); } + return result.obj(); } |