From f4cce647d9bdd3e988a5d514ec64cd4deb9f7a26 Mon Sep 17 00:00:00 2001 From: Andy Schwerin Date: Fri, 24 Mar 2017 17:59:14 -0400 Subject: SERVER-28491 Re-host unit tests that used ReplicationExecutorTest onto ThreadPoolExecutorTest. --- src/mongo/db/repl/SConscript | 59 ++- .../db/repl/check_quorum_for_config_change.cpp | 10 +- src/mongo/db/repl/check_quorum_for_config_change.h | 10 +- .../repl/check_quorum_for_config_change_test.cpp | 301 +++++++-------- src/mongo/db/repl/elect_cmd_runner.cpp | 4 +- src/mongo/db/repl/elect_cmd_runner.h | 8 +- src/mongo/db/repl/elect_cmd_runner_test.cpp | 140 +++---- src/mongo/db/repl/freshness_checker.cpp | 7 +- src/mongo/db/repl/freshness_checker.h | 14 +- src/mongo/db/repl/freshness_checker_test.cpp | 425 ++++++++++----------- src/mongo/db/repl/freshness_scanner.cpp | 12 +- src/mongo/db/repl/freshness_scanner.h | 12 +- src/mongo/db/repl/freshness_scanner_test.cpp | 66 ++-- src/mongo/db/repl/noop_writer_test.cpp | 90 ----- src/mongo/db/repl/rollback_checker_test.cpp | 32 +- src/mongo/db/repl/scatter_gather_algorithm.h | 8 +- src/mongo/db/repl/scatter_gather_runner.cpp | 21 +- src/mongo/db/repl/scatter_gather_runner.h | 25 +- src/mongo/db/repl/scatter_gather_test.cpp | 167 ++++---- src/mongo/db/repl/task_runner.cpp | 4 +- 20 files changed, 613 insertions(+), 802 deletions(-) delete mode 100644 src/mongo/db/repl/noop_writer_test.cpp (limited to 'src/mongo/db/repl') diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 08503eb92bd..7931b0d082f 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -125,40 +125,40 @@ env.Library( target='replication_executor', source=[ 'replication_executor.cpp', - 'scatter_gather_algorithm.cpp', - 'scatter_gather_runner.cpp', ], LIBDEPS=[ 'database_task', 'task_runner', '$BUILD_DIR/mongo/executor/network_interface', '$BUILD_DIR/mongo/executor/task_executor_interface', - '$BUILD_DIR/mongo/util/net/hostandport', ], ) env.Library( - target='replication_executor_test_fixture', + target='scatter_gather', source=[ - 'replication_executor_test_fixture.cpp', - ], + 'scatter_gather_algorithm.cpp', + 'scatter_gather_runner.cpp', + ], LIBDEPS=[ - 'replication_executor', - 'replmocks', - 'service_context_repl_mock_init', - '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', - '$BUILD_DIR/mongo/executor/task_executor_test_fixture', + '$BUILD_DIR/mongo/executor/task_executor_interface', ], ) + env.CppUnitTest( target='replication_executor_test', source=[ 'replication_executor_test.cpp', + 'replication_executor_test_fixture.cpp', ], LIBDEPS=[ - 'replication_executor_test_fixture', - '$BUILD_DIR/mongo/db/bson/dotted_path_support', + 'replication_executor', + 'replmocks', + 'service_context_repl_mock_init', + '$BUILD_DIR/mongo/db/auth/authorization_manager_global', + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/executor/task_executor_test_fixture', '$BUILD_DIR/mongo/unittest/concurrency', ], ) @@ -456,7 +456,6 @@ env.Library('topology_coordinator_impl', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/audit', - 'replication_executor', 'replica_set_messages', 'repl_settings', 'rslog', @@ -509,6 +508,7 @@ env.Library('repl_coordinator_impl', 'vote_requester.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/index/index_descriptor', '$BUILD_DIR/mongo/db/server_options_core', @@ -527,6 +527,7 @@ env.Library('repl_coordinator_impl', 'replication_executor', 'reporter', 'rslog', + 'scatter_gather', 'topology_coordinator', ]) @@ -540,6 +541,7 @@ env.Library( 'replmocks', 'service_context_repl_mock_init', 'topology_coordinator_impl', + '$BUILD_DIR/mongo/db/auth/authorization_manager_global', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/unittest/unittest', ], @@ -575,10 +577,8 @@ env.CppUnitTest( 'scatter_gather_test.cpp', ], LIBDEPS=[ - 'repl_coordinator_impl', - 'replmocks', - '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', - '$BUILD_DIR/mongo/db/service_context_noop_init', + 'scatter_gather', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', ], ) @@ -589,10 +589,8 @@ env.CppUnitTest( ], LIBDEPS=[ 'repl_coordinator_impl', - 'replication_executor', 'replmocks', - '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', - '$BUILD_DIR/mongo/db/service_context_noop_init', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', ], ) @@ -605,8 +603,7 @@ env.CppUnitTest( 'repl_coordinator_impl', 'replica_set_messages', 'replmocks', - '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', - '$BUILD_DIR/mongo/db/service_context_noop_init', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', ], ) @@ -634,8 +631,7 @@ env.CppUnitTest( 'repl_coordinator_impl', 'replica_set_messages', 'replmocks', - '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', - '$BUILD_DIR/mongo/db/service_context_noop_init', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', ], ) @@ -687,7 +683,6 @@ env.Library( 'repl_coordinator_interface', 'repl_settings', 'replica_set_messages', - 'replication_executor', 'storage_interface', '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/service_context', @@ -892,6 +887,7 @@ env.Library( LIBDEPS=[ 'replmocks', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', + '$BUILD_DIR/mongo/util/concurrency/thread_pool', ], ) @@ -920,6 +916,7 @@ env.CppUnitTest( 'collection_cloner', 'base_cloner_test_fixture', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/db/auth/authorization_manager_global', '$BUILD_DIR/mongo/db/service_context_noop_init', '$BUILD_DIR/mongo/unittest/task_executor_proxy', ], @@ -981,7 +978,6 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authcore', - '$BUILD_DIR/mongo/db/auth/authorization_manager_global', '$BUILD_DIR/mongo/util/concurrency/thread_pool', ], ) @@ -996,6 +992,7 @@ env.Library( 'service_context_repl_mock_init', 'task_runner', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/db/auth/authorization_manager_global', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/unittest/concurrency', '$BUILD_DIR/mongo/unittest/unittest', @@ -1153,7 +1150,8 @@ env.CppUnitTest( 'base_cloner_test_fixture', 'initial_syncer', 'data_replicator_external_state_mock', - 'replication_executor_test_fixture', + 'replmocks', + 'service_context_repl_mock_init', 'sync_source_selector_mock', 'task_executor_mock', '$BUILD_DIR/mongo/db/query/command_request_response', @@ -1178,9 +1176,9 @@ env.CppUnitTest( 'rollback_checker_test.cpp', ], LIBDEPS=[ - 'replication_executor_test_fixture', 'rollback_checker', '$BUILD_DIR/mongo/unittest/concurrency', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', ], ) @@ -1214,8 +1212,7 @@ env.CppUnitTest( 'repl_coordinator_impl', 'replica_set_messages', 'replmocks', - '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', - '$BUILD_DIR/mongo/db/service_context_noop_init', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', ], ) diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp index 1dd1fe3ebbb..a1be65dbaf3 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp @@ -111,7 +111,7 @@ std::vector QuorumChecker::getRequests() const { } void QuorumChecker::processResponse(const RemoteCommandRequest& request, - const ResponseStatus& response) { + const executor::RemoteCommandResponse& response) { _tabulateHeartbeatResponse(request, response); if (hasReceivedSufficientResponses()) { _onQuorumCheckComplete(); @@ -177,7 +177,7 @@ void QuorumChecker::_onQuorumCheckComplete() { } void QuorumChecker::_tabulateHeartbeatResponse(const RemoteCommandRequest& request, - const ResponseStatus& response) { + const executor::RemoteCommandResponse& response) { ++_numResponses; if (!response.isOK()) { warning() << "Failed to complete heartbeat request to " << request.target << "; " @@ -279,7 +279,7 @@ bool QuorumChecker::hasReceivedSufficientResponses() const { return true; } -Status checkQuorumGeneral(ReplicationExecutor* executor, +Status checkQuorumGeneral(executor::TaskExecutor* executor, const ReplSetConfig& rsConfig, const int myIndex) { QuorumChecker checker(&rsConfig, myIndex); @@ -292,14 +292,14 @@ Status checkQuorumGeneral(ReplicationExecutor* executor, return checker.getFinalStatus(); } -Status checkQuorumForInitiate(ReplicationExecutor* executor, +Status checkQuorumForInitiate(executor::TaskExecutor* executor, const ReplSetConfig& rsConfig, const int myIndex) { invariant(rsConfig.getConfigVersion() == 1); return checkQuorumGeneral(executor, rsConfig, myIndex); } -Status checkQuorumForReconfig(ReplicationExecutor* executor, +Status checkQuorumForReconfig(executor::TaskExecutor* executor, const ReplSetConfig& rsConfig, const int myIndex) { invariant(rsConfig.getConfigVersion() > 1); diff --git a/src/mongo/db/repl/check_quorum_for_config_change.h b/src/mongo/db/repl/check_quorum_for_config_change.h index 746a21d5ec9..2bc9a289464 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change.h +++ b/src/mongo/db/repl/check_quorum_for_config_change.h @@ -29,8 +29,8 @@ #pragma once #include "mongo/base/status.h" -#include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/scatter_gather_algorithm.h" +#include "mongo/executor/task_executor.h" namespace mongo { namespace repl { @@ -62,7 +62,7 @@ public: virtual std::vector getRequests() const; virtual void processResponse(const executor::RemoteCommandRequest& request, - const ResponseStatus& response); + const executor::RemoteCommandResponse& response); virtual bool hasReceivedSufficientResponses() const; @@ -83,7 +83,7 @@ private: * Updates the QuorumChecker state based on the data from a single heartbeat response. */ void _tabulateHeartbeatResponse(const executor::RemoteCommandRequest& request, - const ResponseStatus& response); + const executor::RemoteCommandResponse& response); // Pointer to the replica set configuration for which we're checking quorum. const ReplSetConfig* const _rsConfig; @@ -126,7 +126,7 @@ private: * - No nodes are already joined to a replica set. * - No node reports a replica set name other than the one in "rsConfig". */ -Status checkQuorumForInitiate(ReplicationExecutor* executor, +Status checkQuorumForInitiate(executor::TaskExecutor* executor, const ReplSetConfig& rsConfig, const int myIndex); @@ -144,7 +144,7 @@ Status checkQuorumForInitiate(ReplicationExecutor* executor, * - No responding node reports a replica set name other than the one in "rsConfig". * - All responding nodes report a config version less than the one in "rsConfig". */ -Status checkQuorumForReconfig(ReplicationExecutor* executor, +Status checkQuorumForReconfig(executor::TaskExecutor* executor, const ReplSetConfig& rsConfig, const int myIndex); diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp index dc1ac7c722e..230a13f43ae 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp @@ -36,8 +36,8 @@ #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" -#include "mongo/db/repl/replication_executor.h" #include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/platform/unordered_set.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/stdx/functional.h" @@ -68,7 +68,7 @@ using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; -class CheckQuorumTest : public mongo::unittest::Test { +class CheckQuorumTest : public executor::ThreadPoolExecutorTest { protected: CheckQuorumTest(); @@ -76,17 +76,14 @@ protected: Status waitForQuorumCheck(); bool isQuorumCheckDone(); - NetworkInterfaceMock* _net; - std::unique_ptr _executor; - private: - void setUp(); - void tearDown(); - + void setUp() { + executor::ThreadPoolExecutorTest::setUp(); + launchExecutorThread(); + } void _runQuorumCheck(const ReplSetConfig& config, int myIndex); virtual Status _runQuorumCheckImpl(const ReplSetConfig& config, int myIndex) = 0; - std::unique_ptr _executorThread; std::unique_ptr _quorumCheckThread; Status _quorumCheckStatus; stdx::mutex _mutex; @@ -96,18 +93,6 @@ private: CheckQuorumTest::CheckQuorumTest() : _quorumCheckStatus(ErrorCodes::InternalError, "Not executed") {} -void CheckQuorumTest::setUp() { - auto net = stdx::make_unique(); - _net = net.get(); - _executor = stdx::make_unique(std::move(net), 1 /* prng seed */); - _executorThread.reset(new stdx::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); -} - -void CheckQuorumTest::tearDown() { - _executor->shutdown(); - _executorThread->join(); -} - void CheckQuorumTest::startQuorumCheck(const ReplSetConfig& config, int myIndex) { ASSERT_FALSE(_quorumCheckThread); _isQuorumCheckDone = false; @@ -135,14 +120,14 @@ void CheckQuorumTest::_runQuorumCheck(const ReplSetConfig& config, int myIndex) class CheckQuorumForInitiate : public CheckQuorumTest { private: virtual Status _runQuorumCheckImpl(const ReplSetConfig& config, int myIndex) { - return checkQuorumForInitiate(_executor.get(), config, myIndex); + return checkQuorumForInitiate(&getExecutor(), config, myIndex); } }; class CheckQuorumForReconfig : public CheckQuorumTest { protected: virtual Status _runQuorumCheckImpl(const ReplSetConfig& config, int myIndex) { - return checkQuorumForReconfig(_executor.get(), config, myIndex); + return checkQuorumForReconfig(&getExecutor(), config, myIndex); } }; @@ -166,7 +151,7 @@ TEST_F(CheckQuorumForInitiate, ValidSingleNodeSet) { } TEST_F(CheckQuorumForInitiate, QuorumCheckCanceledByShutdown) { - _executor->shutdown(); + getExecutor().shutdown(); ReplSetConfig config = assertMakeRSConfig(BSON("_id" << "rs0" << "version" @@ -198,17 +183,17 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSeveralDownNodes) { << BSON("_id" << 5 << "host" << "h5:1")))); startQuorumCheck(config, 2); - _net->enterNetwork(); - const Date_t startDate = _net->now(); + getNet()->enterNetwork(); + const Date_t startDate = getNet()->now(); const int numCommandsExpected = config.getNumMembers() - 1; for (int i = 0; i < numCommandsExpected; ++i) { - _net->scheduleResponse(_net->getNextReadyRequest(), - startDate + Milliseconds(10), - {ErrorCodes::NoSuchKey, "No reply"}); + getNet()->scheduleResponse(getNet()->getNextReadyRequest(), + startDate + Milliseconds(10), + {ErrorCodes::NoSuchKey, "No reply"}); } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); - ASSERT_EQUALS(startDate + Milliseconds(10), _net->now()); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); + ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); Status status = waitForQuorumCheck(); ASSERT_EQUALS(ErrorCodes::NodeNotFound, status); ASSERT_REASON_CONTAINS( @@ -256,24 +241,24 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckSuccessForFiveNodes) { const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); startQuorumCheck(rsConfig, myConfigIndex); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); const int numCommandsExpected = rsConfig.getNumMembers() - 1; unordered_set seenHosts; - _net->enterNetwork(); + getNet()->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << "Already saw " << request.target.toString(); - _net->scheduleResponse( + getNet()->scheduleResponse( noi, startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); ASSERT_OK(waitForQuorumCheck()); } @@ -309,29 +294,29 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToOneDownNode) { const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); startQuorumCheck(rsConfig, myConfigIndex); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); const int numCommandsExpected = rsConfig.getNumMembers() - 1; unordered_set seenHosts; - _net->enterNetwork(); + getNet()->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << "Already saw " << request.target.toString(); if (request.target == HostAndPort("h2", 1)) { - _net->scheduleResponse( + getNet()->scheduleResponse( noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"}); } else { - _net->scheduleResponse( + getNet()->scheduleResponse( noi, startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); } } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); Status status = waitForQuorumCheck(); ASSERT_EQUALS(ErrorCodes::NodeNotFound, status); ASSERT_REASON_CONTAINS( @@ -369,32 +354,32 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSetNameMismatch) { const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); startQuorumCheck(rsConfig, myConfigIndex); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); const int numCommandsExpected = rsConfig.getNumMembers() - 1; unordered_set seenHosts; - _net->enterNetwork(); + getNet()->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << "Already saw " << request.target.toString(); if (request.target == HostAndPort("h4", 1)) { - _net->scheduleResponse( - noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse( - BSON("ok" << 0 << "mismatch" << true), BSONObj(), Milliseconds(8)))); + getNet()->scheduleResponse(noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(BSON("ok" << 0 << "mismatch" << true), + BSONObj(), + Milliseconds(8)))); } else { - _net->scheduleResponse( + getNet()->scheduleResponse( noi, startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); } } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); Status status = waitForQuorumCheck(); ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status); ASSERT_REASON_CONTAINS(status, "Our set name did not match"); @@ -433,14 +418,14 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSetIdMismatch) { const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); startQuorumCheck(rsConfig, myConfigIndex); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); const int numCommandsExpected = rsConfig.getNumMembers() - 1; unordered_set seenHosts; - _net->enterNetwork(); + getNet()->enterNetwork(); HostAndPort incompatibleHost("h4", 1); OID unexpectedId = OID::gen(); for (int i = 0; i < numCommandsExpected; ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj); @@ -459,19 +444,19 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSetIdMismatch) { BSONObjBuilder metadataBuilder; metadata.writeToMetadata(&metadataBuilder); - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse( - BSON("ok" << 1), metadataBuilder.obj(), Milliseconds(8)))); + getNet()->scheduleResponse( + noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(BSON("ok" << 1), metadataBuilder.obj(), Milliseconds(8)))); } else { - _net->scheduleResponse( + getNet()->scheduleResponse( noi, startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); } } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); Status status = waitForQuorumCheck(); ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status); ASSERT_REASON_CONTAINS(status, @@ -512,35 +497,35 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToInitializedNode) { const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); startQuorumCheck(rsConfig, myConfigIndex); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); const int numCommandsExpected = rsConfig.getNumMembers() - 1; unordered_set seenHosts; - _net->enterNetwork(); + getNet()->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << "Already saw " << request.target.toString(); if (request.target == HostAndPort("h5", 1)) { - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 0 << "set" - << "rs0" - << "v" - << 1), - BSONObj(), - Milliseconds(8)))); + getNet()->scheduleResponse(noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(BSON("ok" << 0 << "set" + << "rs0" + << "v" + << 1), + BSONObj(), + Milliseconds(8)))); } else { - _net->scheduleResponse( + getNet()->scheduleResponse( noi, startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); } } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); Status status = waitForQuorumCheck(); ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status); ASSERT_REASON_CONTAINS(status, "Our config version of"); @@ -578,32 +563,32 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToInitializedNodeOnlyOneRespo const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); startQuorumCheck(rsConfig, myConfigIndex); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); const int numCommandsExpected = rsConfig.getNumMembers() - 1; unordered_set seenHosts; - _net->enterNetwork(); + getNet()->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << "Already saw " << request.target.toString(); if (request.target == HostAndPort("h5", 1)) { - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 0 << "set" - << "rs0" - << "v" - << 1), - BSONObj(), - Milliseconds(8)))); + getNet()->scheduleResponse(noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(BSON("ok" << 0 << "set" + << "rs0" + << "v" + << 1), + BSONObj(), + Milliseconds(8)))); } else { - _net->blackHole(noi); + getNet()->blackHole(noi); } } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); Status status = waitForQuorumCheck(); ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status); ASSERT_REASON_CONTAINS(status, "Our config version of"); @@ -639,12 +624,12 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToNodeWithData) { const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); startQuorumCheck(rsConfig, myConfigIndex); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); const int numCommandsExpected = rsConfig.getNumMembers() - 1; unordered_set seenHosts; - _net->enterNetwork(); + getNet()->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj); @@ -654,16 +639,16 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToNodeWithData) { hbResp.setConfigVersion(0); hbResp.noteHasData(); if (request.target == HostAndPort("h5", 1)) { - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse( - hbResp.toBSON(false), BSONObj(), Milliseconds(8)))); + getNet()->scheduleResponse( + noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(hbResp.toBSON(false), BSONObj(), Milliseconds(8)))); } else { - _net->blackHole(noi); + getNet()->blackHole(noi); } } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); Status status = waitForQuorumCheck(); ASSERT_EQUALS(ErrorCodes::CannotInitializeNodeWithData, status); ASSERT_REASON_CONTAINS(status, "has data already"); @@ -693,32 +678,32 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckVetoedDueToHigherConfigVersion) { const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); startQuorumCheck(rsConfig, myConfigIndex); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); const int numCommandsExpected = rsConfig.getNumMembers() - 1; unordered_set seenHosts; - _net->enterNetwork(); + getNet()->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << "Already saw " << request.target.toString(); if (request.target == HostAndPort("h1", 1)) { - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 0 << "set" - << "rs0" - << "v" - << 5), - BSONObj(), - Milliseconds(8)))); + getNet()->scheduleResponse(noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(BSON("ok" << 0 << "set" + << "rs0" + << "v" + << 5), + BSONObj(), + Milliseconds(8)))); } else { - _net->blackHole(noi); + getNet()->blackHole(noi); } } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); Status status = waitForQuorumCheck(); ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status); ASSERT_REASON_CONTAINS(status, "Our config version of"); @@ -748,30 +733,30 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckVetoedDueToIncompatibleSetName) { const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); startQuorumCheck(rsConfig, myConfigIndex); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); const int numCommandsExpected = rsConfig.getNumMembers() - 1; unordered_set seenHosts; - _net->enterNetwork(); + getNet()->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << "Already saw " << request.target.toString(); if (request.target == HostAndPort("h2", 1)) { - _net->scheduleResponse( - noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse( - BSON("ok" << 0 << "mismatch" << true), BSONObj(), Milliseconds(8)))); + getNet()->scheduleResponse(noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(BSON("ok" << 0 << "mismatch" << true), + BSONObj(), + Milliseconds(8)))); } else { - _net->scheduleResponse( + getNet()->scheduleResponse( noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"}); } } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); Status status = waitForQuorumCheck(); ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status); ASSERT_REASON_CONTAINS(status, "Our set name did not match"); @@ -814,29 +799,29 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToInsufficientVoters) { const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); startQuorumCheck(rsConfig, myConfigIndex); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); const int numCommandsExpected = rsConfig.getNumMembers() - 1; unordered_set seenHosts; - _net->enterNetwork(); + getNet()->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << "Already saw " << request.target.toString(); if (request.target == HostAndPort("h1", 1) || request.target == HostAndPort("h5", 1)) { - _net->scheduleResponse( + getNet()->scheduleResponse( noi, startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); } else { - _net->scheduleResponse( + getNet()->scheduleResponse( noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"}); } } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); Status status = waitForQuorumCheck(); ASSERT_EQUALS(ErrorCodes::NodeNotFound, status); ASSERT_REASON_CONTAINS(status, "not enough voting nodes responded; required 2 but only"); @@ -875,29 +860,29 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToNoElectableNodeResponding) { const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); startQuorumCheck(rsConfig, myConfigIndex); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); const int numCommandsExpected = rsConfig.getNumMembers() - 1; unordered_set seenHosts; - _net->enterNetwork(); + getNet()->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << "Already saw " << request.target.toString(); if (request.target == HostAndPort("h5", 1)) { - _net->scheduleResponse( + getNet()->scheduleResponse( noi, startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); } else { - _net->scheduleResponse( + getNet()->scheduleResponse( noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"}); } } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); Status status = waitForQuorumCheck(); ASSERT_EQUALS(ErrorCodes::NodeNotFound, status); ASSERT_REASON_CONTAINS(status, "no electable nodes responded"); @@ -936,28 +921,28 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckSucceedsWithAsSoonAsPossible) { const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); startQuorumCheck(rsConfig, myConfigIndex); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); const int numCommandsExpected = rsConfig.getNumMembers() - 1; unordered_set seenHosts; - _net->enterNetwork(); + getNet()->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << "Already saw " << request.target.toString(); if (request.target == HostAndPort("h1", 1) || request.target == HostAndPort("h2", 1)) { - _net->scheduleResponse( + getNet()->scheduleResponse( noi, startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); } else { - _net->blackHole(noi); + getNet()->blackHole(noi); } } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); ASSERT_OK(waitForQuorumCheck()); } diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp index 6f155213861..ef6d9c722d3 100644 --- a/src/mongo/db/repl/elect_cmd_runner.cpp +++ b/src/mongo/db/repl/elect_cmd_runner.cpp @@ -128,8 +128,8 @@ void ElectCmdRunner::Algorithm::processResponse(const RemoteCommandRequest& requ ElectCmdRunner::ElectCmdRunner() : _isCanceled(false) {} ElectCmdRunner::~ElectCmdRunner() {} -StatusWith ElectCmdRunner::start( - ReplicationExecutor* executor, +StatusWith ElectCmdRunner::start( + executor::TaskExecutor* executor, const ReplSetConfig& currentConfig, int selfIndex, const std::vector& targets) { diff --git a/src/mongo/db/repl/elect_cmd_runner.h b/src/mongo/db/repl/elect_cmd_runner.h index c428fa11002..7ca3155c814 100644 --- a/src/mongo/db/repl/elect_cmd_runner.h +++ b/src/mongo/db/repl/elect_cmd_runner.h @@ -90,10 +90,10 @@ public: * * Returned handle can be used to schedule a callback when the process is complete. */ - StatusWith start(ReplicationExecutor* executor, - const ReplSetConfig& currentConfig, - int selfIndex, - const std::vector& targets); + StatusWith start(executor::TaskExecutor* executor, + const ReplSetConfig& currentConfig, + int selfIndex, + const std::vector& targets); /** * Informs the ElectCmdRunner to cancel further processing. diff --git a/src/mongo/db/repl/elect_cmd_runner_test.cpp b/src/mongo/db/repl/elect_cmd_runner_test.cpp index 1361c544b6e..6cb8e4df917 100644 --- a/src/mongo/db/repl/elect_cmd_runner_test.cpp +++ b/src/mongo/db/repl/elect_cmd_runner_test.cpp @@ -33,11 +33,10 @@ #include "mongo/db/repl/elect_cmd_runner.h" #include "mongo/db/repl/member_heartbeat_data.h" #include "mongo/db/repl/repl_set_config.h" -#include "mongo/db/repl/replication_executor.h" #include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" -#include "mongo/stdx/thread.h" #include "mongo/unittest/unittest.h" @@ -51,7 +50,7 @@ using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; -class ElectCmdRunnerTest : public mongo::unittest::Test { +class ElectCmdRunnerTest : public executor::ThreadPoolExecutorTest { public: void startTest(ElectCmdRunner* electCmdRunner, const ReplSetConfig& currentConfig, @@ -60,36 +59,21 @@ public: void waitForTest(); - void electCmdRunnerRunner(const ReplicationExecutor::CallbackArgs& data, + void electCmdRunnerRunner(const executor::TaskExecutor::CallbackArgs& data, ElectCmdRunner* electCmdRunner, - StatusWith* evh, + StatusWith* evh, const ReplSetConfig& currentConfig, int selfIndex, const std::vector& hosts); - NetworkInterfaceMock* _net; - std::unique_ptr _executor; - std::unique_ptr _executorThread; - private: - void setUp(); - void tearDown(); - - ReplicationExecutor::EventHandle _allDoneEvent; + void setUp() { + executor::ThreadPoolExecutorTest::setUp(); + launchExecutorThread(); + } + executor::TaskExecutor::EventHandle _allDoneEvent; }; -void ElectCmdRunnerTest::setUp() { - auto net = stdx::make_unique(); - _net = net.get(); - _executor = stdx::make_unique(std::move(net), 1 /* prng seed */); - _executorThread.reset(new stdx::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); -} - -void ElectCmdRunnerTest::tearDown() { - _executor->shutdown(); - _executorThread->join(); -} - ReplSetConfig assertMakeRSConfig(const BSONObj& configBson) { ReplSetConfig config; ASSERT_OK(config.initialize(configBson)); @@ -123,40 +107,38 @@ BSONObj stripRound(const BSONObj& orig) { // This is necessary because the run method must be scheduled in the Replication Executor // for correct concurrency operation. -void ElectCmdRunnerTest::electCmdRunnerRunner(const ReplicationExecutor::CallbackArgs& data, +void ElectCmdRunnerTest::electCmdRunnerRunner(const executor::TaskExecutor::CallbackArgs& data, ElectCmdRunner* electCmdRunner, - StatusWith* evh, + StatusWith* evh, const ReplSetConfig& currentConfig, int selfIndex, const std::vector& hosts) { invariant(data.status.isOK()); - ReplicationExecutor* executor = dynamic_cast(data.executor); - ASSERT(executor); - *evh = electCmdRunner->start(executor, currentConfig, selfIndex, hosts); + *evh = electCmdRunner->start(data.executor, currentConfig, selfIndex, hosts); } void ElectCmdRunnerTest::startTest(ElectCmdRunner* electCmdRunner, const ReplSetConfig& currentConfig, int selfIndex, const std::vector& hosts) { - StatusWith evh(ErrorCodes::InternalError, "Not set"); - StatusWith cbh = - _executor->scheduleWork(stdx::bind(&ElectCmdRunnerTest::electCmdRunnerRunner, - this, - stdx::placeholders::_1, - electCmdRunner, - &evh, - currentConfig, - selfIndex, - hosts)); + StatusWith evh(ErrorCodes::InternalError, "Not set"); + StatusWith cbh = + getExecutor().scheduleWork(stdx::bind(&ElectCmdRunnerTest::electCmdRunnerRunner, + this, + stdx::placeholders::_1, + electCmdRunner, + &evh, + currentConfig, + selfIndex, + hosts)); ASSERT_OK(cbh.getStatus()); - _executor->wait(cbh.getValue()); + getExecutor().wait(cbh.getValue()); ASSERT_OK(evh.getStatus()); _allDoneEvent = evh.getValue(); } void ElectCmdRunnerTest::waitForTest() { - _executor->waitForEvent(_allDoneEvent); + getExecutor().waitForEvent(_allDoneEvent); } TEST_F(ElectCmdRunnerTest, OneNode) { @@ -195,21 +177,21 @@ TEST_F(ElectCmdRunnerTest, TwoNodes) { ElectCmdRunner electCmdRunner; startTest(&electCmdRunner, config, 0, hosts); - const Date_t startDate = _net->now(); - _net->enterNetwork(); - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const Date_t startDate = getNet()->now(); + getNet()->enterNetwork(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); ASSERT_EQUALS("admin", noi->getRequest().dbname); ASSERT_BSONOBJ_EQ(stripRound(electRequest), stripRound(noi->getRequest().cmdObj)); ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target); - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse( - BSON("ok" << 1 << "vote" << 1 << "round" << 380865962699346850ll), + getNet()->scheduleResponse( + noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(BSON("ok" << 1 << "vote" << 1 << "round" << 380865962699346850ll), BSONObj(), Milliseconds(8)))); - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); - ASSERT_EQUALS(startDate + Milliseconds(10), _net->now()); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); + ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); waitForTest(); ASSERT_EQUALS(electCmdRunner.getReceivedVotes(), 2); } @@ -230,21 +212,22 @@ TEST_F(ElectCmdRunnerTest, ShuttingDown) { hosts.push_back(config.getMemberAt(1).getHostAndPort()); ElectCmdRunner electCmdRunner; - StatusWith evh(ErrorCodes::InternalError, "Not set"); - StatusWith cbh = - _executor->scheduleWork(stdx::bind(&ElectCmdRunnerTest::electCmdRunnerRunner, - this, - stdx::placeholders::_1, - &electCmdRunner, - &evh, - config, - 0, - hosts)); + StatusWith evh(ErrorCodes::InternalError, "Not set"); + StatusWith cbh = + getExecutor().scheduleWork(stdx::bind(&ElectCmdRunnerTest::electCmdRunnerRunner, + this, + stdx::placeholders::_1, + &electCmdRunner, + &evh, + config, + 0, + hosts)); ASSERT_OK(cbh.getStatus()); - _executor->wait(cbh.getValue()); + getExecutor().wait(cbh.getValue()); ASSERT_OK(evh.getStatus()); - _executor->shutdown(); - _executor->waitForEvent(evh.getValue()); + shutdownExecutorThread(); + joinExecutorThread(); + getExecutor().waitForEvent(evh.getValue()); ASSERT_EQUALS(electCmdRunner.getReceivedVotes(), 1); } @@ -279,7 +262,8 @@ protected: return _checker->getReceivedVotes(); } - void processResponse(const RemoteCommandRequest& request, const ResponseStatus& response) { + void processResponse(const RemoteCommandRequest& request, + const RemoteCommandResponse& response) { _checker->processResponse(request, response); } @@ -291,27 +275,27 @@ protected: Milliseconds(0)); } - ResponseStatus badResponseStatus() { - return ResponseStatus(ErrorCodes::NodeNotFound, "not on my watch"); + RemoteCommandResponse badRemoteCommandResponse() { + return RemoteCommandResponse(ErrorCodes::NodeNotFound, "not on my watch"); } - ResponseStatus wrongTypeForVoteField() { - return ResponseStatus(NetworkInterfaceMock::Response( + RemoteCommandResponse wrongTypeForVoteField() { + return RemoteCommandResponse(NetworkInterfaceMock::Response( BSON("vote" << std::string("yea")), BSONObj(), Milliseconds(10))); } - ResponseStatus voteYea() { - return ResponseStatus( + RemoteCommandResponse voteYea() { + return RemoteCommandResponse( NetworkInterfaceMock::Response(BSON("vote" << 1), BSONObj(), Milliseconds(10))); } - ResponseStatus voteNay() { - return ResponseStatus( + RemoteCommandResponse voteNay() { + return RemoteCommandResponse( NetworkInterfaceMock::Response(BSON("vote" << -10000), BSONObj(), Milliseconds(10))); } - ResponseStatus abstainFromVoting() { - return ResponseStatus( + RemoteCommandResponse abstainFromVoting() { + return RemoteCommandResponse( NetworkInterfaceMock::Response(BSON("vote" << 0), BSONObj(), Milliseconds(10))); } @@ -364,7 +348,7 @@ TEST_F(ElectScatterGatherTest, NodeRespondsWithBadStatus) { start(basicThreeNodeConfig()); ASSERT_FALSE(hasReceivedSufficientResponses()); - processResponse(requestFrom("host2"), badResponseStatus()); + processResponse(requestFrom("host2"), badRemoteCommandResponse()); ASSERT_FALSE(hasReceivedSufficientResponses()); processResponse(requestFrom("host3"), abstainFromVoting()); @@ -406,7 +390,7 @@ TEST_F(ElectScatterGatherTest, NodeRespondsWithBadStatusArbiters) { start(threeNodesTwoArbitersConfig()); ASSERT_FALSE(hasReceivedSufficientResponses()); - processResponse(requestFrom("host2"), badResponseStatus()); + processResponse(requestFrom("host2"), badRemoteCommandResponse()); ASSERT_FALSE(hasReceivedSufficientResponses()); processResponse(requestFrom("host3"), abstainFromVoting()); diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp index 8ab9cb1cd7c..b6db822eb67 100644 --- a/src/mongo/db/repl/freshness_checker.cpp +++ b/src/mongo/db/repl/freshness_checker.cpp @@ -47,6 +47,7 @@ namespace mongo { namespace repl { using executor::RemoteCommandRequest; +using executor::RemoteCommandResponse; FreshnessChecker::Algorithm::Algorithm(Timestamp lastOpTimeApplied, const ReplSetConfig& rsConfig, @@ -122,7 +123,7 @@ bool FreshnessChecker::Algorithm::_isVotingMember(const HostAndPort hap) const { } void FreshnessChecker::Algorithm::processResponse(const RemoteCommandRequest& request, - const ResponseStatus& response) { + const RemoteCommandResponse& response) { ++_responsesProcessed; bool votingMember = _isVotingMember(request.target); @@ -209,8 +210,8 @@ long long FreshnessChecker::getOriginalConfigVersion() const { FreshnessChecker::FreshnessChecker() : _isCanceled(false) {} FreshnessChecker::~FreshnessChecker() {} -StatusWith FreshnessChecker::start( - ReplicationExecutor* executor, +StatusWith FreshnessChecker::start( + executor::TaskExecutor* executor, const Timestamp& lastOpTimeApplied, const ReplSetConfig& currentConfig, int selfIndex, diff --git a/src/mongo/db/repl/freshness_checker.h b/src/mongo/db/repl/freshness_checker.h index 9d4eb50023f..2db774a7baf 100644 --- a/src/mongo/db/repl/freshness_checker.h +++ b/src/mongo/db/repl/freshness_checker.h @@ -33,8 +33,8 @@ #include "mongo/base/disallow_copying.h" #include "mongo/bson/timestamp.h" #include "mongo/db/repl/repl_set_config.h" -#include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/scatter_gather_algorithm.h" +#include "mongo/executor/task_executor.h" namespace mongo { @@ -66,7 +66,7 @@ public: virtual ~Algorithm(); virtual std::vector getRequests() const; virtual void processResponse(const executor::RemoteCommandRequest& request, - const ResponseStatus& response); + const executor::RemoteCommandResponse& response); virtual bool hasReceivedSufficientResponses() const; ElectionAbortReason shouldAbortElection() const; @@ -118,11 +118,11 @@ public: * evh can be used to schedule a callback when the process is complete. * If this function returns Status::OK(), evh is then guaranteed to be signaled. **/ - StatusWith start(ReplicationExecutor* executor, - const Timestamp& lastOpTimeApplied, - const ReplSetConfig& currentConfig, - int selfIndex, - const std::vector& targets); + StatusWith start(executor::TaskExecutor* executor, + const Timestamp& lastOpTimeApplied, + const ReplSetConfig& currentConfig, + int selfIndex, + const std::vector& targets); /** * Informs the freshness checker to cancel further processing. diff --git a/src/mongo/db/repl/freshness_checker_test.cpp b/src/mongo/db/repl/freshness_checker_test.cpp index 5bc3fd48984..2535ac5ffa2 100644 --- a/src/mongo/db/repl/freshness_checker_test.cpp +++ b/src/mongo/db/repl/freshness_checker_test.cpp @@ -33,8 +33,8 @@ #include "mongo/db/repl/freshness_checker.h" #include "mongo/db/repl/member_heartbeat_data.h" #include "mongo/db/repl/repl_set_config.h" -#include "mongo/db/repl/replication_executor.h" #include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/platform/unordered_set.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" @@ -57,7 +57,7 @@ bool stringContains(const std::string& haystack, const std::string& needle) { return haystack.find(needle) != std::string::npos; } -class FreshnessCheckerTest : public mongo::unittest::Test { +class FreshnessCheckerTest : public executor::ThreadPoolExecutorTest { protected: void startTest(const Timestamp& lastOpTimeApplied, const ReplSetConfig& currentConfig, @@ -72,42 +72,29 @@ protected: stdx::bind(stringContains, stdx::placeholders::_1, needle)); } - NetworkInterfaceMock* _net; - std::unique_ptr _executor; - std::unique_ptr _executorThread; - private: - void freshnessCheckerRunner(const ReplicationExecutor::CallbackArgs& data, + void freshnessCheckerRunner(const executor::TaskExecutor::CallbackArgs& data, const Timestamp& lastOpTimeApplied, const ReplSetConfig& currentConfig, int selfIndex, const std::vector& hosts); void setUp(); - void tearDown(); - std::unique_ptr _checker; - ReplicationExecutor::EventHandle _checkerDoneEvent; + FreshnessChecker _checker; + executor::TaskExecutor::EventHandle _checkerDoneEvent; }; void FreshnessCheckerTest::setUp() { - auto net = stdx::make_unique(); - _net = net.get(); - _executor = stdx::make_unique(std::move(net), 1 /* prng seed */); - _executorThread.reset(new stdx::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); - _checker.reset(new FreshnessChecker); -} - -void FreshnessCheckerTest::tearDown() { - _executor->shutdown(); - _executorThread->join(); + executor::ThreadPoolExecutorTest::setUp(); + launchExecutorThread(); } void FreshnessCheckerTest::waitOnChecker() { - _executor->waitForEvent(_checkerDoneEvent); + getExecutor().waitForEvent(_checkerDoneEvent); } FreshnessChecker::ElectionAbortReason FreshnessCheckerTest::shouldAbortElection() const { - return _checker->shouldAbortElection(); + return _checker.shouldAbortElection(); } ReplSetConfig assertMakeRSConfig(const BSONObj& configBson) { @@ -131,18 +118,16 @@ const BSONObj makeFreshRequest(const ReplSetConfig& rsConfig, << myConfig.getId()); } -// This is necessary because the run method must be scheduled in the Replication Executor +// This is necessary because the run method must be scheduled in the executor // for correct concurrency operation. -void FreshnessCheckerTest::freshnessCheckerRunner(const ReplicationExecutor::CallbackArgs& data, +void FreshnessCheckerTest::freshnessCheckerRunner(const executor::TaskExecutor::CallbackArgs& data, const Timestamp& lastOpTimeApplied, const ReplSetConfig& currentConfig, int selfIndex, const std::vector& hosts) { invariant(data.status.isOK()); - ReplicationExecutor* executor = dynamic_cast(data.executor); - ASSERT(executor); - StatusWith evh = - _checker->start(executor, lastOpTimeApplied, currentConfig, selfIndex, hosts); + StatusWith evh = + _checker.start(data.executor, lastOpTimeApplied, currentConfig, selfIndex, hosts); _checkerDoneEvent = assertGet(evh); } @@ -150,14 +135,14 @@ void FreshnessCheckerTest::startTest(const Timestamp& lastOpTimeApplied, const ReplSetConfig& currentConfig, int selfIndex, const std::vector& hosts) { - _executor->wait( - assertGet(_executor->scheduleWork(stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, - this, - stdx::placeholders::_1, - lastOpTimeApplied, - currentConfig, - selfIndex, - hosts)))); + getExecutor().wait(assertGet( + getExecutor().scheduleWork(stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, + this, + stdx::placeholders::_1, + lastOpTimeApplied, + currentConfig, + selfIndex, + hosts)))); } TEST_F(FreshnessCheckerTest, TwoNodes) { @@ -177,30 +162,29 @@ TEST_F(FreshnessCheckerTest, TwoNodes) { const BSONObj freshRequest = makeFreshRequest(config, Timestamp(0, 0), 0); startTest(Timestamp(0, 0), config, 0, hosts); - const Date_t startDate = _net->now(); - _net->enterNetwork(); + const Date_t startDate = getNet()->now(); + getNet()->enterNetwork(); for (size_t i = 0; i < hosts.size(); ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); ASSERT_EQUALS("admin", noi->getRequest().dbname); ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj); ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target); - _net->scheduleResponse( - noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1 << "id" << 2 << "set" - << "rs0" - << "who" - << "h1" - << "cfgver" - << 1 - << "opTime" - << Date_t()), - BSONObj(), - Milliseconds(8)))); + getNet()->scheduleResponse(noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(BSON("ok" << 1 << "id" << 2 << "set" + << "rs0" + << "who" + << "h1" + << "cfgver" + << 1 + << "opTime" + << Date_t()), + BSONObj(), + Milliseconds(8)))); } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); - ASSERT_EQUALS(startDate + Milliseconds(10), _net->now()); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); + ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); waitOnChecker(); ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FreshnessTie); } @@ -221,7 +205,8 @@ TEST_F(FreshnessCheckerTest, ShuttingDown) { hosts.push_back(config.getMemberAt(1).getHostAndPort()); startTest(Timestamp(0, 0), config, 0, hosts); - _executor->shutdown(); + shutdownExecutorThread(); + joinExecutorThread(); waitOnChecker(); // This seems less than ideal, but if we are shutting down, the next phase of election @@ -248,32 +233,31 @@ TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshest) { const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0); startTest(Timestamp(10, 0), config, 0, hosts); - const Date_t startDate = _net->now(); - _net->enterNetwork(); + const Date_t startDate = getNet()->now(); + getNet()->enterNetwork(); for (size_t i = 0; i < hosts.size(); ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); ASSERT_EQUALS("admin", noi->getRequest().dbname); ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj); ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target); - _net->scheduleResponse( - noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1 << "id" << 2 << "set" - << "rs0" - << "who" - << "h1" - << "cfgver" - << 1 - << "fresher" - << true - << "opTime" - << Date_t()), - BSONObj(), - Milliseconds(8)))); + getNet()->scheduleResponse(noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(BSON("ok" << 1 << "id" << 2 << "set" + << "rs0" + << "who" + << "h1" + << "cfgver" + << 1 + << "fresher" + << true + << "opTime" + << Date_t()), + BSONObj(), + Milliseconds(8)))); } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); - ASSERT_EQUALS(startDate + Milliseconds(10), _net->now()); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); + ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); waitOnChecker(); stopCapturingLogMessages(); @@ -301,17 +285,17 @@ TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestOpTime) { const BSONObj freshRequest = makeFreshRequest(config, Timestamp(0, 0), 0); startTest(Timestamp(0, 0), config, 0, hosts); - const Date_t startDate = _net->now(); - _net->enterNetwork(); + const Date_t startDate = getNet()->now(); + getNet()->enterNetwork(); for (size_t i = 0; i < hosts.size(); ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); ASSERT_EQUALS("admin", noi->getRequest().dbname); ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj); ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target); - _net->scheduleResponse( + getNet()->scheduleResponse( noi, startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse( + (RemoteCommandResponse( BSON("ok" << 1 << "id" << 2 << "set" << "rs0" << "who" @@ -323,9 +307,9 @@ TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestOpTime) { BSONObj(), Milliseconds(8)))); } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); - ASSERT_EQUALS(startDate + Milliseconds(10), _net->now()); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); + ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); waitOnChecker(); stopCapturingLogMessages(); @@ -351,30 +335,29 @@ TEST_F(FreshnessCheckerTest, ElectWrongTypeInFreshnessResponse) { const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0); startTest(Timestamp(10, 0), config, 0, hosts); - const Date_t startDate = _net->now(); - _net->enterNetwork(); + const Date_t startDate = getNet()->now(); + getNet()->enterNetwork(); for (size_t i = 0; i < hosts.size(); ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); ASSERT_EQUALS("admin", noi->getRequest().dbname); ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj); ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target); - _net->scheduleResponse( - noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1 << "id" << 2 << "set" - << "rs0" - << "who" - << "h1" - << "cfgver" - << 1 - << "opTime" - << 3), - BSONObj(), - Milliseconds(8)))); + getNet()->scheduleResponse(noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(BSON("ok" << 1 << "id" << 2 << "set" + << "rs0" + << "who" + << "h1" + << "cfgver" + << 1 + << "opTime" + << 3), + BSONObj(), + Milliseconds(8)))); } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); - ASSERT_EQUALS(startDate + Milliseconds(10), _net->now()); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); + ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); waitOnChecker(); stopCapturingLogMessages(); @@ -404,35 +387,34 @@ TEST_F(FreshnessCheckerTest, ElectVetoed) { const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0); startTest(Timestamp(10, 0), config, 0, hosts); - const Date_t startDate = _net->now(); - _net->enterNetwork(); + const Date_t startDate = getNet()->now(); + getNet()->enterNetwork(); for (size_t i = 0; i < hosts.size(); ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); ASSERT_EQUALS("admin", noi->getRequest().dbname); ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj); ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target); - _net->scheduleResponse( - noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse( - BSON("ok" << 1 << "id" << 2 << "set" - << "rs0" - << "who" - << "h1" - << "cfgver" - << 1 - << "veto" - << true - << "errmsg" - << "I'd rather you didn't" - << "opTime" - << Date_t::fromMillisSinceEpoch(Timestamp(0, 0).asLL())), - BSONObj(), - Milliseconds(8)))); + getNet()->scheduleResponse(noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(BSON("ok" << 1 << "id" << 2 << "set" + << "rs0" + << "who" + << "h1" + << "cfgver" + << 1 + << "veto" + << true + << "errmsg" + << "I'd rather you didn't" + << "opTime" + << Date_t::fromMillisSinceEpoch( + Timestamp(0, 0).asLL())), + BSONObj(), + Milliseconds(8)))); } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); - ASSERT_EQUALS(startDate + Milliseconds(10), _net->now()); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); + ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); waitOnChecker(); stopCapturingLogMessages(); @@ -477,11 +459,11 @@ TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestManyNodes) { const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0); startTest(Timestamp(10, 0), config, 0, hosts); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); unordered_set seen; - _net->enterNetwork(); + getNet()->enterNetwork(); for (size_t i = 0; i < hosts.size(); ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const HostAndPort target = noi->getRequest().target; ASSERT_EQUALS("admin", noi->getRequest().dbname); ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj); @@ -494,14 +476,14 @@ TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestManyNodes) { if (target.host() == "h1") { responseBuilder << "fresher" << true; } - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse( - responseBuilder.obj(), BSONObj(), Milliseconds(8)))); + getNet()->scheduleResponse( + noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8)))); } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); - ASSERT_EQUALS(startDate + Milliseconds(10), _net->now()); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); + ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); waitOnChecker(); stopCapturingLogMessages(); ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound); @@ -540,12 +522,12 @@ TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestOpTimeManyNodes const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0); startTest(Timestamp(10, 0), config, 0, hosts); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); unordered_set seen; - _net->enterNetwork(); + getNet()->enterNetwork(); for (size_t i = 0; i < hosts.size(); ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const HostAndPort target = noi->getRequest().target; ASSERT_EQUALS("admin", noi->getRequest().dbname); ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj); @@ -556,27 +538,27 @@ TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestOpTimeManyNodes << "rs0" << "who" << target.toString() << "cfgver" << 1 << "opTime" << Date_t::fromMillisSinceEpoch(Timestamp(20, 0).asLL()); - _net->scheduleResponse(noi, - startDate + Milliseconds(20), - ResponseStatus(RemoteCommandResponse( - responseBuilder.obj(), BSONObj(), Milliseconds(8)))); + getNet()->scheduleResponse( + noi, + startDate + Milliseconds(20), + (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8)))); } else { responseBuilder << "ok" << 1 << "id" << findIdForMember(config, target) << "set" << "rs0" << "who" << target.toString() << "cfgver" << 1 << "opTime" << Date_t::fromMillisSinceEpoch(Timestamp(10, 0).asLL()); - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse( - responseBuilder.obj(), BSONObj(), Milliseconds(8)))); + getNet()->scheduleResponse( + noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8)))); } } - _net->runUntil(startDate + Milliseconds(10)); - ASSERT_EQUALS(startDate + Milliseconds(10), _net->now()); + getNet()->runUntil(startDate + Milliseconds(10)); + ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); ASSERT_EQUALS(0, countLogLinesContaining("not electing self, we are not freshest")); - _net->runUntil(startDate + Milliseconds(20)); - ASSERT_EQUALS(startDate + Milliseconds(20), _net->now()); - _net->exitNetwork(); + getNet()->runUntil(startDate + Milliseconds(20)); + ASSERT_EQUALS(startDate + Milliseconds(20), getNet()->now()); + getNet()->exitNetwork(); waitOnChecker(); stopCapturingLogMessages(); ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound); @@ -610,11 +592,11 @@ TEST_F(FreshnessCheckerTest, ElectWrongTypeInFreshnessResponseManyNodes) { const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0); startTest(Timestamp(10, 0), config, 0, hosts); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); unordered_set seen; - _net->enterNetwork(); + getNet()->enterNetwork(); for (size_t i = 0; i < hosts.size(); ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const HostAndPort target = noi->getRequest().target; ASSERT_EQUALS("admin", noi->getRequest().dbname); ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj); @@ -628,14 +610,14 @@ TEST_F(FreshnessCheckerTest, ElectWrongTypeInFreshnessResponseManyNodes) { } else { responseBuilder << "opTime" << Date_t::fromMillisSinceEpoch(Timestamp(0, 0).asLL()); } - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse( - responseBuilder.obj(), BSONObj(), Milliseconds(8)))); + getNet()->scheduleResponse( + noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8)))); } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); - ASSERT_EQUALS(startDate + Milliseconds(10), _net->now()); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); + ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); waitOnChecker(); stopCapturingLogMessages(); ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound); @@ -672,11 +654,11 @@ TEST_F(FreshnessCheckerTest, ElectVetoedManyNodes) { const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0); startTest(Timestamp(10, 0), config, 0, hosts); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); unordered_set seen; - _net->enterNetwork(); + getNet()->enterNetwork(); for (size_t i = 0; i < hosts.size(); ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const HostAndPort target = noi->getRequest().target; ASSERT_EQUALS("admin", noi->getRequest().dbname); ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj); @@ -690,14 +672,14 @@ TEST_F(FreshnessCheckerTest, ElectVetoedManyNodes) { responseBuilder << "veto" << true << "errmsg" << "I'd rather you didn't"; } - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse( - responseBuilder.obj(), BSONObj(), Milliseconds(8)))); + getNet()->scheduleResponse( + noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8)))); } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); - ASSERT_EQUALS(startDate + Milliseconds(10), _net->now()); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); + ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); waitOnChecker(); stopCapturingLogMessages(); ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound); @@ -737,12 +719,12 @@ TEST_F(FreshnessCheckerTest, ElectVetoedAndTiedFreshnessManyNodes) { const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0); startTest(Timestamp(10, 0), config, 0, hosts); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); unordered_set seen; - _net->enterNetwork(); + getNet()->enterNetwork(); for (size_t i = 0; i < hosts.size(); ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const HostAndPort target = noi->getRequest().target; ASSERT_EQUALS("admin", noi->getRequest().dbname); ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj); @@ -755,29 +737,29 @@ TEST_F(FreshnessCheckerTest, ElectVetoedAndTiedFreshnessManyNodes) { << "errmsg" << "I'd rather you didn't" << "opTime" << Date_t::fromMillisSinceEpoch(Timestamp(10, 0).asLL()); - _net->scheduleResponse(noi, - startDate + Milliseconds(20), - ResponseStatus(RemoteCommandResponse( - responseBuilder.obj(), BSONObj(), Milliseconds(8)))); + getNet()->scheduleResponse( + noi, + startDate + Milliseconds(20), + (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8)))); } else { responseBuilder << "ok" << 1 << "id" << findIdForMember(config, target) << "set" << "rs0" << "who" << target.toString() << "cfgver" << 1 << "opTime" << Date_t::fromMillisSinceEpoch(Timestamp(10, 0).asLL()); - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse( - responseBuilder.obj(), BSONObj(), Milliseconds(8)))); + getNet()->scheduleResponse( + noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8)))); } } - _net->runUntil(startDate + Milliseconds(10)); - ASSERT_EQUALS(startDate + Milliseconds(10), _net->now()); + getNet()->runUntil(startDate + Milliseconds(10)); + ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); ASSERT_EQUALS(0, countLogLinesContaining("not electing self, h4:27017 would veto with '" "errmsg: \"I'd rather you didn't\"'")); - _net->runUntil(startDate + Milliseconds(20)); - ASSERT_EQUALS(startDate + Milliseconds(20), _net->now()); - _net->exitNetwork(); + getNet()->runUntil(startDate + Milliseconds(20)); + ASSERT_EQUALS(startDate + Milliseconds(20), getNet()->now()); + getNet()->exitNetwork(); waitOnChecker(); stopCapturingLogMessages(); ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound); @@ -813,34 +795,34 @@ TEST_F(FreshnessCheckerTest, ElectManyNodesNotAllRespond) { const BSONObj freshRequest = makeFreshRequest(config, lastOpTimeApplied, 0); startTest(Timestamp(10, 0), config, 0, hosts); - const Date_t startDate = _net->now(); + const Date_t startDate = getNet()->now(); unordered_set seen; - _net->enterNetwork(); + getNet()->enterNetwork(); for (size_t i = 0; i < hosts.size(); ++i) { - const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); const HostAndPort target = noi->getRequest().target; ASSERT_EQUALS("admin", noi->getRequest().dbname); ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj); ASSERT(seen.insert(target).second) << "Already saw " << target; if (target.host() == "h2" || target.host() == "h3") { - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(ErrorCodes::NoSuchKey, "No response")); + getNet()->scheduleResponse(noi, + startDate + Milliseconds(10), + RemoteCommandResponse(ErrorCodes::NoSuchKey, "No response")); } else { BSONObjBuilder responseBuilder; responseBuilder << "ok" << 1 << "id" << findIdForMember(config, target) << "set" << "rs0" << "who" << target.toString() << "cfgver" << 1 << "opTime" << Date_t::fromMillisSinceEpoch(Timestamp(0, 0).asLL()); - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse( - responseBuilder.obj(), BSONObj(), Milliseconds(8)))); + getNet()->scheduleResponse( + noi, + startDate + Milliseconds(10), + (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8)))); } } - _net->runUntil(startDate + Milliseconds(10)); - _net->exitNetwork(); - ASSERT_EQUALS(startDate + Milliseconds(10), _net->now()); + getNet()->runUntil(startDate + Milliseconds(10)); + getNet()->exitNetwork(); + ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); waitOnChecker(); ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::None); } @@ -884,7 +866,8 @@ protected: return _checker->hasReceivedSufficientResponses(); } - void processResponse(const RemoteCommandRequest& request, const ResponseStatus& response) { + void processResponse(const RemoteCommandRequest& request, + const RemoteCommandResponse& response) { _checker->processResponse(request, response); } @@ -892,54 +875,54 @@ protected: return _checker->shouldAbortElection(); } - ResponseStatus lessFresh() { + RemoteCommandResponse lessFresh() { BSONObjBuilder bb; bb.append("ok", 1.0); bb.appendDate("opTime", Date_t::fromMillisSinceEpoch(Timestamp(10, 0).asLL())); - return ResponseStatus( + return RemoteCommandResponse( NetworkInterfaceMock::Response(bb.obj(), BSONObj(), Milliseconds(10))); } - ResponseStatus moreFreshViaOpTime() { + RemoteCommandResponse moreFreshViaOpTime() { BSONObjBuilder bb; bb.append("ok", 1.0); bb.appendDate("opTime", Date_t::fromMillisSinceEpoch(Timestamp(110, 0).asLL())); - return ResponseStatus( + return RemoteCommandResponse( NetworkInterfaceMock::Response(bb.obj(), BSONObj(), Milliseconds(10))); } - ResponseStatus wrongTypeForOpTime() { + RemoteCommandResponse wrongTypeForOpTime() { BSONObjBuilder bb; bb.append("ok", 1.0); bb.append("opTime", std::string("several minutes ago")); - return ResponseStatus( + return RemoteCommandResponse( NetworkInterfaceMock::Response(bb.obj(), BSONObj(), Milliseconds(10))); } - ResponseStatus unauthorized() { + RemoteCommandResponse unauthorized() { BSONObjBuilder bb; bb.append("ok", 0.0); bb.append("code", ErrorCodes::Unauthorized); bb.append("errmsg", "Unauthorized"); - return ResponseStatus( + return RemoteCommandResponse( NetworkInterfaceMock::Response(bb.obj(), BSONObj(), Milliseconds(10))); } - ResponseStatus tiedForFreshness() { + RemoteCommandResponse tiedForFreshness() { BSONObjBuilder bb; bb.append("ok", 1.0); bb.appendDate("opTime", Date_t::fromMillisSinceEpoch(Timestamp(100, 0).asLL())); - return ResponseStatus( + return RemoteCommandResponse( NetworkInterfaceMock::Response(bb.obj(), BSONObj(), Milliseconds(10))); } - ResponseStatus moreFresh() { - return ResponseStatus(NetworkInterfaceMock::Response( + RemoteCommandResponse moreFresh() { + return RemoteCommandResponse(NetworkInterfaceMock::Response( BSON("ok" << 1.0 << "fresher" << true), BSONObj(), Milliseconds(10))); } - ResponseStatus veto() { - return ResponseStatus( + RemoteCommandResponse veto() { + return RemoteCommandResponse( NetworkInterfaceMock::Response(BSON("ok" << 1.0 << "veto" << true << "errmsg" << "vetoed!"), BSONObj(), @@ -1078,10 +1061,12 @@ TEST_F(FreshnessScatterGatherTest, SecondNodeTiedAndFirstWrongTypeForOpTime) { TEST_F(FreshnessScatterGatherTest, NotEnoughVotersDueNetworkErrors) { ASSERT_FALSE(hasReceivedSufficientResponses()); - processResponse(requestFrom("host1"), ResponseStatus(Status(ErrorCodes::NetworkTimeout, ""))); + processResponse(requestFrom("host1"), + RemoteCommandResponse(Status(ErrorCodes::NetworkTimeout, ""))); ASSERT_FALSE(hasReceivedSufficientResponses()); - processResponse(requestFrom("host2"), ResponseStatus(Status(ErrorCodes::NetworkTimeout, ""))); + processResponse(requestFrom("host2"), + RemoteCommandResponse(Status(ErrorCodes::NetworkTimeout, ""))); ASSERT_TRUE(hasReceivedSufficientResponses()); ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::QuorumUnreachable); } diff --git a/src/mongo/db/repl/freshness_scanner.cpp b/src/mongo/db/repl/freshness_scanner.cpp index f9de9409373..f55d2c673a8 100644 --- a/src/mongo/db/repl/freshness_scanner.cpp +++ b/src/mongo/db/repl/freshness_scanner.cpp @@ -44,6 +44,7 @@ namespace mongo { namespace repl { using executor::RemoteCommandRequest; +using executor::RemoteCommandResponse; FreshnessScanner::Algorithm::Algorithm(const ReplSetConfig& rsConfig, int myIndex, @@ -70,7 +71,7 @@ std::vector FreshnessScanner::Algorithm::getRequests() con } void FreshnessScanner::Algorithm::processResponse(const RemoteCommandRequest& request, - const ResponseStatus& response) { + const RemoteCommandResponse& response) { _responsesProcessed++; if (!response.isOK()) { // failed response LOG(2) << "FreshnessScanner: Got failed response from " << request.target << ": " @@ -108,10 +109,11 @@ FreshnessScanner::Result FreshnessScanner::Algorithm::getResult() const { return _freshnessInfos; } -StatusWith FreshnessScanner::start(ReplicationExecutor* executor, - const ReplSetConfig& rsConfig, - int myIndex, - Milliseconds timeout) { +StatusWith FreshnessScanner::start( + executor::TaskExecutor* executor, + const ReplSetConfig& rsConfig, + int myIndex, + Milliseconds timeout) { _algorithm.reset(new Algorithm(rsConfig, myIndex, timeout)); _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor)); return _runner->start(); diff --git a/src/mongo/db/repl/freshness_scanner.h b/src/mongo/db/repl/freshness_scanner.h index bc474b4fadd..69e6cccf27b 100644 --- a/src/mongo/db/repl/freshness_scanner.h +++ b/src/mongo/db/repl/freshness_scanner.h @@ -35,9 +35,9 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/repl_set_config.h" -#include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/scatter_gather_algorithm.h" #include "mongo/db/repl/scatter_gather_runner.h" +#include "mongo/executor/task_executor.h" #include "mongo/stdx/functional.h" namespace mongo { @@ -66,7 +66,7 @@ public: Algorithm(const ReplSetConfig& rsConfig, int myIndex, Milliseconds timeout); virtual std::vector getRequests() const; virtual void processResponse(const executor::RemoteCommandRequest& request, - const ResponseStatus& response); + const executor::RemoteCommandResponse& response); virtual bool hasReceivedSufficientResponses() const; /** @@ -96,10 +96,10 @@ public: * evh can be used to schedule a callback when the process is complete. * If this function returns Status::OK(), evh is then guaranteed to be signaled. **/ - StatusWith start(ReplicationExecutor* executor, - const ReplSetConfig& rsConfig, - int myIndex, - Milliseconds timeout); + StatusWith start(executor::TaskExecutor* executor, + const ReplSetConfig& rsConfig, + int myIndex, + Milliseconds timeout); /** * Informs the FreshnessScanner to cancel further processing. diff --git a/src/mongo/db/repl/freshness_scanner_test.cpp b/src/mongo/db/repl/freshness_scanner_test.cpp index b1778d002ea..cf9c7dbc709 100644 --- a/src/mongo/db/repl/freshness_scanner_test.cpp +++ b/src/mongo/db/repl/freshness_scanner_test.cpp @@ -32,8 +32,8 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/freshness_scanner.h" #include "mongo/db/repl/replication_coordinator_impl.h" -#include "mongo/db/repl/replication_executor.h" #include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" @@ -48,16 +48,11 @@ using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; using unittest::assertGet; -class FreshnessScannerTest : public mongo::unittest::Test { +class FreshnessScannerTest : public executor::ThreadPoolExecutorTest { public: - NetworkInterfaceMock* getNet() { - return _net; - } - ReplicationExecutor* getExecutor() { - return _executor.get(); - } - virtual void setUp() { + executor::ThreadPoolExecutorTest::setUp(); + launchExecutorThread(); ASSERT_OK(_config.initialize(BSON("_id" << "rs0" << "version" @@ -82,17 +77,6 @@ public: << "priority" << 0))))); ASSERT_OK(_config.validate()); - - auto net = stdx::make_unique(); - _net = net.get(); - _executor = stdx::make_unique(std::move(net), 1 /* prng seed */); - _executorThread = - stdx::make_unique(stdx::bind(&ReplicationExecutor::run, _executor.get())); - } - - virtual void tearDown() { - _executor->shutdown(); - _executorThread->join(); } protected: @@ -104,42 +88,36 @@ protected: Milliseconds(0)); } - ResponseStatus makeResponseStatus(BSONObj response) { - return ResponseStatus( + RemoteCommandResponse makeRemoteCommandResponse(BSONObj response) { + return RemoteCommandResponse( NetworkInterfaceMock::Response(response, BSONObj(), Milliseconds(10))); } - ResponseStatus badResponseStatus() { - return ResponseStatus(ErrorCodes::NodeNotFound, "not on my watch"); + RemoteCommandResponse badRemoteCommandResponse() { + return RemoteCommandResponse(ErrorCodes::NodeNotFound, "not on my watch"); } - ResponseStatus goodResponseStatus(Timestamp timestamp, long long term) { + RemoteCommandResponse goodRemoteCommandResponse(Timestamp timestamp, long long term) { // OpTime part of replSetGetStatus. BSONObj response = BSON("optimes" << BSON("appliedOpTime" << OpTime(timestamp, term).toBSON())); - return makeResponseStatus(response); + return makeRemoteCommandResponse(response); } ReplSetConfig _config; - -private: - // owned by _executor - NetworkInterfaceMock* _net; - std::unique_ptr _executor; - std::unique_ptr _executorThread; }; TEST_F(FreshnessScannerTest, ImmediateGoodResponse) { FreshnessScanner::Algorithm algo(_config, 0, Milliseconds(2000)); ASSERT_FALSE(algo.hasReceivedSufficientResponses()); - algo.processResponse(requestFrom("host1"), goodResponseStatus(Timestamp(1, 100), 1)); + algo.processResponse(requestFrom("host1"), goodRemoteCommandResponse(Timestamp(1, 100), 1)); ASSERT_FALSE(algo.hasReceivedSufficientResponses()); - algo.processResponse(requestFrom("host2"), goodResponseStatus(Timestamp(1, 200), 1)); + algo.processResponse(requestFrom("host2"), goodRemoteCommandResponse(Timestamp(1, 200), 1)); ASSERT_FALSE(algo.hasReceivedSufficientResponses()); - algo.processResponse(requestFrom("host3"), goodResponseStatus(Timestamp(1, 400), 1)); + algo.processResponse(requestFrom("host3"), goodRemoteCommandResponse(Timestamp(1, 400), 1)); ASSERT_FALSE(algo.hasReceivedSufficientResponses()); - algo.processResponse(requestFrom("host4"), goodResponseStatus(Timestamp(1, 300), 1)); + algo.processResponse(requestFrom("host4"), goodRemoteCommandResponse(Timestamp(1, 300), 1)); ASSERT_TRUE(algo.hasReceivedSufficientResponses()); ASSERT_EQUALS((size_t)4, algo.getResult().size()); ASSERT_EQUALS(3, algo.getResult().front().index); @@ -153,18 +131,18 @@ TEST_F(FreshnessScannerTest, ImmediateBadResponse) { // Cannot access host 1 and host 2. ASSERT_FALSE(algo.hasReceivedSufficientResponses()); - algo.processResponse(requestFrom("host1"), badResponseStatus()); + algo.processResponse(requestFrom("host1"), badRemoteCommandResponse()); ASSERT_FALSE(algo.hasReceivedSufficientResponses()); - algo.processResponse(requestFrom("host2"), badResponseStatus()); + algo.processResponse(requestFrom("host2"), badRemoteCommandResponse()); ASSERT_FALSE(algo.hasReceivedSufficientResponses()); // host 3 is in an old version, which doesn't include OpTimes in the response. - algo.processResponse(requestFrom("host3"), makeResponseStatus(BSONObj())); + algo.processResponse(requestFrom("host3"), makeRemoteCommandResponse(BSONObj())); ASSERT_FALSE(algo.hasReceivedSufficientResponses()); // Responses from host 4 in PV0 are considered as bad responses. auto response4 = BSON("optimes" << BSON("appliedOpTime" << Timestamp(1, 300))); - algo.processResponse(requestFrom("host4"), makeResponseStatus(response4)); + algo.processResponse(requestFrom("host4"), makeRemoteCommandResponse(response4)); ASSERT_TRUE(algo.hasReceivedSufficientResponses()); ASSERT_EQUALS((size_t)0, algo.getResult().size()); } @@ -172,7 +150,7 @@ TEST_F(FreshnessScannerTest, ImmediateBadResponse) { TEST_F(FreshnessScannerTest, AllResponsesTimeout) { Milliseconds timeout(2000); FreshnessScanner scanner; - scanner.start(getExecutor(), _config, 0, timeout); + scanner.start(&getExecutor(), _config, 0, timeout); auto net = getNet(); net->enterNetwork(); @@ -191,7 +169,7 @@ TEST_F(FreshnessScannerTest, AllResponsesTimeout) { TEST_F(FreshnessScannerTest, BadResponsesAndTimeout) { Milliseconds timeout(2000); FreshnessScanner scanner; - scanner.start(getExecutor(), _config, 0, timeout); + scanner.start(&getExecutor(), _config, 0, timeout); auto net = getNet(); net->enterNetwork(); @@ -201,11 +179,11 @@ TEST_F(FreshnessScannerTest, BadResponsesAndTimeout) { ASSERT(net->hasReadyRequests()); auto noi = net->getNextReadyRequest(); HostAndPort successfulHost = noi->getRequest().target; - net->scheduleResponse(noi, later, goodResponseStatus(Timestamp(1, 100), 1)); + net->scheduleResponse(noi, later, goodRemoteCommandResponse(Timestamp(1, 100), 1)); // host 2 has a bad connection. ASSERT(net->hasReadyRequests()); - net->scheduleResponse(net->getNextReadyRequest(), later, badResponseStatus()); + net->scheduleResponse(net->getNextReadyRequest(), later, badRemoteCommandResponse()); // host 3 and 4 time out. ASSERT(net->hasReadyRequests()); diff --git a/src/mongo/db/repl/noop_writer_test.cpp b/src/mongo/db/repl/noop_writer_test.cpp deleted file mode 100644 index c0f6dce0ebb..00000000000 --- a/src/mongo/db/repl/noop_writer_test.cpp +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - - -#include "mongo/platform/basic.h" - -#include - -#include "mongo/db/client.h" -#include "mongo/db/json.h" -#include "mongo/db/repl/noop_writer.h" -#include "mongo/db/repl/optime.h" -#include "mongo/db/repl/replication_executor_test_fixture.h" -#include "mongo/unittest/unittest.h" - - -namespace { - -using namespace mongo; -using namespace mongo::repl; - -using unittest::log; - -class NoopWriterTest : public ReplicationExecutorTest { -public: - NoopWriter* getNoopWriter() { - return _noopWriter.get(); - } - -protected: - void setUp() override { - setupNoopWriter(Seconds(1)); - } - - void tearDown() override {} - -private: - void startNoopWriter(OpTime opTime) { - invariant(_noopWriter); - _noopWriter->start(opTime); - } - - void stopNoopWriter() { - invariant(_noopWriter); - _noopWriter->stop(); - } - - void setupNoopWriter(Seconds waitTime) { - invariant(!_noopWriter); - _noopWriter = stdx::make_unique(waitTime); - } - - OpTime _getLastOpTime() { - return _lastOpTime; - } - - std::unique_ptr _noopWriter; - OpTime _lastOpTime; -}; -// TODO: SERVER-25679 -TEST_F(NoopWriterTest, CreateDestroy) { - NoopWriter* writer = getNoopWriter(); - ASSERT(writer != nullptr); -} -} diff --git a/src/mongo/db/repl/rollback_checker_test.cpp b/src/mongo/db/repl/rollback_checker_test.cpp index 9b29fde9874..985a0f59a96 100644 --- a/src/mongo/db/repl/rollback_checker_test.cpp +++ b/src/mongo/db/repl/rollback_checker_test.cpp @@ -31,12 +31,12 @@ #include "mongo/platform/basic.h" #include "mongo/db/client.h" -#include "mongo/db/repl/replication_executor_test_fixture.h" #include "mongo/db/repl/rollback_checker.h" #include "mongo/executor/network_interface_mock.h" -#include "mongo/util/log.h" - +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/log.h" namespace { @@ -47,7 +47,7 @@ using executor::RemoteCommandResponse; using LockGuard = stdx::lock_guard; -class RollbackCheckerTest : public ReplicationExecutorTest { +class RollbackCheckerTest : public executor::ThreadPoolExecutorTest { public: RollbackChecker* getRollbackChecker() const; @@ -61,9 +61,10 @@ protected: }; void RollbackCheckerTest::setUp() { - ReplicationExecutorTest::setUp(); + executor::ThreadPoolExecutorTest::setUp(); launchExecutorThread(); - _rollbackChecker = stdx::make_unique(&getReplExecutor(), HostAndPort()); + getNet()->enterNetwork(); + _rollbackChecker = stdx::make_unique(&getExecutor(), HostAndPort()); stdx::lock_guard lk(_mutex); _hasRolledBackResult = {ErrorCodes::NotYetInitialized, ""}; _hasCalledCallback = false; @@ -82,18 +83,21 @@ TEST_F(RollbackCheckerTest, InvalidConstruction) { TEST_F(RollbackCheckerTest, ShutdownBeforeStart) { auto callback = [](const RollbackChecker::Result&) {}; - getReplExecutor().shutdown(); + shutdownExecutorThread(); + joinExecutorThread(); ASSERT_NOT_OK(getRollbackChecker()->reset(callback).getStatus()); ASSERT_NOT_OK(getRollbackChecker()->checkForRollback(callback).getStatus()); } TEST_F(RollbackCheckerTest, ShutdownBeforeHasHadRollback) { - getReplExecutor().shutdown(); + shutdownExecutorThread(); + joinExecutorThread(); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, getRollbackChecker()->hasHadRollback()); } TEST_F(RollbackCheckerTest, ShutdownBeforeResetSync) { - getReplExecutor().shutdown(); + shutdownExecutorThread(); + joinExecutorThread(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getRollbackChecker()->reset_sync()); } @@ -107,7 +111,7 @@ TEST_F(RollbackCheckerTest, reset) { getNet()->runReadyNetworkOperations(); getNet()->exitNetwork(); - getReplExecutor().wait(cbh); + getExecutor().wait(cbh); ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3); } @@ -123,7 +127,7 @@ TEST_F(RollbackCheckerTest, RollbackRBID) { auto commandResponse = BSON("ok" << 1 << "rbid" << 3); getNet()->scheduleSuccessfulResponse(commandResponse); getNet()->runReadyNetworkOperations(); - getReplExecutor().wait(refreshCBH); + getExecutor().wait(refreshCBH); ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3); { LockGuard lk(_mutex); @@ -141,7 +145,7 @@ TEST_F(RollbackCheckerTest, RollbackRBID) { getNet()->runReadyNetworkOperations(); getNet()->exitNetwork(); - getReplExecutor().wait(rbCBH); + getExecutor().wait(rbCBH); ASSERT_EQUALS(getRollbackChecker()->getLastRBID_forTest(), 4); ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3); LockGuard lk(_mutex); @@ -161,7 +165,7 @@ TEST_F(RollbackCheckerTest, NoRollbackRBID) { auto commandResponse = BSON("ok" << 1 << "rbid" << 3); getNet()->scheduleSuccessfulResponse(commandResponse); getNet()->runReadyNetworkOperations(); - getReplExecutor().wait(refreshCBH); + getExecutor().wait(refreshCBH); ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3); { LockGuard lk(_mutex); @@ -179,7 +183,7 @@ TEST_F(RollbackCheckerTest, NoRollbackRBID) { getNet()->runReadyNetworkOperations(); getNet()->exitNetwork(); - getReplExecutor().wait(rbCBH); + getExecutor().wait(rbCBH); ASSERT_EQUALS(getRollbackChecker()->getLastRBID_forTest(), 3); ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3); LockGuard lk(_mutex); diff --git a/src/mongo/db/repl/scatter_gather_algorithm.h b/src/mongo/db/repl/scatter_gather_algorithm.h index a622473c5ee..6c4c31a0799 100644 --- a/src/mongo/db/repl/scatter_gather_algorithm.h +++ b/src/mongo/db/repl/scatter_gather_algorithm.h @@ -30,13 +30,11 @@ #include -#include "mongo/db/repl/replication_executor.h" +#include "mongo/executor/remote_command_request.h" +#include "mongo/executor/remote_command_response.h" namespace mongo { -template -class StatusWith; - namespace repl { /** @@ -63,7 +61,7 @@ public: * Method to call once for each received response. */ virtual void processResponse(const executor::RemoteCommandRequest& request, - const ResponseStatus& response) = 0; + const executor::RemoteCommandResponse& response) = 0; /** * Returns true if no more calls to processResponse are needed to consider the diff --git a/src/mongo/db/repl/scatter_gather_runner.cpp b/src/mongo/db/repl/scatter_gather_runner.cpp index 597239179e0..f29600a2bf2 100644 --- a/src/mongo/db/repl/scatter_gather_runner.cpp +++ b/src/mongo/db/repl/scatter_gather_runner.cpp @@ -43,13 +43,13 @@ namespace repl { using executor::RemoteCommandRequest; using LockGuard = stdx::lock_guard; -using CallbackHandle = ReplicationExecutor::CallbackHandle; -using EventHandle = ReplicationExecutor::EventHandle; -using RemoteCommandCallbackArgs = ReplicationExecutor::RemoteCommandCallbackArgs; -using RemoteCommandCallbackFn = ReplicationExecutor::RemoteCommandCallbackFn; +using CallbackHandle = executor::TaskExecutor::CallbackHandle; +using EventHandle = executor::TaskExecutor::EventHandle; +using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; +using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn; ScatterGatherRunner::ScatterGatherRunner(ScatterGatherAlgorithm* algorithm, - ReplicationExecutor* executor) + executor::TaskExecutor* executor) : _executor(executor), _impl(std::make_shared(algorithm, executor)) {} Status ScatterGatherRunner::run() { @@ -80,7 +80,7 @@ void ScatterGatherRunner::cancel() { * Scatter gather runner implementation. */ ScatterGatherRunner::RunnerImpl::RunnerImpl(ScatterGatherAlgorithm* algorithm, - ReplicationExecutor* executor) + executor::TaskExecutor* executor) : _executor(executor), _algorithm(algorithm) {} StatusWith ScatterGatherRunner::RunnerImpl::start( @@ -124,7 +124,7 @@ void ScatterGatherRunner::RunnerImpl::cancel() { } void ScatterGatherRunner::RunnerImpl::processResponse( - const ReplicationExecutor::RemoteCommandCallbackArgs& cbData) { + const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { LockGuard lk(_mutex); if (!_sufficientResponsesReceived.isValid()) { @@ -152,9 +152,10 @@ void ScatterGatherRunner::RunnerImpl::processResponse( void ScatterGatherRunner::RunnerImpl::_signalSufficientResponsesReceived() { if (_sufficientResponsesReceived.isValid()) { - std::for_each(_callbacks.begin(), - _callbacks.end(), - stdx::bind(&ReplicationExecutor::cancel, _executor, stdx::placeholders::_1)); + std::for_each( + _callbacks.begin(), + _callbacks.end(), + stdx::bind(&executor::TaskExecutor::cancel, _executor, stdx::placeholders::_1)); // Clear _callbacks to break the cycle of shared_ptr. _callbacks.clear(); _executor->signalEvent(_sufficientResponsesReceived); diff --git a/src/mongo/db/repl/scatter_gather_runner.h b/src/mongo/db/repl/scatter_gather_runner.h index 864e7b3add8..4a58461de1e 100644 --- a/src/mongo/db/repl/scatter_gather_runner.h +++ b/src/mongo/db/repl/scatter_gather_runner.h @@ -31,7 +31,7 @@ #include #include "mongo/base/disallow_copying.h" -#include "mongo/db/repl/replication_executor.h" +#include "mongo/executor/task_executor.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" @@ -56,7 +56,8 @@ public: * * "algorithm" and "executor" must remain in scope until the runner's destructor completes. */ - explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm, ReplicationExecutor* executor); + explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm, + executor::TaskExecutor* executor); /** * Runs the scatter-gather process and blocks until it completes. @@ -78,7 +79,7 @@ public: * * The returned event will eventually be signaled. */ - StatusWith start(); + StatusWith start(); /** * Informs the runner to cancel further processing. @@ -87,11 +88,11 @@ public: private: /** - * Implementation of a scatter-gather behavior using a ReplicationExecutor. + * Implementation of a scatter-gather behavior using a TaskExecutor. */ class RunnerImpl { public: - explicit RunnerImpl(ScatterGatherAlgorithm* algorithm, ReplicationExecutor* executor); + explicit RunnerImpl(ScatterGatherAlgorithm* algorithm, executor::TaskExecutor* executor); /** * On success, returns an event handle that will be signaled when the runner has @@ -100,8 +101,8 @@ private: * * The returned event will eventually be signaled. */ - StatusWith start( - const ReplicationExecutor::RemoteCommandCallbackFn cb); + StatusWith start( + const executor::TaskExecutor::RemoteCommandCallbackFn cb); /** * Informs the runner to cancel further processing. @@ -111,7 +112,7 @@ private: /** * Callback invoked once for every response from the network. */ - void processResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData); + void processResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData); private: /** @@ -120,15 +121,15 @@ private: */ void _signalSufficientResponsesReceived(); - ReplicationExecutor* _executor; // Not owned here. + executor::TaskExecutor* _executor; // Not owned here. ScatterGatherAlgorithm* _algorithm; // Not owned here. - ReplicationExecutor::EventHandle _sufficientResponsesReceived; - std::vector _callbacks; + executor::TaskExecutor::EventHandle _sufficientResponsesReceived; + std::vector _callbacks; bool _started = false; stdx::mutex _mutex; }; - ReplicationExecutor* _executor; // Not owned here. + executor::TaskExecutor* _executor; // Not owned here. // This pointer of RunnerImpl will be shared with remote command callbacks to make sure // callbacks can access the members safely. diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp index 63915e0df98..294b2c84ed7 100644 --- a/src/mongo/db/repl/scatter_gather_test.cpp +++ b/src/mongo/db/repl/scatter_gather_test.cpp @@ -28,10 +28,9 @@ #include "mongo/platform/basic.h" -#include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/scatter_gather_algorithm.h" #include "mongo/db/repl/scatter_gather_runner.h" -#include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" @@ -57,7 +56,7 @@ public: ScatterGatherTestAlgorithm(int64_t maxResponses = 2) : _done(false), _numResponses(0), _maxResponses(maxResponses) {} - virtual std::vector getRequests() const { + std::vector getRequests() const override { std::vector requests; for (int i = 0; i < kTotalRequests; i++) { requests.push_back(RemoteCommandRequest( @@ -66,8 +65,8 @@ public: return requests; } - virtual void processResponse(const RemoteCommandRequest& request, - const ResponseStatus& response) { + void processResponse(const RemoteCommandRequest& request, + const RemoteCommandResponse& response) override { _numResponses++; } @@ -94,46 +93,21 @@ private: }; /** - * ScatterGatherTest base class which sets up the ReplicationExecutor and NetworkInterfaceMock. + * ScatterGatherTest base class which sets up the TaskExecutor and NetworkInterfaceMock. */ -class ScatterGatherTest : public mongo::unittest::Test { +class ScatterGatherTest : public executor::ThreadPoolExecutorTest { protected: - NetworkInterfaceMock* getNet() { - return _net; - } - ReplicationExecutor* getExecutor() { - return _executor.get(); - } - int64_t countLogLinesContaining(const std::string& needle); - -private: - void setUp(); - void tearDown(); - - // owned by _executor - NetworkInterfaceMock* _net; - std::unique_ptr _executor; - std::unique_ptr _executorThread; + void setUp() { + executor::ThreadPoolExecutorTest::setUp(); + launchExecutorThread(); + } }; -void ScatterGatherTest::setUp() { - auto net = stdx::make_unique(); - _net = net.get(); - _executor = stdx::make_unique(std::move(net), 1 /* prng seed */); - _executorThread.reset(new stdx::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); -} - -void ScatterGatherTest::tearDown() { - _executor->shutdown(); - _executorThread->join(); -} - - // Used to run a ScatterGatherRunner in a separate thread, to avoid blocking test execution. class ScatterGatherRunnerRunner { public: - ScatterGatherRunnerRunner(ScatterGatherRunner* sgr, ReplicationExecutor* executor) + ScatterGatherRunnerRunner(ScatterGatherRunner* sgr, executor::TaskExecutor* executor) : _sgr(sgr), _executor(executor), _result(Status(ErrorCodes::BadValue, "failed to set status")) {} @@ -150,12 +124,12 @@ public: } private: - void _run(ReplicationExecutor* executor) { + void _run(executor::TaskExecutor* executor) { _result = _sgr->run(); } ScatterGatherRunner* _sgr; - ReplicationExecutor* _executor; + executor::TaskExecutor* _executor; Status _result; std::unique_ptr _thread; }; @@ -163,7 +137,7 @@ private: // Simple onCompletion function which will toggle a bool, so that we can check the logs to // ensure the onCompletion function ran when expected. executor::TaskExecutor::CallbackFn getOnCompletionTestFunction(bool* ran) { - auto cb = [ran](const ReplicationExecutor::CallbackArgs& cbData) { + auto cb = [ran](const executor::TaskExecutor::CallbackArgs& cbData) { if (!cbData.status.isOK()) { return; } @@ -180,34 +154,31 @@ executor::TaskExecutor::CallbackFn getOnCompletionTestFunction(bool* ran) { // completed. TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) { ScatterGatherTestAlgorithm* sga = new ScatterGatherTestAlgorithm(); - ScatterGatherRunner* sgr = new ScatterGatherRunner(sga, getExecutor()); + ScatterGatherRunner* sgr = new ScatterGatherRunner(sga, &getExecutor()); bool ranCompletion = false; - StatusWith status = sgr->start(); - getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)); + StatusWith status = sgr->start(); + getExecutor().onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)); ASSERT_OK(status.getStatus()); ASSERT_FALSE(ranCompletion); NetworkInterfaceMock* net = getNet(); net->enterNetwork(); NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - net->scheduleResponse( - noi, - net->now() + Seconds(2), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); + net->scheduleResponse(noi, + net->now() + Seconds(2), + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); ASSERT_FALSE(ranCompletion); noi = net->getNextReadyRequest(); - net->scheduleResponse( - noi, - net->now() + Seconds(2), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); + net->scheduleResponse(noi, + net->now() + Seconds(2), + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); ASSERT_FALSE(ranCompletion); noi = net->getNextReadyRequest(); - net->scheduleResponse( - noi, - net->now() + Seconds(5), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); + net->scheduleResponse(noi, + net->now() + Seconds(5), + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); ASSERT_FALSE(ranCompletion); net->runUntil(net->now() + Seconds(2)); @@ -221,23 +192,23 @@ TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) { net->exitNetwork(); } -// Confirm that shutting the ReplicationExecutor down before calling run() will cause run() +// Confirm that shutting the TaskExecutor down before calling run() will cause run() // to return ErrorCodes::ShutdownInProgress. TEST_F(ScatterGatherTest, ShutdownExecutorBeforeRun) { ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga, getExecutor()); - getExecutor()->shutdown(); + ScatterGatherRunner sgr(&sga, &getExecutor()); + shutdownExecutorThread(); sga.finish(); Status status = sgr.run(); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status); } -// Confirm that shutting the ReplicationExecutor down after calling run(), but before run() +// Confirm that shutting the TaskExecutor down after calling run(), but before run() // finishes will cause run() to return Status::OK(). TEST_F(ScatterGatherTest, ShutdownExecutorAfterRun) { ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga, getExecutor()); - ScatterGatherRunnerRunner sgrr(&sgr, getExecutor()); + ScatterGatherRunner sgr(&sga, &getExecutor()); + ScatterGatherRunnerRunner sgrr(&sgr, &getExecutor()); sgrr.run(); // need to wait for the scatter-gather to be scheduled in the executor NetworkInterfaceMock* net = getNet(); @@ -248,33 +219,34 @@ TEST_F(ScatterGatherTest, ShutdownExecutorAfterRun) { net->blackHole(noi); } net->exitNetwork(); - getExecutor()->shutdown(); + shutdownExecutorThread(); + joinExecutorThread(); Status status = sgrr.getResult(); ASSERT_OK(status); } -// Confirm that shutting the ReplicationExecutor down before calling start() will cause start() +// Confirm that shutting the TaskExecutor down before calling start() will cause start() // to return ErrorCodes::ShutdownInProgress and should not run onCompletion(). TEST_F(ScatterGatherTest, ShutdownExecutorBeforeStart) { ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga, getExecutor()); - getExecutor()->shutdown(); + ScatterGatherRunner sgr(&sga, &getExecutor()); + shutdownExecutorThread(); bool ranCompletion = false; - StatusWith status = sgr.start(); + StatusWith status = sgr.start(); sga.finish(); ASSERT_FALSE(ranCompletion); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.getStatus()); } -// Confirm that shutting the ReplicationExecutor down after calling start() will cause start() +// Confirm that shutting the TaskExecutor down after calling start() will cause start() // to return Status::OK and should not run onCompletion(). TEST_F(ScatterGatherTest, ShutdownExecutorAfterStart) { ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga, getExecutor()); + ScatterGatherRunner sgr(&sga, &getExecutor()); bool ranCompletion = false; - StatusWith status = sgr.start(); - getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)); - getExecutor()->shutdown(); + StatusWith status = sgr.start(); + getExecutor().onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)); + shutdownExecutorThread(); sga.finish(); ASSERT_FALSE(ranCompletion); ASSERT_OK(status.getStatus()); @@ -283,34 +255,31 @@ TEST_F(ScatterGatherTest, ShutdownExecutorAfterStart) { // Confirm that responses are not processed once sufficient responses have been received. TEST_F(ScatterGatherTest, DoNotProcessMoreThanSufficientResponses) { ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga, getExecutor()); + ScatterGatherRunner sgr(&sga, &getExecutor()); bool ranCompletion = false; - StatusWith status = sgr.start(); - getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)); + StatusWith status = sgr.start(); + getExecutor().onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)); ASSERT_OK(status.getStatus()); ASSERT_FALSE(ranCompletion); NetworkInterfaceMock* net = getNet(); net->enterNetwork(); NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - net->scheduleResponse( - noi, - net->now() + Seconds(2), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); + net->scheduleResponse(noi, + net->now() + Seconds(2), + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); ASSERT_FALSE(ranCompletion); noi = net->getNextReadyRequest(); - net->scheduleResponse( - noi, - net->now() + Seconds(2), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); + net->scheduleResponse(noi, + net->now() + Seconds(2), + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); ASSERT_FALSE(ranCompletion); noi = net->getNextReadyRequest(); - net->scheduleResponse( - noi, - net->now() + Seconds(5), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); + net->scheduleResponse(noi, + net->now() + Seconds(5), + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); ASSERT_FALSE(ranCompletion); net->runUntil(net->now() + Seconds(2)); @@ -328,10 +297,10 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru ScatterGatherTestAlgorithm sga; // set hasReceivedSufficientResponses to return true before the run starts sga.finish(); - ScatterGatherRunner sgr(&sga, getExecutor()); + ScatterGatherRunner sgr(&sga, &getExecutor()); bool ranCompletion = false; - StatusWith status = sgr.start(); - getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)); + StatusWith status = sgr.start(); + getExecutor().onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)); ASSERT_OK(status.getStatus()); // Wait until callback finishes. NetworkInterfaceMock* net = getNet(); @@ -349,7 +318,7 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru ScatterGatherTestAlgorithm sga(5); ScatterGatherRunner sgr(&sga); bool ranCompletion = false; - StatusWith status = sgr.start(getExecutor(), + StatusWith status = sgr.start(&getExecutor(), getOnCompletionTestFunction(&ranCompletion)); ASSERT_OK(status.getStatus()); ASSERT_FALSE(ranCompletion); @@ -359,7 +328,7 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); net->scheduleResponse(noi, net->now(), - ResponseStatus(RemoteCommandResponse( + (RemoteCommandResponse( BSON("ok" << 1), boost::posix_time::milliseconds(10)))); net->runReadyNetworkOperations(); @@ -368,7 +337,7 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru noi = net->getNextReadyRequest(); net->scheduleResponse(noi, net->now(), - ResponseStatus(RemoteCommandResponse( + (RemoteCommandResponse( BSON("ok" << 1), boost::posix_time::milliseconds(10)))); net->runReadyNetworkOperations(); @@ -377,7 +346,7 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru noi = net->getNextReadyRequest(); net->scheduleResponse(noi, net->now(), - ResponseStatus(RemoteCommandResponse( + (RemoteCommandResponse( BSON("ok" << 1), boost::posix_time::milliseconds(10)))); net->runReadyNetworkOperations(); @@ -389,17 +358,15 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru // Confirm that running via run() will finish once sufficient responses have been received. TEST_F(ScatterGatherTest, SuccessfulScatterGatherViaRun) { ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga, getExecutor()); - ScatterGatherRunnerRunner sgrr(&sgr, getExecutor()); + ScatterGatherRunner sgr(&sga, &getExecutor()); + ScatterGatherRunnerRunner sgrr(&sgr, &getExecutor()); sgrr.run(); NetworkInterfaceMock* net = getNet(); net->enterNetwork(); NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); net->scheduleResponse( - noi, - net->now(), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); + noi, net->now(), (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); net->runReadyNetworkOperations(); noi = net->getNextReadyRequest(); @@ -408,9 +375,7 @@ TEST_F(ScatterGatherTest, SuccessfulScatterGatherViaRun) { noi = net->getNextReadyRequest(); net->scheduleResponse( - noi, - net->now(), - ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); + noi, net->now(), (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); net->runReadyNetworkOperations(); net->exitNetwork(); diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp index d364151a228..ff3080cf161 100644 --- a/src/mongo/db/repl/task_runner.cpp +++ b/src/mongo/db/repl/task_runner.cpp @@ -34,7 +34,7 @@ #include -#include "mongo/db/auth/authorization_manager_global.h" +#include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/client.h" #include "mongo/db/operation_context.h" @@ -141,7 +141,7 @@ void TaskRunner::_runTasks() { // to be equal to the client used to create the operation context. Client::initThreadIfNotAlready(); client = &cc(); - if (getGlobalAuthorizationManager()->isAuthEnabled()) { + if (AuthorizationManager::get(client->getServiceContext())->isAuthEnabled()) { AuthorizationSession::get(client)->grantInternalAuthorization(); } } -- cgit v1.2.1