summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author Andy Schwerin <schwerin@mongodb.com> 2017-05-18 16:41:38 -0500
committer Andy Schwerin <schwerin@mongodb.com> 2017-05-18 16:41:38 -0500
commit 9ff6b2d78cee808d633dee952d31e4d4f1fe0dd0 (patch)
tree 00ef5cf14e062f063e19aa34b81b74b823cad462 /src
parent f5ae1e058b9cef56fdbe08775d48278c79144051 (diff)
download mongo-9ff6b2d78cee808d633dee952d31e4d4f1fe0dd0.tar.gz
SERVER-28865 Replace ReplicationExecutor with ThreadPoolTaskExecutor in ReplicationCoordinatorImpl.
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/SConscript | 1
-rw-r--r-- src/mongo/db/db.cpp | 21
-rw-r--r-- src/mongo/db/repl/SConscript | 63
-rw-r--r-- src/mongo/db/repl/database_task.cpp | 99
-rw-r--r-- src/mongo/db/repl/database_task.h | 71
-rw-r--r-- src/mongo/db/repl/database_task_test.cpp | 177
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp | 181
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.h | 69
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp | 18
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp | 5
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 133
-rw-r--r-- src/mongo/db/repl/replication_coordinator_test_fixture.cpp | 19
-rw-r--r-- src/mongo/db/repl/replication_coordinator_test_fixture.h | 9
-rw-r--r-- src/mongo/db/repl/replication_executor.cpp | 655
-rw-r--r-- src/mongo/db/repl/replication_executor.h | 422
-rw-r--r-- src/mongo/db/repl/replication_executor_test.cpp | 373
-rw-r--r-- src/mongo/db/repl/replication_executor_test_fixture.cpp | 60
-rw-r--r-- src/mongo/db/repl/replication_executor_test_fixture.h | 69
-rw-r--r-- src/mongo/executor/thread_pool_mock.cpp | 2
19 files changed, 159 insertions, 2288 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index a998f4c8c64..dc2b8898e6e 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -690,6 +690,7 @@ env.Library(
],
LIBDEPS=[
"concurrency/lock_manager",
+ "concurrency/write_conflict_exception",
"curop",
"repl/read_concern_args",
"repl/repl_coordinator_impl",
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 2981e14ac43..34786483431 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -855,6 +855,21 @@ static void startupConfigActions(const std::vector<std::string>& args) {
#endif
}
+auto makeReplicationExecutor(ServiceContext* serviceContext) {
+ ThreadPool::Options tpOptions;
+ tpOptions.poolName = "replexec";
+ tpOptions.maxThreads = 50;
+ tpOptions.onCreateThread = [](const std::string& threadName) {
+ Client::initThread(threadName.c_str());
+ };
+ auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>();
+ hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext));
+ return stdx::make_unique<executor::ThreadPoolTaskExecutor>(
+ stdx::make_unique<ThreadPool>(tpOptions),
+ executor::makeNetworkInterface(
+ "NetworkInterfaceASIO-Replication", nullptr, std::move(hookList)));
+}
+
MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager,
("SetGlobalEnvironment", "SSLManager", "default"))
(InitializerContext* context) {
@@ -873,16 +888,12 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager,
auto logicalClock = stdx::make_unique<LogicalClock>(serviceContext);
LogicalClock::set(serviceContext, std::move(logicalClock));
- auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>();
- hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext));
-
auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorImpl>(
serviceContext,
getGlobalReplSettings(),
stdx::make_unique<repl::ReplicationCoordinatorExternalStateImpl>(serviceContext,
storageInterface),
- executor::makeNetworkInterface(
- "NetworkInterfaceASIO-Replication", nullptr, std::move(hookList)),
+ makeReplicationExecutor(serviceContext),
stdx::make_unique<repl::TopologyCoordinatorImpl>(topoCoordOptions),
replicationProcess,
storageInterface,
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index c5a10d4ae9e..7b22561716e 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -212,19 +212,6 @@ env.CppUnitTest(
)
env.Library(
- target='replication_executor',
- source=[
- 'replication_executor.cpp',
- ],
- LIBDEPS=[
- 'database_task',
- 'task_runner',
- '$BUILD_DIR/mongo/executor/network_interface',
- '$BUILD_DIR/mongo/executor/task_executor_interface',
- ],
-)
-
-env.Library(
target='scatter_gather',
source=[
'scatter_gather_algorithm.cpp',
@@ -235,24 +222,6 @@ env.Library(
],
)
-
-env.CppUnitTest(
- target='replication_executor_test',
- source=[
- 'replication_executor_test.cpp',
- 'replication_executor_test_fixture.cpp',
- ],
- LIBDEPS=[
- 'replication_executor',
- 'replmocks',
- 'service_context_repl_mock_init',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/executor/task_executor_test_fixture',
- '$BUILD_DIR/mongo/unittest/concurrency',
- ],
-)
-
env.Library(
target='oplog_buffer_blocking_queue',
source=[
@@ -583,9 +552,11 @@ env.Library('repl_coordinator_impl',
LIBDEPS=[
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/common',
+ '$BUILD_DIR/mongo/db/concurrency/lock_manager',
'$BUILD_DIR/mongo/db/index/index_descriptor',
'$BUILD_DIR/mongo/db/server_options_core',
'$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
'$BUILD_DIR/mongo/rpc/command_status',
'$BUILD_DIR/mongo/rpc/metadata',
'$BUILD_DIR/mongo/transport/transport_layer_common',
@@ -597,7 +568,6 @@ env.Library('repl_coordinator_impl',
'repl_coordinator_interface',
'repl_settings',
'replica_set_messages',
- 'replication_executor',
'replication_process',
'reporter',
'rslog',
@@ -617,6 +587,8 @@ env.Library(
'topology_coordinator_impl',
'$BUILD_DIR/mongo/db/auth/authorization_manager_global',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/executor/network_interface_mock',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor',
'$BUILD_DIR/mongo/unittest/unittest',
],
)
@@ -851,7 +823,6 @@ env.Library(
'$BUILD_DIR/mongo/db/storage/storage_options',
'repl_settings',
'replica_set_messages',
- 'replication_executor',
'replication_process',
'serveronly',
],
@@ -955,9 +926,10 @@ env.Library(
],
LIBDEPS=[
'replica_set_messages',
- 'replication_executor',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/namespace_string',
+ '$BUILD_DIR/mongo/executor/remote_command',
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
'$BUILD_DIR/mongo/rpc/command_status',
],
)
@@ -1144,27 +1116,6 @@ env.CppUnitTest(
],
)
-env.Library(
- target='database_task',
- source=[
- 'database_task.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/concurrency/lock_manager',
- '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
- '$BUILD_DIR/mongo/db/curop',
- ],
-)
-
-env.CppUnitTest(
- target='database_task_test',
- source='database_task_test.cpp',
- LIBDEPS=[
- 'database_task',
- 'task_runner_test_fixture',
- ],
-)
-
env.CppUnitTest(
target='read_concern_args_test',
source=[
@@ -1302,7 +1253,7 @@ env.Library(
'rollback_checker.cpp',
],
LIBDEPS=[
- 'replication_executor',
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
],
)
diff --git a/src/mongo/db/repl/database_task.cpp b/src/mongo/db/repl/database_task.cpp
deleted file mode 100644
index 41f29aff342..00000000000
--- a/src/mongo/db/repl/database_task.cpp
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/concurrency/d_concurrency.h"
-#include "mongo/db/concurrency/write_conflict_exception.h"
-#include "mongo/db/curop.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/database_task.h"
-#include "mongo/util/assert_util.h"
-
-namespace mongo {
-namespace repl {
-
-// static
-DatabaseTask::Task DatabaseTask::makeGlobalExclusiveLockTask(const Task& task) {
- invariant(task);
- DatabaseTask::Task newTask = [task](OperationContext* opCtx, const Status& status) {
- if (!status.isOK()) {
- return task(opCtx, status);
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- Lock::GlobalWrite lock(opCtx);
- return task(opCtx, status);
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "globalExclusiveLockTask", "global");
- MONGO_UNREACHABLE;
- };
- return newTask;
-}
-
-// static
-DatabaseTask::Task DatabaseTask::makeDatabaseLockTask(const Task& task,
- const std::string& databaseName,
- LockMode mode) {
- invariant(task);
- DatabaseTask::Task newTask = [=](OperationContext* opCtx, const Status& status) {
- if (!status.isOK()) {
- return task(opCtx, status);
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- Lock::DBLock lock(opCtx, databaseName, mode);
- return task(opCtx, status);
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "databaseLockTask", databaseName);
- MONGO_UNREACHABLE;
- };
- return newTask;
-}
-
-// static
-DatabaseTask::Task DatabaseTask::makeCollectionLockTask(const Task& task,
- const NamespaceString& nss,
- LockMode mode) {
- invariant(task);
- DatabaseTask::Task newTask = [=](OperationContext* opCtx, const Status& status) {
- if (!status.isOK()) {
- return task(opCtx, status);
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- LockMode permissiveLockMode = isSharedLockMode(mode) ? MODE_IS : MODE_IX;
- Lock::DBLock lock(opCtx, nss.db(), permissiveLockMode);
- Lock::CollectionLock collectionLock(opCtx->lockState(), nss.toString(), mode);
- return task(opCtx, status);
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "collectionLockTask", nss.toString());
- MONGO_UNREACHABLE;
- };
- return newTask;
-}
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/database_task.h b/src/mongo/db/repl/database_task.h
deleted file mode 100644
index bde2df64c09..00000000000
--- a/src/mongo/db/repl/database_task.h
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <string>
-
-#include "mongo/db/concurrency/lock_manager_defs.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/repl/task_runner.h"
-
-namespace mongo {
-
-class OperationContext;
-
-namespace repl {
-
-class DatabaseTask {
-private:
- DatabaseTask();
-
-public:
- using Task = TaskRunner::Task;
-
- /**
- * Creates a task wrapper that runs the target task inside a global exclusive lock.
- */
- static Task makeGlobalExclusiveLockTask(const Task& task);
-
- /**
- * Creates a task wrapper that runs the target task inside a database lock.
- */
- static Task makeDatabaseLockTask(const Task& task,
- const std::string& databaseName,
- LockMode mode);
-
- /**
- * Creates a task wrapper that runs the target task inside a collection lock.
- * Task acquires database lock before attempting to lock collection. Do not
- * use in combination with makeDatabaseLockTask().
- */
- static Task makeCollectionLockTask(const Task& task, const NamespaceString& nss, LockMode mode);
-};
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/database_task_test.cpp b/src/mongo/db/repl/database_task_test.cpp
deleted file mode 100644
index beaa896acf0..00000000000
--- a/src/mongo/db/repl/database_task_test.cpp
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Copyright 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/client.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/database_task.h"
-#include "mongo/db/repl/task_runner.h"
-#include "mongo/db/repl/task_runner_test_fixture.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/util/concurrency/old_thread_pool.h"
-
-namespace {
-
-using namespace mongo;
-using namespace mongo::repl;
-
-const std::string databaseName = "mydb";
-const std::string collectionName = "mycoll";
-const NamespaceString nss(databaseName, collectionName);
-
-class DatabaseTaskTest : public TaskRunnerTest {};
-
-TEST_F(DatabaseTaskTest, TaskRunnerErrorStatus) {
- // Should not attempt to acquire lock on error status from task runner.
- auto task = [](OperationContext* opCtx, const Status& status) {
- ASSERT_FALSE(opCtx);
- ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
- return TaskRunner::NextAction::kInvalid;
- };
- auto testLockTask = [](DatabaseTask::Task task) {
- ASSERT_TRUE(TaskRunner::NextAction::kInvalid ==
- task(nullptr, Status(ErrorCodes::BadValue, "")));
- };
- testLockTask(DatabaseTask::makeGlobalExclusiveLockTask(task));
- testLockTask(DatabaseTask::makeDatabaseLockTask(task, databaseName, MODE_X));
- testLockTask(DatabaseTask::makeCollectionLockTask(task, nss, MODE_X));
-}
-
-TEST_F(DatabaseTaskTest, RunGlobalExclusiveLockTask) {
- stdx::mutex mutex;
- bool called = false;
- OperationContext* opCtx = nullptr;
- bool lockIsW = false;
- Status status = getDetectableErrorStatus();
- // Task returning 'void' implies NextAction::NoAction.
- auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- stdx::lock_guard<stdx::mutex> lk(mutex);
- called = true;
- opCtx = theTxn;
- lockIsW = opCtx->lockState()->isW();
- status = theStatus;
- return TaskRunner::NextAction::kCancel;
- };
- getTaskRunner().schedule(DatabaseTask::makeGlobalExclusiveLockTask(task));
- getThreadPool().join();
- ASSERT_FALSE(getTaskRunner().isActive());
-
- stdx::lock_guard<stdx::mutex> lk(mutex);
- ASSERT_TRUE(called);
- ASSERT(opCtx);
- ASSERT_TRUE(lockIsW);
- ASSERT_OK(status);
-}
-
-void _testRunDatabaseLockTask(DatabaseTaskTest& test, LockMode mode) {
- stdx::mutex mutex;
- bool called = false;
- OperationContext* opCtx = nullptr;
- bool isDatabaseLockedForMode = false;
- Status status = test.getDetectableErrorStatus();
- // Task returning 'void' implies NextAction::NoAction.
- auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- stdx::lock_guard<stdx::mutex> lk(mutex);
- called = true;
- opCtx = theTxn;
- isDatabaseLockedForMode = opCtx->lockState()->isDbLockedForMode(databaseName, mode);
- status = theStatus;
- return TaskRunner::NextAction::kCancel;
- };
- test.getTaskRunner().schedule(DatabaseTask::makeDatabaseLockTask(task, databaseName, mode));
- test.getThreadPool().join();
- ASSERT_FALSE(test.getTaskRunner().isActive());
-
- stdx::lock_guard<stdx::mutex> lk(mutex);
- ASSERT_TRUE(called);
- ASSERT(opCtx);
- ASSERT_TRUE(isDatabaseLockedForMode);
- ASSERT_OK(status);
-}
-
-TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeX) {
- _testRunDatabaseLockTask(*this, MODE_X);
-}
-
-TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeS) {
- _testRunDatabaseLockTask(*this, MODE_S);
-}
-
-TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeIX) {
- _testRunDatabaseLockTask(*this, MODE_IX);
-}
-
-TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeIS) {
- _testRunDatabaseLockTask(*this, MODE_IS);
-}
-
-void _testRunCollectionLockTask(DatabaseTaskTest& test, LockMode mode) {
- stdx::mutex mutex;
- bool called = false;
- OperationContext* opCtx = nullptr;
- bool isCollectionLockedForMode = false;
- Status status = test.getDetectableErrorStatus();
- // Task returning 'void' implies NextAction::NoAction.
- auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- stdx::lock_guard<stdx::mutex> lk(mutex);
- called = true;
- opCtx = theTxn;
- isCollectionLockedForMode =
- opCtx->lockState()->isCollectionLockedForMode(nss.toString(), mode);
- status = theStatus;
- return TaskRunner::NextAction::kCancel;
- };
- test.getTaskRunner().schedule(DatabaseTask::makeCollectionLockTask(task, nss, mode));
- test.getThreadPool().join();
- ASSERT_FALSE(test.getTaskRunner().isActive());
-
- stdx::lock_guard<stdx::mutex> lk(mutex);
- ASSERT_TRUE(called);
- ASSERT(opCtx);
- ASSERT_TRUE(isCollectionLockedForMode);
- ASSERT_OK(status);
-}
-
-TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeX) {
- _testRunCollectionLockTask(*this, MODE_X);
-}
-
-TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeS) {
- _testRunCollectionLockTask(*this, MODE_S);
-}
-
-TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeIX) {
- _testRunCollectionLockTask(*this, MODE_IX);
-}
-
-TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeIS) {
- _testRunCollectionLockTask(*this, MODE_IS);
-}
-
-} // namespace
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 775efcdf8fb..0ebb923a4ff 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -285,7 +285,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
ServiceContext* service,
const ReplSettings& settings,
std::unique_ptr<ReplicationCoordinatorExternalState> externalState,
- std::unique_ptr<NetworkInterface> network,
+ std::unique_ptr<executor::TaskExecutor> executor,
std::unique_ptr<TopologyCoordinator> topCoord,
ReplicationProcess* replicationProcess,
StorageInterface* storage,
@@ -294,7 +294,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
_settings(settings),
_replMode(getReplicationModeFromSettings(settings)),
_topCoord(std::move(topCoord)),
- _replExecutor(stdx::make_unique<ReplicationExecutor>(std::move(network), prngSeed)),
+ _replExecutor(std::move(executor)),
_externalState(std::move(externalState)),
_inShutdown(false),
_memberState(MemberState::RS_STARTUP),
@@ -438,16 +438,19 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx)
// Use a callback here, because _finishLoadLocalConfig calls isself() which requires
// that the server's networking layer be up and running and accepting connections, which
// doesn't happen until startReplication finishes.
- auto handle = _scheduleDBWork(stdx::bind(&ReplicationCoordinatorImpl::_finishLoadLocalConfig,
- this,
- stdx::placeholders::_1,
- localConfig,
- lastOpTimeStatus,
- lastVote));
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _finishLoadLocalConfigCbh = handle;
- }
+ auto handle =
+ _replExecutor->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_finishLoadLocalConfig,
+ this,
+ stdx::placeholders::_1,
+ localConfig,
+ lastOpTimeStatus,
+ lastVote));
+ if (handle == ErrorCodes::ShutdownInProgress) {
+ handle = CallbackHandle{};
+ }
+ fassertStatusOK(40446, handle);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _finishLoadLocalConfigCbh = std::move(handle.getValue());
return false;
}
@@ -536,8 +539,8 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
if (!isArbiter) {
_externalState->startThreads(_settings);
- invariant(cbData.opCtx);
- _startDataReplication(cbData.opCtx);
+ auto opCtx = cc().makeOperationContext();
+ _startDataReplication(opCtx.get());
}
}
@@ -690,8 +693,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
// Shutdown must:
// * prevent new threads from blocking in awaitReplication
// * wake up all existing threads blocking in awaitReplication
- // * tell the ReplicationExecutor to shut down
- // * wait for the thread running the ReplicationExecutor to finish
+ // * Shut down and join the execution resources it owns.
if (!_settings.usingReplSets()) {
return;
@@ -802,7 +804,7 @@ bool ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) {
return false;
}
- if (auto electionFinishedEvent = _cancelElectionIfNeeded_inTopoLock()) {
+ if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) {
// We were a candidate, which means _topCoord believed us to be in state RS_SECONDARY, and
// we know that newState != RS_SECONDARY because we would have returned early, above if
// the old and new state were equal. So, try again after the election is over to
@@ -2161,53 +2163,51 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt
}
auto reconfigFinished = uassertStatusOK(_replExecutor->makeEvent());
- const auto reconfigFinishFn =
- [ this, newConfig, myIndex = myIndex.getValue(), reconfigFinished ](
- const executor::TaskExecutor::CallbackArgs& cbData) {
-
- if (!cbData.status.isOK()) {
- return;
- }
- _finishReplSetReconfig(newConfig, myIndex, reconfigFinished);
- };
-
- // If it's a force reconfig, the primary node may not be electable after the configuration
- // change. In case we are that primary node, finish the reconfig under the global lock,
- // so that the step down occurs safely.
- StatusWith<CallbackHandle> cbhStatus(ErrorCodes::InternalError,
- "reconfigFinishFn hasn't been scheduled");
- if (args.force) {
- cbhStatus = _replExecutor->scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn);
- } else {
- cbhStatus = _replExecutor->scheduleWork(reconfigFinishFn);
- }
- if (cbhStatus.getStatus() == ErrorCodes::ShutdownInProgress) {
- return cbhStatus.getStatus();
- }
-
- fassert(18824, cbhStatus.getStatus());
-
+ uassertStatusOK(
+ _replExecutor->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig,
+ this,
+ stdx::placeholders::_1,
+ newConfig,
+ args.force,
+ myIndex.getValue(),
+ reconfigFinished)));
configStateGuard.Dismiss();
_replExecutor->waitForEvent(reconfigFinished);
return Status::OK();
}
void ReplicationCoordinatorImpl::_finishReplSetReconfig(
+ const executor::TaskExecutor::CallbackArgs& cbData,
const ReplSetConfig& newConfig,
+ const bool isForceReconfig,
int myIndex,
const executor::TaskExecutor::EventHandle& finishedEvent) {
+
+ if (cbData.status == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+ auto opCtx = cc().makeOperationContext();
+ boost::optional<Lock::GlobalWrite> globalExclusiveLock;
+ if (isForceReconfig) {
+ // Since it's a force reconfig, the primary node may not be electable after the
+ // configuration change. In case we are that primary node, finish the reconfig under the
+ // global lock, so that the step down occurs safely.
+ globalExclusiveLock.emplace(opCtx.get());
+ }
stdx::unique_lock<stdx::mutex> lk(_mutex);
invariant(_rsConfigState == kConfigReconfiguring);
invariant(_rsConfig.isInitialized());
// Do not conduct an election during a reconfig, as the node may not be electable post-reconfig.
- if (auto electionFinishedEvent = _cancelElectionIfNeeded_inTopoLock()) {
+ if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) {
// Wait for the election to complete and the node's Role to be set to follower.
_replExecutor->onEvent(electionFinishedEvent,
stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig,
this,
+ stdx::placeholders::_1,
newConfig,
+ isForceReconfig,
myIndex,
finishedEvent));
return;
@@ -2220,15 +2220,9 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(
// For example, if we change the meaning of the "committed" snapshot from applied -> durable.
_dropAllSnapshots_inlock();
- auto evh = _resetElectionInfoOnProtocolVersionUpgrade(oldConfig, newConfig);
lk.unlock();
- if (evh) {
- _replExecutor->onEvent(evh, [this, action](const CallbackArgs& cbArgs) {
- _performPostMemberStateUpdateAction(action);
- });
- } else {
- _performPostMemberStateUpdateAction(action);
- }
+ _resetElectionInfoOnProtocolVersionUpgrade(opCtx.get(), oldConfig, newConfig);
+ _performPostMemberStateUpdateAction(action);
_replExecutor->signalEvent(finishedEvent);
}
@@ -3276,90 +3270,32 @@ void ReplicationCoordinatorImpl::waitForElectionDryRunFinish_forTest() {
}
}
-EventHandle ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgrade(
- const ReplSetConfig& oldConfig, const ReplSetConfig& newConfig) {
+void ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgrade(
+ OperationContext* opCtx, const ReplSetConfig& oldConfig, const ReplSetConfig& newConfig) {
+
// On protocol version upgrade, reset last vote as if I just learned the term 0 from other
// nodes.
if (!oldConfig.isInitialized() ||
oldConfig.getProtocolVersion() >= newConfig.getProtocolVersion()) {
- return {};
+ return;
}
invariant(newConfig.getProtocolVersion() == 1);
- // Write last vote
- auto evhStatus = _replExecutor->makeEvent();
- if (evhStatus.getStatus() == ErrorCodes::ShutdownInProgress) {
- return {};
- }
- invariant(evhStatus.isOK());
- auto evh = evhStatus.getValue();
-
- auto cbStatus = _replExecutor->scheduleDBWork([this, evh](const CallbackArgs& cbData) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
- invariant(cbData.opCtx);
-
- LastVote lastVote{OpTime::kInitialTerm, -1};
- auto status = _externalState->storeLocalLastVoteDocument(cbData.opCtx, lastVote);
- invariant(status.isOK());
- _replExecutor->signalEvent(evh);
- });
- if (cbStatus.getStatus() == ErrorCodes::ShutdownInProgress) {
- return {};
- }
- invariant(cbStatus.isOK());
- return evh;
-}
-
-CallbackHandle ReplicationCoordinatorImpl::_scheduleWork(const CallbackFn& work) {
- auto scheduleFn = [this](const CallbackFn& workWrapped) {
- return _replExecutor->scheduleWork(workWrapped);
- };
- return _wrapAndScheduleWork(scheduleFn, work);
+ const LastVote lastVote{OpTime::kInitialTerm, -1};
+ fassert(40445, _externalState->storeLocalLastVoteDocument(opCtx, lastVote));
}
CallbackHandle ReplicationCoordinatorImpl::_scheduleWorkAt(Date_t when, const CallbackFn& work) {
- auto scheduleFn = [this, when](const CallbackFn& workWrapped) {
- return _replExecutor->scheduleWorkAt(when, workWrapped);
- };
- return _wrapAndScheduleWork(scheduleFn, work);
-}
-
-void ReplicationCoordinatorImpl::_scheduleWorkAndWaitForCompletion(const CallbackFn& work) {
- if (auto handle = _scheduleWork(work)) {
- _replExecutor->wait(handle);
- }
-}
-
-void ReplicationCoordinatorImpl::_scheduleWorkAtAndWaitForCompletion(Date_t when,
- const CallbackFn& work) {
- if (auto handle = _scheduleWorkAt(when, work)) {
- _replExecutor->wait(handle);
- }
-}
-
-CallbackHandle ReplicationCoordinatorImpl::_scheduleDBWork(const CallbackFn& work) {
- auto scheduleFn = [this](const CallbackFn& workWrapped) {
- return _replExecutor->scheduleDBWork(workWrapped);
- };
- return _wrapAndScheduleWork(scheduleFn, work);
-}
-
-CallbackHandle ReplicationCoordinatorImpl::_wrapAndScheduleWork(ScheduleFn scheduleFn,
- const CallbackFn& work) {
- auto workWrapped = [this, work](const CallbackArgs& args) {
+ auto cbh = _replExecutor->scheduleWorkAt(when, [work](const CallbackArgs& args) {
if (args.status == ErrorCodes::CallbackCanceled) {
return;
}
work(args);
- };
- auto cbh = scheduleFn(workWrapped);
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return CallbackHandle();
+ });
+ if (cbh == ErrorCodes::ShutdownInProgress) {
+ return {};
}
- fassert(28800, cbh.getStatus());
- return cbh.getValue();
+ return fassertStatusOK(28800, cbh);
}
EventHandle ReplicationCoordinatorImpl::_makeEvent() {
@@ -3429,8 +3365,7 @@ void ReplicationCoordinatorImpl::setIndexPrefetchConfig(
_indexPrefetchConfig = cfg;
}
-executor::TaskExecutor::EventHandle
-ReplicationCoordinatorImpl::_cancelElectionIfNeeded_inTopoLock() {
+executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::_cancelElectionIfNeeded_inlock() {
if (_topCoord->getRole() != TopologyCoordinator::Role::candidate) {
return {};
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 385413d8cb2..4bdb1cd4daf 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -42,7 +42,6 @@
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_external_state.h"
-#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/sync_source_resolver.h"
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/update_position_args.h"
@@ -94,7 +93,7 @@ public:
ReplicationCoordinatorImpl(ServiceContext* serviceContext,
const ReplSettings& settings,
std::unique_ptr<ReplicationCoordinatorExternalState> externalState,
- std::unique_ptr<executor::NetworkInterface> network,
+ std::unique_ptr<executor::TaskExecutor> executor,
std::unique_ptr<TopologyCoordinator> topoCoord,
ReplicationProcess* replicationProcess,
StorageInterface* storage,
@@ -108,12 +107,6 @@ public:
virtual void shutdown(OperationContext* opCtx) override;
- ReplicationExecutor* getExecutor() {
- return _replExecutor.get();
- }
-
- virtual void appendDiagnosticBSON(BSONObjBuilder* bob) override;
-
virtual const ReplSettings& getSettings() const override;
virtual Mode getReplicationMode() const override;
@@ -320,6 +313,8 @@ public:
virtual void waitUntilSnapshotCommitted(OperationContext* opCtx,
const SnapshotName& untilSnapshot) override;
+ virtual void appendDiagnosticBSON(BSONObjBuilder*) override;
+
virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) const override;
virtual size_t getNumUncommittedSnapshots() override;
@@ -569,7 +564,7 @@ private:
private:
ReplicationCoordinatorImpl* _repl; // Not owned.
// Callback handle used to cancel a scheduled catchup timeout callback.
- ReplicationExecutor::CallbackHandle _timeoutCbh;
+ executor::TaskExecutor::CallbackHandle _timeoutCbh;
// Handle to a Waiter that contains the current target optime to reach after which
// we can exit catchup mode.
std::unique_ptr<CallbackWaiter> _waiter;
@@ -798,7 +793,9 @@ private:
* Finishes the work of processReplSetReconfig, in the event of
* a successful quorum check.
*/
- void _finishReplSetReconfig(const ReplSetConfig& newConfig,
+ void _finishReplSetReconfig(const executor::TaskExecutor::CallbackArgs& cbData,
+ const ReplSetConfig& newConfig,
+ bool isForceReconfig,
int myIndex,
const executor::TaskExecutor::EventHandle& finishedEvent);
@@ -921,13 +918,6 @@ private:
void _scheduleHeartbeatReconfig_inlock(const ReplSetConfig& newConfig);
/**
- * Callback that continues a heartbeat-initiated reconfig after a running election
- * completes.
- */
- void _heartbeatReconfigAfterElectionCanceled(const executor::TaskExecutor::CallbackArgs& cbData,
- const ReplSetConfig& newConfig);
-
- /**
* Method to write a configuration transmitted via heartbeat message to stable storage.
*/
void _heartbeatReconfigStore(const executor::TaskExecutor::CallbackArgs& cbd,
@@ -1059,18 +1049,10 @@ private:
/**
* Resets the term of last vote to 0 to prevent any node from voting for term 0.
- * Returns the event handle that indicates when last vote write finishes.
*/
- EventHandle _resetElectionInfoOnProtocolVersionUpgrade(const ReplSetConfig& oldConfig,
- const ReplSetConfig& newConfig);
-
- /**
- * Schedules work and returns handle to callback.
- * If work cannot be scheduled due to shutdown, returns empty handle.
- * All other non-shutdown scheduling failures will abort the process.
- * Does not run 'work' if callback is canceled.
- */
- CallbackHandle _scheduleWork(const CallbackFn& work);
+ void _resetElectionInfoOnProtocolVersionUpgrade(OperationContext* opCtx,
+ const ReplSetConfig& oldConfig,
+ const ReplSetConfig& newConfig);
/**
* Schedules work to be run no sooner than 'when' and returns handle to callback.
@@ -1081,31 +1063,6 @@ private:
CallbackHandle _scheduleWorkAt(Date_t when, const CallbackFn& work);
/**
- * Schedules work and waits for completion.
- */
- void _scheduleWorkAndWaitForCompletion(const CallbackFn& work);
-
- /**
- * Schedules work to be run no sooner than 'when' and waits for completion.
- */
- void _scheduleWorkAtAndWaitForCompletion(Date_t when, const CallbackFn& work);
-
- /**
- * Schedules DB work and returns handle to callback.
- * If work cannot be scheduled due to shutdown, returns empty handle.
- * All other non-shutdown scheduling failures will abort the process.
- * Does not run 'work' if callback is canceled.
- */
- CallbackHandle _scheduleDBWork(const CallbackFn& work);
-
- /**
- * Does the actual work of scheduling the work with the executor.
- * Used by _scheduleWork() and _scheduleWorkAt() only.
- * Do not call this function directly.
- */
- CallbackHandle _wrapAndScheduleWork(ScheduleFn scheduleFn, const CallbackFn& work);
-
- /**
* Creates an event.
* Returns invalid event handle if the executor is shutting down.
* Otherwise aborts on non-shutdown error.
@@ -1133,10 +1090,8 @@ private:
* Cancels the running election, if any, and returns an event that will be signaled when the
* canceled election completes. If there is no running election, returns an invalid event
* handle.
- *
- * Caller must already have locked the _topoMutex.
*/
- executor::TaskExecutor::EventHandle _cancelElectionIfNeeded_inTopoLock();
+ executor::TaskExecutor::EventHandle _cancelElectionIfNeeded_inlock();
/**
* Waits until the optime of the current node is at least the opTime specified in 'readConcern'.
@@ -1196,7 +1151,7 @@ private:
std::unique_ptr<TopologyCoordinator> _topCoord; // (M)
// Executor that drives the topology coordinator.
- std::unique_ptr<ReplicationExecutor> _replExecutor; // (S)
+ std::unique_ptr<executor::TaskExecutor> _replExecutor; // (S)
// Pointer to the ReplicationCoordinatorExternalState owned by this ReplicationCoordinator.
std::unique_ptr<ReplicationCoordinatorExternalState> _externalState; // (PS)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
index 4d42efe75eb..d227f78b76b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
@@ -189,7 +189,7 @@ void ReplicationCoordinatorImpl::_onDryRunComplete(long long originalTerm) {
// Store the vote in persistent storage.
LastVote lastVote{originalTerm + 1, _selfIndex};
- auto cbStatus = _replExecutor->scheduleDBWork(
+ auto cbStatus = _replExecutor->scheduleWork(
[this, lastVote](const executor::TaskExecutor::CallbackArgs& cbData) {
_writeLastVoteForMyElection(lastVote, cbData);
});
@@ -206,21 +206,23 @@ void ReplicationCoordinatorImpl::_writeLastVoteForMyElection(
// so _mutex must be unlocked here. However, we cannot return until we
// lock it because we want to lose the election on cancel or error and
// doing so requires _mutex.
- Status status = Status::OK();
- if (cbData.status.isOK()) {
- invariant(cbData.opCtx);
- status = _externalState->storeLocalLastVoteDocument(cbData.opCtx, lastVote);
- }
+ auto status = [&] {
+ if (!cbData.status.isOK()) {
+ return cbData.status;
+ }
+ auto opCtx = cc().makeOperationContext();
+ return _externalState->storeLocalLastVoteDocument(opCtx.get(), lastVote);
+ }();
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_voteRequester);
LoseElectionDryRunGuardV1 lossGuard(this);
- if (!cbData.status.isOK()) {
+ if (status == ErrorCodes::CallbackCanceled) {
return;
}
if (!status.isOK()) {
- error() << "failed to store LastVote document when voting for myself: " << status;
+ log() << "failed to store LastVote document when voting for myself: " << status;
return;
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index e4417546af9..3c3e9263a36 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -1330,11 +1330,6 @@ protected:
break;
}
net->runReadyNetworkOperations();
- // Successful elections need to write the last vote to disk, which is done by DB worker.
- // Wait until DB worker finishes its job to ensure the synchronization with the
- // executor.
- getReplExec()->waitForDBWork_forTest();
- net->runReadyNetworkOperations();
net->exitNetwork();
}
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 3b6836c45b9..04216de9202 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -337,7 +337,7 @@ executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart()
return finishEvent;
}
- _replExecutor->scheduleWorkWithGlobalExclusiveLock(stdx::bind(
+ _replExecutor->scheduleWork(stdx::bind(
&ReplicationCoordinatorImpl::_stepDownFinish, this, stdx::placeholders::_1, finishEvent));
return finishEvent;
}
@@ -345,24 +345,15 @@ executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart()
void ReplicationCoordinatorImpl::_stepDownFinish(
const executor::TaskExecutor::CallbackArgs& cbData,
const executor::TaskExecutor::EventHandle& finishedEvent) {
+
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
- if (MONGO_FAIL_POINT(blockHeartbeatStepdown)) {
- // Must reschedule rather than block so we don't take up threads in the replication
- // executor.
- sleepmillis(10);
- _replExecutor->scheduleWorkWithGlobalExclusiveLock(
- stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish,
- this,
- stdx::placeholders::_1,
- finishedEvent));
-
- return;
- }
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(blockHeartbeatStepdown);
- invariant(cbData.opCtx);
+ auto opCtx = cc().makeOperationContext();
+ Lock::GlobalWrite globalExclusiveLock(opCtx.get());
// TODO Add invariant that we've got global shared or global exclusive lock, when supported
// by lock manager.
stdx::unique_lock<stdx::mutex> lk(_mutex);
@@ -402,44 +393,27 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig_inlock(const ReplSet
_setConfigState_inlock(kConfigHBReconfiguring);
invariant(!_rsConfig.isInitialized() ||
_rsConfig.getConfigVersion() < newConfig.getConfigVersion());
- if (auto electionFinishedEvent = _cancelElectionIfNeeded_inTopoLock()) {
+ if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) {
LOG_FOR_HEARTBEATS(2) << "Rescheduling heartbeat reconfig to version "
<< newConfig.getConfigVersion()
<< " to be processed after election is cancelled.";
- _replExecutor->onEvent(
- electionFinishedEvent,
- stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigAfterElectionCanceled,
- this,
- stdx::placeholders::_1,
- newConfig));
- return;
- }
- _replExecutor->scheduleDBWork(stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigStore,
- this,
- stdx::placeholders::_1,
- newConfig));
-}
-void ReplicationCoordinatorImpl::_heartbeatReconfigAfterElectionCanceled(
- const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
-
- fassert(18911, cbData.status);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_inShutdown) {
+ _replExecutor->onEvent(electionFinishedEvent,
+ stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigStore,
+ this,
+ stdx::placeholders::_1,
+ newConfig));
return;
}
-
- _replExecutor->scheduleDBWork(stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigStore,
- this,
- stdx::placeholders::_1,
- newConfig));
+ _replExecutor->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigStore,
+ this,
+ stdx::placeholders::_1,
+ newConfig));
}
void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
const executor::TaskExecutor::CallbackArgs& cbd, const ReplSetConfig& newConfig) {
+
if (cbd.status.code() == ErrorCodes::CallbackCanceled) {
log() << "The callback to persist the replica set configuration was canceled - "
<< "the configuration was not persisted but was used: " << newConfig.toBSON();
@@ -470,7 +444,9 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
} else {
LOG_FOR_HEARTBEATS(2) << "Config with version " << newConfig.getConfigVersion()
<< " validated for reconfig; persisting to disk.";
- Status status = _externalState->storeLocalConfigDocument(cbd.opCtx, newConfig.toBSON());
+
+ auto opCtx = cc().makeOperationContext();
+ auto status = _externalState->storeLocalConfigDocument(opCtx.get(), newConfig.toBSON());
bool isFirstConfig;
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -493,33 +469,13 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
newConfig.getMemberAt(myIndex.getValue()).isArbiter();
if (!isArbiter && isFirstConfig) {
_externalState->startThreads(_settings);
- _startDataReplication(cbd.opCtx);
+ _startDataReplication(opCtx.get());
}
}
- LOG_FOR_HEARTBEATS(2)
- << "New configuration with version " << newConfig.getConfigVersion()
- << " persisted to local storage; scheduling work to install new config in memory.";
-
- const CallbackFn reconfigFinishFn(
- stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish,
- this,
- stdx::placeholders::_1,
- newConfig,
- myIndex));
-
- // Make sure that the reconfigFinishFn doesn't finish until we've reset
- // _heartbeatReconfigThread.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_memberState.primary()) {
- // If the primary is receiving a heartbeat reconfig, that strongly suggests
- // that there has been a force reconfiguration. In any event, it might lead
- // to this node stepping down as primary, so we'd better do it with the global
- // lock.
- _replExecutor->scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn);
- } else {
- _replExecutor->scheduleWork(reconfigFinishFn);
- }
+ LOG_FOR_HEARTBEATS(2) << "New configuration with version " << newConfig.getConfigVersion()
+ << " persisted to local storage; installing new config in memory";
+ _heartbeatReconfigFinish(cbd, newConfig, myIndex);
}
void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
@@ -544,32 +500,24 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
return;
}
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ auto opCtx = cc().makeOperationContext();
+ boost::optional<Lock::GlobalWrite> globalExclusiveLock;
+ stdx::unique_lock<stdx::mutex> lk{_mutex};
+ if (_memberState.primary()) {
+ // If we are primary, we need the global lock in MODE_X to step down. If we somehow
+ // transition out of primary while waiting for the global lock, there's no harm in holding
+ // it.
+ lk.unlock();
+ globalExclusiveLock.emplace(opCtx.get());
+ lk.lock();
+ }
invariant(_rsConfigState == kConfigHBReconfiguring);
invariant(!_rsConfig.isInitialized() ||
_rsConfig.getConfigVersion() < newConfig.getConfigVersion());
- if (_getMemberState_inlock().primary() && !cbData.opCtx) {
- LOG_FOR_HEARTBEATS(2) << "Attempting to install new config without locks but we are "
- "primary; Rescheduling work with global exclusive lock to finish "
- "reconfig.";
- // Not having an OperationContext in the CallbackData means we definitely aren't holding
- // the global lock. Since we're primary and this reconfig could cause us to stepdown,
- // reschedule this work with the global exclusive lock so the stepdown is safe.
- // TODO(spencer): When we *do* have an OperationContext, consult it to confirm that
- // we are indeed holding the global lock.
- _replExecutor->scheduleWorkWithGlobalExclusiveLock(
- stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish,
- this,
- stdx::placeholders::_1,
- newConfig,
- myIndex));
- return;
- }
-
// Do not conduct an election during a reconfig, as the node may not be electable post-reconfig.
- if (auto electionFinishedEvent = _cancelElectionIfNeeded_inTopoLock()) {
+ if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) {
LOG_FOR_HEARTBEATS(0)
<< "Waiting for election to complete before finishing reconfig to version "
<< newConfig.getConfigVersion();
@@ -607,16 +555,9 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
// the data structures inside of the TopologyCoordinator.
const int myIndexValue = myIndex.getStatus().isOK() ? myIndex.getValue() : -1;
const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndexValue);
- auto evh = _resetElectionInfoOnProtocolVersionUpgrade(oldConfig, newConfig);
lk.unlock();
- if (evh) {
- _replExecutor->onEvent(evh,
- [this, action](const executor::TaskExecutor::CallbackArgs& cbArgs) {
- _performPostMemberStateUpdateAction(action);
- });
- } else {
- _performPostMemberStateUpdateAction(action);
- }
+ _resetElectionInfoOnProtocolVersionUpgrade(opCtx.get(), oldConfig, newConfig);
+ _performPostMemberStateUpdateAction(action);
}
void ReplicationCoordinatorImpl::_trackHeartbeatHandle_inlock(
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index 7f4cf49eed8..291d6a7652f 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -44,6 +44,8 @@
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_mock.h"
+#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
@@ -57,8 +59,8 @@ using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
-ReplicationExecutor* ReplCoordTest::getReplExec() {
- return _repl->getExecutor();
+executor::TaskExecutor* ReplCoordTest::getReplExec() {
+ return _replExec;
}
ReplSetConfig ReplCoordTest::assertMakeRSConfig(const BSONObj& configBson) {
@@ -138,10 +140,16 @@ void ReplCoordTest::init() {
_net = net.get();
auto externalState = stdx::make_unique<ReplicationCoordinatorExternalStateMock>();
_externalState = externalState.get();
+ executor::ThreadPoolMock::Options tpOptions;
+ tpOptions.onCreateThread = []() { Client::initThread("replexec"); };
+ auto pool = stdx::make_unique<executor::ThreadPoolMock>(_net, seed, tpOptions);
+ auto replExec =
+ stdx::make_unique<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
+ _replExec = replExec.get();
_repl = stdx::make_unique<ReplicationCoordinatorImpl>(service,
_settings,
std::move(externalState),
- std::move(net),
+ std::move(replExec),
std::move(topo),
replicationProcess,
storageInterface,
@@ -350,11 +358,6 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) {
net->blackHole(noi);
}
net->runReadyNetworkOperations();
- // Successful elections need to write the last vote to disk, which is done by DB worker.
- // Wait until DB worker finishes its job to ensure the synchronization with the
- // executor.
- getReplExec()->waitForDBWork_forTest();
- net->runReadyNetworkOperations();
hasReadyRequests = net->hasReadyRequests();
getNet()->exitNetwork();
}
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h
index ab2653be11d..9f1b27aabea 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.h
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h
@@ -33,7 +33,7 @@
#include "mongo/db/client.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/repl/replication_executor.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -102,7 +102,7 @@ protected:
/**
* Gets the replication executor under test.
*/
- ReplicationExecutor* getReplExec();
+ executor::TaskExecutor* getReplExec();
/**
* Gets the replication coordinator under test.
@@ -261,10 +261,13 @@ private:
std::unique_ptr<ReplicationCoordinatorImpl> _repl;
// Owned by ReplicationCoordinatorImpl
TopologyCoordinatorImpl* _topo = nullptr;
- // Owned by ReplicationExecutor
+ // Owned by executor
executor::NetworkInterfaceMock* _net = nullptr;
// Owned by ReplicationCoordinatorImpl
ReplicationCoordinatorExternalStateMock* _externalState = nullptr;
+ // Owned by ReplicationCoordinatorImpl
+ executor::TaskExecutor* _replExec = nullptr;
+
ReplSettings _settings;
bool _callShutdown = false;
ServiceContext::UniqueClient _client = getGlobalServiceContext()->makeClient("testClient");
diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp
deleted file mode 100644
index cfdaa785b1a..00000000000
--- a/src/mongo/db/repl/replication_executor.cpp
+++ /dev/null
@@ -1,655 +0,0 @@
-/**
- * Copyright (C) 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/repl/replication_executor.h"
-
-#include <limits>
-
-#include "mongo/db/client.h"
-#include "mongo/db/repl/database_task.h"
-#include "mongo/executor/network_interface.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/log.h"
-#include "mongo/util/mongoutils/str.h"
-
-namespace mongo {
-namespace repl {
-
-namespace {
-
-const char kReplicationExecutorThreadName[] = "ReplicationExecutor";
-
-stdx::function<void()> makeNoExcept(const stdx::function<void()>& fn);
-
-} // namespace
-
-using executor::NetworkInterface;
-using executor::RemoteCommandRequest;
-using executor::RemoteCommandResponse;
-
-ReplicationExecutor::ReplicationExecutor(std::unique_ptr<NetworkInterface> netInterface,
- int64_t prngSeed)
- : _random(prngSeed),
- _networkInterface(std::move(netInterface)),
- _inShutdown(false),
- _dblockWorkers(OldThreadPool::DoNotStartThreadsTag(), 3, "replExecDBWorker-"),
- _dblockTaskRunner(&_dblockWorkers),
- _dblockExclusiveLockTaskRunner(&_dblockWorkers) {}
-
-ReplicationExecutor::~ReplicationExecutor() {
- // join must have been called
- invariant(!_executorThread.joinable());
-}
-
-BSONObj ReplicationExecutor::getDiagnosticBSON() const {
- BSONObjBuilder b;
- appendDiagnosticBSON(&b);
- return b.obj();
-}
-
-void ReplicationExecutor::appendDiagnosticBSON(BSONObjBuilder* builder) const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- // Counters
- {
- BSONObjBuilder counters(builder->subobjStart("counters"));
- counters.appendIntOrLL("eventCreated", _counterCreatedEvents);
- counters.appendIntOrLL("eventWait", _counterCreatedEvents);
- counters.appendIntOrLL("cancels", _counterCancels);
- counters.appendIntOrLL("waits", _counterWaits);
- counters.appendIntOrLL("scheduledNetCmd", _counterScheduledCommands);
- counters.appendIntOrLL("scheduledDBWork", _counterScheduledDBWorks);
- counters.appendIntOrLL("scheduledXclWork", _counterScheduledExclusiveWorks);
- counters.appendIntOrLL("scheduledWorkAt", _counterScheduledWorkAts);
- counters.appendIntOrLL("scheduledWork", _counterScheduledWorks);
- counters.appendIntOrLL("schedulingFailures", _counterSchedulingFailures);
- }
-
- // Queues
- {
- BSONObjBuilder queues(builder->subobjStart("queues"));
- queues.appendIntOrLL("networkInProgress", _networkInProgressQueue.size());
- queues.appendIntOrLL("dbWorkInProgress", _dbWorkInProgressQueue.size());
- queues.appendIntOrLL("exclusiveInProgress", _exclusiveLockInProgressQueue.size());
- queues.appendIntOrLL("sleepers", _sleepersQueue.size());
- queues.appendIntOrLL("ready", _readyQueue.size());
- queues.appendIntOrLL("free", _freeQueue.size());
- queues.done();
- }
-
- builder->appendIntOrLL("unsignaledEvents", _unsignaledEvents.size());
- builder->appendIntOrLL("eventWaiters", _totalEventWaiters);
- builder->append("shuttingDown", _inShutdown);
- builder->append("networkInterface", _networkInterface->getDiagnosticString());
-}
-
-Date_t ReplicationExecutor::now() {
- return _networkInterface->now();
-}
-
-void ReplicationExecutor::run() {
- setThreadName(kReplicationExecutorThreadName);
- Client::initThread(kReplicationExecutorThreadName);
- _networkInterface->startup();
- _dblockWorkers.startThreads();
- std::pair<WorkItem, CallbackHandle> work;
- while ((work = getWork()).first.callback.isValid()) {
- {
- stdx::lock_guard<stdx::mutex> lk(_terribleExLockSyncMutex);
- const Callback* callback = _getCallbackFromHandle(work.first.callback);
- const Status inStatus = callback->_isCanceled
- ? Status(ErrorCodes::CallbackCanceled, "Callback canceled")
- : Status::OK();
- makeNoExcept(
- stdx::bind(callback->_callbackFn, CallbackArgs(this, work.second, inStatus)))();
- }
- signalEvent(work.first.finishedEvent);
- }
- finishShutdown();
- _networkInterface->shutdown();
-}
-
-void ReplicationExecutor::startup() {
- // Ensure that thread has not yet been created
- invariant(!_executorThread.joinable());
-
- _executorThread = stdx::thread([this] { run(); });
-}
-
-void ReplicationExecutor::shutdown() {
- // Correct shutdown needs to:
- // * Disable future work queueing.
- // * drain all of the unsignaled events, sleepers, and ready queue, by running those
- // callbacks with a "shutdown" or "canceled" status.
- // * Signal all threads blocked in waitForEvent, and wait for them to return from that method.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_inShutdown)
- return;
- _inShutdown = true;
-
- _readyQueue.splice(_readyQueue.end(), _dbWorkInProgressQueue);
- _readyQueue.splice(_readyQueue.end(), _exclusiveLockInProgressQueue);
- _readyQueue.splice(_readyQueue.end(), _networkInProgressQueue);
- _readyQueue.splice(_readyQueue.end(), _sleepersQueue);
- for (auto event : _unsignaledEvents) {
- _readyQueue.splice(_readyQueue.end(), _getEventFromHandle(event)->_waiters);
- }
- for (auto readyWork : _readyQueue) {
- auto callback = _getCallbackFromHandle(readyWork.callback);
- callback->_isCanceled = true;
- callback->_isSleeper = false;
- }
-
- _networkInterface->signalWorkAvailable();
-}
-
-void ReplicationExecutor::join() {
- invariant(_executorThread.joinable());
- _executorThread.join();
-}
-
-void ReplicationExecutor::finishShutdown() {
- _dblockExclusiveLockTaskRunner.cancel();
- _dblockTaskRunner.cancel();
- _dblockWorkers.join();
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_inShutdown);
- invariant(_dbWorkInProgressQueue.empty());
- invariant(_exclusiveLockInProgressQueue.empty());
- invariant(_readyQueue.empty());
- invariant(_sleepersQueue.empty());
-
- while (!_unsignaledEvents.empty()) {
- EventList::iterator eventIter = _unsignaledEvents.begin();
- invariant(_getEventFromHandle(*eventIter)->_waiters.empty());
- signalEvent_inlock(*eventIter);
- }
-
- while (_totalEventWaiters > 0)
- _noMoreWaitingThreads.wait(lk);
-
- invariant(_dbWorkInProgressQueue.empty());
- invariant(_exclusiveLockInProgressQueue.empty());
- invariant(_readyQueue.empty());
- invariant(_sleepersQueue.empty());
- invariant(_unsignaledEvents.empty());
-}
-
-void ReplicationExecutor::maybeNotifyShutdownComplete_inlock() {
- if (_totalEventWaiters == 0)
- _noMoreWaitingThreads.notify_all();
-}
-
-StatusWith<ReplicationExecutor::EventHandle> ReplicationExecutor::makeEvent() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- ++_counterCreatedEvents;
- return makeEvent_inlock();
-}
-
-StatusWith<ReplicationExecutor::EventHandle> ReplicationExecutor::makeEvent_inlock() {
- if (_inShutdown)
- return StatusWith<EventHandle>(ErrorCodes::ShutdownInProgress, "Shutdown in progress");
-
- _unsignaledEvents.emplace_back();
- auto event = std::make_shared<Event>(this, --_unsignaledEvents.end());
- setEventForHandle(&_unsignaledEvents.back(), std::move(event));
- return _unsignaledEvents.back();
-}
-
-void ReplicationExecutor::signalEvent(const EventHandle& eventHandle) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- signalEvent_inlock(eventHandle);
-}
-
-void ReplicationExecutor::signalEvent_inlock(const EventHandle& eventHandle) {
- Event* event = _getEventFromHandle(eventHandle);
- event->_signal_inlock();
- _unsignaledEvents.erase(event->_iter);
-}
-
-void ReplicationExecutor::waitForEvent(const EventHandle& event) {
- ++_counterWaitEvents;
- _getEventFromHandle(event)->waitUntilSignaled();
-}
-
-void ReplicationExecutor::cancel(const CallbackHandle& cbHandle) {
- ++_counterCancels;
- _getCallbackFromHandle(cbHandle)->cancel();
-};
-
-void ReplicationExecutor::wait(const CallbackHandle& cbHandle) {
- ++_counterWaits;
- _getCallbackFromHandle(cbHandle)->waitForCompletion();
-};
-
-StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::onEvent(
- const EventHandle& eventHandle, const CallbackFn& work) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- WorkQueue* queue = &_readyQueue;
- Event* event = _getEventFromHandle(eventHandle);
- if (!event->_isSignaled) {
- queue = &event->_waiters;
- } else {
- queue = &_readyQueue;
- _networkInterface->signalWorkAvailable();
- }
- return enqueueWork_inlock(queue, work);
-}
-
-static void remoteCommandFinished(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicationExecutor::RemoteCommandCallbackFn& cb,
- const RemoteCommandRequest& request,
- const RemoteCommandResponse& response) {
- if (cbData.status.isOK()) {
- cb(ReplicationExecutor::RemoteCommandCallbackArgs(
- cbData.executor, cbData.myHandle, request, response));
- } else {
- cb(ReplicationExecutor::RemoteCommandCallbackArgs(
- cbData.executor, cbData.myHandle, request, cbData.status));
- }
-}
-
-static void remoteCommandFailedEarly(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicationExecutor::RemoteCommandCallbackFn& cb,
- const RemoteCommandRequest& request) {
- invariant(!cbData.status.isOK());
- cb(ReplicationExecutor::RemoteCommandCallbackArgs(
- cbData.executor, cbData.myHandle, request, cbData.status));
-}
-
-void ReplicationExecutor::_finishRemoteCommand(const RemoteCommandRequest& request,
- const RemoteCommandResponse& response,
- const CallbackHandle& cbHandle,
- const uint64_t expectedHandleGeneration,
- const RemoteCommandCallbackFn& cb) {
- Callback* callback = _getCallbackFromHandle(cbHandle);
- const WorkQueue::iterator iter = callback->_iter;
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_inShutdown) {
- return;
- }
-
- if (expectedHandleGeneration != iter->generation) {
- return;
- }
-
- LOG(4) << "Received remote response: "
- << (response.isOK() ? response.toString() : response.status.toString());
-
- callback->_callbackFn =
- stdx::bind(remoteCommandFinished, stdx::placeholders::_1, cb, request, response);
- _readyQueue.splice(_readyQueue.end(), _networkInProgressQueue, iter);
-}
-
-StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleRemoteCommand(
- const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
- RemoteCommandRequest scheduledRequest = request;
- if (request.timeout == RemoteCommandRequest::kNoTimeout) {
- scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate;
- } else {
- scheduledRequest.expirationDate = _networkInterface->now() + scheduledRequest.timeout;
- }
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- StatusWith<CallbackHandle> handle = enqueueWork_inlock(
- &_networkInProgressQueue,
- stdx::bind(remoteCommandFailedEarly, stdx::placeholders::_1, cb, scheduledRequest));
- if (handle.isOK()) {
- _getCallbackFromHandle(handle.getValue())->_iter->isNetworkOperation = true;
-
- LOG(4) << "Scheduling remote request: " << request.toString();
-
- _networkInterface->startCommand(
- handle.getValue(),
- scheduledRequest,
- stdx::bind(&ReplicationExecutor::_finishRemoteCommand,
- this,
- scheduledRequest,
- stdx::placeholders::_1,
- handle.getValue(),
- _getCallbackFromHandle(handle.getValue())->_iter->generation,
- cb));
- }
- ++_counterScheduledCommands;
- return handle;
-}
-
-StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleWork(
- const CallbackFn& work) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _networkInterface->signalWorkAvailable();
- const auto status = enqueueWork_inlock(&_readyQueue, work);
- if (status.isOK()) {
- ++_counterScheduledWorks;
- }
- return status;
-}
-
-StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleWorkAt(
- Date_t when, const CallbackFn& work) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- WorkQueue temp;
- StatusWith<CallbackHandle> cbHandle = enqueueWork_inlock(&temp, work);
- if (!cbHandle.isOK())
- return cbHandle;
- auto callback = _getCallbackFromHandle(cbHandle.getValue());
- callback->_iter->readyDate = when;
- callback->_isSleeper = true;
- WorkQueue::iterator insertBefore = _sleepersQueue.begin();
- while (insertBefore != _sleepersQueue.end() && insertBefore->readyDate <= when)
- ++insertBefore;
- _sleepersQueue.splice(insertBefore, temp, temp.begin());
- ++_counterScheduledWorkAts;
- _networkInterface->signalWorkAvailable();
- return cbHandle;
-}
-
-StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleDBWork(
- const CallbackFn& work) {
- return scheduleDBWork(work, NamespaceString(), MODE_NONE);
-}
-
-StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleDBWork(
- const CallbackFn& work, const NamespaceString& nss, LockMode mode) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- StatusWith<CallbackHandle> handle = enqueueWork_inlock(&_dbWorkInProgressQueue, work);
- if (handle.isOK()) {
- auto doOp = stdx::bind(&ReplicationExecutor::_doOperation,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- handle.getValue(),
- &_dbWorkInProgressQueue,
- nullptr);
- auto task = [doOp](OperationContext* opCtx, const Status& status) {
- makeNoExcept(stdx::bind(doOp, opCtx, status))();
- return TaskRunner::NextAction::kDisposeOperationContext;
- };
- if (mode == MODE_NONE && nss.ns().empty()) {
- _dblockTaskRunner.schedule(task);
- } else {
- _dblockTaskRunner.schedule(DatabaseTask::makeCollectionLockTask(task, nss, mode));
- }
- }
- ++_counterScheduledDBWorks;
- return handle;
-}
-
-void ReplicationExecutor::_doOperation(OperationContext* opCtx,
- const Status& taskRunnerStatus,
- const CallbackHandle& cbHandle,
- WorkQueue* workQueue,
- stdx::mutex* terribleExLockSyncMutex) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_inShutdown)
- return;
- Callback* callback = _getCallbackFromHandle(cbHandle);
- const WorkQueue::iterator iter = callback->_iter;
- callback->_isRemoved = true;
- iter->callback = CallbackHandle();
- iter->isNetworkOperation = false;
- _freeQueue.splice(_freeQueue.begin(), *workQueue, iter);
- lk.unlock();
- {
- std::unique_ptr<stdx::lock_guard<stdx::mutex>> terribleLock(
- terribleExLockSyncMutex ? new stdx::lock_guard<stdx::mutex>(*terribleExLockSyncMutex)
- : nullptr);
- // Only possible task runner error status is CallbackCanceled.
- callback->_callbackFn(
- CallbackArgs(this,
- cbHandle,
- (callback->_isCanceled || !taskRunnerStatus.isOK()
- ? Status(ErrorCodes::CallbackCanceled, "Callback canceled")
- : Status::OK()),
- opCtx));
- }
- lk.lock();
- signalEvent_inlock(callback->_finishedEvent);
-}
-
-StatusWith<ReplicationExecutor::CallbackHandle>
-ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock(const CallbackFn& work) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- StatusWith<CallbackHandle> handle = enqueueWork_inlock(&_exclusiveLockInProgressQueue, work);
- if (handle.isOK()) {
- auto doOp = stdx::bind(&ReplicationExecutor::_doOperation,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- handle.getValue(),
- &_exclusiveLockInProgressQueue,
- &_terribleExLockSyncMutex);
- _dblockExclusiveLockTaskRunner.schedule(DatabaseTask::makeGlobalExclusiveLockTask(
- [doOp](OperationContext* opCtx, const Status& status) {
- makeNoExcept(stdx::bind(doOp, opCtx, status))();
- return TaskRunner::NextAction::kDisposeOperationContext;
- }));
- }
- ++_counterScheduledExclusiveWorks;
- return handle;
-}
-
-void ReplicationExecutor::appendConnectionStats(executor::ConnectionPoolStats* stats) const {
- _networkInterface->appendConnectionStats(stats);
-}
-
-std::pair<ReplicationExecutor::WorkItem, ReplicationExecutor::CallbackHandle>
-ReplicationExecutor::getWork() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (true) {
- const Date_t now = _networkInterface->now();
- Date_t nextWakeupDate = scheduleReadySleepers_inlock(now);
- if (!_readyQueue.empty()) {
- break;
- } else if (_inShutdown) {
- return std::make_pair(WorkItem(), CallbackHandle());
- }
- lk.unlock();
- if (nextWakeupDate == Date_t::max()) {
- _networkInterface->waitForWork();
- } else {
- _networkInterface->waitForWorkUntil(nextWakeupDate);
- }
- lk.lock();
- }
- const WorkItem work = *_readyQueue.begin();
- const CallbackHandle cbHandle = work.callback;
- Callback* callback = _getCallbackFromHandle(cbHandle);
- callback->_isRemoved = true;
- _readyQueue.begin()->callback = CallbackHandle();
- _readyQueue.begin()->isNetworkOperation = false;
- _freeQueue.splice(_freeQueue.begin(), _readyQueue, _readyQueue.begin());
- return std::make_pair(work, cbHandle);
-}
-
-int64_t ReplicationExecutor::nextRandomInt64(int64_t limit) {
- return _random.nextInt64(limit);
-}
-
-Date_t ReplicationExecutor::scheduleReadySleepers_inlock(const Date_t now) {
- WorkQueue::iterator iter = _sleepersQueue.begin();
- while ((iter != _sleepersQueue.end()) && (iter->readyDate <= now)) {
- auto callback = ReplicationExecutor::_getCallbackFromHandle(iter->callback);
- callback->_isSleeper = false;
- ++iter;
- }
- _readyQueue.splice(_readyQueue.end(), _sleepersQueue, _sleepersQueue.begin(), iter);
- if (iter == _sleepersQueue.end()) {
- // indicate no sleeper to wait for
- return Date_t::max();
- }
- return iter->readyDate;
-}
-
-StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::enqueueWork_inlock(
- WorkQueue* queue, const CallbackFn& callbackFn) {
- invariant(callbackFn);
- StatusWith<EventHandle> event = makeEvent_inlock();
- if (!event.isOK())
- return StatusWith<CallbackHandle>(event.getStatus());
-
- if (_freeQueue.empty())
- _freeQueue.push_front(WorkItem());
- const WorkQueue::iterator iter = _freeQueue.begin();
- WorkItem& work = *iter;
-
- invariant(!work.callback.isValid());
- setCallbackForHandle(&work.callback,
- std::shared_ptr<executor::TaskExecutor::CallbackState>(
- new Callback(this, callbackFn, iter, event.getValue())));
-
- work.generation++;
- work.finishedEvent = event.getValue();
- work.readyDate = Date_t();
- queue->splice(queue->end(), _freeQueue, iter);
- return StatusWith<CallbackHandle>(work.callback);
-}
-
-void ReplicationExecutor::waitForDBWork_forTest() {
- _dblockTaskRunner.join();
-}
-
-ReplicationExecutor::WorkItem::WorkItem() : generation(0U), isNetworkOperation(false) {}
-
-ReplicationExecutor::Event::Event(ReplicationExecutor* executor, const EventList::iterator& iter)
- : executor::TaskExecutor::EventState(), _executor(executor), _isSignaled(false), _iter(iter) {}
-
-ReplicationExecutor::Event::~Event() {}
-
-void ReplicationExecutor::Event::signal() {
- // Must go through executor to signal so that this can be removed from the _unsignaledEvents
- // EventList.
- _executor->signalEvent(*_iter);
-}
-
-void ReplicationExecutor::Event::_signal_inlock() {
- invariant(!_isSignaled);
- _isSignaled = true;
-
- if (!_waiters.empty()) {
- _executor->_readyQueue.splice(_executor->_readyQueue.end(), _waiters);
- _executor->_networkInterface->signalWorkAvailable();
- }
-
- _isSignaledCondition.notify_all();
-}
-
-void ReplicationExecutor::Event::waitUntilSignaled() {
- stdx::unique_lock<stdx::mutex> lk(_executor->_mutex);
- ++_executor->_totalEventWaiters;
- while (!_isSignaled) {
- _isSignaledCondition.wait(lk);
- }
- --_executor->_totalEventWaiters;
- _executor->maybeNotifyShutdownComplete_inlock();
-}
-
-bool ReplicationExecutor::Event::isSignaled() {
- stdx::lock_guard<stdx::mutex> lk(_executor->_mutex);
- return _isSignaled;
-}
-
-ReplicationExecutor::Callback::Callback(ReplicationExecutor* executor,
- const CallbackFn callbackFn,
- const WorkQueue::iterator& iter,
- const EventHandle& finishedEvent)
- : executor::TaskExecutor::CallbackState(),
- _executor(executor),
- _callbackFn(callbackFn),
- _isCanceled(false),
- _isSleeper(false),
- _isRemoved(false),
- _iter(iter),
- _finishedEvent(finishedEvent) {}
-
-ReplicationExecutor::Callback::~Callback() {}
-
-bool ReplicationExecutor::Callback::isCanceled() const {
- stdx::unique_lock<stdx::mutex> lk(_executor->_mutex);
- return _isCanceled;
-}
-
-void ReplicationExecutor::Callback::cancel() {
- stdx::unique_lock<stdx::mutex> lk(_executor->_mutex);
- // If this element has already been removed from the queues,
- // the cancel is too late and has no effect.
- if (_isRemoved)
- return;
-
- _isCanceled = true;
-
- if (_isSleeper) {
- _isSleeper = false;
- _executor->_readyQueue.splice(
- _executor->_readyQueue.end(), _executor->_sleepersQueue, _iter);
- }
-
- if (_iter->isNetworkOperation) {
- lk.unlock();
- _executor->_networkInterface->cancelCommand(_iter->callback);
- }
-}
-
-void ReplicationExecutor::Callback::waitForCompletion() {
- _executor->waitForEvent(_finishedEvent);
-}
-
-ReplicationExecutor::Event* ReplicationExecutor::_getEventFromHandle(
- const EventHandle& eventHandle) {
- return static_cast<Event*>(getEventFromHandle(eventHandle));
-}
-
-ReplicationExecutor::Callback* ReplicationExecutor::_getCallbackFromHandle(
- const CallbackHandle& callbackHandle) {
- return static_cast<Callback*>(getCallbackFromHandle(callbackHandle));
-}
-
-namespace {
-
-void callNoExcept(const stdx::function<void()>& fn) {
- try {
- fn();
- } catch (...) {
- auto status = exceptionToStatus();
- log() << "Exception thrown in ReplicationExecutor callback: " << status;
- std::terminate();
- }
-}
-
-stdx::function<void()> makeNoExcept(const stdx::function<void()>& fn) {
- return stdx::bind(callNoExcept, fn);
-}
-
-} // namespace
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h
deleted file mode 100644
index 2da3e3f3deb..00000000000
--- a/src/mongo/db/repl/replication_executor.h
+++ /dev/null
@@ -1,422 +0,0 @@
-/**
- * Copyright (C) 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <string>
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/base/status.h"
-#include "mongo/base/status_with.h"
-#include "mongo/base/string_data.h"
-#include "mongo/db/concurrency/lock_manager_defs.h"
-#include "mongo/db/repl/task_runner.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/platform/compiler.h"
-#include "mongo/platform/random.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/stdx/list.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/util/concurrency/old_thread_pool.h"
-#include "mongo/util/net/hostandport.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-
-class BSONObjBuilder;
-class NamespaceString;
-class OperationContext;
-
-namespace executor {
-struct ConnectionPoolStats;
-class NetworkInterface;
-} // namespace executor
-
-namespace repl {
-
-/**
- * Implementation of the TaskExecutor interface for providing an event loop for driving state
- * machines in replication.
- *
- * Usage: Instantiate an executor, schedule a work item, call run().
- *
- * Implementation details:
- *
- * The executor is composed of several WorkQueues, which are queues of WorkItems. WorkItems
- * describe units of work -- a callback and state needed to track its lifecycle. The iterators
- * pointing to WorkItems are spliced between the WorkQueues, rather than copying WorkItems
- * themselves. Further, those WorkQueue::iterators are never invalidated during the life of an
- * executor. They may be recycled to represent new work items, but when that happens, a counter
- * on the WorkItem is incremented, to disambiguate.
- *
- * All work executed by the run() method of the executor is popped off the front of the
- * _readyQueue. Remote commands blocked on the network can be found in the
- * _networkInProgressQueue. Callbacks waiting for a timer to expire are in the _sleepersQueue.
- * When the network returns or the timer expires, items from these two queues are transferred to
- * the back of the _readyQueue.
- *
- * The _exclusiveLockInProgressQueue, which represents work items to execute while holding the
- * GlobalWrite lock, is exceptional. WorkItems in that queue execute in unspecified order with
- * respect to work in the _readyQueue or other WorkItems in the _exclusiveLockInProgressQueue,
- * but they are executed in a single serial order with respect to those other WorkItems. The
- * _terribleExLockSyncMutex is used to provide this serialization, until such time as the global
- * lock may be passed from one thread to another.
- */
-class ReplicationExecutor final : public executor::TaskExecutor {
- MONGO_DISALLOW_COPYING(ReplicationExecutor);
-
-public:
- /**
- * Constructs a new executor.
- *
- * Takes ownership of the passed NetworkInterface object.
- */
- ReplicationExecutor(std::unique_ptr<executor::NetworkInterface> netInterface, int64_t pnrgSeed);
-
- /**
- * Destroys an executor.
- */
- virtual ~ReplicationExecutor();
-
- BSONObj getDiagnosticBSON() const;
- void appendDiagnosticBSON(BSONObjBuilder* b) const override;
- Date_t now() override;
- void startup() override;
- void shutdown() override;
- void join() override;
- void signalEvent(const EventHandle& event) override;
- StatusWith<EventHandle> makeEvent() override;
- StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override;
- void waitForEvent(const EventHandle& event) override;
- StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
- StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
- StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) override;
- void cancel(const CallbackHandle& cbHandle) override;
- void wait(const CallbackHandle& cbHandle) override;
-
- void appendConnectionStats(executor::ConnectionPoolStats* stats) const override;
-
- /**
- * Executes the run loop. May be called up to one time.
- *
- * Doesn't need to be public, so do not call directly unless from unit-tests.
- *
- * Returns after the executor has been shutdown and is safe to delete.
- */
- void run();
-
- /**
- * Schedules DB "work" to be run by the executor..
- *
- * Takes no locks for caller - global, database or collection.
- *
- * The "work" will run exclusively with other DB work items. All DB work items
- * are run the in order they are scheduled.
- *
- * The "work" may run concurrently with other non-DB work items,
- * but there are no ordering guarantees provided with respect to
- * any other work item.
- *
- * Returns a handle for waiting on or canceling the callback, or
- * ErrorCodes::ShutdownInProgress.
- *
- * May be called by client threads or callbacks running in the executor.
- */
- StatusWith<CallbackHandle> scheduleDBWork(const CallbackFn& work);
-
- /**
- * Schedules DB "work" to be run by the executor while holding the collection lock.
- *
- * Takes collection lock in specified mode (and slightly more permissive lock for the
- * database lock) but not the global exclusive lock.
- *
- * The "work" will run exclusively with other DB work items. All DB work items
- * are run the in order they are scheduled.
- *
- * The "work" may run concurrently with other non-DB work items,
- * but there are no ordering guarantees provided with respect to
- * any other work item.
- *
- * Returns a handle for waiting on or canceling the callback, or
- * ErrorCodes::ShutdownInProgress.
- *
- * May be called by client threads or callbacks running in the executor.
- */
- StatusWith<CallbackHandle> scheduleDBWork(const CallbackFn& work,
- const NamespaceString& nss,
- LockMode mode);
-
- /**
- * Schedules "work" to be run by the executor while holding the global exclusive lock.
- *
- * Takes collection lock in specified mode (and slightly more permissive lock for the
- * database lock) but not the global exclusive lock.
- *
- * The "work" will run exclusively, as though it were executed by the main
- * run loop, but there are no ordering guarantees provided with respect to
- * any other work item.
- *
- * Returns a handle for waiting on or canceling the callback, or
- * ErrorCodes::ShutdownInProgress.
- *
- * May be called by client threads or callbacks running in the executor.
- */
- StatusWith<CallbackHandle> scheduleWorkWithGlobalExclusiveLock(const CallbackFn& work);
-
- /**
- * Returns an int64_t generated by the prng with a max value of "limit".
- */
- int64_t nextRandomInt64(int64_t limit);
-
- /**
- * Wait until DB worker thread is not active. Test only.
- *
- * Usually NetworkInterfaceMock::runReadyNetworkOperations() is called before and after this
- * function to ensure the synchronization of executor thread and DB worker thread.
- */
- void waitForDBWork_forTest();
-
-private:
- class Callback;
- class Event;
- struct WorkItem;
- friend class Callback;
- friend class Event;
-
-
- /**
- * A linked list of WorkItem objects.
- *
- * WorkItems get moved among lists by splicing iterators of work lists together,
- * not by copying underlying WorkItem objects.
- */
- typedef stdx::list<WorkItem> WorkQueue;
-
- /**
- * A linked list of EventHandles.
- */
- typedef stdx::list<EventHandle> EventList;
-
- /**
- * Implementation of makeEvent() for use when _mutex is already held.
- */
- StatusWith<EventHandle> makeEvent_inlock();
-
- /**
- * Implementation of signalEvent() for use when _mutex is already held.
- */
- void signalEvent_inlock(const EventHandle&);
-
- /**
- * Gets a single piece of work to execute.
- *
- * If the "callback" member of the returned WorkItem is falsey, that is a signal
- * to the run loop to wait for shutdown.
- */
- std::pair<WorkItem, CallbackHandle> getWork();
-
- /**
- * Marks as runnable any sleepers whose ready date has passed as of "now".
- * Returns the date when the next sleeper will be ready, or Date_t(~0ULL) if there are no
- * remaining sleepers.
- */
- Date_t scheduleReadySleepers_inlock(Date_t now);
-
- /**
- * Enqueues "callback" into "queue".
- */
- StatusWith<CallbackHandle> enqueueWork_inlock(WorkQueue* queue, const CallbackFn& callback);
-
- /**
- * Notifies interested parties that shutdown has completed, if it has.
- */
- void maybeNotifyShutdownComplete_inlock();
-
- /**
- * Completes the shutdown process. Called by run().
- */
- void finishShutdown();
-
- void _finishRemoteCommand(const executor::RemoteCommandRequest& request,
- const executor::RemoteCommandResponse& response,
- const CallbackHandle& cbHandle,
- const uint64_t expectedHandleGeneration,
- const RemoteCommandCallbackFn& cb);
-
- /**
- * Executes the callback referenced by "cbHandle", and moves the underlying
- * WorkQueue::iterator from "workQueue" into the _freeQueue.
- *
- * "opCtx" is a pointer to the OperationContext.
- *
- * "status" is the callback status from the task runner. Only possible values are
- * Status::OK and ErrorCodes::CallbackCanceled (when task runner is canceled).
- *
- * If "terribleExLockSyncMutex" is not null, serializes execution of "cbHandle" with the
- * execution of other callbacks.
- */
- void _doOperation(OperationContext* opCtx,
- const Status& taskRunnerStatus,
- const CallbackHandle& cbHandle,
- WorkQueue* workQueue,
- stdx::mutex* terribleExLockSyncMutex);
-
- /**
- * Wrapper around TaskExecutor::getCallbackFromHandle that return an Event* instead of
- * a generic EventState*.
- */
- Event* _getEventFromHandle(const EventHandle& eventHandle);
-
- /**
- * Wrapper around TaskExecutor::getCallbackFromHandle that return an Event* instead of
- * a generic EventState*.
- */
- Callback* _getCallbackFromHandle(const CallbackHandle& callbackHandle);
-
- // PRNG; seeded at class construction time.
- PseudoRandom _random;
-
- std::unique_ptr<executor::NetworkInterface> _networkInterface;
-
- // Thread which executes the run method. Started by startup and must be jointed after shutdown.
- stdx::thread _executorThread;
-
- mutable stdx::mutex _mutex;
- stdx::mutex _terribleExLockSyncMutex;
- stdx::condition_variable _noMoreWaitingThreads;
- WorkQueue _freeQueue;
- WorkQueue _readyQueue;
- WorkQueue _dbWorkInProgressQueue;
- WorkQueue _exclusiveLockInProgressQueue;
- WorkQueue _networkInProgressQueue;
- WorkQueue _sleepersQueue;
- EventList _unsignaledEvents;
- int64_t _totalEventWaiters = 0;
-
- // Counters for metrics, for the whole life of this instance, protected by _mutex.
- int64_t _counterWaitEvents = 0;
- int64_t _counterCreatedEvents = 0;
- int64_t _counterScheduledCommands = 0;
- int64_t _counterScheduledExclusiveWorks = 0;
- int64_t _counterScheduledDBWorks = 0;
- int64_t _counterScheduledWorks = 0;
- int64_t _counterScheduledWorkAts = 0;
- int64_t _counterSchedulingFailures = 0;
- int64_t _counterCancels = 0;
- int64_t _counterWaits = 0;
-
- bool _inShutdown;
- OldThreadPool _dblockWorkers;
- TaskRunner _dblockTaskRunner;
- TaskRunner _dblockExclusiveLockTaskRunner;
- uint64_t _nextId = 0;
-};
-
-class ReplicationExecutor::Callback : public executor::TaskExecutor::CallbackState {
- friend class ReplicationExecutor;
-
-public:
- Callback(ReplicationExecutor* executor,
- const CallbackFn callbackFn,
- const WorkQueue::iterator& iter,
- const EventHandle& finishedEvent);
- virtual ~Callback();
-
- void cancel() override;
- void waitForCompletion() override;
- bool isCanceled() const override;
-
-private:
- ReplicationExecutor* _executor;
-
- // All members other than _executor are protected by the executor's _mutex.
- CallbackFn _callbackFn;
- bool _isCanceled;
- bool _isSleeper;
- bool _isRemoved;
- WorkQueue::iterator _iter;
- EventHandle _finishedEvent;
-};
-
-/**
- * Description of a scheduled but not-yet-run work item.
- *
- * Once created, WorkItem objects remain in scope until the executor is destroyed.
- * However, over their lifetime, they may represent many different work items. This
- * divorces the lifetime of CallbackHandles from the lifetime of WorkItem objects, but
- * requires a unique generation identifier in CallbackHandles and WorkItem objects.
- *
- * WorkItem is copyable so that it may be stored in a list. However, in practice they
- * should only be copied by getWork() and when allocating new entries into a WorkQueue (not
- * when moving entries between work lists).
- */
-struct ReplicationExecutor::WorkItem {
- WorkItem();
- uint64_t generation;
- CallbackHandle callback;
- EventHandle finishedEvent;
- Date_t readyDate;
- bool isNetworkOperation;
-};
-
-/**
- * Description of an event.
- *
- * Like WorkItem, above, but for events. On signaling, the executor removes the event from the
- * "unsignaled" EventList and schedules all work items in the _waiters list.
- */
-class ReplicationExecutor::Event : public executor::TaskExecutor::EventState {
- friend class ReplicationExecutor;
-
-public:
- Event(ReplicationExecutor* executor, const EventList::iterator& iter);
- virtual ~Event();
-
- void signal() override;
- void waitUntilSignaled() override;
- bool isSignaled() override;
-
-private:
- // Note that the caller is responsible for removing any references to any EventHandles
- // pointing to this event.
- void _signal_inlock();
-
- ReplicationExecutor* _executor;
-
- // All members other than _executor are protected by the executor's _mutex.
- bool _isSignaled;
- stdx::condition_variable _isSignaledCondition;
- EventList::iterator _iter;
- WorkQueue _waiters;
-};
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/replication_executor_test.cpp b/src/mongo/db/repl/replication_executor_test.cpp
deleted file mode 100644
index e0159598eb6..00000000000
--- a/src/mongo/db/repl/replication_executor_test.cpp
+++ /dev/null
@@ -1,373 +0,0 @@
-/**
- * Copyright (C) 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include <map>
-
-#include "mongo/base/init.h"
-#include "mongo/db/bson/dotted_path_support.h"
-#include "mongo/db/client.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/replication_executor.h"
-#include "mongo/db/repl/replication_executor_test_fixture.h"
-#include "mongo/executor/network_interface_mock.h"
-#include "mongo/executor/task_executor_test_common.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/unittest/barrier.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/map_util.h"
-
-namespace mongo {
-namespace repl {
-
-namespace {
-
-using executor::NetworkInterfaceMock;
-using unittest::assertGet;
-
-namespace dps = ::mongo::dotted_path_support;
-
-const int64_t prngSeed = 1;
-
-MONGO_INITIALIZER(ReplExecutorCommonTests)(InitializerContext*) {
- mongo::executor::addTestsForExecutor(
- "ReplicationExecutorCommon", [](std::unique_ptr<executor::NetworkInterfaceMock> net) {
- return stdx::make_unique<ReplicationExecutor>(std::move(net), prngSeed);
- });
- return Status::OK();
-}
-
-TEST_F(ReplicationExecutorTest, ScheduleDBWorkAndExclusiveWorkConcurrently) {
- unittest::Barrier barrier(2U);
- NamespaceString nss("mydb", "mycoll");
- ReplicationExecutor& executor = getReplExecutor();
- Status status1 = getDetectableErrorStatus();
- OperationContext* opCtx = nullptr;
- using CallbackData = ReplicationExecutor::CallbackArgs;
- ASSERT_OK(executor
- .scheduleDBWork([&](const CallbackData& cbData) {
- status1 = cbData.status;
- opCtx = cbData.opCtx;
- barrier.countDownAndWait();
- if (cbData.status != ErrorCodes::CallbackCanceled)
- cbData.executor->shutdown();
- })
- .getStatus());
- ASSERT_OK(executor
- .scheduleWorkWithGlobalExclusiveLock(
- [&](const CallbackData& cbData) { barrier.countDownAndWait(); })
- .getStatus());
- executor.startup();
- executor.join();
- ASSERT_OK(status1);
- ASSERT(opCtx);
-}
-
-TEST_F(ReplicationExecutorTest, ScheduleDBWorkWithCollectionLock) {
- NamespaceString nss("mydb", "mycoll");
- ReplicationExecutor& executor = getReplExecutor();
- Status status1 = getDetectableErrorStatus();
- OperationContext* opCtx = nullptr;
- bool collectionIsLocked = false;
- using CallbackData = ReplicationExecutor::CallbackArgs;
- ASSERT_OK(executor
- .scheduleDBWork(
- [&](const CallbackData& cbData) {
- status1 = cbData.status;
- opCtx = cbData.opCtx;
- collectionIsLocked = opCtx
- ? opCtx->lockState()->isCollectionLockedForMode(nss.ns(), MODE_X)
- : false;
- if (cbData.status != ErrorCodes::CallbackCanceled)
- cbData.executor->shutdown();
- },
- nss,
- MODE_X)
- .getStatus());
- executor.startup();
- executor.join();
- ASSERT_OK(status1);
- ASSERT(opCtx);
- ASSERT_TRUE(collectionIsLocked);
-}
-
-TEST_F(ReplicationExecutorTest, ScheduleExclusiveLockOperation) {
- ReplicationExecutor& executor = getReplExecutor();
- Status status1 = getDetectableErrorStatus();
- OperationContext* opCtx = nullptr;
- bool lockIsW = false;
- using CallbackData = ReplicationExecutor::CallbackArgs;
- ASSERT_OK(executor
- .scheduleWorkWithGlobalExclusiveLock([&](const CallbackData& cbData) {
- status1 = cbData.status;
- opCtx = cbData.opCtx;
- lockIsW = opCtx ? opCtx->lockState()->isW() : false;
- if (cbData.status != ErrorCodes::CallbackCanceled)
- cbData.executor->shutdown();
- })
- .getStatus());
- executor.startup();
- executor.join();
- ASSERT_OK(status1);
- ASSERT(opCtx);
- ASSERT_TRUE(lockIsW);
-}
-
-TEST_F(ReplicationExecutorTest, ShutdownBeforeRunningSecondExclusiveLockOperation) {
- ReplicationExecutor& executor = getReplExecutor();
- using CallbackData = ReplicationExecutor::CallbackArgs;
- Status status1 = getDetectableErrorStatus();
- ASSERT_OK(executor
- .scheduleWorkWithGlobalExclusiveLock([&](const CallbackData& cbData) {
- status1 = cbData.status;
- if (cbData.status != ErrorCodes::CallbackCanceled)
- cbData.executor->shutdown();
- })
- .getStatus());
- // Second db work item is invoked by the main executor thread because the work item is
- // moved from the exclusive lock queue to the ready work item queue when the first callback
- // cancels the executor.
- Status status2 = getDetectableErrorStatus();
- ASSERT_OK(executor
- .scheduleWorkWithGlobalExclusiveLock([&](const CallbackData& cbData) {
- status2 = cbData.status;
- if (cbData.status != ErrorCodes::CallbackCanceled)
- cbData.executor->shutdown();
- })
- .getStatus());
- executor.startup();
- executor.join();
- ASSERT_OK(status1);
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status2.code());
-}
-
-TEST_F(ReplicationExecutorTest, CancelBeforeRunningFutureWork) {
- ReplicationExecutor& executor = getReplExecutor();
- using CallbackData = ReplicationExecutor::CallbackArgs;
- Status status1 = getDetectableErrorStatus();
- auto cbhWithStatus = executor.scheduleWorkAt(
- executor.now() + Milliseconds(1000), [&](const CallbackData& cbData) {
- status1 = cbData.status;
- if (cbData.status != ErrorCodes::CallbackCanceled)
- cbData.executor->shutdown();
- });
- ASSERT_OK(cbhWithStatus.getStatus());
-
- ASSERT_EQUALS(1,
- dps::extractElementAtPath(executor.getDiagnosticBSON(), "queues.sleepers").Int());
- ASSERT_EQUALS(0, dps::extractElementAtPath(executor.getDiagnosticBSON(), "queues.ready").Int());
- executor.cancel(cbhWithStatus.getValue());
-
- ASSERT_EQUALS(0,
- dps::extractElementAtPath(executor.getDiagnosticBSON(), "queues.sleepers").Int());
- ASSERT_EQUALS(1, dps::extractElementAtPath(executor.getDiagnosticBSON(), "queues.ready").Int());
-}
-
-// Equivalent to EventChainAndWaitingTest::onGo
-TEST_F(ReplicationExecutorTest, ScheduleCallbackOnFutureEvent) {
- launchExecutorThread();
- getNet()->exitNetwork();
-
- ReplicationExecutor& executor = getReplExecutor();
- // We signal this "ping" event and the executor will signal "pong" event in return.
- auto ping = assertGet(executor.makeEvent());
- auto pong = assertGet(executor.makeEvent());
- auto fn = [&executor, pong](const ReplicationExecutor::CallbackArgs& cbData) {
- ASSERT_OK(cbData.status);
- executor.signalEvent(pong);
- };
-
- // Wait for a future event.
- executor.onEvent(ping, fn);
- ASSERT_EQUALS(0, dps::extractElementAtPath(executor.getDiagnosticBSON(), "queues.ready").Int());
- executor.signalEvent(ping);
- executor.waitForEvent(pong);
-}
-
-// Equivalent to EventChainAndWaitingTest::onGoAfterTriggered
-TEST_F(ReplicationExecutorTest, ScheduleCallbackOnSignaledEvent) {
- launchExecutorThread();
- getNet()->exitNetwork();
-
- ReplicationExecutor& executor = getReplExecutor();
- // We signal this "ping" event and the executor will signal "pong" event in return.
- auto ping = assertGet(executor.makeEvent());
- auto pong = assertGet(executor.makeEvent());
- auto fn = [&executor, pong](const ReplicationExecutor::CallbackArgs& cbData) {
- ASSERT_OK(cbData.status);
- executor.signalEvent(pong);
- };
-
- // Wait for a signaled event.
- executor.signalEvent(ping);
- executor.onEvent(ping, fn);
- executor.waitForEvent(pong);
-}
-
-TEST_F(ReplicationExecutorTest, ScheduleCallbackAtNow) {
- launchExecutorThread();
- getNet()->exitNetwork();
-
- ReplicationExecutor& executor = getReplExecutor();
- auto finishEvent = assertGet(executor.makeEvent());
- auto fn = [&executor, finishEvent](const ReplicationExecutor::CallbackArgs& cbData) {
- ASSERT_OK(cbData.status);
- executor.signalEvent(finishEvent);
- };
-
- auto cb = executor.scheduleWorkAt(getNet()->now(), fn);
- executor.waitForEvent(finishEvent);
-}
-
-TEST_F(ReplicationExecutorTest, ScheduleCallbackAtAFutureTime) {
- launchExecutorThread();
- getNet()->exitNetwork();
-
- ReplicationExecutor& executor = getReplExecutor();
- auto finishEvent = assertGet(executor.makeEvent());
- auto fn = [&executor, finishEvent](const ReplicationExecutor::CallbackArgs& cbData) {
- ASSERT_OK(cbData.status);
- executor.signalEvent(finishEvent);
- };
-
- auto now = getNet()->now();
- now += Milliseconds(1000);
- auto cb = executor.scheduleWorkAt(now, fn);
-
- getNet()->enterNetwork();
- getNet()->runUntil(now);
- getNet()->exitNetwork();
-
- executor.waitForEvent(finishEvent);
-}
-
-TEST_F(ReplicationExecutorTest, CallbacksAreInvokedOnClientThreads) {
- launchExecutorThread();
- getNet()->exitNetwork();
-
- ReplicationExecutor& executor = getReplExecutor();
- auto status = getDetectableErrorStatus();
- bool haveClientInCallback = false;
- auto fn = [&haveClientInCallback, &status](const ReplicationExecutor::CallbackArgs& cbData) {
- status = cbData.status;
- haveClientInCallback = haveClient();
- };
-
- ASSERT_NOT_OK(status);
- auto cb = unittest::assertGet(executor.scheduleWork(fn));
- executor.wait(cb);
-
- ASSERT_OK(status);
- ASSERT_TRUE(haveClientInCallback);
-}
-
-TEST_F(ReplicationExecutorTest, TestForCancelRace) {
- launchExecutorThread();
- getNet()->exitNetwork();
-
- unittest::Barrier enterCallback(2U), runCallback(2U);
-
- ReplicationExecutor& executor = getReplExecutor();
- bool firstEventDone = false;
- bool firstEventCanceled = false;
- auto fn = [&executor, &enterCallback, &runCallback, &firstEventDone, &firstEventCanceled](
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
- // This barrier lets the test code wait until we're in the callback.
- enterCallback.countDownAndWait();
- // This barrier lets the test code keep us in the callback until it has run the cancel.
- runCallback.countDownAndWait();
- firstEventCanceled = !cbData.response.status.isOK();
- firstEventDone = true;
- };
-
- // First, schedule a network event to run.
- const executor::RemoteCommandRequest request(
- HostAndPort("test1", 1234), "mydb", BSON("nothing" << 0), nullptr);
- auto firstCallback = assertGet(executor.scheduleRemoteCommand(request, fn));
-
- // Now let the request happen.
- // We need to run the network on another thread, because the test
- // fixture will hang waiting for the callbacks to complete.
- auto timeThread = stdx::thread([this] {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- ASSERT(getNet()->hasReadyRequests());
- auto noi = getNet()->getNextReadyRequest();
- getNet()->scheduleSuccessfulResponse(noi, {});
- getNet()->runReadyNetworkOperations();
- });
-
- // Wait until we're in the callback.
- enterCallback.countDownAndWait();
-
- // Schedule a different network event to run.
- bool secondEventDone = false;
- bool secondEventCanceled = false;
- auto fn2 = [&executor, &secondEventDone, &secondEventCanceled](
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
- secondEventCanceled = !cbData.response.status.isOK();
- secondEventDone = true;
- };
- auto secondCallback = assertGet(executor.scheduleRemoteCommand(request, fn2));
- ASSERT_FALSE(firstEventDone); // The first event should be stuck at runCallback barrier.
- // Cancel the first callback. This cancel should have no effect as the callback has
- // already been started.
- executor.cancel(firstCallback);
-
- // Let the first callback continue to completion.
- runCallback.countDownAndWait();
-
- // Now the time thread can continue.
- timeThread.join();
-
- // The first event should be done, the second event should be pending.
- ASSERT(firstEventDone);
- ASSERT_FALSE(secondEventDone);
-
- // Run the network thread, which should run the second request.
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- // The second request should be ready.
- ASSERT(getNet()->hasReadyRequests()) << "Second request is not ready (cancelled?)";
- auto noi = getNet()->getNextReadyRequest();
- getNet()->scheduleSuccessfulResponse(noi, {});
- getNet()->runReadyNetworkOperations();
- }
-
- // The second callback should have run without being canceled.
- ASSERT_TRUE(secondEventDone);
- ASSERT_FALSE(secondEventCanceled);
-}
-
-} // namespace
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/replication_executor_test_fixture.cpp b/src/mongo/db/repl/replication_executor_test_fixture.cpp
deleted file mode 100644
index bdc1f976e9e..00000000000
--- a/src/mongo/db/repl/replication_executor_test_fixture.cpp
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/repl/replication_executor_test_fixture.h"
-
-#include "mongo/db/repl/replication_executor.h"
-#include "mongo/db/repl/storage_interface_mock.h"
-#include "mongo/executor/network_interface_mock.h"
-
-namespace mongo {
-namespace repl {
-
-namespace {
-
-const int64_t prngSeed = 1;
-
-} // namespace
-
-ReplicationExecutor& ReplicationExecutorTest::getReplExecutor() {
- return dynamic_cast<ReplicationExecutor&>(getExecutor());
-}
-
-void ReplicationExecutorTest::postExecutorThreadLaunch() {
- getNet()->enterNetwork();
-}
-
-std::unique_ptr<executor::TaskExecutor> ReplicationExecutorTest::makeTaskExecutor(
- std::unique_ptr<executor::NetworkInterfaceMock> net) {
- return stdx::make_unique<ReplicationExecutor>(std::move(net), prngSeed);
-}
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/replication_executor_test_fixture.h b/src/mongo/db/repl/replication_executor_test_fixture.h
deleted file mode 100644
index 5d106f8b864..00000000000
--- a/src/mongo/db/repl/replication_executor_test_fixture.h
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/repl/replication_executor.h"
-#include "mongo/executor/task_executor_test_fixture.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-
-namespace executor {
-class NetworkInterfaceMock;
-class TaskExecutor;
-} // namespace executor
-
-namespace repl {
-
-class ReplicationExecutor;
-
-/**
- * Test fixture for tests that require a ReplicationExecutor backed by
- * a NetworkInterfaceMock.
- */
-class ReplicationExecutorTest : public executor::TaskExecutorTest {
-protected:
- ReplicationExecutor& getReplExecutor();
-
- /**
- * Anything that needs to be done after launchExecutorThread should go in here.
- */
- void postExecutorThreadLaunch() override;
-
-private:
- std::unique_ptr<executor::TaskExecutor> makeTaskExecutor(
- std::unique_ptr<executor::NetworkInterfaceMock> net) override;
-
- std::unique_ptr<ReplicationExecutor> _executor;
- bool _executorStarted{false};
-};
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp
index ab19d6994c8..b3e1b84a836 100644
--- a/src/mongo/executor/thread_pool_mock.cpp
+++ b/src/mongo/executor/thread_pool_mock.cpp
@@ -126,7 +126,7 @@ void ThreadPoolMock::consumeTasks(stdx::unique_lock<stdx::mutex>* lk) {
invariant(_tasks.empty());
- while (!_joining) {
+ while (_started && !_joining) {
lk->unlock();
_net->waitForWork();
lk->lock();