diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_coordinator_service.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_coordinator_service.cpp | 41 |
1 files changed, 40 insertions, 1 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 42be9c1a946..40a72cb75d7 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -51,6 +51,12 @@ ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(const _reshardingCoordinatorObserver = std::make_shared<ReshardingCoordinatorObserver>(); } +ReshardingCoordinatorService::ReshardingCoordinator::~ReshardingCoordinator() { + stdx::lock_guard<Latch> lg(_mutex); + invariant(_initialChunksAndZonesPromise.getFuture().isReady()); + invariant(_completionPromise.getFuture().isReady()); +} + void ReshardingCoordinatorService::ReshardingCoordinator::run( std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { ExecutorFuture<void>(**executor) @@ -80,6 +86,12 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run( .then([this] { _tellAllRecipientsToRefresh(); }) .then([this] { _tellAllDonorsToRefresh(); }) .onError([this](Status status) { + stdx::lock_guard<Latch> lg(_mutex); + if (_completionPromise.getFuture().isReady()) { + // interrupt() was called before we got here. + return status; + } + _runUpdates(CoordinatorStateEnum::kError, _stateDoc); LOGV2(4956902, @@ -94,11 +106,38 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run( return status; }) - .getAsync([](Status) {}); + .getAsync([this](Status status) { + stdx::lock_guard<Latch> lg(_mutex); + if (_completionPromise.getFuture().isReady()) { + // interrupt() was called before we got here. + return; + } + + if (status.isOK()) { + _completionPromise.emplaceValue(); + } else { + _completionPromise.setError(status); + } + }); +} + +void ReshardingCoordinatorService::ReshardingCoordinator::interrupt(Status status) { + // Resolve any unresolved promises to avoid hanging. + stdx::lock_guard<Latch> lg(_mutex); + if (!_initialChunksAndZonesPromise.getFuture().isReady()) { + _initialChunksAndZonesPromise.setError(status); + } + + _reshardingCoordinatorObserver->interrupt(status); + + if (!_completionPromise.getFuture().isReady()) { + _completionPromise.setError(status); + } } void ReshardingCoordinatorService::ReshardingCoordinator::setInitialChunksAndZones( std::vector<ChunkType> initialChunks, std::vector<TagsType> newZones) { + stdx::lock_guard<Latch> lg(_mutex); if (_stateDoc.getState() > CoordinatorStateEnum::kInitializing || _initialChunksAndZonesPromise.getFuture().isReady()) { return; |