author     Matthew Saltz <matthew.saltz@mongodb.com>    2020-01-15 19:38:42 +0000
committer  evergreen <evergreen@mongodb.com>            2020-01-15 19:38:42 +0000
commit     c9dbf657e1ef0fdda8a32d1080d4a7c3bb361c5e (patch)
tree       de1a02312f003c5da8c11fdd19b1635ce11bae7d /src/mongo/db/s
parent     b522b177cb8d899ea167ad31ba0e4ac461dfeeb5 (diff)
download   mongo-c9dbf657e1ef0fdda8a32d1080d4a7c3bb361c5e.tar.gz
SERVER-45024 Integrate refactored range deletion functionality and remove CollectionRangeDeleter
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/SConscript                                         2
-rw-r--r--  src/mongo/db/s/cleanup_orphaned_cmd.cpp                           6
-rw-r--r--  src/mongo/db/s/collection_metadata_filtering_test.cpp             2
-rw-r--r--  src/mongo/db/s/collection_range_deleter.cpp                     559
-rw-r--r--  src/mongo/db/s/collection_range_deleter.h                       220
-rw-r--r--  src/mongo/db/s/collection_range_deleter_test.cpp                421
-rw-r--r--  src/mongo/db/s/collection_sharding_runtime.cpp                   22
-rw-r--r--  src/mongo/db/s/collection_sharding_runtime.h                     32
-rw-r--r--  src/mongo/db/s/collection_sharding_state.cpp                     13
-rw-r--r--  src/mongo/db/s/collection_sharding_state.h                        5
-rw-r--r--  src/mongo/db/s/collection_sharding_state_factory_embedded.cpp     2
-rw-r--r--  src/mongo/db/s/collection_sharding_state_factory_shard.cpp       14
-rw-r--r--  src/mongo/db/s/collection_sharding_state_test.cpp                 2
-rw-r--r--  src/mongo/db/s/metadata_manager.cpp                             288
-rw-r--r--  src/mongo/db/s/metadata_manager.h                               105
-rw-r--r--  src/mongo/db/s/metadata_manager_test.cpp                         49
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp                 19
-rw-r--r--  src/mongo/db/s/migration_destination_manager.h                    3
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp                       9
-rw-r--r--  src/mongo/db/s/migration_util.cpp                                 8
-rw-r--r--  src/mongo/db/s/migration_util_test.cpp                            3
-rw-r--r--  src/mongo/db/s/range_deletion_util.cpp                           12
22 files changed, 236 insertions, 1560 deletions
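
In short, the commit below removes CollectionRangeDeleter and its DeleteNotification/CleanupNotification machinery, and callers now wait on a SharedSemiFuture<void> returned by the range-cleanup APIs. A minimal before/after sketch of the waiting pattern, assembled from the cleanup_orphaned_cmd.cpp hunks below (opCtx, css, and targetRange are the variables from that call site):

    // Before: hold a notification object and block on it.
    //   CollectionShardingRuntime::CleanupNotification notifn =
    //       css->cleanUpRange(*targetRange, CollectionShardingRuntime::kNow);
    //   Status result = notifn.waitStatus(opCtx);

    // After: hold a future and block on it without throwing.
    SharedSemiFuture<void> cleanupCompleteFuture =
        css->cleanUpRange(*targetRange, CollectionShardingRuntime::kNow);
    Status result = cleanupCompleteFuture.getNoThrow(opCtx);
    if (!result.isOK()) {
        // The scheduled range deletion failed or was interrupted.
    }
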
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 8f11b900e12..823a76b04a2 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -39,7 +39,6 @@ env.Library(
'active_shard_collection_registry.cpp',
'chunk_move_write_concern_options.cpp',
'chunk_splitter.cpp',
- 'collection_range_deleter.cpp',
'collection_sharding_runtime.cpp',
'collection_sharding_state_factory_shard.cpp',
'config_server_op_observer.cpp',
@@ -407,7 +406,6 @@ env.CppUnitTest(
source=[
'collection_metadata_filtering_test.cpp',
'collection_metadata_test.cpp',
- 'collection_range_deleter_test.cpp',
'collection_sharding_state_test.cpp',
'metadata_manager_test.cpp',
'persistent_task_store_test.cpp',
diff --git a/src/mongo/db/s/cleanup_orphaned_cmd.cpp b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
index 146198e7416..444fd536a50 100644
--- a/src/mongo/db/s/cleanup_orphaned_cmd.cpp
+++ b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
@@ -73,7 +73,7 @@ CleanupResult cleanupOrphanedData(OperationContext* opCtx,
std::string* errMsg) {
BSONObj startingFromKey = startingFromKeyConst;
boost::optional<ChunkRange> targetRange;
- CollectionShardingRuntime::CleanupNotification notifn;
+ SharedSemiFuture<void> cleanupCompleteFuture;
{
AutoGetCollection autoColl(opCtx, ns, MODE_IX);
@@ -109,7 +109,7 @@ CleanupResult cleanupOrphanedData(OperationContext* opCtx,
*stoppedAtKey = targetRange->getMax();
- notifn = css->cleanUpRange(*targetRange, CollectionShardingRuntime::kNow);
+ cleanupCompleteFuture = css->cleanUpRange(*targetRange, CollectionShardingRuntime::kNow);
}
// Sleep waiting for our own deletion. We don't actually care about any others, so there is no
@@ -119,7 +119,7 @@ CleanupResult cleanupOrphanedData(OperationContext* opCtx,
<< redact(startingFromKey) << ", removing next orphan range "
<< redact(targetRange->toString()) << "; waiting...";
- Status result = notifn.waitStatus(opCtx);
+ Status result = cleanupCompleteFuture.getNoThrow(opCtx);
LOG(1) << "Finished waiting for last " << ns.toString() << " orphan range cleanup";
diff --git a/src/mongo/db/s/collection_metadata_filtering_test.cpp b/src/mongo/db/s/collection_metadata_filtering_test.cpp
index 31eb7655e7e..a9c251f325f 100644
--- a/src/mongo/db/s/collection_metadata_filtering_test.cpp
+++ b/src/mongo/db/s/collection_metadata_filtering_test.cpp
@@ -106,7 +106,7 @@ protected:
}
_manager = std::make_shared<MetadataManager>(
- getServiceContext(), kNss, executor().get(), CollectionMetadata(cm, ShardId("0")));
+ getServiceContext(), kNss, executor(), CollectionMetadata(cm, ShardId("0")));
auto& oss = OperationShardingState::get(operationContext());
const auto version = cm->getVersion(ShardId("0"));
diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp
deleted file mode 100644
index ef93a23c06f..00000000000
--- a/src/mongo/db/s/collection_range_deleter.cpp
+++ /dev/null
@@ -1,559 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/s/collection_range_deleter.h"
-
-#include <algorithm>
-#include <utility>
-
-#include "mongo/db/catalog/index_catalog.h"
-#include "mongo/db/catalog_raii.h"
-#include "mongo/db/client.h"
-#include "mongo/db/concurrency/write_conflict_exception.h"
-#include "mongo/db/dbhelpers.h"
-#include "mongo/db/exec/delete.h"
-#include "mongo/db/exec/working_set_common.h"
-#include "mongo/db/index/index_descriptor.h"
-#include "mongo/db/keypattern.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/query/internal_plans.h"
-#include "mongo/db/query/plan_yield_policy.h"
-#include "mongo/db/query/query_knobs_gen.h"
-#include "mongo/db/query/query_planner.h"
-#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/s/collection_sharding_runtime.h"
-#include "mongo/db/s/persistent_task_store.h"
-#include "mongo/db/s/range_deletion_task_gen.h"
-#include "mongo/db/s/sharding_runtime_d_params_gen.h"
-#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/s/sharding_statistics.h"
-#include "mongo/db/service_context.h"
-#include "mongo/db/storage/remove_saver.h"
-#include "mongo/db/write_concern.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/util/log.h"
-#include "mongo/util/scopeguard.h"
-#include "mongo/util/str.h"
-
-namespace mongo {
-
-namespace {
-
-using Deletion = CollectionRangeDeleter::Deletion;
-using DeleteNotification = CollectionRangeDeleter::DeleteNotification;
-
-const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
- WriteConcernOptions::SyncMode::UNSET,
- WriteConcernOptions::kWriteConcernTimeoutSharding);
-
-MONGO_FAIL_POINT_DEFINE(hangBeforeDoingDeletion);
-
-boost::optional<DeleteNotification> checkOverlap(std::list<Deletion> const& deletions,
- ChunkRange const& range) {
- // Start search with newest entries by using reverse iterators
- auto it = find_if(deletions.rbegin(), deletions.rend(), [&](auto& cleanee) {
- return bool(cleanee.range.overlapWith(range));
- });
-
- if (it != deletions.rend())
- return it->notification;
-
- return boost::none;
-}
-
-/**
- * Performs the deletion of up to maxToDelete entries within the range in progress. Must be
- * called under the collection lock.
- *
- * Returns the number of documents deleted, 0 if done with the range, or bad status if deleting
- * the range failed.
- */
-StatusWith<int> doDeletion(OperationContext* opCtx,
- Collection* collection,
- BSONObj const& keyPattern,
- ChunkRange const& range,
- int maxToDelete,
- bool throwWriteConflictForTest) {
- invariant(collection != nullptr);
-
- auto const& nss = collection->ns();
-
- // The IndexChunk has a keyPattern that may apply to more than one index - we need to
- // select the index and get the full index keyPattern here.
- auto catalog = collection->getIndexCatalog();
- const IndexDescriptor* idx = catalog->findShardKeyPrefixedIndex(opCtx, keyPattern, false);
- if (!idx) {
- std::string msg = str::stream()
- << "Unable to find shard key index for " << keyPattern.toString() << " in " << nss.ns();
- LOG(0) << msg;
- return {ErrorCodes::InternalError, msg};
- }
-
- // Extend bounds to match the index we found
- const KeyPattern indexKeyPattern(idx->keyPattern());
- const auto extend = [&](const auto& key) {
- return Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(key, false));
- };
-
- const auto min = extend(range.getMin());
- const auto max = extend(range.getMax());
-
- LOG(1) << "begin removal of " << min << " to " << max << " in " << nss.ns();
-
- const auto indexName = idx->indexName();
- const IndexDescriptor* descriptor =
- collection->getIndexCatalog()->findIndexByName(opCtx, indexName);
- if (!descriptor) {
- std::string msg = str::stream()
- << "shard key index with name " << indexName << " on '" << nss.ns() << "' was dropped";
- LOG(0) << msg;
- return {ErrorCodes::InternalError, msg};
- }
-
- auto deleteStageParams = std::make_unique<DeleteStageParams>();
- deleteStageParams->fromMigrate = true;
- deleteStageParams->isMulti = true;
- deleteStageParams->returnDeleted = true;
-
- if (serverGlobalParams.moveParanoia) {
- deleteStageParams->removeSaver =
- std::make_unique<RemoveSaver>("moveChunk", nss.ns(), "cleaning");
- }
-
- auto exec = InternalPlanner::deleteWithIndexScan(opCtx,
- collection,
- std::move(deleteStageParams),
- descriptor,
- min,
- max,
- BoundInclusion::kIncludeStartKeyOnly,
- PlanExecutor::YIELD_MANUAL,
- InternalPlanner::FORWARD);
-
- if (MONGO_unlikely(hangBeforeDoingDeletion.shouldFail())) {
- LOG(0) << "Hit hangBeforeDoingDeletion failpoint";
- hangBeforeDoingDeletion.pauseWhileSet(opCtx);
- }
-
- PlanYieldPolicy planYieldPolicy(exec.get(), PlanExecutor::YIELD_MANUAL);
-
- int numDeleted = 0;
- do {
- BSONObj deletedObj;
-
- // TODO SERVER-41606: Remove this function when we refactor CollectionRangeDeleter.
- if (throwWriteConflictForTest)
- throw WriteConflictException();
-
- PlanExecutor::ExecState state = exec->getNext(&deletedObj, nullptr);
-
- if (state == PlanExecutor::IS_EOF) {
- break;
- }
-
- if (state == PlanExecutor::FAILURE) {
- warning() << PlanExecutor::statestr(state) << " - cursor error while trying to delete "
- << redact(min) << " to " << redact(max) << " in " << nss
- << ": FAILURE, stats: " << Explain::getWinningPlanStats(exec.get());
- break;
- }
-
- invariant(PlanExecutor::ADVANCED == state);
- ShardingStatistics::get(opCtx).countDocsDeletedOnDonor.addAndFetch(1);
-
- } while (++numDeleted < maxToDelete);
-
- return numDeleted;
-}
-
-} // namespace
-
-CollectionRangeDeleter::CollectionRangeDeleter() = default;
-
-CollectionRangeDeleter::~CollectionRangeDeleter() {
- // Notify anybody still sleeping on orphan ranges
- clear({ErrorCodes::InterruptedDueToReplStateChange, "Collection sharding metadata discarded"});
-}
-
-boost::optional<Date_t> CollectionRangeDeleter::cleanUpNextRange(
- OperationContext* opCtx,
- NamespaceString const& nss,
- UUID collectionUuid,
- int maxToDelete,
- CollectionRangeDeleter* forTestOnly) {
-
- if (maxToDelete <= 0) {
- maxToDelete = rangeDeleterBatchSize.load();
- if (maxToDelete <= 0) {
- maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1);
- }
- }
-
- StatusWith<int> swNumDeleted = 0;
-
- auto range = boost::optional<ChunkRange>(boost::none);
- auto notification = DeleteNotification();
-
- {
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, nss, MODE_IX);
- auto* const collection = autoColl.getCollection();
- auto* const csr = CollectionShardingRuntime::get(opCtx, nss);
- auto& metadataManager = csr->_metadataManager;
-
- if (!_checkCollectionMetadataStillValid(
- nss, collectionUuid, forTestOnly, collection, metadataManager)) {
- return boost::none;
- }
-
- auto* const self = forTestOnly ? forTestOnly : &metadataManager->_rangesToClean;
-
- bool writeOpLog = false;
-
- {
- stdx::lock_guard<Latch> scopedLock(csr->_metadataManager->_managerLock);
- if (self->isEmpty()) {
- LOG(1) << "No further range deletions scheduled on " << nss.ns();
- return boost::none;
- }
-
- auto& orphans = self->_orphans;
- if (orphans.empty()) {
- // We have delayed deletions; see if any are ready.
- auto& df = self->_delayedOrphans.front();
- if (df.whenToDelete > Date_t::now()) {
- LOG(0) << "Deferring deletion of " << nss.ns() << " range "
- << redact(df.range.toString()) << " until " << df.whenToDelete;
- return df.whenToDelete;
- }
-
- // Move a single range from _delayedOrphans to _orphans
- orphans.splice(orphans.end(), self->_delayedOrphans, self->_delayedOrphans.begin());
- LOG(1) << "Proceeding with deferred deletion of " << nss.ns() << " range "
- << redact(orphans.front().range.toString());
-
- writeOpLog = true;
- }
-
- invariant(!orphans.empty());
- const auto& frontRange = orphans.front().range;
- range.emplace(frontRange.getMin().getOwned(), frontRange.getMax().getOwned());
- notification = orphans.front().notification;
- }
-
- invariant(range);
-
- if (writeOpLog) {
- // Secondaries will watch for this update, and kill any queries that may depend on
- // documents in the range -- excepting any queries with a read-concern option
- // 'ignoreChunkMigration'
- try {
- AutoGetCollection autoAdmin(
- opCtx, NamespaceString::kServerConfigurationNamespace, MODE_IX);
-
- Helpers::upsert(opCtx,
- NamespaceString::kServerConfigurationNamespace.ns(),
- BSON("_id"
- << "startRangeDeletion"
- << "ns" << nss.ns() << "uuid" << collectionUuid << "min"
- << range->getMin() << "max" << range->getMax()));
- } catch (const DBException& e) {
- stdx::lock_guard<Latch> scopedLock(csr->_metadataManager->_managerLock);
- csr->_metadataManager->_clearAllCleanups(
- scopedLock,
- e.toStatus("cannot push startRangeDeletion record to Op Log,"
- " abandoning scheduled range deletions"));
- return boost::none;
- }
- }
-
- const auto scopedCollectionMetadata = metadataManager->getActiveMetadata(boost::none);
- const auto& metadata = *scopedCollectionMetadata;
-
- try {
- swNumDeleted = doDeletion(opCtx,
- collection,
- metadata.getKeyPattern(),
- *range,
- maxToDelete,
- // _throwWriteConflictForTest is only used in unit tests, so
- // taking the MetadataManager lock is not required.
- self->_throwWriteConflictForTest);
- if (swNumDeleted.isOK()) {
- LOG(0) << "Deleted " << swNumDeleted.getValue() << " documents in pass.";
- }
- } catch (const DBException& e) {
- swNumDeleted = e.toStatus();
- warning() << e.what();
- }
- } // drop autoColl
-
- bool continueDeleting = swNumDeleted.isOK() && swNumDeleted.getValue() > 0;
-
- if (swNumDeleted == ErrorCodes::WriteConflict) {
- CurOp::get(opCtx)->debug().additiveMetrics.incrementWriteConflicts(1);
- continueDeleting = true;
- }
-
- // If there's an error or if there are no more documents to delete, take this branch.
- // This branch means that we will NOT continue deleting documents from this range.
- if (!continueDeleting) {
- if (swNumDeleted.isOK()) {
- LOG(0) << "No documents remain to delete in " << nss << " range "
- << redact(range->toString());
- }
-
- // Wait for majority replication even when swNumDeleted isn't OK or == 0, because it might
- // have been OK and/or > 0 previously, and the deletions must be persistent before notifying
- // clients in _pop().
-
- LOG(0) << "Waiting for majority replication of local deletions in " << nss.ns() << " range "
- << redact(range->toString());
-
- // Wait for replication outside the lock
- const auto replicationStatus = [&] {
- try {
- repl::ReplClientInfo::forClient(opCtx->getClient())
- .setLastOpToSystemLastOpTime(opCtx);
- const auto clientOpTime =
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
-
- WriteConcernResult unusedWCResult;
- return waitForWriteConcern(
- opCtx, clientOpTime, kMajorityWriteConcern, &unusedWCResult);
- } catch (const DBException& e) {
- return e.toStatus();
- }
- }();
-
- // Get the lock again to finish off this range (including notifying, if necessary).
- // Don't allow lock interrupts while cleaning up.
- bool finishedDeleting = false;
- {
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, nss, MODE_IX);
- auto* const collection = autoColl.getCollection();
- auto* const csr = CollectionShardingRuntime::get(opCtx, nss);
- auto& metadataManager = csr->_metadataManager;
-
- if (!_checkCollectionMetadataStillValid(
- nss, collectionUuid, forTestOnly, collection, metadataManager)) {
- return boost::none;
- }
-
- auto* const self = forTestOnly ? forTestOnly : &metadataManager->_rangesToClean;
-
- stdx::lock_guard<Latch> scopedLock(csr->_metadataManager->_managerLock);
-
- if (!replicationStatus.isOK()) {
- LOG(0) << "Error when waiting for write concern after removing " << nss << " range "
- << redact(range->toString()) << " : " << redact(replicationStatus.reason());
-
- // If range were already popped (e.g. by dropping nss during the waitForWriteConcern
- // above) its notification would have been triggered, so this check suffices to
- // ensure that it is safe to pop the range here
- if (!notification.ready()) {
- invariant(!self->isEmpty() &&
- self->_orphans.front().notification == notification);
- LOG(0) << "Abandoning deletion of latest range in " << nss.ns()
- << " after local "
- << "deletions because of replication failure";
- self->_pop(replicationStatus);
- }
- } else {
- LOG(0) << "Finished deleting documents in " << nss.ns() << " range "
- << redact(range->toString());
-
- finishedDeleting = true;
- self->_pop(swNumDeleted.getStatus());
- }
-
- if (!self->_orphans.empty()) {
- LOG(1) << "Deleting " << nss.ns() << " range "
- << redact(self->_orphans.front().range.toString()) << " next.";
- }
- }
-
- if (finishedDeleting) {
- try {
- PersistentTaskStore<RangeDeletionTask> store(
- opCtx, NamespaceString::kRangeDeletionNamespace);
- store.remove(opCtx,
- QUERY(RangeDeletionTask::kCollectionUuidFieldName
- << collectionUuid << RangeDeletionTask::kRangeFieldName
- << range->toBSON()));
- } catch (const DBException& e) {
- LOG(0) << "Failed to delete range deletion task for range " << range.get()
- << " in collection " << nss << causedBy(e.what());
- }
- }
-
- return Date_t::now() + Milliseconds(rangeDeleterBatchDelayMS.load());
- }
-
- invariant(range);
- invariant(continueDeleting);
-
- notification.abandon();
- return Date_t::now() + Milliseconds(rangeDeleterBatchDelayMS.load());
-}
-
-bool CollectionRangeDeleter::_checkCollectionMetadataStillValid(
- const NamespaceString& nss,
- UUID collectionUuid,
- CollectionRangeDeleter* forTestOnly,
- Collection* collection,
- std::shared_ptr<MetadataManager> metadataManager) {
-
- if (!metadataManager) {
- LOG(0) << "Abandoning any range deletions because the metadata for " << nss.ns()
- << " was reset";
- return false;
- }
-
- if (!forTestOnly && (!collection)) {
- if (!collection) {
- LOG(0) << "Abandoning any range deletions left over from dropped " << nss.ns();
- } else {
- LOG(0) << "Abandoning any range deletions left over from previously sharded"
- << nss.ns();
- }
-
- stdx::lock_guard<Latch> lk(metadataManager->_managerLock);
- metadataManager->_clearAllCleanups(lk);
- return false;
- }
-
- if (!forTestOnly && collection->uuid() != collectionUuid) {
- LOG(1) << "Abandoning range deletion task for " << nss.ns() << " with UUID "
- << collectionUuid << " because UUID of " << nss.ns() << "has changed (current is "
- << collection->uuid() << ")";
- return false;
- }
-
- return true;
-}
-
-auto CollectionRangeDeleter::overlaps(ChunkRange const& range) const
- -> boost::optional<DeleteNotification> {
- auto result = checkOverlap(_orphans, range);
- if (result) {
- return result;
- }
- return checkOverlap(_delayedOrphans, range);
-}
-
-boost::optional<Date_t> CollectionRangeDeleter::add(std::list<Deletion> ranges) {
- // We ignore the case of overlapping, or even equal, ranges. Deleting overlapping ranges is
- // quick.
- const bool wasScheduledImmediate = !_orphans.empty();
- const bool wasScheduledLater = !_delayedOrphans.empty();
-
- while (!ranges.empty()) {
- if (ranges.front().whenToDelete != Date_t{}) {
- _delayedOrphans.splice(_delayedOrphans.end(), ranges, ranges.begin());
- } else {
- _orphans.splice(_orphans.end(), ranges, ranges.begin());
- }
- }
-
- if (wasScheduledImmediate) {
- return boost::none; // already scheduled
- } else if (!_orphans.empty()) {
- return Date_t{};
- } else if (wasScheduledLater) {
- return boost::none; // already scheduled
- } else if (!_delayedOrphans.empty()) {
- return _delayedOrphans.front().whenToDelete;
- }
-
- return boost::none;
-}
-
-void CollectionRangeDeleter::append(BSONObjBuilder* builder) const {
- BSONArrayBuilder arr(builder->subarrayStart("rangesToClean"));
- for (auto const& entry : _orphans) {
- BSONObjBuilder obj;
- entry.range.append(&obj);
- arr.append(obj.done());
- }
- for (auto const& entry : _delayedOrphans) {
- BSONObjBuilder obj;
- entry.range.append(&obj);
- arr.append(obj.done());
- }
- arr.done();
-}
-
-size_t CollectionRangeDeleter::size() const {
- return _orphans.size() + _delayedOrphans.size();
-}
-
-bool CollectionRangeDeleter::isEmpty() const {
- return _orphans.empty() && _delayedOrphans.empty();
-}
-
-void CollectionRangeDeleter::clear(Status status) {
- for (auto& range : _orphans) {
- range.notification.notify(status); // wake up anything still waiting
- }
- _orphans.clear();
- for (auto& range : _delayedOrphans) {
- range.notification.notify(status); // wake up anything still waiting
- }
- _delayedOrphans.clear();
-}
-
-void CollectionRangeDeleter::_pop(Status result) {
- _orphans.front().notification.notify(result); // wake up waitForClean
- _orphans.pop_front();
-}
-
-// DeleteNotification
-
-CollectionRangeDeleter::DeleteNotification::DeleteNotification()
- : _notification(std::make_shared<Notification<Status>>()) {}
-
-CollectionRangeDeleter::DeleteNotification::DeleteNotification(Status status)
- : _notification(std::make_shared<Notification<Status>>(std::move(status))) {}
-
-Status CollectionRangeDeleter::DeleteNotification::waitStatus(OperationContext* opCtx) {
- try {
- return _notification->get(opCtx);
- } catch (const DBException& ex) {
- _notification = std::make_shared<Notification<Status>>(ex.toStatus());
- throw;
- }
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/s/collection_range_deleter.h b/src/mongo/db/s/collection_range_deleter.h
deleted file mode 100644
index f6e8d1b136f..00000000000
--- a/src/mongo/db/s/collection_range_deleter.h
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-#pragma once
-
-#include <list>
-
-#include "mongo/db/namespace_string.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/util/concurrency/notification.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-
-class BSONObj;
-class Collection;
-class MetadataManager;
-class OperationContext;
-
-// The maximum number of documents to delete in a single batch during range deletion.
-// secondaryThrottle and rangeDeleterBatchDelayMS apply between each batch.
-// Must be positive or 0 (the default), which means to use the value of
-// internalQueryExecYieldIterations (or 1 if that's negative or zero).
-extern AtomicWord<int> rangeDeleterBatchSize;
-
-// After completing a batch of document deletions, the time in millis to wait before commencing the
-// next batch of deletions.
-extern AtomicWord<int> rangeDeleterBatchDelayMS;
-
-class CollectionRangeDeleter {
- CollectionRangeDeleter(const CollectionRangeDeleter&) = delete;
- CollectionRangeDeleter& operator=(const CollectionRangeDeleter&) = delete;
-
-public:
- /**
- * This is an object n that asynchronously changes state when a scheduled range deletion
- * completes or fails. Call n.ready() to discover if the event has already occurred. Call
- * n.waitStatus(opCtx) to sleep waiting for the event, and get its result. If the wait is
- * interrupted, waitStatus throws.
- *
- * It is an error to destroy a returned CleanupNotification object n unless either n.ready()
- * is true or n.abandon() has been called. After n.abandon(), n is in a moved-from state.
- */
- class DeleteNotification {
- public:
- DeleteNotification();
- DeleteNotification(Status status);
-
- // The following default declarations are needed because the presence of a non-trivial
- // destructor forbids the compiler to generate the declarations itself, but the definitions
- // it generates are fine.
- DeleteNotification(DeleteNotification&& notifn) = default;
- DeleteNotification& operator=(DeleteNotification&& notifn) = default;
- DeleteNotification(DeleteNotification const& notifn) = default;
- DeleteNotification& operator=(DeleteNotification const& notifn) = default;
-
- ~DeleteNotification() {
- // Can be null only if moved from
- dassert(!_notification || *_notification || _notification.use_count() == 1);
- }
-
- void notify(Status status) const {
- _notification->set(status);
- }
-
- /**
- * Sleeps waiting for notification, and returns notify's argument. On interruption, throws;
- * calling waitStatus afterward returns failed status.
- */
- Status waitStatus(OperationContext* opCtx);
-
- bool ready() const {
- return bool(*_notification);
- }
- void abandon() {
- _notification = nullptr;
- }
- bool operator==(DeleteNotification const& other) const {
- return _notification == other._notification;
- }
- bool operator!=(DeleteNotification const& other) const {
- return _notification != other._notification;
- }
-
- private:
- std::shared_ptr<Notification<Status>> _notification;
- };
-
- struct Deletion {
- Deletion(ChunkRange r, Date_t when) : range(std::move(r)), whenToDelete(when) {}
-
- ChunkRange range;
- Date_t whenToDelete; // A value of Date_t{} means immediately.
- DeleteNotification notification{};
- };
-
- CollectionRangeDeleter();
- ~CollectionRangeDeleter();
-
- //
- // All of the following members must be called only while the containing MetadataManager's lock
- // is held (or in its destructor), except cleanUpNextRange.
- //
-
- /**
- * Splices range's elements to the list to be cleaned up by the deleter thread. Deletions d
- * with d.delay == true are added to the delayed queue, and scheduled at time `later`.
- * Returns the time to begin deletions, if needed, or boost::none if deletions are already
- * scheduled.
- */
- boost::optional<Date_t> add(std::list<Deletion> ranges);
-
- /**
- * Reports whether the argument range overlaps any of the ranges to clean. If there is overlap,
- * it returns a notification that will be signaled when the currently newest overlapping range
- * completes or fails. If there is no overlap, the result is boost::none. After a successful
- * removal, the caller should call again to ensure no other range overlaps the argument.
- * (See CollectionShardingState::waitForClean and MetadataManager::trackOrphanedDataCleanup for
- * an example use.)
- */
- boost::optional<DeleteNotification> overlaps(ChunkRange const& range) const;
-
- /**
- * Reports the number of ranges remaining to be cleaned up.
- */
- size_t size() const;
-
- bool isEmpty() const;
-
- /*
- * Notifies with the specified status anything waiting on ranges scheduled, and then discards
- * the ranges and notifications. Is called in the destructor.
- */
- void clear(Status status);
-
- /*
- * Append a representation of self to the specified builder.
- */
- void append(BSONObjBuilder* builder) const;
-
- /**
- * If any range deletions are scheduled, deletes up to maxToDelete documents, notifying
- * watchers of ranges as they are done being deleted. It performs its own collection locking, so
- * it must be called without locks.
- *
- * If it should be scheduled to run again because there might be more documents to delete,
- * returns the time to begin, or boost::none otherwise.
- *
- * Negative (or zero) value for 'maxToDelete' indicates some canonical default should be used.
- *
- * Argument 'forTestOnly' is used in unit tests that exercise the CollectionRangeDeleter class,
- * so that they do not need to set up CollectionShardingState and MetadataManager objects.
- */
- static boost::optional<Date_t> cleanUpNextRange(OperationContext*,
- NamespaceString const& nss,
- UUID collectionUuid,
- int maxToDelete = 0,
- CollectionRangeDeleter* forTestOnly = nullptr);
-
- // TODO SERVER-41606: Remove this function when we refactor CollectionRangeDeleter.
- void setDoDeletionShouldThrowWriteConflictForTest(bool on) {
- _throwWriteConflictForTest = on;
- }
-
-private:
- /**
- * Verifies that the metadata for the collection to be cleaned up is still valid. Makes sure
- * the collection has not been dropped (or dropped then recreated).
- */
- static bool _checkCollectionMetadataStillValid(
- const NamespaceString& nss,
- UUID collectionUuid,
- CollectionRangeDeleter* forTestOnly,
- Collection* collection,
- std::shared_ptr<MetadataManager> metadataManager);
-
- /**
- * Removes the latest-scheduled range from the ranges to be cleaned up, and notifies any
- * interested callers of this->overlaps(range) with specified status.
- */
- void _pop(Status status);
-
- // TODO SERVER-41606: Remove this function when we refactor CollectionRangeDeleter.
- bool _throwWriteConflictForTest{false};
-
- /**
- * Ranges scheduled for deletion. The front of the list will be in active process of deletion.
- * As each range is completed, its notification is signaled before it is popped.
- */
- std::list<Deletion> _orphans;
- std::list<Deletion> _delayedOrphans;
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/s/collection_range_deleter_test.cpp b/src/mongo/db/s/collection_range_deleter_test.cpp
deleted file mode 100644
index f6c76dd9b2f..00000000000
--- a/src/mongo/db/s/collection_range_deleter_test.cpp
+++ /dev/null
@@ -1,421 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/bson/bsonobj.h"
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/client/query.h"
-#include "mongo/client/remote_command_targeter_mock.h"
-#include "mongo/db/catalog_raii.h"
-#include "mongo/db/client.h"
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/index/index_descriptor.h"
-#include "mongo/db/keypattern.h"
-#include "mongo/db/repl/replication_coordinator_mock.h"
-#include "mongo/db/s/collection_sharding_runtime.h"
-#include "mongo/db/s/sharding_state.h"
-#include "mongo/s/balancer_configuration.h"
-#include "mongo/s/chunk_version.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/shard_server_test_fixture.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-namespace {
-
-using unittest::assertGet;
-
-using Deletion = CollectionRangeDeleter::Deletion;
-
-const NamespaceString kNss = NamespaceString("foo", "bar");
-const std::string kShardKey = "_id";
-const BSONObj kShardKeyPattern = BSON(kShardKey << 1);
-const NamespaceString kAdminSysVer = NamespaceString("admin", "system.version");
-
-class CollectionRangeDeleterTest : public ShardServerTestFixture {
-protected:
- // Required because default constructor of UUID is private.
- CollectionRangeDeleterTest() : _uuid(UUID::gen()) {}
- void setUp() override {
- ShardServerTestFixture::setUp();
-
- // Reset for each test.
- _uuid = UUID::gen();
-
- DBDirectClient client(operationContext());
- client.createCollection(kNss.ns());
- auto epoch = OID::gen();
-
- const KeyPattern keyPattern(kShardKeyPattern);
- auto rt = RoutingTableHistory::makeNew(
- kNss,
- _uuid,
- keyPattern,
- nullptr,
- false,
- epoch,
- {ChunkType(kNss,
- ChunkRange{keyPattern.globalMin(), keyPattern.globalMax()},
- ChunkVersion(1, 0, epoch),
- ShardId("otherShard"))});
- std::shared_ptr<ChunkManager> cm = std::make_shared<ChunkManager>(rt, Timestamp(100, 0));
-
- AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
- auto* const css = CollectionShardingRuntime::get(operationContext(), kNss);
- css->setFilteringMetadata(operationContext(), CollectionMetadata(cm, ShardId("thisShard")));
- }
-
- void tearDown() override {
- {
- AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
- auto* const css = CollectionShardingRuntime::get(operationContext(), kNss);
- css->clearFilteringMetadata();
- }
-
- ShardServerTestFixture::tearDown();
- }
-
- boost::optional<Date_t> next(CollectionRangeDeleter& rangeDeleter, int maxToDelete) {
- return CollectionRangeDeleter::cleanUpNextRange(
- operationContext(), kNss, uuid(), maxToDelete, &rangeDeleter);
- }
-
- std::shared_ptr<RemoteCommandTargeterMock> configTargeter() const {
- return RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter());
- }
-
- UUID uuid() const {
- return _uuid;
- }
-
- std::unique_ptr<BalancerConfiguration> makeBalancerConfiguration() override {
- return std::make_unique<BalancerConfiguration>();
- }
-
-private:
- UUID _uuid;
-};
-
-// Tests the case that there is nothing in the database.
-TEST_F(CollectionRangeDeleterTest, EmptyDatabase) {
- CollectionRangeDeleter rangeDeleter;
- ASSERT_FALSE(next(rangeDeleter, 1));
-}
-
-// Tests the case that there is data, but it is not in a range to clean.
-TEST_F(CollectionRangeDeleterTest, NoDataInGivenRangeToClean) {
- CollectionRangeDeleter rangeDeleter;
- const BSONObj insertedDoc = BSON(kShardKey << 25);
- DBDirectClient dbclient(operationContext());
- dbclient.insert(kNss.toString(), insertedDoc);
- ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kShardKey << 25)));
- std::list<Deletion> ranges;
- ranges.emplace_back(
- Deletion{ChunkRange{BSON(kShardKey << 0), BSON(kShardKey << 10)}, Date_t{}});
- auto when = rangeDeleter.add(std::move(ranges));
- ASSERT(when && *when == Date_t{});
- ASSERT_EQ(1u, rangeDeleter.size());
- ASSERT_TRUE(next(rangeDeleter, 1));
-
- ASSERT_EQ(0u, rangeDeleter.size());
- ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kShardKey << 25)));
-
- ASSERT_FALSE(next(rangeDeleter, 1));
-}
-
-// Tests the case that there is a single document within a range to clean.
-TEST_F(CollectionRangeDeleterTest, OneDocumentInOneRangeToClean) {
- CollectionRangeDeleter rangeDeleter;
- const BSONObj insertedDoc = BSON(kShardKey << 5);
- DBDirectClient dbclient(operationContext());
- dbclient.insert(kNss.toString(), BSON(kShardKey << 5));
- ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kShardKey << 5)));
-
- std::list<Deletion> ranges;
- auto deletion = Deletion{ChunkRange(BSON(kShardKey << 0), BSON(kShardKey << 10)), Date_t{}};
- ranges.emplace_back(std::move(deletion));
- auto when = rangeDeleter.add(std::move(ranges));
- ASSERT(when && *when == Date_t{});
- ASSERT_TRUE(ranges.empty()); // spliced elements out of it
-
- auto optNotifn = rangeDeleter.overlaps(ChunkRange(BSON(kShardKey << 0), BSON(kShardKey << 10)));
- ASSERT(optNotifn);
- auto notifn = *optNotifn;
- ASSERT(!notifn.ready());
- // actually delete one
- ASSERT_TRUE(next(rangeDeleter, 1));
- ASSERT(!notifn.ready());
-
- ASSERT_EQ(rangeDeleter.size(), 1u);
- // range empty, pop range, notify
- ASSERT_TRUE(next(rangeDeleter, 1));
- ASSERT_TRUE(rangeDeleter.isEmpty());
- ASSERT(notifn.ready() && notifn.waitStatus(operationContext()).isOK());
-
- ASSERT_TRUE(dbclient.findOne(kNss.toString(), QUERY(kShardKey << 5)).isEmpty());
- ASSERT_FALSE(next(rangeDeleter, 1));
- ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer, BSON(kShardKey << "startRangeDeletion")));
-}
-
-// Tests the case that there are multiple documents within a range to clean.
-TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInOneRangeToClean) {
- CollectionRangeDeleter rangeDeleter;
- DBDirectClient dbclient(operationContext());
- dbclient.insert(kNss.toString(), BSON(kShardKey << 1));
- dbclient.insert(kNss.toString(), BSON(kShardKey << 2));
- dbclient.insert(kNss.toString(), BSON(kShardKey << 3));
- ASSERT_EQUALS(3ULL, dbclient.count(kNss, BSON(kShardKey << LT << 5)));
-
- std::list<Deletion> ranges;
- auto deletion = Deletion{ChunkRange(BSON(kShardKey << 0), BSON(kShardKey << 10)), Date_t{}};
- ranges.emplace_back(std::move(deletion));
- auto when = rangeDeleter.add(std::move(ranges));
- ASSERT(when && *when == Date_t{});
-
- ASSERT_TRUE(next(rangeDeleter, 100));
- ASSERT_TRUE(next(rangeDeleter, 100));
- ASSERT_EQUALS(0ULL, dbclient.count(kNss, BSON(kShardKey << LT << 5)));
- ASSERT_FALSE(next(rangeDeleter, 100));
- ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer, BSON(kShardKey << "startRangeDeletion")));
-}
-
-// Tests the case that there are multiple documents within a range to clean, and the range deleter
-// has a max deletion rate of one document per run.
-TEST_F(CollectionRangeDeleterTest, MultipleCleanupNextRangeCalls) {
- CollectionRangeDeleter rangeDeleter;
- DBDirectClient dbclient(operationContext());
- dbclient.insert(kNss.toString(), BSON(kShardKey << 1));
- dbclient.insert(kNss.toString(), BSON(kShardKey << 2));
- dbclient.insert(kNss.toString(), BSON(kShardKey << 3));
- ASSERT_EQUALS(3ULL, dbclient.count(kNss, BSON(kShardKey << LT << 5)));
-
- std::list<Deletion> ranges;
- auto deletion = Deletion{ChunkRange(BSON(kShardKey << 0), BSON(kShardKey << 10)), Date_t{}};
- ranges.emplace_back(std::move(deletion));
- auto when = rangeDeleter.add(std::move(ranges));
- ASSERT(when && *when == Date_t{});
-
- ASSERT_TRUE(next(rangeDeleter, 1));
- ASSERT_EQUALS(2ULL, dbclient.count(kNss, BSON(kShardKey << LT << 5)));
-
- ASSERT_TRUE(next(rangeDeleter, 1));
- ASSERT_EQUALS(1ULL, dbclient.count(kNss, BSON(kShardKey << LT << 5)));
-
- ASSERT_TRUE(next(rangeDeleter, 1));
- ASSERT_TRUE(next(rangeDeleter, 1));
- ASSERT_EQUALS(0ULL, dbclient.count(kNss, BSON(kShardKey << LT << 5)));
- ASSERT_FALSE(next(rangeDeleter, 1));
- ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer, BSON(kShardKey << "startRangeDeletion")));
-}
-
-// Tests the case that there are two ranges to clean, each containing multiple documents.
-TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInMultipleRangesToClean) {
- CollectionRangeDeleter rangeDeleter;
- DBDirectClient dbclient(operationContext());
- dbclient.insert(kNss.toString(), BSON(kShardKey << 1));
- dbclient.insert(kNss.toString(), BSON(kShardKey << 2));
- dbclient.insert(kNss.toString(), BSON(kShardKey << 3));
- dbclient.insert(kNss.toString(), BSON(kShardKey << 4));
- dbclient.insert(kNss.toString(), BSON(kShardKey << 5));
- dbclient.insert(kNss.toString(), BSON(kShardKey << 6));
- ASSERT_EQUALS(6ULL, dbclient.count(kNss, BSON(kShardKey << LT << 10)));
-
- std::list<Deletion> ranges;
- auto later = Date_t::now();
- ranges.emplace_back(Deletion{ChunkRange{BSON(kShardKey << 0), BSON(kShardKey << 3)}, later});
- auto when = rangeDeleter.add(std::move(ranges));
- ASSERT(when && *when == later);
- ASSERT_TRUE(ranges.empty()); // not guaranteed by std, but failure would indicate a problem.
-
- std::list<Deletion> ranges2;
- ranges2.emplace_back(Deletion{ChunkRange{BSON(kShardKey << 4), BSON(kShardKey << 7)}, later});
- when = rangeDeleter.add(std::move(ranges2));
- ASSERT(!when);
-
- std::list<Deletion> ranges3;
- ranges3.emplace_back(
- Deletion{ChunkRange{BSON(kShardKey << 3), BSON(kShardKey << 4)}, Date_t{}});
- when = rangeDeleter.add(std::move(ranges3));
- ASSERT(when);
-
- auto optNotifn1 = rangeDeleter.overlaps(ChunkRange{BSON(kShardKey << 0), BSON(kShardKey << 3)});
- ASSERT_TRUE(optNotifn1);
- auto& notifn1 = *optNotifn1;
- ASSERT_FALSE(notifn1.ready());
-
- auto optNotifn2 = rangeDeleter.overlaps(ChunkRange{BSON(kShardKey << 4), BSON(kShardKey << 7)});
- ASSERT_TRUE(optNotifn2);
- auto& notifn2 = *optNotifn2;
- ASSERT_FALSE(notifn2.ready());
-
- auto optNotifn3 = rangeDeleter.overlaps(ChunkRange{BSON(kShardKey << 3), BSON(kShardKey << 4)});
- ASSERT_TRUE(optNotifn3);
- auto& notifn3 = *optNotifn3;
- ASSERT_FALSE(notifn3.ready());
-
- // test op== on notifications
- ASSERT_TRUE(notifn1 == *optNotifn1);
- ASSERT_FALSE(notifn1 == *optNotifn2);
- ASSERT_TRUE(notifn1 != *optNotifn2);
- ASSERT_FALSE(notifn1 != *optNotifn1);
-
- // no op log entry yet
- ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer, BSON(kShardKey << "startRangeDeletion")));
-
- ASSERT_EQUALS(6ULL, dbclient.count(kNss, BSON(kShardKey << LT << 7)));
-
- // catch range3, [3..4) only
- auto next1 = next(rangeDeleter, 100);
- ASSERT_TRUE(next1);
-
- // no op log entry for immediate deletions
- ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer, BSON(kShardKey << "startRangeDeletion")));
-
- // 3 gone
- ASSERT_EQUALS(5ULL, dbclient.count(kNss, BSON(kShardKey << LT << 7)));
- ASSERT_EQUALS(2ULL, dbclient.count(kNss, BSON(kShardKey << LT << 4)));
-
- ASSERT_FALSE(notifn1.ready()); // no trigger yet
- ASSERT_FALSE(notifn2.ready()); // no trigger yet
- ASSERT_FALSE(notifn3.ready()); // no trigger yet
-
- // this will find the [3..4) range empty, so pop the range and notify
- auto next2 = next(rangeDeleter, 100);
- ASSERT_TRUE(next2);
-
- // still no op log entry, because not delayed
- ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer, BSON(kShardKey << "startRangeDeletion")));
-
- // deleted 1, 5 left
- ASSERT_EQUALS(2ULL, dbclient.count(kNss, BSON(kShardKey << LT << 4)));
- ASSERT_EQUALS(5ULL, dbclient.count(kNss, BSON(kShardKey << LT << 10)));
-
- ASSERT_FALSE(notifn1.ready()); // no trigger yet
- ASSERT_FALSE(notifn2.ready()); // no trigger yet
- ASSERT_TRUE(notifn3.ready()); // triggered.
- ASSERT_OK(notifn3.waitStatus(operationContext()));
-
- // This will find the regular queue empty, but the [0..3) range in the delayed queue.
- // However, the time to delete them is now, so the range is moved to the regular queue.
- auto next3 = next(rangeDeleter, 100);
- ASSERT_TRUE(next3);
-
- ASSERT_FALSE(notifn1.ready()); // no trigger yet
- ASSERT_FALSE(notifn2.ready()); // no trigger yet
-
- // deleted 3, 3 left
- ASSERT_EQUALS(3ULL, dbclient.count(kNss, BSON(kShardKey << LT << 10)));
-
- ASSERT_EQUALS(1ULL, dbclient.count(kAdminSysVer, BSON(kShardKey << "startRangeDeletion")));
- // clang-format off
- ASSERT_BSONOBJ_EQ(
- BSON("_id" << "startRangeDeletion" << "ns" << kNss.ns()
- << "uuid" << uuid() << "min" << BSON("_id" << 0) << "max" << BSON("_id" << 3)),
- dbclient.findOne(kAdminSysVer.ns(), QUERY("_id" << "startRangeDeletion")));
- // clang-format on
-
- // this will find the [0..3) range empty, so pop the range and notify
- auto next4 = next(rangeDeleter, 100);
- ASSERT_TRUE(next4);
-
- ASSERT_TRUE(notifn1.ready());
- ASSERT_OK(notifn1.waitStatus(operationContext()));
- ASSERT_FALSE(notifn2.ready());
-
- // op log entry unchanged
- // clang-format off
- ASSERT_BSONOBJ_EQ(
- BSON("_id" << "startRangeDeletion" << "ns" << kNss.ns()
- << "uuid" << uuid() << "min" << BSON("_id" << 0) << "max" << BSON("_id" << 3)),
- dbclient.findOne(kAdminSysVer.ns(), QUERY("_id" << "startRangeDeletion")));
- // clang-format on
-
- // still 3 left
- ASSERT_EQUALS(3ULL, dbclient.count(kNss, BSON(kShardKey << LT << 10)));
-
- // delete the remaining documents
- auto next5 = next(rangeDeleter, 100);
- ASSERT_TRUE(next5);
-
- ASSERT_FALSE(notifn2.ready());
-
- // Another delayed range, so logged
- // clang-format off
- ASSERT_BSONOBJ_EQ(
- BSON("_id" << "startRangeDeletion" << "ns" << kNss.ns()
- << "uuid" << uuid() << "min" << BSON("_id" << 4) << "max" << BSON("_id" << 7)),
- dbclient.findOne(kAdminSysVer.ns(), QUERY("_id" << "startRangeDeletion")));
- // clang-format on
-
- // all docs gone
- ASSERT_EQUALS(0ULL, dbclient.count(kNss, BSON(kShardKey << LT << 10)));
-
- // discover there are no more, pop range 2
- auto next6 = next(rangeDeleter, 100);
- ASSERT_TRUE(next6);
-
- ASSERT_TRUE(notifn2.ready());
- ASSERT_OK(notifn2.waitStatus(operationContext()));
-
- // discover there are no more ranges
- ASSERT_FALSE(next(rangeDeleter, 1));
-}
-
-// Tests that we retry on a WriteConflictException.
-TEST_F(CollectionRangeDeleterTest, RetryOnWriteConflictException) {
- CollectionRangeDeleter rangeDeleter;
- DBDirectClient dbclient(operationContext());
-
- dbclient.insert(kNss.toString(), BSON(kShardKey << 1));
- dbclient.insert(kNss.toString(), BSON(kShardKey << 2));
- dbclient.insert(kNss.toString(), BSON(kShardKey << 3));
- ASSERT_EQUALS(3ULL, dbclient.count(kNss, BSON(kShardKey << LT << 5)));
-
- std::list<Deletion> ranges;
- auto deletion = Deletion{ChunkRange(BSON(kShardKey << 0), BSON(kShardKey << 10)), Date_t{}};
- ranges.emplace_back(std::move(deletion));
- auto when = rangeDeleter.add(std::move(ranges));
- ASSERT(when && *when == Date_t{});
-
- // TODO SERVER-41606: Remove this function when we refactor CollectionRangeDeleter.
- rangeDeleter.setDoDeletionShouldThrowWriteConflictForTest(true);
-
- ASSERT_TRUE(next(rangeDeleter, 1));
- ASSERT_EQUALS(3ULL, dbclient.count(kNss, BSON(kShardKey << LT << 5)));
-
- // TODO SERVER-41606: Remove this function when we refactor CollectionRangeDeleter.
- rangeDeleter.setDoDeletionShouldThrowWriteConflictForTest(false);
-
- ASSERT_TRUE(next(rangeDeleter, 1));
- ASSERT_EQUALS(2ULL, dbclient.count(kNss, BSON(kShardKey << LT << 5)));
-}
-
-} // namespace
-} // namespace mongo
diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp
index 9e5f79529fe..93b4529d502 100644
--- a/src/mongo/db/s/collection_sharding_runtime.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime.cpp
@@ -100,9 +100,10 @@ boost::optional<ChunkVersion> getOperationReceivedVersion(OperationContext* opCt
} // namespace
-CollectionShardingRuntime::CollectionShardingRuntime(ServiceContext* sc,
- NamespaceString nss,
- executor::TaskExecutor* rangeDeleterExecutor)
+CollectionShardingRuntime::CollectionShardingRuntime(
+ ServiceContext* sc,
+ NamespaceString nss,
+ std::shared_ptr<executor::TaskExecutor> rangeDeleterExecutor)
: _nss(std::move(nss)),
_rangeDeleterExecutor(rangeDeleterExecutor),
_stateChangeMutex(nss.toString()) {
@@ -227,7 +228,7 @@ void CollectionShardingRuntime::clearFilteringMetadata() {
}
}
-auto CollectionShardingRuntime::beginReceive(ChunkRange const& range) -> CleanupNotification {
+SharedSemiFuture<void> CollectionShardingRuntime::beginReceive(ChunkRange const& range) {
stdx::lock_guard lk(_metadataManagerLock);
invariant(_metadataType == MetadataType::kSharded);
return _metadataManager->beginReceive(range);
@@ -238,14 +239,11 @@ void CollectionShardingRuntime::forgetReceive(const ChunkRange& range) {
invariant(_metadataType == MetadataType::kSharded);
_metadataManager->forgetReceive(range);
}
-
-auto CollectionShardingRuntime::cleanUpRange(ChunkRange const& range, CleanWhen when)
- -> CleanupNotification {
- Date_t time =
- (when == kNow) ? Date_t{} : Date_t::now() + Seconds(orphanCleanupDelaySecs.load());
+SharedSemiFuture<void> CollectionShardingRuntime::cleanUpRange(ChunkRange const& range,
+ CleanWhen when) {
stdx::lock_guard lk(_metadataManagerLock);
invariant(_metadataType == MetadataType::kSharded);
- return _metadataManager->cleanUpRange(range, time);
+ return _metadataManager->cleanUpRange(range, when == kDelayed);
}
Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx,
@@ -253,7 +251,7 @@ Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx,
OID const& epoch,
ChunkRange orphanRange) {
while (true) {
- boost::optional<CleanupNotification> stillScheduled;
+ boost::optional<SharedSemiFuture<void>> stillScheduled;
{
AutoGetCollection autoColl(opCtx, nss, MODE_IX);
@@ -279,7 +277,7 @@ Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx,
log() << "Waiting for deletion of " << nss.ns() << " range " << orphanRange;
- Status result = stillScheduled->waitStatus(opCtx);
+ Status result = stillScheduled->getNoThrow(opCtx);
if (!result.isOK()) {
return result.withContext(str::stream() << "Failed to delete orphaned " << nss.ns()
<< " range " << orphanRange.toString());
diff --git a/src/mongo/db/s/collection_sharding_runtime.h b/src/mongo/db/s/collection_sharding_runtime.h
index b0336a3143d..054f637440e 100644
--- a/src/mongo/db/s/collection_sharding_runtime.h
+++ b/src/mongo/db/s/collection_sharding_runtime.h
@@ -50,7 +50,7 @@ class CollectionShardingRuntime final : public CollectionShardingState,
public:
CollectionShardingRuntime(ServiceContext* sc,
NamespaceString nss,
- executor::TaskExecutor* rangeDeleterExecutor);
+ std::shared_ptr<executor::TaskExecutor> rangeDeleterExecutor);
CollectionShardingRuntime(const CollectionShardingRuntime&) = delete;
CollectionShardingRuntime& operator=(const CollectionShardingRuntime&) = delete;
@@ -126,17 +126,15 @@ public:
/**
* Schedules any documents in `range` for immediate cleanup iff no running queries can depend
- * on them, and adds the range to the list of pending ranges. Otherwise, returns a notification
- * that yields bad status immediately. Does not block. Call waitStatus(opCtx) on the result
- * to wait for the deletion to complete or fail. After that, call waitForClean to ensure no
- * other deletions are pending for the range.
+ * on them, and adds the range to the list of ranges being received.
+ *
+ * Returns a future that will be resolved when the deletion has completed or failed.
*/
- using CleanupNotification = CollectionRangeDeleter::DeleteNotification;
- CleanupNotification beginReceive(ChunkRange const& range);
+ SharedSemiFuture<void> beginReceive(ChunkRange const& range);
/*
- * Removes `range` from the list of pending ranges, and schedules any documents in the range for
- * immediate cleanup. Does not block.
+ * Removes `range` from the list of ranges being received, and schedules any documents in the
+ * range for immediate cleanup. Does not block.
*/
void forgetReceive(const ChunkRange& range);
@@ -146,12 +144,11 @@ public:
* Passed kDelayed, an additional delay (configured via server parameter orphanCleanupDelaySecs)
* is added to permit (most) dependent queries on secondaries to complete, too.
*
- * Call result.waitStatus(opCtx) to wait for the deletion to complete or fail. If that succeeds,
- * waitForClean can be called to ensure no other deletions are pending for the range. Call
- * result.abandon(), instead of waitStatus, to ignore the outcome.
+ * Returns a future that will be resolved when the deletion completes or fails. If that
+ * succeeds, waitForClean can be called to ensure no other deletions are pending for the range.
*/
enum CleanWhen { kNow, kDelayed };
- CleanupNotification cleanUpRange(ChunkRange const& range, CleanWhen when);
+ SharedSemiFuture<void> cleanUpRange(ChunkRange const& range, CleanWhen when);
/**
* Returns a range _not_ owned by this shard that starts no lower than the specified
@@ -169,13 +166,6 @@ public:
private:
friend CSRLock;
- friend boost::optional<Date_t> CollectionRangeDeleter::cleanUpNextRange(
- OperationContext*,
- NamespaceString const&,
- UUID collectionUuid,
- int,
- CollectionRangeDeleter*);
-
/**
* Returns the latest version of collection metadata with filtering configured for
* atClusterTime if specified.
@@ -197,7 +187,7 @@ private:
const NamespaceString _nss;
// The executor used for deleting ranges of orphan chunks.
- executor::TaskExecutor* _rangeDeleterExecutor;
+ std::shared_ptr<executor::TaskExecutor> _rangeDeleterExecutor;
// Object-wide ResourceMutex to protect changes to the CollectionShardingRuntime or objects held
// within (including the MigrationSourceManager, which is a decoration on the CSR). Use only the
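
The header comments above spell out the intended caller pattern for the future-based API: schedule the cleanup, wait on the returned future, then call waitForClean to confirm that no other scheduled deletions still cover the range. A hedged sketch of that sequence, assuming a CollectionShardingRuntime* csr plus opCtx, nss, epoch, and orphanRange supplied by the caller:

    SharedSemiFuture<void> cleanupComplete =
        csr->cleanUpRange(orphanRange, CollectionShardingRuntime::kDelayed);
    Status deletionStatus = cleanupComplete.getNoThrow(opCtx);
    if (deletionStatus.isOK()) {
        // Per the comment above, ensure no other pending deletions overlap the range.
        Status cleanStatus =
            CollectionShardingRuntime::waitForClean(opCtx, nss, epoch, orphanRange);
    }
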
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index c911634e890..e3a9d0d9f47 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -53,6 +53,14 @@ public:
CollectionShardingStateMap(std::unique_ptr<CollectionShardingStateFactory> factory)
: _factory(std::move(factory)) {}
+ /**
+ * Joins the factory, waiting for any outstanding tasks using the factory to be finished. Must
+ * be called before destruction.
+ */
+ void join() {
+ _factory->join();
+ }
+
CollectionShardingState& getOrCreate(const NamespaceString& nss) {
stdx::lock_guard<Latch> lg(_mutex);
@@ -129,7 +137,10 @@ void CollectionShardingStateFactory::set(ServiceContext* service,
void CollectionShardingStateFactory::clear(ServiceContext* service) {
auto& collectionsMap = CollectionShardingStateMap::get(service);
- collectionsMap.reset();
+ if (collectionsMap) {
+ collectionsMap->join();
+ collectionsMap.reset();
+ }
}
} // namespace mongo
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index f4974961ff6..30fac48cedc 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -178,6 +178,11 @@ public:
virtual ~CollectionShardingStateFactory() = default;
/**
+ * Must be called prior to destruction to wait for any ongoing work to complete.
+ */
+ virtual void join() = 0;
+
+ /**
* Called by the CollectionShardingState::get method once per newly cached namespace. It is
* invoked under a mutex and must not acquire any locks or do blocking work.
*
diff --git a/src/mongo/db/s/collection_sharding_state_factory_embedded.cpp b/src/mongo/db/s/collection_sharding_state_factory_embedded.cpp
index 4e705f0bd41..95614739d0d 100644
--- a/src/mongo/db/s/collection_sharding_state_factory_embedded.cpp
+++ b/src/mongo/db/s/collection_sharding_state_factory_embedded.cpp
@@ -88,6 +88,8 @@ public:
CollectionShardingStateFactoryEmbedded(ServiceContext* serviceContext)
: CollectionShardingStateFactory(serviceContext) {}
+ void join() override {}
+
std::unique_ptr<CollectionShardingState> make(const NamespaceString&) override {
return std::make_unique<CollectionShardingStateStandalone>();
}
diff --git a/src/mongo/db/s/collection_sharding_state_factory_shard.cpp b/src/mongo/db/s/collection_sharding_state_factory_shard.cpp
index b0c800c92cb..14426d05ec6 100644
--- a/src/mongo/db/s/collection_sharding_state_factory_shard.cpp
+++ b/src/mongo/db/s/collection_sharding_state_factory_shard.cpp
@@ -46,6 +46,10 @@ public:
: CollectionShardingStateFactory(serviceContext) {}
~CollectionShardingStateFactoryShard() {
+ join();
+ }
+
+ void join() override {
if (_taskExecutor) {
_taskExecutor->shutdown();
_taskExecutor->join();
@@ -57,7 +61,7 @@ public:
}
private:
- executor::TaskExecutor* _getExecutor() {
+ std::shared_ptr<executor::TaskExecutor> _getExecutor() {
stdx::lock_guard<Latch> lg(_mutex);
if (!_taskExecutor) {
const std::string kExecName("CollectionRangeDeleter-TaskExecutor");
@@ -65,18 +69,20 @@ private:
auto net = executor::makeNetworkInterface(kExecName);
auto pool = std::make_unique<executor::NetworkInterfaceThreadPool>(net.get());
auto taskExecutor =
- std::make_unique<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
+ std::make_shared<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
taskExecutor->startup();
_taskExecutor = std::move(taskExecutor);
}
- return _taskExecutor.get();
+ return _taskExecutor;
}
// Serializes the instantiation of the task executor
Mutex _mutex = MONGO_MAKE_LATCH("CollectionShardingStateFactoryShard::_mutex");
- std::unique_ptr<executor::TaskExecutor> _taskExecutor{nullptr};
+
+ // Required to be a shared_ptr since it is used as an executor for ExecutorFutures.
+ std::shared_ptr<executor::TaskExecutor> _taskExecutor{nullptr};
};
} // namespace
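A hedged sketch of why the executor is now handed out as a std::shared_ptr (the helper name is hypothetical, not code from this commit): continuations chained with thenRunOn() retain the executor they run on, so the executor must be able to outlive the factory that created it.

    // Hypothetical helper: attach a continuation to a range deletion future.
    void logWhenCleanupFinishes(SharedSemiFuture<void> cleanupComplete,
                                std::shared_ptr<executor::TaskExecutor> exec) {
        cleanupComplete.thenRunOn(exec).onCompletion([](Status s) {
            // Runs on `exec`; the continuation chain keeps `exec` alive even if the
            // factory's _taskExecutor member has already been released.
            if (!s.isOK()) {
                log() << "Range deletion failed" << causedBy(redact(s));
            }
        });
    }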
diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp
index 5c1dd392ae5..c01c9879423 100644
--- a/src/mongo/db/s/collection_sharding_state_test.cpp
+++ b/src/mongo/db/s/collection_sharding_state_test.cpp
@@ -167,7 +167,7 @@ TEST_F(DeleteStateTest, MakeDeleteStateShardedWithIdHashInShardKey) {
TEST_F(CollectionShardingRuntimeTest,
GetCurrentMetadataReturnsNoneBeforeSetFilteringMetadataIsCalled) {
- CollectionShardingRuntime csr(getServiceContext(), kTestNss, executor().get());
+ CollectionShardingRuntime csr(getServiceContext(), kTestNss, executor());
ASSERT_FALSE(csr.getCurrentMetadataIfKnown());
}
diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp
index 9e9d67b77f5..f038c328052 100644
--- a/src/mongo/db/s/metadata_manager.cpp
+++ b/src/mongo/db/s/metadata_manager.cpp
@@ -40,125 +40,18 @@
#include "mongo/bson/util/builder.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/range_arithmetic.h"
-#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/range_deletion_util.h"
+#include "mongo/db/s/sharding_runtime_d_params_gen.h"
#include "mongo/s/grid.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/log.h"
#include "mongo/util/time_support.h"
-// MetadataManager maintains pointers to CollectionMetadata objects in a member list named
-// _metadata. Each CollectionMetadata contains an immutable _chunksMap of chunks assigned to this
-// shard, along with details related to its own lifecycle in a member _tracker.
-//
-// The current chunk mapping, used by queries starting up, is at _metadata.back(). Each query,
-// when it starts up, requests and holds a ScopedCollectionMetadata object, and destroys it on
-// termination. Each ScopedCollectionMetadata keeps a shared_ptr to its CollectionMetadata chunk
-// mapping, and to the MetadataManager itself. CollectionMetadata mappings also keep a record of
-// chunk ranges that may be deleted when it is determined that the range can no longer be in use.
-//
-// ScopedCollectionMetadata's destructor decrements the CollectionMetadata's usageCounter.
-// Whenever a usageCounter drops to zero, we check whether any now-unused CollectionMetadata
-// elements can be popped off the front of _metadata. We need to keep the unused elements in the
-// middle (as seen below) because they may schedule deletions of chunks depended on by older
-// mappings.
-//
-// New chunk mappings are pushed onto the back of _metadata. Subsequently started queries use the
-// new mapping while still-running queries continue using the older "snapshot" mappings. We treat
-// _metadata.back()'s usage count differently from the snapshots because it can't reliably be
-// compared to zero; a new query may increment it at any time.
-//
-// (Note that the collection may be dropped or become unsharded, and even get made and sharded
-// again, between construction and destruction of a ScopedCollectionMetadata).
-//
-// MetadataManager also contains a CollectionRangeDeleter _rangesToClean that queues orphan ranges
-// being deleted in a background thread, and a mapping _receivingChunks of the ranges being migrated
-// in, to avoid deleting them. Each range deletion is paired with a notification object triggered
-// when the deletion is completed or abandoned.
-//
-// ____________________________
-// (s): std::shared_ptr<> Clients:| ScopedCollectionMetadata |
-// _________________________ +----(s) manager metadata (s)------------------+
-// | CollectionShardingState | | |____________________________| | |
-// | _metadataManager (s) | +-------(s) manager metadata (s)--------------+ |
-// |____________________|____| | |____________________________| | | |
-// ____________________v________ +------------(s) manager metadata (s)-----+ | |
-// | MetadataManager | | |____________________________| | | |
-// | |<--+ | | |
-// | | ___________________________ (1 use) | | |
-// | getActiveMetadata(): /---------->| CollectionMetadata |<---------+ | |
-// | back(): [(s),------/ | | _________________________|_ | |
-// | (s),-------------------->| CollectionMetadata | (0 uses) | |
-// | _metadata: (s)]------\ | | | _________________________|_ | |
-// | \-------------->| CollectionMetadata | | |
-// | _receivingChunks | | | | | (2 uses) | |
-// | _rangesToClean: | | | | _tracker: |<---------+ |
-// | _________________________ | | | | _______________________ |<-----------+
-// | | CollectionRangeDeleter | | | | | | Tracker | |
-// | | | | | | | | | |
-// | | _orphans [range,notif, | | | | | | usageCounter | |
-// | | range,notif, | | | | | | orphans [range,notif, | |
-// | | ... ] | | | | | | range,notif, | |
-// | | | | | | | | ... ] | |
-// | |_________________________| | |_| | |_______________________| |
-// |_____________________________| | | _chunksMap |
-// |_| _chunkVersion |
-// | ... |
-// |___________________________|
-//
-// Note that _metadata as shown here has its front() at the bottom, back() at the top. As usual,
-// new entries are pushed onto the back, popped off the front.
-
namespace mongo {
namespace {
-
using TaskExecutor = executor::TaskExecutor;
using CallbackArgs = TaskExecutor::CallbackArgs;
-
-MONGO_FAIL_POINT_DEFINE(suspendRangeDeletion);
-
-/**
- * Deletes ranges, in background, until done, normally using a task executor attached to the
- * ShardingState.
- *
- * Each time it completes cleaning up a range, it wakes up clients waiting on completion of that
- * range, which may then verify that their range has no more deletions scheduled, and proceed.
- */
-void scheduleCleanup(executor::TaskExecutor* executor,
- NamespaceString nss,
- UUID collectionUuid,
- Date_t when) {
- LOG(1) << "Scheduling cleanup on " << nss.ns() << " at " << when;
- auto swCallbackHandle = executor->scheduleWorkAt(
- when, [executor, nss = std::move(nss), uuid = collectionUuid](auto& args) {
- auto& status = args.status;
- if (ErrorCodes::isCancelationError(status.code())) {
- return;
- }
- invariant(status);
-
- ThreadClient tc("Collection-Range-Deleter", getGlobalServiceContext());
- {
- stdx::lock_guard<Client> lk(*tc.get());
- tc->setSystemOperationKillable(lk);
- }
- auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
- auto opCtx = uniqueOpCtx.get();
-
- suspendRangeDeletion.pauseWhileSet();
-
- auto next = CollectionRangeDeleter::cleanUpNextRange(opCtx, nss, uuid);
- if (next) {
- scheduleCleanup(executor, std::move(nss), std::move(uuid), *next);
- }
- });
-
- if (!swCallbackHandle.isOK()) {
- log() << "Failed to schedule the orphan data cleanup task"
- << causedBy(redact(swCallbackHandle.getStatus()));
- }
-}
-
} // namespace
class RangePreserver : public ScopedCollectionMetadata::Impl {
@@ -202,7 +95,7 @@ private:
MetadataManager::MetadataManager(ServiceContext* serviceContext,
NamespaceString nss,
- TaskExecutor* executor,
+ std::shared_ptr<TaskExecutor> executor,
CollectionMetadata initialMetadata)
: _serviceContext(serviceContext),
_nss(std::move(nss)),
@@ -212,26 +105,6 @@ MetadataManager::MetadataManager(ServiceContext* serviceContext,
_metadata.emplace_back(std::make_shared<CollectionMetadataTracker>(std::move(initialMetadata)));
}
-MetadataManager::~MetadataManager() {
- stdx::lock_guard<Latch> lg(_managerLock);
- _clearAllCleanups(lg);
-}
-
-void MetadataManager::_clearAllCleanups(WithLock lock) {
- _clearAllCleanups(lock,
- {ErrorCodes::InterruptedDueToReplStateChange,
- str::stream()
- << "Range deletions in " << _nss.ns()
- << " abandoned because collection was dropped or became unsharded"});
-}
-
-void MetadataManager::_clearAllCleanups(WithLock, Status status) {
- for (auto& tracker : _metadata) {
- std::ignore = _rangesToClean.add(std::move(tracker->orphans));
- }
- _rangesToClean.clear(status);
-}
-
ScopedCollectionMetadata MetadataManager::getActiveMetadata(
const boost::optional<LogicalTime>& atClusterTime) {
stdx::lock_guard<Latch> lg(_managerLock);
@@ -329,18 +202,12 @@ void MetadataManager::_setActiveMetadata(WithLock wl, CollectionMetadata newMeta
_retireExpiredMetadata(wl);
}
-void MetadataManager::_retireExpiredMetadata(WithLock lock) {
- // Remove entries and schedule orphans for deletion only from the front of _metadata. We cannot
- // remove an entry from the middle of _metadata because a previous entry (whose usageCount is
- // not 0) could have a query that is actually still accessing those documents.
+void MetadataManager::_retireExpiredMetadata(WithLock) {
+ // Remove entries with a usage count of 0 from the front of _metadata, which may schedule
+ // orphans for deletion. We cannot remove an entry from the middle of _metadata because a
+ // previous entry (whose usageCount is not 0) could have a query that is actually still
+ // accessing those documents.
while (_metadata.size() > 1 && !_metadata.front()->usageCounter) {
- if (!_metadata.front()->orphans.empty()) {
- LOG(0) << "Queries possibly dependent on " << _nss.ns()
- << " range(s) finished; scheduling ranges for deletion";
-
- _pushListToClean(lock, std::move(_metadata.front()->orphans));
- }
-
_metadata.pop_front();
}
@@ -373,7 +240,12 @@ void MetadataManager::toBSONPending(BSONArrayBuilder& bb) const {
void MetadataManager::append(BSONObjBuilder* builder) const {
stdx::lock_guard<Latch> lg(_managerLock);
- _rangesToClean.append(builder);
+ BSONArrayBuilder arr(builder->subarrayStart("rangesToClean"));
+ for (auto const& [range, _] : _rangesScheduledForDeletion) {
+ BSONObjBuilder obj;
+ range.append(&obj);
+ arr.append(obj.done());
+ }
BSONArrayBuilder pcArr(builder->subarrayStart("pendingChunks"));
for (const auto& entry : _receivingChunks) {
@@ -396,23 +268,7 @@ void MetadataManager::append(BSONObjBuilder* builder) const {
amrArr.done();
}
-auto MetadataManager::_pushRangeToClean(WithLock lock, ChunkRange const& range, Date_t when)
- -> CleanupNotification {
- std::list<Deletion> ranges;
- ranges.emplace_back(ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()), when);
- auto& notifn = ranges.back().notification;
- _pushListToClean(lock, std::move(ranges));
- return notifn;
-}
-
-void MetadataManager::_pushListToClean(WithLock, std::list<Deletion> ranges) {
- auto when = _rangesToClean.add(std::move(ranges));
- if (when) {
- scheduleCleanup(_executor, _nss, _collectionUuid, *when);
- }
-}
-
-auto MetadataManager::beginReceive(ChunkRange const& range) -> CleanupNotification {
+SharedSemiFuture<void> MetadataManager::beginReceive(ChunkRange const& range) {
stdx::lock_guard<Latch> lg(_managerLock);
invariant(!_metadata.empty());
@@ -426,7 +282,8 @@ auto MetadataManager::beginReceive(ChunkRange const& range) -> CleanupNotificati
log() << "Scheduling deletion of any documents in " << _nss.ns() << " range "
<< redact(range.toString()) << " before migrating in a chunk covering the range";
- return _pushRangeToClean(lg, range, Date_t{});
+ return _submitRangeForDeletion(
+ lg, SemiFuture<void>::makeReady(), range, Seconds(orphanCleanupDelaySecs.load()));
}
void MetadataManager::forgetReceive(ChunkRange const& range) {
@@ -444,11 +301,11 @@ void MetadataManager::forgetReceive(ChunkRange const& range) {
invariant(it != _receivingChunks.end());
_receivingChunks.erase(it);
- _pushRangeToClean(lg, range, Date_t{}).abandon();
+ std::ignore = _submitRangeForDeletion(lg, SemiFuture<void>::makeReady(), range, Seconds(0));
}
-auto MetadataManager::cleanUpRange(ChunkRange const& range, Date_t whenToDelete)
- -> CleanupNotification {
+SharedSemiFuture<void> MetadataManager::cleanUpRange(ChunkRange const& range,
+ bool shouldDelayBeforeDeletion) {
stdx::lock_guard<Latch> lg(_managerLock);
invariant(!_metadata.empty());
@@ -466,48 +323,54 @@ auto MetadataManager::cleanUpRange(ChunkRange const& range, Date_t whenToDelete)
" migrated in"};
}
- if (!overlapMetadata) {
- // No running queries can depend on it, so queue it for deletion immediately.
- const auto whenStr = (whenToDelete == Date_t{}) ? "immediate"_sd : "deferred"_sd;
- log() << "Scheduling " << whenStr << " deletion of " << _nss.ns() << " range "
- << redact(range.toString());
- return _pushRangeToClean(lg, range, whenToDelete);
+ auto delayForActiveQueriesOnSecondariesToComplete =
+ shouldDelayBeforeDeletion ? Seconds(orphanCleanupDelaySecs.load()) : Seconds(0);
+
+ if (overlapMetadata) {
+ log() << "Deletion of " << _nss.ns() << " range " << redact(range.toString())
+ << " will be scheduled after all possibly dependent queries finish";
+ ++overlapMetadata->numContingentRangeDeletionTasks;
+ // Schedule the range for deletion once the overlapping metadata object is destroyed
+ // (meaning no more queries can be using the range) and obtain a future which will be
+ // signaled when deletion is complete.
+ return _submitRangeForDeletion(lg,
+ overlapMetadata->onDestructionPromise.getFuture().semi(),
+ range,
+ delayForActiveQueriesOnSecondariesToComplete);
+ } else {
+ // No running queries can depend on this range, so queue it for deletion immediately.
+ log() << "Scheduling deletion of " << _nss.ns() << " range " << redact(range.toString());
+
+ return _submitRangeForDeletion(
+ lg, SemiFuture<void>::makeReady(), range, delayForActiveQueriesOnSecondariesToComplete);
}
-
- log() << "Deletion of " << _nss.ns() << " range " << redact(range.toString())
- << " will be scheduled after all possibly dependent queries finish";
-
- // Put it on the oldest metadata permissible; the current one might live a long time.
- auto& orphans = overlapMetadata->orphans;
- orphans.emplace_back(ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()),
- whenToDelete);
-
- return orphans.back().notification;
}
size_t MetadataManager::numberOfRangesToCleanStillInUse() const {
stdx::lock_guard<Latch> lg(_managerLock);
size_t count = 0;
for (auto& tracker : _metadata) {
- count += tracker->orphans.size();
+ count += tracker->numContingentRangeDeletionTasks;
}
return count;
}
size_t MetadataManager::numberOfRangesToClean() const {
+ auto rangesToCleanInUse = numberOfRangesToCleanStillInUse();
stdx::lock_guard<Latch> lg(_managerLock);
- return _rangesToClean.size();
+ return _rangesScheduledForDeletion.size() - rangesToCleanInUse;
}
-auto MetadataManager::trackOrphanedDataCleanup(ChunkRange const& range) const
- -> boost::optional<CleanupNotification> {
+boost::optional<SharedSemiFuture<void>> MetadataManager::trackOrphanedDataCleanup(
+ ChunkRange const& range) const {
stdx::lock_guard<Latch> lg(_managerLock);
- auto overlaps = _overlapsInUseCleanups(lg, range);
- if (overlaps) {
- return overlaps;
+ for (const auto& [orphanRange, deletionComplete] : _rangesScheduledForDeletion) {
+ if (orphanRange.overlapWith(range)) {
+ return deletionComplete;
+ }
}
- return _rangesToClean.overlaps(range);
+ return boost::none;
}
auto MetadataManager::_findNewestOverlappingMetadata(WithLock, ChunkRange const& range)
@@ -536,27 +399,44 @@ bool MetadataManager::_overlapsInUseChunk(WithLock lk, ChunkRange const& range)
return (cm != nullptr);
}
-auto MetadataManager::_overlapsInUseCleanups(WithLock, ChunkRange const& range) const
- -> boost::optional<CleanupNotification> {
- invariant(!_metadata.empty());
-
- for (auto it = _metadata.rbegin(); it != _metadata.rend(); ++it) {
- const auto& orphans = (*it)->orphans;
- for (auto itOrphans = orphans.rbegin(); itOrphans != orphans.rend(); ++itOrphans) {
- const auto& orphan = *itOrphans;
- if (orphan.range.overlapWith(range)) {
- return orphan.notification;
- }
- }
- }
-
- return boost::none;
-}
-
boost::optional<ChunkRange> MetadataManager::getNextOrphanRange(BSONObj const& from) const {
stdx::lock_guard<Latch> lg(_managerLock);
invariant(!_metadata.empty());
return _metadata.back()->metadata->getNextOrphanRange(_receivingChunks, from);
}
+SharedSemiFuture<void> MetadataManager::_submitRangeForDeletion(
+ const WithLock&,
+ SemiFuture<void> waitForActiveQueriesToComplete,
+ const ChunkRange& range,
+ Seconds delayForActiveQueriesOnSecondariesToComplete) {
+
+ int maxToDelete = rangeDeleterBatchSize.load();
+ if (maxToDelete <= 0) {
+ maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1);
+ }
+
+ auto cleanupComplete =
+ removeDocumentsInRange(_executor,
+ std::move(waitForActiveQueriesToComplete),
+ _nss,
+ *_metadata.back()->metadata->getChunkManager()->getUUID(),
+ _metadata.back()->metadata->getKeyPattern().getOwned(),
+ range,
+ maxToDelete,
+ delayForActiveQueriesOnSecondariesToComplete,
+ Milliseconds(rangeDeleterBatchDelayMS.load()));
+
+ _rangesScheduledForDeletion.emplace_front(range, cleanupComplete);
+    // Attach a continuation so that once the range has been deleted, the corresponding entry is
+    // removed from _rangesScheduledForDeletion. std::list iterators are not invalidated by
+    // insertions or erasures of other elements, so the iterator to the newly added element can
+    // be saved here and used later to erase it.
+ cleanupComplete.thenRunOn(_executor).onCompletion(
+ [self = shared_from_this(), it = _rangesScheduledForDeletion.begin()](Status s) {
+ stdx::lock_guard<Latch> lg(self->_managerLock);
+ self->_rangesScheduledForDeletion.erase(it);
+ });
+ return cleanupComplete;
+}
} // namespace mongo
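A hedged, self-contained sketch of the self-erasing continuation pattern used by _submitRangeForDeletion above, written with simplified standard-library types rather than MongoDB ones: it relies on std::list iterators staying valid across other insertions and erasures, and on shared_from_this() keeping the owner alive until the continuation runs.

    #include <functional>
    #include <list>
    #include <memory>
    #include <mutex>

    // Note: a Tracker must be owned by a std::shared_ptr for shared_from_this() to be valid.
    struct Tracker : std::enable_shared_from_this<Tracker> {
        std::mutex mutex;
        std::list<int> scheduled;  // stand-in for _rangesScheduledForDeletion

        // `runAsync` is any facility that runs the callback later, possibly on another thread.
        void schedule(int id, std::function<void(std::function<void()>)> runAsync) {
            std::lock_guard<std::mutex> lk(mutex);
            scheduled.push_front(id);
            auto it = scheduled.begin();  // stays valid across other insertions/erasures
            // Capture a shared_ptr so the Tracker outlives the asynchronous work, then
            // erase exactly the entry added above once that work completes.
            runAsync([self = shared_from_this(), it] {
                std::lock_guard<std::mutex> lk(self->mutex);
                self->scheduled.erase(it);
            });
        }
    };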
diff --git a/src/mongo/db/s/metadata_manager.h b/src/mongo/db/s/metadata_manager.h
index 69fc46ec81a..803aa6330a4 100644
--- a/src/mongo/db/s/metadata_manager.h
+++ b/src/mongo/db/s/metadata_manager.h
@@ -36,12 +36,10 @@
#include "mongo/db/logical_time.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/range_arithmetic.h"
-#include "mongo/db/s/collection_range_deleter.h"
#include "mongo/db/s/scoped_collection_metadata.h"
#include "mongo/db/service_context.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/util/concurrency/notification.h"
#include "mongo/util/concurrency/with_lock.h"
namespace mongo {
@@ -53,14 +51,11 @@ class RangePreserver;
*/
class MetadataManager : public std::enable_shared_from_this<MetadataManager> {
public:
- using CleanupNotification = CollectionRangeDeleter::DeleteNotification;
- using Deletion = CollectionRangeDeleter::Deletion;
-
MetadataManager(ServiceContext* serviceContext,
NamespaceString nss,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
CollectionMetadata initialMetadata);
- ~MetadataManager();
+ ~MetadataManager() = default;
MetadataManager(const MetadataManager&) = delete;
MetadataManager& operator=(const MetadataManager&) = delete;
@@ -117,27 +112,29 @@ public:
/**
* Schedules any documents in `range` for immediate cleanup iff no running queries can depend
- * on them, and adds the range to the list of pending ranges. Otherwise, returns a notification
- * that yields bad status immediately. Does not block. Call waitStatus(opCtx) on the result
- * to wait for the deletion to complete or fail.
+ * on them, and adds the range to the list of ranges currently being received.
+ *
+     * Returns a future that will be resolved when the deletion either completes or fails.
*/
- CleanupNotification beginReceive(ChunkRange const& range);
+ SharedSemiFuture<void> beginReceive(ChunkRange const& range);
/**
- * Removes `range` from the list of pending ranges, and schedules any documents in the range for
- * immediate cleanup. Does not block. If no such range is scheduled, does nothing.
+ * Removes `range` from the list of ranges currently being received, and schedules any documents
+ * in the range for immediate cleanup.
*/
void forgetReceive(const ChunkRange& range);
/**
* Schedules documents in `range` for cleanup after any running queries that may depend on them
* have terminated. Does not block. Fails if the range overlaps any current local shard chunk.
- * If `whenToDelete` is Date_t{}, deletion is scheduled immediately after the last dependent
- * query completes; otherwise, deletion is postponed until the time specified.
*
- * Call waitStatus(opCtx) on the result to wait for the deletion to complete or fail.
+ * If shouldDelayBeforeDeletion is false, deletion is scheduled immediately after the last
+     * dependent query completes; otherwise, deletion is postponed until orphanCleanupDelaySecs
+     * after the last dependent query completes.
+ *
+ * Returns a future that will be fulfilled when the range deletion completes or fails.
*/
- CleanupNotification cleanUpRange(ChunkRange const& range, Date_t whenToDelete);
+ SharedSemiFuture<void> cleanUpRange(ChunkRange const& range, bool shouldDelayBeforeDeletion);
/**
* Returns the number of ranges scheduled to be cleaned, exclusive of such ranges that might
@@ -155,21 +152,18 @@ public:
/**
* Reports whether any range still scheduled for deletion overlaps the argument range. If so,
- * returns a notification n such that n.waitStatus(opCtx) will wake up when the newest
- * overlapping range's deletion (possibly the one of interest) completes or fails.
+ * returns a future that will be resolved when the newest overlapping range's deletion (possibly
+ * the one of interest) completes or fails.
*/
- boost::optional<CleanupNotification> trackOrphanedDataCleanup(ChunkRange const& orphans) const;
+ boost::optional<SharedSemiFuture<void>> trackOrphanedDataCleanup(
+ ChunkRange const& orphans) const;
boost::optional<ChunkRange> getNextOrphanRange(BSONObj const& from) const;
private:
- // For access to _managerLock, _rangesToClean, and _clearAllCleanups under task callback
- friend class CollectionRangeDeleter;
-
// Management of the _metadata list is implemented in RangePreserver
friend class RangePreserver;
-
/**
* Represents an instance of what the filtering metadata for this collection was at a particular
* point in time along with a counter of how many queries are still using it.
@@ -183,29 +177,34 @@ private:
~CollectionMetadataTracker() {
invariant(!usageCounter);
+ onDestructionPromise.emplaceValue();
}
boost::optional<CollectionMetadata> metadata;
- std::list<Deletion> orphans;
+ /**
+ * Number of range deletion tasks waiting on this CollectionMetadataTracker to be destroyed
+ * before deleting documents.
+ */
+ uint32_t numContingentRangeDeletionTasks{0};
+
+ /**
+ * Promise that will be signaled when this object is destroyed.
+ *
+         * When this CollectionMetadataTracker may still refer to orphaned documents in one or
+         * more ranges, futures obtained from this promise act as barriers that prevent range
+         * deletion tasks for those ranges from proceeding until this object is destroyed,
+         * guaranteeing that ranges aren't deleted while active queries can still access them.
+ */
+ SharedPromise<void> onDestructionPromise;
uint32_t usageCounter{0};
};
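A hedged sketch of the destruction-barrier idea documented above (the struct and function names are illustrative; only the promise/future calls mirror this patch): deletion work chains after the promise's future, and the destructor fulfills the promise once no query can still be pinned to the metadata.

    struct QueryEpoch {                              // stands in for CollectionMetadataTracker
        SharedPromise<void> onDestructionPromise;
        ~QueryEpoch() {
            onDestructionPromise.emplaceValue();     // releases every waiting deletion task
        }
    };

    // A range deletion task chains after this future, so it cannot start deleting
    // documents until the epoch (and every query pinned to it) has been destroyed.
    SemiFuture<void> deletionBarrier(QueryEpoch& epoch) {
        return epoch.onDestructionPromise.getFuture().semi();
    }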
/**
- * Cancels all scheduled deletions of orphan ranges, notifying listeners with specified status.
- */
- void _clearAllCleanups(WithLock, Status);
-
- /**
- * Cancels all scheduled deletions of orphan ranges, notifying listeners with status
- * InterruptedDueToReplStateChange.
- */
- void _clearAllCleanups(WithLock);
-
- /**
- * Retires any metadata that has fallen out of use, and pushes any orphan ranges found in them
- * to the list of ranges actively being cleaned up.
+     * Retires any metadata that has fallen out of use, potentially allowing range deletions that
+     * were waiting for active queries using these metadata objects to proceed.
*/
void _retireExpiredMetadata(WithLock);
@@ -228,23 +227,15 @@ private:
bool _overlapsInUseChunk(WithLock, ChunkRange const& range);
/**
- * Returns a notification if any range (possibly) still in use, but scheduled for cleanup,
- * overlaps the argument range.
- */
- boost::optional<CleanupNotification> _overlapsInUseCleanups(WithLock,
- ChunkRange const& range) const;
-
- /**
- * Copies the argument range to the list of ranges scheduled for immediate deletion, and
- * schedules a a background task to perform the work.
- */
- CleanupNotification _pushRangeToClean(WithLock, ChunkRange const& range, Date_t when);
-
- /**
- * Splices the argument list elements to the list of ranges scheduled for immediate deletion,
- * and schedules a a background task to perform the work.
+     * Schedules a task to delete the given range of documents once waitForActiveQueriesToComplete
+ * has been signaled, and store the resulting future for the task in
+ * _rangesScheduledForDeletion.
*/
- void _pushListToClean(WithLock, std::list<Deletion> range);
+ SharedSemiFuture<void> _submitRangeForDeletion(
+ const WithLock&,
+ SemiFuture<void> waitForActiveQueriesToComplete,
+ const ChunkRange& range,
+ Seconds delayForActiveQueriesOnSecondariesToComplete);
// ServiceContext from which to obtain instances of global support objects
ServiceContext* const _serviceContext;
@@ -256,7 +247,7 @@ private:
const UUID _collectionUuid;
// The background task that deletes documents from orphaned chunk ranges.
- executor::TaskExecutor* const _executor;
+ std::shared_ptr<executor::TaskExecutor> const _executor;
// Mutex to protect the state below
mutable Mutex _managerLock = MONGO_MAKE_LATCH("MetadataManager::_managerLock");
@@ -270,8 +261,8 @@ private:
    // Chunk ranges being migrated into the shard. Indexed by the min key of the range.
RangeMap _receivingChunks;
- // Ranges being deleted, or scheduled to be deleted, by a background task
- CollectionRangeDeleter _rangesToClean;
+ // Ranges being deleted, or scheduled to be deleted, by a background task.
+ std::list<std::pair<ChunkRange, SharedSemiFuture<void>>> _rangesScheduledForDeletion;
};
} // namespace mongo
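A hedged sketch of how a caller might drain every scheduled deletion overlapping a range via trackOrphanedDataCleanup (illustrative only, not this commit's waitForClean); a std::shared_ptr<MetadataManager> `mm`, an OperationContext* `opCtx`, and a ChunkRange `orphans` are assumed to be in scope.

    // Loop because another scheduled deletion may still overlap once this one finishes.
    while (auto overlappingDeletion = mm->trackOrphanedDataCleanup(orphans)) {
        Status s = overlappingDeletion->getNoThrow(opCtx);
        if (!s.isOK()) {
            // Handle or propagate the failure from the newest overlapping deletion.
            break;
        }
    }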
diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp
index 145c3e44014..a36d99e0a75 100644
--- a/src/mongo/db/s/metadata_manager_test.cpp
+++ b/src/mongo/db/s/metadata_manager_test.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/metadata_manager.h"
+#include "mongo/db/s/sharding_runtime_d_params_gen.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
@@ -65,7 +66,8 @@ protected:
void setUp() override {
ShardServerTestFixture::setUp();
_manager = std::make_shared<MetadataManager>(
- getServiceContext(), kNss, executor().get(), makeEmptyMetadata());
+ getServiceContext(), kNss, executor(), makeEmptyMetadata());
+ orphanCleanupDelaySecs.store(1);
}
/**
@@ -168,31 +170,25 @@ TEST_F(MetadataManagerTest, CleanUpForMigrateIn) {
ChunkRange range2(BSON("key" << 10), BSON("key" << 20));
auto notif1 = _manager->beginReceive(range1);
- ASSERT(!notif1.ready());
+ ASSERT(!notif1.isReady());
auto notif2 = _manager->beginReceive(range2);
- ASSERT(!notif2.ready());
+ ASSERT(!notif2.isReady());
ASSERT_EQ(2UL, _manager->numberOfRangesToClean());
ASSERT_EQ(0UL, _manager->numberOfRangesToCleanStillInUse());
-
- notif1.abandon();
- notif2.abandon();
}
TEST_F(MetadataManagerTest, AddRangeNotificationsBlockAndYield) {
ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- auto notifn1 = _manager->cleanUpRange(cr1, Date_t{});
- ASSERT_FALSE(notifn1.ready());
+ auto notifn1 = _manager->cleanUpRange(cr1, false /*delayBeforeDeleting*/);
+ ASSERT_FALSE(notifn1.isReady());
ASSERT_EQ(_manager->numberOfRangesToClean(), 1UL);
auto optNotifn = _manager->trackOrphanedDataCleanup(cr1);
- ASSERT_FALSE(notifn1.ready());
- ASSERT_FALSE(optNotifn->ready());
- ASSERT(notifn1 == *optNotifn);
- notifn1.abandon();
- optNotifn->abandon();
+ ASSERT_FALSE(notifn1.isReady());
+ ASSERT_FALSE(optNotifn->isReady());
}
TEST_F(MetadataManagerTest, CleanupNotificationsAreSignaledWhenMetadataManagerIsDestroyed) {
@@ -211,24 +207,31 @@ TEST_F(MetadataManagerTest, CleanupNotificationsAreSignaledWhenMetadataManagerIs
_manager->setFilteringMetadata(
cloneMetadataMinusChunk(_manager->getActiveMetadata(boost::none), rangeToClean));
- auto notif = _manager->cleanUpRange(rangeToClean, Date_t{});
- ASSERT(!notif.ready());
+ auto notif = _manager->cleanUpRange(rangeToClean, false /*delayBeforeDeleting*/);
+ ASSERT(!notif.isReady());
auto optNotif = _manager->trackOrphanedDataCleanup(rangeToClean);
ASSERT(optNotif);
- ASSERT(!optNotif->ready());
+ ASSERT(!optNotif->isReady());
// Reset the original shared_ptr. The cursorOnMovedMetadata will still contain its own copy of
// the shared_ptr though, so the destructor of ~MetadataManager won't yet be called.
_manager.reset();
- ASSERT(!notif.ready());
- ASSERT(!optNotif->ready());
+ ASSERT(!notif.isReady());
+ ASSERT(!optNotif->isReady());
// Destroys the ScopedCollectionMetadata object and causes the destructor of MetadataManager to
// run, which should trigger all deletion notifications.
cursorOnMovedMetadata.reset();
- ASSERT(notif.ready());
- ASSERT(optNotif->ready());
+
+ // Advance time to simulate orphanCleanupDelaySecs passing.
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+ network()->advanceTime(network()->now() + Seconds{5});
+ }
+
+ notif.wait();
+ optNotif->wait();
}
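A hedged condensation of the test idiom introduced above (illustrative, not an additional test in this commit): shrink orphanCleanupDelaySecs, schedule a delayed cleanup, advance the mock network clock so the deferred work becomes runnable, then wait on the returned future. `_manager`, `network()`, and `rangeToClean` are assumed to come from the fixture.

    orphanCleanupDelaySecs.store(1);
    auto cleanupComplete = _manager->cleanUpRange(rangeToClean, true /*shouldDelayBeforeDeletion*/);
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(network());
        // Move the mock clock past the (shrunken) delay so the deferred deletion fires.
        network()->advanceTime(network()->now() + Seconds{5});
    }
    cleanupComplete.wait();  // resolves once the delayed range deletion has completed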
TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationSinglePending) {
@@ -284,11 +287,9 @@ TEST_F(MetadataManagerTest, RangesToCleanMembership) {
ASSERT_EQ(0UL, _manager->numberOfRangesToClean());
- auto notifn = _manager->cleanUpRange(cr, Date_t{});
- ASSERT(!notifn.ready());
+ auto notifn = _manager->cleanUpRange(cr, false /*delayBeforeDeleting*/);
+ ASSERT(!notifn.isReady());
ASSERT_EQ(1UL, _manager->numberOfRangesToClean());
-
- notifn.abandon();
}
TEST_F(MetadataManagerTest, ClearUnneededChunkManagerObjectsLastSnapshotInList) {
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index d33e5fbcbc9..63d098e96a0 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -65,7 +65,6 @@
#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/chrono.h"
-#include "mongo/util/concurrency/notification.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/log.h"
#include "mongo/util/producer_consumer_queue.h"
@@ -842,10 +841,10 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
// Synchronously delete any data which might have been left orphaned in the range
// being moved, and wait for completion
- auto notification = _notePending(opCtx, range);
+ auto cleanupCompleteFuture = _notePending(opCtx, range);
// Wait for the range deletion to report back
- if (!notification.waitStatus(opCtx).isOK()) {
- _setStateFail(redact(notification.waitStatus(opCtx).reason()));
+ if (!cleanupCompleteFuture.getNoThrow(opCtx).isOK()) {
+ _setStateFail(redact(cleanupCompleteFuture.getNoThrow(opCtx).reason()));
return;
}
@@ -1229,8 +1228,8 @@ bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx,
return true;
}
-CollectionShardingRuntime::CleanupNotification MigrationDestinationManager::_notePending(
- OperationContext* opCtx, ChunkRange const& range) {
+SharedSemiFuture<void> MigrationDestinationManager::_notePending(OperationContext* opCtx,
+ ChunkRange const& range) {
AutoGetCollection autoColl(opCtx, _nss, MODE_X);
auto* const css = CollectionShardingRuntime::get(opCtx, _nss);
@@ -1247,13 +1246,13 @@ CollectionShardingRuntime::CleanupNotification MigrationDestinationManager::_not
}
// Start clearing any leftovers that would be in the new chunk
- auto notification = css->beginReceive(range);
- if (notification.ready() && !notification.waitStatus(opCtx).isOK()) {
- return notification.waitStatus(opCtx).withContext(
+ auto cleanupCompleteFuture = css->beginReceive(range);
+ if (cleanupCompleteFuture.isReady() && !cleanupCompleteFuture.getNoThrow(opCtx).isOK()) {
+ return cleanupCompleteFuture.getNoThrow(opCtx).withContext(
str::stream() << "Collection " << _nss.ns() << " range " << redact(range.toString())
<< " migration aborted");
}
- return notification;
+ return cleanupCompleteFuture;
}
void MigrationDestinationManager::_forgetPending(OperationContext* opCtx, ChunkRange const& range) {
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 41841c96abe..b97a9b60217 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -160,8 +160,7 @@ private:
* it schedules deletion of any documents in the range, so that process must be seen to be
* complete before migrating any new documents in.
*/
- CollectionShardingRuntime::CleanupNotification _notePending(OperationContext*,
- ChunkRange const&);
+ SharedSemiFuture<void> _notePending(OperationContext*, ChunkRange const&);
/**
* Stops tracking a chunk range between 'min' and 'max' that previously was having data
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index dd131735106..4155ef4dada 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -674,7 +674,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
}
}
} else {
- auto notification = [&] {
+ auto cleanupCompleteFuture = [&] {
auto const whenToClean = _args.getWaitForDelete() ? CollectionShardingRuntime::kNow
: CollectionShardingRuntime::kDelayed;
UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
@@ -687,7 +687,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
log() << "Waiting for cleanup of " << getNss().ns() << " range "
<< redact(range.toString());
- auto deleteStatus = notification.waitStatus(_opCtx);
+ auto deleteStatus = cleanupCompleteFuture.getNoThrow(_opCtx);
if (!deleteStatus.isOK()) {
return {ErrorCodes::OrphanedRangeCleanUpFailed,
@@ -697,13 +697,12 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
return Status::OK();
}
- if (notification.ready() && !notification.waitStatus(_opCtx).isOK()) {
+ if (cleanupCompleteFuture.isReady() && !cleanupCompleteFuture.getNoThrow(_opCtx).isOK()) {
return {ErrorCodes::OrphanedRangeCleanUpFailed,
- orphanedRangeCleanUpErrMsg + redact(notification.waitStatus(_opCtx))};
+ orphanedRangeCleanUpErrMsg + redact(cleanupCompleteFuture.getNoThrow(_opCtx))};
} else {
log() << "Leaving cleanup of " << getNss().ns() << " range " << redact(range.toString())
<< " to complete in background";
- notification.abandon();
}
}
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 7994dce7251..f6affab21bf 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -156,13 +156,11 @@ bool submitRangeDeletionTask(OperationContext* opCtx, const RangeDeletionTask& d
return false;
}
- auto notification = css->cleanUpRange(deletionTask.getRange(), whenToClean);
+ auto cleanupCompleteFuture = css->cleanUpRange(deletionTask.getRange(), whenToClean);
- if (notification.ready() && !notification.waitStatus(opCtx).isOK()) {
+ if (cleanupCompleteFuture.isReady() && !cleanupCompleteFuture.getNoThrow(opCtx).isOK()) {
LOG(0) << "Failed to resubmit range for deletion: "
- << causedBy(notification.waitStatus(opCtx));
- } else {
- notification.abandon();
+ << causedBy(cleanupCompleteFuture.getNoThrow(opCtx));
}
return true;
diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp
index a09ec14f5da..6e41b53e07a 100644
--- a/src/mongo/db/s/migration_util_test.cpp
+++ b/src/mongo/db/s/migration_util_test.cpp
@@ -256,8 +256,7 @@ void addRangeToReceivingChunks(OperationContext* opCtx,
const ChunkRange& range) {
AutoGetCollection autoColl(opCtx, nss, MODE_IS);
- auto notification = CollectionShardingRuntime::get(opCtx, nss)->beginReceive(range);
- notification.abandon();
+ std::ignore = CollectionShardingRuntime::get(opCtx, nss)->beginReceive(range);
}
RangeDeletionTask createDeletionTask(
diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp
index 70f8253d74b..a6aa47c0a14 100644
--- a/src/mongo/db/s/range_deletion_util.cpp
+++ b/src/mongo/db/s/range_deletion_util.cpp
@@ -69,8 +69,8 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
WriteConcernOptions::kWriteConcernTimeoutSharding);
-MONGO_FAIL_POINT_DEFINE(hangBeforeDoingDeletionNewRangeDeleter);
-MONGO_FAIL_POINT_DEFINE(suspendRangeDeletionNewRangeDeleter);
+MONGO_FAIL_POINT_DEFINE(hangBeforeDoingDeletion);
+MONGO_FAIL_POINT_DEFINE(suspendRangeDeletion);
MONGO_FAIL_POINT_DEFINE(throwWriteConflictExceptionInDeleteRange);
MONGO_FAIL_POINT_DEFINE(throwInternalErrorInDeleteRange);
@@ -167,9 +167,9 @@ StatusWith<int> deleteNextBatch(OperationContext* opCtx,
PlanExecutor::YIELD_MANUAL,
InternalPlanner::FORWARD);
- if (MONGO_unlikely(hangBeforeDoingDeletionNewRangeDeleter.shouldFail())) {
- LOG(0) << "Hit hangBeforeDoingDeletionNewRangeDeleter failpoint";
- hangBeforeDoingDeletionNewRangeDeleter.pauseWhileSet(opCtx);
+ if (MONGO_unlikely(hangBeforeDoingDeletion.shouldFail())) {
+ LOG(0) << "Hit hangBeforeDoingDeletion failpoint";
+ hangBeforeDoingDeletion.pauseWhileSet(opCtx);
}
PlanYieldPolicy planYieldPolicy(exec.get(), PlanExecutor::YIELD_MANUAL);
@@ -344,7 +344,7 @@ SharedSemiFuture<void> removeDocumentsInRange(
invariant(s.isOK());
})
.then([=]() mutable {
- suspendRangeDeletionNewRangeDeleter.pauseWhileSet();
+ suspendRangeDeletion.pauseWhileSet();
// Wait for possibly ongoing queries on secondaries to complete.
return sleepUntil(executor,
executor->now() + delayForActiveQueriesOnSecondariesToComplete);