diff options
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- | src/mongo/db/s/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_runtime_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/range_deletion_util.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/range_deletion_util_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_test_fixture.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/wait_for_majority_service.cpp | 200 | ||||
-rw-r--r-- | src/mongo/db/s/wait_for_majority_service.h | 100 | ||||
-rw-r--r-- | src/mongo/db/s/wait_for_majority_service_test.cpp | 271 |
11 files changed, 8 insertions, 580 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 59a3b7294f1..3950e0dddb1 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -143,7 +143,6 @@ env.Library( 'transaction_coordinator_util.cpp', 'transaction_coordinator_worker_curop_repository_mongod.cpp', 'transaction_coordinator.cpp', - 'wait_for_majority_service.cpp', env.Idlc('transaction_coordinator_document.idl')[0], env.Idlc('transaction_coordinators_stats.idl')[0], ], @@ -152,6 +151,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands/server_status', '$BUILD_DIR/mongo/db/commands/txn_cmd_request', '$BUILD_DIR/mongo/db/dbdirectclient', + '$BUILD_DIR/mongo/db/repl/wait_for_majority_service', '$BUILD_DIR/mongo/db/rw_concern_d', '$BUILD_DIR/mongo/db/vector_clock_mutable', '$BUILD_DIR/mongo/executor/task_executor_pool', @@ -404,7 +404,6 @@ env.CppUnitTest( 'start_chunk_clone_request_test.cpp', 'type_shard_identity_test.cpp', 'vector_clock_shard_server_test.cpp', - 'wait_for_majority_service_test.cpp', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/auth/authmocks', diff --git a/src/mongo/db/s/collection_sharding_runtime_test.cpp b/src/mongo/db/s/collection_sharding_runtime_test.cpp index a58d4375a92..cc8984a57ed 100644 --- a/src/mongo/db/s/collection_sharding_runtime_test.cpp +++ b/src/mongo/db/s/collection_sharding_runtime_test.cpp @@ -32,10 +32,10 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/shard_server_test_fixture.h" -#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/util/fail_point.h" namespace mongo { diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 9a2a891f39d..4a57fae8c4d 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -48,6 +48,7 @@ #include "mongo/db/ops/write_ops.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/collection_sharding_runtime.h" @@ -56,7 +57,6 @@ #include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/sharding_statistics.h" -#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/db/write_concern.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/executor/thread_pool_task_executor.h" diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp index f643224ccee..165ad6a85b1 100644 --- a/src/mongo/db/s/migration_util_test.cpp +++ b/src/mongo/db/s/migration_util_test.cpp @@ -30,6 +30,7 @@ #include "mongo/client/remote_command_targeter_factory_mock.h" #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/catalog_cache_loader_mock.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/collection_sharding_state.h" @@ -39,7 +40,6 @@ #include "mongo/db/s/shard_server_catalog_cache_loader.h" #include "mongo/db/s/shard_server_test_fixture.h" #include "mongo/db/s/sharding_state.h" -#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/database_version_helpers.h" diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp index 97c087b448c..1a5ba642aa4 100644 --- a/src/mongo/db/s/range_deletion_util.cpp +++ b/src/mongo/db/s/range_deletion_util.cpp @@ -53,11 +53,11 @@ #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/persistent_task_store.h" #include "mongo/db/s/range_deletion_task_gen.h" #include "mongo/db/s/sharding_statistics.h" -#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/remove_saver.h" #include "mongo/db/write_concern.h" diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp index 919d723028f..c0d769a2564 100644 --- a/src/mongo/db/s/range_deletion_util_test.cpp +++ b/src/mongo/db/s/range_deletion_util_test.cpp @@ -31,6 +31,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/metadata_manager.h" #include "mongo/db/s/migration_util.h" @@ -39,7 +40,6 @@ #include "mongo/db/s/range_deletion_util.h" #include "mongo/db/s/shard_server_test_fixture.h" #include "mongo/db/s/sharding_runtime_d_params_gen.h" -#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/unittest/death_test.h" #include "mongo/util/fail_point.h" diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index 0a9b900f853..52353319a55 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -33,8 +33,8 @@ #include "mongo/db/s/transaction_coordinator.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/transaction_coordinator_metrics_observer.h" -#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/db/server_options.h" #include "mongo/db/vector_clock_mutable.h" #include "mongo/logv2/log.h" diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp index 7d9d565d865..256d0905278 100644 --- a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp +++ b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp @@ -37,7 +37,7 @@ #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" #include "mongo/db/operation_context.h" -#include "mongo/db/s/wait_for_majority_service.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/rpc/metadata/client_metadata_ismaster.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" diff --git a/src/mongo/db/s/wait_for_majority_service.cpp b/src/mongo/db/s/wait_for_majority_service.cpp deleted file mode 100644 index 8ab5c19d21a..00000000000 --- a/src/mongo/db/s/wait_for_majority_service.cpp +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Copyright (C) 2019-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/db/s/wait_for_majority_service.h" - -#include <utility> - -#include "mongo/db/service_context.h" -#include "mongo/db/write_concern.h" -#include "mongo/executor/network_interface_factory.h" -#include "mongo/executor/thread_pool_task_executor.h" -#include "mongo/logv2/log.h" -#include "mongo/util/concurrency/idle_thread_block.h" -#include "mongo/util/concurrency/thread_pool.h" - -namespace mongo { - -namespace { -const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, - WriteConcernOptions::SyncMode::UNSET, - WriteConcernOptions::kNoTimeout); - -const auto waitForMajorityServiceDecoration = - ServiceContext::declareDecoration<WaitForMajorityService>(); -} // namespace - -WaitForMajorityService::~WaitForMajorityService() { - shutDown(); -} - -WaitForMajorityService& WaitForMajorityService::get(ServiceContext* service) { - return waitForMajorityServiceDecoration(service); -} - -void WaitForMajorityService::setUp(ServiceContext* service) { - stdx::lock_guard lk(_mutex); - - if (!_thread.joinable() && !_inShutDown) { - _thread = stdx::thread([this, service] { _periodicallyWaitForMajority(service); }); - } -} - -void WaitForMajorityService::shutDown() { - { - stdx::lock_guard lk(_mutex); - - if (std::exchange(_inShutDown, true)) { - return; - } - - if (_opCtx) { - stdx::lock_guard scopedClientLock(*_opCtx->getClient()); - _opCtx->getServiceContext()->killOperation( - scopedClientLock, _opCtx, ErrorCodes::InterruptedAtShutdown); - } - } - - if (_thread.joinable()) { - _thread.join(); - } - - stdx::lock_guard lk(_mutex); - for (auto&& pendingRequest : _queuedOpTimes) { - pendingRequest.second.setError( - {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"}); - } - - _queuedOpTimes.clear(); -} - -SharedSemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime) { - stdx::lock_guard lk(_mutex); - - if (_inShutDown) { - return {Future<void>::makeReady( - Status{ErrorCodes::ShutdownInProgress, - "rejecting wait for majority request due to server shutdown"})}; - } - - // Background thread must be running before requesting. - invariant(_thread.joinable()); - - if (_lastOpTimeWaited >= opTime) { - return {Future<void>::makeReady()}; - } - - auto iter = _queuedOpTimes.lower_bound(opTime); - if (iter != _queuedOpTimes.end()) { - if (iter->first == opTime) { - return iter->second.getFuture(); - } - } - - if (iter == _queuedOpTimes.begin()) { - // Background thread could already be actively waiting on a later time, so tell it to stop - // and wait for the newly requested opTime instead. - if (_opCtx) { - stdx::lock_guard scopedClientLock(*_opCtx->getClient()); - _opCtx->getServiceContext()->killOperation( - scopedClientLock, _opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable); - } - } - - const bool wasEmpty = _queuedOpTimes.empty(); - auto resultIter = _queuedOpTimes.emplace_hint( - iter, std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple()); - - if (wasEmpty) { - _hasNewOpTimeCV.notify_one(); - } - - return resultIter->second.getFuture(); -} - -void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* service) { - ThreadClient tc("waitForMajority", service); - - stdx::unique_lock<Latch> lk(_mutex); - - while (!_inShutDown) { - auto opCtx = tc->makeOperationContext(); - _opCtx = opCtx.get(); - - if (!_queuedOpTimes.empty()) { - auto lowestOpTimeIter = _queuedOpTimes.begin(); - auto lowestOpTime = lowestOpTimeIter->first; - - lk.unlock(); - - WriteConcernResult ignoreResult; - auto status = - waitForWriteConcern(_opCtx, lowestOpTime, kMajorityWriteConcern, &ignoreResult); - - lk.lock(); - - if (status.isOK()) { - _lastOpTimeWaited = lowestOpTime; - } - - if (status == ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) { - _opCtx = nullptr; - continue; - } - - if (status.isOK()) { - lowestOpTimeIter->second.emplaceValue(); - } else { - lowestOpTimeIter->second.setError(status); - } - - _queuedOpTimes.erase(lowestOpTimeIter); - } - - try { - MONGO_IDLE_THREAD_BLOCK; - _opCtx->waitForConditionOrInterrupt( - _hasNewOpTimeCV, lk, [&] { return !_queuedOpTimes.empty() || _inShutDown; }); - } catch (const DBException& e) { - LOGV2_DEBUG(22487, - 1, - "Unable to wait for new op time due to: {error}", - "Unable to wait for new op time", - "error"_attr = e); - } - - _opCtx = nullptr; - } -} - -} // namespace mongo diff --git a/src/mongo/db/s/wait_for_majority_service.h b/src/mongo/db/s/wait_for_majority_service.h deleted file mode 100644 index 1663ce907ac..00000000000 --- a/src/mongo/db/s/wait_for_majority_service.h +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Copyright (C) 2019-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <map> -#include <memory> -#include <vector> - -#include "mongo/db/repl/optime.h" -#include "mongo/db/service_context.h" -#include "mongo/executor/task_executor.h" -#include "mongo/platform/mutex.h" -#include "mongo/stdx/thread.h" -#include "mongo/util/future.h" - -namespace mongo { - -/** - * Provides a facility for asynchronously waiting a local opTime to be majority committed. - */ -class WaitForMajorityService { -public: - ~WaitForMajorityService(); - - static WaitForMajorityService& get(ServiceContext* service); - - /** - * Sets up the background thread responsible for waiting for opTimes to be majority committed. - */ - void setUp(ServiceContext* service); - - /** - * Blocking method, which shuts down and joins the background thread. - */ - void shutDown(); - - /** - * Enqueue a request to wait for the given opTime to be majority committed. - */ - SharedSemiFuture<void> waitUntilMajority(const repl::OpTime& opTime); - -private: - using OpTimeWaitingMap = std::map<repl::OpTime, SharedPromise<void>>; - - /** - * Periodically checks the list of opTimes to wait for majority committed. - */ - void _periodicallyWaitForMajority(ServiceContext* service); - - Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityService::_mutex"); - - // Contains an ordered list of opTimes to wait to be majority comitted. - OpTimeWaitingMap _queuedOpTimes; - - // Contains the last opTime that the background thread was able to successfully wait to be - // majority comitted. - repl::OpTime _lastOpTimeWaited; - - // The background thread. - stdx::thread _thread; - - // Use for signalling new opTime requests being queued. - stdx::condition_variable _hasNewOpTimeCV; - - // If set, contains a reference to the opCtx being used by the background thread. - // Only valid when _thread.joinable() and not nullptr. - OperationContext* _opCtx{nullptr}; - - // Flag is set to true after shutDown() is called. - bool _inShutDown{false}; -}; - -} // namespace mongo diff --git a/src/mongo/db/s/wait_for_majority_service_test.cpp b/src/mongo/db/s/wait_for_majority_service_test.cpp deleted file mode 100644 index c9e2a8af3ef..00000000000 --- a/src/mongo/db/s/wait_for_majority_service_test.cpp +++ /dev/null @@ -1,271 +0,0 @@ -/** - * Copyright (C) 2019-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/replication_coordinator_mock.h" -#include "mongo/db/s/wait_for_majority_service.h" -#include "mongo/db/service_context_d_test_fixture.h" -#include "mongo/platform/mutex.h" -#include "mongo/unittest/unittest.h" - -namespace mongo { -namespace { - -class WaitForMajorityServiceTest : public ServiceContextMongoDTest { -public: - void setUp() override { - auto service = getServiceContext(); - waitService()->setUp(service); - - auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(service); - - replCoord->setAwaitReplicationReturnValueFunction( - [this](OperationContext* opCtx, const repl::OpTime& opTime) { - auto status = waitForWriteConcernStub(opCtx, opTime); - return repl::ReplicationCoordinator::StatusAndDuration(status, Milliseconds(0)); - }); - - repl::ReplicationCoordinator::set(service, std::move(replCoord)); - } - - void tearDown() override { - waitService()->shutDown(); - } - - WaitForMajorityService* waitService() { - return &_waitForMajorityService; - } - - void finishWaitingOneOpTime() { - stdx::unique_lock<Latch> lk(_mutex); - _isTestReady = true; - _isTestReadyCV.notify_one(); - - while (_isTestReady) { - _finishWaitingOneOpTimeCV.wait(lk); - } - } - - Status waitForWriteConcernStub(OperationContext* opCtx, const repl::OpTime& opTime) { - stdx::unique_lock<Latch> lk(_mutex); - - _waitForMajorityCallCount++; - _callCountChangedCV.notify_one(); - - try { - opCtx->waitForConditionOrInterrupt(_isTestReadyCV, lk, [&] { return _isTestReady; }); - } catch (const DBException& e) { - _isTestReady = false; - _finishWaitingOneOpTimeCV.notify_one(); - - return e.toStatus(); - } - - _lastOpTimeWaited = opTime; - _isTestReady = false; - _finishWaitingOneOpTimeCV.notify_one(); - - return Status::OK(); - } - - const repl::OpTime& getLastOpTimeWaited() { - stdx::lock_guard<Latch> lk(_mutex); - return _lastOpTimeWaited; - } - - void waitForMajorityCallCountGreaterThan(int expectedCount) { - stdx::unique_lock lk(_mutex); - _callCountChangedCV.wait(lk, [&] { return _waitForMajorityCallCount > expectedCount; }); - } - -private: - WaitForMajorityService _waitForMajorityService; - - Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityServiceTest::_mutex"); - stdx::condition_variable _isTestReadyCV; - stdx::condition_variable _finishWaitingOneOpTimeCV; - stdx::condition_variable _callCountChangedCV; - - bool _isTestReady{false}; - repl::OpTime _lastOpTimeWaited; - int _waitForMajorityCallCount{0}; -}; - -TEST_F(WaitForMajorityServiceTest, WaitOneOpTime) { - repl::OpTime t1(Timestamp(1, 0), 2); - - auto future = waitService()->waitUntilMajority(t1); - - ASSERT_FALSE(future.isReady()); - - finishWaitingOneOpTime(); - - future.get(); - ASSERT_EQ(t1, getLastOpTimeWaited()); -} - -TEST_F(WaitForMajorityServiceTest, WaitWithSameOpTime) { - repl::OpTime t1(Timestamp(1, 0), 2); - - auto future1 = waitService()->waitUntilMajority(t1); - auto future1b = waitService()->waitUntilMajority(t1); - - ASSERT_FALSE(future1.isReady()); - ASSERT_FALSE(future1b.isReady()); - - finishWaitingOneOpTime(); - - future1.get(); - future1b.get(); - - ASSERT_EQ(t1, getLastOpTimeWaited()); -} - -TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanLowestQueued) { - repl::OpTime laterOpTime(Timestamp(6, 0), 2); - repl::OpTime earlierOpTime(Timestamp(1, 0), 2); - - auto laterFuture = waitService()->waitUntilMajority(laterOpTime); - - // Wait until the background thread picks up the queued opTime. - waitForMajorityCallCountGreaterThan(0); - - // The 2nd request has an earlier time, so it will interrupt 'laterOpTime' and skip the line. - auto earlierFuture = waitService()->waitUntilMajority(earlierOpTime); - - // Wait for background thread to finish transitioning from waiting on laterOpTime to - // earlierOpTime. - waitForMajorityCallCountGreaterThan(1); - - ASSERT_FALSE(laterFuture.isReady()); - ASSERT_FALSE(earlierFuture.isReady()); - - finishWaitingOneOpTime(); - - ASSERT_FALSE(laterFuture.isReady()); - - earlierFuture.get(); - ASSERT_EQ(earlierOpTime, getLastOpTimeWaited()); - - finishWaitingOneOpTime(); - laterFuture.get(); - - ASSERT_EQ(laterOpTime, getLastOpTimeWaited()); -} - -TEST_F(WaitForMajorityServiceTest, WaitWithDifferentOpTime) { - repl::OpTime t1(Timestamp(1, 0), 2); - repl::OpTime t2(Timestamp(14, 0), 2); - - auto future1 = waitService()->waitUntilMajority(t1); - auto future2 = waitService()->waitUntilMajority(t2); - - ASSERT_FALSE(future1.isReady()); - ASSERT_FALSE(future2.isReady()); - - finishWaitingOneOpTime(); - - ASSERT_FALSE(future2.isReady()); - - future1.get(); - ASSERT_EQ(t1, getLastOpTimeWaited()); - - finishWaitingOneOpTime(); - - future2.get(); - ASSERT_EQ(t2, getLastOpTimeWaited()); -} - -TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanOpTimeAlreadyWaited) { - repl::OpTime t1(Timestamp(5, 0), 2); - repl::OpTime t2(Timestamp(14, 0), 2); - - auto future1 = waitService()->waitUntilMajority(t1); - auto future2 = waitService()->waitUntilMajority(t2); - - ASSERT_FALSE(future1.isReady()); - ASSERT_FALSE(future2.isReady()); - - finishWaitingOneOpTime(); - - ASSERT_FALSE(future2.isReady()); - - future1.get(); - ASSERT_EQ(t1, getLastOpTimeWaited()); - - repl::OpTime oldTs(Timestamp(4, 0), 2); - auto oldFuture = waitService()->waitUntilMajority(oldTs); - auto alreadyWaitedFuture = waitService()->waitUntilMajority(t1); - - ASSERT_FALSE(future2.isReady()); - - oldFuture.get(); - alreadyWaitedFuture.get(); - ASSERT_EQ(t1, getLastOpTimeWaited()); - - finishWaitingOneOpTime(); - - future2.get(); - ASSERT_EQ(t2, getLastOpTimeWaited()); -} - -TEST_F(WaitForMajorityServiceTest, ShutdownShouldCancelQueuedRequests) { - repl::OpTime t1(Timestamp(5, 0), 2); - repl::OpTime t2(Timestamp(14, 0), 2); - - auto future1 = waitService()->waitUntilMajority(t1); - auto future2 = waitService()->waitUntilMajority(t2); - - ASSERT_FALSE(future1.isReady()); - ASSERT_FALSE(future2.isReady()); - - waitService()->shutDown(); - - ASSERT_THROWS_CODE(future1.get(), AssertionException, ErrorCodes::InterruptedAtShutdown); - ASSERT_THROWS_CODE(future2.get(), AssertionException, ErrorCodes::InterruptedAtShutdown); -} - -TEST_F(WaitForMajorityServiceTest, WriteConcernErrorGetsPropagatedCorrectly) { - repl::OpTime t(Timestamp(5, 0), 2); - - auto replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>( - repl::ReplicationCoordinator::get(getServiceContext())); - replCoord->setAwaitReplicationReturnValueFunction( - [this](OperationContext* opCtx, const repl::OpTime& opTime) { - return repl::ReplicationCoordinator::StatusAndDuration( - {ErrorCodes::PrimarySteppedDown, "test stepdown"}, Milliseconds(0)); - }); - - auto future = waitService()->waitUntilMajority(t); - ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::PrimarySteppedDown); -} - -} // namespace -} // namespace mongo |