summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/wait_for_majority_service.cpp
diff options
context:
space:
mode:
authorGeorge Wangensteen <george.wangensteen@mongodb.com>2021-02-04 16:23:05 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-04 17:45:23 +0000
commit3b4c40b136da419512bf6501655473db552efb11 (patch)
treeafa116d98dd44521a2983b97e0f91dcafa859ab5 /src/mongo/db/repl/wait_for_majority_service.cpp
parentf6119db22f13329a15be3771f732088a06130922 (diff)
downloadmongo-3b4c40b136da419512bf6501655473db552efb11.tar.gz
Revert "SERVER-50656 Add cancellation support to WaitForMajorityService"
This reverts commit da77452821c355346d873a6b31160c101adc60de.
Diffstat (limited to 'src/mongo/db/repl/wait_for_majority_service.cpp')
-rw-r--r--src/mongo/db/repl/wait_for_majority_service.cpp234
1 files changed, 99 insertions, 135 deletions
diff --git a/src/mongo/db/repl/wait_for_majority_service.cpp b/src/mongo/db/repl/wait_for_majority_service.cpp
index 0cb9e51f9f4..c316501663e 100644
--- a/src/mongo/db/repl/wait_for_majority_service.cpp
+++ b/src/mongo/db/repl/wait_for_majority_service.cpp
@@ -1,4 +1,4 @@
-/*
+/**
* Copyright (C) 2019-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
@@ -40,9 +40,8 @@
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
#include "mongo/util/concurrency/thread_pool.h"
-#include "mongo/util/future_util.h"
-#include "mongo/util/static_immortal.h"
namespace mongo {
@@ -50,27 +49,9 @@ namespace {
const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
WriteConcernOptions::kNoTimeout);
+
const auto waitForMajorityServiceDecoration =
ServiceContext::declareDecoration<WaitForMajorityService>();
-
-constexpr static auto kWaitClientName = "WaitForMajorityServiceWaiter";
-constexpr static auto kCancelClientName = "WaitForMajorityServiceCanceler";
-
-std::unique_ptr<ThreadPool> makeThreadPool() {
- ThreadPool::Options options;
- options.poolName = "WaitForMajorityServiceThreadPool";
- options.minThreads = 0;
- // This service must have the ability to use at least two background threads. If it is limited
- // to one, than if that thread is blocking waiting on an opTime, any cancellations cannot be
- // completed until that wait is complete.
- options.maxThreads = 2;
- return std::make_unique<ThreadPool>(options);
-}
-inline Status waitUntilMajorityCanceledStatus() {
- static StaticImmortal s =
- Status{ErrorCodes::CallbackCanceled, "WaitForMajorityService::waitUntilMajority canceled"};
- return *s;
-}
} // namespace
WaitForMajorityService::~WaitForMajorityService() {
@@ -81,156 +62,139 @@ WaitForMajorityService& WaitForMajorityService::get(ServiceContext* service) {
return waitForMajorityServiceDecoration(service);
}
-void WaitForMajorityService::startup(ServiceContext* ctx) {
+void WaitForMajorityService::setUp(ServiceContext* service) {
stdx::lock_guard lk(_mutex);
- invariant(_state == State::kNotStarted);
- _pool = makeThreadPool();
- _waitForMajorityClient = ClientStrand::make(ctx->makeClient(kWaitClientName));
- _waitForMajorityCancelationClient = ClientStrand::make(ctx->makeClient(kCancelClientName));
- _backgroundWorkComplete = _periodicallyWaitForMajority();
- _pool->startup();
- _state = State::kRunning;
+
+ if (!_thread.joinable() && !_inShutDown) {
+ _thread = stdx::thread([this, service] { _periodicallyWaitForMajority(service); });
+ }
}
void WaitForMajorityService::shutDown() {
{
stdx::lock_guard lk(_mutex);
- if (_state != State::kRunning) {
+ if (std::exchange(_inShutDown, true)) {
return;
}
- _state = State::kShutdown;
- _waitForMajorityClient->getClientPointer()->setKilled();
- _waitForMajorityCancelationClient->getClientPointer()->setKilled();
-
- for (auto&& request : _queuedOpTimes) {
- if (!request.second->hasBeenProcessed.swap(true)) {
- request.second->result.setError(
- {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"});
- }
+ if (_opCtx) {
+ stdx::lock_guard scopedClientLock(*_opCtx->getClient());
+ _opCtx->getServiceContext()->killOperation(
+ scopedClientLock, _opCtx, ErrorCodes::InterruptedAtShutdown);
}
- _hasNewOpTimeCV.notifyAllAndClose();
}
- _pool->shutdown();
- _pool->join();
- _backgroundWorkComplete->wait();
- // It's important to reset the clientstrand pointers after waiting for work
- // in the thread pool to complete since that work might be using the client
- // objects.
- _waitForMajorityClient.reset();
- _waitForMajorityCancelationClient.reset();
-}
-SemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime,
- const CancelationToken& cancelToken) {
-
- auto [promise, future] = makePromiseFuture<void>();
- auto request = std::make_shared<Request>(std::move(promise));
+ if (_thread.joinable()) {
+ _thread.join();
+ }
stdx::lock_guard lk(_mutex);
+ for (auto&& pendingRequest : _queuedOpTimes) {
+ pendingRequest.second.setError(
+ {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"});
+ }
- tassert(5065600,
- "WaitForMajorityService must be started before calling waitUntilMajority",
- _state != State::kNotStarted);
+ _queuedOpTimes.clear();
+}
- if (_state == State::kShutdown) {
- return {SemiFuture<void>::makeReady(
+SharedSemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime) {
+ stdx::lock_guard lk(_mutex);
+
+ if (_inShutDown) {
+ return {Future<void>::makeReady(
Status{ErrorCodes::ShutdownInProgress,
"rejecting wait for majority request due to server shutdown"})};
}
+ // Background thread must be running before requesting.
+ invariant(_thread.joinable());
+
if (_lastOpTimeWaited >= opTime) {
- return {SemiFuture<void>::makeReady()};
+ return {Future<void>::makeReady()};
}
- if (cancelToken.isCanceled()) {
- return {SemiFuture<void>::makeReady(waitUntilMajorityCanceledStatus())};
+ auto iter = _queuedOpTimes.lower_bound(opTime);
+ if (iter != _queuedOpTimes.end()) {
+ if (iter->first == opTime) {
+ return iter->second.getFuture();
+ }
}
- const bool wasEmpty = _queuedOpTimes.empty();
-
- if (!wasEmpty && opTime < _queuedOpTimes.begin()->first) {
+ if (iter == _queuedOpTimes.begin()) {
// Background thread could already be actively waiting on a later time, so tell it to stop
// and wait for the newly requested opTime instead.
- stdx::lock_guard scopedClientLock(*_waitForMajorityClient->getClientPointer());
- if (auto opCtx = _waitForMajorityClient->getClientPointer()->getOperationContext())
- opCtx->getServiceContext()->killOperation(
- scopedClientLock, opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable);
+ if (_opCtx) {
+ stdx::lock_guard scopedClientLock(*_opCtx->getClient());
+ _opCtx->getServiceContext()->killOperation(
+ scopedClientLock, _opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable);
+ }
}
- auto resultIter = _queuedOpTimes.emplace(
- std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple(request));
-
+ const bool wasEmpty = _queuedOpTimes.empty();
+ auto resultIter = _queuedOpTimes.emplace_hint(
+ iter, std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple());
if (wasEmpty) {
- // Notify the background thread that work is now available.
- _hasNewOpTimeCV.notifyAllAndReset();
+ _hasNewOpTimeCV.notify_one();
}
- cancelToken.onCancel().thenRunOn(_pool).getAsync([this, resultIter, request](Status s) {
- if (!s.isOK()) {
- return;
+ return resultIter->second.getFuture();
+}
+
+void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* service) {
+ ThreadClient tc("waitForMajority", service);
+
+ stdx::unique_lock<Latch> lk(_mutex);
+
+ while (!_inShutDown) {
+ auto opCtx = tc->makeOperationContext();
+ _opCtx = opCtx.get();
+
+ if (!_queuedOpTimes.empty()) {
+ auto lowestOpTimeIter = _queuedOpTimes.begin();
+ auto lowestOpTime = lowestOpTimeIter->first;
+
+ lk.unlock();
+
+ WriteConcernResult ignoreResult;
+ auto status =
+ waitForWriteConcern(_opCtx, lowestOpTime, kMajorityWriteConcern, &ignoreResult);
+
+ lk.lock();
+
+ if (status.isOK()) {
+ _lastOpTimeWaited = lowestOpTime;
+ }
+
+ if (status == ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) {
+ _opCtx = nullptr;
+ continue;
+ }
+
+ if (status.isOK()) {
+ lowestOpTimeIter->second.emplaceValue();
+ } else {
+ lowestOpTimeIter->second.setError(status);
+ }
+
+ _queuedOpTimes.erase(lowestOpTimeIter);
}
- auto clientGuard = _waitForMajorityCancelationClient->bind();
- if (!request->hasBeenProcessed.swap(true)) {
- request->result.setError(waitUntilMajorityCanceledStatus());
- stdx::lock_guard lk(_mutex);
- _queuedOpTimes.erase(resultIter);
+
+ try {
+ MONGO_IDLE_THREAD_BLOCK;
+ _opCtx->waitForConditionOrInterrupt(
+ _hasNewOpTimeCV, lk, [&] { return !_queuedOpTimes.empty() || _inShutDown; });
+ } catch (const DBException& e) {
+ LOGV2_DEBUG(22487,
+ 1,
+ "Unable to wait for new op time due to: {error}",
+ "Unable to wait for new op time",
+ "error"_attr = e);
}
- });
- return std::move(future).semi();
-}
-SemiFuture<void> WaitForMajorityService::_periodicallyWaitForMajority() {
- return AsyncTry([this] {
- stdx::unique_lock<Latch> lk(_mutex);
- if (_queuedOpTimes.empty()) {
- return _hasNewOpTimeCV.onNotify();
- }
- auto clientGuard = _waitForMajorityClient->bind();
- auto opCtx = clientGuard->makeOperationContext();
-
- // This needs to be a copy since we unlock the lock before waiting for write concern
- // and the iterator could be invalidated.
- auto lowestOpTime = _queuedOpTimes.begin()->first;
-
- lk.unlock();
-
- WriteConcernResult ignoreResult;
- auto status = waitForWriteConcern(
- opCtx.get(), lowestOpTime, kMajorityWriteConcern, &ignoreResult);
-
- lk.lock();
-
- if (status.isOK()) {
- _lastOpTimeWaited = lowestOpTime;
- }
-
- if (status != ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) {
- auto [lowestOpTimeIter, firstElemWithHigherOpTimeIter] =
- _queuedOpTimes.equal_range(lowestOpTime);
-
- for (auto requestIt = lowestOpTimeIter;
- requestIt != firstElemWithHigherOpTimeIter;
- /*Increment in loop*/) {
- if (!requestIt->second->hasBeenProcessed.swap(true)) {
- requestIt->second->result.setFrom(status);
- requestIt = _queuedOpTimes.erase(requestIt);
- } else {
- ++requestIt;
- }
- }
- }
- return SemiFuture<void>::makeReady();
- })
- .until([](Status) {
- // Loop forever until _pool is shut down.
- // TODO (SERVER-53766): Replace with condition-free looping utility.
- return false;
- })
- .on(_pool, CancelationToken::uncancelable())
- .semi();
+ _opCtx = nullptr;
+ }
}
} // namespace mongo