diff options
author | Spencer T Brody <spencer@mongodb.com> | 2020-07-03 14:54:08 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-07 18:26:37 +0000 |
commit | 2a8f91f240f9ec5cb5e47fcbb78a613985de6632 (patch) | |
tree | 3f0ba573c8e4568c2b0ed3f7266070b6b8a9a1aa /src/mongo/db/repl | |
parent | b38bfa72e8160bfe5bcebb87d22df6088f912786 (diff) | |
download | mongo-2a8f91f240f9ec5cb5e47fcbb78a613985de6632.tar.gz |
SERVER-49316 Move wait_for_majority_service to its own library
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/SConscript | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/wait_for_majority_service.cpp | 200 | ||||
-rw-r--r-- | src/mongo/db/repl/wait_for_majority_service.h | 100 | ||||
-rw-r--r-- | src/mongo/db/repl/wait_for_majority_service_test.cpp | 271 |
4 files changed, 584 insertions, 0 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index c14a6696a3f..dae5cbb881a 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1295,6 +1295,7 @@ env.CppUnitTest( 'task_runner_test.cpp', 'task_runner_test_fixture.cpp', 'vote_requester_test.cpp', + 'wait_for_majority_service_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', @@ -1370,6 +1371,7 @@ env.CppUnitTest( 'sync_source_selector_mock', 'task_executor_mock', 'task_runner', + 'wait_for_majority_service', ], ) @@ -1491,3 +1493,14 @@ env.Library( '$BUILD_DIR/mongo/db/commands/authentication_commands', ], ) + +env.Library( + target='wait_for_majority_service', + source=[ + 'wait_for_majority_service.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/rw_concern_d', + ], +) diff --git a/src/mongo/db/repl/wait_for_majority_service.cpp b/src/mongo/db/repl/wait_for_majority_service.cpp new file mode 100644 index 00000000000..c316501663e --- /dev/null +++ b/src/mongo/db/repl/wait_for_majority_service.cpp @@ -0,0 +1,200 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/wait_for_majority_service.h" + +#include <utility> + +#include "mongo/db/service_context.h" +#include "mongo/db/write_concern.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/logv2/log.h" +#include "mongo/util/concurrency/idle_thread_block.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace mongo { + +namespace { +const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + WriteConcernOptions::kNoTimeout); + +const auto waitForMajorityServiceDecoration = + ServiceContext::declareDecoration<WaitForMajorityService>(); +} // namespace + +WaitForMajorityService::~WaitForMajorityService() { + shutDown(); +} + +WaitForMajorityService& WaitForMajorityService::get(ServiceContext* service) { + return waitForMajorityServiceDecoration(service); +} + +void WaitForMajorityService::setUp(ServiceContext* service) { + stdx::lock_guard lk(_mutex); + + if (!_thread.joinable() && !_inShutDown) { + 
_thread = stdx::thread([this, service] { _periodicallyWaitForMajority(service); }); + } +} + +void WaitForMajorityService::shutDown() { + { + stdx::lock_guard lk(_mutex); + + if (std::exchange(_inShutDown, true)) { + return; + } + + if (_opCtx) { + stdx::lock_guard scopedClientLock(*_opCtx->getClient()); + _opCtx->getServiceContext()->killOperation( + scopedClientLock, _opCtx, ErrorCodes::InterruptedAtShutdown); + } + } + + if (_thread.joinable()) { + _thread.join(); + } + + stdx::lock_guard lk(_mutex); + for (auto&& pendingRequest : _queuedOpTimes) { + pendingRequest.second.setError( + {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"}); + } + + _queuedOpTimes.clear(); +} + +SharedSemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime) { + stdx::lock_guard lk(_mutex); + + if (_inShutDown) { + return {Future<void>::makeReady( + Status{ErrorCodes::ShutdownInProgress, + "rejecting wait for majority request due to server shutdown"})}; + } + + // Background thread must be running before requesting. + invariant(_thread.joinable()); + + if (_lastOpTimeWaited >= opTime) { + return {Future<void>::makeReady()}; + } + + auto iter = _queuedOpTimes.lower_bound(opTime); + if (iter != _queuedOpTimes.end()) { + if (iter->first == opTime) { + return iter->second.getFuture(); + } + } + + if (iter == _queuedOpTimes.begin()) { + // Background thread could already be actively waiting on a later time, so tell it to stop + // and wait for the newly requested opTime instead. 
+ if (_opCtx) { + stdx::lock_guard scopedClientLock(*_opCtx->getClient()); + _opCtx->getServiceContext()->killOperation( + scopedClientLock, _opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable); + } + } + + const bool wasEmpty = _queuedOpTimes.empty(); + auto resultIter = _queuedOpTimes.emplace_hint( + iter, std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple()); + + if (wasEmpty) { + _hasNewOpTimeCV.notify_one(); + } + + return resultIter->second.getFuture(); +} + +void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* service) { + ThreadClient tc("waitForMajority", service); + + stdx::unique_lock<Latch> lk(_mutex); + + while (!_inShutDown) { + auto opCtx = tc->makeOperationContext(); + _opCtx = opCtx.get(); + + if (!_queuedOpTimes.empty()) { + auto lowestOpTimeIter = _queuedOpTimes.begin(); + auto lowestOpTime = lowestOpTimeIter->first; + + lk.unlock(); + + WriteConcernResult ignoreResult; + auto status = + waitForWriteConcern(_opCtx, lowestOpTime, kMajorityWriteConcern, &ignoreResult); + + lk.lock(); + + if (status.isOK()) { + _lastOpTimeWaited = lowestOpTime; + } + + if (status == ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) { + _opCtx = nullptr; + continue; + } + + if (status.isOK()) { + lowestOpTimeIter->second.emplaceValue(); + } else { + lowestOpTimeIter->second.setError(status); + } + + _queuedOpTimes.erase(lowestOpTimeIter); + } + + try { + MONGO_IDLE_THREAD_BLOCK; + _opCtx->waitForConditionOrInterrupt( + _hasNewOpTimeCV, lk, [&] { return !_queuedOpTimes.empty() || _inShutDown; }); + } catch (const DBException& e) { + LOGV2_DEBUG(22487, + 1, + "Unable to wait for new op time due to: {error}", + "Unable to wait for new op time", + "error"_attr = e); + } + + _opCtx = nullptr; + } +} + +} // namespace mongo diff --git a/src/mongo/db/repl/wait_for_majority_service.h b/src/mongo/db/repl/wait_for_majority_service.h new file mode 100644 index 00000000000..1663ce907ac --- /dev/null +++ 
b/src/mongo/db/repl/wait_for_majority_service.h @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <map> +#include <memory> +#include <vector> + +#include "mongo/db/repl/optime.h" +#include "mongo/db/service_context.h" +#include "mongo/executor/task_executor.h" +#include "mongo/platform/mutex.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/future.h" + +namespace mongo { + +/** + * Provides a facility for asynchronously waiting a local opTime to be majority committed. 
+ */ +class WaitForMajorityService { +public: + ~WaitForMajorityService(); + + static WaitForMajorityService& get(ServiceContext* service); + + /** + * Sets up the background thread responsible for waiting for opTimes to be majority committed. + */ + void setUp(ServiceContext* service); + + /** + * Blocking method, which shuts down and joins the background thread. + */ + void shutDown(); + + /** + * Enqueue a request to wait for the given opTime to be majority committed. + */ + SharedSemiFuture<void> waitUntilMajority(const repl::OpTime& opTime); + +private: + using OpTimeWaitingMap = std::map<repl::OpTime, SharedPromise<void>>; + + /** + * Periodically checks the list of opTimes to wait for majority committed. + */ + void _periodicallyWaitForMajority(ServiceContext* service); + + Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityService::_mutex"); + + // Contains an ordered list of opTimes to wait to be majority committed. + OpTimeWaitingMap _queuedOpTimes; + + // Contains the last opTime that the background thread was able to successfully wait to be + // majority committed. + repl::OpTime _lastOpTimeWaited; + + // The background thread. + stdx::thread _thread; + + // Used for signalling new opTime requests being queued. + stdx::condition_variable _hasNewOpTimeCV; + + // If set, contains a reference to the opCtx being used by the background thread. + // Only valid when _thread.joinable() and not nullptr. + OperationContext* _opCtx{nullptr}; + + // Flag is set to true after shutDown() is called. + bool _inShutDown{false}; +}; + +} // namespace mongo diff --git a/src/mongo/db/repl/wait_for_majority_service_test.cpp b/src/mongo/db/repl/wait_for_majority_service_test.cpp new file mode 100644 index 00000000000..c68158b97fd --- /dev/null +++ b/src/mongo/db/repl/wait_for_majority_service_test.cpp @@ -0,0 +1,271 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/platform/mutex.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +class WaitForMajorityServiceTest : public ServiceContextMongoDTest { +public: + void setUp() override { + auto service = getServiceContext(); + waitService()->setUp(service); + + auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(service); + + replCoord->setAwaitReplicationReturnValueFunction( + [this](OperationContext* opCtx, const repl::OpTime& opTime) { + auto status = waitForWriteConcernStub(opCtx, opTime); + return repl::ReplicationCoordinator::StatusAndDuration(status, Milliseconds(0)); + }); + + repl::ReplicationCoordinator::set(service, std::move(replCoord)); + } + + void tearDown() override { + waitService()->shutDown(); + } + + WaitForMajorityService* waitService() { + return &_waitForMajorityService; + } + + void finishWaitingOneOpTime() { + stdx::unique_lock<Latch> lk(_mutex); + _isTestReady = true; + _isTestReadyCV.notify_one(); + + while (_isTestReady) { + _finishWaitingOneOpTimeCV.wait(lk); + } + } + + Status waitForWriteConcernStub(OperationContext* opCtx, const repl::OpTime& opTime) { + stdx::unique_lock<Latch> lk(_mutex); + + _waitForMajorityCallCount++; + _callCountChangedCV.notify_one(); + + try { + opCtx->waitForConditionOrInterrupt(_isTestReadyCV, lk, [&] { return _isTestReady; }); + } catch (const DBException& e) { + _isTestReady = false; + _finishWaitingOneOpTimeCV.notify_one(); + + return e.toStatus(); + } + + _lastOpTimeWaited = opTime; + _isTestReady = false; + _finishWaitingOneOpTimeCV.notify_one(); + + return Status::OK(); + } + + const repl::OpTime& getLastOpTimeWaited() { + stdx::lock_guard<Latch> lk(_mutex); + return _lastOpTimeWaited; + } + + void waitForMajorityCallCountGreaterThan(int expectedCount) { 
+ stdx::unique_lock lk(_mutex); + _callCountChangedCV.wait(lk, [&] { return _waitForMajorityCallCount > expectedCount; }); + } + +private: + WaitForMajorityService _waitForMajorityService; + + Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityServiceTest::_mutex"); + stdx::condition_variable _isTestReadyCV; + stdx::condition_variable _finishWaitingOneOpTimeCV; + stdx::condition_variable _callCountChangedCV; + + bool _isTestReady{false}; + repl::OpTime _lastOpTimeWaited; + int _waitForMajorityCallCount{0}; +}; + +TEST_F(WaitForMajorityServiceTest, WaitOneOpTime) { + repl::OpTime t1(Timestamp(1, 0), 2); + + auto future = waitService()->waitUntilMajority(t1); + + ASSERT_FALSE(future.isReady()); + + finishWaitingOneOpTime(); + + future.get(); + ASSERT_EQ(t1, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, WaitWithSameOpTime) { + repl::OpTime t1(Timestamp(1, 0), 2); + + auto future1 = waitService()->waitUntilMajority(t1); + auto future1b = waitService()->waitUntilMajority(t1); + + ASSERT_FALSE(future1.isReady()); + ASSERT_FALSE(future1b.isReady()); + + finishWaitingOneOpTime(); + + future1.get(); + future1b.get(); + + ASSERT_EQ(t1, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanLowestQueued) { + repl::OpTime laterOpTime(Timestamp(6, 0), 2); + repl::OpTime earlierOpTime(Timestamp(1, 0), 2); + + auto laterFuture = waitService()->waitUntilMajority(laterOpTime); + + // Wait until the background thread picks up the queued opTime. + waitForMajorityCallCountGreaterThan(0); + + // The 2nd request has an earlier time, so it will interrupt 'laterOpTime' and skip the line. + auto earlierFuture = waitService()->waitUntilMajority(earlierOpTime); + + // Wait for background thread to finish transitioning from waiting on laterOpTime to + // earlierOpTime. 
+ waitForMajorityCallCountGreaterThan(1); + + ASSERT_FALSE(laterFuture.isReady()); + ASSERT_FALSE(earlierFuture.isReady()); + + finishWaitingOneOpTime(); + + ASSERT_FALSE(laterFuture.isReady()); + + earlierFuture.get(); + ASSERT_EQ(earlierOpTime, getLastOpTimeWaited()); + + finishWaitingOneOpTime(); + laterFuture.get(); + + ASSERT_EQ(laterOpTime, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, WaitWithDifferentOpTime) { + repl::OpTime t1(Timestamp(1, 0), 2); + repl::OpTime t2(Timestamp(14, 0), 2); + + auto future1 = waitService()->waitUntilMajority(t1); + auto future2 = waitService()->waitUntilMajority(t2); + + ASSERT_FALSE(future1.isReady()); + ASSERT_FALSE(future2.isReady()); + + finishWaitingOneOpTime(); + + ASSERT_FALSE(future2.isReady()); + + future1.get(); + ASSERT_EQ(t1, getLastOpTimeWaited()); + + finishWaitingOneOpTime(); + + future2.get(); + ASSERT_EQ(t2, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanOpTimeAlreadyWaited) { + repl::OpTime t1(Timestamp(5, 0), 2); + repl::OpTime t2(Timestamp(14, 0), 2); + + auto future1 = waitService()->waitUntilMajority(t1); + auto future2 = waitService()->waitUntilMajority(t2); + + ASSERT_FALSE(future1.isReady()); + ASSERT_FALSE(future2.isReady()); + + finishWaitingOneOpTime(); + + ASSERT_FALSE(future2.isReady()); + + future1.get(); + ASSERT_EQ(t1, getLastOpTimeWaited()); + + repl::OpTime oldTs(Timestamp(4, 0), 2); + auto oldFuture = waitService()->waitUntilMajority(oldTs); + auto alreadyWaitedFuture = waitService()->waitUntilMajority(t1); + + ASSERT_FALSE(future2.isReady()); + + oldFuture.get(); + alreadyWaitedFuture.get(); + ASSERT_EQ(t1, getLastOpTimeWaited()); + + finishWaitingOneOpTime(); + + future2.get(); + ASSERT_EQ(t2, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, ShutdownShouldCancelQueuedRequests) { + repl::OpTime t1(Timestamp(5, 0), 2); + repl::OpTime t2(Timestamp(14, 0), 2); + + auto future1 = waitService()->waitUntilMajority(t1); 
+ auto future2 = waitService()->waitUntilMajority(t2); + + ASSERT_FALSE(future1.isReady()); + ASSERT_FALSE(future2.isReady()); + + waitService()->shutDown(); + + ASSERT_THROWS_CODE(future1.get(), AssertionException, ErrorCodes::InterruptedAtShutdown); + ASSERT_THROWS_CODE(future2.get(), AssertionException, ErrorCodes::InterruptedAtShutdown); +} + +TEST_F(WaitForMajorityServiceTest, WriteConcernErrorGetsPropagatedCorrectly) { + repl::OpTime t(Timestamp(5, 0), 2); + + auto replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>( + repl::ReplicationCoordinator::get(getServiceContext())); + replCoord->setAwaitReplicationReturnValueFunction( + [this](OperationContext* opCtx, const repl::OpTime& opTime) { + return repl::ReplicationCoordinator::StatusAndDuration( + {ErrorCodes::PrimarySteppedDown, "test stepdown"}, Milliseconds(0)); + }); + + auto future = waitService()->waitUntilMajority(t); + ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::PrimarySteppedDown); +} + +} // namespace +} // namespace mongo |