From 2a8f91f240f9ec5cb5e47fcbb78a613985de6632 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Fri, 3 Jul 2020 14:54:08 -0400 Subject: SERVER-49316 Move wait_for_majority_service to its own library --- src/mongo/db/mongod_main.cpp | 2 +- src/mongo/db/repl/SConscript | 13 + src/mongo/db/repl/wait_for_majority_service.cpp | 200 +++++++++++++++ src/mongo/db/repl/wait_for_majority_service.h | 100 ++++++++ .../db/repl/wait_for_majority_service_test.cpp | 271 +++++++++++++++++++++ src/mongo/db/s/SConscript | 3 +- .../db/s/collection_sharding_runtime_test.cpp | 2 +- src/mongo/db/s/migration_util.cpp | 2 +- src/mongo/db/s/migration_util_test.cpp | 2 +- src/mongo/db/s/range_deletion_util.cpp | 2 +- src/mongo/db/s/range_deletion_util_test.cpp | 2 +- src/mongo/db/s/transaction_coordinator.cpp | 2 +- .../db/s/transaction_coordinator_test_fixture.cpp | 2 +- src/mongo/db/s/wait_for_majority_service.cpp | 200 --------------- src/mongo/db/s/wait_for_majority_service.h | 100 -------- src/mongo/db/s/wait_for_majority_service_test.cpp | 271 --------------------- 16 files changed, 593 insertions(+), 581 deletions(-) create mode 100644 src/mongo/db/repl/wait_for_majority_service.cpp create mode 100644 src/mongo/db/repl/wait_for_majority_service.h create mode 100644 src/mongo/db/repl/wait_for_majority_service_test.cpp delete mode 100644 src/mongo/db/s/wait_for_majority_service.cpp delete mode 100644 src/mongo/db/s/wait_for_majority_service.h delete mode 100644 src/mongo/db/s/wait_for_majority_service_test.cpp (limited to 'src') diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index ce040f751b4..fe91a5478fc 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -123,6 +123,7 @@ #include "mongo/db/repl/replication_recovery.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/topology_coordinator.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/repl_set_member_in_standalone_mode.h" #include "mongo/db/s/collection_sharding_state_factory_shard.h" #include "mongo/db/s/collection_sharding_state_factory_standalone.h" @@ -135,7 +136,6 @@ #include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/sharding_state_recovery.h" #include "mongo/db/s/transaction_coordinator_service.h" -#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/db/service_entry_point_mongod.h" diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index c14a6696a3f..dae5cbb881a 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1295,6 +1295,7 @@ env.CppUnitTest( 'task_runner_test.cpp', 'task_runner_test_fixture.cpp', 'vote_requester_test.cpp', + 'wait_for_majority_service_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', @@ -1370,6 +1371,7 @@ env.CppUnitTest( 'sync_source_selector_mock', 'task_executor_mock', 'task_runner', + 'wait_for_majority_service', ], ) @@ -1491,3 +1493,14 @@ env.Library( '$BUILD_DIR/mongo/db/commands/authentication_commands', ], ) + +env.Library( + target='wait_for_majority_service', + source=[ + 'wait_for_majority_service.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/rw_concern_d', + ], +) diff --git a/src/mongo/db/repl/wait_for_majority_service.cpp b/src/mongo/db/repl/wait_for_majority_service.cpp new file mode 100644 index 00000000000..c316501663e --- /dev/null +++ b/src/mongo/db/repl/wait_for_majority_service.cpp @@ -0,0 +1,200 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/wait_for_majority_service.h" + +#include + +#include "mongo/db/service_context.h" +#include "mongo/db/write_concern.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/logv2/log.h" +#include "mongo/util/concurrency/idle_thread_block.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace mongo { + +namespace { +const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + WriteConcernOptions::kNoTimeout); + +const auto waitForMajorityServiceDecoration = + ServiceContext::declareDecoration(); +} // namespace + +WaitForMajorityService::~WaitForMajorityService() { + shutDown(); +} + +WaitForMajorityService& WaitForMajorityService::get(ServiceContext* service) { + return waitForMajorityServiceDecoration(service); +} + +void WaitForMajorityService::setUp(ServiceContext* service) { + stdx::lock_guard lk(_mutex); + + if (!_thread.joinable() && !_inShutDown) { + _thread = stdx::thread([this, service] { _periodicallyWaitForMajority(service); }); + } +} + +void WaitForMajorityService::shutDown() { + { + stdx::lock_guard lk(_mutex); + + if (std::exchange(_inShutDown, true)) { + return; + } + + if (_opCtx) { + stdx::lock_guard scopedClientLock(*_opCtx->getClient()); + _opCtx->getServiceContext()->killOperation( + scopedClientLock, _opCtx, ErrorCodes::InterruptedAtShutdown); + } + } + + if (_thread.joinable()) { + _thread.join(); + } + + stdx::lock_guard lk(_mutex); + for (auto&& pendingRequest : _queuedOpTimes) { + pendingRequest.second.setError( + {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"}); + } + + _queuedOpTimes.clear(); +} + +SharedSemiFuture WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime) { + stdx::lock_guard lk(_mutex); + + if (_inShutDown) { + return {Future::makeReady( + Status{ErrorCodes::ShutdownInProgress, + "rejecting wait for majority request due to server shutdown"})}; + } + + // Background thread must be running before requesting. + invariant(_thread.joinable()); + + if (_lastOpTimeWaited >= opTime) { + return {Future::makeReady()}; + } + + auto iter = _queuedOpTimes.lower_bound(opTime); + if (iter != _queuedOpTimes.end()) { + if (iter->first == opTime) { + return iter->second.getFuture(); + } + } + + if (iter == _queuedOpTimes.begin()) { + // Background thread could already be actively waiting on a later time, so tell it to stop + // and wait for the newly requested opTime instead. + if (_opCtx) { + stdx::lock_guard scopedClientLock(*_opCtx->getClient()); + _opCtx->getServiceContext()->killOperation( + scopedClientLock, _opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable); + } + } + + const bool wasEmpty = _queuedOpTimes.empty(); + auto resultIter = _queuedOpTimes.emplace_hint( + iter, std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple()); + + if (wasEmpty) { + _hasNewOpTimeCV.notify_one(); + } + + return resultIter->second.getFuture(); +} + +void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* service) { + ThreadClient tc("waitForMajority", service); + + stdx::unique_lock lk(_mutex); + + while (!_inShutDown) { + auto opCtx = tc->makeOperationContext(); + _opCtx = opCtx.get(); + + if (!_queuedOpTimes.empty()) { + auto lowestOpTimeIter = _queuedOpTimes.begin(); + auto lowestOpTime = lowestOpTimeIter->first; + + lk.unlock(); + + WriteConcernResult ignoreResult; + auto status = + waitForWriteConcern(_opCtx, lowestOpTime, kMajorityWriteConcern, &ignoreResult); + + lk.lock(); + + if (status.isOK()) { + _lastOpTimeWaited = lowestOpTime; + } + + if (status == ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) { + _opCtx = nullptr; + continue; + } + + if (status.isOK()) { + lowestOpTimeIter->second.emplaceValue(); + } else { + lowestOpTimeIter->second.setError(status); + } + + _queuedOpTimes.erase(lowestOpTimeIter); + } + + try { + MONGO_IDLE_THREAD_BLOCK; + _opCtx->waitForConditionOrInterrupt( + _hasNewOpTimeCV, lk, [&] { return !_queuedOpTimes.empty() || _inShutDown; }); + } catch (const DBException& e) { + LOGV2_DEBUG(22487, + 1, + "Unable to wait for new op time due to: {error}", + "Unable to wait for new op time", + "error"_attr = e); + } + + _opCtx = nullptr; + } +} + +} // namespace mongo diff --git a/src/mongo/db/repl/wait_for_majority_service.h b/src/mongo/db/repl/wait_for_majority_service.h new file mode 100644 index 00000000000..1663ce907ac --- /dev/null +++ b/src/mongo/db/repl/wait_for_majority_service.h @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include +#include +#include + +#include "mongo/db/repl/optime.h" +#include "mongo/db/service_context.h" +#include "mongo/executor/task_executor.h" +#include "mongo/platform/mutex.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/future.h" + +namespace mongo { + +/** + * Provides a facility for asynchronously waiting a local opTime to be majority committed. + */ +class WaitForMajorityService { +public: + ~WaitForMajorityService(); + + static WaitForMajorityService& get(ServiceContext* service); + + /** + * Sets up the background thread responsible for waiting for opTimes to be majority committed. + */ + void setUp(ServiceContext* service); + + /** + * Blocking method, which shuts down and joins the background thread. + */ + void shutDown(); + + /** + * Enqueue a request to wait for the given opTime to be majority committed. + */ + SharedSemiFuture waitUntilMajority(const repl::OpTime& opTime); + +private: + using OpTimeWaitingMap = std::map>; + + /** + * Periodically checks the list of opTimes to wait for majority committed. + */ + void _periodicallyWaitForMajority(ServiceContext* service); + + Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityService::_mutex"); + + // Contains an ordered list of opTimes to wait to be majority comitted. + OpTimeWaitingMap _queuedOpTimes; + + // Contains the last opTime that the background thread was able to successfully wait to be + // majority comitted. + repl::OpTime _lastOpTimeWaited; + + // The background thread. + stdx::thread _thread; + + // Use for signalling new opTime requests being queued. + stdx::condition_variable _hasNewOpTimeCV; + + // If set, contains a reference to the opCtx being used by the background thread. + // Only valid when _thread.joinable() and not nullptr. + OperationContext* _opCtx{nullptr}; + + // Flag is set to true after shutDown() is called. + bool _inShutDown{false}; +}; + +} // namespace mongo diff --git a/src/mongo/db/repl/wait_for_majority_service_test.cpp b/src/mongo/db/repl/wait_for_majority_service_test.cpp new file mode 100644 index 00000000000..c68158b97fd --- /dev/null +++ b/src/mongo/db/repl/wait_for_majority_service_test.cpp @@ -0,0 +1,271 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/platform/mutex.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +class WaitForMajorityServiceTest : public ServiceContextMongoDTest { +public: + void setUp() override { + auto service = getServiceContext(); + waitService()->setUp(service); + + auto replCoord = std::make_unique(service); + + replCoord->setAwaitReplicationReturnValueFunction( + [this](OperationContext* opCtx, const repl::OpTime& opTime) { + auto status = waitForWriteConcernStub(opCtx, opTime); + return repl::ReplicationCoordinator::StatusAndDuration(status, Milliseconds(0)); + }); + + repl::ReplicationCoordinator::set(service, std::move(replCoord)); + } + + void tearDown() override { + waitService()->shutDown(); + } + + WaitForMajorityService* waitService() { + return &_waitForMajorityService; + } + + void finishWaitingOneOpTime() { + stdx::unique_lock lk(_mutex); + _isTestReady = true; + _isTestReadyCV.notify_one(); + + while (_isTestReady) { + _finishWaitingOneOpTimeCV.wait(lk); + } + } + + Status waitForWriteConcernStub(OperationContext* opCtx, const repl::OpTime& opTime) { + stdx::unique_lock lk(_mutex); + + _waitForMajorityCallCount++; + _callCountChangedCV.notify_one(); + + try { + opCtx->waitForConditionOrInterrupt(_isTestReadyCV, lk, [&] { return _isTestReady; }); + } catch (const DBException& e) { + _isTestReady = false; + _finishWaitingOneOpTimeCV.notify_one(); + + return e.toStatus(); + } + + _lastOpTimeWaited = opTime; + _isTestReady = false; + _finishWaitingOneOpTimeCV.notify_one(); + + return Status::OK(); + } + + const repl::OpTime& getLastOpTimeWaited() { + stdx::lock_guard lk(_mutex); + return _lastOpTimeWaited; + } + + void waitForMajorityCallCountGreaterThan(int expectedCount) { + stdx::unique_lock lk(_mutex); + _callCountChangedCV.wait(lk, [&] { return _waitForMajorityCallCount > expectedCount; }); + } + +private: + WaitForMajorityService _waitForMajorityService; + + Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityServiceTest::_mutex"); + stdx::condition_variable _isTestReadyCV; + stdx::condition_variable _finishWaitingOneOpTimeCV; + stdx::condition_variable _callCountChangedCV; + + bool _isTestReady{false}; + repl::OpTime _lastOpTimeWaited; + int _waitForMajorityCallCount{0}; +}; + +TEST_F(WaitForMajorityServiceTest, WaitOneOpTime) { + repl::OpTime t1(Timestamp(1, 0), 2); + + auto future = waitService()->waitUntilMajority(t1); + + ASSERT_FALSE(future.isReady()); + + finishWaitingOneOpTime(); + + future.get(); + ASSERT_EQ(t1, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, WaitWithSameOpTime) { + repl::OpTime t1(Timestamp(1, 0), 2); + + auto future1 = waitService()->waitUntilMajority(t1); + auto future1b = waitService()->waitUntilMajority(t1); + + ASSERT_FALSE(future1.isReady()); + ASSERT_FALSE(future1b.isReady()); + + finishWaitingOneOpTime(); + + future1.get(); + future1b.get(); + + ASSERT_EQ(t1, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanLowestQueued) { + repl::OpTime laterOpTime(Timestamp(6, 0), 2); + repl::OpTime earlierOpTime(Timestamp(1, 0), 2); + + auto laterFuture = waitService()->waitUntilMajority(laterOpTime); + + // Wait until the background thread picks up the queued opTime. + waitForMajorityCallCountGreaterThan(0); + + // The 2nd request has an earlier time, so it will interrupt 'laterOpTime' and skip the line. + auto earlierFuture = waitService()->waitUntilMajority(earlierOpTime); + + // Wait for background thread to finish transitioning from waiting on laterOpTime to + // earlierOpTime. + waitForMajorityCallCountGreaterThan(1); + + ASSERT_FALSE(laterFuture.isReady()); + ASSERT_FALSE(earlierFuture.isReady()); + + finishWaitingOneOpTime(); + + ASSERT_FALSE(laterFuture.isReady()); + + earlierFuture.get(); + ASSERT_EQ(earlierOpTime, getLastOpTimeWaited()); + + finishWaitingOneOpTime(); + laterFuture.get(); + + ASSERT_EQ(laterOpTime, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, WaitWithDifferentOpTime) { + repl::OpTime t1(Timestamp(1, 0), 2); + repl::OpTime t2(Timestamp(14, 0), 2); + + auto future1 = waitService()->waitUntilMajority(t1); + auto future2 = waitService()->waitUntilMajority(t2); + + ASSERT_FALSE(future1.isReady()); + ASSERT_FALSE(future2.isReady()); + + finishWaitingOneOpTime(); + + ASSERT_FALSE(future2.isReady()); + + future1.get(); + ASSERT_EQ(t1, getLastOpTimeWaited()); + + finishWaitingOneOpTime(); + + future2.get(); + ASSERT_EQ(t2, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanOpTimeAlreadyWaited) { + repl::OpTime t1(Timestamp(5, 0), 2); + repl::OpTime t2(Timestamp(14, 0), 2); + + auto future1 = waitService()->waitUntilMajority(t1); + auto future2 = waitService()->waitUntilMajority(t2); + + ASSERT_FALSE(future1.isReady()); + ASSERT_FALSE(future2.isReady()); + + finishWaitingOneOpTime(); + + ASSERT_FALSE(future2.isReady()); + + future1.get(); + ASSERT_EQ(t1, getLastOpTimeWaited()); + + repl::OpTime oldTs(Timestamp(4, 0), 2); + auto oldFuture = waitService()->waitUntilMajority(oldTs); + auto alreadyWaitedFuture = waitService()->waitUntilMajority(t1); + + ASSERT_FALSE(future2.isReady()); + + oldFuture.get(); + alreadyWaitedFuture.get(); + ASSERT_EQ(t1, getLastOpTimeWaited()); + + finishWaitingOneOpTime(); + + future2.get(); + ASSERT_EQ(t2, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, ShutdownShouldCancelQueuedRequests) { + repl::OpTime t1(Timestamp(5, 0), 2); + repl::OpTime t2(Timestamp(14, 0), 2); + + auto future1 = waitService()->waitUntilMajority(t1); + auto future2 = waitService()->waitUntilMajority(t2); + + ASSERT_FALSE(future1.isReady()); + ASSERT_FALSE(future2.isReady()); + + waitService()->shutDown(); + + ASSERT_THROWS_CODE(future1.get(), AssertionException, ErrorCodes::InterruptedAtShutdown); + ASSERT_THROWS_CODE(future2.get(), AssertionException, ErrorCodes::InterruptedAtShutdown); +} + +TEST_F(WaitForMajorityServiceTest, WriteConcernErrorGetsPropagatedCorrectly) { + repl::OpTime t(Timestamp(5, 0), 2); + + auto replCoord = dynamic_cast( + repl::ReplicationCoordinator::get(getServiceContext())); + replCoord->setAwaitReplicationReturnValueFunction( + [this](OperationContext* opCtx, const repl::OpTime& opTime) { + return repl::ReplicationCoordinator::StatusAndDuration( + {ErrorCodes::PrimarySteppedDown, "test stepdown"}, Milliseconds(0)); + }); + + auto future = waitService()->waitUntilMajority(t); + ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::PrimarySteppedDown); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 59a3b7294f1..3950e0dddb1 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -143,7 +143,6 @@ env.Library( 'transaction_coordinator_util.cpp', 'transaction_coordinator_worker_curop_repository_mongod.cpp', 'transaction_coordinator.cpp', - 'wait_for_majority_service.cpp', env.Idlc('transaction_coordinator_document.idl')[0], env.Idlc('transaction_coordinators_stats.idl')[0], ], @@ -152,6 +151,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands/server_status', '$BUILD_DIR/mongo/db/commands/txn_cmd_request', '$BUILD_DIR/mongo/db/dbdirectclient', + '$BUILD_DIR/mongo/db/repl/wait_for_majority_service', '$BUILD_DIR/mongo/db/rw_concern_d', '$BUILD_DIR/mongo/db/vector_clock_mutable', '$BUILD_DIR/mongo/executor/task_executor_pool', @@ -404,7 +404,6 @@ env.CppUnitTest( 'start_chunk_clone_request_test.cpp', 'type_shard_identity_test.cpp', 'vector_clock_shard_server_test.cpp', - 'wait_for_majority_service_test.cpp', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/auth/authmocks', diff --git a/src/mongo/db/s/collection_sharding_runtime_test.cpp b/src/mongo/db/s/collection_sharding_runtime_test.cpp index a58d4375a92..cc8984a57ed 100644 --- a/src/mongo/db/s/collection_sharding_runtime_test.cpp +++ b/src/mongo/db/s/collection_sharding_runtime_test.cpp @@ -32,10 +32,10 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/shard_server_test_fixture.h" -#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/util/fail_point.h" namespace mongo { diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 9a2a891f39d..4a57fae8c4d 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -48,6 +48,7 @@ #include "mongo/db/ops/write_ops.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/collection_sharding_runtime.h" @@ -56,7 +57,6 @@ #include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/sharding_statistics.h" -#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/db/write_concern.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/executor/thread_pool_task_executor.h" diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp index f643224ccee..165ad6a85b1 100644 --- a/src/mongo/db/s/migration_util_test.cpp +++ b/src/mongo/db/s/migration_util_test.cpp @@ -30,6 +30,7 @@ #include "mongo/client/remote_command_targeter_factory_mock.h" #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/catalog_cache_loader_mock.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/collection_sharding_state.h" @@ -39,7 +40,6 @@ #include "mongo/db/s/shard_server_catalog_cache_loader.h" #include "mongo/db/s/shard_server_test_fixture.h" #include "mongo/db/s/sharding_state.h" -#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/database_version_helpers.h" diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp index 97c087b448c..1a5ba642aa4 100644 --- a/src/mongo/db/s/range_deletion_util.cpp +++ b/src/mongo/db/s/range_deletion_util.cpp @@ -53,11 +53,11 @@ #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/persistent_task_store.h" #include "mongo/db/s/range_deletion_task_gen.h" #include "mongo/db/s/sharding_statistics.h" -#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/remove_saver.h" #include "mongo/db/write_concern.h" diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp index 919d723028f..c0d769a2564 100644 --- a/src/mongo/db/s/range_deletion_util_test.cpp +++ b/src/mongo/db/s/range_deletion_util_test.cpp @@ -31,6 +31,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/metadata_manager.h" #include "mongo/db/s/migration_util.h" @@ -39,7 +40,6 @@ #include "mongo/db/s/range_deletion_util.h" #include "mongo/db/s/shard_server_test_fixture.h" #include "mongo/db/s/sharding_runtime_d_params_gen.h" -#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/unittest/death_test.h" #include "mongo/util/fail_point.h" diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index 0a9b900f853..52353319a55 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -33,8 +33,8 @@ #include "mongo/db/s/transaction_coordinator.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/transaction_coordinator_metrics_observer.h" -#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/db/server_options.h" #include "mongo/db/vector_clock_mutable.h" #include "mongo/logv2/log.h" diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp index 7d9d565d865..256d0905278 100644 --- a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp +++ b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp @@ -37,7 +37,7 @@ #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" #include "mongo/db/operation_context.h" -#include "mongo/db/s/wait_for_majority_service.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/rpc/metadata/client_metadata_ismaster.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" diff --git a/src/mongo/db/s/wait_for_majority_service.cpp b/src/mongo/db/s/wait_for_majority_service.cpp deleted file mode 100644 index 8ab5c19d21a..00000000000 --- a/src/mongo/db/s/wait_for_majority_service.cpp +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Copyright (C) 2019-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/db/s/wait_for_majority_service.h" - -#include - -#include "mongo/db/service_context.h" -#include "mongo/db/write_concern.h" -#include "mongo/executor/network_interface_factory.h" -#include "mongo/executor/thread_pool_task_executor.h" -#include "mongo/logv2/log.h" -#include "mongo/util/concurrency/idle_thread_block.h" -#include "mongo/util/concurrency/thread_pool.h" - -namespace mongo { - -namespace { -const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, - WriteConcernOptions::SyncMode::UNSET, - WriteConcernOptions::kNoTimeout); - -const auto waitForMajorityServiceDecoration = - ServiceContext::declareDecoration(); -} // namespace - -WaitForMajorityService::~WaitForMajorityService() { - shutDown(); -} - -WaitForMajorityService& WaitForMajorityService::get(ServiceContext* service) { - return waitForMajorityServiceDecoration(service); -} - -void WaitForMajorityService::setUp(ServiceContext* service) { - stdx::lock_guard lk(_mutex); - - if (!_thread.joinable() && !_inShutDown) { - _thread = stdx::thread([this, service] { _periodicallyWaitForMajority(service); }); - } -} - -void WaitForMajorityService::shutDown() { - { - stdx::lock_guard lk(_mutex); - - if (std::exchange(_inShutDown, true)) { - return; - } - - if (_opCtx) { - stdx::lock_guard scopedClientLock(*_opCtx->getClient()); - _opCtx->getServiceContext()->killOperation( - scopedClientLock, _opCtx, ErrorCodes::InterruptedAtShutdown); - } - } - - if (_thread.joinable()) { - _thread.join(); - } - - stdx::lock_guard lk(_mutex); - for (auto&& pendingRequest : _queuedOpTimes) { - pendingRequest.second.setError( - {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"}); - } - - _queuedOpTimes.clear(); -} - -SharedSemiFuture WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime) { - stdx::lock_guard lk(_mutex); - - if (_inShutDown) { - return {Future::makeReady( - Status{ErrorCodes::ShutdownInProgress, - "rejecting wait for majority request due to server shutdown"})}; - } - - // Background thread must be running before requesting. - invariant(_thread.joinable()); - - if (_lastOpTimeWaited >= opTime) { - return {Future::makeReady()}; - } - - auto iter = _queuedOpTimes.lower_bound(opTime); - if (iter != _queuedOpTimes.end()) { - if (iter->first == opTime) { - return iter->second.getFuture(); - } - } - - if (iter == _queuedOpTimes.begin()) { - // Background thread could already be actively waiting on a later time, so tell it to stop - // and wait for the newly requested opTime instead. - if (_opCtx) { - stdx::lock_guard scopedClientLock(*_opCtx->getClient()); - _opCtx->getServiceContext()->killOperation( - scopedClientLock, _opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable); - } - } - - const bool wasEmpty = _queuedOpTimes.empty(); - auto resultIter = _queuedOpTimes.emplace_hint( - iter, std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple()); - - if (wasEmpty) { - _hasNewOpTimeCV.notify_one(); - } - - return resultIter->second.getFuture(); -} - -void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* service) { - ThreadClient tc("waitForMajority", service); - - stdx::unique_lock lk(_mutex); - - while (!_inShutDown) { - auto opCtx = tc->makeOperationContext(); - _opCtx = opCtx.get(); - - if (!_queuedOpTimes.empty()) { - auto lowestOpTimeIter = _queuedOpTimes.begin(); - auto lowestOpTime = lowestOpTimeIter->first; - - lk.unlock(); - - WriteConcernResult ignoreResult; - auto status = - waitForWriteConcern(_opCtx, lowestOpTime, kMajorityWriteConcern, &ignoreResult); - - lk.lock(); - - if (status.isOK()) { - _lastOpTimeWaited = lowestOpTime; - } - - if (status == ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) { - _opCtx = nullptr; - continue; - } - - if (status.isOK()) { - lowestOpTimeIter->second.emplaceValue(); - } else { - lowestOpTimeIter->second.setError(status); - } - - _queuedOpTimes.erase(lowestOpTimeIter); - } - - try { - MONGO_IDLE_THREAD_BLOCK; - _opCtx->waitForConditionOrInterrupt( - _hasNewOpTimeCV, lk, [&] { return !_queuedOpTimes.empty() || _inShutDown; }); - } catch (const DBException& e) { - LOGV2_DEBUG(22487, - 1, - "Unable to wait for new op time due to: {error}", - "Unable to wait for new op time", - "error"_attr = e); - } - - _opCtx = nullptr; - } -} - -} // namespace mongo diff --git a/src/mongo/db/s/wait_for_majority_service.h b/src/mongo/db/s/wait_for_majority_service.h deleted file mode 100644 index 1663ce907ac..00000000000 --- a/src/mongo/db/s/wait_for_majority_service.h +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Copyright (C) 2019-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include -#include -#include - -#include "mongo/db/repl/optime.h" -#include "mongo/db/service_context.h" -#include "mongo/executor/task_executor.h" -#include "mongo/platform/mutex.h" -#include "mongo/stdx/thread.h" -#include "mongo/util/future.h" - -namespace mongo { - -/** - * Provides a facility for asynchronously waiting a local opTime to be majority committed. - */ -class WaitForMajorityService { -public: - ~WaitForMajorityService(); - - static WaitForMajorityService& get(ServiceContext* service); - - /** - * Sets up the background thread responsible for waiting for opTimes to be majority committed. - */ - void setUp(ServiceContext* service); - - /** - * Blocking method, which shuts down and joins the background thread. - */ - void shutDown(); - - /** - * Enqueue a request to wait for the given opTime to be majority committed. - */ - SharedSemiFuture waitUntilMajority(const repl::OpTime& opTime); - -private: - using OpTimeWaitingMap = std::map>; - - /** - * Periodically checks the list of opTimes to wait for majority committed. - */ - void _periodicallyWaitForMajority(ServiceContext* service); - - Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityService::_mutex"); - - // Contains an ordered list of opTimes to wait to be majority comitted. - OpTimeWaitingMap _queuedOpTimes; - - // Contains the last opTime that the background thread was able to successfully wait to be - // majority comitted. - repl::OpTime _lastOpTimeWaited; - - // The background thread. - stdx::thread _thread; - - // Use for signalling new opTime requests being queued. - stdx::condition_variable _hasNewOpTimeCV; - - // If set, contains a reference to the opCtx being used by the background thread. - // Only valid when _thread.joinable() and not nullptr. - OperationContext* _opCtx{nullptr}; - - // Flag is set to true after shutDown() is called. - bool _inShutDown{false}; -}; - -} // namespace mongo diff --git a/src/mongo/db/s/wait_for_majority_service_test.cpp b/src/mongo/db/s/wait_for_majority_service_test.cpp deleted file mode 100644 index c9e2a8af3ef..00000000000 --- a/src/mongo/db/s/wait_for_majority_service_test.cpp +++ /dev/null @@ -1,271 +0,0 @@ -/** - * Copyright (C) 2019-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/replication_coordinator_mock.h" -#include "mongo/db/s/wait_for_majority_service.h" -#include "mongo/db/service_context_d_test_fixture.h" -#include "mongo/platform/mutex.h" -#include "mongo/unittest/unittest.h" - -namespace mongo { -namespace { - -class WaitForMajorityServiceTest : public ServiceContextMongoDTest { -public: - void setUp() override { - auto service = getServiceContext(); - waitService()->setUp(service); - - auto replCoord = std::make_unique(service); - - replCoord->setAwaitReplicationReturnValueFunction( - [this](OperationContext* opCtx, const repl::OpTime& opTime) { - auto status = waitForWriteConcernStub(opCtx, opTime); - return repl::ReplicationCoordinator::StatusAndDuration(status, Milliseconds(0)); - }); - - repl::ReplicationCoordinator::set(service, std::move(replCoord)); - } - - void tearDown() override { - waitService()->shutDown(); - } - - WaitForMajorityService* waitService() { - return &_waitForMajorityService; - } - - void finishWaitingOneOpTime() { - stdx::unique_lock lk(_mutex); - _isTestReady = true; - _isTestReadyCV.notify_one(); - - while (_isTestReady) { - _finishWaitingOneOpTimeCV.wait(lk); - } - } - - Status waitForWriteConcernStub(OperationContext* opCtx, const repl::OpTime& opTime) { - stdx::unique_lock lk(_mutex); - - _waitForMajorityCallCount++; - _callCountChangedCV.notify_one(); - - try { - opCtx->waitForConditionOrInterrupt(_isTestReadyCV, lk, [&] { return _isTestReady; }); - } catch (const DBException& e) { - _isTestReady = false; - _finishWaitingOneOpTimeCV.notify_one(); - - return e.toStatus(); - } - - _lastOpTimeWaited = opTime; - _isTestReady = false; - _finishWaitingOneOpTimeCV.notify_one(); - - return Status::OK(); - } - - const repl::OpTime& getLastOpTimeWaited() { - stdx::lock_guard lk(_mutex); - return _lastOpTimeWaited; - } - - void waitForMajorityCallCountGreaterThan(int expectedCount) { - stdx::unique_lock lk(_mutex); - _callCountChangedCV.wait(lk, [&] { return _waitForMajorityCallCount > expectedCount; }); - } - -private: - WaitForMajorityService _waitForMajorityService; - - Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityServiceTest::_mutex"); - stdx::condition_variable _isTestReadyCV; - stdx::condition_variable _finishWaitingOneOpTimeCV; - stdx::condition_variable _callCountChangedCV; - - bool _isTestReady{false}; - repl::OpTime _lastOpTimeWaited; - int _waitForMajorityCallCount{0}; -}; - -TEST_F(WaitForMajorityServiceTest, WaitOneOpTime) { - repl::OpTime t1(Timestamp(1, 0), 2); - - auto future = waitService()->waitUntilMajority(t1); - - ASSERT_FALSE(future.isReady()); - - finishWaitingOneOpTime(); - - future.get(); - ASSERT_EQ(t1, getLastOpTimeWaited()); -} - -TEST_F(WaitForMajorityServiceTest, WaitWithSameOpTime) { - repl::OpTime t1(Timestamp(1, 0), 2); - - auto future1 = waitService()->waitUntilMajority(t1); - auto future1b = waitService()->waitUntilMajority(t1); - - ASSERT_FALSE(future1.isReady()); - ASSERT_FALSE(future1b.isReady()); - - finishWaitingOneOpTime(); - - future1.get(); - future1b.get(); - - ASSERT_EQ(t1, getLastOpTimeWaited()); -} - -TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanLowestQueued) { - repl::OpTime laterOpTime(Timestamp(6, 0), 2); - repl::OpTime earlierOpTime(Timestamp(1, 0), 2); - - auto laterFuture = waitService()->waitUntilMajority(laterOpTime); - - // Wait until the background thread picks up the queued opTime. - waitForMajorityCallCountGreaterThan(0); - - // The 2nd request has an earlier time, so it will interrupt 'laterOpTime' and skip the line. - auto earlierFuture = waitService()->waitUntilMajority(earlierOpTime); - - // Wait for background thread to finish transitioning from waiting on laterOpTime to - // earlierOpTime. - waitForMajorityCallCountGreaterThan(1); - - ASSERT_FALSE(laterFuture.isReady()); - ASSERT_FALSE(earlierFuture.isReady()); - - finishWaitingOneOpTime(); - - ASSERT_FALSE(laterFuture.isReady()); - - earlierFuture.get(); - ASSERT_EQ(earlierOpTime, getLastOpTimeWaited()); - - finishWaitingOneOpTime(); - laterFuture.get(); - - ASSERT_EQ(laterOpTime, getLastOpTimeWaited()); -} - -TEST_F(WaitForMajorityServiceTest, WaitWithDifferentOpTime) { - repl::OpTime t1(Timestamp(1, 0), 2); - repl::OpTime t2(Timestamp(14, 0), 2); - - auto future1 = waitService()->waitUntilMajority(t1); - auto future2 = waitService()->waitUntilMajority(t2); - - ASSERT_FALSE(future1.isReady()); - ASSERT_FALSE(future2.isReady()); - - finishWaitingOneOpTime(); - - ASSERT_FALSE(future2.isReady()); - - future1.get(); - ASSERT_EQ(t1, getLastOpTimeWaited()); - - finishWaitingOneOpTime(); - - future2.get(); - ASSERT_EQ(t2, getLastOpTimeWaited()); -} - -TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanOpTimeAlreadyWaited) { - repl::OpTime t1(Timestamp(5, 0), 2); - repl::OpTime t2(Timestamp(14, 0), 2); - - auto future1 = waitService()->waitUntilMajority(t1); - auto future2 = waitService()->waitUntilMajority(t2); - - ASSERT_FALSE(future1.isReady()); - ASSERT_FALSE(future2.isReady()); - - finishWaitingOneOpTime(); - - ASSERT_FALSE(future2.isReady()); - - future1.get(); - ASSERT_EQ(t1, getLastOpTimeWaited()); - - repl::OpTime oldTs(Timestamp(4, 0), 2); - auto oldFuture = waitService()->waitUntilMajority(oldTs); - auto alreadyWaitedFuture = waitService()->waitUntilMajority(t1); - - ASSERT_FALSE(future2.isReady()); - - oldFuture.get(); - alreadyWaitedFuture.get(); - ASSERT_EQ(t1, getLastOpTimeWaited()); - - finishWaitingOneOpTime(); - - future2.get(); - ASSERT_EQ(t2, getLastOpTimeWaited()); -} - -TEST_F(WaitForMajorityServiceTest, ShutdownShouldCancelQueuedRequests) { - repl::OpTime t1(Timestamp(5, 0), 2); - repl::OpTime t2(Timestamp(14, 0), 2); - - auto future1 = waitService()->waitUntilMajority(t1); - auto future2 = waitService()->waitUntilMajority(t2); - - ASSERT_FALSE(future1.isReady()); - ASSERT_FALSE(future2.isReady()); - - waitService()->shutDown(); - - ASSERT_THROWS_CODE(future1.get(), AssertionException, ErrorCodes::InterruptedAtShutdown); - ASSERT_THROWS_CODE(future2.get(), AssertionException, ErrorCodes::InterruptedAtShutdown); -} - -TEST_F(WaitForMajorityServiceTest, WriteConcernErrorGetsPropagatedCorrectly) { - repl::OpTime t(Timestamp(5, 0), 2); - - auto replCoord = dynamic_cast( - repl::ReplicationCoordinator::get(getServiceContext())); - replCoord->setAwaitReplicationReturnValueFunction( - [this](OperationContext* opCtx, const repl::OpTime& opTime) { - return repl::ReplicationCoordinator::StatusAndDuration( - {ErrorCodes::PrimarySteppedDown, "test stepdown"}, Milliseconds(0)); - }); - - auto future = waitService()->waitUntilMajority(t); - ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::PrimarySteppedDown); -} - -} // namespace -} // namespace mongo -- cgit v1.2.1