summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2020-02-10 17:36:27 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-14 01:19:41 +0000
commit0d083cfc66abb8804ca5b80c8b2cea534434099b (patch)
tree0f9be5b0231c78d94bf318346dc8a0d92bc8aa84 /src
parent149aae77fd00cbb0d5760881e76eae631e1f0e11 (diff)
downloadmongo-0d083cfc66abb8804ca5b80c8b2cea534434099b.tar.gz
SERVER-45741 Use WaitForMajorityService in migration coordinator stepup task
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/migration_util.cpp39
1 files changed, 22 insertions, 17 deletions
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index f4dbbf3d7a2..9bea2882d3e 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/migration_coordinator.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
+#include "mongo/db/s/wait_for_majority_service.h"
#include "mongo/db/write_concern.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
@@ -602,14 +603,13 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
LOGV2(22037, "Starting migration coordinator stepup recovery thread.");
auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor();
- ExecutorFuture<void>(executor).getAsync([serviceContext](const Status& status) {
- try {
+ ExecutorFuture<void>(executor)
+ .then([serviceContext] {
ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext);
{
stdx::lock_guard<Client> lk(*tc.get());
tc->setSystemOperationKillable(lk);
}
-
auto uniqueOpCtx = tc->makeOperationContext();
auto opCtx = uniqueOpCtx.get();
@@ -633,14 +633,17 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
LOGV2(22038,
"Waiting for OpTime {lastOpTime} to become majority committed",
"lastOpTime"_attr = lastOpTime);
- WriteConcernResult unusedWCResult;
- uassertStatusOK(
- waitForWriteConcern(opCtx,
- lastOpTime,
- WriteConcernOptions{WriteConcernOptions::kMajority,
- WriteConcernOptions::SyncMode::UNSET,
- WriteConcernOptions::kNoTimeout},
- &unusedWCResult));
+ return WaitForMajorityService::get(serviceContext).waitUntilMajority(lastOpTime);
+ })
+ .thenRunOn(executor)
+ .then([serviceContext]() {
+ ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillable(lk);
+ }
+ auto uniqueOpCtx = tc->makeOperationContext();
+ auto opCtx = uniqueOpCtx.get();
PersistentTaskStore<MigrationCoordinatorDocument> store(
opCtx, NamespaceString::kMigrationCoordinatorsNamespace);
@@ -718,12 +721,14 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
coordinator.completeMigration(opCtx);
return true;
});
- } catch (const DBException& ex) {
- LOGV2(22041,
- "Failed to resume coordinating migrations on stepup {causedBy_ex_toStatus}",
- "causedBy_ex_toStatus"_attr = causedBy(ex.toStatus()));
- }
- });
+ })
+ .getAsync([](const Status& status) {
+ if (!status.isOK()) {
+ LOGV2(22041,
+ "Failed to resume coordinating migrations on stepup {causedBy_status}",
+ "causedBy_status"_attr = causedBy(status));
+ }
+ });
}
} // namespace migrationutil