summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Schwerin <Andy Schwerin schwerin@mongodb.com>2017-03-24 17:59:14 -0400
committerAndy Schwerin <Andy Schwerin schwerin@mongodb.com>2017-03-27 13:22:25 -0400
commitf4cce647d9bdd3e988a5d514ec64cd4deb9f7a26 (patch)
tree65b178c2d48751d4bb259e2472ec888b4315bd96
parent5e47fab7c971014ae7ce401e94d35dc761fea8c2 (diff)
downloadmongo-f4cce647d9bdd3e988a5d514ec64cd4deb9f7a26.tar.gz
SERVER-28491 Re-host unit tests that used ReplicationExecutorTest onto ThreadPoolExecutorTest.
-rw-r--r--src/mongo/client/SConscript1
-rw-r--r--src/mongo/db/SConscript2
-rw-r--r--src/mongo/db/repl/SConscript59
-rw-r--r--src/mongo/db/repl/check_quorum_for_config_change.cpp10
-rw-r--r--src/mongo/db/repl/check_quorum_for_config_change.h10
-rw-r--r--src/mongo/db/repl/check_quorum_for_config_change_test.cpp301
-rw-r--r--src/mongo/db/repl/elect_cmd_runner.cpp4
-rw-r--r--src/mongo/db/repl/elect_cmd_runner.h8
-rw-r--r--src/mongo/db/repl/elect_cmd_runner_test.cpp140
-rw-r--r--src/mongo/db/repl/freshness_checker.cpp7
-rw-r--r--src/mongo/db/repl/freshness_checker.h14
-rw-r--r--src/mongo/db/repl/freshness_checker_test.cpp425
-rw-r--r--src/mongo/db/repl/freshness_scanner.cpp12
-rw-r--r--src/mongo/db/repl/freshness_scanner.h12
-rw-r--r--src/mongo/db/repl/freshness_scanner_test.cpp66
-rw-r--r--src/mongo/db/repl/noop_writer_test.cpp90
-rw-r--r--src/mongo/db/repl/rollback_checker_test.cpp32
-rw-r--r--src/mongo/db/repl/scatter_gather_algorithm.h8
-rw-r--r--src/mongo/db/repl/scatter_gather_runner.cpp21
-rw-r--r--src/mongo/db/repl/scatter_gather_runner.h25
-rw-r--r--src/mongo/db/repl/scatter_gather_test.cpp167
-rw-r--r--src/mongo/db/repl/task_runner.cpp4
22 files changed, 614 insertions, 804 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index f7321607742..89fb9b75516 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -177,7 +177,6 @@ env.Library(
LIBDEPS=[
'clientdriver',
'$BUILD_DIR/mongo/db/auth/authcommon',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
]
)
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 40c9eabc16d..7bf9032d005 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -538,7 +538,7 @@ env.Library(
'$BUILD_DIR/mongo/rpc/rpc',
'$BUILD_DIR/mongo/s/write_ops/batch_write_types',
'audit',
- 'auth/authorization_manager_global',
+ 'auth/authcore',
'commands/server_status_core',
'commands/test_commands_enabled',
'service_context',
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 08503eb92bd..7931b0d082f 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -125,40 +125,40 @@ env.Library(
target='replication_executor',
source=[
'replication_executor.cpp',
- 'scatter_gather_algorithm.cpp',
- 'scatter_gather_runner.cpp',
],
LIBDEPS=[
'database_task',
'task_runner',
'$BUILD_DIR/mongo/executor/network_interface',
'$BUILD_DIR/mongo/executor/task_executor_interface',
- '$BUILD_DIR/mongo/util/net/hostandport',
],
)
env.Library(
- target='replication_executor_test_fixture',
+ target='scatter_gather',
source=[
- 'replication_executor_test_fixture.cpp',
- ],
+ 'scatter_gather_algorithm.cpp',
+ 'scatter_gather_runner.cpp',
+ ],
LIBDEPS=[
- 'replication_executor',
- 'replmocks',
- 'service_context_repl_mock_init',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/executor/task_executor_test_fixture',
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
],
)
+
env.CppUnitTest(
target='replication_executor_test',
source=[
'replication_executor_test.cpp',
+ 'replication_executor_test_fixture.cpp',
],
LIBDEPS=[
- 'replication_executor_test_fixture',
- '$BUILD_DIR/mongo/db/bson/dotted_path_support',
+ 'replication_executor',
+ 'replmocks',
+ 'service_context_repl_mock_init',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/executor/task_executor_test_fixture',
'$BUILD_DIR/mongo/unittest/concurrency',
],
)
@@ -456,7 +456,6 @@ env.Library('topology_coordinator_impl',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/audit',
- 'replication_executor',
'replica_set_messages',
'repl_settings',
'rslog',
@@ -509,6 +508,7 @@ env.Library('repl_coordinator_impl',
'vote_requester.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/index/index_descriptor',
'$BUILD_DIR/mongo/db/server_options_core',
@@ -527,6 +527,7 @@ env.Library('repl_coordinator_impl',
'replication_executor',
'reporter',
'rslog',
+ 'scatter_gather',
'topology_coordinator',
])
@@ -540,6 +541,7 @@ env.Library(
'replmocks',
'service_context_repl_mock_init',
'topology_coordinator_impl',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/unittest/unittest',
],
@@ -575,10 +577,8 @@ env.CppUnitTest(
'scatter_gather_test.cpp',
],
LIBDEPS=[
- 'repl_coordinator_impl',
- 'replmocks',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/db/service_context_noop_init',
+ 'scatter_gather',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
],
)
@@ -589,10 +589,8 @@ env.CppUnitTest(
],
LIBDEPS=[
'repl_coordinator_impl',
- 'replication_executor',
'replmocks',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/db/service_context_noop_init',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
],
)
@@ -605,8 +603,7 @@ env.CppUnitTest(
'repl_coordinator_impl',
'replica_set_messages',
'replmocks',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/db/service_context_noop_init',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
],
)
@@ -634,8 +631,7 @@ env.CppUnitTest(
'repl_coordinator_impl',
'replica_set_messages',
'replmocks',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/db/service_context_noop_init',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
],
)
@@ -687,7 +683,6 @@ env.Library(
'repl_coordinator_interface',
'repl_settings',
'replica_set_messages',
- 'replication_executor',
'storage_interface',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
'$BUILD_DIR/mongo/db/service_context',
@@ -892,6 +887,7 @@ env.Library(
LIBDEPS=[
'replmocks',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
+ '$BUILD_DIR/mongo/util/concurrency/thread_pool',
],
)
@@ -920,6 +916,7 @@ env.CppUnitTest(
'collection_cloner',
'base_cloner_test_fixture',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
'$BUILD_DIR/mongo/db/service_context_noop_init',
'$BUILD_DIR/mongo/unittest/task_executor_proxy',
],
@@ -981,7 +978,6 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authcore',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
'$BUILD_DIR/mongo/util/concurrency/thread_pool',
],
)
@@ -996,6 +992,7 @@ env.Library(
'service_context_repl_mock_init',
'task_runner',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/unittest/concurrency',
'$BUILD_DIR/mongo/unittest/unittest',
@@ -1153,7 +1150,8 @@ env.CppUnitTest(
'base_cloner_test_fixture',
'initial_syncer',
'data_replicator_external_state_mock',
- 'replication_executor_test_fixture',
+ 'replmocks',
+ 'service_context_repl_mock_init',
'sync_source_selector_mock',
'task_executor_mock',
'$BUILD_DIR/mongo/db/query/command_request_response',
@@ -1178,9 +1176,9 @@ env.CppUnitTest(
'rollback_checker_test.cpp',
],
LIBDEPS=[
- 'replication_executor_test_fixture',
'rollback_checker',
'$BUILD_DIR/mongo/unittest/concurrency',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
],
)
@@ -1214,8 +1212,7 @@ env.CppUnitTest(
'repl_coordinator_impl',
'replica_set_messages',
'replmocks',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/db/service_context_noop_init',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
],
)
diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp
index 1dd1fe3ebbb..a1be65dbaf3 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change.cpp
+++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp
@@ -111,7 +111,7 @@ std::vector<RemoteCommandRequest> QuorumChecker::getRequests() const {
}
void QuorumChecker::processResponse(const RemoteCommandRequest& request,
- const ResponseStatus& response) {
+ const executor::RemoteCommandResponse& response) {
_tabulateHeartbeatResponse(request, response);
if (hasReceivedSufficientResponses()) {
_onQuorumCheckComplete();
@@ -177,7 +177,7 @@ void QuorumChecker::_onQuorumCheckComplete() {
}
void QuorumChecker::_tabulateHeartbeatResponse(const RemoteCommandRequest& request,
- const ResponseStatus& response) {
+ const executor::RemoteCommandResponse& response) {
++_numResponses;
if (!response.isOK()) {
warning() << "Failed to complete heartbeat request to " << request.target << "; "
@@ -279,7 +279,7 @@ bool QuorumChecker::hasReceivedSufficientResponses() const {
return true;
}
-Status checkQuorumGeneral(ReplicationExecutor* executor,
+Status checkQuorumGeneral(executor::TaskExecutor* executor,
const ReplSetConfig& rsConfig,
const int myIndex) {
QuorumChecker checker(&rsConfig, myIndex);
@@ -292,14 +292,14 @@ Status checkQuorumGeneral(ReplicationExecutor* executor,
return checker.getFinalStatus();
}
-Status checkQuorumForInitiate(ReplicationExecutor* executor,
+Status checkQuorumForInitiate(executor::TaskExecutor* executor,
const ReplSetConfig& rsConfig,
const int myIndex) {
invariant(rsConfig.getConfigVersion() == 1);
return checkQuorumGeneral(executor, rsConfig, myIndex);
}
-Status checkQuorumForReconfig(ReplicationExecutor* executor,
+Status checkQuorumForReconfig(executor::TaskExecutor* executor,
const ReplSetConfig& rsConfig,
const int myIndex) {
invariant(rsConfig.getConfigVersion() > 1);
diff --git a/src/mongo/db/repl/check_quorum_for_config_change.h b/src/mongo/db/repl/check_quorum_for_config_change.h
index 746a21d5ec9..2bc9a289464 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change.h
+++ b/src/mongo/db/repl/check_quorum_for_config_change.h
@@ -29,8 +29,8 @@
#pragma once
#include "mongo/base/status.h"
-#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/scatter_gather_algorithm.h"
+#include "mongo/executor/task_executor.h"
namespace mongo {
namespace repl {
@@ -62,7 +62,7 @@ public:
virtual std::vector<executor::RemoteCommandRequest> getRequests() const;
virtual void processResponse(const executor::RemoteCommandRequest& request,
- const ResponseStatus& response);
+ const executor::RemoteCommandResponse& response);
virtual bool hasReceivedSufficientResponses() const;
@@ -83,7 +83,7 @@ private:
* Updates the QuorumChecker state based on the data from a single heartbeat response.
*/
void _tabulateHeartbeatResponse(const executor::RemoteCommandRequest& request,
- const ResponseStatus& response);
+ const executor::RemoteCommandResponse& response);
// Pointer to the replica set configuration for which we're checking quorum.
const ReplSetConfig* const _rsConfig;
@@ -126,7 +126,7 @@ private:
* - No nodes are already joined to a replica set.
* - No node reports a replica set name other than the one in "rsConfig".
*/
-Status checkQuorumForInitiate(ReplicationExecutor* executor,
+Status checkQuorumForInitiate(executor::TaskExecutor* executor,
const ReplSetConfig& rsConfig,
const int myIndex);
@@ -144,7 +144,7 @@ Status checkQuorumForInitiate(ReplicationExecutor* executor,
* - No responding node reports a replica set name other than the one in "rsConfig".
* - All responding nodes report a config version less than the one in "rsConfig".
*/
-Status checkQuorumForReconfig(ReplicationExecutor* executor,
+Status checkQuorumForReconfig(executor::TaskExecutor* executor,
const ReplSetConfig& rsConfig,
const int myIndex);
diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
index dc1ac7c722e..230a13f43ae 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
+++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
@@ -36,8 +36,8 @@
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
-#include "mongo/db/repl/replication_executor.h"
#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/platform/unordered_set.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/functional.h"
@@ -68,7 +68,7 @@ using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
-class CheckQuorumTest : public mongo::unittest::Test {
+class CheckQuorumTest : public executor::ThreadPoolExecutorTest {
protected:
CheckQuorumTest();
@@ -76,17 +76,14 @@ protected:
Status waitForQuorumCheck();
bool isQuorumCheckDone();
- NetworkInterfaceMock* _net;
- std::unique_ptr<ReplicationExecutor> _executor;
-
private:
- void setUp();
- void tearDown();
-
+ void setUp() {
+ executor::ThreadPoolExecutorTest::setUp();
+ launchExecutorThread();
+ }
void _runQuorumCheck(const ReplSetConfig& config, int myIndex);
virtual Status _runQuorumCheckImpl(const ReplSetConfig& config, int myIndex) = 0;
- std::unique_ptr<stdx::thread> _executorThread;
std::unique_ptr<stdx::thread> _quorumCheckThread;
Status _quorumCheckStatus;
stdx::mutex _mutex;
@@ -96,18 +93,6 @@ private:
CheckQuorumTest::CheckQuorumTest()
: _quorumCheckStatus(ErrorCodes::InternalError, "Not executed") {}
-void CheckQuorumTest::setUp() {
- auto net = stdx::make_unique<NetworkInterfaceMock>();
- _net = net.get();
- _executor = stdx::make_unique<ReplicationExecutor>(std::move(net), 1 /* prng seed */);
- _executorThread.reset(new stdx::thread(stdx::bind(&ReplicationExecutor::run, _executor.get())));
-}
-
-void CheckQuorumTest::tearDown() {
- _executor->shutdown();
- _executorThread->join();
-}
-
void CheckQuorumTest::startQuorumCheck(const ReplSetConfig& config, int myIndex) {
ASSERT_FALSE(_quorumCheckThread);
_isQuorumCheckDone = false;
@@ -135,14 +120,14 @@ void CheckQuorumTest::_runQuorumCheck(const ReplSetConfig& config, int myIndex)
class CheckQuorumForInitiate : public CheckQuorumTest {
private:
virtual Status _runQuorumCheckImpl(const ReplSetConfig& config, int myIndex) {
- return checkQuorumForInitiate(_executor.get(), config, myIndex);
+ return checkQuorumForInitiate(&getExecutor(), config, myIndex);
}
};
class CheckQuorumForReconfig : public CheckQuorumTest {
protected:
virtual Status _runQuorumCheckImpl(const ReplSetConfig& config, int myIndex) {
- return checkQuorumForReconfig(_executor.get(), config, myIndex);
+ return checkQuorumForReconfig(&getExecutor(), config, myIndex);
}
};
@@ -166,7 +151,7 @@ TEST_F(CheckQuorumForInitiate, ValidSingleNodeSet) {
}
TEST_F(CheckQuorumForInitiate, QuorumCheckCanceledByShutdown) {
- _executor->shutdown();
+ getExecutor().shutdown();
ReplSetConfig config = assertMakeRSConfig(BSON("_id"
<< "rs0"
<< "version"
@@ -198,17 +183,17 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSeveralDownNodes) {
<< BSON("_id" << 5 << "host"
<< "h5:1"))));
startQuorumCheck(config, 2);
- _net->enterNetwork();
- const Date_t startDate = _net->now();
+ getNet()->enterNetwork();
+ const Date_t startDate = getNet()->now();
const int numCommandsExpected = config.getNumMembers() - 1;
for (int i = 0; i < numCommandsExpected; ++i) {
- _net->scheduleResponse(_net->getNextReadyRequest(),
- startDate + Milliseconds(10),
- {ErrorCodes::NoSuchKey, "No reply"});
+ getNet()->scheduleResponse(getNet()->getNextReadyRequest(),
+ startDate + Milliseconds(10),
+ {ErrorCodes::NoSuchKey, "No reply"});
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
- ASSERT_EQUALS(startDate + Milliseconds(10), _net->now());
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NodeNotFound, status);
ASSERT_REASON_CONTAINS(
@@ -256,24 +241,24 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckSuccessForFiveNodes) {
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
startQuorumCheck(rsConfig, myConfigIndex);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
const int numCommandsExpected = rsConfig.getNumMembers() - 1;
unordered_set<HostAndPort> seenHosts;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS("admin", request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second) << "Already saw "
<< request.target.toString();
- _net->scheduleResponse(
+ getNet()->scheduleResponse(
noi,
startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
ASSERT_OK(waitForQuorumCheck());
}
@@ -309,29 +294,29 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToOneDownNode) {
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
startQuorumCheck(rsConfig, myConfigIndex);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
const int numCommandsExpected = rsConfig.getNumMembers() - 1;
unordered_set<HostAndPort> seenHosts;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS("admin", request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second) << "Already saw "
<< request.target.toString();
if (request.target == HostAndPort("h2", 1)) {
- _net->scheduleResponse(
+ getNet()->scheduleResponse(
noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"});
} else {
- _net->scheduleResponse(
+ getNet()->scheduleResponse(
noi,
startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
}
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NodeNotFound, status);
ASSERT_REASON_CONTAINS(
@@ -369,32 +354,32 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSetNameMismatch) {
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
startQuorumCheck(rsConfig, myConfigIndex);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
const int numCommandsExpected = rsConfig.getNumMembers() - 1;
unordered_set<HostAndPort> seenHosts;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS("admin", request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second) << "Already saw "
<< request.target.toString();
if (request.target == HostAndPort("h4", 1)) {
- _net->scheduleResponse(
- noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(
- BSON("ok" << 0 << "mismatch" << true), BSONObj(), Milliseconds(8))));
+ getNet()->scheduleResponse(noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(BSON("ok" << 0 << "mismatch" << true),
+ BSONObj(),
+ Milliseconds(8))));
} else {
- _net->scheduleResponse(
+ getNet()->scheduleResponse(
noi,
startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
}
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status);
ASSERT_REASON_CONTAINS(status, "Our set name did not match");
@@ -433,14 +418,14 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSetIdMismatch) {
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
startQuorumCheck(rsConfig, myConfigIndex);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
const int numCommandsExpected = rsConfig.getNumMembers() - 1;
unordered_set<HostAndPort> seenHosts;
- _net->enterNetwork();
+ getNet()->enterNetwork();
HostAndPort incompatibleHost("h4", 1);
OID unexpectedId = OID::gen();
for (int i = 0; i < numCommandsExpected; ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS("admin", request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
@@ -459,19 +444,19 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSetIdMismatch) {
BSONObjBuilder metadataBuilder;
metadata.writeToMetadata(&metadataBuilder);
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(
- BSON("ok" << 1), metadataBuilder.obj(), Milliseconds(8))));
+ getNet()->scheduleResponse(
+ noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(BSON("ok" << 1), metadataBuilder.obj(), Milliseconds(8))));
} else {
- _net->scheduleResponse(
+ getNet()->scheduleResponse(
noi,
startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
}
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status);
ASSERT_REASON_CONTAINS(status,
@@ -512,35 +497,35 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToInitializedNode) {
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
startQuorumCheck(rsConfig, myConfigIndex);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
const int numCommandsExpected = rsConfig.getNumMembers() - 1;
unordered_set<HostAndPort> seenHosts;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS("admin", request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second) << "Already saw "
<< request.target.toString();
if (request.target == HostAndPort("h5", 1)) {
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 0 << "set"
- << "rs0"
- << "v"
- << 1),
- BSONObj(),
- Milliseconds(8))));
+ getNet()->scheduleResponse(noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(BSON("ok" << 0 << "set"
+ << "rs0"
+ << "v"
+ << 1),
+ BSONObj(),
+ Milliseconds(8))));
} else {
- _net->scheduleResponse(
+ getNet()->scheduleResponse(
noi,
startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
}
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status);
ASSERT_REASON_CONTAINS(status, "Our config version of");
@@ -578,32 +563,32 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToInitializedNodeOnlyOneRespo
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
startQuorumCheck(rsConfig, myConfigIndex);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
const int numCommandsExpected = rsConfig.getNumMembers() - 1;
unordered_set<HostAndPort> seenHosts;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS("admin", request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second) << "Already saw "
<< request.target.toString();
if (request.target == HostAndPort("h5", 1)) {
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 0 << "set"
- << "rs0"
- << "v"
- << 1),
- BSONObj(),
- Milliseconds(8))));
+ getNet()->scheduleResponse(noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(BSON("ok" << 0 << "set"
+ << "rs0"
+ << "v"
+ << 1),
+ BSONObj(),
+ Milliseconds(8))));
} else {
- _net->blackHole(noi);
+ getNet()->blackHole(noi);
}
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status);
ASSERT_REASON_CONTAINS(status, "Our config version of");
@@ -639,12 +624,12 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToNodeWithData) {
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
startQuorumCheck(rsConfig, myConfigIndex);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
const int numCommandsExpected = rsConfig.getNumMembers() - 1;
unordered_set<HostAndPort> seenHosts;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS("admin", request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
@@ -654,16 +639,16 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToNodeWithData) {
hbResp.setConfigVersion(0);
hbResp.noteHasData();
if (request.target == HostAndPort("h5", 1)) {
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(
- hbResp.toBSON(false), BSONObj(), Milliseconds(8))));
+ getNet()->scheduleResponse(
+ noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(hbResp.toBSON(false), BSONObj(), Milliseconds(8))));
} else {
- _net->blackHole(noi);
+ getNet()->blackHole(noi);
}
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::CannotInitializeNodeWithData, status);
ASSERT_REASON_CONTAINS(status, "has data already");
@@ -693,32 +678,32 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckVetoedDueToHigherConfigVersion) {
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
startQuorumCheck(rsConfig, myConfigIndex);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
const int numCommandsExpected = rsConfig.getNumMembers() - 1;
unordered_set<HostAndPort> seenHosts;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS("admin", request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second) << "Already saw "
<< request.target.toString();
if (request.target == HostAndPort("h1", 1)) {
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 0 << "set"
- << "rs0"
- << "v"
- << 5),
- BSONObj(),
- Milliseconds(8))));
+ getNet()->scheduleResponse(noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(BSON("ok" << 0 << "set"
+ << "rs0"
+ << "v"
+ << 5),
+ BSONObj(),
+ Milliseconds(8))));
} else {
- _net->blackHole(noi);
+ getNet()->blackHole(noi);
}
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status);
ASSERT_REASON_CONTAINS(status, "Our config version of");
@@ -748,30 +733,30 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckVetoedDueToIncompatibleSetName) {
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
startQuorumCheck(rsConfig, myConfigIndex);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
const int numCommandsExpected = rsConfig.getNumMembers() - 1;
unordered_set<HostAndPort> seenHosts;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS("admin", request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second) << "Already saw "
<< request.target.toString();
if (request.target == HostAndPort("h2", 1)) {
- _net->scheduleResponse(
- noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(
- BSON("ok" << 0 << "mismatch" << true), BSONObj(), Milliseconds(8))));
+ getNet()->scheduleResponse(noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(BSON("ok" << 0 << "mismatch" << true),
+ BSONObj(),
+ Milliseconds(8))));
} else {
- _net->scheduleResponse(
+ getNet()->scheduleResponse(
noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"});
}
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status);
ASSERT_REASON_CONTAINS(status, "Our set name did not match");
@@ -814,29 +799,29 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToInsufficientVoters) {
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
startQuorumCheck(rsConfig, myConfigIndex);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
const int numCommandsExpected = rsConfig.getNumMembers() - 1;
unordered_set<HostAndPort> seenHosts;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS("admin", request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second) << "Already saw "
<< request.target.toString();
if (request.target == HostAndPort("h1", 1) || request.target == HostAndPort("h5", 1)) {
- _net->scheduleResponse(
+ getNet()->scheduleResponse(
noi,
startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
} else {
- _net->scheduleResponse(
+ getNet()->scheduleResponse(
noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"});
}
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NodeNotFound, status);
ASSERT_REASON_CONTAINS(status, "not enough voting nodes responded; required 2 but only");
@@ -875,29 +860,29 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToNoElectableNodeResponding) {
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
startQuorumCheck(rsConfig, myConfigIndex);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
const int numCommandsExpected = rsConfig.getNumMembers() - 1;
unordered_set<HostAndPort> seenHosts;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS("admin", request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second) << "Already saw "
<< request.target.toString();
if (request.target == HostAndPort("h5", 1)) {
- _net->scheduleResponse(
+ getNet()->scheduleResponse(
noi,
startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
} else {
- _net->scheduleResponse(
+ getNet()->scheduleResponse(
noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"});
}
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NodeNotFound, status);
ASSERT_REASON_CONTAINS(status, "no electable nodes responded");
@@ -936,28 +921,28 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckSucceedsWithAsSoonAsPossible) {
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
startQuorumCheck(rsConfig, myConfigIndex);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
const int numCommandsExpected = rsConfig.getNumMembers() - 1;
unordered_set<HostAndPort> seenHosts;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS("admin", request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second) << "Already saw "
<< request.target.toString();
if (request.target == HostAndPort("h1", 1) || request.target == HostAndPort("h2", 1)) {
- _net->scheduleResponse(
+ getNet()->scheduleResponse(
noi,
startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
} else {
- _net->blackHole(noi);
+ getNet()->blackHole(noi);
}
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
ASSERT_OK(waitForQuorumCheck());
}
diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp
index 6f155213861..ef6d9c722d3 100644
--- a/src/mongo/db/repl/elect_cmd_runner.cpp
+++ b/src/mongo/db/repl/elect_cmd_runner.cpp
@@ -128,8 +128,8 @@ void ElectCmdRunner::Algorithm::processResponse(const RemoteCommandRequest& requ
ElectCmdRunner::ElectCmdRunner() : _isCanceled(false) {}
ElectCmdRunner::~ElectCmdRunner() {}
-StatusWith<ReplicationExecutor::EventHandle> ElectCmdRunner::start(
- ReplicationExecutor* executor,
+StatusWith<executor::TaskExecutor::EventHandle> ElectCmdRunner::start(
+ executor::TaskExecutor* executor,
const ReplSetConfig& currentConfig,
int selfIndex,
const std::vector<HostAndPort>& targets) {
diff --git a/src/mongo/db/repl/elect_cmd_runner.h b/src/mongo/db/repl/elect_cmd_runner.h
index c428fa11002..7ca3155c814 100644
--- a/src/mongo/db/repl/elect_cmd_runner.h
+++ b/src/mongo/db/repl/elect_cmd_runner.h
@@ -90,10 +90,10 @@ public:
*
* Returned handle can be used to schedule a callback when the process is complete.
*/
- StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor,
- const ReplSetConfig& currentConfig,
- int selfIndex,
- const std::vector<HostAndPort>& targets);
+ StatusWith<executor::TaskExecutor::EventHandle> start(executor::TaskExecutor* executor,
+ const ReplSetConfig& currentConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& targets);
/**
* Informs the ElectCmdRunner to cancel further processing.
diff --git a/src/mongo/db/repl/elect_cmd_runner_test.cpp b/src/mongo/db/repl/elect_cmd_runner_test.cpp
index 1361c544b6e..6cb8e4df917 100644
--- a/src/mongo/db/repl/elect_cmd_runner_test.cpp
+++ b/src/mongo/db/repl/elect_cmd_runner_test.cpp
@@ -33,11 +33,10 @@
#include "mongo/db/repl/elect_cmd_runner.h"
#include "mongo/db/repl/member_heartbeat_data.h"
#include "mongo/db/repl/repl_set_config.h"
-#include "mongo/db/repl/replication_executor.h"
#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
-#include "mongo/stdx/thread.h"
#include "mongo/unittest/unittest.h"
@@ -51,7 +50,7 @@ using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
-class ElectCmdRunnerTest : public mongo::unittest::Test {
+class ElectCmdRunnerTest : public executor::ThreadPoolExecutorTest {
public:
void startTest(ElectCmdRunner* electCmdRunner,
const ReplSetConfig& currentConfig,
@@ -60,36 +59,21 @@ public:
void waitForTest();
- void electCmdRunnerRunner(const ReplicationExecutor::CallbackArgs& data,
+ void electCmdRunnerRunner(const executor::TaskExecutor::CallbackArgs& data,
ElectCmdRunner* electCmdRunner,
- StatusWith<ReplicationExecutor::EventHandle>* evh,
+ StatusWith<executor::TaskExecutor::EventHandle>* evh,
const ReplSetConfig& currentConfig,
int selfIndex,
const std::vector<HostAndPort>& hosts);
- NetworkInterfaceMock* _net;
- std::unique_ptr<ReplicationExecutor> _executor;
- std::unique_ptr<stdx::thread> _executorThread;
-
private:
- void setUp();
- void tearDown();
-
- ReplicationExecutor::EventHandle _allDoneEvent;
+ void setUp() {
+ executor::ThreadPoolExecutorTest::setUp();
+ launchExecutorThread();
+ }
+ executor::TaskExecutor::EventHandle _allDoneEvent;
};
-void ElectCmdRunnerTest::setUp() {
- auto net = stdx::make_unique<NetworkInterfaceMock>();
- _net = net.get();
- _executor = stdx::make_unique<ReplicationExecutor>(std::move(net), 1 /* prng seed */);
- _executorThread.reset(new stdx::thread(stdx::bind(&ReplicationExecutor::run, _executor.get())));
-}
-
-void ElectCmdRunnerTest::tearDown() {
- _executor->shutdown();
- _executorThread->join();
-}
-
ReplSetConfig assertMakeRSConfig(const BSONObj& configBson) {
ReplSetConfig config;
ASSERT_OK(config.initialize(configBson));
@@ -123,40 +107,38 @@ BSONObj stripRound(const BSONObj& orig) {
// This is necessary because the run method must be scheduled in the Replication Executor
// for correct concurrency operation.
-void ElectCmdRunnerTest::electCmdRunnerRunner(const ReplicationExecutor::CallbackArgs& data,
+void ElectCmdRunnerTest::electCmdRunnerRunner(const executor::TaskExecutor::CallbackArgs& data,
ElectCmdRunner* electCmdRunner,
- StatusWith<ReplicationExecutor::EventHandle>* evh,
+ StatusWith<executor::TaskExecutor::EventHandle>* evh,
const ReplSetConfig& currentConfig,
int selfIndex,
const std::vector<HostAndPort>& hosts) {
invariant(data.status.isOK());
- ReplicationExecutor* executor = dynamic_cast<ReplicationExecutor*>(data.executor);
- ASSERT(executor);
- *evh = electCmdRunner->start(executor, currentConfig, selfIndex, hosts);
+ *evh = electCmdRunner->start(data.executor, currentConfig, selfIndex, hosts);
}
void ElectCmdRunnerTest::startTest(ElectCmdRunner* electCmdRunner,
const ReplSetConfig& currentConfig,
int selfIndex,
const std::vector<HostAndPort>& hosts) {
- StatusWith<ReplicationExecutor::EventHandle> evh(ErrorCodes::InternalError, "Not set");
- StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- _executor->scheduleWork(stdx::bind(&ElectCmdRunnerTest::electCmdRunnerRunner,
- this,
- stdx::placeholders::_1,
- electCmdRunner,
- &evh,
- currentConfig,
- selfIndex,
- hosts));
+ StatusWith<executor::TaskExecutor::EventHandle> evh(ErrorCodes::InternalError, "Not set");
+ StatusWith<executor::TaskExecutor::CallbackHandle> cbh =
+ getExecutor().scheduleWork(stdx::bind(&ElectCmdRunnerTest::electCmdRunnerRunner,
+ this,
+ stdx::placeholders::_1,
+ electCmdRunner,
+ &evh,
+ currentConfig,
+ selfIndex,
+ hosts));
ASSERT_OK(cbh.getStatus());
- _executor->wait(cbh.getValue());
+ getExecutor().wait(cbh.getValue());
ASSERT_OK(evh.getStatus());
_allDoneEvent = evh.getValue();
}
void ElectCmdRunnerTest::waitForTest() {
- _executor->waitForEvent(_allDoneEvent);
+ getExecutor().waitForEvent(_allDoneEvent);
}
TEST_F(ElectCmdRunnerTest, OneNode) {
@@ -195,21 +177,21 @@ TEST_F(ElectCmdRunnerTest, TwoNodes) {
ElectCmdRunner electCmdRunner;
startTest(&electCmdRunner, config, 0, hosts);
- const Date_t startDate = _net->now();
- _net->enterNetwork();
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const Date_t startDate = getNet()->now();
+ getNet()->enterNetwork();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
ASSERT_EQUALS("admin", noi->getRequest().dbname);
ASSERT_BSONOBJ_EQ(stripRound(electRequest), stripRound(noi->getRequest().cmdObj));
ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target);
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(
- BSON("ok" << 1 << "vote" << 1 << "round" << 380865962699346850ll),
+ getNet()->scheduleResponse(
+ noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(BSON("ok" << 1 << "vote" << 1 << "round" << 380865962699346850ll),
BSONObj(),
Milliseconds(8))));
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
- ASSERT_EQUALS(startDate + Milliseconds(10), _net->now());
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
waitForTest();
ASSERT_EQUALS(electCmdRunner.getReceivedVotes(), 2);
}
@@ -230,21 +212,22 @@ TEST_F(ElectCmdRunnerTest, ShuttingDown) {
hosts.push_back(config.getMemberAt(1).getHostAndPort());
ElectCmdRunner electCmdRunner;
- StatusWith<ReplicationExecutor::EventHandle> evh(ErrorCodes::InternalError, "Not set");
- StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- _executor->scheduleWork(stdx::bind(&ElectCmdRunnerTest::electCmdRunnerRunner,
- this,
- stdx::placeholders::_1,
- &electCmdRunner,
- &evh,
- config,
- 0,
- hosts));
+ StatusWith<executor::TaskExecutor::EventHandle> evh(ErrorCodes::InternalError, "Not set");
+ StatusWith<executor::TaskExecutor::CallbackHandle> cbh =
+ getExecutor().scheduleWork(stdx::bind(&ElectCmdRunnerTest::electCmdRunnerRunner,
+ this,
+ stdx::placeholders::_1,
+ &electCmdRunner,
+ &evh,
+ config,
+ 0,
+ hosts));
ASSERT_OK(cbh.getStatus());
- _executor->wait(cbh.getValue());
+ getExecutor().wait(cbh.getValue());
ASSERT_OK(evh.getStatus());
- _executor->shutdown();
- _executor->waitForEvent(evh.getValue());
+ shutdownExecutorThread();
+ joinExecutorThread();
+ getExecutor().waitForEvent(evh.getValue());
ASSERT_EQUALS(electCmdRunner.getReceivedVotes(), 1);
}
@@ -279,7 +262,8 @@ protected:
return _checker->getReceivedVotes();
}
- void processResponse(const RemoteCommandRequest& request, const ResponseStatus& response) {
+ void processResponse(const RemoteCommandRequest& request,
+ const RemoteCommandResponse& response) {
_checker->processResponse(request, response);
}
@@ -291,27 +275,27 @@ protected:
Milliseconds(0));
}
- ResponseStatus badResponseStatus() {
- return ResponseStatus(ErrorCodes::NodeNotFound, "not on my watch");
+ RemoteCommandResponse badRemoteCommandResponse() {
+ return RemoteCommandResponse(ErrorCodes::NodeNotFound, "not on my watch");
}
- ResponseStatus wrongTypeForVoteField() {
- return ResponseStatus(NetworkInterfaceMock::Response(
+ RemoteCommandResponse wrongTypeForVoteField() {
+ return RemoteCommandResponse(NetworkInterfaceMock::Response(
BSON("vote" << std::string("yea")), BSONObj(), Milliseconds(10)));
}
- ResponseStatus voteYea() {
- return ResponseStatus(
+ RemoteCommandResponse voteYea() {
+ return RemoteCommandResponse(
NetworkInterfaceMock::Response(BSON("vote" << 1), BSONObj(), Milliseconds(10)));
}
- ResponseStatus voteNay() {
- return ResponseStatus(
+ RemoteCommandResponse voteNay() {
+ return RemoteCommandResponse(
NetworkInterfaceMock::Response(BSON("vote" << -10000), BSONObj(), Milliseconds(10)));
}
- ResponseStatus abstainFromVoting() {
- return ResponseStatus(
+ RemoteCommandResponse abstainFromVoting() {
+ return RemoteCommandResponse(
NetworkInterfaceMock::Response(BSON("vote" << 0), BSONObj(), Milliseconds(10)));
}
@@ -364,7 +348,7 @@ TEST_F(ElectScatterGatherTest, NodeRespondsWithBadStatus) {
start(basicThreeNodeConfig());
ASSERT_FALSE(hasReceivedSufficientResponses());
- processResponse(requestFrom("host2"), badResponseStatus());
+ processResponse(requestFrom("host2"), badRemoteCommandResponse());
ASSERT_FALSE(hasReceivedSufficientResponses());
processResponse(requestFrom("host3"), abstainFromVoting());
@@ -406,7 +390,7 @@ TEST_F(ElectScatterGatherTest, NodeRespondsWithBadStatusArbiters) {
start(threeNodesTwoArbitersConfig());
ASSERT_FALSE(hasReceivedSufficientResponses());
- processResponse(requestFrom("host2"), badResponseStatus());
+ processResponse(requestFrom("host2"), badRemoteCommandResponse());
ASSERT_FALSE(hasReceivedSufficientResponses());
processResponse(requestFrom("host3"), abstainFromVoting());
diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp
index 8ab9cb1cd7c..b6db822eb67 100644
--- a/src/mongo/db/repl/freshness_checker.cpp
+++ b/src/mongo/db/repl/freshness_checker.cpp
@@ -47,6 +47,7 @@ namespace mongo {
namespace repl {
using executor::RemoteCommandRequest;
+using executor::RemoteCommandResponse;
FreshnessChecker::Algorithm::Algorithm(Timestamp lastOpTimeApplied,
const ReplSetConfig& rsConfig,
@@ -122,7 +123,7 @@ bool FreshnessChecker::Algorithm::_isVotingMember(const HostAndPort hap) const {
}
void FreshnessChecker::Algorithm::processResponse(const RemoteCommandRequest& request,
- const ResponseStatus& response) {
+ const RemoteCommandResponse& response) {
++_responsesProcessed;
bool votingMember = _isVotingMember(request.target);
@@ -209,8 +210,8 @@ long long FreshnessChecker::getOriginalConfigVersion() const {
FreshnessChecker::FreshnessChecker() : _isCanceled(false) {}
FreshnessChecker::~FreshnessChecker() {}
-StatusWith<ReplicationExecutor::EventHandle> FreshnessChecker::start(
- ReplicationExecutor* executor,
+StatusWith<executor::TaskExecutor::EventHandle> FreshnessChecker::start(
+ executor::TaskExecutor* executor,
const Timestamp& lastOpTimeApplied,
const ReplSetConfig& currentConfig,
int selfIndex,
diff --git a/src/mongo/db/repl/freshness_checker.h b/src/mongo/db/repl/freshness_checker.h
index 9d4eb50023f..2db774a7baf 100644
--- a/src/mongo/db/repl/freshness_checker.h
+++ b/src/mongo/db/repl/freshness_checker.h
@@ -33,8 +33,8 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/repl/repl_set_config.h"
-#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/scatter_gather_algorithm.h"
+#include "mongo/executor/task_executor.h"
namespace mongo {
@@ -66,7 +66,7 @@ public:
virtual ~Algorithm();
virtual std::vector<executor::RemoteCommandRequest> getRequests() const;
virtual void processResponse(const executor::RemoteCommandRequest& request,
- const ResponseStatus& response);
+ const executor::RemoteCommandResponse& response);
virtual bool hasReceivedSufficientResponses() const;
ElectionAbortReason shouldAbortElection() const;
@@ -118,11 +118,11 @@ public:
* evh can be used to schedule a callback when the process is complete.
* If this function returns Status::OK(), evh is then guaranteed to be signaled.
**/
- StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor,
- const Timestamp& lastOpTimeApplied,
- const ReplSetConfig& currentConfig,
- int selfIndex,
- const std::vector<HostAndPort>& targets);
+ StatusWith<executor::TaskExecutor::EventHandle> start(executor::TaskExecutor* executor,
+ const Timestamp& lastOpTimeApplied,
+ const ReplSetConfig& currentConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& targets);
/**
* Informs the freshness checker to cancel further processing.
diff --git a/src/mongo/db/repl/freshness_checker_test.cpp b/src/mongo/db/repl/freshness_checker_test.cpp
index 5bc3fd48984..2535ac5ffa2 100644
--- a/src/mongo/db/repl/freshness_checker_test.cpp
+++ b/src/mongo/db/repl/freshness_checker_test.cpp
@@ -33,8 +33,8 @@
#include "mongo/db/repl/freshness_checker.h"
#include "mongo/db/repl/member_heartbeat_data.h"
#include "mongo/db/repl/repl_set_config.h"
-#include "mongo/db/repl/replication_executor.h"
#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/platform/unordered_set.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
@@ -57,7 +57,7 @@ bool stringContains(const std::string& haystack, const std::string& needle) {
return haystack.find(needle) != std::string::npos;
}
-class FreshnessCheckerTest : public mongo::unittest::Test {
+class FreshnessCheckerTest : public executor::ThreadPoolExecutorTest {
protected:
void startTest(const Timestamp& lastOpTimeApplied,
const ReplSetConfig& currentConfig,
@@ -72,42 +72,29 @@ protected:
stdx::bind(stringContains, stdx::placeholders::_1, needle));
}
- NetworkInterfaceMock* _net;
- std::unique_ptr<ReplicationExecutor> _executor;
- std::unique_ptr<stdx::thread> _executorThread;
-
private:
- void freshnessCheckerRunner(const ReplicationExecutor::CallbackArgs& data,
+ void freshnessCheckerRunner(const executor::TaskExecutor::CallbackArgs& data,
const Timestamp& lastOpTimeApplied,
const ReplSetConfig& currentConfig,
int selfIndex,
const std::vector<HostAndPort>& hosts);
void setUp();
- void tearDown();
- std::unique_ptr<FreshnessChecker> _checker;
- ReplicationExecutor::EventHandle _checkerDoneEvent;
+ FreshnessChecker _checker;
+ executor::TaskExecutor::EventHandle _checkerDoneEvent;
};
void FreshnessCheckerTest::setUp() {
- auto net = stdx::make_unique<NetworkInterfaceMock>();
- _net = net.get();
- _executor = stdx::make_unique<ReplicationExecutor>(std::move(net), 1 /* prng seed */);
- _executorThread.reset(new stdx::thread(stdx::bind(&ReplicationExecutor::run, _executor.get())));
- _checker.reset(new FreshnessChecker);
-}
-
-void FreshnessCheckerTest::tearDown() {
- _executor->shutdown();
- _executorThread->join();
+ executor::ThreadPoolExecutorTest::setUp();
+ launchExecutorThread();
}
void FreshnessCheckerTest::waitOnChecker() {
- _executor->waitForEvent(_checkerDoneEvent);
+ getExecutor().waitForEvent(_checkerDoneEvent);
}
FreshnessChecker::ElectionAbortReason FreshnessCheckerTest::shouldAbortElection() const {
- return _checker->shouldAbortElection();
+ return _checker.shouldAbortElection();
}
ReplSetConfig assertMakeRSConfig(const BSONObj& configBson) {
@@ -131,18 +118,16 @@ const BSONObj makeFreshRequest(const ReplSetConfig& rsConfig,
<< myConfig.getId());
}
-// This is necessary because the run method must be scheduled in the Replication Executor
+// This is necessary because the run method must be scheduled in the executor
// for correct concurrency operation.
-void FreshnessCheckerTest::freshnessCheckerRunner(const ReplicationExecutor::CallbackArgs& data,
+void FreshnessCheckerTest::freshnessCheckerRunner(const executor::TaskExecutor::CallbackArgs& data,
const Timestamp& lastOpTimeApplied,
const ReplSetConfig& currentConfig,
int selfIndex,
const std::vector<HostAndPort>& hosts) {
invariant(data.status.isOK());
- ReplicationExecutor* executor = dynamic_cast<ReplicationExecutor*>(data.executor);
- ASSERT(executor);
- StatusWith<ReplicationExecutor::EventHandle> evh =
- _checker->start(executor, lastOpTimeApplied, currentConfig, selfIndex, hosts);
+ StatusWith<executor::TaskExecutor::EventHandle> evh =
+ _checker.start(data.executor, lastOpTimeApplied, currentConfig, selfIndex, hosts);
_checkerDoneEvent = assertGet(evh);
}
@@ -150,14 +135,14 @@ void FreshnessCheckerTest::startTest(const Timestamp& lastOpTimeApplied,
const ReplSetConfig& currentConfig,
int selfIndex,
const std::vector<HostAndPort>& hosts) {
- _executor->wait(
- assertGet(_executor->scheduleWork(stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
- this,
- stdx::placeholders::_1,
- lastOpTimeApplied,
- currentConfig,
- selfIndex,
- hosts))));
+ getExecutor().wait(assertGet(
+ getExecutor().scheduleWork(stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
+ this,
+ stdx::placeholders::_1,
+ lastOpTimeApplied,
+ currentConfig,
+ selfIndex,
+ hosts))));
}
TEST_F(FreshnessCheckerTest, TwoNodes) {
@@ -177,30 +162,29 @@ TEST_F(FreshnessCheckerTest, TwoNodes) {
const BSONObj freshRequest = makeFreshRequest(config, Timestamp(0, 0), 0);
startTest(Timestamp(0, 0), config, 0, hosts);
- const Date_t startDate = _net->now();
- _net->enterNetwork();
+ const Date_t startDate = getNet()->now();
+ getNet()->enterNetwork();
for (size_t i = 0; i < hosts.size(); ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
ASSERT_EQUALS("admin", noi->getRequest().dbname);
ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj);
ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target);
- _net->scheduleResponse(
- noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1 << "id" << 2 << "set"
- << "rs0"
- << "who"
- << "h1"
- << "cfgver"
- << 1
- << "opTime"
- << Date_t()),
- BSONObj(),
- Milliseconds(8))));
+ getNet()->scheduleResponse(noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(BSON("ok" << 1 << "id" << 2 << "set"
+ << "rs0"
+ << "who"
+ << "h1"
+ << "cfgver"
+ << 1
+ << "opTime"
+ << Date_t()),
+ BSONObj(),
+ Milliseconds(8))));
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
- ASSERT_EQUALS(startDate + Milliseconds(10), _net->now());
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
waitOnChecker();
ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FreshnessTie);
}
@@ -221,7 +205,8 @@ TEST_F(FreshnessCheckerTest, ShuttingDown) {
hosts.push_back(config.getMemberAt(1).getHostAndPort());
startTest(Timestamp(0, 0), config, 0, hosts);
- _executor->shutdown();
+ shutdownExecutorThread();
+ joinExecutorThread();
waitOnChecker();
// This seems less than ideal, but if we are shutting down, the next phase of election
@@ -248,32 +233,31 @@ TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshest) {
const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0);
startTest(Timestamp(10, 0), config, 0, hosts);
- const Date_t startDate = _net->now();
- _net->enterNetwork();
+ const Date_t startDate = getNet()->now();
+ getNet()->enterNetwork();
for (size_t i = 0; i < hosts.size(); ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
ASSERT_EQUALS("admin", noi->getRequest().dbname);
ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj);
ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target);
- _net->scheduleResponse(
- noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1 << "id" << 2 << "set"
- << "rs0"
- << "who"
- << "h1"
- << "cfgver"
- << 1
- << "fresher"
- << true
- << "opTime"
- << Date_t()),
- BSONObj(),
- Milliseconds(8))));
+ getNet()->scheduleResponse(noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(BSON("ok" << 1 << "id" << 2 << "set"
+ << "rs0"
+ << "who"
+ << "h1"
+ << "cfgver"
+ << 1
+ << "fresher"
+ << true
+ << "opTime"
+ << Date_t()),
+ BSONObj(),
+ Milliseconds(8))));
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
- ASSERT_EQUALS(startDate + Milliseconds(10), _net->now());
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
waitOnChecker();
stopCapturingLogMessages();
@@ -301,17 +285,17 @@ TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestOpTime) {
const BSONObj freshRequest = makeFreshRequest(config, Timestamp(0, 0), 0);
startTest(Timestamp(0, 0), config, 0, hosts);
- const Date_t startDate = _net->now();
- _net->enterNetwork();
+ const Date_t startDate = getNet()->now();
+ getNet()->enterNetwork();
for (size_t i = 0; i < hosts.size(); ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
ASSERT_EQUALS("admin", noi->getRequest().dbname);
ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj);
ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target);
- _net->scheduleResponse(
+ getNet()->scheduleResponse(
noi,
startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(
+ (RemoteCommandResponse(
BSON("ok" << 1 << "id" << 2 << "set"
<< "rs0"
<< "who"
@@ -323,9 +307,9 @@ TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestOpTime) {
BSONObj(),
Milliseconds(8))));
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
- ASSERT_EQUALS(startDate + Milliseconds(10), _net->now());
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
waitOnChecker();
stopCapturingLogMessages();
@@ -351,30 +335,29 @@ TEST_F(FreshnessCheckerTest, ElectWrongTypeInFreshnessResponse) {
const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0);
startTest(Timestamp(10, 0), config, 0, hosts);
- const Date_t startDate = _net->now();
- _net->enterNetwork();
+ const Date_t startDate = getNet()->now();
+ getNet()->enterNetwork();
for (size_t i = 0; i < hosts.size(); ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
ASSERT_EQUALS("admin", noi->getRequest().dbname);
ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj);
ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target);
- _net->scheduleResponse(
- noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1 << "id" << 2 << "set"
- << "rs0"
- << "who"
- << "h1"
- << "cfgver"
- << 1
- << "opTime"
- << 3),
- BSONObj(),
- Milliseconds(8))));
+ getNet()->scheduleResponse(noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(BSON("ok" << 1 << "id" << 2 << "set"
+ << "rs0"
+ << "who"
+ << "h1"
+ << "cfgver"
+ << 1
+ << "opTime"
+ << 3),
+ BSONObj(),
+ Milliseconds(8))));
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
- ASSERT_EQUALS(startDate + Milliseconds(10), _net->now());
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
waitOnChecker();
stopCapturingLogMessages();
@@ -404,35 +387,34 @@ TEST_F(FreshnessCheckerTest, ElectVetoed) {
const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0);
startTest(Timestamp(10, 0), config, 0, hosts);
- const Date_t startDate = _net->now();
- _net->enterNetwork();
+ const Date_t startDate = getNet()->now();
+ getNet()->enterNetwork();
for (size_t i = 0; i < hosts.size(); ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
ASSERT_EQUALS("admin", noi->getRequest().dbname);
ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj);
ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target);
- _net->scheduleResponse(
- noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(
- BSON("ok" << 1 << "id" << 2 << "set"
- << "rs0"
- << "who"
- << "h1"
- << "cfgver"
- << 1
- << "veto"
- << true
- << "errmsg"
- << "I'd rather you didn't"
- << "opTime"
- << Date_t::fromMillisSinceEpoch(Timestamp(0, 0).asLL())),
- BSONObj(),
- Milliseconds(8))));
+ getNet()->scheduleResponse(noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(BSON("ok" << 1 << "id" << 2 << "set"
+ << "rs0"
+ << "who"
+ << "h1"
+ << "cfgver"
+ << 1
+ << "veto"
+ << true
+ << "errmsg"
+ << "I'd rather you didn't"
+ << "opTime"
+ << Date_t::fromMillisSinceEpoch(
+ Timestamp(0, 0).asLL())),
+ BSONObj(),
+ Milliseconds(8))));
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
- ASSERT_EQUALS(startDate + Milliseconds(10), _net->now());
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
waitOnChecker();
stopCapturingLogMessages();
@@ -477,11 +459,11 @@ TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestManyNodes) {
const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0);
startTest(Timestamp(10, 0), config, 0, hosts);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
unordered_set<HostAndPort> seen;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (size_t i = 0; i < hosts.size(); ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const HostAndPort target = noi->getRequest().target;
ASSERT_EQUALS("admin", noi->getRequest().dbname);
ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj);
@@ -494,14 +476,14 @@ TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestManyNodes) {
if (target.host() == "h1") {
responseBuilder << "fresher" << true;
}
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(
- responseBuilder.obj(), BSONObj(), Milliseconds(8))));
+ getNet()->scheduleResponse(
+ noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8))));
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
- ASSERT_EQUALS(startDate + Milliseconds(10), _net->now());
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
waitOnChecker();
stopCapturingLogMessages();
ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
@@ -540,12 +522,12 @@ TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestOpTimeManyNodes
const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0);
startTest(Timestamp(10, 0), config, 0, hosts);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
unordered_set<HostAndPort> seen;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (size_t i = 0; i < hosts.size(); ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const HostAndPort target = noi->getRequest().target;
ASSERT_EQUALS("admin", noi->getRequest().dbname);
ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj);
@@ -556,27 +538,27 @@ TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestOpTimeManyNodes
<< "rs0"
<< "who" << target.toString() << "cfgver" << 1 << "opTime"
<< Date_t::fromMillisSinceEpoch(Timestamp(20, 0).asLL());
- _net->scheduleResponse(noi,
- startDate + Milliseconds(20),
- ResponseStatus(RemoteCommandResponse(
- responseBuilder.obj(), BSONObj(), Milliseconds(8))));
+ getNet()->scheduleResponse(
+ noi,
+ startDate + Milliseconds(20),
+ (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8))));
} else {
responseBuilder << "ok" << 1 << "id" << findIdForMember(config, target) << "set"
<< "rs0"
<< "who" << target.toString() << "cfgver" << 1 << "opTime"
<< Date_t::fromMillisSinceEpoch(Timestamp(10, 0).asLL());
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(
- responseBuilder.obj(), BSONObj(), Milliseconds(8))));
+ getNet()->scheduleResponse(
+ noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8))));
}
}
- _net->runUntil(startDate + Milliseconds(10));
- ASSERT_EQUALS(startDate + Milliseconds(10), _net->now());
+ getNet()->runUntil(startDate + Milliseconds(10));
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
ASSERT_EQUALS(0, countLogLinesContaining("not electing self, we are not freshest"));
- _net->runUntil(startDate + Milliseconds(20));
- ASSERT_EQUALS(startDate + Milliseconds(20), _net->now());
- _net->exitNetwork();
+ getNet()->runUntil(startDate + Milliseconds(20));
+ ASSERT_EQUALS(startDate + Milliseconds(20), getNet()->now());
+ getNet()->exitNetwork();
waitOnChecker();
stopCapturingLogMessages();
ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
@@ -610,11 +592,11 @@ TEST_F(FreshnessCheckerTest, ElectWrongTypeInFreshnessResponseManyNodes) {
const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0);
startTest(Timestamp(10, 0), config, 0, hosts);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
unordered_set<HostAndPort> seen;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (size_t i = 0; i < hosts.size(); ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const HostAndPort target = noi->getRequest().target;
ASSERT_EQUALS("admin", noi->getRequest().dbname);
ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj);
@@ -628,14 +610,14 @@ TEST_F(FreshnessCheckerTest, ElectWrongTypeInFreshnessResponseManyNodes) {
} else {
responseBuilder << "opTime" << Date_t::fromMillisSinceEpoch(Timestamp(0, 0).asLL());
}
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(
- responseBuilder.obj(), BSONObj(), Milliseconds(8))));
+ getNet()->scheduleResponse(
+ noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8))));
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
- ASSERT_EQUALS(startDate + Milliseconds(10), _net->now());
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
waitOnChecker();
stopCapturingLogMessages();
ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
@@ -672,11 +654,11 @@ TEST_F(FreshnessCheckerTest, ElectVetoedManyNodes) {
const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0);
startTest(Timestamp(10, 0), config, 0, hosts);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
unordered_set<HostAndPort> seen;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (size_t i = 0; i < hosts.size(); ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const HostAndPort target = noi->getRequest().target;
ASSERT_EQUALS("admin", noi->getRequest().dbname);
ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj);
@@ -690,14 +672,14 @@ TEST_F(FreshnessCheckerTest, ElectVetoedManyNodes) {
responseBuilder << "veto" << true << "errmsg"
<< "I'd rather you didn't";
}
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(
- responseBuilder.obj(), BSONObj(), Milliseconds(8))));
+ getNet()->scheduleResponse(
+ noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8))));
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
- ASSERT_EQUALS(startDate + Milliseconds(10), _net->now());
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
waitOnChecker();
stopCapturingLogMessages();
ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
@@ -737,12 +719,12 @@ TEST_F(FreshnessCheckerTest, ElectVetoedAndTiedFreshnessManyNodes) {
const BSONObj freshRequest = makeFreshRequest(config, Timestamp(10, 0), 0);
startTest(Timestamp(10, 0), config, 0, hosts);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
unordered_set<HostAndPort> seen;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (size_t i = 0; i < hosts.size(); ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const HostAndPort target = noi->getRequest().target;
ASSERT_EQUALS("admin", noi->getRequest().dbname);
ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj);
@@ -755,29 +737,29 @@ TEST_F(FreshnessCheckerTest, ElectVetoedAndTiedFreshnessManyNodes) {
<< "errmsg"
<< "I'd rather you didn't"
<< "opTime" << Date_t::fromMillisSinceEpoch(Timestamp(10, 0).asLL());
- _net->scheduleResponse(noi,
- startDate + Milliseconds(20),
- ResponseStatus(RemoteCommandResponse(
- responseBuilder.obj(), BSONObj(), Milliseconds(8))));
+ getNet()->scheduleResponse(
+ noi,
+ startDate + Milliseconds(20),
+ (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8))));
} else {
responseBuilder << "ok" << 1 << "id" << findIdForMember(config, target) << "set"
<< "rs0"
<< "who" << target.toString() << "cfgver" << 1 << "opTime"
<< Date_t::fromMillisSinceEpoch(Timestamp(10, 0).asLL());
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(
- responseBuilder.obj(), BSONObj(), Milliseconds(8))));
+ getNet()->scheduleResponse(
+ noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8))));
}
}
- _net->runUntil(startDate + Milliseconds(10));
- ASSERT_EQUALS(startDate + Milliseconds(10), _net->now());
+ getNet()->runUntil(startDate + Milliseconds(10));
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
ASSERT_EQUALS(0,
countLogLinesContaining("not electing self, h4:27017 would veto with '"
"errmsg: \"I'd rather you didn't\"'"));
- _net->runUntil(startDate + Milliseconds(20));
- ASSERT_EQUALS(startDate + Milliseconds(20), _net->now());
- _net->exitNetwork();
+ getNet()->runUntil(startDate + Milliseconds(20));
+ ASSERT_EQUALS(startDate + Milliseconds(20), getNet()->now());
+ getNet()->exitNetwork();
waitOnChecker();
stopCapturingLogMessages();
ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
@@ -813,34 +795,34 @@ TEST_F(FreshnessCheckerTest, ElectManyNodesNotAllRespond) {
const BSONObj freshRequest = makeFreshRequest(config, lastOpTimeApplied, 0);
startTest(Timestamp(10, 0), config, 0, hosts);
- const Date_t startDate = _net->now();
+ const Date_t startDate = getNet()->now();
unordered_set<HostAndPort> seen;
- _net->enterNetwork();
+ getNet()->enterNetwork();
for (size_t i = 0; i < hosts.size(); ++i) {
- const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const HostAndPort target = noi->getRequest().target;
ASSERT_EQUALS("admin", noi->getRequest().dbname);
ASSERT_BSONOBJ_EQ(freshRequest, noi->getRequest().cmdObj);
ASSERT(seen.insert(target).second) << "Already saw " << target;
if (target.host() == "h2" || target.host() == "h3") {
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(ErrorCodes::NoSuchKey, "No response"));
+ getNet()->scheduleResponse(noi,
+ startDate + Milliseconds(10),
+ RemoteCommandResponse(ErrorCodes::NoSuchKey, "No response"));
} else {
BSONObjBuilder responseBuilder;
responseBuilder << "ok" << 1 << "id" << findIdForMember(config, target) << "set"
<< "rs0"
<< "who" << target.toString() << "cfgver" << 1 << "opTime"
<< Date_t::fromMillisSinceEpoch(Timestamp(0, 0).asLL());
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(
- responseBuilder.obj(), BSONObj(), Milliseconds(8))));
+ getNet()->scheduleResponse(
+ noi,
+ startDate + Milliseconds(10),
+ (RemoteCommandResponse(responseBuilder.obj(), BSONObj(), Milliseconds(8))));
}
}
- _net->runUntil(startDate + Milliseconds(10));
- _net->exitNetwork();
- ASSERT_EQUALS(startDate + Milliseconds(10), _net->now());
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
waitOnChecker();
ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::None);
}
@@ -884,7 +866,8 @@ protected:
return _checker->hasReceivedSufficientResponses();
}
- void processResponse(const RemoteCommandRequest& request, const ResponseStatus& response) {
+ void processResponse(const RemoteCommandRequest& request,
+ const RemoteCommandResponse& response) {
_checker->processResponse(request, response);
}
@@ -892,54 +875,54 @@ protected:
return _checker->shouldAbortElection();
}
- ResponseStatus lessFresh() {
+ RemoteCommandResponse lessFresh() {
BSONObjBuilder bb;
bb.append("ok", 1.0);
bb.appendDate("opTime", Date_t::fromMillisSinceEpoch(Timestamp(10, 0).asLL()));
- return ResponseStatus(
+ return RemoteCommandResponse(
NetworkInterfaceMock::Response(bb.obj(), BSONObj(), Milliseconds(10)));
}
- ResponseStatus moreFreshViaOpTime() {
+ RemoteCommandResponse moreFreshViaOpTime() {
BSONObjBuilder bb;
bb.append("ok", 1.0);
bb.appendDate("opTime", Date_t::fromMillisSinceEpoch(Timestamp(110, 0).asLL()));
- return ResponseStatus(
+ return RemoteCommandResponse(
NetworkInterfaceMock::Response(bb.obj(), BSONObj(), Milliseconds(10)));
}
- ResponseStatus wrongTypeForOpTime() {
+ RemoteCommandResponse wrongTypeForOpTime() {
BSONObjBuilder bb;
bb.append("ok", 1.0);
bb.append("opTime", std::string("several minutes ago"));
- return ResponseStatus(
+ return RemoteCommandResponse(
NetworkInterfaceMock::Response(bb.obj(), BSONObj(), Milliseconds(10)));
}
- ResponseStatus unauthorized() {
+ RemoteCommandResponse unauthorized() {
BSONObjBuilder bb;
bb.append("ok", 0.0);
bb.append("code", ErrorCodes::Unauthorized);
bb.append("errmsg", "Unauthorized");
- return ResponseStatus(
+ return RemoteCommandResponse(
NetworkInterfaceMock::Response(bb.obj(), BSONObj(), Milliseconds(10)));
}
- ResponseStatus tiedForFreshness() {
+ RemoteCommandResponse tiedForFreshness() {
BSONObjBuilder bb;
bb.append("ok", 1.0);
bb.appendDate("opTime", Date_t::fromMillisSinceEpoch(Timestamp(100, 0).asLL()));
- return ResponseStatus(
+ return RemoteCommandResponse(
NetworkInterfaceMock::Response(bb.obj(), BSONObj(), Milliseconds(10)));
}
- ResponseStatus moreFresh() {
- return ResponseStatus(NetworkInterfaceMock::Response(
+ RemoteCommandResponse moreFresh() {
+ return RemoteCommandResponse(NetworkInterfaceMock::Response(
BSON("ok" << 1.0 << "fresher" << true), BSONObj(), Milliseconds(10)));
}
- ResponseStatus veto() {
- return ResponseStatus(
+ RemoteCommandResponse veto() {
+ return RemoteCommandResponse(
NetworkInterfaceMock::Response(BSON("ok" << 1.0 << "veto" << true << "errmsg"
<< "vetoed!"),
BSONObj(),
@@ -1078,10 +1061,12 @@ TEST_F(FreshnessScatterGatherTest, SecondNodeTiedAndFirstWrongTypeForOpTime) {
TEST_F(FreshnessScatterGatherTest, NotEnoughVotersDueNetworkErrors) {
ASSERT_FALSE(hasReceivedSufficientResponses());
- processResponse(requestFrom("host1"), ResponseStatus(Status(ErrorCodes::NetworkTimeout, "")));
+ processResponse(requestFrom("host1"),
+ RemoteCommandResponse(Status(ErrorCodes::NetworkTimeout, "")));
ASSERT_FALSE(hasReceivedSufficientResponses());
- processResponse(requestFrom("host2"), ResponseStatus(Status(ErrorCodes::NetworkTimeout, "")));
+ processResponse(requestFrom("host2"),
+ RemoteCommandResponse(Status(ErrorCodes::NetworkTimeout, "")));
ASSERT_TRUE(hasReceivedSufficientResponses());
ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::QuorumUnreachable);
}
diff --git a/src/mongo/db/repl/freshness_scanner.cpp b/src/mongo/db/repl/freshness_scanner.cpp
index f9de9409373..f55d2c673a8 100644
--- a/src/mongo/db/repl/freshness_scanner.cpp
+++ b/src/mongo/db/repl/freshness_scanner.cpp
@@ -44,6 +44,7 @@ namespace mongo {
namespace repl {
using executor::RemoteCommandRequest;
+using executor::RemoteCommandResponse;
FreshnessScanner::Algorithm::Algorithm(const ReplSetConfig& rsConfig,
int myIndex,
@@ -70,7 +71,7 @@ std::vector<RemoteCommandRequest> FreshnessScanner::Algorithm::getRequests() con
}
void FreshnessScanner::Algorithm::processResponse(const RemoteCommandRequest& request,
- const ResponseStatus& response) {
+ const RemoteCommandResponse& response) {
_responsesProcessed++;
if (!response.isOK()) { // failed response
LOG(2) << "FreshnessScanner: Got failed response from " << request.target << ": "
@@ -108,10 +109,11 @@ FreshnessScanner::Result FreshnessScanner::Algorithm::getResult() const {
return _freshnessInfos;
}
-StatusWith<ReplicationExecutor::EventHandle> FreshnessScanner::start(ReplicationExecutor* executor,
- const ReplSetConfig& rsConfig,
- int myIndex,
- Milliseconds timeout) {
+StatusWith<executor::TaskExecutor::EventHandle> FreshnessScanner::start(
+ executor::TaskExecutor* executor,
+ const ReplSetConfig& rsConfig,
+ int myIndex,
+ Milliseconds timeout) {
_algorithm.reset(new Algorithm(rsConfig, myIndex, timeout));
_runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
return _runner->start();
diff --git a/src/mongo/db/repl/freshness_scanner.h b/src/mongo/db/repl/freshness_scanner.h
index bc474b4fadd..69e6cccf27b 100644
--- a/src/mongo/db/repl/freshness_scanner.h
+++ b/src/mongo/db/repl/freshness_scanner.h
@@ -35,9 +35,9 @@
#include "mongo/bson/timestamp.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_set_config.h"
-#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/scatter_gather_algorithm.h"
#include "mongo/db/repl/scatter_gather_runner.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/stdx/functional.h"
namespace mongo {
@@ -66,7 +66,7 @@ public:
Algorithm(const ReplSetConfig& rsConfig, int myIndex, Milliseconds timeout);
virtual std::vector<executor::RemoteCommandRequest> getRequests() const;
virtual void processResponse(const executor::RemoteCommandRequest& request,
- const ResponseStatus& response);
+ const executor::RemoteCommandResponse& response);
virtual bool hasReceivedSufficientResponses() const;
/**
@@ -96,10 +96,10 @@ public:
* evh can be used to schedule a callback when the process is complete.
* If this function returns Status::OK(), evh is then guaranteed to be signaled.
**/
- StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor,
- const ReplSetConfig& rsConfig,
- int myIndex,
- Milliseconds timeout);
+ StatusWith<executor::TaskExecutor::EventHandle> start(executor::TaskExecutor* executor,
+ const ReplSetConfig& rsConfig,
+ int myIndex,
+ Milliseconds timeout);
/**
* Informs the FreshnessScanner to cancel further processing.
diff --git a/src/mongo/db/repl/freshness_scanner_test.cpp b/src/mongo/db/repl/freshness_scanner_test.cpp
index b1778d002ea..cf9c7dbc709 100644
--- a/src/mongo/db/repl/freshness_scanner_test.cpp
+++ b/src/mongo/db/repl/freshness_scanner_test.cpp
@@ -32,8 +32,8 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/freshness_scanner.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
-#include "mongo/db/repl/replication_executor.h"
#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
@@ -48,16 +48,11 @@ using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
using unittest::assertGet;
-class FreshnessScannerTest : public mongo::unittest::Test {
+class FreshnessScannerTest : public executor::ThreadPoolExecutorTest {
public:
- NetworkInterfaceMock* getNet() {
- return _net;
- }
- ReplicationExecutor* getExecutor() {
- return _executor.get();
- }
-
virtual void setUp() {
+ executor::ThreadPoolExecutorTest::setUp();
+ launchExecutorThread();
ASSERT_OK(_config.initialize(BSON("_id"
<< "rs0"
<< "version"
@@ -82,17 +77,6 @@ public:
<< "priority"
<< 0)))));
ASSERT_OK(_config.validate());
-
- auto net = stdx::make_unique<NetworkInterfaceMock>();
- _net = net.get();
- _executor = stdx::make_unique<ReplicationExecutor>(std::move(net), 1 /* prng seed */);
- _executorThread =
- stdx::make_unique<stdx::thread>(stdx::bind(&ReplicationExecutor::run, _executor.get()));
- }
-
- virtual void tearDown() {
- _executor->shutdown();
- _executorThread->join();
}
protected:
@@ -104,42 +88,36 @@ protected:
Milliseconds(0));
}
- ResponseStatus makeResponseStatus(BSONObj response) {
- return ResponseStatus(
+ RemoteCommandResponse makeRemoteCommandResponse(BSONObj response) {
+ return RemoteCommandResponse(
NetworkInterfaceMock::Response(response, BSONObj(), Milliseconds(10)));
}
- ResponseStatus badResponseStatus() {
- return ResponseStatus(ErrorCodes::NodeNotFound, "not on my watch");
+ RemoteCommandResponse badRemoteCommandResponse() {
+ return RemoteCommandResponse(ErrorCodes::NodeNotFound, "not on my watch");
}
- ResponseStatus goodResponseStatus(Timestamp timestamp, long long term) {
+ RemoteCommandResponse goodRemoteCommandResponse(Timestamp timestamp, long long term) {
// OpTime part of replSetGetStatus.
BSONObj response =
BSON("optimes" << BSON("appliedOpTime" << OpTime(timestamp, term).toBSON()));
- return makeResponseStatus(response);
+ return makeRemoteCommandResponse(response);
}
ReplSetConfig _config;
-
-private:
- // owned by _executor
- NetworkInterfaceMock* _net;
- std::unique_ptr<ReplicationExecutor> _executor;
- std::unique_ptr<stdx::thread> _executorThread;
};
TEST_F(FreshnessScannerTest, ImmediateGoodResponse) {
FreshnessScanner::Algorithm algo(_config, 0, Milliseconds(2000));
ASSERT_FALSE(algo.hasReceivedSufficientResponses());
- algo.processResponse(requestFrom("host1"), goodResponseStatus(Timestamp(1, 100), 1));
+ algo.processResponse(requestFrom("host1"), goodRemoteCommandResponse(Timestamp(1, 100), 1));
ASSERT_FALSE(algo.hasReceivedSufficientResponses());
- algo.processResponse(requestFrom("host2"), goodResponseStatus(Timestamp(1, 200), 1));
+ algo.processResponse(requestFrom("host2"), goodRemoteCommandResponse(Timestamp(1, 200), 1));
ASSERT_FALSE(algo.hasReceivedSufficientResponses());
- algo.processResponse(requestFrom("host3"), goodResponseStatus(Timestamp(1, 400), 1));
+ algo.processResponse(requestFrom("host3"), goodRemoteCommandResponse(Timestamp(1, 400), 1));
ASSERT_FALSE(algo.hasReceivedSufficientResponses());
- algo.processResponse(requestFrom("host4"), goodResponseStatus(Timestamp(1, 300), 1));
+ algo.processResponse(requestFrom("host4"), goodRemoteCommandResponse(Timestamp(1, 300), 1));
ASSERT_TRUE(algo.hasReceivedSufficientResponses());
ASSERT_EQUALS((size_t)4, algo.getResult().size());
ASSERT_EQUALS(3, algo.getResult().front().index);
@@ -153,18 +131,18 @@ TEST_F(FreshnessScannerTest, ImmediateBadResponse) {
// Cannot access host 1 and host 2.
ASSERT_FALSE(algo.hasReceivedSufficientResponses());
- algo.processResponse(requestFrom("host1"), badResponseStatus());
+ algo.processResponse(requestFrom("host1"), badRemoteCommandResponse());
ASSERT_FALSE(algo.hasReceivedSufficientResponses());
- algo.processResponse(requestFrom("host2"), badResponseStatus());
+ algo.processResponse(requestFrom("host2"), badRemoteCommandResponse());
ASSERT_FALSE(algo.hasReceivedSufficientResponses());
// host 3 is in an old version, which doesn't include OpTimes in the response.
- algo.processResponse(requestFrom("host3"), makeResponseStatus(BSONObj()));
+ algo.processResponse(requestFrom("host3"), makeRemoteCommandResponse(BSONObj()));
ASSERT_FALSE(algo.hasReceivedSufficientResponses());
// Responses from host 4 in PV0 are considered as bad responses.
auto response4 = BSON("optimes" << BSON("appliedOpTime" << Timestamp(1, 300)));
- algo.processResponse(requestFrom("host4"), makeResponseStatus(response4));
+ algo.processResponse(requestFrom("host4"), makeRemoteCommandResponse(response4));
ASSERT_TRUE(algo.hasReceivedSufficientResponses());
ASSERT_EQUALS((size_t)0, algo.getResult().size());
}
@@ -172,7 +150,7 @@ TEST_F(FreshnessScannerTest, ImmediateBadResponse) {
TEST_F(FreshnessScannerTest, AllResponsesTimeout) {
Milliseconds timeout(2000);
FreshnessScanner scanner;
- scanner.start(getExecutor(), _config, 0, timeout);
+ scanner.start(&getExecutor(), _config, 0, timeout);
auto net = getNet();
net->enterNetwork();
@@ -191,7 +169,7 @@ TEST_F(FreshnessScannerTest, AllResponsesTimeout) {
TEST_F(FreshnessScannerTest, BadResponsesAndTimeout) {
Milliseconds timeout(2000);
FreshnessScanner scanner;
- scanner.start(getExecutor(), _config, 0, timeout);
+ scanner.start(&getExecutor(), _config, 0, timeout);
auto net = getNet();
net->enterNetwork();
@@ -201,11 +179,11 @@ TEST_F(FreshnessScannerTest, BadResponsesAndTimeout) {
ASSERT(net->hasReadyRequests());
auto noi = net->getNextReadyRequest();
HostAndPort successfulHost = noi->getRequest().target;
- net->scheduleResponse(noi, later, goodResponseStatus(Timestamp(1, 100), 1));
+ net->scheduleResponse(noi, later, goodRemoteCommandResponse(Timestamp(1, 100), 1));
// host 2 has a bad connection.
ASSERT(net->hasReadyRequests());
- net->scheduleResponse(net->getNextReadyRequest(), later, badResponseStatus());
+ net->scheduleResponse(net->getNextReadyRequest(), later, badRemoteCommandResponse());
// host 3 and 4 time out.
ASSERT(net->hasReadyRequests());
diff --git a/src/mongo/db/repl/noop_writer_test.cpp b/src/mongo/db/repl/noop_writer_test.cpp
deleted file mode 100644
index c0f6dce0ebb..00000000000
--- a/src/mongo/db/repl/noop_writer_test.cpp
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-
-#include "mongo/platform/basic.h"
-
-#include <memory>
-
-#include "mongo/db/client.h"
-#include "mongo/db/json.h"
-#include "mongo/db/repl/noop_writer.h"
-#include "mongo/db/repl/optime.h"
-#include "mongo/db/repl/replication_executor_test_fixture.h"
-#include "mongo/unittest/unittest.h"
-
-
-namespace {
-
-using namespace mongo;
-using namespace mongo::repl;
-
-using unittest::log;
-
-class NoopWriterTest : public ReplicationExecutorTest {
-public:
- NoopWriter* getNoopWriter() {
- return _noopWriter.get();
- }
-
-protected:
- void setUp() override {
- setupNoopWriter(Seconds(1));
- }
-
- void tearDown() override {}
-
-private:
- void startNoopWriter(OpTime opTime) {
- invariant(_noopWriter);
- _noopWriter->start(opTime);
- }
-
- void stopNoopWriter() {
- invariant(_noopWriter);
- _noopWriter->stop();
- }
-
- void setupNoopWriter(Seconds waitTime) {
- invariant(!_noopWriter);
- _noopWriter = stdx::make_unique<NoopWriter>(waitTime);
- }
-
- OpTime _getLastOpTime() {
- return _lastOpTime;
- }
-
- std::unique_ptr<NoopWriter> _noopWriter;
- OpTime _lastOpTime;
-};
-// TODO: SERVER-25679
-TEST_F(NoopWriterTest, CreateDestroy) {
- NoopWriter* writer = getNoopWriter();
- ASSERT(writer != nullptr);
-}
-}
diff --git a/src/mongo/db/repl/rollback_checker_test.cpp b/src/mongo/db/repl/rollback_checker_test.cpp
index 9b29fde9874..985a0f59a96 100644
--- a/src/mongo/db/repl/rollback_checker_test.cpp
+++ b/src/mongo/db/repl/rollback_checker_test.cpp
@@ -31,12 +31,12 @@
#include "mongo/platform/basic.h"
#include "mongo/db/client.h"
-#include "mongo/db/repl/replication_executor_test_fixture.h"
#include "mongo/db/repl/rollback_checker.h"
#include "mongo/executor/network_interface_mock.h"
-#include "mongo/util/log.h"
-
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/log.h"
namespace {
@@ -47,7 +47,7 @@ using executor::RemoteCommandResponse;
using LockGuard = stdx::lock_guard<stdx::mutex>;
-class RollbackCheckerTest : public ReplicationExecutorTest {
+class RollbackCheckerTest : public executor::ThreadPoolExecutorTest {
public:
RollbackChecker* getRollbackChecker() const;
@@ -61,9 +61,10 @@ protected:
};
void RollbackCheckerTest::setUp() {
- ReplicationExecutorTest::setUp();
+ executor::ThreadPoolExecutorTest::setUp();
launchExecutorThread();
- _rollbackChecker = stdx::make_unique<RollbackChecker>(&getReplExecutor(), HostAndPort());
+ getNet()->enterNetwork();
+ _rollbackChecker = stdx::make_unique<RollbackChecker>(&getExecutor(), HostAndPort());
stdx::lock_guard<stdx::mutex> lk(_mutex);
_hasRolledBackResult = {ErrorCodes::NotYetInitialized, ""};
_hasCalledCallback = false;
@@ -82,18 +83,21 @@ TEST_F(RollbackCheckerTest, InvalidConstruction) {
TEST_F(RollbackCheckerTest, ShutdownBeforeStart) {
auto callback = [](const RollbackChecker::Result&) {};
- getReplExecutor().shutdown();
+ shutdownExecutorThread();
+ joinExecutorThread();
ASSERT_NOT_OK(getRollbackChecker()->reset(callback).getStatus());
ASSERT_NOT_OK(getRollbackChecker()->checkForRollback(callback).getStatus());
}
TEST_F(RollbackCheckerTest, ShutdownBeforeHasHadRollback) {
- getReplExecutor().shutdown();
+ shutdownExecutorThread();
+ joinExecutorThread();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, getRollbackChecker()->hasHadRollback());
}
TEST_F(RollbackCheckerTest, ShutdownBeforeResetSync) {
- getReplExecutor().shutdown();
+ shutdownExecutorThread();
+ joinExecutorThread();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getRollbackChecker()->reset_sync());
}
@@ -107,7 +111,7 @@ TEST_F(RollbackCheckerTest, reset) {
getNet()->runReadyNetworkOperations();
getNet()->exitNetwork();
- getReplExecutor().wait(cbh);
+ getExecutor().wait(cbh);
ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3);
}
@@ -123,7 +127,7 @@ TEST_F(RollbackCheckerTest, RollbackRBID) {
auto commandResponse = BSON("ok" << 1 << "rbid" << 3);
getNet()->scheduleSuccessfulResponse(commandResponse);
getNet()->runReadyNetworkOperations();
- getReplExecutor().wait(refreshCBH);
+ getExecutor().wait(refreshCBH);
ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3);
{
LockGuard lk(_mutex);
@@ -141,7 +145,7 @@ TEST_F(RollbackCheckerTest, RollbackRBID) {
getNet()->runReadyNetworkOperations();
getNet()->exitNetwork();
- getReplExecutor().wait(rbCBH);
+ getExecutor().wait(rbCBH);
ASSERT_EQUALS(getRollbackChecker()->getLastRBID_forTest(), 4);
ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3);
LockGuard lk(_mutex);
@@ -161,7 +165,7 @@ TEST_F(RollbackCheckerTest, NoRollbackRBID) {
auto commandResponse = BSON("ok" << 1 << "rbid" << 3);
getNet()->scheduleSuccessfulResponse(commandResponse);
getNet()->runReadyNetworkOperations();
- getReplExecutor().wait(refreshCBH);
+ getExecutor().wait(refreshCBH);
ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3);
{
LockGuard lk(_mutex);
@@ -179,7 +183,7 @@ TEST_F(RollbackCheckerTest, NoRollbackRBID) {
getNet()->runReadyNetworkOperations();
getNet()->exitNetwork();
- getReplExecutor().wait(rbCBH);
+ getExecutor().wait(rbCBH);
ASSERT_EQUALS(getRollbackChecker()->getLastRBID_forTest(), 3);
ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3);
LockGuard lk(_mutex);
diff --git a/src/mongo/db/repl/scatter_gather_algorithm.h b/src/mongo/db/repl/scatter_gather_algorithm.h
index a622473c5ee..6c4c31a0799 100644
--- a/src/mongo/db/repl/scatter_gather_algorithm.h
+++ b/src/mongo/db/repl/scatter_gather_algorithm.h
@@ -30,13 +30,11 @@
#include <vector>
-#include "mongo/db/repl/replication_executor.h"
+#include "mongo/executor/remote_command_request.h"
+#include "mongo/executor/remote_command_response.h"
namespace mongo {
-template <typename T>
-class StatusWith;
-
namespace repl {
/**
@@ -63,7 +61,7 @@ public:
* Method to call once for each received response.
*/
virtual void processResponse(const executor::RemoteCommandRequest& request,
- const ResponseStatus& response) = 0;
+ const executor::RemoteCommandResponse& response) = 0;
/**
* Returns true if no more calls to processResponse are needed to consider the
diff --git a/src/mongo/db/repl/scatter_gather_runner.cpp b/src/mongo/db/repl/scatter_gather_runner.cpp
index 597239179e0..f29600a2bf2 100644
--- a/src/mongo/db/repl/scatter_gather_runner.cpp
+++ b/src/mongo/db/repl/scatter_gather_runner.cpp
@@ -43,13 +43,13 @@ namespace repl {
using executor::RemoteCommandRequest;
using LockGuard = stdx::lock_guard<stdx::mutex>;
-using CallbackHandle = ReplicationExecutor::CallbackHandle;
-using EventHandle = ReplicationExecutor::EventHandle;
-using RemoteCommandCallbackArgs = ReplicationExecutor::RemoteCommandCallbackArgs;
-using RemoteCommandCallbackFn = ReplicationExecutor::RemoteCommandCallbackFn;
+using CallbackHandle = executor::TaskExecutor::CallbackHandle;
+using EventHandle = executor::TaskExecutor::EventHandle;
+using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
+using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn;
ScatterGatherRunner::ScatterGatherRunner(ScatterGatherAlgorithm* algorithm,
- ReplicationExecutor* executor)
+ executor::TaskExecutor* executor)
: _executor(executor), _impl(std::make_shared<RunnerImpl>(algorithm, executor)) {}
Status ScatterGatherRunner::run() {
@@ -80,7 +80,7 @@ void ScatterGatherRunner::cancel() {
* Scatter gather runner implementation.
*/
ScatterGatherRunner::RunnerImpl::RunnerImpl(ScatterGatherAlgorithm* algorithm,
- ReplicationExecutor* executor)
+ executor::TaskExecutor* executor)
: _executor(executor), _algorithm(algorithm) {}
StatusWith<EventHandle> ScatterGatherRunner::RunnerImpl::start(
@@ -124,7 +124,7 @@ void ScatterGatherRunner::RunnerImpl::cancel() {
}
void ScatterGatherRunner::RunnerImpl::processResponse(
- const ReplicationExecutor::RemoteCommandCallbackArgs& cbData) {
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
LockGuard lk(_mutex);
if (!_sufficientResponsesReceived.isValid()) {
@@ -152,9 +152,10 @@ void ScatterGatherRunner::RunnerImpl::processResponse(
void ScatterGatherRunner::RunnerImpl::_signalSufficientResponsesReceived() {
if (_sufficientResponsesReceived.isValid()) {
- std::for_each(_callbacks.begin(),
- _callbacks.end(),
- stdx::bind(&ReplicationExecutor::cancel, _executor, stdx::placeholders::_1));
+ std::for_each(
+ _callbacks.begin(),
+ _callbacks.end(),
+ stdx::bind(&executor::TaskExecutor::cancel, _executor, stdx::placeholders::_1));
// Clear _callbacks to break the cycle of shared_ptr.
_callbacks.clear();
_executor->signalEvent(_sufficientResponsesReceived);
diff --git a/src/mongo/db/repl/scatter_gather_runner.h b/src/mongo/db/repl/scatter_gather_runner.h
index 864e7b3add8..4a58461de1e 100644
--- a/src/mongo/db/repl/scatter_gather_runner.h
+++ b/src/mongo/db/repl/scatter_gather_runner.h
@@ -31,7 +31,7 @@
#include <vector>
#include "mongo/base/disallow_copying.h"
-#include "mongo/db/repl/replication_executor.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
@@ -56,7 +56,8 @@ public:
*
* "algorithm" and "executor" must remain in scope until the runner's destructor completes.
*/
- explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm, ReplicationExecutor* executor);
+ explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm,
+ executor::TaskExecutor* executor);
/**
* Runs the scatter-gather process and blocks until it completes.
@@ -78,7 +79,7 @@ public:
*
* The returned event will eventually be signaled.
*/
- StatusWith<ReplicationExecutor::EventHandle> start();
+ StatusWith<executor::TaskExecutor::EventHandle> start();
/**
* Informs the runner to cancel further processing.
@@ -87,11 +88,11 @@ public:
private:
/**
- * Implementation of a scatter-gather behavior using a ReplicationExecutor.
+ * Implementation of a scatter-gather behavior using a TaskExecutor.
*/
class RunnerImpl {
public:
- explicit RunnerImpl(ScatterGatherAlgorithm* algorithm, ReplicationExecutor* executor);
+ explicit RunnerImpl(ScatterGatherAlgorithm* algorithm, executor::TaskExecutor* executor);
/**
* On success, returns an event handle that will be signaled when the runner has
@@ -100,8 +101,8 @@ private:
*
* The returned event will eventually be signaled.
*/
- StatusWith<ReplicationExecutor::EventHandle> start(
- const ReplicationExecutor::RemoteCommandCallbackFn cb);
+ StatusWith<executor::TaskExecutor::EventHandle> start(
+ const executor::TaskExecutor::RemoteCommandCallbackFn cb);
/**
* Informs the runner to cancel further processing.
@@ -111,7 +112,7 @@ private:
/**
* Callback invoked once for every response from the network.
*/
- void processResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData);
+ void processResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData);
private:
/**
@@ -120,15 +121,15 @@ private:
*/
void _signalSufficientResponsesReceived();
- ReplicationExecutor* _executor; // Not owned here.
+ executor::TaskExecutor* _executor; // Not owned here.
ScatterGatherAlgorithm* _algorithm; // Not owned here.
- ReplicationExecutor::EventHandle _sufficientResponsesReceived;
- std::vector<ReplicationExecutor::CallbackHandle> _callbacks;
+ executor::TaskExecutor::EventHandle _sufficientResponsesReceived;
+ std::vector<executor::TaskExecutor::CallbackHandle> _callbacks;
bool _started = false;
stdx::mutex _mutex;
};
- ReplicationExecutor* _executor; // Not owned here.
+ executor::TaskExecutor* _executor; // Not owned here.
// This pointer of RunnerImpl will be shared with remote command callbacks to make sure
// callbacks can access the members safely.
diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp
index 63915e0df98..294b2c84ed7 100644
--- a/src/mongo/db/repl/scatter_gather_test.cpp
+++ b/src/mongo/db/repl/scatter_gather_test.cpp
@@ -28,10 +28,9 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/scatter_gather_algorithm.h"
#include "mongo/db/repl/scatter_gather_runner.h"
-#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
@@ -57,7 +56,7 @@ public:
ScatterGatherTestAlgorithm(int64_t maxResponses = 2)
: _done(false), _numResponses(0), _maxResponses(maxResponses) {}
- virtual std::vector<RemoteCommandRequest> getRequests() const {
+ std::vector<RemoteCommandRequest> getRequests() const override {
std::vector<RemoteCommandRequest> requests;
for (int i = 0; i < kTotalRequests; i++) {
requests.push_back(RemoteCommandRequest(
@@ -66,8 +65,8 @@ public:
return requests;
}
- virtual void processResponse(const RemoteCommandRequest& request,
- const ResponseStatus& response) {
+ void processResponse(const RemoteCommandRequest& request,
+ const RemoteCommandResponse& response) override {
_numResponses++;
}
@@ -94,46 +93,21 @@ private:
};
/**
- * ScatterGatherTest base class which sets up the ReplicationExecutor and NetworkInterfaceMock.
+ * ScatterGatherTest base class which sets up the TaskExecutor and NetworkInterfaceMock.
*/
-class ScatterGatherTest : public mongo::unittest::Test {
+class ScatterGatherTest : public executor::ThreadPoolExecutorTest {
protected:
- NetworkInterfaceMock* getNet() {
- return _net;
- }
- ReplicationExecutor* getExecutor() {
- return _executor.get();
- }
-
int64_t countLogLinesContaining(const std::string& needle);
-
-private:
- void setUp();
- void tearDown();
-
- // owned by _executor
- NetworkInterfaceMock* _net;
- std::unique_ptr<ReplicationExecutor> _executor;
- std::unique_ptr<stdx::thread> _executorThread;
+ void setUp() {
+ executor::ThreadPoolExecutorTest::setUp();
+ launchExecutorThread();
+ }
};
-void ScatterGatherTest::setUp() {
- auto net = stdx::make_unique<NetworkInterfaceMock>();
- _net = net.get();
- _executor = stdx::make_unique<ReplicationExecutor>(std::move(net), 1 /* prng seed */);
- _executorThread.reset(new stdx::thread(stdx::bind(&ReplicationExecutor::run, _executor.get())));
-}
-
-void ScatterGatherTest::tearDown() {
- _executor->shutdown();
- _executorThread->join();
-}
-
-
// Used to run a ScatterGatherRunner in a separate thread, to avoid blocking test execution.
class ScatterGatherRunnerRunner {
public:
- ScatterGatherRunnerRunner(ScatterGatherRunner* sgr, ReplicationExecutor* executor)
+ ScatterGatherRunnerRunner(ScatterGatherRunner* sgr, executor::TaskExecutor* executor)
: _sgr(sgr),
_executor(executor),
_result(Status(ErrorCodes::BadValue, "failed to set status")) {}
@@ -150,12 +124,12 @@ public:
}
private:
- void _run(ReplicationExecutor* executor) {
+ void _run(executor::TaskExecutor* executor) {
_result = _sgr->run();
}
ScatterGatherRunner* _sgr;
- ReplicationExecutor* _executor;
+ executor::TaskExecutor* _executor;
Status _result;
std::unique_ptr<stdx::thread> _thread;
};
@@ -163,7 +137,7 @@ private:
// Simple onCompletion function which will toggle a bool, so that we can check the logs to
// ensure the onCompletion function ran when expected.
executor::TaskExecutor::CallbackFn getOnCompletionTestFunction(bool* ran) {
- auto cb = [ran](const ReplicationExecutor::CallbackArgs& cbData) {
+ auto cb = [ran](const executor::TaskExecutor::CallbackArgs& cbData) {
if (!cbData.status.isOK()) {
return;
}
@@ -180,34 +154,31 @@ executor::TaskExecutor::CallbackFn getOnCompletionTestFunction(bool* ran) {
// completed.
TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) {
ScatterGatherTestAlgorithm* sga = new ScatterGatherTestAlgorithm();
- ScatterGatherRunner* sgr = new ScatterGatherRunner(sga, getExecutor());
+ ScatterGatherRunner* sgr = new ScatterGatherRunner(sga, &getExecutor());
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status = sgr->start();
- getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
+ StatusWith<executor::TaskExecutor::EventHandle> status = sgr->start();
+ getExecutor().onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
ASSERT_OK(status.getStatus());
ASSERT_FALSE(ranCompletion);
NetworkInterfaceMock* net = getNet();
net->enterNetwork();
NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
- net->scheduleResponse(
- noi,
- net->now() + Seconds(2),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
+ net->scheduleResponse(noi,
+ net->now() + Seconds(2),
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
ASSERT_FALSE(ranCompletion);
noi = net->getNextReadyRequest();
- net->scheduleResponse(
- noi,
- net->now() + Seconds(2),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
+ net->scheduleResponse(noi,
+ net->now() + Seconds(2),
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
ASSERT_FALSE(ranCompletion);
noi = net->getNextReadyRequest();
- net->scheduleResponse(
- noi,
- net->now() + Seconds(5),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
+ net->scheduleResponse(noi,
+ net->now() + Seconds(5),
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
ASSERT_FALSE(ranCompletion);
net->runUntil(net->now() + Seconds(2));
@@ -221,23 +192,23 @@ TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) {
net->exitNetwork();
}
-// Confirm that shutting the ReplicationExecutor down before calling run() will cause run()
+// Confirm that shutting the TaskExecutor down before calling run() will cause run()
// to return ErrorCodes::ShutdownInProgress.
TEST_F(ScatterGatherTest, ShutdownExecutorBeforeRun) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga, getExecutor());
- getExecutor()->shutdown();
+ ScatterGatherRunner sgr(&sga, &getExecutor());
+ shutdownExecutorThread();
sga.finish();
Status status = sgr.run();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status);
}
-// Confirm that shutting the ReplicationExecutor down after calling run(), but before run()
+// Confirm that shutting the TaskExecutor down after calling run(), but before run()
// finishes will cause run() to return Status::OK().
TEST_F(ScatterGatherTest, ShutdownExecutorAfterRun) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga, getExecutor());
- ScatterGatherRunnerRunner sgrr(&sgr, getExecutor());
+ ScatterGatherRunner sgr(&sga, &getExecutor());
+ ScatterGatherRunnerRunner sgrr(&sgr, &getExecutor());
sgrr.run();
// need to wait for the scatter-gather to be scheduled in the executor
NetworkInterfaceMock* net = getNet();
@@ -248,33 +219,34 @@ TEST_F(ScatterGatherTest, ShutdownExecutorAfterRun) {
net->blackHole(noi);
}
net->exitNetwork();
- getExecutor()->shutdown();
+ shutdownExecutorThread();
+ joinExecutorThread();
Status status = sgrr.getResult();
ASSERT_OK(status);
}
-// Confirm that shutting the ReplicationExecutor down before calling start() will cause start()
+// Confirm that shutting the TaskExecutor down before calling start() will cause start()
// to return ErrorCodes::ShutdownInProgress and should not run onCompletion().
TEST_F(ScatterGatherTest, ShutdownExecutorBeforeStart) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga, getExecutor());
- getExecutor()->shutdown();
+ ScatterGatherRunner sgr(&sga, &getExecutor());
+ shutdownExecutorThread();
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status = sgr.start();
+ StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start();
sga.finish();
ASSERT_FALSE(ranCompletion);
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.getStatus());
}
-// Confirm that shutting the ReplicationExecutor down after calling start() will cause start()
+// Confirm that shutting the TaskExecutor down after calling start() will cause start()
// to return Status::OK and should not run onCompletion().
TEST_F(ScatterGatherTest, ShutdownExecutorAfterStart) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga, getExecutor());
+ ScatterGatherRunner sgr(&sga, &getExecutor());
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status = sgr.start();
- getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
- getExecutor()->shutdown();
+ StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start();
+ getExecutor().onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
+ shutdownExecutorThread();
sga.finish();
ASSERT_FALSE(ranCompletion);
ASSERT_OK(status.getStatus());
@@ -283,34 +255,31 @@ TEST_F(ScatterGatherTest, ShutdownExecutorAfterStart) {
// Confirm that responses are not processed once sufficient responses have been received.
TEST_F(ScatterGatherTest, DoNotProcessMoreThanSufficientResponses) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga, getExecutor());
+ ScatterGatherRunner sgr(&sga, &getExecutor());
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status = sgr.start();
- getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
+ StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start();
+ getExecutor().onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
ASSERT_OK(status.getStatus());
ASSERT_FALSE(ranCompletion);
NetworkInterfaceMock* net = getNet();
net->enterNetwork();
NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
- net->scheduleResponse(
- noi,
- net->now() + Seconds(2),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
+ net->scheduleResponse(noi,
+ net->now() + Seconds(2),
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
ASSERT_FALSE(ranCompletion);
noi = net->getNextReadyRequest();
- net->scheduleResponse(
- noi,
- net->now() + Seconds(2),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
+ net->scheduleResponse(noi,
+ net->now() + Seconds(2),
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
ASSERT_FALSE(ranCompletion);
noi = net->getNextReadyRequest();
- net->scheduleResponse(
- noi,
- net->now() + Seconds(5),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
+ net->scheduleResponse(noi,
+ net->now() + Seconds(5),
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
ASSERT_FALSE(ranCompletion);
net->runUntil(net->now() + Seconds(2));
@@ -328,10 +297,10 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru
ScatterGatherTestAlgorithm sga;
// set hasReceivedSufficientResponses to return true before the run starts
sga.finish();
- ScatterGatherRunner sgr(&sga, getExecutor());
+ ScatterGatherRunner sgr(&sga, &getExecutor());
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status = sgr.start();
- getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
+ StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start();
+ getExecutor().onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
ASSERT_OK(status.getStatus());
// Wait until callback finishes.
NetworkInterfaceMock* net = getNet();
@@ -349,7 +318,7 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru
ScatterGatherTestAlgorithm sga(5);
ScatterGatherRunner sgr(&sga);
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status = sgr.start(getExecutor(),
+ StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start(&getExecutor(),
getOnCompletionTestFunction(&ranCompletion));
ASSERT_OK(status.getStatus());
ASSERT_FALSE(ranCompletion);
@@ -359,7 +328,7 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru
NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
net->scheduleResponse(noi,
net->now(),
- ResponseStatus(RemoteCommandResponse(
+ (RemoteCommandResponse(
BSON("ok" << 1),
boost::posix_time::milliseconds(10))));
net->runReadyNetworkOperations();
@@ -368,7 +337,7 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru
noi = net->getNextReadyRequest();
net->scheduleResponse(noi,
net->now(),
- ResponseStatus(RemoteCommandResponse(
+ (RemoteCommandResponse(
BSON("ok" << 1),
boost::posix_time::milliseconds(10))));
net->runReadyNetworkOperations();
@@ -377,7 +346,7 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru
noi = net->getNextReadyRequest();
net->scheduleResponse(noi,
net->now(),
- ResponseStatus(RemoteCommandResponse(
+ (RemoteCommandResponse(
BSON("ok" << 1),
boost::posix_time::milliseconds(10))));
net->runReadyNetworkOperations();
@@ -389,17 +358,15 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru
// Confirm that running via run() will finish once sufficient responses have been received.
TEST_F(ScatterGatherTest, SuccessfulScatterGatherViaRun) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga, getExecutor());
- ScatterGatherRunnerRunner sgrr(&sgr, getExecutor());
+ ScatterGatherRunner sgr(&sga, &getExecutor());
+ ScatterGatherRunnerRunner sgrr(&sgr, &getExecutor());
sgrr.run();
NetworkInterfaceMock* net = getNet();
net->enterNetwork();
NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
net->scheduleResponse(
- noi,
- net->now(),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
+ noi, net->now(), (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
net->runReadyNetworkOperations();
noi = net->getNextReadyRequest();
@@ -408,9 +375,7 @@ TEST_F(ScatterGatherTest, SuccessfulScatterGatherViaRun) {
noi = net->getNextReadyRequest();
net->scheduleResponse(
- noi,
- net->now(),
- ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
+ noi, net->now(), (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
net->runReadyNetworkOperations();
net->exitNetwork();
diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp
index d364151a228..ff3080cf161 100644
--- a/src/mongo/db/repl/task_runner.cpp
+++ b/src/mongo/db/repl/task_runner.cpp
@@ -34,7 +34,7 @@
#include <memory>
-#include "mongo/db/auth/authorization_manager_global.h"
+#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
@@ -141,7 +141,7 @@ void TaskRunner::_runTasks() {
// to be equal to the client used to create the operation context.
Client::initThreadIfNotAlready();
client = &cc();
- if (getGlobalAuthorizationManager()->isAuthEnabled()) {
+ if (AuthorizationManager::get(client->getServiceContext())->isAuthEnabled()) {
AuthorizationSession::get(client)->grantInternalAuthorization();
}
}