summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_coordinator_service.cpp')
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp41
1 files changed, 40 insertions, 1 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 42be9c1a946..40a72cb75d7 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -51,6 +51,12 @@ ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(const
_reshardingCoordinatorObserver = std::make_shared<ReshardingCoordinatorObserver>();
}
+ReshardingCoordinatorService::ReshardingCoordinator::~ReshardingCoordinator() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ invariant(_initialChunksAndZonesPromise.getFuture().isReady());
+ invariant(_completionPromise.getFuture().isReady());
+}
+
void ReshardingCoordinatorService::ReshardingCoordinator::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
ExecutorFuture<void>(**executor)
@@ -80,6 +86,12 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run(
.then([this] { _tellAllRecipientsToRefresh(); })
.then([this] { _tellAllDonorsToRefresh(); })
.onError([this](Status status) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_completionPromise.getFuture().isReady()) {
+ // interrupt() was called before we got here.
+ return status;
+ }
+
_runUpdates(CoordinatorStateEnum::kError, _stateDoc);
LOGV2(4956902,
@@ -94,11 +106,38 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run(
return status;
})
- .getAsync([](Status) {});
+ .getAsync([this](Status status) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_completionPromise.getFuture().isReady()) {
+ // interrupt() was called before we got here.
+ return;
+ }
+
+ if (status.isOK()) {
+ _completionPromise.emplaceValue();
+ } else {
+ _completionPromise.setError(status);
+ }
+ });
+}
+
+void ReshardingCoordinatorService::ReshardingCoordinator::interrupt(Status status) {
+ // Resolve any unresolved promises to avoid hanging.
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!_initialChunksAndZonesPromise.getFuture().isReady()) {
+ _initialChunksAndZonesPromise.setError(status);
+ }
+
+ _reshardingCoordinatorObserver->interrupt(status);
+
+ if (!_completionPromise.getFuture().isReady()) {
+ _completionPromise.setError(status);
+ }
}
void ReshardingCoordinatorService::ReshardingCoordinator::setInitialChunksAndZones(
std::vector<ChunkType> initialChunks, std::vector<TagsType> newZones) {
+ stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > CoordinatorStateEnum::kInitializing ||
_initialChunksAndZonesPromise.getFuture().isReady()) {
return;