summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/SConscript3
-rw-r--r--src/mongo/db/s/collection_sharding_runtime_test.cpp2
-rw-r--r--src/mongo/db/s/migration_util.cpp2
-rw-r--r--src/mongo/db/s/migration_util_test.cpp2
-rw-r--r--src/mongo/db/s/range_deletion_util.cpp2
-rw-r--r--src/mongo/db/s/range_deletion_util_test.cpp2
-rw-r--r--src/mongo/db/s/transaction_coordinator.cpp2
-rw-r--r--src/mongo/db/s/transaction_coordinator_test_fixture.cpp2
-rw-r--r--src/mongo/db/s/wait_for_majority_service.cpp200
-rw-r--r--src/mongo/db/s/wait_for_majority_service.h100
-rw-r--r--src/mongo/db/s/wait_for_majority_service_test.cpp271
11 files changed, 8 insertions, 580 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 59a3b7294f1..3950e0dddb1 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -143,7 +143,6 @@ env.Library(
'transaction_coordinator_util.cpp',
'transaction_coordinator_worker_curop_repository_mongod.cpp',
'transaction_coordinator.cpp',
- 'wait_for_majority_service.cpp',
env.Idlc('transaction_coordinator_document.idl')[0],
env.Idlc('transaction_coordinators_stats.idl')[0],
],
@@ -152,6 +151,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/server_status',
'$BUILD_DIR/mongo/db/commands/txn_cmd_request',
'$BUILD_DIR/mongo/db/dbdirectclient',
+ '$BUILD_DIR/mongo/db/repl/wait_for_majority_service',
'$BUILD_DIR/mongo/db/rw_concern_d',
'$BUILD_DIR/mongo/db/vector_clock_mutable',
'$BUILD_DIR/mongo/executor/task_executor_pool',
@@ -404,7 +404,6 @@ env.CppUnitTest(
'start_chunk_clone_request_test.cpp',
'type_shard_identity_test.cpp',
'vector_clock_shard_server_test.cpp',
- 'wait_for_majority_service_test.cpp',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/auth/authmocks',
diff --git a/src/mongo/db/s/collection_sharding_runtime_test.cpp b/src/mongo/db/s/collection_sharding_runtime_test.cpp
index a58d4375a92..cc8984a57ed 100644
--- a/src/mongo/db/s/collection_sharding_runtime_test.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime_test.cpp
@@ -32,10 +32,10 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/shard_server_test_fixture.h"
-#include "mongo/db/s/wait_for_majority_service.h"
#include "mongo/util/fail_point.h"
namespace mongo {
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 9a2a891f39d..4a57fae8c4d 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -48,6 +48,7 @@
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/collection_metadata.h"
#include "mongo/db/s/collection_sharding_runtime.h"
@@ -56,7 +57,6 @@
#include "mongo/db/s/sharding_runtime_d_params_gen.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/sharding_statistics.h"
-#include "mongo/db/s/wait_for_majority_service.h"
#include "mongo/db/write_concern.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp
index f643224ccee..165ad6a85b1 100644
--- a/src/mongo/db/s/migration_util_test.cpp
+++ b/src/mongo/db/s/migration_util_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/client/remote_command_targeter_factory_mock.h"
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/catalog_cache_loader_mock.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/collection_sharding_state.h"
@@ -39,7 +40,6 @@
#include "mongo/db/s/shard_server_catalog_cache_loader.h"
#include "mongo/db/s/shard_server_test_fixture.h"
#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/s/wait_for_majority_service.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/database_version_helpers.h"
diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp
index 97c087b448c..1a5ba642aa4 100644
--- a/src/mongo/db/s/range_deletion_util.cpp
+++ b/src/mongo/db/s/range_deletion_util.cpp
@@ -53,11 +53,11 @@
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/persistent_task_store.h"
#include "mongo/db/s/range_deletion_task_gen.h"
#include "mongo/db/s/sharding_statistics.h"
-#include "mongo/db/s/wait_for_majority_service.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/remove_saver.h"
#include "mongo/db/write_concern.h"
diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp
index 919d723028f..c0d769a2564 100644
--- a/src/mongo/db/s/range_deletion_util_test.cpp
+++ b/src/mongo/db/s/range_deletion_util_test.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/metadata_manager.h"
#include "mongo/db/s/migration_util.h"
@@ -39,7 +40,6 @@
#include "mongo/db/s/range_deletion_util.h"
#include "mongo/db/s/shard_server_test_fixture.h"
#include "mongo/db/s/sharding_runtime_d_params_gen.h"
-#include "mongo/db/s/wait_for_majority_service.h"
#include "mongo/unittest/death_test.h"
#include "mongo/util/fail_point.h"
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index 0a9b900f853..52353319a55 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -33,8 +33,8 @@
#include "mongo/db/s/transaction_coordinator.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/transaction_coordinator_metrics_observer.h"
-#include "mongo/db/s/wait_for_majority_service.h"
#include "mongo/db/server_options.h"
#include "mongo/db/vector_clock_mutable.h"
#include "mongo/logv2/log.h"
diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
index 7d9d565d865..256d0905278 100644
--- a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
@@ -37,7 +37,7 @@
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/s/wait_for_majority_service.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/rpc/metadata/client_metadata.h"
#include "mongo/rpc/metadata/client_metadata_ismaster.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
diff --git a/src/mongo/db/s/wait_for_majority_service.cpp b/src/mongo/db/s/wait_for_majority_service.cpp
deleted file mode 100644
index 8ab5c19d21a..00000000000
--- a/src/mongo/db/s/wait_for_majority_service.cpp
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Copyright (C) 2019-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/s/wait_for_majority_service.h"
-
-#include <utility>
-
-#include "mongo/db/service_context.h"
-#include "mongo/db/write_concern.h"
-#include "mongo/executor/network_interface_factory.h"
-#include "mongo/executor/thread_pool_task_executor.h"
-#include "mongo/logv2/log.h"
-#include "mongo/util/concurrency/idle_thread_block.h"
-#include "mongo/util/concurrency/thread_pool.h"
-
-namespace mongo {
-
-namespace {
-const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
- WriteConcernOptions::SyncMode::UNSET,
- WriteConcernOptions::kNoTimeout);
-
-const auto waitForMajorityServiceDecoration =
- ServiceContext::declareDecoration<WaitForMajorityService>();
-} // namespace
-
-WaitForMajorityService::~WaitForMajorityService() {
- shutDown();
-}
-
-WaitForMajorityService& WaitForMajorityService::get(ServiceContext* service) {
- return waitForMajorityServiceDecoration(service);
-}
-
-void WaitForMajorityService::setUp(ServiceContext* service) {
- stdx::lock_guard lk(_mutex);
-
- if (!_thread.joinable() && !_inShutDown) {
- _thread = stdx::thread([this, service] { _periodicallyWaitForMajority(service); });
- }
-}
-
-void WaitForMajorityService::shutDown() {
- {
- stdx::lock_guard lk(_mutex);
-
- if (std::exchange(_inShutDown, true)) {
- return;
- }
-
- if (_opCtx) {
- stdx::lock_guard scopedClientLock(*_opCtx->getClient());
- _opCtx->getServiceContext()->killOperation(
- scopedClientLock, _opCtx, ErrorCodes::InterruptedAtShutdown);
- }
- }
-
- if (_thread.joinable()) {
- _thread.join();
- }
-
- stdx::lock_guard lk(_mutex);
- for (auto&& pendingRequest : _queuedOpTimes) {
- pendingRequest.second.setError(
- {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"});
- }
-
- _queuedOpTimes.clear();
-}
-
-SharedSemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime) {
- stdx::lock_guard lk(_mutex);
-
- if (_inShutDown) {
- return {Future<void>::makeReady(
- Status{ErrorCodes::ShutdownInProgress,
- "rejecting wait for majority request due to server shutdown"})};
- }
-
- // Background thread must be running before requesting.
- invariant(_thread.joinable());
-
- if (_lastOpTimeWaited >= opTime) {
- return {Future<void>::makeReady()};
- }
-
- auto iter = _queuedOpTimes.lower_bound(opTime);
- if (iter != _queuedOpTimes.end()) {
- if (iter->first == opTime) {
- return iter->second.getFuture();
- }
- }
-
- if (iter == _queuedOpTimes.begin()) {
- // Background thread could already be actively waiting on a later time, so tell it to stop
- // and wait for the newly requested opTime instead.
- if (_opCtx) {
- stdx::lock_guard scopedClientLock(*_opCtx->getClient());
- _opCtx->getServiceContext()->killOperation(
- scopedClientLock, _opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable);
- }
- }
-
- const bool wasEmpty = _queuedOpTimes.empty();
- auto resultIter = _queuedOpTimes.emplace_hint(
- iter, std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple());
-
- if (wasEmpty) {
- _hasNewOpTimeCV.notify_one();
- }
-
- return resultIter->second.getFuture();
-}
-
-void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* service) {
- ThreadClient tc("waitForMajority", service);
-
- stdx::unique_lock<Latch> lk(_mutex);
-
- while (!_inShutDown) {
- auto opCtx = tc->makeOperationContext();
- _opCtx = opCtx.get();
-
- if (!_queuedOpTimes.empty()) {
- auto lowestOpTimeIter = _queuedOpTimes.begin();
- auto lowestOpTime = lowestOpTimeIter->first;
-
- lk.unlock();
-
- WriteConcernResult ignoreResult;
- auto status =
- waitForWriteConcern(_opCtx, lowestOpTime, kMajorityWriteConcern, &ignoreResult);
-
- lk.lock();
-
- if (status.isOK()) {
- _lastOpTimeWaited = lowestOpTime;
- }
-
- if (status == ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) {
- _opCtx = nullptr;
- continue;
- }
-
- if (status.isOK()) {
- lowestOpTimeIter->second.emplaceValue();
- } else {
- lowestOpTimeIter->second.setError(status);
- }
-
- _queuedOpTimes.erase(lowestOpTimeIter);
- }
-
- try {
- MONGO_IDLE_THREAD_BLOCK;
- _opCtx->waitForConditionOrInterrupt(
- _hasNewOpTimeCV, lk, [&] { return !_queuedOpTimes.empty() || _inShutDown; });
- } catch (const DBException& e) {
- LOGV2_DEBUG(22487,
- 1,
- "Unable to wait for new op time due to: {error}",
- "Unable to wait for new op time",
- "error"_attr = e);
- }
-
- _opCtx = nullptr;
- }
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/s/wait_for_majority_service.h b/src/mongo/db/s/wait_for_majority_service.h
deleted file mode 100644
index 1663ce907ac..00000000000
--- a/src/mongo/db/s/wait_for_majority_service.h
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Copyright (C) 2019-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <map>
-#include <memory>
-#include <vector>
-
-#include "mongo/db/repl/optime.h"
-#include "mongo/db/service_context.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/platform/mutex.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/util/future.h"
-
-namespace mongo {
-
-/**
- * Provides a facility for asynchronously waiting a local opTime to be majority committed.
- */
-class WaitForMajorityService {
-public:
- ~WaitForMajorityService();
-
- static WaitForMajorityService& get(ServiceContext* service);
-
- /**
- * Sets up the background thread responsible for waiting for opTimes to be majority committed.
- */
- void setUp(ServiceContext* service);
-
- /**
- * Blocking method, which shuts down and joins the background thread.
- */
- void shutDown();
-
- /**
- * Enqueue a request to wait for the given opTime to be majority committed.
- */
- SharedSemiFuture<void> waitUntilMajority(const repl::OpTime& opTime);
-
-private:
- using OpTimeWaitingMap = std::map<repl::OpTime, SharedPromise<void>>;
-
- /**
- * Periodically checks the list of opTimes to wait for majority committed.
- */
- void _periodicallyWaitForMajority(ServiceContext* service);
-
- Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityService::_mutex");
-
- // Contains an ordered list of opTimes to wait to be majority comitted.
- OpTimeWaitingMap _queuedOpTimes;
-
- // Contains the last opTime that the background thread was able to successfully wait to be
- // majority comitted.
- repl::OpTime _lastOpTimeWaited;
-
- // The background thread.
- stdx::thread _thread;
-
- // Use for signalling new opTime requests being queued.
- stdx::condition_variable _hasNewOpTimeCV;
-
- // If set, contains a reference to the opCtx being used by the background thread.
- // Only valid when _thread.joinable() and not nullptr.
- OperationContext* _opCtx{nullptr};
-
- // Flag is set to true after shutDown() is called.
- bool _inShutDown{false};
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/s/wait_for_majority_service_test.cpp b/src/mongo/db/s/wait_for_majority_service_test.cpp
deleted file mode 100644
index c9e2a8af3ef..00000000000
--- a/src/mongo/db/s/wait_for_majority_service_test.cpp
+++ /dev/null
@@ -1,271 +0,0 @@
-/**
- * Copyright (C) 2019-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/repl/replication_coordinator_mock.h"
-#include "mongo/db/s/wait_for_majority_service.h"
-#include "mongo/db/service_context_d_test_fixture.h"
-#include "mongo/platform/mutex.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-namespace {
-
-class WaitForMajorityServiceTest : public ServiceContextMongoDTest {
-public:
- void setUp() override {
- auto service = getServiceContext();
- waitService()->setUp(service);
-
- auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(service);
-
- replCoord->setAwaitReplicationReturnValueFunction(
- [this](OperationContext* opCtx, const repl::OpTime& opTime) {
- auto status = waitForWriteConcernStub(opCtx, opTime);
- return repl::ReplicationCoordinator::StatusAndDuration(status, Milliseconds(0));
- });
-
- repl::ReplicationCoordinator::set(service, std::move(replCoord));
- }
-
- void tearDown() override {
- waitService()->shutDown();
- }
-
- WaitForMajorityService* waitService() {
- return &_waitForMajorityService;
- }
-
- void finishWaitingOneOpTime() {
- stdx::unique_lock<Latch> lk(_mutex);
- _isTestReady = true;
- _isTestReadyCV.notify_one();
-
- while (_isTestReady) {
- _finishWaitingOneOpTimeCV.wait(lk);
- }
- }
-
- Status waitForWriteConcernStub(OperationContext* opCtx, const repl::OpTime& opTime) {
- stdx::unique_lock<Latch> lk(_mutex);
-
- _waitForMajorityCallCount++;
- _callCountChangedCV.notify_one();
-
- try {
- opCtx->waitForConditionOrInterrupt(_isTestReadyCV, lk, [&] { return _isTestReady; });
- } catch (const DBException& e) {
- _isTestReady = false;
- _finishWaitingOneOpTimeCV.notify_one();
-
- return e.toStatus();
- }
-
- _lastOpTimeWaited = opTime;
- _isTestReady = false;
- _finishWaitingOneOpTimeCV.notify_one();
-
- return Status::OK();
- }
-
- const repl::OpTime& getLastOpTimeWaited() {
- stdx::lock_guard<Latch> lk(_mutex);
- return _lastOpTimeWaited;
- }
-
- void waitForMajorityCallCountGreaterThan(int expectedCount) {
- stdx::unique_lock lk(_mutex);
- _callCountChangedCV.wait(lk, [&] { return _waitForMajorityCallCount > expectedCount; });
- }
-
-private:
- WaitForMajorityService _waitForMajorityService;
-
- Mutex _mutex = MONGO_MAKE_LATCH("WaitForMajorityServiceTest::_mutex");
- stdx::condition_variable _isTestReadyCV;
- stdx::condition_variable _finishWaitingOneOpTimeCV;
- stdx::condition_variable _callCountChangedCV;
-
- bool _isTestReady{false};
- repl::OpTime _lastOpTimeWaited;
- int _waitForMajorityCallCount{0};
-};
-
-TEST_F(WaitForMajorityServiceTest, WaitOneOpTime) {
- repl::OpTime t1(Timestamp(1, 0), 2);
-
- auto future = waitService()->waitUntilMajority(t1);
-
- ASSERT_FALSE(future.isReady());
-
- finishWaitingOneOpTime();
-
- future.get();
- ASSERT_EQ(t1, getLastOpTimeWaited());
-}
-
-TEST_F(WaitForMajorityServiceTest, WaitWithSameOpTime) {
- repl::OpTime t1(Timestamp(1, 0), 2);
-
- auto future1 = waitService()->waitUntilMajority(t1);
- auto future1b = waitService()->waitUntilMajority(t1);
-
- ASSERT_FALSE(future1.isReady());
- ASSERT_FALSE(future1b.isReady());
-
- finishWaitingOneOpTime();
-
- future1.get();
- future1b.get();
-
- ASSERT_EQ(t1, getLastOpTimeWaited());
-}
-
-TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanLowestQueued) {
- repl::OpTime laterOpTime(Timestamp(6, 0), 2);
- repl::OpTime earlierOpTime(Timestamp(1, 0), 2);
-
- auto laterFuture = waitService()->waitUntilMajority(laterOpTime);
-
- // Wait until the background thread picks up the queued opTime.
- waitForMajorityCallCountGreaterThan(0);
-
- // The 2nd request has an earlier time, so it will interrupt 'laterOpTime' and skip the line.
- auto earlierFuture = waitService()->waitUntilMajority(earlierOpTime);
-
- // Wait for background thread to finish transitioning from waiting on laterOpTime to
- // earlierOpTime.
- waitForMajorityCallCountGreaterThan(1);
-
- ASSERT_FALSE(laterFuture.isReady());
- ASSERT_FALSE(earlierFuture.isReady());
-
- finishWaitingOneOpTime();
-
- ASSERT_FALSE(laterFuture.isReady());
-
- earlierFuture.get();
- ASSERT_EQ(earlierOpTime, getLastOpTimeWaited());
-
- finishWaitingOneOpTime();
- laterFuture.get();
-
- ASSERT_EQ(laterOpTime, getLastOpTimeWaited());
-}
-
-TEST_F(WaitForMajorityServiceTest, WaitWithDifferentOpTime) {
- repl::OpTime t1(Timestamp(1, 0), 2);
- repl::OpTime t2(Timestamp(14, 0), 2);
-
- auto future1 = waitService()->waitUntilMajority(t1);
- auto future2 = waitService()->waitUntilMajority(t2);
-
- ASSERT_FALSE(future1.isReady());
- ASSERT_FALSE(future2.isReady());
-
- finishWaitingOneOpTime();
-
- ASSERT_FALSE(future2.isReady());
-
- future1.get();
- ASSERT_EQ(t1, getLastOpTimeWaited());
-
- finishWaitingOneOpTime();
-
- future2.get();
- ASSERT_EQ(t2, getLastOpTimeWaited());
-}
-
-TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanOpTimeAlreadyWaited) {
- repl::OpTime t1(Timestamp(5, 0), 2);
- repl::OpTime t2(Timestamp(14, 0), 2);
-
- auto future1 = waitService()->waitUntilMajority(t1);
- auto future2 = waitService()->waitUntilMajority(t2);
-
- ASSERT_FALSE(future1.isReady());
- ASSERT_FALSE(future2.isReady());
-
- finishWaitingOneOpTime();
-
- ASSERT_FALSE(future2.isReady());
-
- future1.get();
- ASSERT_EQ(t1, getLastOpTimeWaited());
-
- repl::OpTime oldTs(Timestamp(4, 0), 2);
- auto oldFuture = waitService()->waitUntilMajority(oldTs);
- auto alreadyWaitedFuture = waitService()->waitUntilMajority(t1);
-
- ASSERT_FALSE(future2.isReady());
-
- oldFuture.get();
- alreadyWaitedFuture.get();
- ASSERT_EQ(t1, getLastOpTimeWaited());
-
- finishWaitingOneOpTime();
-
- future2.get();
- ASSERT_EQ(t2, getLastOpTimeWaited());
-}
-
-TEST_F(WaitForMajorityServiceTest, ShutdownShouldCancelQueuedRequests) {
- repl::OpTime t1(Timestamp(5, 0), 2);
- repl::OpTime t2(Timestamp(14, 0), 2);
-
- auto future1 = waitService()->waitUntilMajority(t1);
- auto future2 = waitService()->waitUntilMajority(t2);
-
- ASSERT_FALSE(future1.isReady());
- ASSERT_FALSE(future2.isReady());
-
- waitService()->shutDown();
-
- ASSERT_THROWS_CODE(future1.get(), AssertionException, ErrorCodes::InterruptedAtShutdown);
- ASSERT_THROWS_CODE(future2.get(), AssertionException, ErrorCodes::InterruptedAtShutdown);
-}
-
-TEST_F(WaitForMajorityServiceTest, WriteConcernErrorGetsPropagatedCorrectly) {
- repl::OpTime t(Timestamp(5, 0), 2);
-
- auto replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>(
- repl::ReplicationCoordinator::get(getServiceContext()));
- replCoord->setAwaitReplicationReturnValueFunction(
- [this](OperationContext* opCtx, const repl::OpTime& opTime) {
- return repl::ReplicationCoordinator::StatusAndDuration(
- {ErrorCodes::PrimarySteppedDown, "test stepdown"}, Milliseconds(0));
- });
-
- auto future = waitService()->waitUntilMajority(t);
- ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::PrimarySteppedDown);
-}
-
-} // namespace
-} // namespace mongo