summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2020-07-03 14:54:08 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-07-07 18:26:37 +0000
commit2a8f91f240f9ec5cb5e47fcbb78a613985de6632 (patch)
tree3f0ba573c8e4568c2b0ed3f7266070b6b8a9a1aa /src/mongo/db/repl
parentb38bfa72e8160bfe5bcebb87d22df6088f912786 (diff)
downloadmongo-2a8f91f240f9ec5cb5e47fcbb78a613985de6632.tar.gz
SERVER-49316 Move wait_for_majority_service to its own library
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/SConscript13
-rw-r--r--src/mongo/db/repl/wait_for_majority_service.cpp200
-rw-r--r--src/mongo/db/repl/wait_for_majority_service.h100
-rw-r--r--src/mongo/db/repl/wait_for_majority_service_test.cpp271
4 files changed, 584 insertions, 0 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index c14a6696a3f..dae5cbb881a 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1295,6 +1295,7 @@ env.CppUnitTest(
'task_runner_test.cpp',
'task_runner_test_fixture.cpp',
'vote_requester_test.cpp',
+ 'wait_for_majority_service_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
@@ -1370,6 +1371,7 @@ env.CppUnitTest(
'sync_source_selector_mock',
'task_executor_mock',
'task_runner',
+ 'wait_for_majority_service',
],
)
@@ -1491,3 +1493,14 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/authentication_commands',
],
)
+
+env.Library(
+ target='wait_for_majority_service',
+ source=[
+ 'wait_for_majority_service.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/rw_concern_d',
+ ],
+)
diff --git a/src/mongo/db/repl/wait_for_majority_service.cpp b/src/mongo/db/repl/wait_for_majority_service.cpp
new file mode 100644
index 00000000000..c316501663e
--- /dev/null
+++ b/src/mongo/db/repl/wait_for_majority_service.cpp
@@ -0,0 +1,200 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/wait_for_majority_service.h"
+
+#include <utility>
+
+#include "mongo/db/service_context.h"
+#include "mongo/db/write_concern.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace mongo {
+
+namespace {
+const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ WriteConcernOptions::kNoTimeout);
+
+const auto waitForMajorityServiceDecoration =
+ ServiceContext::declareDecoration<WaitForMajorityService>();
+} // namespace
+
+WaitForMajorityService::~WaitForMajorityService() {
+ shutDown();
+}
+
+WaitForMajorityService& WaitForMajorityService::get(ServiceContext* service) {
+ return waitForMajorityServiceDecoration(service);
+}
+
+void WaitForMajorityService::setUp(ServiceContext* service) {
+ stdx::lock_guard lk(_mutex);
+
+ if (!_thread.joinable() && !_inShutDown) {
+ _thread = stdx::thread([this, service] { _periodicallyWaitForMajority(service); });
+ }
+}
+
+void WaitForMajorityService::shutDown() {
+ {
+ stdx::lock_guard lk(_mutex);
+
+ if (std::exchange(_inShutDown, true)) {
+ return;
+ }
+
+ if (_opCtx) {
+ stdx::lock_guard scopedClientLock(*_opCtx->getClient());
+ _opCtx->getServiceContext()->killOperation(
+ scopedClientLock, _opCtx, ErrorCodes::InterruptedAtShutdown);
+ }
+ }
+
+ if (_thread.joinable()) {
+ _thread.join();
+ }
+
+ stdx::lock_guard lk(_mutex);
+ for (auto&& pendingRequest : _queuedOpTimes) {
+ pendingRequest.second.setError(
+ {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"});
+ }
+
+ _queuedOpTimes.clear();
+}
+
+SharedSemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime) {
+ stdx::lock_guard lk(_mutex);
+
+ if (_inShutDown) {
+ return {Future<void>::makeReady(
+ Status{ErrorCodes::ShutdownInProgress,
+ "rejecting wait for majority request due to server shutdown"})};
+ }
+
+ // Background thread must be running before requesting.
+ invariant(_thread.joinable());
+
+ if (_lastOpTimeWaited >= opTime) {
+ return {Future<void>::makeReady()};
+ }
+
+ auto iter = _queuedOpTimes.lower_bound(opTime);
+ if (iter != _queuedOpTimes.end()) {
+ if (iter->first == opTime) {
+ return iter->second.getFuture();
+ }
+ }
+
+ if (iter == _queuedOpTimes.begin()) {
+ // Background thread could already be actively waiting on a later time, so tell it to stop
+ // and wait for the newly requested opTime instead.
+ if (_opCtx) {
+ stdx::lock_guard scopedClientLock(*_opCtx->getClient());
+ _opCtx->getServiceContext()->killOperation(
+ scopedClientLock, _opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable);
+ }
+ }
+
+ const bool wasEmpty = _queuedOpTimes.empty();
+ auto resultIter = _queuedOpTimes.emplace_hint(
+ iter, std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple());
+
+ if (wasEmpty) {
+ _hasNewOpTimeCV.notify_one();
+ }
+
+ return resultIter->second.getFuture();
+}
+
+void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* service) {
+ ThreadClient tc("waitForMajority", service);
+
+ stdx::unique_lock<Latch> lk(_mutex);
+
+ while (!_inShutDown) {
+ auto opCtx = tc->makeOperationContext();
+ _opCtx = opCtx.get();
+
+ if (!_queuedOpTimes.empty()) {
+ auto lowestOpTimeIter = _queuedOpTimes.begin();
+ auto lowestOpTime = lowestOpTimeIter->first;
+
+ lk.unlock();
+
+ WriteConcernResult ignoreResult;
+ auto status =
+ waitForWriteConcern(_opCtx, lowestOpTime, kMajorityWriteConcern, &ignoreResult);
+
+ lk.lock();
+
+ if (status.isOK()) {
+ _lastOpTimeWaited = lowestOpTime;
+ }
+
+ if (status == ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) {
+ _opCtx = nullptr;
+ continue;
+ }
+
+ if (status.isOK()) {
+ lowestOpTimeIter->second.emplaceValue();
+ } else {
+ lowestOpTimeIter->second.setError(status);
+ }
+
+ _queuedOpTimes.erase(lowestOpTimeIter);
+ }
+
+ try {
+ MONGO_IDLE_THREAD_BLOCK;
+ _opCtx->waitForConditionOrInterrupt(
+ _hasNewOpTimeCV, lk, [&] { return !_queuedOpTimes.empty() || _inShutDown; });
+ } catch (const DBException& e) {
+ LOGV2_DEBUG(22487,
+ 1,
+ "Unable to wait for new op time due to: {error}",
+ "Unable to wait for new op time",
+ "error"_attr = e);
+ }
+
+ _opCtx = nullptr;
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/repl/wait_for_majority_service.h b/src/mongo/db/repl/wait_for_majority_service.h
new file mode 100644
index 00000000000..1663ce907ac
--- /dev/null
+++ b/src/mongo/db/repl/wait_for_majority_service.h
@@ -0,0 +1,100 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/service_context.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/future.h"
+
+namespace mongo {
+
+/**
+ * Provides a facility for asynchronously waiting a local opTime to be majority committed.
+ */
+class WaitForMajorityService {
+public:
+ ~WaitForMajorityService();
+
+ static WaitForMajorityService& get(ServiceContext* service);
+
+ /**
+ * Sets up the background thread responsible for waiting for opTimes to be majority committed.
+ */
+ void setUp(ServiceContext* service);
+
+ /**
+ * Blocking method, which shuts down and joins the background thread.
+ */
+ void shutDown();
+
+ /**
+ * Enqueue a request to wait for the given opTime to be majority committed.
+ */
+ SharedSemiFuture<void> waitUntilMajority(const repl::OpTime& opTime);
+
+private:
+ using OpTimeWaitingMap = std::map<repl::OpTime, SharedPromise<void>>;
+
+ /**
+ * Periodically checks the list of opTimes to wait for majority committed.
+ */
+ void _periodicallyWaitForMajority(ServiceContext* service);
+
+ Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityService::_mutex");
+
+ // Contains an ordered list of opTimes to wait to be majority comitted.
+ OpTimeWaitingMap _queuedOpTimes;
+
+ // Contains the last opTime that the background thread was able to successfully wait to be
+ // majority comitted.
+ repl::OpTime _lastOpTimeWaited;
+
+ // The background thread.
+ stdx::thread _thread;
+
+ // Use for signalling new opTime requests being queued.
+ stdx::condition_variable _hasNewOpTimeCV;
+
+ // If set, contains a reference to the opCtx being used by the background thread.
+ // Only valid when _thread.joinable() and not nullptr.
+ OperationContext* _opCtx{nullptr};
+
+ // Flag is set to true after shutDown() is called.
+ bool _inShutDown{false};
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/repl/wait_for_majority_service_test.cpp b/src/mongo/db/repl/wait_for_majority_service_test.cpp
new file mode 100644
index 00000000000..c68158b97fd
--- /dev/null
+++ b/src/mongo/db/repl/wait_for_majority_service_test.cpp
@@ -0,0 +1,271 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+class WaitForMajorityServiceTest : public ServiceContextMongoDTest {
+public:
+ void setUp() override {
+ auto service = getServiceContext();
+ waitService()->setUp(service);
+
+ auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(service);
+
+ replCoord->setAwaitReplicationReturnValueFunction(
+ [this](OperationContext* opCtx, const repl::OpTime& opTime) {
+ auto status = waitForWriteConcernStub(opCtx, opTime);
+ return repl::ReplicationCoordinator::StatusAndDuration(status, Milliseconds(0));
+ });
+
+ repl::ReplicationCoordinator::set(service, std::move(replCoord));
+ }
+
+ void tearDown() override {
+ waitService()->shutDown();
+ }
+
+ WaitForMajorityService* waitService() {
+ return &_waitForMajorityService;
+ }
+
+ void finishWaitingOneOpTime() {
+ stdx::unique_lock<Latch> lk(_mutex);
+ _isTestReady = true;
+ _isTestReadyCV.notify_one();
+
+ while (_isTestReady) {
+ _finishWaitingOneOpTimeCV.wait(lk);
+ }
+ }
+
+ Status waitForWriteConcernStub(OperationContext* opCtx, const repl::OpTime& opTime) {
+ stdx::unique_lock<Latch> lk(_mutex);
+
+ _waitForMajorityCallCount++;
+ _callCountChangedCV.notify_one();
+
+ try {
+ opCtx->waitForConditionOrInterrupt(_isTestReadyCV, lk, [&] { return _isTestReady; });
+ } catch (const DBException& e) {
+ _isTestReady = false;
+ _finishWaitingOneOpTimeCV.notify_one();
+
+ return e.toStatus();
+ }
+
+ _lastOpTimeWaited = opTime;
+ _isTestReady = false;
+ _finishWaitingOneOpTimeCV.notify_one();
+
+ return Status::OK();
+ }
+
+ const repl::OpTime& getLastOpTimeWaited() {
+ stdx::lock_guard<Latch> lk(_mutex);
+ return _lastOpTimeWaited;
+ }
+
+ void waitForMajorityCallCountGreaterThan(int expectedCount) {
+ stdx::unique_lock lk(_mutex);
+ _callCountChangedCV.wait(lk, [&] { return _waitForMajorityCallCount > expectedCount; });
+ }
+
+private:
+ WaitForMajorityService _waitForMajorityService;
+
+ Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityServiceTest::_mutex");
+ stdx::condition_variable _isTestReadyCV;
+ stdx::condition_variable _finishWaitingOneOpTimeCV;
+ stdx::condition_variable _callCountChangedCV;
+
+ bool _isTestReady{false};
+ repl::OpTime _lastOpTimeWaited;
+ int _waitForMajorityCallCount{0};
+};
+
+TEST_F(WaitForMajorityServiceTest, WaitOneOpTime) {
+ repl::OpTime t1(Timestamp(1, 0), 2);
+
+ auto future = waitService()->waitUntilMajority(t1);
+
+ ASSERT_FALSE(future.isReady());
+
+ finishWaitingOneOpTime();
+
+ future.get();
+ ASSERT_EQ(t1, getLastOpTimeWaited());
+}
+
+TEST_F(WaitForMajorityServiceTest, WaitWithSameOpTime) {
+ repl::OpTime t1(Timestamp(1, 0), 2);
+
+ auto future1 = waitService()->waitUntilMajority(t1);
+ auto future1b = waitService()->waitUntilMajority(t1);
+
+ ASSERT_FALSE(future1.isReady());
+ ASSERT_FALSE(future1b.isReady());
+
+ finishWaitingOneOpTime();
+
+ future1.get();
+ future1b.get();
+
+ ASSERT_EQ(t1, getLastOpTimeWaited());
+}
+
+TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanLowestQueued) {
+ repl::OpTime laterOpTime(Timestamp(6, 0), 2);
+ repl::OpTime earlierOpTime(Timestamp(1, 0), 2);
+
+ auto laterFuture = waitService()->waitUntilMajority(laterOpTime);
+
+ // Wait until the background thread picks up the queued opTime.
+ waitForMajorityCallCountGreaterThan(0);
+
+ // The 2nd request has an earlier time, so it will interrupt 'laterOpTime' and skip the line.
+ auto earlierFuture = waitService()->waitUntilMajority(earlierOpTime);
+
+ // Wait for background thread to finish transitioning from waiting on laterOpTime to
+ // earlierOpTime.
+ waitForMajorityCallCountGreaterThan(1);
+
+ ASSERT_FALSE(laterFuture.isReady());
+ ASSERT_FALSE(earlierFuture.isReady());
+
+ finishWaitingOneOpTime();
+
+ ASSERT_FALSE(laterFuture.isReady());
+
+ earlierFuture.get();
+ ASSERT_EQ(earlierOpTime, getLastOpTimeWaited());
+
+ finishWaitingOneOpTime();
+ laterFuture.get();
+
+ ASSERT_EQ(laterOpTime, getLastOpTimeWaited());
+}
+
+TEST_F(WaitForMajorityServiceTest, WaitWithDifferentOpTime) {
+ repl::OpTime t1(Timestamp(1, 0), 2);
+ repl::OpTime t2(Timestamp(14, 0), 2);
+
+ auto future1 = waitService()->waitUntilMajority(t1);
+ auto future2 = waitService()->waitUntilMajority(t2);
+
+ ASSERT_FALSE(future1.isReady());
+ ASSERT_FALSE(future2.isReady());
+
+ finishWaitingOneOpTime();
+
+ ASSERT_FALSE(future2.isReady());
+
+ future1.get();
+ ASSERT_EQ(t1, getLastOpTimeWaited());
+
+ finishWaitingOneOpTime();
+
+ future2.get();
+ ASSERT_EQ(t2, getLastOpTimeWaited());
+}
+
+TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanOpTimeAlreadyWaited) {
+ repl::OpTime t1(Timestamp(5, 0), 2);
+ repl::OpTime t2(Timestamp(14, 0), 2);
+
+ auto future1 = waitService()->waitUntilMajority(t1);
+ auto future2 = waitService()->waitUntilMajority(t2);
+
+ ASSERT_FALSE(future1.isReady());
+ ASSERT_FALSE(future2.isReady());
+
+ finishWaitingOneOpTime();
+
+ ASSERT_FALSE(future2.isReady());
+
+ future1.get();
+ ASSERT_EQ(t1, getLastOpTimeWaited());
+
+ repl::OpTime oldTs(Timestamp(4, 0), 2);
+ auto oldFuture = waitService()->waitUntilMajority(oldTs);
+ auto alreadyWaitedFuture = waitService()->waitUntilMajority(t1);
+
+ ASSERT_FALSE(future2.isReady());
+
+ oldFuture.get();
+ alreadyWaitedFuture.get();
+ ASSERT_EQ(t1, getLastOpTimeWaited());
+
+ finishWaitingOneOpTime();
+
+ future2.get();
+ ASSERT_EQ(t2, getLastOpTimeWaited());
+}
+
+TEST_F(WaitForMajorityServiceTest, ShutdownShouldCancelQueuedRequests) {
+ repl::OpTime t1(Timestamp(5, 0), 2);
+ repl::OpTime t2(Timestamp(14, 0), 2);
+
+ auto future1 = waitService()->waitUntilMajority(t1);
+ auto future2 = waitService()->waitUntilMajority(t2);
+
+ ASSERT_FALSE(future1.isReady());
+ ASSERT_FALSE(future2.isReady());
+
+ waitService()->shutDown();
+
+ ASSERT_THROWS_CODE(future1.get(), AssertionException, ErrorCodes::InterruptedAtShutdown);
+ ASSERT_THROWS_CODE(future2.get(), AssertionException, ErrorCodes::InterruptedAtShutdown);
+}
+
+TEST_F(WaitForMajorityServiceTest, WriteConcernErrorGetsPropagatedCorrectly) {
+ repl::OpTime t(Timestamp(5, 0), 2);
+
+ auto replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>(
+ repl::ReplicationCoordinator::get(getServiceContext()));
+ replCoord->setAwaitReplicationReturnValueFunction(
+ [this](OperationContext* opCtx, const repl::OpTime& opTime) {
+ return repl::ReplicationCoordinator::StatusAndDuration(
+ {ErrorCodes::PrimarySteppedDown, "test stepdown"}, Milliseconds(0));
+ });
+
+ auto future = waitService()->waitUntilMajority(t);
+ ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::PrimarySteppedDown);
+}
+
+} // namespace
+} // namespace mongo