Diffstat (limited to 'src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp')
 -rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp | 183
 1 file changed, 183 insertions(+), 0 deletions(-)
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp
index 71df221d917..8c7c872ba5a 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/executor/network_interface.h"
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/balancer_configuration.h"
@@ -221,6 +222,188 @@ void checkForExistingChunks(OperationContext* opCtx, const string& ns) {
} // namespace
+Status ShardingCatalogManager::dropCollection(OperationContext* opCtx, const NamespaceString& ns) {
+ const auto catalogClient = Grid::get(opCtx)->catalogClient();
+ catalogClient
+ ->logChange(opCtx,
+ "dropCollection.start",
+ ns.ns(),
+ BSONObj(),
+ ShardingCatalogClientImpl::kMajorityWriteConcern)
+ .ignore();
+
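+ // The drop has to be sent to every shard, so fetch the full shard list from the config
+ // server first.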
+ auto shardsStatus =
+ catalogClient->getAllShards(opCtx, repl::ReadConcernLevel::kLocalReadConcern);
+ if (!shardsStatus.isOK()) {
+ return shardsStatus.getStatus();
+ }
+ vector<ShardType> allShards = std::move(shardsStatus.getValue().value);
+
+ LOG(1) << "dropCollection " << ns << " started";
+
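+ // Build the 'drop' command once, forwarding the caller's write concern only if one was
+ // explicitly provided.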
+ const auto dropCommandBSON = [opCtx, &ns] {
+ BSONObjBuilder builder;
+ builder.append("drop", ns.coll());
+
+ if (!opCtx->getWriteConcern().usedDefault) {
+ builder.append(WriteConcernOptions::kWriteConcernField,
+ opCtx->getWriteConcern().toBSON());
+ }
+
+ return builder.obj();
+ }();
+
+ std::map<std::string, BSONObj> errors;
+ auto* const shardRegistry = Grid::get(opCtx)->shardRegistry();
+
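+ // Send 'drop' to the primary of every shard. Command-level failures are collected per
+ // shard so they can all be reported together below.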
+ for (const auto& shardEntry : allShards) {
+ auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName());
+ if (!swShard.isOK()) {
+ return swShard.getStatus();
+ }
+
+ const auto& shard = swShard.getValue();
+
+ auto swDropResult = shard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ ns.db().toString(),
+ dropCommandBSON,
+ Shard::RetryPolicy::kIdempotent);
+
+ if (!swDropResult.isOK()) {
+ return {swDropResult.getStatus().code(),
+ str::stream() << swDropResult.getStatus().reason() << " at "
+ << shardEntry.getName()};
+ }
+
+ auto& dropResult = swDropResult.getValue();
+
+ auto dropStatus = std::move(dropResult.commandStatus);
+ auto wcStatus = std::move(dropResult.writeConcernStatus);
+ if (!dropStatus.isOK() || !wcStatus.isOK()) {
+ if (dropStatus.code() == ErrorCodes::NamespaceNotFound && wcStatus.isOK()) {
+ // NamespaceNotFound is generally safe to ignore, since it means the collection
+ // has already been dropped or never existed on this shard. If it is accompanied
+ // by a write concern error, however, we cannot confirm that the namespace's
+ // absence was actually committed, so the drop must still fail. This can happen
+ // when an earlier drop succeeded with a write concern error and is now being
+ // retried.
+ continue;
+ }
+
+ errors.emplace(shardEntry.getHost(), std::move(dropResult.response));
+ }
+ }
+
+ if (!errors.empty()) {
+ StringBuilder sb;
+ sb << "Dropping collection failed on the following hosts: ";
+
+ for (auto it = errors.cbegin(); it != errors.cend(); ++it) {
+ if (it != errors.cbegin()) {
+ sb << ", ";
+ }
+
+ sb << it->first << ": " << it->second;
+ }
+
+ return {ErrorCodes::OperationFailed, sb.str()};
+ }
+
+ LOG(1) << "dropCollection " << ns << " shard data deleted";
+
+ // Remove the collection's chunk metadata from config.chunks
+ Status result =
+ catalogClient->removeConfigDocuments(opCtx,
+ ChunkType::ConfigNS,
+ BSON(ChunkType::ns(ns.ns())),
+ ShardingCatalogClient::kMajorityWriteConcern);
+ if (!result.isOK()) {
+ return result;
+ }
+
+ LOG(1) << "dropCollection " << ns << " chunk data deleted";
+
+ // Mark the collection as dropped in the config server metadata, rather than
+ // removing its entry outright
+ CollectionType coll;
+ coll.setNs(ns);
+ coll.setDropped(true);
+ coll.setEpoch(ChunkVersion::DROPPED().epoch());
+ coll.setUpdatedAt(Grid::get(opCtx)->getNetwork()->now());
+
+ const bool upsert = false;
+ result = ShardingCatalogClientImpl::updateShardingCatalogEntryForCollection(
+ opCtx, ns.ns(), coll, upsert);
+ if (!result.isOK()) {
+ return result;
+ }
+
+ LOG(1) << "dropCollection " << ns << " collection marked as dropped";
+
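+ // Tell the primary of each shard, authoritatively, that the collection is now at the
+ // special DROPPED version.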
+ for (const auto& shardEntry : allShards) {
+ auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName());
+ if (!swShard.isOK()) {
+ return swShard.getStatus();
+ }
+
+ const auto& shard = swShard.getValue();
+
+ SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist(
+ shardRegistry->getConfigServerConnectionString(),
+ shardEntry.getName(),
+ fassertStatusOK(28781, ConnectionString::parse(shardEntry.getHost())),
+ ns,
+ ChunkVersion::DROPPED(),
+ true /* isAuthoritative */);
+
+ auto ssvResult = shard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ ssv.toBSON(),
+ Shard::RetryPolicy::kIdempotent);
+
+ if (!ssvResult.isOK()) {
+ return ssvResult.getStatus();
+ }
+
+ auto ssvStatus = std::move(ssvResult.getValue().commandStatus);
+ if (!ssvStatus.isOK()) {
+ return ssvStatus;
+ }
+
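+ // Clear any leftover sharding state on the shard with 'unsetSharding'.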
+ auto unsetShardingStatus = shard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ BSON("unsetSharding" << 1),
+ Shard::RetryPolicy::kIdempotent);
+
+ if (!unsetShardingStatus.isOK()) {
+ return unsetShardingStatus.getStatus();
+ }
+
+ auto unsetShardingResult = std::move(unsetShardingStatus.getValue().commandStatus);
+ if (!unsetShardingResult.isOK()) {
+ return unsetShardingResult;
+ }
+ }
+
+ LOG(1) << "dropCollection " << ns << " completed";
+
+ catalogClient
+ ->logChange(opCtx,
+ "dropCollection",
+ ns.ns(),
+ BSONObj(),
+ ShardingCatalogClientImpl::kMajorityWriteConcern)
+ .ignore();
+
+ return Status::OK();
+}
+
void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
const string& ns,
const boost::optional<UUID> uuid,