diff options
author | Matthew Saltz <matthew.saltz@mongodb.com> | 2021-05-13 15:47:11 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-13 16:35:24 +0000 |
commit | 859d7208223ea0cb03270f6dcc9491a1afbd18fd (patch) | |
tree | 0e14908fbabd33b65b6d3b295d1c1e321eec1457 | |
parent | d8d76d053616c16cb020190b46c6aad450598010 (diff) | |
download | mongo-859d7208223ea0cb03270f6dcc9491a1afbd18fd.tar.gz |
SERVER-56899 Make PrimaryOnlyService TestService wait for cancellation logic to run
-rw-r--r-- | src/mongo/db/repl/primary_only_service_test.cpp | 114 |
1 files changed, 61 insertions, 53 deletions
diff --git a/src/mongo/db/repl/primary_only_service_test.cpp b/src/mongo/db/repl/primary_only_service_test.cpp index 3c0f4bb3717..cf4bbfe5803 100644 --- a/src/mongo/db/repl/primary_only_service_test.cpp +++ b/src/mongo/db/repl/primary_only_service_test.cpp @@ -46,6 +46,7 @@ #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/fail_point.h" #include "mongo/util/future.h" +#include "mongo/util/future_util.h" using namespace mongo; using namespace mongo::repl; @@ -105,59 +106,66 @@ public: TestServiceHangDuringInitialization.pauseWhileSet(); } - token.onCancel() - .thenRunOn(_service->getInstanceCleanupExecutor()) - .then([this, self = shared_from_this()] { - stdx::lock_guard lk(_mutex); - - if (_completionPromise.getFuture().isReady()) { - // We already completed - return; - } - _completionPromise.setError(Status(ErrorCodes::Interrupted, "Interrupted")); - }) - .getAsync([](auto) {}); - - return SemiFuture<void>::makeReady() - .thenRunOn(**executor) - .then([self = shared_from_this()] { - self->_runOnce(State::kInitializing, State::kOne); - - if (MONGO_unlikely(TestServiceHangDuringStateOne.shouldFail())) { - TestServiceHangDuringStateOne.pauseWhileSet(); - } - }) - .then([self = shared_from_this()] { - self->_runOnce(State::kOne, State::kTwo); - - if (MONGO_unlikely(TestServiceHangDuringStateTwo.shouldFail())) { - TestServiceHangDuringStateTwo.pauseWhileSet(); - } - }) - .then([self = shared_from_this()] { - // After this line the shared_ptr maintaining the Instance object is deleted - // from the PrimaryOnlyService's map. Thus keeping the self reference is - // critical to extend the instance lifetime until all the callbacks using it - // have completed. - self->_runOnce(State::kTwo, State::kDone); - - if (MONGO_unlikely(TestServiceHangDuringCompletion.shouldFail())) { - TestServiceHangDuringCompletion.pauseWhileSet(); - } - }) - .onCompletion([self = shared_from_this()](Status status) { - stdx::lock_guard lk(self->_mutex); - if (self->_completionPromise.getFuture().isReady()) { - // We were already interrupted - return; - } - - if (status.isOK()) { - self->_completionPromise.emplaceValue(); - } else { - self->_completionPromise.setError(status); - } - }) + auto cancelLogicFinishedRunning = + token.onCancel() + .thenRunOn(_service->getInstanceCleanupExecutor()) + .then([this, self = shared_from_this()] { + stdx::lock_guard lk(_mutex); + + if (_completionPromise.getFuture().isReady()) { + // We already completed + return; + } + _completionPromise.setError(Status(ErrorCodes::Interrupted, "Interrupted")); + }); + + auto testLogicFinishedRunning = + SemiFuture<void>::makeReady() + .thenRunOn(**executor) + .then([self = shared_from_this()] { + self->_runOnce(State::kInitializing, State::kOne); + + if (MONGO_unlikely(TestServiceHangDuringStateOne.shouldFail())) { + TestServiceHangDuringStateOne.pauseWhileSet(); + } + }) + .then([self = shared_from_this()] { + self->_runOnce(State::kOne, State::kTwo); + + if (MONGO_unlikely(TestServiceHangDuringStateTwo.shouldFail())) { + TestServiceHangDuringStateTwo.pauseWhileSet(); + } + }) + .then([self = shared_from_this()] { + // After this line the shared_ptr maintaining the Instance object is deleted + // from the PrimaryOnlyService's map. Thus keeping the self reference is + // critical to extend the instance lifetime until all the callbacks using it + // have completed. + self->_runOnce(State::kTwo, State::kDone); + + if (MONGO_unlikely(TestServiceHangDuringCompletion.shouldFail())) { + TestServiceHangDuringCompletion.pauseWhileSet(); + } + }) + .onCompletion([self = shared_from_this()](Status status) { + stdx::lock_guard lk(self->_mutex); + if (self->_completionPromise.getFuture().isReady()) { + // We were already interrupted + return; + } + + if (status.isOK()) { + self->_completionPromise.emplaceValue(); + } else { + self->_completionPromise.setError(status); + } + }); + + // This instance is considered complete when both cancellation logic and test logic have + // finished running. + return whenAll(std::move(cancelLogicFinishedRunning), + std::move(testLogicFinishedRunning)) + .ignoreValue() .semi(); } |