summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/resharding')
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_observer.cpp9
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_observer.h6
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp25
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.h3
-rw-r--r--src/mongo/db/s/resharding/resharding_server_parameters.idl12
5 files changed, 53 insertions, 2 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
index 0522ac65ef4..2cc932b201e 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
@@ -275,6 +275,15 @@ void ReshardingCoordinatorObserver::interrupt(Status status) {
}
}
+void ReshardingCoordinatorObserver::onCriticalSectionTimeout() {
+    // Hold the observer mutex across the whole check-then-set sequence below.
+    stdx::lock_guard<Latch> lk(_mutex);
+
+    // Nothing to do if the strict-consistency promise has already been resolved.
+    if (_allRecipientsReportedStrictConsistencyTimestamp.getFuture().isReady()) {
+        return;
+    }
+
+    // Otherwise fail the promise with the dedicated timeout error code.
+    const Status timeoutStatus{ErrorCodes::ReshardingCriticalSectionTimeout,
+                               "Resharding critical section timed out."};
+    _allRecipientsReportedStrictConsistencyTimestamp.setError(timeoutStatus);
+}
+
void ReshardingCoordinatorObserver::_onAbortOrStepdown(WithLock, Status status) {
if (!_allDonorsReportedMinFetchTimestamp.getFuture().isReady()) {
_allDonorsReportedMinFetchTimestamp.setError(status);
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.h b/src/mongo/db/s/resharding/resharding_coordinator_observer.h
index 50d4c2fdd5e..b0c6eedfe95 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.h
@@ -110,6 +110,12 @@ public:
SharedSemiFuture<void> awaitAllParticipantsDoneAborting();
/**
+ * Called when the critical section timeout expires. If not all recipients have reported
+ * reaching strict consistency by then, sets an error state so that resharding is aborted.
+ */
+ void onCriticalSectionTimeout();
+
+ /**
* Sets errors on any promises that have not yet been fulfilled.
*/
void interrupt(Status status);
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index e76a0f93d03..50fe65125ee 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -1001,7 +1001,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
const ReshardingCoordinatorDocument& updatedCoordinatorDoc) {
return _persistDecisionAndFinishReshardOperation(executor, updatedCoordinatorDoc);
})
- .onCompletion([this, self = shared_from_this()](Status status) {
+ .onCompletion([this, self = shared_from_this(), executor](Status status) {
// TODO SERVER-53914 depending on where we load metrics at the start of the operation,
// this may need to change
if (_coordinatorDoc.getState() != CoordinatorStateEnum::kUnused) {
@@ -1021,6 +1021,10 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
_completionPromise.setError(status);
}
+ if (_criticalSectionTimeoutCbHandle) {
+ (*executor)->cancel(*_criticalSectionTimeoutCbHandle);
+ }
+
return status;
})
.thenRunOn(_coordinatorService->getInstanceCleanupExecutor())
@@ -1243,7 +1247,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
_reshardingCoordinatorObserver->awaitAllRecipientsFinishedApplying(),
_ctHolder->getAbortToken())
.thenRunOn(**executor)
- .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
+ .then([this, executor](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
{
auto opCtx = cc().makeOperationContext();
reshardingPauseCoordinatorInSteadyState.pauseWhileSetAndNotCanceled(
@@ -1252,6 +1256,23 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kBlockingWrites,
coordinatorDocChangedOnDisk);
+
+ const auto criticalSectionTimeout =
+ Milliseconds(resharding::gReshardingCriticalSectionTimeoutMillis.load());
+ auto swCbHandle = (*executor)->scheduleWorkAt(
+ (*executor)->now() + criticalSectionTimeout,
+ [this](const executor::TaskExecutor::CallbackArgs& cbData) {
+ if (!cbData.status.isOK()) {
+ return;
+ }
+ _reshardingCoordinatorObserver->onCriticalSectionTimeout();
+ });
+
+ if (!swCbHandle.isOK()) {
+ _reshardingCoordinatorObserver->interrupt(swCbHandle.getStatus());
+ }
+
+ _criticalSectionTimeoutCbHandle = swCbHandle.getValue();
});
}
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index b99accd3050..6efe33d2ea9 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -377,6 +377,9 @@ private:
// Promise that is resolved when the chain of work kicked off by run() has completed.
SharedPromise<void> _completionPromise;
+
+ // Callback handle for scheduled work to handle critical section timeout.
+ boost::optional<executor::TaskExecutor::CallbackHandle> _criticalSectionTimeoutCbHandle;
};
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_server_parameters.idl b/src/mongo/db/s/resharding/resharding_server_parameters.idl
index 30cf8e9b728..4c13b83d93b 100644
--- a/src/mongo/db/s/resharding/resharding_server_parameters.idl
+++ b/src/mongo/db/s/resharding/resharding_server_parameters.idl
@@ -119,3 +119,15 @@ server_parameters:
expr: 5 * 60 * 1000
validator:
gte: 0
+
+ reshardingCriticalSectionTimeoutMillis:  # Timeout applied once the resharding critical section is engaged.
+ description: >-
+ The upper limit on how long to wait to hear back from recipient shards reaching strict
+ consistency after engaging the critical section.
+ set_at: [startup, runtime]  # Can be set at startup and changed at runtime.
+ cpp_vartype: AtomicWord<int>
+ cpp_varname: gReshardingCriticalSectionTimeoutMillis
+ default:
+ expr: 5 * 1000  # 5 seconds, expressed in milliseconds.
+ validator:
+ gte: 0  # Value must be greater than or equal to zero.