author     George Wangensteen <george.wangensteen@mongodb.com>    2021-02-04 16:23:05 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>       2021-02-04 17:45:23 +0000
commit     3b4c40b136da419512bf6501655473db552efb11 (patch)
tree       afa116d98dd44521a2983b97e0f91dcafa859ab5 /src
parent     f6119db22f13329a15be3771f732088a06130922 (diff)
download   mongo-3b4c40b136da419512bf6501655473db552efb11.tar.gz
Revert "SERVER-50656 Add cancellation support to WaitForMajorityService"
This reverts commit da77452821c355346d873a6b31160c101adc60de.
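For context, the wait_for_majority_service.h hunks below show the interface this revert removes and the one it restores. A condensed sketch of the two signatures, assembled only from those hunks and not a complete header:

    // Reverted interface (SERVER-50656): callers pass a CancelationToken, and the
    // service runs on a ThreadPool with two threads so a cancellation can be
    // processed even while one thread is blocked waiting on an opTime.
    void startup(ServiceContext* ctx);
    SemiFuture<void> waitUntilMajority(const repl::OpTime& opTime,
                                       const CancelationToken& cancelToken);

    // Restored interface: no cancellation support. A single background thread
    // waits on the lowest queued opTime, and all requests for the same opTime
    // share one SharedSemiFuture.
    void setUp(ServiceContext* service);
    SharedSemiFuture<void> waitUntilMajority(const repl::OpTime& opTime);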
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/mongod_main.cpp                                          |   2
-rw-r--r--  src/mongo/db/repl/SConscript                                          |   2
-rw-r--r--  src/mongo/db/repl/primary_only_service.cpp                            |   2
-rw-r--r--  src/mongo/db/repl/primary_only_service_test.cpp                       |   2
-rw-r--r--  src/mongo/db/repl/tenant_migration_donor_service.cpp                  |   2
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp              |  18
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service_test.cpp         |   2
-rw-r--r--  src/mongo/db/repl/tenant_migration_util.cpp                           |   3
-rw-r--r--  src/mongo/db/repl/wait_for_majority_service.cpp                       | 234
-rw-r--r--  src/mongo/db/repl/wait_for_majority_service.h                         |  92
-rw-r--r--  src/mongo/db/repl/wait_for_majority_service_test.cpp                  | 170
-rw-r--r--  src/mongo/db/s/collection_sharding_runtime_test.cpp                   |   2
-rw-r--r--  src/mongo/db/s/migration_util_test.cpp                                |   2
-rw-r--r--  src/mongo/db/s/range_deletion_util.cpp                                |   2
-rw-r--r--  src/mongo/db/s/range_deletion_util_test.cpp                           |   2
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp  |   1
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h    |   2
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp           |   2
-rw-r--r--  src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp              |   2
-rw-r--r--  src/mongo/db/s/resharding_destined_recipient_test.cpp                 |   2
-rw-r--r--  src/mongo/db/s/transaction_coordinator.cpp                            |   5
-rw-r--r--  src/mongo/db/s/transaction_coordinator_test_fixture.cpp               |   2
-rw-r--r--  src/mongo/util/future_util.h                                          |   6
23 files changed, 154 insertions, 405 deletions
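At the call sites the revert is mechanical: the CancelationToken argument (usually CancelationToken::uncancelable()) is dropped, and code that previously called .share() on the returned SemiFuture now receives a SharedSemiFuture directly. A minimal before/after sketch, condensed from the primary_only_service.cpp hunk below:

    // Reverted, cancellation-aware call:
    WaitForMajorityService::get(_serviceContext)
        .waitUntilMajority(stepUpOpTime, _source.token())
        .thenRunOn(**_scopedExecutor);

    // Restored call:
    WaitForMajorityService::get(_serviceContext)
        .waitUntilMajority(stepUpOpTime)
        .thenRunOn(**_scopedExecutor);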
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index b10120e76ed..de25ae2da20 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -572,7 +572,7 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
"** This mode should only be used to manually repair corrupted auth data");
}
- WaitForMajorityService::get(serviceContext).startup(serviceContext);
+ WaitForMajorityService::get(serviceContext).setUp(serviceContext);
// This function may take the global lock.
auto shardingInitialized = ShardingInitializationMongoD::get(startupOpCtx.get())
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index b95622c7ed4..de56a969f75 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1748,8 +1748,6 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/rw_concern_d',
- '$BUILD_DIR/mongo/util/concurrency/thread_pool',
- '$BUILD_DIR/mongo/util/future_util',
],
)
diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp
index 35a0aff7f91..93ee1280340 100644
--- a/src/mongo/db/repl/primary_only_service.cpp
+++ b/src/mongo/db/repl/primary_only_service.cpp
@@ -365,7 +365,7 @@ void PrimaryOnlyService::onStepUp(const OpTime& stepUpOpTime) {
stdx::lock_guard lk(_mutex);
auto term = _term;
WaitForMajorityService::get(_serviceContext)
- .waitUntilMajority(stepUpOpTime, _source.token())
+ .waitUntilMajority(stepUpOpTime)
.thenRunOn(**_scopedExecutor)
.then([this] {
if (MONGO_unlikely(PrimaryOnlyServiceSkipRebuildingInstances.shouldFail())) {
diff --git a/src/mongo/db/repl/primary_only_service_test.cpp b/src/mongo/db/repl/primary_only_service_test.cpp
index 501d28a08dd..ace2ac62288 100644
--- a/src/mongo/db/repl/primary_only_service_test.cpp
+++ b/src/mongo/db/repl/primary_only_service_test.cpp
@@ -300,7 +300,7 @@ public:
ServiceContextMongoDTest::setUp();
auto serviceContext = getServiceContext();
- WaitForMajorityService::get(serviceContext).startup(serviceContext);
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
{
auto opCtx = cc().makeOperationContext();
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 086e4d68bd1..9e42924363d 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -570,7 +570,7 @@ TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable(
ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWriteConcern(
std::shared_ptr<executor::ScopedTaskExecutor> executor, repl::OpTime opTime) {
return WaitForMajorityService::get(_serviceContext)
- .waitUntilMajority(std::move(opTime), CancelationToken::uncancelable())
+ .waitUntilMajority(std::move(opTime))
.thenRunOn(**executor)
.then([this, self = shared_from_this()] {
stdx::lock_guard<Latch> lg(_mutex);
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 06e3a2f0c88..6b5b1fdd5d2 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -59,7 +59,6 @@
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/util/assert_util.h"
-#include "mongo/util/cancelation.h"
#include "mongo/util/future_util.h"
namespace mongo {
@@ -318,8 +317,7 @@ OpTime TenantMigrationRecipientService::Instance::waitUntilTimestampIsMajorityCo
// Wait for the read recipient optime to be majority committed.
WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(donorRecipientOpTimePair.recipientOpTime,
- CancelationToken::uncancelable())
+ .waitUntilMajority(donorRecipientOpTimePair.recipientOpTime)
.get(opCtx);
return donorRecipientOpTimePair.donorOpTime;
}
@@ -507,7 +505,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_initializeStateDoc(
// rollback.
auto insertOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(insertOpTime, CancelationToken::uncancelable())
+ .waitUntilMajority(insertOpTime)
.semi();
}
@@ -865,8 +863,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_onCloneSuccess() {
uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc));
return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp(),
- CancelationToken::uncancelable())
+ .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp())
.semi();
}
@@ -892,8 +889,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_getDataConsistentFu
uassertStatusOK(
tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc));
return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp(),
- CancelationToken::uncancelable());
+ .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp());
})
.semi();
}
@@ -981,7 +977,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_markStateDocAsGarba
auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(writeOpTime, CancelationToken::uncancelable())
+ .waitUntilMajority(writeOpTime)
.semi();
}
@@ -1134,9 +1130,7 @@ SharedSemiFuture<void> TenantMigrationRecipientService::Instance::_updateStateDo
auto opCtx = cc().makeOperationContext();
uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc));
return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp(),
- CancelationToken::uncancelable())
- .share();
+ .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp());
}
ExecutorFuture<void>
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index ed9e0016ca8..31f833b75f8 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -176,7 +176,7 @@ public:
serviceContext->setPreciseClockSource(
std::make_unique<SharedClockSourceAdapter>(_clkSource));
- WaitForMajorityService::get(serviceContext).startup(serviceContext);
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
// Automatically mark the state doc garbage collectable after data sync completion.
globalFailPointRegistry()
diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp
index ffac7121a2a..66c39f1a897 100644
--- a/src/mongo/db/repl/tenant_migration_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_util.cpp
@@ -36,7 +36,6 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/wait_for_majority_service.h"
-#include "mongo/util/cancelation.h"
#include "mongo/util/future_util.h"
namespace mongo {
@@ -92,7 +91,7 @@ ExecutorFuture<void> storeExternalClusterTimeKeyDocsAndRefreshCache(
const auto opTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(opTime, CancelationToken::uncancelable())
+ .waitUntilMajority(opTime)
.thenRunOn(**executor)
.then([] {
auto opCtxHolder = cc().makeOperationContext();
diff --git a/src/mongo/db/repl/wait_for_majority_service.cpp b/src/mongo/db/repl/wait_for_majority_service.cpp
index 0cb9e51f9f4..c316501663e 100644
--- a/src/mongo/db/repl/wait_for_majority_service.cpp
+++ b/src/mongo/db/repl/wait_for_majority_service.cpp
@@ -1,4 +1,4 @@
-/*
+/**
* Copyright (C) 2019-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
@@ -40,9 +40,8 @@
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
#include "mongo/util/concurrency/thread_pool.h"
-#include "mongo/util/future_util.h"
-#include "mongo/util/static_immortal.h"
namespace mongo {
@@ -50,27 +49,9 @@ namespace {
const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
WriteConcernOptions::kNoTimeout);
+
const auto waitForMajorityServiceDecoration =
ServiceContext::declareDecoration<WaitForMajorityService>();
-
-constexpr static auto kWaitClientName = "WaitForMajorityServiceWaiter";
-constexpr static auto kCancelClientName = "WaitForMajorityServiceCanceler";
-
-std::unique_ptr<ThreadPool> makeThreadPool() {
- ThreadPool::Options options;
- options.poolName = "WaitForMajorityServiceThreadPool";
- options.minThreads = 0;
- // This service must have the ability to use at least two background threads. If it is limited
- // to one, than if that thread is blocking waiting on an opTime, any cancellations cannot be
- // completed until that wait is complete.
- options.maxThreads = 2;
- return std::make_unique<ThreadPool>(options);
-}
-inline Status waitUntilMajorityCanceledStatus() {
- static StaticImmortal s =
- Status{ErrorCodes::CallbackCanceled, "WaitForMajorityService::waitUntilMajority canceled"};
- return *s;
-}
} // namespace
WaitForMajorityService::~WaitForMajorityService() {
@@ -81,156 +62,139 @@ WaitForMajorityService& WaitForMajorityService::get(ServiceContext* service) {
return waitForMajorityServiceDecoration(service);
}
-void WaitForMajorityService::startup(ServiceContext* ctx) {
+void WaitForMajorityService::setUp(ServiceContext* service) {
stdx::lock_guard lk(_mutex);
- invariant(_state == State::kNotStarted);
- _pool = makeThreadPool();
- _waitForMajorityClient = ClientStrand::make(ctx->makeClient(kWaitClientName));
- _waitForMajorityCancelationClient = ClientStrand::make(ctx->makeClient(kCancelClientName));
- _backgroundWorkComplete = _periodicallyWaitForMajority();
- _pool->startup();
- _state = State::kRunning;
+
+ if (!_thread.joinable() && !_inShutDown) {
+ _thread = stdx::thread([this, service] { _periodicallyWaitForMajority(service); });
+ }
}
void WaitForMajorityService::shutDown() {
{
stdx::lock_guard lk(_mutex);
- if (_state != State::kRunning) {
+ if (std::exchange(_inShutDown, true)) {
return;
}
- _state = State::kShutdown;
- _waitForMajorityClient->getClientPointer()->setKilled();
- _waitForMajorityCancelationClient->getClientPointer()->setKilled();
-
- for (auto&& request : _queuedOpTimes) {
- if (!request.second->hasBeenProcessed.swap(true)) {
- request.second->result.setError(
- {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"});
- }
+ if (_opCtx) {
+ stdx::lock_guard scopedClientLock(*_opCtx->getClient());
+ _opCtx->getServiceContext()->killOperation(
+ scopedClientLock, _opCtx, ErrorCodes::InterruptedAtShutdown);
}
- _hasNewOpTimeCV.notifyAllAndClose();
}
- _pool->shutdown();
- _pool->join();
- _backgroundWorkComplete->wait();
- // It's important to reset the clientstrand pointers after waiting for work
- // in the thread pool to complete since that work might be using the client
- // objects.
- _waitForMajorityClient.reset();
- _waitForMajorityCancelationClient.reset();
-}
-SemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime,
- const CancelationToken& cancelToken) {
-
- auto [promise, future] = makePromiseFuture<void>();
- auto request = std::make_shared<Request>(std::move(promise));
+ if (_thread.joinable()) {
+ _thread.join();
+ }
stdx::lock_guard lk(_mutex);
+ for (auto&& pendingRequest : _queuedOpTimes) {
+ pendingRequest.second.setError(
+ {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"});
+ }
- tassert(5065600,
- "WaitForMajorityService must be started before calling waitUntilMajority",
- _state != State::kNotStarted);
+ _queuedOpTimes.clear();
+}
- if (_state == State::kShutdown) {
- return {SemiFuture<void>::makeReady(
+SharedSemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime) {
+ stdx::lock_guard lk(_mutex);
+
+ if (_inShutDown) {
+ return {Future<void>::makeReady(
Status{ErrorCodes::ShutdownInProgress,
"rejecting wait for majority request due to server shutdown"})};
}
+ // Background thread must be running before requesting.
+ invariant(_thread.joinable());
+
if (_lastOpTimeWaited >= opTime) {
- return {SemiFuture<void>::makeReady()};
+ return {Future<void>::makeReady()};
}
- if (cancelToken.isCanceled()) {
- return {SemiFuture<void>::makeReady(waitUntilMajorityCanceledStatus())};
+ auto iter = _queuedOpTimes.lower_bound(opTime);
+ if (iter != _queuedOpTimes.end()) {
+ if (iter->first == opTime) {
+ return iter->second.getFuture();
+ }
}
- const bool wasEmpty = _queuedOpTimes.empty();
-
- if (!wasEmpty && opTime < _queuedOpTimes.begin()->first) {
+ if (iter == _queuedOpTimes.begin()) {
// Background thread could already be actively waiting on a later time, so tell it to stop
// and wait for the newly requested opTime instead.
- stdx::lock_guard scopedClientLock(*_waitForMajorityClient->getClientPointer());
- if (auto opCtx = _waitForMajorityClient->getClientPointer()->getOperationContext())
- opCtx->getServiceContext()->killOperation(
- scopedClientLock, opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable);
+ if (_opCtx) {
+ stdx::lock_guard scopedClientLock(*_opCtx->getClient());
+ _opCtx->getServiceContext()->killOperation(
+ scopedClientLock, _opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable);
+ }
}
- auto resultIter = _queuedOpTimes.emplace(
- std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple(request));
-
+ const bool wasEmpty = _queuedOpTimes.empty();
+ auto resultIter = _queuedOpTimes.emplace_hint(
+ iter, std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple());
if (wasEmpty) {
- // Notify the background thread that work is now available.
- _hasNewOpTimeCV.notifyAllAndReset();
+ _hasNewOpTimeCV.notify_one();
}
- cancelToken.onCancel().thenRunOn(_pool).getAsync([this, resultIter, request](Status s) {
- if (!s.isOK()) {
- return;
+ return resultIter->second.getFuture();
+}
+
+void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* service) {
+ ThreadClient tc("waitForMajority", service);
+
+ stdx::unique_lock<Latch> lk(_mutex);
+
+ while (!_inShutDown) {
+ auto opCtx = tc->makeOperationContext();
+ _opCtx = opCtx.get();
+
+ if (!_queuedOpTimes.empty()) {
+ auto lowestOpTimeIter = _queuedOpTimes.begin();
+ auto lowestOpTime = lowestOpTimeIter->first;
+
+ lk.unlock();
+
+ WriteConcernResult ignoreResult;
+ auto status =
+ waitForWriteConcern(_opCtx, lowestOpTime, kMajorityWriteConcern, &ignoreResult);
+
+ lk.lock();
+
+ if (status.isOK()) {
+ _lastOpTimeWaited = lowestOpTime;
+ }
+
+ if (status == ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) {
+ _opCtx = nullptr;
+ continue;
+ }
+
+ if (status.isOK()) {
+ lowestOpTimeIter->second.emplaceValue();
+ } else {
+ lowestOpTimeIter->second.setError(status);
+ }
+
+ _queuedOpTimes.erase(lowestOpTimeIter);
}
- auto clientGuard = _waitForMajorityCancelationClient->bind();
- if (!request->hasBeenProcessed.swap(true)) {
- request->result.setError(waitUntilMajorityCanceledStatus());
- stdx::lock_guard lk(_mutex);
- _queuedOpTimes.erase(resultIter);
+
+ try {
+ MONGO_IDLE_THREAD_BLOCK;
+ _opCtx->waitForConditionOrInterrupt(
+ _hasNewOpTimeCV, lk, [&] { return !_queuedOpTimes.empty() || _inShutDown; });
+ } catch (const DBException& e) {
+ LOGV2_DEBUG(22487,
+ 1,
+ "Unable to wait for new op time due to: {error}",
+ "Unable to wait for new op time",
+ "error"_attr = e);
}
- });
- return std::move(future).semi();
-}
-SemiFuture<void> WaitForMajorityService::_periodicallyWaitForMajority() {
- return AsyncTry([this] {
- stdx::unique_lock<Latch> lk(_mutex);
- if (_queuedOpTimes.empty()) {
- return _hasNewOpTimeCV.onNotify();
- }
- auto clientGuard = _waitForMajorityClient->bind();
- auto opCtx = clientGuard->makeOperationContext();
-
- // This needs to be a copy since we unlock the lock before waiting for write concern
- // and the iterator could be invalidated.
- auto lowestOpTime = _queuedOpTimes.begin()->first;
-
- lk.unlock();
-
- WriteConcernResult ignoreResult;
- auto status = waitForWriteConcern(
- opCtx.get(), lowestOpTime, kMajorityWriteConcern, &ignoreResult);
-
- lk.lock();
-
- if (status.isOK()) {
- _lastOpTimeWaited = lowestOpTime;
- }
-
- if (status != ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) {
- auto [lowestOpTimeIter, firstElemWithHigherOpTimeIter] =
- _queuedOpTimes.equal_range(lowestOpTime);
-
- for (auto requestIt = lowestOpTimeIter;
- requestIt != firstElemWithHigherOpTimeIter;
- /*Increment in loop*/) {
- if (!requestIt->second->hasBeenProcessed.swap(true)) {
- requestIt->second->result.setFrom(status);
- requestIt = _queuedOpTimes.erase(requestIt);
- } else {
- ++requestIt;
- }
- }
- }
- return SemiFuture<void>::makeReady();
- })
- .until([](Status) {
- // Loop forever until _pool is shut down.
- // TODO (SERVER-53766): Replace with condition-free looping utility.
- return false;
- })
- .on(_pool, CancelationToken::uncancelable())
- .semi();
+ _opCtx = nullptr;
+ }
}
} // namespace mongo
diff --git a/src/mongo/db/repl/wait_for_majority_service.h b/src/mongo/db/repl/wait_for_majority_service.h
index 181ed60b265..1663ce907ac 100644
--- a/src/mongo/db/repl/wait_for_majority_service.h
+++ b/src/mongo/db/repl/wait_for_majority_service.h
@@ -33,57 +33,15 @@
#include <memory>
#include <vector>
-#include <boost/optional.hpp>
-#include <boost/utility/in_place_factory.hpp>
-
-#include "mongo/db/client_strand.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/service_context.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/thread.h"
-#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/future.h"
namespace mongo {
-namespace detail {
-
-class AsyncConditionVariable {
-public:
- AsyncConditionVariable() : _current(boost::in_place()) {}
-
- SemiFuture<void> onNotify() {
- stdx::lock_guard lk(_mutex);
- return _current->getFuture().semi();
- }
-
- void notifyAllAndReset() {
- stdx::lock_guard lk(_mutex);
- if (_inShutdown) {
- return;
- }
- _current->emplaceValue();
- _current = boost::in_place();
- }
-
- void notifyAllAndClose() {
- stdx::lock_guard lk(_mutex);
- if (_inShutdown) {
- return;
- }
- _inShutdown = true;
- _current->emplaceValue();
- }
-
-private:
- mutable Mutex _mutex = MONGO_MAKE_LATCH("AsyncConditionVariable::_mutex");
- boost::optional<SharedPromise<void>> _current;
- bool _inShutdown{false};
-};
-} // namespace detail
-
-
/**
* Provides a facility for asynchronously waiting a local opTime to be majority committed.
*/
@@ -94,10 +52,9 @@ public:
static WaitForMajorityService& get(ServiceContext* service);
/**
- * Sets up the background thread pool responsible for waiting for opTimes to be majority
- * committed.
+ * Sets up the background thread responsible for waiting for opTimes to be majority committed.
*/
- void startup(ServiceContext* ctx);
+ void setUp(ServiceContext* service);
/**
* Blocking method, which shuts down and joins the background thread.
@@ -107,40 +64,16 @@ public:
/**
* Enqueue a request to wait for the given opTime to be majority committed.
*/
- SemiFuture<void> waitUntilMajority(const repl::OpTime& opTime,
- const CancelationToken& cancelToken);
+ SharedSemiFuture<void> waitUntilMajority(const repl::OpTime& opTime);
private:
- enum class State { kNotStarted, kRunning, kShutdown };
- // State is kNotStarted on construction, kRunning after WaitForMajorityService::startup() has
- // been called, and kShutdown after WaitForMajorityService::shutdown() has been called. It is
- // illegal to call WaitForMajorityService::waitUntilMajority before calling startup.
- State _state{State::kNotStarted};
- /**
- * Internal representation of an individual request to wait on some particular optime.
- */
- struct Request {
- explicit Request(Promise<void> promise)
- : hasBeenProcessed{false}, result(std::move(promise)) {}
- AtomicWord<bool> hasBeenProcessed;
- Promise<void> result;
- };
-
- using OpTimeWaitingMap = std::multimap<repl::OpTime, std::shared_ptr<Request>>;
+ using OpTimeWaitingMap = std::map<repl::OpTime, SharedPromise<void>>;
/**
* Periodically checks the list of opTimes to wait for majority committed.
*/
- SemiFuture<void> _periodicallyWaitForMajority();
+ void _periodicallyWaitForMajority(ServiceContext* service);
- // The pool of threads available to wait on opTimes and cancel existing requests.
- std::shared_ptr<ThreadPool> _pool;
-
- // This future is completed when the service has finished all of its work and is ready for
- // shutdown.
- boost::optional<SemiFuture<void>> _backgroundWorkComplete;
-
- // This mutex synchronizes access to the members declared below.
Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityService::_mutex");
// Contains an ordered list of opTimes to wait to be majority comitted.
@@ -150,15 +83,18 @@ private:
// majority comitted.
repl::OpTime _lastOpTimeWaited;
+ // The background thread.
+ stdx::thread _thread;
+
// Use for signalling new opTime requests being queued.
- detail::AsyncConditionVariable _hasNewOpTimeCV;
+ stdx::condition_variable _hasNewOpTimeCV;
- // Manages the Client responsible for the thread that waits on opTimes.
- ClientStrandPtr _waitForMajorityClient;
+ // If set, contains a reference to the opCtx being used by the background thread.
+ // Only valid when _thread.joinable() and not nullptr.
+ OperationContext* _opCtx{nullptr};
- // Manages the Client responsible for the thread that cancels existing requests to wait on
- // opTimes.
- ClientStrandPtr _waitForMajorityCancelationClient;
+ // Flag is set to true after shutDown() is called.
+ bool _inShutDown{false};
};
} // namespace mongo
diff --git a/src/mongo/db/repl/wait_for_majority_service_test.cpp b/src/mongo/db/repl/wait_for_majority_service_test.cpp
index 14c0266ee89..c68158b97fd 100644
--- a/src/mongo/db/repl/wait_for_majority_service_test.cpp
+++ b/src/mongo/db/repl/wait_for_majority_service_test.cpp
@@ -34,7 +34,6 @@
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/platform/mutex.h"
#include "mongo/unittest/unittest.h"
-#include "mongo/util/cancelation.h"
namespace mongo {
namespace {
@@ -42,9 +41,8 @@ namespace {
class WaitForMajorityServiceTest : public ServiceContextMongoDTest {
public:
void setUp() override {
- ServiceContextMongoDTest::setUp();
auto service = getServiceContext();
- waitService()->startup(service);
+ waitService()->setUp(service);
auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(service);
@@ -59,7 +57,6 @@ public:
void tearDown() override {
waitService()->shutDown();
- ServiceContextMongoDTest::tearDown();
}
WaitForMajorityService* waitService() {
@@ -108,9 +105,6 @@ public:
_callCountChangedCV.wait(lk, [&] { return _waitForMajorityCallCount > expectedCount; });
}
- static inline const Status kCanceledStatus = {ErrorCodes::CallbackCanceled,
- "waitForMajority canceled"};
-
private:
WaitForMajorityService _waitForMajorityService;
@@ -124,36 +118,10 @@ private:
int _waitForMajorityCallCount{0};
};
-class WaitForMajorityServiceNoStartupTest : public ServiceContextMongoDTest {
-public:
- void setUp() override {
- ServiceContextMongoDTest::setUp();
- }
-
- void tearDown() override {
- waitService()->shutDown();
- ServiceContextMongoDTest::tearDown();
- }
- WaitForMajorityService* waitService() {
- return &_waitForMajorityService;
- }
-
-private:
- WaitForMajorityService _waitForMajorityService;
-};
-
-TEST_F(WaitForMajorityServiceTest, ShutdownImmediatelyAfterStartupDoesNotCrashOrHang) {
- waitService()->shutDown();
-}
-
-TEST_F(WaitForMajorityServiceNoStartupTest, ShutdownBeforeStartupDoesNotCrashOrHang) {
- waitService()->shutDown();
-}
-
TEST_F(WaitForMajorityServiceTest, WaitOneOpTime) {
repl::OpTime t1(Timestamp(1, 0), 2);
- auto future = waitService()->waitUntilMajority(t1, CancelationToken::uncancelable());
+ auto future = waitService()->waitUntilMajority(t1);
ASSERT_FALSE(future.isReady());
@@ -166,8 +134,8 @@ TEST_F(WaitForMajorityServiceTest, WaitOneOpTime) {
TEST_F(WaitForMajorityServiceTest, WaitWithSameOpTime) {
repl::OpTime t1(Timestamp(1, 0), 2);
- auto future1 = waitService()->waitUntilMajority(t1, CancelationToken::uncancelable());
- auto future1b = waitService()->waitUntilMajority(t1, CancelationToken::uncancelable());
+ auto future1 = waitService()->waitUntilMajority(t1);
+ auto future1b = waitService()->waitUntilMajority(t1);
ASSERT_FALSE(future1.isReady());
ASSERT_FALSE(future1b.isReady());
@@ -184,15 +152,13 @@ TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanLowestQueued) {
repl::OpTime laterOpTime(Timestamp(6, 0), 2);
repl::OpTime earlierOpTime(Timestamp(1, 0), 2);
- auto laterFuture =
- waitService()->waitUntilMajority(laterOpTime, CancelationToken::uncancelable());
+ auto laterFuture = waitService()->waitUntilMajority(laterOpTime);
// Wait until the background thread picks up the queued opTime.
waitForMajorityCallCountGreaterThan(0);
// The 2nd request has an earlier time, so it will interrupt 'laterOpTime' and skip the line.
- auto earlierFuture =
- waitService()->waitUntilMajority(earlierOpTime, CancelationToken::uncancelable());
+ auto earlierFuture = waitService()->waitUntilMajority(earlierOpTime);
// Wait for background thread to finish transitioning from waiting on laterOpTime to
// earlierOpTime.
@@ -218,8 +184,8 @@ TEST_F(WaitForMajorityServiceTest, WaitWithDifferentOpTime) {
repl::OpTime t1(Timestamp(1, 0), 2);
repl::OpTime t2(Timestamp(14, 0), 2);
- auto future1 = waitService()->waitUntilMajority(t1, CancelationToken::uncancelable());
- auto future2 = waitService()->waitUntilMajority(t2, CancelationToken::uncancelable());
+ auto future1 = waitService()->waitUntilMajority(t1);
+ auto future2 = waitService()->waitUntilMajority(t2);
ASSERT_FALSE(future1.isReady());
ASSERT_FALSE(future2.isReady());
@@ -241,8 +207,8 @@ TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanOpTimeAlreadyWaited)
repl::OpTime t1(Timestamp(5, 0), 2);
repl::OpTime t2(Timestamp(14, 0), 2);
- auto future1 = waitService()->waitUntilMajority(t1, CancelationToken::uncancelable());
- auto future2 = waitService()->waitUntilMajority(t2, CancelationToken::uncancelable());
+ auto future1 = waitService()->waitUntilMajority(t1);
+ auto future2 = waitService()->waitUntilMajority(t2);
ASSERT_FALSE(future1.isReady());
ASSERT_FALSE(future2.isReady());
@@ -255,9 +221,8 @@ TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanOpTimeAlreadyWaited)
ASSERT_EQ(t1, getLastOpTimeWaited());
repl::OpTime oldTs(Timestamp(4, 0), 2);
- auto oldFuture = waitService()->waitUntilMajority(oldTs, CancelationToken::uncancelable());
- auto alreadyWaitedFuture =
- waitService()->waitUntilMajority(t1, CancelationToken::uncancelable());
+ auto oldFuture = waitService()->waitUntilMajority(oldTs);
+ auto alreadyWaitedFuture = waitService()->waitUntilMajority(t1);
ASSERT_FALSE(future2.isReady());
@@ -271,13 +236,12 @@ TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanOpTimeAlreadyWaited)
ASSERT_EQ(t2, getLastOpTimeWaited());
}
-
TEST_F(WaitForMajorityServiceTest, ShutdownShouldCancelQueuedRequests) {
repl::OpTime t1(Timestamp(5, 0), 2);
repl::OpTime t2(Timestamp(14, 0), 2);
- auto future1 = waitService()->waitUntilMajority(t1, CancelationToken::uncancelable());
- auto future2 = waitService()->waitUntilMajority(t2, CancelationToken::uncancelable());
+ auto future1 = waitService()->waitUntilMajority(t1);
+ auto future2 = waitService()->waitUntilMajority(t2);
ASSERT_FALSE(future1.isReady());
ASSERT_FALSE(future2.isReady());
@@ -299,113 +263,9 @@ TEST_F(WaitForMajorityServiceTest, WriteConcernErrorGetsPropagatedCorrectly) {
{ErrorCodes::PrimarySteppedDown, "test stepdown"}, Milliseconds(0));
});
- auto future = waitService()->waitUntilMajority(t, CancelationToken::uncancelable());
+ auto future = waitService()->waitUntilMajority(t);
ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::PrimarySteppedDown);
}
-TEST_F(WaitForMajorityServiceTest, CanCancelWaitOnOneOptime) {
- repl::OpTime t(Timestamp(1, 2), 4);
- CancelationSource source;
- auto future = waitService()->waitUntilMajority(t, source.token());
- ASSERT_FALSE(future.isReady());
- source.cancel();
- // The future should now become ready without having to wait for any opTime.
- ASSERT_EQ(future.getNoThrow(), kCanceledStatus);
-}
-
-TEST_F(WaitForMajorityServiceTest, CancelingEarlierOpTimeRequestDoesNotAffectLaterOpTimeRequests) {
- repl::OpTime earlier(Timestamp(1, 2), 4);
- repl::OpTime later(Timestamp(5, 2), 5);
- CancelationSource source;
- auto cancelFuture = waitService()->waitUntilMajority(earlier, source.token());
- auto uncancelableFuture =
- waitService()->waitUntilMajority(later, CancelationToken::uncancelable());
- ASSERT_FALSE(cancelFuture.isReady());
- ASSERT_FALSE(uncancelableFuture.isReady());
- // Wait until the background thread picks up the initial request. Otherwise, there is a race
- // between the cancellation callback removing the initial request and the background thread
- // waiting on it.
- waitForMajorityCallCountGreaterThan(0);
- source.cancel();
- // The future should now become ready without having to wait for any opTime.
- ASSERT_EQ(cancelFuture.getNoThrow(), kCanceledStatus);
- ASSERT_FALSE(uncancelableFuture.isReady());
- finishWaitingOneOpTime();
- ASSERT_FALSE(uncancelableFuture.isReady());
- finishWaitingOneOpTime();
- uncancelableFuture.wait();
- ASSERT_EQ(later, getLastOpTimeWaited());
-}
-
-TEST_F(WaitForMajorityServiceTest, CancelingOneRequestOnOpTimeDoesNotAffectOthersOnSameOpTime) {
- repl::OpTime t1(Timestamp(1, 2), 4);
- repl::OpTime t1Dupe(Timestamp(1, 2), 4);
- CancelationSource source;
- auto cancelFuture = waitService()->waitUntilMajority(t1, source.token());
- auto uncancelableFuture =
- waitService()->waitUntilMajority(t1Dupe, CancelationToken::uncancelable());
- ASSERT_FALSE(cancelFuture.isReady());
- ASSERT_FALSE(uncancelableFuture.isReady());
- source.cancel();
- // The future should now become ready without having to wait for any opTime.
- ASSERT_EQ(cancelFuture.getNoThrow(), kCanceledStatus);
- ASSERT_FALSE(uncancelableFuture.isReady());
- finishWaitingOneOpTime();
- uncancelableFuture.wait();
- ASSERT_EQ(t1, getLastOpTimeWaited());
-}
-
-TEST_F(WaitForMajorityServiceTest, CancelingLaterOpTimeRequestDoesNotAffectEarlierOpTimeRequests) {
- repl::OpTime t1(Timestamp(1, 2), 4);
- repl::OpTime smallerOpTime(Timestamp(1, 2), 1);
- CancelationSource source;
- auto cancelFuture = waitService()->waitUntilMajority(t1, source.token());
- // Wait until the background thread picks up the queued opTime.
- waitForMajorityCallCountGreaterThan(0);
- auto earlierFuture =
- waitService()->waitUntilMajority(smallerOpTime, CancelationToken::uncancelable());
- // Wait for background thread to finish transitioning from waiting on t1 to smallerOpTime.
- waitForMajorityCallCountGreaterThan(1);
- ASSERT_FALSE(cancelFuture.isReady());
- ASSERT_FALSE(earlierFuture.isReady());
- source.cancel();
- // The future should now become ready without having to wait for any opTime.
- ASSERT_EQ(cancelFuture.getNoThrow(), kCanceledStatus);
- ASSERT_FALSE(earlierFuture.isReady());
- finishWaitingOneOpTime();
- earlierFuture.wait();
- ASSERT_EQ(smallerOpTime, getLastOpTimeWaited());
-}
-
-TEST_F(WaitForMajorityServiceTest, SafeToCallCancelOnRequestAlreadyCompletedByShutdown) {
- repl::OpTime t(Timestamp(1, 2), 4);
- CancelationSource source;
- auto deadFuture = waitService()->waitUntilMajority(t, source.token());
- ASSERT_FALSE(deadFuture.isReady());
- waitService()->shutDown();
- ASSERT(deadFuture.isReady());
- ASSERT_THROWS_CODE(deadFuture.get(), AssertionException, ErrorCodes::InterruptedAtShutdown);
- source.cancel();
-}
-
-TEST_F(WaitForMajorityServiceTest, SafeToCallCancelOnRequestAlreadyCompletedByWaiting) {
- repl::OpTime t(Timestamp(1, 2), 4);
- CancelationSource source;
- auto future = waitService()->waitUntilMajority(t, source.token());
- ASSERT_FALSE(future.isReady());
- waitForMajorityCallCountGreaterThan(0);
- finishWaitingOneOpTime();
- future.get();
- ASSERT_EQ(t, getLastOpTimeWaited());
- source.cancel();
-}
-
-TEST_F(WaitForMajorityServiceTest, PassingAlreadyCanceledTokenCompletesFutureWithNoWaiting) {
- repl::OpTime t(Timestamp(1, 2), 4);
- CancelationSource source;
- source.cancel();
- auto future = waitService()->waitUntilMajority(t, source.token());
- ASSERT_EQ(future.getNoThrow(), kCanceledStatus);
-}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/collection_sharding_runtime_test.cpp b/src/mongo/db/s/collection_sharding_runtime_test.cpp
index db87d219711..1779b75b60b 100644
--- a/src/mongo/db/s/collection_sharding_runtime_test.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime_test.cpp
@@ -188,7 +188,7 @@ class CollectionShardingRuntimeWithRangeDeleterTest : public CollectionShardingR
public:
void setUp() override {
ShardServerTestFixture::setUp();
- WaitForMajorityService::get(getServiceContext()).startup(getServiceContext());
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
// Set up replication coordinator to be primary and have no replication delay.
auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(getServiceContext());
replCoord->setCanAcceptNonLocalWrites(true);
diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp
index 1058c7e83a0..fbe7cf9a40d 100644
--- a/src/mongo/db/s/migration_util_test.cpp
+++ b/src/mongo/db/s/migration_util_test.cpp
@@ -349,7 +349,7 @@ public:
configTargeterMock()->setFindHostReturnValue(kConfigHostAndPort);
- WaitForMajorityService::get(getServiceContext()).startup(getServiceContext());
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
// Set up 2 default shards.
for (const auto& shard : kShardList) {
diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp
index 1468b2a74c9..1c2189e7177 100644
--- a/src/mongo/db/s/range_deletion_util.cpp
+++ b/src/mongo/db/s/range_deletion_util.cpp
@@ -407,7 +407,7 @@ ExecutorFuture<void> waitForDeletionsToMajorityReplicate(
// Asynchronously wait for majority write concern.
return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(clientOpTime, CancelationToken::uncancelable())
+ .waitUntilMajority(clientOpTime)
.thenRunOn(executor);
});
}
diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp
index 94088b7751a..12b5e11546d 100644
--- a/src/mongo/db/s/range_deletion_util_test.cpp
+++ b/src/mongo/db/s/range_deletion_util_test.cpp
@@ -57,7 +57,7 @@ public:
void setUp() override {
ShardServerTestFixture::setUp();
- WaitForMajorityService::get(getServiceContext()).startup(getServiceContext());
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
// Set up replication coordinator to be primary and have no replication delay.
auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(getServiceContext());
replCoord->setCanAcceptNonLocalWrites(true);
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
index 686fb22b23e..a582302f029 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
@@ -76,6 +76,7 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest,
assertRecipientDocMatchesReshardingFields(metadata, reshardingFields, recipientDoc);
}
+
TEST_F(ReshardingDonorRecipientCommonTest, CreateDonorServiceInstance) {
OperationContext* opCtx = operationContext();
auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard);
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
index 2e7ef3c48b2..a0bdb181523 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
@@ -212,7 +212,7 @@ public:
void setUp() override {
ShardServerTestFixture::setUp();
- WaitForMajorityService::get(getServiceContext()).startup(getServiceContext());
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
_registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext());
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
index 4e8189d6238..2b9aed54b82 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
@@ -104,7 +104,7 @@ public:
shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId));
}
- WaitForMajorityService::get(getServiceContext()).startup(getServiceContext());
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
// onStepUp() relies on the storage interface to create the config.transactions table.
repl::StorageInterface::set(getServiceContext(),
diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
index 36d05e65692..db10b0a6bf6 100644
--- a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
@@ -114,7 +114,7 @@ class ReshardingTxnClonerTest : public ShardServerTestFixture {
shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId));
}
- WaitForMajorityService::get(getServiceContext()).startup(getServiceContext());
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
// onStepUp() relies on the storage interface to create the config.transactions table.
repl::StorageInterface::set(getServiceContext(),
diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp
index 4fe39db2dcb..7f64d66cdbd 100644
--- a/src/mongo/db/s/resharding_destined_recipient_test.cpp
+++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp
@@ -105,7 +105,7 @@ public:
configTargeterMock()->setFindHostReturnValue(kConfigHostAndPort);
- WaitForMajorityService::get(getServiceContext()).startup(getServiceContext());
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
for (const auto& shard : kShardList) {
std::unique_ptr<RemoteCommandTargeterMock> targeter(
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index 384e4cffeda..1e84a79aee2 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -39,7 +39,6 @@
#include "mongo/db/vector_clock_mutable.h"
#include "mongo/logv2/log.h"
#include "mongo/s/grid.h"
-#include "mongo/util/cancelation.h"
#include "mongo/util/fail_point.h"
namespace mongo {
@@ -59,9 +58,7 @@ ExecutorFuture<void> waitForMajorityWithHangFailpoint(ServiceContext* service,
repl::OpTime opTime) {
auto executor = Grid::get(service)->getExecutorPool()->getFixedExecutor();
auto waitForWC = [service, executor](repl::OpTime opTime) {
- return WaitForMajorityService::get(service)
- .waitUntilMajority(opTime, CancelationToken::uncancelable())
- .thenRunOn(executor);
+ return WaitForMajorityService::get(service).waitUntilMajority(opTime).thenRunOn(executor);
};
if (auto sfp = failpoint.scoped(); MONGO_unlikely(sfp.isActive())) {
diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
index 0beec9178f4..560dc51524b 100644
--- a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
@@ -66,7 +66,7 @@ void TransactionCoordinatorTestFixture::setUp() {
shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId));
}
- WaitForMajorityService::get(getServiceContext()).startup(getServiceContext());
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
}
void TransactionCoordinatorTestFixture::tearDown() {
diff --git a/src/mongo/util/future_util.h b/src/mongo/util/future_util.h
index e0096a81df5..ff846fabc3e 100644
--- a/src/mongo/util/future_util.h
+++ b/src/mongo/util/future_util.h
@@ -206,7 +206,7 @@ public:
* iteration of the loop body threw an exception or otherwise returned an error status, the
* returned ExecutorFuture will contain that error.
*/
- auto on(ExecutorPtr executor, CancelationToken cancelToken)&& {
+ auto on(std::shared_ptr<executor::TaskExecutor> executor, CancelationToken cancelToken)&& {
auto loop = std::make_shared<TryUntilLoop>(
std::move(executor), std::move(_body), std::move(_condition), std::move(cancelToken));
// Launch the recursive chain using the helper class.
@@ -245,7 +245,7 @@ private:
* Mostly needed to clean up lambda captures and make the looping logic more readable.
*/
struct TryUntilLoop : public std::enable_shared_from_this<TryUntilLoop> {
- TryUntilLoop(ExecutorPtr executor,
+ TryUntilLoop(std::shared_ptr<executor::TaskExecutor> executor,
BodyCallable executeLoopBody,
ConditionCallable shouldStopIteration,
CancelationToken cancelToken)
@@ -274,7 +274,7 @@ private:
});
}
- ExecutorPtr executor;
+ std::shared_ptr<executor::TaskExecutor> executor;
BodyCallable executeLoopBody;
ConditionCallable shouldStopIteration;
CancelationToken cancelToken;