diff options
-rw-r--r-- | src/mongo/db/s/range_deletion_util.cpp | 127 | ||||
-rw-r--r-- | src/mongo/db/s/range_deletion_util.h | 20 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_runtime_d_params.idl | 8 |
3 files changed, 68 insertions, 87 deletions
diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp index 82b18854782..700412d18b3 100644 --- a/src/mongo/db/s/range_deletion_util.cpp +++ b/src/mongo/db/s/range_deletion_util.cpp @@ -29,14 +29,11 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingRangeDeleter -#include "mongo/platform/basic.h" - #include "mongo/db/s/range_deletion_util.h" #include <algorithm> -#include <utility> - #include <boost/optional.hpp> +#include <utility> #include "mongo/db/catalog_raii.h" #include "mongo/db/client.h" @@ -56,6 +53,7 @@ #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/shard_key_index_util.h" +#include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/db/s/sharding_statistics.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/remove_saver.h" @@ -67,11 +65,7 @@ #include "mongo/util/future_util.h" namespace mongo { - namespace { -const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, - WriteConcernOptions::SyncMode::UNSET, - WriteConcernOptions::kWriteConcernTimeoutSharding); MONGO_FAIL_POINT_DEFINE(hangBeforeDoingDeletion); MONGO_FAIL_POINT_DEFINE(hangAfterDoingDeletion); @@ -230,7 +224,6 @@ StatusWith<int> deleteNextBatch(OperationContext* opCtx, return numDeleted; } - template <typename Callable> auto withTemporaryOperationContext(Callable&& callable, const NamespaceString& nss) { ThreadClient tc(migrationutil::kRangeDeletionThreadName, getGlobalServiceContext()); @@ -284,9 +277,18 @@ void ensureRangeDeletionTaskStillExists(OperationContext* opCtx, const UUID& mig // holding any locks. } +void markRangeDeletionTaskAsProcessing(OperationContext* opCtx, const UUID& migrationId) { + PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); + auto query = BSON(RangeDeletionTask::kIdFieldName << migrationId); + static const auto update = + BSON("$set" << BSON(RangeDeletionTask::kProcessingFieldName << true)); + + store.update(opCtx, query, update, WriteConcerns::kLocalWriteConcern); +} + /** - * Delete the range in a sequence of batches until there are no more documents to - * delete or deletion returns an error. + * Delete the range in a sequence of batches until there are no more documents to delete or deletion + * returns an error. */ ExecutorFuture<void> deleteRangeInBatches(const std::shared_ptr<executor::TaskExecutor>& executor, const NamespaceString& nss, @@ -309,45 +311,48 @@ ExecutorFuture<void> deleteRangeInBatches(const std::shared_ptr<executor::TaskEx ensureRangeDeletionTaskStillExists(opCtx, migrationId); - AutoGetCollection collection(opCtx, nss, MODE_IX); - - // Ensure the collection exists and has not been dropped or dropped and - // recreated. - uassert( - ErrorCodes::RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist, - "Collection has been dropped since enqueuing this range " - "deletion task. No need to delete documents.", - !collectionUuidHasChanged( - nss, collection.getCollection(), collectionUuid)); - - markAsProcessingRangeDeletionTask(opCtx, migrationId); - int numDeleted; - { - ScopedRangeDeleterLock rangeDeleterLock(opCtx, collectionUuid); - numDeleted = uassertStatusOK(deleteNextBatch(opCtx, - collection.getCollection(), - keyPattern, - range, - numDocsToRemovePerBatch)); - migrationutil::persistUpdatedNumOrphans( - opCtx, migrationId, collectionUuid, -numDeleted); - } - if (MONGO_unlikely(hangAfterDoingDeletion.shouldFail())) { - hangAfterDoingDeletion.pauseWhileSet(opCtx); + { + AutoGetCollection collection(opCtx, nss, MODE_IX); + + // Ensure the collection exists and has not been dropped or dropped and + // recreated + uassert(ErrorCodes:: + RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist, + "Collection has been dropped since enqueuing this range " + "deletion task. No need to delete documents.", + !collectionUuidHasChanged( + nss, collection.getCollection(), collectionUuid)); + + markRangeDeletionTaskAsProcessing(opCtx, migrationId); + + { + ScopedRangeDeleterLock rangeDeleterLock(opCtx, collectionUuid); + + numDeleted = + uassertStatusOK(deleteNextBatch(opCtx, + collection.getCollection(), + keyPattern, + range, + numDocsToRemovePerBatch)); + + migrationutil::persistUpdatedNumOrphans( + opCtx, migrationId, collectionUuid, -numDeleted); + } + + if (MONGO_unlikely(hangAfterDoingDeletion.shouldFail())) { + hangAfterDoingDeletion.pauseWhileSet(opCtx); + } } - LOGV2_DEBUG( - 23769, - 1, - "Deleted {numDeleted} documents in pass in namespace {namespace} with " - "UUID {collectionUUID} for range {range}", - "Deleted documents in pass", - "numDeleted"_attr = numDeleted, - "namespace"_attr = nss.ns(), - "collectionUUID"_attr = collectionUuid, - "range"_attr = range.toString()); + LOGV2_DEBUG(23769, + 1, + "Deleted documents in pass", + "numDeleted"_attr = numDeleted, + "namespace"_attr = nss.ns(), + "collectionUUID"_attr = collectionUuid, + "range"_attr = range.toString()); if (numDeleted > 0) { // (SERVER-62368) The range-deleter executor is mono-threaded, so @@ -356,21 +361,22 @@ ExecutorFuture<void> deleteRangeInBatches(const std::shared_ptr<executor::TaskEx opCtx->sleepFor(delayBetweenBatches); } - return numDeleted; + return numDeleted == 0; }, nss); }) - .until([=](StatusWith<int> swNumDeleted) { - // Continue iterating until there are no more documents to delete, retrying on - // any error that doesn't indicate that this node is stepping down. - return (swNumDeleted.isOK() && swNumDeleted.getValue() < numDocsToRemovePerBatch) || - swNumDeleted.getStatus() == + .until([=](StatusWith<bool> swAllDocumentsInRangeDeleted) { + // Continue iterating until there are no more documents to delete, retrying on any error + // that doesn't indicate that this node is stepping down. + return (swAllDocumentsInRangeDeleted.isOK() && + swAllDocumentsInRangeDeleted.getValue()) || + swAllDocumentsInRangeDeleted == ErrorCodes::RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist || - swNumDeleted.getStatus() == + swAllDocumentsInRangeDeleted == ErrorCodes::RangeDeletionAbandonedBecauseTaskDocumentDoesNotExist || - swNumDeleted.getStatus().code() == ErrorCodes::KeyPatternShorterThanBound || - ErrorCodes::isShutdownError(swNumDeleted.getStatus()) || - ErrorCodes::isNotPrimaryError(swNumDeleted.getStatus()); + swAllDocumentsInRangeDeleted == ErrorCodes::KeyPatternShorterThanBound || + ErrorCodes::isShutdownError(swAllDocumentsInRangeDeleted.getStatus()) || + ErrorCodes::isNotPrimaryError(swAllDocumentsInRangeDeleted.getStatus()); }) .on(executor, CancellationToken::uncancelable()) .ignoreValue(); @@ -502,15 +508,6 @@ void deleteRangeDeletionTasksForRename(OperationContext* opCtx, BSON(RangeDeletionTask::kNssFieldName << toNss.ns())); } -void markAsProcessingRangeDeletionTask(OperationContext* opCtx, const UUID& migrationId) { - PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); - auto query = BSON(RangeDeletionTask::kIdFieldName << migrationId); - static const auto update = - BSON("$set" << BSON(RangeDeletionTask::kProcessingFieldName << true)); - - store.update(opCtx, query, update, WriteConcerns::kLocalWriteConcern); -} - SharedSemiFuture<void> removeDocumentsInRange( const std::shared_ptr<executor::TaskExecutor>& executor, SemiFuture<void> waitForActiveQueriesToComplete, diff --git a/src/mongo/db/s/range_deletion_util.h b/src/mongo/db/s/range_deletion_util.h index 9d30bcc577c..1c2fdffef1d 100644 --- a/src/mongo/db/s/range_deletion_util.h +++ b/src/mongo/db/s/range_deletion_util.h @@ -28,9 +28,8 @@ */ #pragma once -#include <list> - #include <boost/optional.hpp> +#include <list> #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/namespace_string.h" @@ -40,18 +39,6 @@ namespace mongo { -class BSONObj; - -// The maximum number of documents to delete in a single batch during range deletion. -// secondaryThrottle and rangeDeleterBatchDelayMS apply between each batch. -// Must be positive or 0 (the default), which means to use the value of -// internalQueryExecYieldIterations (or 1 if that's negative or zero). -extern AtomicWord<int> rangeDeleterBatchSize; - -// After completing a batch of document deletions, the time in millis to wait before commencing the -// next batch of deletions. -extern AtomicWord<int> rangeDeleterBatchDelayMS; - /** * Acquires the config db lock in IX mode and the collection lock for config.rangeDeletions in X * mode. @@ -115,11 +102,6 @@ void deleteRangeDeletionTasksForRename(OperationContext* opCtx, const NamespaceString& toNss); /** - * Sets the processing field on the range deletion document for the given migration id. - */ -void markAsProcessingRangeDeletionTask(OperationContext* opCtx, const UUID& migrationId); - -/** * Computes and sets the numOrphanDocs field for each document in `config.rangeDeletions` (skips * documents referring to older incarnations of a collection) */ diff --git a/src/mongo/db/s/sharding_runtime_d_params.idl b/src/mongo/db/s/sharding_runtime_d_params.idl index 23d298f0d6b..aeb0c8b676a 100644 --- a/src/mongo/db/s/sharding_runtime_d_params.idl +++ b/src/mongo/db/s/sharding_runtime_d_params.idl @@ -32,8 +32,10 @@ server_parameters: rangeDeleterBatchSize: description: >- The maximum number of documents in each batch to delete during the cleanup stage of chunk - migration (or the cleanupOrphaned command). A value of 0 indicates that the system chooses - the default value (INT_MAX). + migration (or the cleanupOrphaned command). Between each batch, secondaryThrottle and + rangeDeleterBatchDelayMS will apply. + + A value of 0 indicates that the system chooses the default value (INT_MAX). set_at: [startup, runtime] cpp_vartype: AtomicWord<int> cpp_varname: rangeDeleterBatchSize @@ -110,7 +112,7 @@ server_parameters: default: 10 orphanCleanupDelaySecs: - description: 'How long to wait before starting cleanup of an emigrated chunk range.' + description: How long to wait before starting cleanup of an emigrated chunk range. set_at: [startup, runtime] cpp_vartype: AtomicWord<int> cpp_varname: orphanCleanupDelaySecs |