diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2020-02-10 17:36:27 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-14 01:19:41 +0000 |
commit | 0d083cfc66abb8804ca5b80c8b2cea534434099b (patch) | |
tree | 0f9be5b0231c78d94bf318346dc8a0d92bc8aa84 | |
parent | 149aae77fd00cbb0d5760881e76eae631e1f0e11 (diff) | |
download | mongo-0d083cfc66abb8804ca5b80c8b2cea534434099b.tar.gz |
SERVER-45741 Use WaitForMajorityService in migration coordinator stepup task
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 39 |
1 files changed, 22 insertions, 17 deletions
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index f4dbbf3d7a2..9bea2882d3e 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -49,6 +49,7 @@ #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/migration_coordinator.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" +#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/db/write_concern.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/executor/thread_pool_task_executor.h" @@ -602,14 +603,13 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) { LOGV2(22037, "Starting migration coordinator stepup recovery thread."); auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor(); - ExecutorFuture<void>(executor).getAsync([serviceContext](const Status& status) { - try { + ExecutorFuture<void>(executor) + .then([serviceContext] { ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext); { stdx::lock_guard<Client> lk(*tc.get()); tc->setSystemOperationKillable(lk); } - auto uniqueOpCtx = tc->makeOperationContext(); auto opCtx = uniqueOpCtx.get(); @@ -633,14 +633,17 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) { LOGV2(22038, "Waiting for OpTime {lastOpTime} to become majority committed", "lastOpTime"_attr = lastOpTime); - WriteConcernResult unusedWCResult; - uassertStatusOK( - waitForWriteConcern(opCtx, - lastOpTime, - WriteConcernOptions{WriteConcernOptions::kMajority, - WriteConcernOptions::SyncMode::UNSET, - WriteConcernOptions::kNoTimeout}, - &unusedWCResult)); + return WaitForMajorityService::get(serviceContext).waitUntilMajority(lastOpTime); + }) + .thenRunOn(executor) + .then([serviceContext]() { + ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext); + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillable(lk); + } + auto uniqueOpCtx = tc->makeOperationContext(); + auto opCtx = uniqueOpCtx.get(); PersistentTaskStore<MigrationCoordinatorDocument> store( opCtx, NamespaceString::kMigrationCoordinatorsNamespace); @@ -718,12 +721,14 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) { coordinator.completeMigration(opCtx); return true; }); - } catch (const DBException& ex) { - LOGV2(22041, - "Failed to resume coordinating migrations on stepup {causedBy_ex_toStatus}", - "causedBy_ex_toStatus"_attr = causedBy(ex.toStatus())); - } - }); + }) + .getAsync([](const Status& status) { + if (!status.isOK()) { + LOGV2(22041, + "Failed to resume coordinating migrations on stepup {causedBy_status}", + "causedBy_status"_attr = causedBy(status)); + } + }); } } // namespace migrationutil |