diff options
author | Matthew Russotto <matthew.russotto@mongodb.com> | 2023-03-02 16:34:22 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-11 00:50:41 +0000 |
commit | 45e7d2fef59284c1f34ff425de4ee09fcd24301a (patch) | |
tree | 44bab90945a6937843f337d05b3505e6cdfac82d /src/mongo/db/repl/wait_for_majority_service.cpp | |
parent | 83649d1a14581508a7232a3c2d275440cd21f8f1 (diff) | |
download | mongo-45e7d2fef59284c1f34ff425de4ee09fcd24301a.tar.gz |
SERVER-74611 Create a read concern majority service to complement write concern majority service.
Diffstat (limited to 'src/mongo/db/repl/wait_for_majority_service.cpp')
-rw-r--r-- | src/mongo/db/repl/wait_for_majority_service.cpp | 82 |
1 files changed, 69 insertions, 13 deletions
diff --git a/src/mongo/db/repl/wait_for_majority_service.cpp b/src/mongo/db/repl/wait_for_majority_service.cpp index 06ece2155f3..37d6407e0fe 100644 --- a/src/mongo/db/repl/wait_for_majority_service.cpp +++ b/src/mongo/db/repl/wait_for_majority_service.cpp @@ -34,6 +34,8 @@ #include <utility> +#include "mongo/db/read_concern.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/db/service_context.h" #include "mongo/db/write_concern.h" #include "mongo/executor/network_interface_factory.h" @@ -43,7 +45,7 @@ #include "mongo/util/future_util.h" #include "mongo/util/static_immortal.h" -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication namespace mongo { @@ -58,9 +60,9 @@ const auto waitForMajorityServiceDecoration = constexpr static auto kWaitClientName = "WaitForMajorityServiceWaiter"; constexpr static auto kCancelClientName = "WaitForMajorityServiceCanceler"; -std::unique_ptr<ThreadPool> makeThreadPool() { +std::unique_ptr<ThreadPool> makeThreadPool(StringData readOrWrite) { ThreadPool::Options options; - options.poolName = "WaitForMajorityServiceThreadPool"; + options.poolName = "WaitForMajorityService" + readOrWrite + "ThreadPool"; options.minThreads = 0; // This service must have the ability to use at least two background threads. If it is limited // to one, than if that thread is blocking waiting on an opTime, any cancellations cannot be @@ -79,22 +81,57 @@ WaitForMajorityService::~WaitForMajorityService() { shutDown(); } +WaitForMajorityServiceImplBase::~WaitForMajorityServiceImplBase() { + shutDown(); +} + WaitForMajorityService& WaitForMajorityService::get(ServiceContext* service) { return waitForMajorityServiceDecoration(service); } void WaitForMajorityService::startup(ServiceContext* ctx) { + _readService.startup(ctx); + _writeService.startup(ctx); +} + +void WaitForMajorityService::shutDown() { + _writeService.shutDown(); + _readService.shutDown(); +} + +void WaitForMajorityServiceImplBase::startup(ServiceContext* ctx) { stdx::lock_guard lk(_mutex); invariant(_state == State::kNotStarted); - _pool = makeThreadPool(); - _waitForMajorityClient = ClientStrand::make(ctx->makeClient(kWaitClientName)); - _waitForMajorityCancellationClient = ClientStrand::make(ctx->makeClient(kCancelClientName)); + _pool = makeThreadPool(_getReadOrWrite()); + _waitForMajorityClient = + ClientStrand::make(ctx->makeClient(kWaitClientName + _getReadOrWrite())); + _waitForMajorityCancellationClient = + ClientStrand::make(ctx->makeClient(kCancelClientName + _getReadOrWrite())); _backgroundWorkComplete = _periodicallyWaitForMajority(); _pool->startup(); _state = State::kRunning; } -void WaitForMajorityService::shutDown() { +SemiFuture<void> WaitForMajorityService::waitUntilMajorityForRead( + const repl::OpTime& opTime, const CancellationToken& cancelToken) { + uassert(ErrorCodes::ReadConcernMajorityNotEnabled, + "Storage engine does not support read concern majority.", + serverGlobalParams.enableMajorityReadConcern); + + return _readService.waitUntilMajority(opTime, cancelToken); +} + +SemiFuture<void> WaitForMajorityService::waitUntilMajorityForWrite( + const repl::OpTime& opTime, const CancellationToken& cancelToken) { + return _writeService.waitUntilMajority(opTime, cancelToken); +} + +SemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime, + const CancellationToken& cancelToken) { + return _writeService.waitUntilMajority(opTime, cancelToken); +} + +void WaitForMajorityServiceImplBase::shutDown() { { stdx::lock_guard lk(_mutex); @@ -124,8 +161,8 @@ void WaitForMajorityService::shutDown() { _waitForMajorityCancellationClient.reset(); } -SemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime, - const CancellationToken& cancelToken) { +SemiFuture<void> WaitForMajorityServiceImplBase::waitUntilMajority( + const repl::OpTime& opTime, const CancellationToken& cancelToken) { auto [promise, future] = makePromiseFuture<void>(); auto request = std::make_shared<Request>(std::move(promise)); @@ -189,7 +226,28 @@ SemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& o return std::move(future).semi(); } -SemiFuture<void> WaitForMajorityService::_periodicallyWaitForMajority() { +Status WaitForMajorityServiceForWriteImpl::_waitForOpTime(OperationContext* opCtx, + const repl::OpTime& opTime) { + WriteConcernResult ignoreResult; + return waitForWriteConcern(opCtx, opTime, kMajorityWriteConcern, &ignoreResult); +} + +Status WaitForMajorityServiceForReadImpl::_waitForOpTime(OperationContext* opCtx, + const repl::OpTime& opTime) { + repl::ReadConcernArgs readConcernArgs(opTime, repl::ReadConcernLevel::kMajorityReadConcern); + + auto status = waitForReadConcern( + opCtx, readConcernArgs, DatabaseName(), false /* allow afterClusterTime */); + // This code should only happen when enableMajorityReadConcern is true, which is no longer + // permitted. + invariant(status.code() != ErrorCodes::ReadConcernMajorityNotEnabled); + return status; +} + +SemiFuture<void> WaitForMajorityServiceImplBase::_periodicallyWaitForMajority() { + /** + * Enqueue a request to wait for the given opTime to be majority committed. + */ return AsyncTry([this] { auto clientGuard = _waitForMajorityClient->bind(); stdx::unique_lock<Latch> lk(_mutex); @@ -204,9 +262,7 @@ SemiFuture<void> WaitForMajorityService::_periodicallyWaitForMajority() { lk.unlock(); - WriteConcernResult ignoreResult; - auto status = waitForWriteConcern( - opCtx.get(), lowestOpTime, kMajorityWriteConcern, &ignoreResult); + auto status = _waitForOpTime(opCtx.get(), lowestOpTime); lk.lock(); |