diff options
Diffstat (limited to 'src/mongo/db/s/resharding')
5 files changed, 53 insertions, 2 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp index 0522ac65ef4..2cc932b201e 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp @@ -275,6 +275,15 @@ void ReshardingCoordinatorObserver::interrupt(Status status) { } } +void ReshardingCoordinatorObserver::onCriticalSectionTimeout() { + stdx::lock_guard<Latch> lk(_mutex); + if (!_allRecipientsReportedStrictConsistencyTimestamp.getFuture().isReady()) { + _allRecipientsReportedStrictConsistencyTimestamp.setError( + Status{ErrorCodes::ReshardingCriticalSectionTimeout, + "Resharding critical section timed out."}); + } +} + void ReshardingCoordinatorObserver::_onAbortOrStepdown(WithLock, Status status) { if (!_allDonorsReportedMinFetchTimestamp.getFuture().isReady()) { _allDonorsReportedMinFetchTimestamp.setError(status); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.h b/src/mongo/db/s/resharding/resharding_coordinator_observer.h index 50d4c2fdd5e..b0c6eedfe95 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.h @@ -110,6 +110,12 @@ public: SharedSemiFuture<void> awaitAllParticipantsDoneAborting(); /** + * Checks if all recipients are in steady state. Otherwise, sets an error state so that + * resharding is aborted. + */ + void onCriticalSectionTimeout(); + + /** * Sets errors on any promises that have not yet been fulfilled. */ void interrupt(Status status); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index e76a0f93d03..50fe65125ee 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -1001,7 +1001,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( const ReshardingCoordinatorDocument& updatedCoordinatorDoc) { return _persistDecisionAndFinishReshardOperation(executor, updatedCoordinatorDoc); }) - .onCompletion([this, self = shared_from_this()](Status status) { + .onCompletion([this, self = shared_from_this(), executor](Status status) { // TODO SERVER-53914 depending on where we load metrics at the start of the operation, // this may need to change if (_coordinatorDoc.getState() != CoordinatorStateEnum::kUnused) { @@ -1021,6 +1021,10 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( _completionPromise.setError(status); } + if (_criticalSectionTimeoutCbHandle) { + (*executor)->cancel(*_criticalSectionTimeoutCbHandle); + } + return status; }) .thenRunOn(_coordinatorService->getInstanceCleanupExecutor()) @@ -1243,7 +1247,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished _reshardingCoordinatorObserver->awaitAllRecipientsFinishedApplying(), _ctHolder->getAbortToken()) .thenRunOn(**executor) - .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { + .then([this, executor](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { { auto opCtx = cc().makeOperationContext(); reshardingPauseCoordinatorInSteadyState.pauseWhileSetAndNotCanceled( @@ -1252,6 +1256,23 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kBlockingWrites, coordinatorDocChangedOnDisk); + + const auto criticalSectionTimeout = + Milliseconds(resharding::gReshardingCriticalSectionTimeoutMillis.load()); + auto swCbHandle = (*executor)->scheduleWorkAt( + (*executor)->now() + criticalSectionTimeout, + [this](const executor::TaskExecutor::CallbackArgs& cbData) { + if (!cbData.status.isOK()) { + return; + } + _reshardingCoordinatorObserver->onCriticalSectionTimeout(); + }); + + if (!swCbHandle.isOK()) { + _reshardingCoordinatorObserver->interrupt(swCbHandle.getStatus()); + } + + _criticalSectionTimeoutCbHandle = swCbHandle.getValue(); }); } diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index b99accd3050..6efe33d2ea9 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -377,6 +377,9 @@ private: // Promise that is resolved when the chain of work kicked off by run() has completed. SharedPromise<void> _completionPromise; + + // Callback handle for scheduled work to handle critical section timeout. + boost::optional<executor::TaskExecutor::CallbackHandle> _criticalSectionTimeoutCbHandle; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_server_parameters.idl b/src/mongo/db/s/resharding/resharding_server_parameters.idl index 30cf8e9b728..4c13b83d93b 100644 --- a/src/mongo/db/s/resharding/resharding_server_parameters.idl +++ b/src/mongo/db/s/resharding/resharding_server_parameters.idl @@ -119,3 +119,15 @@ server_parameters: expr: 5 * 60 * 1000 validator: gte: 0 + + reshardingCriticalSectionTimeoutMillis: + description: >- + The upper limit on how long to wait to hear back from recipient shards reaching strict + consistency after engaging the critical section. + set_at: [startup, runtime] + cpp_vartype: AtomicWord<int> + cpp_varname: gReshardingCriticalSectionTimeoutMillis + default: + expr: 5 * 1000 + validator: + gte: 0 |