diff options
author | Kaitlin Mahar <kaitlin.mahar@mongodb.com> | 2018-01-16 11:15:05 -0500 |
---|---|---|
committer | Kaitlin Mahar <kaitlin.mahar@mongodb.com> | 2018-01-16 21:32:19 -0500 |
commit | 23870b6aecac924a15af49bb7abe2f8e1cda2aa8 (patch) | |
tree | 066f8693d65ad71f54331ef95ab6e89f0351dce6 /src | |
parent | 7ed79c16f619cab2195edf9cad37a3c4765c8a23 (diff) | |
download | mongo-23870b6aecac924a15af49bb7abe2f8e1cda2aa8.tar.gz |
SERVER-30744 Move dropCollection logic from ShardingCatalogClient into ShardingCatalogManager
Diffstat (limited to 'src')
10 files changed, 199 insertions, 199 deletions
diff --git a/src/mongo/db/s/config/configsvr_drop_collection_command.cpp b/src/mongo/db/s/config/configsvr_drop_collection_command.cpp index 2b3987585bc..5d4eb31bf50 100644 --- a/src/mongo/db/s/config/configsvr_drop_collection_command.cpp +++ b/src/mongo/db/s/config/configsvr_drop_collection_command.cpp @@ -36,6 +36,7 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" @@ -151,7 +152,7 @@ public: opCtx, dbStatus.getValue().value.getPrimary(), nss, &result); } else { uassertStatusOK(collStatus); - uassertStatusOK(catalogClient->dropCollection(opCtx, nss)); + uassertStatusOK(ShardingCatalogManager::get(opCtx)->dropCollection(opCtx, nss)); } return true; diff --git a/src/mongo/db/s/config/configsvr_drop_database_command.cpp b/src/mongo/db/s/config/configsvr_drop_database_command.cpp index c772c34cf97..e024edb997a 100644 --- a/src/mongo/db/s/config/configsvr_drop_database_command.cpp +++ b/src/mongo/db/s/config/configsvr_drop_database_command.cpp @@ -146,7 +146,7 @@ public: for (const auto& nss : catalogManager->getAllShardedCollectionsForDb(opCtx, dbname)) { auto collDistLock = uassertStatusOK(catalogClient->getDistLockManager()->lock( opCtx, nss.ns(), "dropCollection", DistLockManager::kDefaultLockTimeout)); - uassertStatusOK(catalogClient->dropCollection(opCtx, nss)); + uassertStatusOK(catalogManager->dropCollection(opCtx, nss)); } // Drop the database from the primary shard first. diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h index ebf1681e121..d4eac9da595 100644 --- a/src/mongo/s/catalog/sharding_catalog_client.h +++ b/src/mongo/s/catalog/sharding_catalog_client.h @@ -163,15 +163,6 @@ public: repl::ReadConcernLevel readConcernLevel = repl::ReadConcernLevel::kMajorityReadConcern) = 0; /** - * Drops the specified collection from the collection metadata store. - * - * Returns Status::OK if successful or any error code indicating the failure. These are - * some of the known failures: - * - NamespaceNotFound - collection does not exist - */ - virtual Status dropCollection(OperationContext* opCtx, const NamespaceString& ns) = 0; - - /** * Retrieves all databases for a shard. * * Returns a !OK status if an error occurs. diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp index 171f685eb10..378f4a44df6 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -424,183 +424,6 @@ StatusWith<std::vector<CollectionType>> ShardingCatalogClientImpl::getCollection return collections; } -Status ShardingCatalogClientImpl::dropCollection(OperationContext* opCtx, - const NamespaceString& ns) { - logChange(opCtx, - "dropCollection.start", - ns.ns(), - BSONObj(), - ShardingCatalogClientImpl::kMajorityWriteConcern) - .ignore(); - - auto shardsStatus = getAllShards(opCtx, repl::ReadConcernLevel::kLocalReadConcern); - if (!shardsStatus.isOK()) { - return shardsStatus.getStatus(); - } - vector<ShardType> allShards = std::move(shardsStatus.getValue().value); - - LOG(1) << "dropCollection " << ns << " started"; - - const auto dropCommandBSON = [opCtx, &ns] { - BSONObjBuilder builder; - builder.append("drop", ns.coll()); - - if (!opCtx->getWriteConcern().usedDefault) { - builder.append(WriteConcernOptions::kWriteConcernField, - opCtx->getWriteConcern().toBSON()); - } - - return builder.obj(); - }(); - - std::map<std::string, BSONObj> errors; - auto* const shardRegistry = Grid::get(opCtx)->shardRegistry(); - - for (const auto& shardEntry : allShards) { - auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName()); - if (!swShard.isOK()) { - return swShard.getStatus(); - } - - const auto& shard = swShard.getValue(); - - auto swDropResult = shard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - ns.db().toString(), - dropCommandBSON, - Shard::RetryPolicy::kIdempotent); - - if (!swDropResult.isOK()) { - return {swDropResult.getStatus().code(), - str::stream() << swDropResult.getStatus().reason() << " at " - << shardEntry.getName()}; - } - - auto& dropResult = swDropResult.getValue(); - - auto dropStatus = std::move(dropResult.commandStatus); - auto wcStatus = std::move(dropResult.writeConcernStatus); - if (!dropStatus.isOK() || !wcStatus.isOK()) { - if (dropStatus.code() == ErrorCodes::NamespaceNotFound && wcStatus.isOK()) { - // Generally getting NamespaceNotFound is okay to ignore as it simply means that - // the collection has already been dropped or doesn't exist on this shard. - // If, however, we get NamespaceNotFound but also have a write concern error then we - // can't confirm whether the fact that the namespace doesn't exist is actually - // committed. Thus we must still fail on NamespaceNotFound if there is also a write - // concern error. This can happen if we call drop, it succeeds but with a write - // concern error, then we retry the drop. - continue; - } - - errors.emplace(shardEntry.getHost(), std::move(dropResult.response)); - } - } - - if (!errors.empty()) { - StringBuilder sb; - sb << "Dropping collection failed on the following hosts: "; - - for (auto it = errors.cbegin(); it != errors.cend(); ++it) { - if (it != errors.cbegin()) { - sb << ", "; - } - - sb << it->first << ": " << it->second; - } - - return {ErrorCodes::OperationFailed, sb.str()}; - } - - LOG(1) << "dropCollection " << ns << " shard data deleted"; - - // Remove chunk data - Status result = removeConfigDocuments(opCtx, - ChunkType::ConfigNS, - BSON(ChunkType::ns(ns.ns())), - ShardingCatalogClient::kMajorityWriteConcern); - if (!result.isOK()) { - return result; - } - - LOG(1) << "dropCollection " << ns << " chunk data deleted"; - - // Mark the collection as dropped - CollectionType coll; - coll.setNs(ns); - coll.setDropped(true); - coll.setEpoch(ChunkVersion::DROPPED().epoch()); - coll.setUpdatedAt(Grid::get(opCtx)->getNetwork()->now()); - - const bool upsert = false; - result = updateShardingCatalogEntryForCollection(opCtx, ns.ns(), coll, upsert); - if (!result.isOK()) { - return result; - } - - LOG(1) << "dropCollection " << ns << " collection marked as dropped"; - - for (const auto& shardEntry : allShards) { - auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName()); - if (!swShard.isOK()) { - return swShard.getStatus(); - } - - const auto& shard = swShard.getValue(); - - SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist( - shardRegistry->getConfigServerConnectionString(), - shardEntry.getName(), - fassertStatusOK(28781, ConnectionString::parse(shardEntry.getHost())), - ns, - ChunkVersion::DROPPED(), - true); - - auto ssvResult = shard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - "admin", - ssv.toBSON(), - Shard::RetryPolicy::kIdempotent); - - if (!ssvResult.isOK()) { - return ssvResult.getStatus(); - } - - auto ssvStatus = std::move(ssvResult.getValue().commandStatus); - if (!ssvStatus.isOK()) { - return ssvStatus; - } - - auto unsetShardingStatus = shard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - "admin", - BSON("unsetSharding" << 1), - Shard::RetryPolicy::kIdempotent); - - if (!unsetShardingStatus.isOK()) { - return unsetShardingStatus.getStatus(); - } - - auto unsetShardingResult = std::move(unsetShardingStatus.getValue().commandStatus); - if (!unsetShardingResult.isOK()) { - return unsetShardingResult; - } - } - - LOG(1) << "dropCollection " << ns << " completed"; - - logChange(opCtx, - "dropCollection", - ns.ns(), - BSONObj(), - ShardingCatalogClientImpl::kMajorityWriteConcern) - .ignore(); - - return Status::OK(); -} - StatusWith<BSONObj> ShardingCatalogClientImpl::getGlobalSettings(OperationContext* opCtx, StringData key) { auto findStatus = _exhaustiveFindOnConfig(opCtx, diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.h b/src/mongo/s/catalog/sharding_catalog_client_impl.h index 94b080538b2..0766a44b279 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.h +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.h @@ -102,8 +102,6 @@ public: repl::OpTime* optime, repl::ReadConcernLevel readConcernLevel) override; - Status dropCollection(OperationContext* opCtx, const NamespaceString& ns) override; - StatusWith<std::vector<std::string>> getDatabasesForShard(OperationContext* opCtx, const ShardId& shardName) override; diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp index 6bee2c72739..2fe043be4e4 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp @@ -92,11 +92,6 @@ StatusWith<std::vector<CollectionType>> ShardingCatalogClientMock::getCollection return {ErrorCodes::InternalError, "Method not implemented"}; } -Status ShardingCatalogClientMock::dropCollection(OperationContext* opCtx, - const NamespaceString& ns) { - return {ErrorCodes::InternalError, "Method not implemented"}; -} - StatusWith<std::vector<std::string>> ShardingCatalogClientMock::getDatabasesForShard( OperationContext* opCtx, const ShardId& shardName) { return {ErrorCodes::InternalError, "Method not implemented"}; diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h index b537aa80b39..159d5d8c1b8 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h @@ -67,8 +67,6 @@ public: repl::OpTime* optime, repl::ReadConcernLevel readConcernLevel) override; - Status dropCollection(OperationContext* opCtx, const NamespaceString& ns) override; - StatusWith<std::vector<std::string>> getDatabasesForShard(OperationContext* opCtx, const ShardId& shardName) override; diff --git a/src/mongo/s/catalog/sharding_catalog_drop_coll_test.cpp b/src/mongo/s/catalog/sharding_catalog_drop_coll_test.cpp index 426ba6e82db..86526197821 100644 --- a/src/mongo/s/catalog/sharding_catalog_drop_coll_test.cpp +++ b/src/mongo/s/catalog/sharding_catalog_drop_coll_test.cpp @@ -33,7 +33,7 @@ #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/namespace_string.h" #include "mongo/rpc/metadata/tracking_metadata.h" -#include "mongo/s/catalog/sharding_catalog_client_impl.h" +#include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/chunk_version.h" #include "mongo/s/client/shard_registry.h" @@ -114,7 +114,7 @@ public: ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); auto opCtx = cc().makeOperationContext(); - return catalogClient()->dropCollection(opCtx.get(), dropNS()); + return ShardingCatalogManager::get(opCtx.get())->dropCollection(opCtx.get(), dropNS()); } const NamespaceString& dropNS() const { diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h index 2c36eef57f5..4c535825c7e 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager.h +++ b/src/mongo/s/catalog/sharding_catalog_manager.h @@ -223,6 +223,16 @@ public: // /** + * Drops the specified collection from the collection metadata store. + * + * Returns Status::OK if successful or any error code indicating the failure. These are + * some of the known failures: + * - NamespaceNotFound - collection does not exist + */ + Status dropCollection(OperationContext* opCtx, const NamespaceString& ns); + + + /** * Shards a collection. Assumes that the database is enabled for sharding. * * @param ns: namespace of collection to shard @@ -247,6 +257,7 @@ public: const bool distributeInitialChunks, const ShardId& dbPrimaryShardId); + /** * Iterates through each entry in config.collections that does not have a UUID, generates a UUID * for the collection, and updates the entry with the generated UUID. diff --git a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp index 71df221d917..8c7c872ba5a 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp @@ -46,6 +46,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/executor/network_interface.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/balancer_configuration.h" @@ -221,6 +222,188 @@ void checkForExistingChunks(OperationContext* opCtx, const string& ns) { } // namespace +Status ShardingCatalogManager::dropCollection(OperationContext* opCtx, const NamespaceString& ns) { + const auto catalogClient = Grid::get(opCtx)->catalogClient(); + catalogClient + ->logChange(opCtx, + "dropCollection.start", + ns.ns(), + BSONObj(), + ShardingCatalogClientImpl::kMajorityWriteConcern) + .ignore(); + + auto shardsStatus = + catalogClient->getAllShards(opCtx, repl::ReadConcernLevel::kLocalReadConcern); + if (!shardsStatus.isOK()) { + return shardsStatus.getStatus(); + } + vector<ShardType> allShards = std::move(shardsStatus.getValue().value); + + LOG(1) << "dropCollection " << ns << " started"; + + const auto dropCommandBSON = [opCtx, &ns] { + BSONObjBuilder builder; + builder.append("drop", ns.coll()); + + if (!opCtx->getWriteConcern().usedDefault) { + builder.append(WriteConcernOptions::kWriteConcernField, + opCtx->getWriteConcern().toBSON()); + } + + return builder.obj(); + }(); + + std::map<std::string, BSONObj> errors; + auto* const shardRegistry = Grid::get(opCtx)->shardRegistry(); + + for (const auto& shardEntry : allShards) { + auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName()); + if (!swShard.isOK()) { + return swShard.getStatus(); + } + + const auto& shard = swShard.getValue(); + + auto swDropResult = shard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + ns.db().toString(), + dropCommandBSON, + Shard::RetryPolicy::kIdempotent); + + if (!swDropResult.isOK()) { + return {swDropResult.getStatus().code(), + str::stream() << swDropResult.getStatus().reason() << " at " + << shardEntry.getName()}; + } + + auto& dropResult = swDropResult.getValue(); + + auto dropStatus = std::move(dropResult.commandStatus); + auto wcStatus = std::move(dropResult.writeConcernStatus); + if (!dropStatus.isOK() || !wcStatus.isOK()) { + if (dropStatus.code() == ErrorCodes::NamespaceNotFound && wcStatus.isOK()) { + // Generally getting NamespaceNotFound is okay to ignore as it simply means that + // the collection has already been dropped or doesn't exist on this shard. + // If, however, we get NamespaceNotFound but also have a write concern error then we + // can't confirm whether the fact that the namespace doesn't exist is actually + // committed. Thus we must still fail on NamespaceNotFound if there is also a write + // concern error. This can happen if we call drop, it succeeds but with a write + // concern error, then we retry the drop. + continue; + } + + errors.emplace(shardEntry.getHost(), std::move(dropResult.response)); + } + } + + if (!errors.empty()) { + StringBuilder sb; + sb << "Dropping collection failed on the following hosts: "; + + for (auto it = errors.cbegin(); it != errors.cend(); ++it) { + if (it != errors.cbegin()) { + sb << ", "; + } + + sb << it->first << ": " << it->second; + } + + return {ErrorCodes::OperationFailed, sb.str()}; + } + + LOG(1) << "dropCollection " << ns << " shard data deleted"; + + // Remove chunk data + Status result = + catalogClient->removeConfigDocuments(opCtx, + ChunkType::ConfigNS, + BSON(ChunkType::ns(ns.ns())), + ShardingCatalogClient::kMajorityWriteConcern); + if (!result.isOK()) { + return result; + } + + LOG(1) << "dropCollection " << ns << " chunk data deleted"; + + // Mark the collection as dropped + CollectionType coll; + coll.setNs(ns); + coll.setDropped(true); + coll.setEpoch(ChunkVersion::DROPPED().epoch()); + coll.setUpdatedAt(Grid::get(opCtx)->getNetwork()->now()); + + const bool upsert = false; + result = ShardingCatalogClientImpl::updateShardingCatalogEntryForCollection( + opCtx, ns.ns(), coll, upsert); + if (!result.isOK()) { + return result; + } + + LOG(1) << "dropCollection " << ns << " collection marked as dropped"; + + for (const auto& shardEntry : allShards) { + auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName()); + if (!swShard.isOK()) { + return swShard.getStatus(); + } + + const auto& shard = swShard.getValue(); + + SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist( + shardRegistry->getConfigServerConnectionString(), + shardEntry.getName(), + fassertStatusOK(28781, ConnectionString::parse(shardEntry.getHost())), + ns, + ChunkVersion::DROPPED(), + true); + + auto ssvResult = shard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + ssv.toBSON(), + Shard::RetryPolicy::kIdempotent); + + if (!ssvResult.isOK()) { + return ssvResult.getStatus(); + } + + auto ssvStatus = std::move(ssvResult.getValue().commandStatus); + if (!ssvStatus.isOK()) { + return ssvStatus; + } + + auto unsetShardingStatus = shard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + BSON("unsetSharding" << 1), + Shard::RetryPolicy::kIdempotent); + + if (!unsetShardingStatus.isOK()) { + return unsetShardingStatus.getStatus(); + } + + auto unsetShardingResult = std::move(unsetShardingStatus.getValue().commandStatus); + if (!unsetShardingResult.isOK()) { + return unsetShardingResult; + } + } + + LOG(1) << "dropCollection " << ns << " completed"; + + catalogClient + ->logChange(opCtx, + "dropCollection", + ns.ns(), + BSONObj(), + ShardingCatalogClientImpl::kMajorityWriteConcern) + .ignore(); + + return Status::OK(); +} + void ShardingCatalogManager::shardCollection(OperationContext* opCtx, const string& ns, const boost::optional<UUID> uuid, |