diff options
Diffstat (limited to 'src/mongo')
23 files changed, 154 insertions, 405 deletions
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index b10120e76ed..de25ae2da20 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -572,7 +572,7 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) { "** This mode should only be used to manually repair corrupted auth data"); } - WaitForMajorityService::get(serviceContext).startup(serviceContext); + WaitForMajorityService::get(serviceContext).setUp(serviceContext); // This function may take the global lock. auto shardingInitialized = ShardingInitializationMongoD::get(startupOpCtx.get()) diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index b95622c7ed4..de56a969f75 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1748,8 +1748,6 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/rw_concern_d', - '$BUILD_DIR/mongo/util/concurrency/thread_pool', - '$BUILD_DIR/mongo/util/future_util', ], ) diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp index 35a0aff7f91..93ee1280340 100644 --- a/src/mongo/db/repl/primary_only_service.cpp +++ b/src/mongo/db/repl/primary_only_service.cpp @@ -365,7 +365,7 @@ void PrimaryOnlyService::onStepUp(const OpTime& stepUpOpTime) { stdx::lock_guard lk(_mutex); auto term = _term; WaitForMajorityService::get(_serviceContext) - .waitUntilMajority(stepUpOpTime, _source.token()) + .waitUntilMajority(stepUpOpTime) .thenRunOn(**_scopedExecutor) .then([this] { if (MONGO_unlikely(PrimaryOnlyServiceSkipRebuildingInstances.shouldFail())) { diff --git a/src/mongo/db/repl/primary_only_service_test.cpp b/src/mongo/db/repl/primary_only_service_test.cpp index 501d28a08dd..ace2ac62288 100644 --- a/src/mongo/db/repl/primary_only_service_test.cpp +++ b/src/mongo/db/repl/primary_only_service_test.cpp @@ -300,7 +300,7 @@ public: ServiceContextMongoDTest::setUp(); auto serviceContext = getServiceContext(); - WaitForMajorityService::get(serviceContext).startup(serviceContext); + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); { auto opCtx = cc().makeOperationContext(); diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 086e4d68bd1..9e42924363d 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -570,7 +570,7 @@ TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable( ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWriteConcern( std::shared_ptr<executor::ScopedTaskExecutor> executor, repl::OpTime opTime) { return WaitForMajorityService::get(_serviceContext) - .waitUntilMajority(std::move(opTime), CancelationToken::uncancelable()) + .waitUntilMajority(std::move(opTime)) .thenRunOn(**executor) .then([this, self = shared_from_this()] { stdx::lock_guard<Latch> lg(_mutex); diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 06e3a2f0c88..6b5b1fdd5d2 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -59,7 +59,6 @@ #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/util/assert_util.h" -#include "mongo/util/cancelation.h" #include "mongo/util/future_util.h" namespace mongo { @@ -318,8 +317,7 @@ OpTime TenantMigrationRecipientService::Instance::waitUntilTimestampIsMajorityCo // Wait for the read recipient optime to be majority committed. WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(donorRecipientOpTimePair.recipientOpTime, - CancelationToken::uncancelable()) + .waitUntilMajority(donorRecipientOpTimePair.recipientOpTime) .get(opCtx); return donorRecipientOpTimePair.donorOpTime; } @@ -507,7 +505,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_initializeStateDoc( // rollback. auto insertOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(insertOpTime, CancelationToken::uncancelable()) + .waitUntilMajority(insertOpTime) .semi(); } @@ -865,8 +863,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_onCloneSuccess() { uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc)); return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp(), - CancelationToken::uncancelable()) + .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp()) .semi(); } @@ -892,8 +889,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_getDataConsistentFu uassertStatusOK( tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc)); return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp(), - CancelationToken::uncancelable()); + .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp()); }) .semi(); } @@ -981,7 +977,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_markStateDocAsGarba auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(writeOpTime, CancelationToken::uncancelable()) + .waitUntilMajority(writeOpTime) .semi(); } @@ -1134,9 +1130,7 @@ SharedSemiFuture<void> TenantMigrationRecipientService::Instance::_updateStateDo auto opCtx = cc().makeOperationContext(); uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc)); return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp(), - CancelationToken::uncancelable()) - .share(); + .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp()); } ExecutorFuture<void> diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp index ed9e0016ca8..31f833b75f8 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -176,7 +176,7 @@ public: serviceContext->setPreciseClockSource( std::make_unique<SharedClockSourceAdapter>(_clkSource)); - WaitForMajorityService::get(serviceContext).startup(serviceContext); + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); // Automatically mark the state doc garbage collectable after data sync completion. globalFailPointRegistry() diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp index ffac7121a2a..66c39f1a897 100644 --- a/src/mongo/db/repl/tenant_migration_util.cpp +++ b/src/mongo/db/repl/tenant_migration_util.cpp @@ -36,7 +36,6 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/wait_for_majority_service.h" -#include "mongo/util/cancelation.h" #include "mongo/util/future_util.h" namespace mongo { @@ -92,7 +91,7 @@ ExecutorFuture<void> storeExternalClusterTimeKeyDocsAndRefreshCache( const auto opTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(opTime, CancelationToken::uncancelable()) + .waitUntilMajority(opTime) .thenRunOn(**executor) .then([] { auto opCtxHolder = cc().makeOperationContext(); diff --git a/src/mongo/db/repl/wait_for_majority_service.cpp b/src/mongo/db/repl/wait_for_majority_service.cpp index 0cb9e51f9f4..c316501663e 100644 --- a/src/mongo/db/repl/wait_for_majority_service.cpp +++ b/src/mongo/db/repl/wait_for_majority_service.cpp @@ -1,4 +1,4 @@ -/* +/** * Copyright (C) 2019-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify @@ -40,9 +40,8 @@ #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/logv2/log.h" +#include "mongo/util/concurrency/idle_thread_block.h" #include "mongo/util/concurrency/thread_pool.h" -#include "mongo/util/future_util.h" -#include "mongo/util/static_immortal.h" namespace mongo { @@ -50,27 +49,9 @@ namespace { const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, WriteConcernOptions::kNoTimeout); + const auto waitForMajorityServiceDecoration = ServiceContext::declareDecoration<WaitForMajorityService>(); - -constexpr static auto kWaitClientName = "WaitForMajorityServiceWaiter"; -constexpr static auto kCancelClientName = "WaitForMajorityServiceCanceler"; - -std::unique_ptr<ThreadPool> makeThreadPool() { - ThreadPool::Options options; - options.poolName = "WaitForMajorityServiceThreadPool"; - options.minThreads = 0; - // This service must have the ability to use at least two background threads. If it is limited - // to one, than if that thread is blocking waiting on an opTime, any cancellations cannot be - // completed until that wait is complete. - options.maxThreads = 2; - return std::make_unique<ThreadPool>(options); -} -inline Status waitUntilMajorityCanceledStatus() { - static StaticImmortal s = - Status{ErrorCodes::CallbackCanceled, "WaitForMajorityService::waitUntilMajority canceled"}; - return *s; -} } // namespace WaitForMajorityService::~WaitForMajorityService() { @@ -81,156 +62,139 @@ WaitForMajorityService& WaitForMajorityService::get(ServiceContext* service) { return waitForMajorityServiceDecoration(service); } -void WaitForMajorityService::startup(ServiceContext* ctx) { +void WaitForMajorityService::setUp(ServiceContext* service) { stdx::lock_guard lk(_mutex); - invariant(_state == State::kNotStarted); - _pool = makeThreadPool(); - _waitForMajorityClient = ClientStrand::make(ctx->makeClient(kWaitClientName)); - _waitForMajorityCancelationClient = ClientStrand::make(ctx->makeClient(kCancelClientName)); - _backgroundWorkComplete = _periodicallyWaitForMajority(); - _pool->startup(); - _state = State::kRunning; + + if (!_thread.joinable() && !_inShutDown) { + _thread = stdx::thread([this, service] { _periodicallyWaitForMajority(service); }); + } } void WaitForMajorityService::shutDown() { { stdx::lock_guard lk(_mutex); - if (_state != State::kRunning) { + if (std::exchange(_inShutDown, true)) { return; } - _state = State::kShutdown; - _waitForMajorityClient->getClientPointer()->setKilled(); - _waitForMajorityCancelationClient->getClientPointer()->setKilled(); - - for (auto&& request : _queuedOpTimes) { - if (!request.second->hasBeenProcessed.swap(true)) { - request.second->result.setError( - {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"}); - } + if (_opCtx) { + stdx::lock_guard scopedClientLock(*_opCtx->getClient()); + _opCtx->getServiceContext()->killOperation( + scopedClientLock, _opCtx, ErrorCodes::InterruptedAtShutdown); } - _hasNewOpTimeCV.notifyAllAndClose(); } - _pool->shutdown(); - _pool->join(); - _backgroundWorkComplete->wait(); - // It's important to reset the clientstrand pointers after waiting for work - // in the thread pool to complete since that work might be using the client - // objects. - _waitForMajorityClient.reset(); - _waitForMajorityCancelationClient.reset(); -} -SemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime, - const CancelationToken& cancelToken) { - - auto [promise, future] = makePromiseFuture<void>(); - auto request = std::make_shared<Request>(std::move(promise)); + if (_thread.joinable()) { + _thread.join(); + } stdx::lock_guard lk(_mutex); + for (auto&& pendingRequest : _queuedOpTimes) { + pendingRequest.second.setError( + {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"}); + } - tassert(5065600, - "WaitForMajorityService must be started before calling waitUntilMajority", - _state != State::kNotStarted); + _queuedOpTimes.clear(); +} - if (_state == State::kShutdown) { - return {SemiFuture<void>::makeReady( +SharedSemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime) { + stdx::lock_guard lk(_mutex); + + if (_inShutDown) { + return {Future<void>::makeReady( Status{ErrorCodes::ShutdownInProgress, "rejecting wait for majority request due to server shutdown"})}; } + // Background thread must be running before requesting. + invariant(_thread.joinable()); + if (_lastOpTimeWaited >= opTime) { - return {SemiFuture<void>::makeReady()}; + return {Future<void>::makeReady()}; } - if (cancelToken.isCanceled()) { - return {SemiFuture<void>::makeReady(waitUntilMajorityCanceledStatus())}; + auto iter = _queuedOpTimes.lower_bound(opTime); + if (iter != _queuedOpTimes.end()) { + if (iter->first == opTime) { + return iter->second.getFuture(); + } } - const bool wasEmpty = _queuedOpTimes.empty(); - - if (!wasEmpty && opTime < _queuedOpTimes.begin()->first) { + if (iter == _queuedOpTimes.begin()) { // Background thread could already be actively waiting on a later time, so tell it to stop // and wait for the newly requested opTime instead. - stdx::lock_guard scopedClientLock(*_waitForMajorityClient->getClientPointer()); - if (auto opCtx = _waitForMajorityClient->getClientPointer()->getOperationContext()) - opCtx->getServiceContext()->killOperation( - scopedClientLock, opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable); + if (_opCtx) { + stdx::lock_guard scopedClientLock(*_opCtx->getClient()); + _opCtx->getServiceContext()->killOperation( + scopedClientLock, _opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable); + } } - auto resultIter = _queuedOpTimes.emplace( - std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple(request)); - + const bool wasEmpty = _queuedOpTimes.empty(); + auto resultIter = _queuedOpTimes.emplace_hint( + iter, std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple()); if (wasEmpty) { - // Notify the background thread that work is now available. - _hasNewOpTimeCV.notifyAllAndReset(); + _hasNewOpTimeCV.notify_one(); } - cancelToken.onCancel().thenRunOn(_pool).getAsync([this, resultIter, request](Status s) { - if (!s.isOK()) { - return; + return resultIter->second.getFuture(); +} + +void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* service) { + ThreadClient tc("waitForMajority", service); + + stdx::unique_lock<Latch> lk(_mutex); + + while (!_inShutDown) { + auto opCtx = tc->makeOperationContext(); + _opCtx = opCtx.get(); + + if (!_queuedOpTimes.empty()) { + auto lowestOpTimeIter = _queuedOpTimes.begin(); + auto lowestOpTime = lowestOpTimeIter->first; + + lk.unlock(); + + WriteConcernResult ignoreResult; + auto status = + waitForWriteConcern(_opCtx, lowestOpTime, kMajorityWriteConcern, &ignoreResult); + + lk.lock(); + + if (status.isOK()) { + _lastOpTimeWaited = lowestOpTime; + } + + if (status == ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) { + _opCtx = nullptr; + continue; + } + + if (status.isOK()) { + lowestOpTimeIter->second.emplaceValue(); + } else { + lowestOpTimeIter->second.setError(status); + } + + _queuedOpTimes.erase(lowestOpTimeIter); } - auto clientGuard = _waitForMajorityCancelationClient->bind(); - if (!request->hasBeenProcessed.swap(true)) { - request->result.setError(waitUntilMajorityCanceledStatus()); - stdx::lock_guard lk(_mutex); - _queuedOpTimes.erase(resultIter); + + try { + MONGO_IDLE_THREAD_BLOCK; + _opCtx->waitForConditionOrInterrupt( + _hasNewOpTimeCV, lk, [&] { return !_queuedOpTimes.empty() || _inShutDown; }); + } catch (const DBException& e) { + LOGV2_DEBUG(22487, + 1, + "Unable to wait for new op time due to: {error}", + "Unable to wait for new op time", + "error"_attr = e); } - }); - return std::move(future).semi(); -} -SemiFuture<void> WaitForMajorityService::_periodicallyWaitForMajority() { - return AsyncTry([this] { - stdx::unique_lock<Latch> lk(_mutex); - if (_queuedOpTimes.empty()) { - return _hasNewOpTimeCV.onNotify(); - } - auto clientGuard = _waitForMajorityClient->bind(); - auto opCtx = clientGuard->makeOperationContext(); - - // This needs to be a copy since we unlock the lock before waiting for write concern - // and the iterator could be invalidated. - auto lowestOpTime = _queuedOpTimes.begin()->first; - - lk.unlock(); - - WriteConcernResult ignoreResult; - auto status = waitForWriteConcern( - opCtx.get(), lowestOpTime, kMajorityWriteConcern, &ignoreResult); - - lk.lock(); - - if (status.isOK()) { - _lastOpTimeWaited = lowestOpTime; - } - - if (status != ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) { - auto [lowestOpTimeIter, firstElemWithHigherOpTimeIter] = - _queuedOpTimes.equal_range(lowestOpTime); - - for (auto requestIt = lowestOpTimeIter; - requestIt != firstElemWithHigherOpTimeIter; - /*Increment in loop*/) { - if (!requestIt->second->hasBeenProcessed.swap(true)) { - requestIt->second->result.setFrom(status); - requestIt = _queuedOpTimes.erase(requestIt); - } else { - ++requestIt; - } - } - } - return SemiFuture<void>::makeReady(); - }) - .until([](Status) { - // Loop forever until _pool is shut down. - // TODO (SERVER-53766): Replace with condition-free looping utility. - return false; - }) - .on(_pool, CancelationToken::uncancelable()) - .semi(); + _opCtx = nullptr; + } } } // namespace mongo diff --git a/src/mongo/db/repl/wait_for_majority_service.h b/src/mongo/db/repl/wait_for_majority_service.h index 181ed60b265..1663ce907ac 100644 --- a/src/mongo/db/repl/wait_for_majority_service.h +++ b/src/mongo/db/repl/wait_for_majority_service.h @@ -33,57 +33,15 @@ #include <memory> #include <vector> -#include <boost/optional.hpp> -#include <boost/utility/in_place_factory.hpp> - -#include "mongo/db/client_strand.h" #include "mongo/db/repl/optime.h" #include "mongo/db/service_context.h" #include "mongo/executor/task_executor.h" #include "mongo/platform/mutex.h" #include "mongo/stdx/thread.h" -#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/future.h" namespace mongo { -namespace detail { - -class AsyncConditionVariable { -public: - AsyncConditionVariable() : _current(boost::in_place()) {} - - SemiFuture<void> onNotify() { - stdx::lock_guard lk(_mutex); - return _current->getFuture().semi(); - } - - void notifyAllAndReset() { - stdx::lock_guard lk(_mutex); - if (_inShutdown) { - return; - } - _current->emplaceValue(); - _current = boost::in_place(); - } - - void notifyAllAndClose() { - stdx::lock_guard lk(_mutex); - if (_inShutdown) { - return; - } - _inShutdown = true; - _current->emplaceValue(); - } - -private: - mutable Mutex _mutex = MONGO_MAKE_LATCH("AsyncConditionVariable::_mutex"); - boost::optional<SharedPromise<void>> _current; - bool _inShutdown{false}; -}; -} // namespace detail - - /** * Provides a facility for asynchronously waiting a local opTime to be majority committed. */ @@ -94,10 +52,9 @@ public: static WaitForMajorityService& get(ServiceContext* service); /** - * Sets up the background thread pool responsible for waiting for opTimes to be majority - * committed. + * Sets up the background thread responsible for waiting for opTimes to be majority committed. */ - void startup(ServiceContext* ctx); + void setUp(ServiceContext* service); /** * Blocking method, which shuts down and joins the background thread. @@ -107,40 +64,16 @@ public: /** * Enqueue a request to wait for the given opTime to be majority committed. */ - SemiFuture<void> waitUntilMajority(const repl::OpTime& opTime, - const CancelationToken& cancelToken); + SharedSemiFuture<void> waitUntilMajority(const repl::OpTime& opTime); private: - enum class State { kNotStarted, kRunning, kShutdown }; - // State is kNotStarted on construction, kRunning after WaitForMajorityService::startup() has - // been called, and kShutdown after WaitForMajorityService::shutdown() has been called. It is - // illegal to call WaitForMajorityService::waitUntilMajority before calling startup. - State _state{State::kNotStarted}; - /** - * Internal representation of an individual request to wait on some particular optime. - */ - struct Request { - explicit Request(Promise<void> promise) - : hasBeenProcessed{false}, result(std::move(promise)) {} - AtomicWord<bool> hasBeenProcessed; - Promise<void> result; - }; - - using OpTimeWaitingMap = std::multimap<repl::OpTime, std::shared_ptr<Request>>; + using OpTimeWaitingMap = std::map<repl::OpTime, SharedPromise<void>>; /** * Periodically checks the list of opTimes to wait for majority committed. */ - SemiFuture<void> _periodicallyWaitForMajority(); + void _periodicallyWaitForMajority(ServiceContext* service); - // The pool of threads available to wait on opTimes and cancel existing requests. - std::shared_ptr<ThreadPool> _pool; - - // This future is completed when the service has finished all of its work and is ready for - // shutdown. - boost::optional<SemiFuture<void>> _backgroundWorkComplete; - - // This mutex synchronizes access to the members declared below. Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityService::_mutex"); // Contains an ordered list of opTimes to wait to be majority comitted. @@ -150,15 +83,18 @@ private: // majority comitted. repl::OpTime _lastOpTimeWaited; + // The background thread. + stdx::thread _thread; + // Use for signalling new opTime requests being queued. - detail::AsyncConditionVariable _hasNewOpTimeCV; + stdx::condition_variable _hasNewOpTimeCV; - // Manages the Client responsible for the thread that waits on opTimes. - ClientStrandPtr _waitForMajorityClient; + // If set, contains a reference to the opCtx being used by the background thread. + // Only valid when _thread.joinable() and not nullptr. + OperationContext* _opCtx{nullptr}; - // Manages the Client responsible for the thread that cancels existing requests to wait on - // opTimes. - ClientStrandPtr _waitForMajorityCancelationClient; + // Flag is set to true after shutDown() is called. + bool _inShutDown{false}; }; } // namespace mongo diff --git a/src/mongo/db/repl/wait_for_majority_service_test.cpp b/src/mongo/db/repl/wait_for_majority_service_test.cpp index 14c0266ee89..c68158b97fd 100644 --- a/src/mongo/db/repl/wait_for_majority_service_test.cpp +++ b/src/mongo/db/repl/wait_for_majority_service_test.cpp @@ -34,7 +34,6 @@ #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/platform/mutex.h" #include "mongo/unittest/unittest.h" -#include "mongo/util/cancelation.h" namespace mongo { namespace { @@ -42,9 +41,8 @@ namespace { class WaitForMajorityServiceTest : public ServiceContextMongoDTest { public: void setUp() override { - ServiceContextMongoDTest::setUp(); auto service = getServiceContext(); - waitService()->startup(service); + waitService()->setUp(service); auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(service); @@ -59,7 +57,6 @@ public: void tearDown() override { waitService()->shutDown(); - ServiceContextMongoDTest::tearDown(); } WaitForMajorityService* waitService() { @@ -108,9 +105,6 @@ public: _callCountChangedCV.wait(lk, [&] { return _waitForMajorityCallCount > expectedCount; }); } - static inline const Status kCanceledStatus = {ErrorCodes::CallbackCanceled, - "waitForMajority canceled"}; - private: WaitForMajorityService _waitForMajorityService; @@ -124,36 +118,10 @@ private: int _waitForMajorityCallCount{0}; }; -class WaitForMajorityServiceNoStartupTest : public ServiceContextMongoDTest { -public: - void setUp() override { - ServiceContextMongoDTest::setUp(); - } - - void tearDown() override { - waitService()->shutDown(); - ServiceContextMongoDTest::tearDown(); - } - WaitForMajorityService* waitService() { - return &_waitForMajorityService; - } - -private: - WaitForMajorityService _waitForMajorityService; -}; - -TEST_F(WaitForMajorityServiceTest, ShutdownImmediatelyAfterStartupDoesNotCrashOrHang) { - waitService()->shutDown(); -} - -TEST_F(WaitForMajorityServiceNoStartupTest, ShutdownBeforeStartupDoesNotCrashOrHang) { - waitService()->shutDown(); -} - TEST_F(WaitForMajorityServiceTest, WaitOneOpTime) { repl::OpTime t1(Timestamp(1, 0), 2); - auto future = waitService()->waitUntilMajority(t1, CancelationToken::uncancelable()); + auto future = waitService()->waitUntilMajority(t1); ASSERT_FALSE(future.isReady()); @@ -166,8 +134,8 @@ TEST_F(WaitForMajorityServiceTest, WaitOneOpTime) { TEST_F(WaitForMajorityServiceTest, WaitWithSameOpTime) { repl::OpTime t1(Timestamp(1, 0), 2); - auto future1 = waitService()->waitUntilMajority(t1, CancelationToken::uncancelable()); - auto future1b = waitService()->waitUntilMajority(t1, CancelationToken::uncancelable()); + auto future1 = waitService()->waitUntilMajority(t1); + auto future1b = waitService()->waitUntilMajority(t1); ASSERT_FALSE(future1.isReady()); ASSERT_FALSE(future1b.isReady()); @@ -184,15 +152,13 @@ TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanLowestQueued) { repl::OpTime laterOpTime(Timestamp(6, 0), 2); repl::OpTime earlierOpTime(Timestamp(1, 0), 2); - auto laterFuture = - waitService()->waitUntilMajority(laterOpTime, CancelationToken::uncancelable()); + auto laterFuture = waitService()->waitUntilMajority(laterOpTime); // Wait until the background thread picks up the queued opTime. waitForMajorityCallCountGreaterThan(0); // The 2nd request has an earlier time, so it will interrupt 'laterOpTime' and skip the line. - auto earlierFuture = - waitService()->waitUntilMajority(earlierOpTime, CancelationToken::uncancelable()); + auto earlierFuture = waitService()->waitUntilMajority(earlierOpTime); // Wait for background thread to finish transitioning from waiting on laterOpTime to // earlierOpTime. @@ -218,8 +184,8 @@ TEST_F(WaitForMajorityServiceTest, WaitWithDifferentOpTime) { repl::OpTime t1(Timestamp(1, 0), 2); repl::OpTime t2(Timestamp(14, 0), 2); - auto future1 = waitService()->waitUntilMajority(t1, CancelationToken::uncancelable()); - auto future2 = waitService()->waitUntilMajority(t2, CancelationToken::uncancelable()); + auto future1 = waitService()->waitUntilMajority(t1); + auto future2 = waitService()->waitUntilMajority(t2); ASSERT_FALSE(future1.isReady()); ASSERT_FALSE(future2.isReady()); @@ -241,8 +207,8 @@ TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanOpTimeAlreadyWaited) repl::OpTime t1(Timestamp(5, 0), 2); repl::OpTime t2(Timestamp(14, 0), 2); - auto future1 = waitService()->waitUntilMajority(t1, CancelationToken::uncancelable()); - auto future2 = waitService()->waitUntilMajority(t2, CancelationToken::uncancelable()); + auto future1 = waitService()->waitUntilMajority(t1); + auto future2 = waitService()->waitUntilMajority(t2); ASSERT_FALSE(future1.isReady()); ASSERT_FALSE(future2.isReady()); @@ -255,9 +221,8 @@ TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanOpTimeAlreadyWaited) ASSERT_EQ(t1, getLastOpTimeWaited()); repl::OpTime oldTs(Timestamp(4, 0), 2); - auto oldFuture = waitService()->waitUntilMajority(oldTs, CancelationToken::uncancelable()); - auto alreadyWaitedFuture = - waitService()->waitUntilMajority(t1, CancelationToken::uncancelable()); + auto oldFuture = waitService()->waitUntilMajority(oldTs); + auto alreadyWaitedFuture = waitService()->waitUntilMajority(t1); ASSERT_FALSE(future2.isReady()); @@ -271,13 +236,12 @@ TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanOpTimeAlreadyWaited) ASSERT_EQ(t2, getLastOpTimeWaited()); } - TEST_F(WaitForMajorityServiceTest, ShutdownShouldCancelQueuedRequests) { repl::OpTime t1(Timestamp(5, 0), 2); repl::OpTime t2(Timestamp(14, 0), 2); - auto future1 = waitService()->waitUntilMajority(t1, CancelationToken::uncancelable()); - auto future2 = waitService()->waitUntilMajority(t2, CancelationToken::uncancelable()); + auto future1 = waitService()->waitUntilMajority(t1); + auto future2 = waitService()->waitUntilMajority(t2); ASSERT_FALSE(future1.isReady()); ASSERT_FALSE(future2.isReady()); @@ -299,113 +263,9 @@ TEST_F(WaitForMajorityServiceTest, WriteConcernErrorGetsPropagatedCorrectly) { {ErrorCodes::PrimarySteppedDown, "test stepdown"}, Milliseconds(0)); }); - auto future = waitService()->waitUntilMajority(t, CancelationToken::uncancelable()); + auto future = waitService()->waitUntilMajority(t); ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::PrimarySteppedDown); } -TEST_F(WaitForMajorityServiceTest, CanCancelWaitOnOneOptime) { - repl::OpTime t(Timestamp(1, 2), 4); - CancelationSource source; - auto future = waitService()->waitUntilMajority(t, source.token()); - ASSERT_FALSE(future.isReady()); - source.cancel(); - // The future should now become ready without having to wait for any opTime. - ASSERT_EQ(future.getNoThrow(), kCanceledStatus); -} - -TEST_F(WaitForMajorityServiceTest, CancelingEarlierOpTimeRequestDoesNotAffectLaterOpTimeRequests) { - repl::OpTime earlier(Timestamp(1, 2), 4); - repl::OpTime later(Timestamp(5, 2), 5); - CancelationSource source; - auto cancelFuture = waitService()->waitUntilMajority(earlier, source.token()); - auto uncancelableFuture = - waitService()->waitUntilMajority(later, CancelationToken::uncancelable()); - ASSERT_FALSE(cancelFuture.isReady()); - ASSERT_FALSE(uncancelableFuture.isReady()); - // Wait until the background thread picks up the initial request. Otherwise, there is a race - // between the cancellation callback removing the initial request and the background thread - // waiting on it. - waitForMajorityCallCountGreaterThan(0); - source.cancel(); - // The future should now become ready without having to wait for any opTime. - ASSERT_EQ(cancelFuture.getNoThrow(), kCanceledStatus); - ASSERT_FALSE(uncancelableFuture.isReady()); - finishWaitingOneOpTime(); - ASSERT_FALSE(uncancelableFuture.isReady()); - finishWaitingOneOpTime(); - uncancelableFuture.wait(); - ASSERT_EQ(later, getLastOpTimeWaited()); -} - -TEST_F(WaitForMajorityServiceTest, CancelingOneRequestOnOpTimeDoesNotAffectOthersOnSameOpTime) { - repl::OpTime t1(Timestamp(1, 2), 4); - repl::OpTime t1Dupe(Timestamp(1, 2), 4); - CancelationSource source; - auto cancelFuture = waitService()->waitUntilMajority(t1, source.token()); - auto uncancelableFuture = - waitService()->waitUntilMajority(t1Dupe, CancelationToken::uncancelable()); - ASSERT_FALSE(cancelFuture.isReady()); - ASSERT_FALSE(uncancelableFuture.isReady()); - source.cancel(); - // The future should now become ready without having to wait for any opTime. - ASSERT_EQ(cancelFuture.getNoThrow(), kCanceledStatus); - ASSERT_FALSE(uncancelableFuture.isReady()); - finishWaitingOneOpTime(); - uncancelableFuture.wait(); - ASSERT_EQ(t1, getLastOpTimeWaited()); -} - -TEST_F(WaitForMajorityServiceTest, CancelingLaterOpTimeRequestDoesNotAffectEarlierOpTimeRequests) { - repl::OpTime t1(Timestamp(1, 2), 4); - repl::OpTime smallerOpTime(Timestamp(1, 2), 1); - CancelationSource source; - auto cancelFuture = waitService()->waitUntilMajority(t1, source.token()); - // Wait until the background thread picks up the queued opTime. - waitForMajorityCallCountGreaterThan(0); - auto earlierFuture = - waitService()->waitUntilMajority(smallerOpTime, CancelationToken::uncancelable()); - // Wait for background thread to finish transitioning from waiting on t1 to smallerOpTime. - waitForMajorityCallCountGreaterThan(1); - ASSERT_FALSE(cancelFuture.isReady()); - ASSERT_FALSE(earlierFuture.isReady()); - source.cancel(); - // The future should now become ready without having to wait for any opTime. - ASSERT_EQ(cancelFuture.getNoThrow(), kCanceledStatus); - ASSERT_FALSE(earlierFuture.isReady()); - finishWaitingOneOpTime(); - earlierFuture.wait(); - ASSERT_EQ(smallerOpTime, getLastOpTimeWaited()); -} - -TEST_F(WaitForMajorityServiceTest, SafeToCallCancelOnRequestAlreadyCompletedByShutdown) { - repl::OpTime t(Timestamp(1, 2), 4); - CancelationSource source; - auto deadFuture = waitService()->waitUntilMajority(t, source.token()); - ASSERT_FALSE(deadFuture.isReady()); - waitService()->shutDown(); - ASSERT(deadFuture.isReady()); - ASSERT_THROWS_CODE(deadFuture.get(), AssertionException, ErrorCodes::InterruptedAtShutdown); - source.cancel(); -} - -TEST_F(WaitForMajorityServiceTest, SafeToCallCancelOnRequestAlreadyCompletedByWaiting) { - repl::OpTime t(Timestamp(1, 2), 4); - CancelationSource source; - auto future = waitService()->waitUntilMajority(t, source.token()); - ASSERT_FALSE(future.isReady()); - waitForMajorityCallCountGreaterThan(0); - finishWaitingOneOpTime(); - future.get(); - ASSERT_EQ(t, getLastOpTimeWaited()); - source.cancel(); -} - -TEST_F(WaitForMajorityServiceTest, PassingAlreadyCanceledTokenCompletesFutureWithNoWaiting) { - repl::OpTime t(Timestamp(1, 2), 4); - CancelationSource source; - source.cancel(); - auto future = waitService()->waitUntilMajority(t, source.token()); - ASSERT_EQ(future.getNoThrow(), kCanceledStatus); -} } // namespace } // namespace mongo diff --git a/src/mongo/db/s/collection_sharding_runtime_test.cpp b/src/mongo/db/s/collection_sharding_runtime_test.cpp index db87d219711..1779b75b60b 100644 --- a/src/mongo/db/s/collection_sharding_runtime_test.cpp +++ b/src/mongo/db/s/collection_sharding_runtime_test.cpp @@ -188,7 +188,7 @@ class CollectionShardingRuntimeWithRangeDeleterTest : public CollectionShardingR public: void setUp() override { ShardServerTestFixture::setUp(); - WaitForMajorityService::get(getServiceContext()).startup(getServiceContext()); + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); // Set up replication coordinator to be primary and have no replication delay. auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(getServiceContext()); replCoord->setCanAcceptNonLocalWrites(true); diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp index 1058c7e83a0..fbe7cf9a40d 100644 --- a/src/mongo/db/s/migration_util_test.cpp +++ b/src/mongo/db/s/migration_util_test.cpp @@ -349,7 +349,7 @@ public: configTargeterMock()->setFindHostReturnValue(kConfigHostAndPort); - WaitForMajorityService::get(getServiceContext()).startup(getServiceContext()); + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); // Set up 2 default shards. for (const auto& shard : kShardList) { diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp index 1468b2a74c9..1c2189e7177 100644 --- a/src/mongo/db/s/range_deletion_util.cpp +++ b/src/mongo/db/s/range_deletion_util.cpp @@ -407,7 +407,7 @@ ExecutorFuture<void> waitForDeletionsToMajorityReplicate( // Asynchronously wait for majority write concern. return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(clientOpTime, CancelationToken::uncancelable()) + .waitUntilMajority(clientOpTime) .thenRunOn(executor); }); } diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp index 94088b7751a..12b5e11546d 100644 --- a/src/mongo/db/s/range_deletion_util_test.cpp +++ b/src/mongo/db/s/range_deletion_util_test.cpp @@ -57,7 +57,7 @@ public: void setUp() override { ShardServerTestFixture::setUp(); - WaitForMajorityService::get(getServiceContext()).startup(getServiceContext()); + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); // Set up replication coordinator to be primary and have no replication delay. auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(getServiceContext()); replCoord->setCanAcceptNonLocalWrites(true); diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp index 686fb22b23e..a582302f029 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp @@ -76,6 +76,7 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, assertRecipientDocMatchesReshardingFields(metadata, reshardingFields, recipientDoc); } + TEST_F(ReshardingDonorRecipientCommonTest, CreateDonorServiceInstance) { OperationContext* opCtx = operationContext(); auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard); diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h index 2e7ef3c48b2..a0bdb181523 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h @@ -212,7 +212,7 @@ public: void setUp() override { ShardServerTestFixture::setUp(); - WaitForMajorityService::get(getServiceContext()).startup(getServiceContext()); + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext()); diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp index 4e8189d6238..2b9aed54b82 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp @@ -104,7 +104,7 @@ public: shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId)); } - WaitForMajorityService::get(getServiceContext()).startup(getServiceContext()); + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); // onStepUp() relies on the storage interface to create the config.transactions table. repl::StorageInterface::set(getServiceContext(), diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp index 36d05e65692..db10b0a6bf6 100644 --- a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp +++ b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp @@ -114,7 +114,7 @@ class ReshardingTxnClonerTest : public ShardServerTestFixture { shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId)); } - WaitForMajorityService::get(getServiceContext()).startup(getServiceContext()); + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); // onStepUp() relies on the storage interface to create the config.transactions table. repl::StorageInterface::set(getServiceContext(), diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp index 4fe39db2dcb..7f64d66cdbd 100644 --- a/src/mongo/db/s/resharding_destined_recipient_test.cpp +++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp @@ -105,7 +105,7 @@ public: configTargeterMock()->setFindHostReturnValue(kConfigHostAndPort); - WaitForMajorityService::get(getServiceContext()).startup(getServiceContext()); + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); for (const auto& shard : kShardList) { std::unique_ptr<RemoteCommandTargeterMock> targeter( diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index 384e4cffeda..1e84a79aee2 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -39,7 +39,6 @@ #include "mongo/db/vector_clock_mutable.h" #include "mongo/logv2/log.h" #include "mongo/s/grid.h" -#include "mongo/util/cancelation.h" #include "mongo/util/fail_point.h" namespace mongo { @@ -59,9 +58,7 @@ ExecutorFuture<void> waitForMajorityWithHangFailpoint(ServiceContext* service, repl::OpTime opTime) { auto executor = Grid::get(service)->getExecutorPool()->getFixedExecutor(); auto waitForWC = [service, executor](repl::OpTime opTime) { - return WaitForMajorityService::get(service) - .waitUntilMajority(opTime, CancelationToken::uncancelable()) - .thenRunOn(executor); + return WaitForMajorityService::get(service).waitUntilMajority(opTime).thenRunOn(executor); }; if (auto sfp = failpoint.scoped(); MONGO_unlikely(sfp.isActive())) { diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp index 0beec9178f4..560dc51524b 100644 --- a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp +++ b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp @@ -66,7 +66,7 @@ void TransactionCoordinatorTestFixture::setUp() { shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId)); } - WaitForMajorityService::get(getServiceContext()).startup(getServiceContext()); + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); } void TransactionCoordinatorTestFixture::tearDown() { diff --git a/src/mongo/util/future_util.h b/src/mongo/util/future_util.h index e0096a81df5..ff846fabc3e 100644 --- a/src/mongo/util/future_util.h +++ b/src/mongo/util/future_util.h @@ -206,7 +206,7 @@ public: * iteration of the loop body threw an exception or otherwise returned an error status, the * returned ExecutorFuture will contain that error. */ - auto on(ExecutorPtr executor, CancelationToken cancelToken)&& { + auto on(std::shared_ptr<executor::TaskExecutor> executor, CancelationToken cancelToken)&& { auto loop = std::make_shared<TryUntilLoop>( std::move(executor), std::move(_body), std::move(_condition), std::move(cancelToken)); // Launch the recursive chain using the helper class. @@ -245,7 +245,7 @@ private: * Mostly needed to clean up lambda captures and make the looping logic more readable. */ struct TryUntilLoop : public std::enable_shared_from_this<TryUntilLoop> { - TryUntilLoop(ExecutorPtr executor, + TryUntilLoop(std::shared_ptr<executor::TaskExecutor> executor, BodyCallable executeLoopBody, ConditionCallable shouldStopIteration, CancelationToken cancelToken) @@ -274,7 +274,7 @@ private: }); } - ExecutorPtr executor; + std::shared_ptr<executor::TaskExecutor> executor; BodyCallable executeLoopBody; ConditionCallable shouldStopIteration; CancelationToken cancelToken; |