diff options
Diffstat (limited to 'src/mongo/db/repl/wait_for_majority_service.cpp')
-rw-r--r-- | src/mongo/db/repl/wait_for_majority_service.cpp | 234 |
1 files changed, 99 insertions, 135 deletions
diff --git a/src/mongo/db/repl/wait_for_majority_service.cpp b/src/mongo/db/repl/wait_for_majority_service.cpp index 0cb9e51f9f4..c316501663e 100644 --- a/src/mongo/db/repl/wait_for_majority_service.cpp +++ b/src/mongo/db/repl/wait_for_majority_service.cpp @@ -1,4 +1,4 @@ -/* +/** * Copyright (C) 2019-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify @@ -40,9 +40,8 @@ #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/logv2/log.h" +#include "mongo/util/concurrency/idle_thread_block.h" #include "mongo/util/concurrency/thread_pool.h" -#include "mongo/util/future_util.h" -#include "mongo/util/static_immortal.h" namespace mongo { @@ -50,27 +49,9 @@ namespace { const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, WriteConcernOptions::kNoTimeout); + const auto waitForMajorityServiceDecoration = ServiceContext::declareDecoration<WaitForMajorityService>(); - -constexpr static auto kWaitClientName = "WaitForMajorityServiceWaiter"; -constexpr static auto kCancelClientName = "WaitForMajorityServiceCanceler"; - -std::unique_ptr<ThreadPool> makeThreadPool() { - ThreadPool::Options options; - options.poolName = "WaitForMajorityServiceThreadPool"; - options.minThreads = 0; - // This service must have the ability to use at least two background threads. If it is limited - // to one, than if that thread is blocking waiting on an opTime, any cancellations cannot be - // completed until that wait is complete. - options.maxThreads = 2; - return std::make_unique<ThreadPool>(options); -} -inline Status waitUntilMajorityCanceledStatus() { - static StaticImmortal s = - Status{ErrorCodes::CallbackCanceled, "WaitForMajorityService::waitUntilMajority canceled"}; - return *s; -} } // namespace WaitForMajorityService::~WaitForMajorityService() { @@ -81,156 +62,139 @@ WaitForMajorityService& WaitForMajorityService::get(ServiceContext* service) { return waitForMajorityServiceDecoration(service); } -void WaitForMajorityService::startup(ServiceContext* ctx) { +void WaitForMajorityService::setUp(ServiceContext* service) { stdx::lock_guard lk(_mutex); - invariant(_state == State::kNotStarted); - _pool = makeThreadPool(); - _waitForMajorityClient = ClientStrand::make(ctx->makeClient(kWaitClientName)); - _waitForMajorityCancelationClient = ClientStrand::make(ctx->makeClient(kCancelClientName)); - _backgroundWorkComplete = _periodicallyWaitForMajority(); - _pool->startup(); - _state = State::kRunning; + + if (!_thread.joinable() && !_inShutDown) { + _thread = stdx::thread([this, service] { _periodicallyWaitForMajority(service); }); + } } void WaitForMajorityService::shutDown() { { stdx::lock_guard lk(_mutex); - if (_state != State::kRunning) { + if (std::exchange(_inShutDown, true)) { return; } - _state = State::kShutdown; - _waitForMajorityClient->getClientPointer()->setKilled(); - _waitForMajorityCancelationClient->getClientPointer()->setKilled(); - - for (auto&& request : _queuedOpTimes) { - if (!request.second->hasBeenProcessed.swap(true)) { - request.second->result.setError( - {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"}); - } + if (_opCtx) { + stdx::lock_guard scopedClientLock(*_opCtx->getClient()); + _opCtx->getServiceContext()->killOperation( + scopedClientLock, _opCtx, ErrorCodes::InterruptedAtShutdown); } - _hasNewOpTimeCV.notifyAllAndClose(); } - _pool->shutdown(); - _pool->join(); - _backgroundWorkComplete->wait(); - // It's important to reset the clientstrand pointers after waiting for work - // in the thread pool to complete since that work might be using the client - // objects. - _waitForMajorityClient.reset(); - _waitForMajorityCancelationClient.reset(); -} -SemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime, - const CancelationToken& cancelToken) { - - auto [promise, future] = makePromiseFuture<void>(); - auto request = std::make_shared<Request>(std::move(promise)); + if (_thread.joinable()) { + _thread.join(); + } stdx::lock_guard lk(_mutex); + for (auto&& pendingRequest : _queuedOpTimes) { + pendingRequest.second.setError( + {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"}); + } - tassert(5065600, - "WaitForMajorityService must be started before calling waitUntilMajority", - _state != State::kNotStarted); + _queuedOpTimes.clear(); +} - if (_state == State::kShutdown) { - return {SemiFuture<void>::makeReady( +SharedSemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime) { + stdx::lock_guard lk(_mutex); + + if (_inShutDown) { + return {Future<void>::makeReady( Status{ErrorCodes::ShutdownInProgress, "rejecting wait for majority request due to server shutdown"})}; } + // Background thread must be running before requesting. + invariant(_thread.joinable()); + if (_lastOpTimeWaited >= opTime) { - return {SemiFuture<void>::makeReady()}; + return {Future<void>::makeReady()}; } - if (cancelToken.isCanceled()) { - return {SemiFuture<void>::makeReady(waitUntilMajorityCanceledStatus())}; + auto iter = _queuedOpTimes.lower_bound(opTime); + if (iter != _queuedOpTimes.end()) { + if (iter->first == opTime) { + return iter->second.getFuture(); + } } - const bool wasEmpty = _queuedOpTimes.empty(); - - if (!wasEmpty && opTime < _queuedOpTimes.begin()->first) { + if (iter == _queuedOpTimes.begin()) { // Background thread could already be actively waiting on a later time, so tell it to stop // and wait for the newly requested opTime instead. - stdx::lock_guard scopedClientLock(*_waitForMajorityClient->getClientPointer()); - if (auto opCtx = _waitForMajorityClient->getClientPointer()->getOperationContext()) - opCtx->getServiceContext()->killOperation( - scopedClientLock, opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable); + if (_opCtx) { + stdx::lock_guard scopedClientLock(*_opCtx->getClient()); + _opCtx->getServiceContext()->killOperation( + scopedClientLock, _opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable); + } } - auto resultIter = _queuedOpTimes.emplace( - std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple(request)); - + const bool wasEmpty = _queuedOpTimes.empty(); + auto resultIter = _queuedOpTimes.emplace_hint( + iter, std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple()); if (wasEmpty) { - // Notify the background thread that work is now available. - _hasNewOpTimeCV.notifyAllAndReset(); + _hasNewOpTimeCV.notify_one(); } - cancelToken.onCancel().thenRunOn(_pool).getAsync([this, resultIter, request](Status s) { - if (!s.isOK()) { - return; + return resultIter->second.getFuture(); +} + +void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* service) { + ThreadClient tc("waitForMajority", service); + + stdx::unique_lock<Latch> lk(_mutex); + + while (!_inShutDown) { + auto opCtx = tc->makeOperationContext(); + _opCtx = opCtx.get(); + + if (!_queuedOpTimes.empty()) { + auto lowestOpTimeIter = _queuedOpTimes.begin(); + auto lowestOpTime = lowestOpTimeIter->first; + + lk.unlock(); + + WriteConcernResult ignoreResult; + auto status = + waitForWriteConcern(_opCtx, lowestOpTime, kMajorityWriteConcern, &ignoreResult); + + lk.lock(); + + if (status.isOK()) { + _lastOpTimeWaited = lowestOpTime; + } + + if (status == ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) { + _opCtx = nullptr; + continue; + } + + if (status.isOK()) { + lowestOpTimeIter->second.emplaceValue(); + } else { + lowestOpTimeIter->second.setError(status); + } + + _queuedOpTimes.erase(lowestOpTimeIter); } - auto clientGuard = _waitForMajorityCancelationClient->bind(); - if (!request->hasBeenProcessed.swap(true)) { - request->result.setError(waitUntilMajorityCanceledStatus()); - stdx::lock_guard lk(_mutex); - _queuedOpTimes.erase(resultIter); + + try { + MONGO_IDLE_THREAD_BLOCK; + _opCtx->waitForConditionOrInterrupt( + _hasNewOpTimeCV, lk, [&] { return !_queuedOpTimes.empty() || _inShutDown; }); + } catch (const DBException& e) { + LOGV2_DEBUG(22487, + 1, + "Unable to wait for new op time due to: {error}", + "Unable to wait for new op time", + "error"_attr = e); } - }); - return std::move(future).semi(); -} -SemiFuture<void> WaitForMajorityService::_periodicallyWaitForMajority() { - return AsyncTry([this] { - stdx::unique_lock<Latch> lk(_mutex); - if (_queuedOpTimes.empty()) { - return _hasNewOpTimeCV.onNotify(); - } - auto clientGuard = _waitForMajorityClient->bind(); - auto opCtx = clientGuard->makeOperationContext(); - - // This needs to be a copy since we unlock the lock before waiting for write concern - // and the iterator could be invalidated. - auto lowestOpTime = _queuedOpTimes.begin()->first; - - lk.unlock(); - - WriteConcernResult ignoreResult; - auto status = waitForWriteConcern( - opCtx.get(), lowestOpTime, kMajorityWriteConcern, &ignoreResult); - - lk.lock(); - - if (status.isOK()) { - _lastOpTimeWaited = lowestOpTime; - } - - if (status != ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) { - auto [lowestOpTimeIter, firstElemWithHigherOpTimeIter] = - _queuedOpTimes.equal_range(lowestOpTime); - - for (auto requestIt = lowestOpTimeIter; - requestIt != firstElemWithHigherOpTimeIter; - /*Increment in loop*/) { - if (!requestIt->second->hasBeenProcessed.swap(true)) { - requestIt->second->result.setFrom(status); - requestIt = _queuedOpTimes.erase(requestIt); - } else { - ++requestIt; - } - } - } - return SemiFuture<void>::makeReady(); - }) - .until([](Status) { - // Loop forever until _pool is shut down. - // TODO (SERVER-53766): Replace with condition-free looping utility. - return false; - }) - .on(_pool, CancelationToken::uncancelable()) - .semi(); + _opCtx = nullptr; + } } } // namespace mongo |