summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Saltz <matthew.saltz@mongodb.com>2021-05-13 15:47:11 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-05-13 16:35:24 +0000
commit859d7208223ea0cb03270f6dcc9491a1afbd18fd (patch)
tree0e14908fbabd33b65b6d3b295d1c1e321eec1457
parentd8d76d053616c16cb020190b46c6aad450598010 (diff)
downloadmongo-859d7208223ea0cb03270f6dcc9491a1afbd18fd.tar.gz
SERVER-56899 Make PrimaryOnlyService TestService wait for cancellation logic to run
-rw-r--r--src/mongo/db/repl/primary_only_service_test.cpp114
1 files changed, 61 insertions, 53 deletions
diff --git a/src/mongo/db/repl/primary_only_service_test.cpp b/src/mongo/db/repl/primary_only_service_test.cpp
index 3c0f4bb3717..cf4bbfe5803 100644
--- a/src/mongo/db/repl/primary_only_service_test.cpp
+++ b/src/mongo/db/repl/primary_only_service_test.cpp
@@ -46,6 +46,7 @@
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/future.h"
+#include "mongo/util/future_util.h"
using namespace mongo;
using namespace mongo::repl;
@@ -105,59 +106,66 @@ public:
TestServiceHangDuringInitialization.pauseWhileSet();
}
- token.onCancel()
- .thenRunOn(_service->getInstanceCleanupExecutor())
- .then([this, self = shared_from_this()] {
- stdx::lock_guard lk(_mutex);
-
- if (_completionPromise.getFuture().isReady()) {
- // We already completed
- return;
- }
- _completionPromise.setError(Status(ErrorCodes::Interrupted, "Interrupted"));
- })
- .getAsync([](auto) {});
-
- return SemiFuture<void>::makeReady()
- .thenRunOn(**executor)
- .then([self = shared_from_this()] {
- self->_runOnce(State::kInitializing, State::kOne);
-
- if (MONGO_unlikely(TestServiceHangDuringStateOne.shouldFail())) {
- TestServiceHangDuringStateOne.pauseWhileSet();
- }
- })
- .then([self = shared_from_this()] {
- self->_runOnce(State::kOne, State::kTwo);
-
- if (MONGO_unlikely(TestServiceHangDuringStateTwo.shouldFail())) {
- TestServiceHangDuringStateTwo.pauseWhileSet();
- }
- })
- .then([self = shared_from_this()] {
- // After this line the shared_ptr maintaining the Instance object is deleted
- // from the PrimaryOnlyService's map. Thus keeping the self reference is
- // critical to extend the instance lifetime until all the callbacks using it
- // have completed.
- self->_runOnce(State::kTwo, State::kDone);
-
- if (MONGO_unlikely(TestServiceHangDuringCompletion.shouldFail())) {
- TestServiceHangDuringCompletion.pauseWhileSet();
- }
- })
- .onCompletion([self = shared_from_this()](Status status) {
- stdx::lock_guard lk(self->_mutex);
- if (self->_completionPromise.getFuture().isReady()) {
- // We were already interrupted
- return;
- }
-
- if (status.isOK()) {
- self->_completionPromise.emplaceValue();
- } else {
- self->_completionPromise.setError(status);
- }
- })
+ auto cancelLogicFinishedRunning =
+ token.onCancel()
+ .thenRunOn(_service->getInstanceCleanupExecutor())
+ .then([this, self = shared_from_this()] {
+ stdx::lock_guard lk(_mutex);
+
+ if (_completionPromise.getFuture().isReady()) {
+ // We already completed
+ return;
+ }
+ _completionPromise.setError(Status(ErrorCodes::Interrupted, "Interrupted"));
+ });
+
+ auto testLogicFinishedRunning =
+ SemiFuture<void>::makeReady()
+ .thenRunOn(**executor)
+ .then([self = shared_from_this()] {
+ self->_runOnce(State::kInitializing, State::kOne);
+
+ if (MONGO_unlikely(TestServiceHangDuringStateOne.shouldFail())) {
+ TestServiceHangDuringStateOne.pauseWhileSet();
+ }
+ })
+ .then([self = shared_from_this()] {
+ self->_runOnce(State::kOne, State::kTwo);
+
+ if (MONGO_unlikely(TestServiceHangDuringStateTwo.shouldFail())) {
+ TestServiceHangDuringStateTwo.pauseWhileSet();
+ }
+ })
+ .then([self = shared_from_this()] {
+ // After this line the shared_ptr maintaining the Instance object is deleted
+ // from the PrimaryOnlyService's map. Thus keeping the self reference is
+ // critical to extend the instance lifetime until all the callbacks using it
+ // have completed.
+ self->_runOnce(State::kTwo, State::kDone);
+
+ if (MONGO_unlikely(TestServiceHangDuringCompletion.shouldFail())) {
+ TestServiceHangDuringCompletion.pauseWhileSet();
+ }
+ })
+ .onCompletion([self = shared_from_this()](Status status) {
+ stdx::lock_guard lk(self->_mutex);
+ if (self->_completionPromise.getFuture().isReady()) {
+ // We were already interrupted
+ return;
+ }
+
+ if (status.isOK()) {
+ self->_completionPromise.emplaceValue();
+ } else {
+ self->_completionPromise.setError(status);
+ }
+ });
+
+ // This instance is considered complete when both cancellation logic and test logic have
+ // finished running.
+ return whenAll(std::move(cancelLogicFinishedRunning),
+ std::move(testLogicFinishedRunning))
+ .ignoreValue()
.semi();
}