diff options
Diffstat (limited to 'src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp')
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp | 183 |
1 files changed, 183 insertions, 0 deletions
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp index 71df221d917..8c7c872ba5a 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp @@ -46,6 +46,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/executor/network_interface.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/balancer_configuration.h" @@ -221,6 +222,188 @@ void checkForExistingChunks(OperationContext* opCtx, const string& ns) { } // namespace +Status ShardingCatalogManager::dropCollection(OperationContext* opCtx, const NamespaceString& ns) { + const auto catalogClient = Grid::get(opCtx)->catalogClient(); + catalogClient + ->logChange(opCtx, + "dropCollection.start", + ns.ns(), + BSONObj(), + ShardingCatalogClientImpl::kMajorityWriteConcern) + .ignore(); + + auto shardsStatus = + catalogClient->getAllShards(opCtx, repl::ReadConcernLevel::kLocalReadConcern); + if (!shardsStatus.isOK()) { + return shardsStatus.getStatus(); + } + vector<ShardType> allShards = std::move(shardsStatus.getValue().value); + + LOG(1) << "dropCollection " << ns << " started"; + + const auto dropCommandBSON = [opCtx, &ns] { + BSONObjBuilder builder; + builder.append("drop", ns.coll()); + + if (!opCtx->getWriteConcern().usedDefault) { + builder.append(WriteConcernOptions::kWriteConcernField, + opCtx->getWriteConcern().toBSON()); + } + + return builder.obj(); + }(); + + std::map<std::string, BSONObj> errors; + auto* const shardRegistry = Grid::get(opCtx)->shardRegistry(); + + for (const auto& shardEntry : allShards) { + auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName()); + if (!swShard.isOK()) { + return swShard.getStatus(); + } + + const auto& shard = swShard.getValue(); + + auto swDropResult = shard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + ns.db().toString(), + dropCommandBSON, + Shard::RetryPolicy::kIdempotent); + + if (!swDropResult.isOK()) { + return {swDropResult.getStatus().code(), + str::stream() << swDropResult.getStatus().reason() << " at " + << shardEntry.getName()}; + } + + auto& dropResult = swDropResult.getValue(); + + auto dropStatus = std::move(dropResult.commandStatus); + auto wcStatus = std::move(dropResult.writeConcernStatus); + if (!dropStatus.isOK() || !wcStatus.isOK()) { + if (dropStatus.code() == ErrorCodes::NamespaceNotFound && wcStatus.isOK()) { + // Generally getting NamespaceNotFound is okay to ignore as it simply means that + // the collection has already been dropped or doesn't exist on this shard. + // If, however, we get NamespaceNotFound but also have a write concern error then we + // can't confirm whether the fact that the namespace doesn't exist is actually + // committed. Thus we must still fail on NamespaceNotFound if there is also a write + // concern error. This can happen if we call drop, it succeeds but with a write + // concern error, then we retry the drop. + continue; + } + + errors.emplace(shardEntry.getHost(), std::move(dropResult.response)); + } + } + + if (!errors.empty()) { + StringBuilder sb; + sb << "Dropping collection failed on the following hosts: "; + + for (auto it = errors.cbegin(); it != errors.cend(); ++it) { + if (it != errors.cbegin()) { + sb << ", "; + } + + sb << it->first << ": " << it->second; + } + + return {ErrorCodes::OperationFailed, sb.str()}; + } + + LOG(1) << "dropCollection " << ns << " shard data deleted"; + + // Remove chunk data + Status result = + catalogClient->removeConfigDocuments(opCtx, + ChunkType::ConfigNS, + BSON(ChunkType::ns(ns.ns())), + ShardingCatalogClient::kMajorityWriteConcern); + if (!result.isOK()) { + return result; + } + + LOG(1) << "dropCollection " << ns << " chunk data deleted"; + + // Mark the collection as dropped + CollectionType coll; + coll.setNs(ns); + coll.setDropped(true); + coll.setEpoch(ChunkVersion::DROPPED().epoch()); + coll.setUpdatedAt(Grid::get(opCtx)->getNetwork()->now()); + + const bool upsert = false; + result = ShardingCatalogClientImpl::updateShardingCatalogEntryForCollection( + opCtx, ns.ns(), coll, upsert); + if (!result.isOK()) { + return result; + } + + LOG(1) << "dropCollection " << ns << " collection marked as dropped"; + + for (const auto& shardEntry : allShards) { + auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName()); + if (!swShard.isOK()) { + return swShard.getStatus(); + } + + const auto& shard = swShard.getValue(); + + SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist( + shardRegistry->getConfigServerConnectionString(), + shardEntry.getName(), + fassertStatusOK(28781, ConnectionString::parse(shardEntry.getHost())), + ns, + ChunkVersion::DROPPED(), + true); + + auto ssvResult = shard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + ssv.toBSON(), + Shard::RetryPolicy::kIdempotent); + + if (!ssvResult.isOK()) { + return ssvResult.getStatus(); + } + + auto ssvStatus = std::move(ssvResult.getValue().commandStatus); + if (!ssvStatus.isOK()) { + return ssvStatus; + } + + auto unsetShardingStatus = shard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + BSON("unsetSharding" << 1), + Shard::RetryPolicy::kIdempotent); + + if (!unsetShardingStatus.isOK()) { + return unsetShardingStatus.getStatus(); + } + + auto unsetShardingResult = std::move(unsetShardingStatus.getValue().commandStatus); + if (!unsetShardingResult.isOK()) { + return unsetShardingResult; + } + } + + LOG(1) << "dropCollection " << ns << " completed"; + + catalogClient + ->logChange(opCtx, + "dropCollection", + ns.ns(), + BSONObj(), + ShardingCatalogClientImpl::kMajorityWriteConcern) + .ignore(); + + return Status::OK(); +} + void ShardingCatalogManager::shardCollection(OperationContext* opCtx, const string& ns, const boost::optional<UUID> uuid, |