author    Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com>  2022-09-12 15:39:28 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>       2022-09-12 16:47:44 +0000
commit    727d3de09c9e38206cbde56c8a431f6e7b480da6 (patch)
tree      cdd2dbd0544449f99cc59984593121e6dc3351f4 /src/mongo/db
parent    c7f9bb9676a8859a21aaea5fdd81e23a4e9d79e8 (diff)
SERVER-68348 Asynchronously register tasks on the range deleter service on step-up
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/repl/replica_set_aware_service.h     3
-rw-r--r--  src/mongo/db/s/range_deleter_service.cpp        103
-rw-r--r--  src/mongo/db/s/range_deleter_service.h           17
-rw-r--r--  src/mongo/db/s/range_deleter_service_test.cpp    59
4 files changed, 169 insertions, 13 deletions
diff --git a/src/mongo/db/repl/replica_set_aware_service.h b/src/mongo/db/repl/replica_set_aware_service.h
index 3f099610a5c..a6b723163c3 100644
--- a/src/mongo/db/repl/replica_set_aware_service.h
+++ b/src/mongo/db/repl/replica_set_aware_service.h
@@ -148,6 +148,9 @@ public:
/**
* Called after the node has transitioned out of PRIMARY. Usually this is into SECONDARY, but it
* could also be into ROLLBACK or REMOVED.
+ *
+ * NB: also called when SECONDARY nodes transition to ROLLBACK, hence it should never be assumed
+ * that `onStepUp` hooks have been invoked at least once before this method is invoked.
*/
virtual void onStepDown() = 0;
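
[Editorial aside: a minimal standalone C++ sketch of the contract documented above, using hypothetical types rather than MongoDB's actual ReplicaSetAwareService interface. Because onStepDown() may fire on a node whose step-up hooks never ran, state created lazily on step-up must be guarded before teardown.]

    #include <iostream>
    #include <memory>

    struct Executor {
        void startup() { std::cout << "executor started\n"; }
        void shutdown() { std::cout << "executor shut down\n"; }
    };

    class StepAwareService {
    public:
        void onStepUpComplete() {
            _executor = std::make_unique<Executor>();
            _executor->startup();
        }
        void onStepDown() {
            // A SECONDARY entering ROLLBACK reaches here without a prior
            // step-up, so the executor may still be null.
            if (_executor) {
                _executor->shutdown();
            }
        }
    private:
        std::unique_ptr<Executor> _executor;
    };

    int main() {
        StepAwareService service;
        service.onStepDown();       // no prior step-up: must not crash
        service.onStepUpComplete();
        service.onStepDown();       // regular step-down path
    }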
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp
index f90618421d5..a24e14082f9 100644
--- a/src/mongo/db/s/range_deleter_service.cpp
+++ b/src/mongo/db/s/range_deleter_service.cpp
@@ -28,10 +28,13 @@
*/
#include "mongo/db/s/range_deleter_service.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/op_observer/op_observer_registry.h"
+#include "mongo/db/s/balancer_stats_registry.h"
#include "mongo/db/s/range_deleter_service_op_observer.h"
#include "mongo/logv2/log.h"
#include "mongo/s/sharding_feature_flags_gen.h"
+#include "mongo/util/future_util.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingRangeDeleter
@@ -83,18 +86,94 @@ void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long term) {
_executor = std::move(taskExecutor);
_executor->startup();
- _recoverRangeDeletionsOnStepUp();
+ _recoverRangeDeletionsOnStepUp(opCtx);
}
-void RangeDeleterService::_recoverRangeDeletionsOnStepUp() {
-
+void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx) {
if (disableResumableRangeDeleter.load()) {
_state.store(kDown);
return;
}
- // TODO SERVER-68348 Asynchronously register tasks on the range deleter service on step-up
- _state.store(kUp);
+ LOGV2(6834800, "Resubmitting range deletion tasks");
+
+ ServiceContext* serviceContext = opCtx->getServiceContext();
+
+ ExecutorFuture<void>(_executor)
+ .then([serviceContext, this] {
+ ThreadClient tc("ResubmitRangeDeletionsOnStepUp", serviceContext);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillableByStepdown(lk);
+ }
+ auto opCtx = tc->makeOperationContext();
+ opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
+
+ ScopedRangeDeleterLock rangeDeleterLock(opCtx.get());
+ DBDirectClient client(opCtx.get());
+
+ int nRescheduledTasks = 0;
+
+ // (1) register range deletion tasks marked as "processing"
+ auto processingTasksCompletionFuture = [&] {
+ std::vector<ExecutorFuture<void>> processingTasksCompletionFutures;
+ FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
+ findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName << true));
+ auto cursor = client.find(std::move(findCommand));
+
+ while (cursor->more()) {
+ auto completionFuture = this->registerTask(
+ RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"),
+ cursor->next()),
+ SemiFuture<void>::makeReady(),
+ true /* fromResubmitOnStepUp */);
+ nRescheduledTasks++;
+ processingTasksCompletionFutures.push_back(
+ completionFuture.thenRunOn(_executor));
+ }
+
+ if (nRescheduledTasks > 1) {
+ LOGV2_WARNING(6834801,
+ "Rescheduling several range deletions marked as processing. "
+ "Orphans count may be off while they are not drained",
+ "numRangeDeletionsMarkedAsProcessing"_attr = nRescheduledTasks);
+ }
+
+ return processingTasksCompletionFutures.size() > 0
+ ? whenAllSucceed(std::move(processingTasksCompletionFutures)).share()
+ : SemiFuture<void>::makeReady().share();
+ }();
+
+ // (2) register all other "non-pending" tasks
+ {
+ FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
+ findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName
+ << BSON("$ne" << true)
+ << RangeDeletionTask::kPendingFieldName
+ << BSON("$ne" << true)));
+ auto cursor = client.find(std::move(findCommand));
+ while (cursor->more()) {
+ (void)this->registerTask(
+ RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"),
+ cursor->next()),
+ processingTasksCompletionFuture.thenRunOn(_executor).semi(),
+ true /* fromResubmitOnStepUp */);
+ }
+ }
+
+ LOGV2_INFO(6834802,
+ "Finished resubmitting range deletion tasks",
+ "nRescheduledTasks"_attr = nRescheduledTasks);
+
+ auto lock = _acquireMutexUnconditionally();
+ // Since the recovery is only spawned on step-up but may complete later, it cannot be
+ // assumed that the node is still primary when all the resubmissions finish
+ if (_state.load() != kDown) {
+ this->_state.store(kUp);
+ this->_rangeDeleterServiceUpCondVar_FOR_TESTING.notify_all();
+ }
+ })
+ .getAsync([](auto) {});
}
void RangeDeleterService::onStepDown() {
@@ -103,9 +182,12 @@ void RangeDeleterService::onStepDown() {
}
auto lock = _acquireMutexUnconditionally();
- dassert(_state.load() != kDown, "Service expected to be initializing/up before stepping down");
- _executor->shutdown();
+ // The `onStepDown` hook may also be invoked on a SECONDARY node transitioning to
+ // ROLLBACK, in which case the executor may have never been initialized
+ if (_executor) {
+ _executor->shutdown();
+ }
_state.store(kDown);
}
@@ -149,7 +231,9 @@ long long RangeDeleterService::totalNumOfRegisteredTasks() {
}
SharedSemiFuture<void> RangeDeleterService::registerTask(
- const RangeDeletionTask& rdt, SemiFuture<void>&& waitForActiveQueriesToComplete) {
+ const RangeDeletionTask& rdt,
+ SemiFuture<void>&& waitForActiveQueriesToComplete,
+ bool fromResubmitOnStepUp) {
if (disableResumableRangeDeleter.load()) {
return SemiFuture<void>::makeReady(
@@ -207,7 +291,8 @@ SharedSemiFuture<void> RangeDeleterService::registerTask(
.share();
auto [taskCompletionFuture, inserted] = [&]() -> std::pair<SharedSemiFuture<void>, bool> {
- auto lock = _acquireMutexFailIfServiceNotUp();
+ auto lock = fromResubmitOnStepUp ? _acquireMutexUnconditionally()
+ : _acquireMutexFailIfServiceNotUp();
auto [registeredTask, inserted] = _rangeDeletionTasks[rdt.getCollectionUuid()].insert(
std::make_shared<RangeDeletion>(RangeDeletion(rdt, chainCompletionFuture)));
auto retFuture = static_cast<RangeDeletion*>(registeredTask->get())->getCompletionFuture();
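
[Editorial aside: the recovery above runs in two phases: (1) resubmit tasks already marked "processing", then (2) register the remaining non-pending tasks, gated on processingTasksCompletionFuture so they start deleting only after the in-flight ranges have drained. A standalone C++ sketch of that ordering, substituting std::future for MongoDB's ExecutorFuture/whenAllSucceed; all names are illustrative.]

    #include <chrono>
    #include <future>
    #include <iostream>
    #include <thread>
    #include <vector>

    int main() {
        // Phase (1): resubmit "processing" tasks and collect their futures.
        std::vector<std::shared_future<void>> processing;
        for (int i = 0; i < 2; ++i) {
            processing.push_back(std::async(std::launch::async, [i] {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                std::cout << "processing task " << i << " drained\n";
            }).share());
        }

        // One future that completes when every phase (1) task has drained,
        // playing the role of whenAllSucceed(processingTasksCompletionFutures).
        std::shared_future<void> allProcessingDone =
            std::async(std::launch::async, [processing] {
                for (auto& f : processing) f.wait();
            }).share();

        // Phase (2): a non-pending task is chained on that future, the role
        // played by passing processingTasksCompletionFuture to registerTask().
        auto other = std::async(std::launch::async, [allProcessingDone] {
            allProcessingDone.wait();
            std::cout << "non-pending task runs after processing ones drain\n";
        });

        other.wait();
    }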
diff --git a/src/mongo/db/s/range_deleter_service.h b/src/mongo/db/s/range_deleter_service.h
index 7de54c0be60..70f41bc1ccd 100644
--- a/src/mongo/db/s/range_deleter_service.h
+++ b/src/mongo/db/s/range_deleter_service.h
@@ -92,6 +92,9 @@ private:
AtomicWord<State> _state{kDown};
+ // ONLY FOR TESTING: variable notified when the state changes to "up"
+ stdx::condition_variable _rangeDeleterServiceUpCondVar_FOR_TESTING;
+
/* Acquire mutex only if service is up (for "user" operation) */
[[nodiscard]] stdx::unique_lock<Latch> _acquireMutexFailIfServiceNotUp() {
stdx::unique_lock<Latch> lg(_mutex_DO_NOT_USE_DIRECTLY);
@@ -120,7 +123,8 @@ public:
*/
SharedSemiFuture<void> registerTask(
const RangeDeletionTask& rdt,
- SemiFuture<void>&& waitForActiveQueriesToComplete = SemiFuture<void>::makeReady());
+ SemiFuture<void>&& waitForActiveQueriesToComplete = SemiFuture<void>::makeReady(),
+ bool fromResubmitOnStepUp = false);
/*
* Deregister a task from the range deleter service.
@@ -157,9 +161,18 @@ public:
*/
long long totalNumOfRegisteredTasks();
+ /* ONLY FOR TESTING: wait for the state to become "up" */
+ void _waitForRangeDeleterServiceUp_FOR_TESTING() {
+ stdx::unique_lock<Latch> lg(_mutex_DO_NOT_USE_DIRECTLY);
+ if (_state.load() != kUp) {
+ _rangeDeleterServiceUpCondVar_FOR_TESTING.wait(lg,
+ [&]() { return _state.load() == kUp; });
+ }
+ }
+
private:
/* Asynchronously register range deletions on the service. To be called on step-up */
- void _recoverRangeDeletionsOnStepUp();
+ void _recoverRangeDeletionsOnStepUp(OperationContext* opCtx);
/* ReplicaSetAwareServiceShardSvr "empty implemented" methods */
void onStartup(OperationContext* opCtx) override final{};
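
[Editorial aside: a self-contained sketch of the test-only wait pattern added above. The state is published and the condition variable notified under the same mutex the waiter holds, so the predicate cannot miss the kDown -> kUp transition; plain std:: types stand in for the stdx/Latch wrappers.]

    #include <atomic>
    #include <condition_variable>
    #include <mutex>
    #include <thread>

    enum State { kDown, kUp };

    std::mutex mtx;
    std::condition_variable stateIsUp;
    std::atomic<State> state{kDown};

    // Test-side wait, mirroring _waitForRangeDeleterServiceUp_FOR_TESTING:
    // the predicate is re-checked under the mutex on every wakeup.
    void waitForUp() {
        std::unique_lock<std::mutex> lk(mtx);
        stateIsUp.wait(lk, [] { return state.load() == kUp; });
    }

    // Recovery-side publish, as done under _acquireMutexUnconditionally().
    void markUp() {
        std::lock_guard<std::mutex> lk(mtx);
        state.store(kUp);
        stateIsUp.notify_all();
    }

    int main() {
        std::thread recovery(markUp);
        waitForUp();  // returns once the service is observably up
        recovery.join();
    }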
diff --git a/src/mongo/db/s/range_deleter_service_test.cpp b/src/mongo/db/s/range_deleter_service_test.cpp
index 26e1b0aca49..6d3d49eced0 100644
--- a/src/mongo/db/s/range_deleter_service_test.cpp
+++ b/src/mongo/db/s/range_deleter_service_test.cpp
@@ -31,6 +31,7 @@
#include "mongo/bson/unordered_fields_bsonobj_comparator.h"
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/persistent_task_store.h"
#include "mongo/db/s/operation_sharding_state.h"
namespace mongo {
@@ -42,6 +43,7 @@ void RangeDeleterServiceTest::setUp() {
ShardServerTestFixture::setUp();
opCtx = operationContext();
RangeDeleterService::get(opCtx)->onStepUpComplete(opCtx, 0L);
+ RangeDeleterService::get(opCtx)->_waitForRangeDeleterServiceUp_FOR_TESTING();
{
OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection(
@@ -261,7 +263,9 @@ TEST_F(RangeDeleterServiceTest, ScheduledTaskInvalidatedOnStepDown) {
// Manually trigger disabling of the service
rds->onStepDown();
ON_BLOCK_EXIT([&] {
- rds->onStepUpComplete(opCtx, 0L); // Re-enable the service for clean teardown
+ // Re-enable the service for clean teardown
+ rds->onStepUpComplete(opCtx, 0L);
+ rds->_waitForRangeDeleterServiceUp_FOR_TESTING();
});
try {
@@ -277,7 +281,9 @@ TEST_F(RangeDeleterServiceTest, NoActionPossibleIfServiceIsDown) {
// Manually trigger disabling of the service
rds->onStepDown();
ON_BLOCK_EXIT([&] {
- rds->onStepUpComplete(opCtx, 0L); // Re-enable the service for clean teardown
+ // Re-enable the service for clean teardown
+ rds->onStepUpComplete(opCtx, 0L);
+ rds->_waitForRangeDeleterServiceUp_FOR_TESTING();
});
auto taskWithOngoingQueries = createRangeDeletionTaskWithOngoingQueries(
@@ -588,4 +594,53 @@ TEST_F(RangeDeleterServiceTest,
ASSERT(overlappingRangeFutureWhenDisabled.isReady());
}
+TEST_F(RangeDeleterServiceTest, RescheduleRangeDeletionTasksOnStepUp) {
+ PseudoRandom random(SecureRandom().nextInt64());
+ auto rds = RangeDeleterService::get(opCtx);
+
+ // Trigger step-down
+ rds->onStepDown();
+
+ // Random number of range deletion documents to generate (minimum 1, maximum 20).
+ int nRangeDeletionTasks = random.nextInt32(20) + 1;
+
+ PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
+
+ // Generate and persist range deleter tasks (some pending, some non-pending, some non-pending &&
+ // processing)
+ int nPending = 0, nNonPending = 0, nNonPendingAndProcessing = 0;
+ int minBound = 0;
+ for (int i = 0; i < nRangeDeletionTasks; i++) {
+ auto rangeDeletionTask = createRangeDeletionTask(
+ uuidCollA, BSON("a" << minBound), BSON("a" << minBound + 10), CleanWhenEnum::kDelayed);
+ minBound += 10;
+
+ auto rand = random.nextInt32(3);  // uniformly 0, 1 or 2
+ if (rand == 0) {
+ // Pending range deletion task
+ rangeDeletionTask.setPending(true);
+ nPending++;
+ } else if (rand == 1) {
+ // Non-pending range deletion task
+ rangeDeletionTask.setPending(false);
+ nNonPending++;
+ } else if (rand == 2) {
+ // Non-pending and processing range deletion task
+ rangeDeletionTask.setPending(false);
+ rangeDeletionTask.setProcessing(true);
+ nNonPendingAndProcessing++;
+ }
+
+ store.add(opCtx, rangeDeletionTask);
+ }
+
+ // Trigger step-up
+ rds->onStepUpComplete(opCtx, 0L);
+ rds->_waitForRangeDeleterServiceUp_FOR_TESTING();
+
+ // Check that all non-pending tasks are being rescheduled
+ ASSERT_EQ(nNonPending + nNonPendingAndProcessing,
+ rds->getNumRangeDeletionTasksForCollection(uuidCollA));
+}
+
} // namespace mongo
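
[Editorial aside: the final assertion follows from the two recovery filters (phase (1) matches processing == true; phase (2) matches processing != true and pending != true), so only pending documents are skipped on step-up. A tiny standalone sketch of that rule; the helper is hypothetical and not part of the patch.]

    #include <cassert>

    struct TaskDoc {
        bool pending = false;
        bool processing = false;
    };

    // Mirrors the union of the two step-up recovery filters.
    bool resubmittedOnStepUp(const TaskDoc& t) {
        return t.processing || !t.pending;
    }

    int main() {
        assert(!resubmittedOnStepUp({/*pending=*/true, /*processing=*/false}));  // skipped
        assert(resubmittedOnStepUp({/*pending=*/false, /*processing=*/false}));  // phase (2)
        assert(resubmittedOnStepUp({/*pending=*/false, /*processing=*/true}));   // phase (1)
    }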