summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/wait_for_majority_service.cpp
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@mongodb.com>2023-03-02 16:34:22 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-03-11 00:50:41 +0000
commit45e7d2fef59284c1f34ff425de4ee09fcd24301a (patch)
tree44bab90945a6937843f337d05b3505e6cdfac82d /src/mongo/db/repl/wait_for_majority_service.cpp
parent83649d1a14581508a7232a3c2d275440cd21f8f1 (diff)
downloadmongo-45e7d2fef59284c1f34ff425de4ee09fcd24301a.tar.gz
SERVER-74611 Create a read concern majority service to complement write concern majority service.
Diffstat (limited to 'src/mongo/db/repl/wait_for_majority_service.cpp')
-rw-r--r--src/mongo/db/repl/wait_for_majority_service.cpp82
1 files changed, 69 insertions, 13 deletions
diff --git a/src/mongo/db/repl/wait_for_majority_service.cpp b/src/mongo/db/repl/wait_for_majority_service.cpp
index 06ece2155f3..37d6407e0fe 100644
--- a/src/mongo/db/repl/wait_for_majority_service.cpp
+++ b/src/mongo/db/repl/wait_for_majority_service.cpp
@@ -34,6 +34,8 @@
#include <utility>
+#include "mongo/db/read_concern.h"
+#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/service_context.h"
#include "mongo/db/write_concern.h"
#include "mongo/executor/network_interface_factory.h"
@@ -43,7 +45,7 @@
#include "mongo/util/future_util.h"
#include "mongo/util/static_immortal.h"
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
namespace mongo {
@@ -58,9 +60,9 @@ const auto waitForMajorityServiceDecoration =
constexpr static auto kWaitClientName = "WaitForMajorityServiceWaiter";
constexpr static auto kCancelClientName = "WaitForMajorityServiceCanceler";
-std::unique_ptr<ThreadPool> makeThreadPool() {
+std::unique_ptr<ThreadPool> makeThreadPool(StringData readOrWrite) {
ThreadPool::Options options;
- options.poolName = "WaitForMajorityServiceThreadPool";
+ options.poolName = "WaitForMajorityService" + readOrWrite + "ThreadPool";
options.minThreads = 0;
// This service must have the ability to use at least two background threads. If it is limited
// to one, than if that thread is blocking waiting on an opTime, any cancellations cannot be
@@ -79,22 +81,57 @@ WaitForMajorityService::~WaitForMajorityService() {
shutDown();
}
+WaitForMajorityServiceImplBase::~WaitForMajorityServiceImplBase() {
+ shutDown();
+}
+
WaitForMajorityService& WaitForMajorityService::get(ServiceContext* service) {
return waitForMajorityServiceDecoration(service);
}
void WaitForMajorityService::startup(ServiceContext* ctx) {
+ _readService.startup(ctx);
+ _writeService.startup(ctx);
+}
+
+void WaitForMajorityService::shutDown() {
+ _writeService.shutDown();
+ _readService.shutDown();
+}
+
+void WaitForMajorityServiceImplBase::startup(ServiceContext* ctx) {
stdx::lock_guard lk(_mutex);
invariant(_state == State::kNotStarted);
- _pool = makeThreadPool();
- _waitForMajorityClient = ClientStrand::make(ctx->makeClient(kWaitClientName));
- _waitForMajorityCancellationClient = ClientStrand::make(ctx->makeClient(kCancelClientName));
+ _pool = makeThreadPool(_getReadOrWrite());
+ _waitForMajorityClient =
+ ClientStrand::make(ctx->makeClient(kWaitClientName + _getReadOrWrite()));
+ _waitForMajorityCancellationClient =
+ ClientStrand::make(ctx->makeClient(kCancelClientName + _getReadOrWrite()));
_backgroundWorkComplete = _periodicallyWaitForMajority();
_pool->startup();
_state = State::kRunning;
}
-void WaitForMajorityService::shutDown() {
+SemiFuture<void> WaitForMajorityService::waitUntilMajorityForRead(
+ const repl::OpTime& opTime, const CancellationToken& cancelToken) {
+ uassert(ErrorCodes::ReadConcernMajorityNotEnabled,
+ "Storage engine does not support read concern majority.",
+ serverGlobalParams.enableMajorityReadConcern);
+
+ return _readService.waitUntilMajority(opTime, cancelToken);
+}
+
+SemiFuture<void> WaitForMajorityService::waitUntilMajorityForWrite(
+ const repl::OpTime& opTime, const CancellationToken& cancelToken) {
+ return _writeService.waitUntilMajority(opTime, cancelToken);
+}
+
+SemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime,
+ const CancellationToken& cancelToken) {
+ return _writeService.waitUntilMajority(opTime, cancelToken);
+}
+
+void WaitForMajorityServiceImplBase::shutDown() {
{
stdx::lock_guard lk(_mutex);
@@ -124,8 +161,8 @@ void WaitForMajorityService::shutDown() {
_waitForMajorityCancellationClient.reset();
}
-SemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime,
- const CancellationToken& cancelToken) {
+SemiFuture<void> WaitForMajorityServiceImplBase::waitUntilMajority(
+ const repl::OpTime& opTime, const CancellationToken& cancelToken) {
auto [promise, future] = makePromiseFuture<void>();
auto request = std::make_shared<Request>(std::move(promise));
@@ -189,7 +226,28 @@ SemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& o
return std::move(future).semi();
}
-SemiFuture<void> WaitForMajorityService::_periodicallyWaitForMajority() {
+Status WaitForMajorityServiceForWriteImpl::_waitForOpTime(OperationContext* opCtx,
+ const repl::OpTime& opTime) {
+ WriteConcernResult ignoreResult;
+ return waitForWriteConcern(opCtx, opTime, kMajorityWriteConcern, &ignoreResult);
+}
+
+Status WaitForMajorityServiceForReadImpl::_waitForOpTime(OperationContext* opCtx,
+ const repl::OpTime& opTime) {
+ repl::ReadConcernArgs readConcernArgs(opTime, repl::ReadConcernLevel::kMajorityReadConcern);
+
+ auto status = waitForReadConcern(
+ opCtx, readConcernArgs, DatabaseName(), false /* allow afterClusterTime */);
+ // This code should only happen when enableMajorityReadConcern is true, which is no longer
+ // permitted.
+ invariant(status.code() != ErrorCodes::ReadConcernMajorityNotEnabled);
+ return status;
+}
+
+SemiFuture<void> WaitForMajorityServiceImplBase::_periodicallyWaitForMajority() {
+ /**
+ * Enqueue a request to wait for the given opTime to be majority committed.
+ */
return AsyncTry([this] {
auto clientGuard = _waitForMajorityClient->bind();
stdx::unique_lock<Latch> lk(_mutex);
@@ -204,9 +262,7 @@ SemiFuture<void> WaitForMajorityService::_periodicallyWaitForMajority() {
lk.unlock();
- WriteConcernResult ignoreResult;
- auto status = waitForWriteConcern(
- opCtx.get(), lowestOpTime, kMajorityWriteConcern, &ignoreResult);
+ auto status = _waitForOpTime(opCtx.get(), lowestOpTime);
lk.lock();