diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2017-05-18 16:41:38 -0500 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2017-05-18 16:41:38 -0500 |
commit | 9ff6b2d78cee808d633dee952d31e4d4f1fe0dd0 (patch) | |
tree | 00ef5cf14e062f063e19aa34b81b74b823cad462 /src | |
parent | f5ae1e058b9cef56fdbe08775d48278c79144051 (diff) | |
download | mongo-9ff6b2d78cee808d633dee952d31e4d4f1fe0dd0.tar.gz |
SERVER-28865 Replace ReplicationExecutor with ThreadPoolTaskExecutor in ReplicationCoordinatorImpl.
Diffstat (limited to 'src')
19 files changed, 159 insertions, 2288 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index a998f4c8c64..dc2b8898e6e 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -690,6 +690,7 @@ env.Library( ], LIBDEPS=[ "concurrency/lock_manager", + "concurrency/write_conflict_exception", "curop", "repl/read_concern_args", "repl/repl_coordinator_impl", diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 2981e14ac43..34786483431 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -855,6 +855,21 @@ static void startupConfigActions(const std::vector<std::string>& args) { #endif } +auto makeReplicationExecutor(ServiceContext* serviceContext) { + ThreadPool::Options tpOptions; + tpOptions.poolName = "replexec"; + tpOptions.maxThreads = 50; + tpOptions.onCreateThread = [](const std::string& threadName) { + Client::initThread(threadName.c_str()); + }; + auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>(); + hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext)); + return stdx::make_unique<executor::ThreadPoolTaskExecutor>( + stdx::make_unique<ThreadPool>(tpOptions), + executor::makeNetworkInterface( + "NetworkInterfaceASIO-Replication", nullptr, std::move(hookList))); +} + MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, ("SetGlobalEnvironment", "SSLManager", "default")) (InitializerContext* context) { @@ -873,16 +888,12 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, auto logicalClock = stdx::make_unique<LogicalClock>(serviceContext); LogicalClock::set(serviceContext, std::move(logicalClock)); - auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>(); - hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext)); - auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorImpl>( serviceContext, getGlobalReplSettings(), stdx::make_unique<repl::ReplicationCoordinatorExternalStateImpl>(serviceContext, storageInterface), - executor::makeNetworkInterface( - 
"NetworkInterfaceASIO-Replication", nullptr, std::move(hookList)), + makeReplicationExecutor(serviceContext), stdx::make_unique<repl::TopologyCoordinatorImpl>(topoCoordOptions), replicationProcess, storageInterface, diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index c5a10d4ae9e..7b22561716e 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -212,19 +212,6 @@ env.CppUnitTest( ) env.Library( - target='replication_executor', - source=[ - 'replication_executor.cpp', - ], - LIBDEPS=[ - 'database_task', - 'task_runner', - '$BUILD_DIR/mongo/executor/network_interface', - '$BUILD_DIR/mongo/executor/task_executor_interface', - ], -) - -env.Library( target='scatter_gather', source=[ 'scatter_gather_algorithm.cpp', @@ -235,24 +222,6 @@ env.Library( ], ) - -env.CppUnitTest( - target='replication_executor_test', - source=[ - 'replication_executor_test.cpp', - 'replication_executor_test_fixture.cpp', - ], - LIBDEPS=[ - 'replication_executor', - 'replmocks', - 'service_context_repl_mock_init', - '$BUILD_DIR/mongo/db/auth/authorization_manager_global', - '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', - '$BUILD_DIR/mongo/executor/task_executor_test_fixture', - '$BUILD_DIR/mongo/unittest/concurrency', - ], -) - env.Library( target='oplog_buffer_blocking_queue', source=[ @@ -583,9 +552,11 @@ env.Library('repl_coordinator_impl', LIBDEPS=[ '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/common', + '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/index/index_descriptor', '$BUILD_DIR/mongo/db/server_options_core', '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/executor/task_executor_interface', '$BUILD_DIR/mongo/rpc/command_status', '$BUILD_DIR/mongo/rpc/metadata', '$BUILD_DIR/mongo/transport/transport_layer_common', @@ -597,7 +568,6 @@ env.Library('repl_coordinator_impl', 'repl_coordinator_interface', 'repl_settings', 'replica_set_messages', - 
'replication_executor', 'replication_process', 'reporter', 'rslog', @@ -617,6 +587,8 @@ env.Library( 'topology_coordinator_impl', '$BUILD_DIR/mongo/db/auth/authorization_manager_global', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/executor/network_interface_mock', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor', '$BUILD_DIR/mongo/unittest/unittest', ], ) @@ -851,7 +823,6 @@ env.Library( '$BUILD_DIR/mongo/db/storage/storage_options', 'repl_settings', 'replica_set_messages', - 'replication_executor', 'replication_process', 'serveronly', ], @@ -955,9 +926,10 @@ env.Library( ], LIBDEPS=[ 'replica_set_messages', - 'replication_executor', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/namespace_string', + '$BUILD_DIR/mongo/executor/remote_command', + '$BUILD_DIR/mongo/executor/task_executor_interface', '$BUILD_DIR/mongo/rpc/command_status', ], ) @@ -1144,27 +1116,6 @@ env.CppUnitTest( ], ) -env.Library( - target='database_task', - source=[ - 'database_task.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/db/concurrency/lock_manager', - '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception', - '$BUILD_DIR/mongo/db/curop', - ], -) - -env.CppUnitTest( - target='database_task_test', - source='database_task_test.cpp', - LIBDEPS=[ - 'database_task', - 'task_runner_test_fixture', - ], -) - env.CppUnitTest( target='read_concern_args_test', source=[ @@ -1302,7 +1253,7 @@ env.Library( 'rollback_checker.cpp', ], LIBDEPS=[ - 'replication_executor', + '$BUILD_DIR/mongo/executor/task_executor_interface', ], ) diff --git a/src/mongo/db/repl/database_task.cpp b/src/mongo/db/repl/database_task.cpp deleted file mode 100644 index 41f29aff342..00000000000 --- a/src/mongo/db/repl/database_task.cpp +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. 
- * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. 
- */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/concurrency/d_concurrency.h" -#include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/curop.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/repl/database_task.h" -#include "mongo/util/assert_util.h" - -namespace mongo { -namespace repl { - -// static -DatabaseTask::Task DatabaseTask::makeGlobalExclusiveLockTask(const Task& task) { - invariant(task); - DatabaseTask::Task newTask = [task](OperationContext* opCtx, const Status& status) { - if (!status.isOK()) { - return task(opCtx, status); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - Lock::GlobalWrite lock(opCtx); - return task(opCtx, status); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "globalExclusiveLockTask", "global"); - MONGO_UNREACHABLE; - }; - return newTask; -} - -// static -DatabaseTask::Task DatabaseTask::makeDatabaseLockTask(const Task& task, - const std::string& databaseName, - LockMode mode) { - invariant(task); - DatabaseTask::Task newTask = [=](OperationContext* opCtx, const Status& status) { - if (!status.isOK()) { - return task(opCtx, status); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - Lock::DBLock lock(opCtx, databaseName, mode); - return task(opCtx, status); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "databaseLockTask", databaseName); - MONGO_UNREACHABLE; - }; - return newTask; -} - -// static -DatabaseTask::Task DatabaseTask::makeCollectionLockTask(const Task& task, - const NamespaceString& nss, - LockMode mode) { - invariant(task); - DatabaseTask::Task newTask = [=](OperationContext* opCtx, const Status& status) { - if (!status.isOK()) { - return task(opCtx, status); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - LockMode permissiveLockMode = isSharedLockMode(mode) ? 
MODE_IS : MODE_IX; - Lock::DBLock lock(opCtx, nss.db(), permissiveLockMode); - Lock::CollectionLock collectionLock(opCtx->lockState(), nss.toString(), mode); - return task(opCtx, status); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "collectionLockTask", nss.toString()); - MONGO_UNREACHABLE; - }; - return newTask; -} - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/database_task.h b/src/mongo/db/repl/database_task.h deleted file mode 100644 index bde2df64c09..00000000000 --- a/src/mongo/db/repl/database_task.h +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. 
If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <string> - -#include "mongo/db/concurrency/lock_manager_defs.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/repl/task_runner.h" - -namespace mongo { - -class OperationContext; - -namespace repl { - -class DatabaseTask { -private: - DatabaseTask(); - -public: - using Task = TaskRunner::Task; - - /** - * Creates a task wrapper that runs the target task inside a global exclusive lock. - */ - static Task makeGlobalExclusiveLockTask(const Task& task); - - /** - * Creates a task wrapper that runs the target task inside a database lock. - */ - static Task makeDatabaseLockTask(const Task& task, - const std::string& databaseName, - LockMode mode); - - /** - * Creates a task wrapper that runs the target task inside a collection lock. - * Task acquires database lock before attempting to lock collection. Do not - * use in combination with makeDatabaseLockTask(). - */ - static Task makeCollectionLockTask(const Task& task, const NamespaceString& nss, LockMode mode); -}; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/database_task_test.cpp b/src/mongo/db/repl/database_task_test.cpp deleted file mode 100644 index beaa896acf0..00000000000 --- a/src/mongo/db/repl/database_task_test.cpp +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Copyright 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. 
- * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/client.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/repl/database_task.h" -#include "mongo/db/repl/task_runner.h" -#include "mongo/db/repl/task_runner_test_fixture.h" -#include "mongo/stdx/mutex.h" -#include "mongo/util/concurrency/old_thread_pool.h" - -namespace { - -using namespace mongo; -using namespace mongo::repl; - -const std::string databaseName = "mydb"; -const std::string collectionName = "mycoll"; -const NamespaceString nss(databaseName, collectionName); - -class DatabaseTaskTest : public TaskRunnerTest {}; - -TEST_F(DatabaseTaskTest, TaskRunnerErrorStatus) { - // Should not attempt to acquire lock on error status from task runner. 
- auto task = [](OperationContext* opCtx, const Status& status) { - ASSERT_FALSE(opCtx); - ASSERT_EQUALS(ErrorCodes::BadValue, status.code()); - return TaskRunner::NextAction::kInvalid; - }; - auto testLockTask = [](DatabaseTask::Task task) { - ASSERT_TRUE(TaskRunner::NextAction::kInvalid == - task(nullptr, Status(ErrorCodes::BadValue, ""))); - }; - testLockTask(DatabaseTask::makeGlobalExclusiveLockTask(task)); - testLockTask(DatabaseTask::makeDatabaseLockTask(task, databaseName, MODE_X)); - testLockTask(DatabaseTask::makeCollectionLockTask(task, nss, MODE_X)); -} - -TEST_F(DatabaseTaskTest, RunGlobalExclusiveLockTask) { - stdx::mutex mutex; - bool called = false; - OperationContext* opCtx = nullptr; - bool lockIsW = false; - Status status = getDetectableErrorStatus(); - // Task returning 'void' implies NextAction::NoAction. - auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); - called = true; - opCtx = theTxn; - lockIsW = opCtx->lockState()->isW(); - status = theStatus; - return TaskRunner::NextAction::kCancel; - }; - getTaskRunner().schedule(DatabaseTask::makeGlobalExclusiveLockTask(task)); - getThreadPool().join(); - ASSERT_FALSE(getTaskRunner().isActive()); - - stdx::lock_guard<stdx::mutex> lk(mutex); - ASSERT_TRUE(called); - ASSERT(opCtx); - ASSERT_TRUE(lockIsW); - ASSERT_OK(status); -} - -void _testRunDatabaseLockTask(DatabaseTaskTest& test, LockMode mode) { - stdx::mutex mutex; - bool called = false; - OperationContext* opCtx = nullptr; - bool isDatabaseLockedForMode = false; - Status status = test.getDetectableErrorStatus(); - // Task returning 'void' implies NextAction::NoAction. 
- auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); - called = true; - opCtx = theTxn; - isDatabaseLockedForMode = opCtx->lockState()->isDbLockedForMode(databaseName, mode); - status = theStatus; - return TaskRunner::NextAction::kCancel; - }; - test.getTaskRunner().schedule(DatabaseTask::makeDatabaseLockTask(task, databaseName, mode)); - test.getThreadPool().join(); - ASSERT_FALSE(test.getTaskRunner().isActive()); - - stdx::lock_guard<stdx::mutex> lk(mutex); - ASSERT_TRUE(called); - ASSERT(opCtx); - ASSERT_TRUE(isDatabaseLockedForMode); - ASSERT_OK(status); -} - -TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeX) { - _testRunDatabaseLockTask(*this, MODE_X); -} - -TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeS) { - _testRunDatabaseLockTask(*this, MODE_S); -} - -TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeIX) { - _testRunDatabaseLockTask(*this, MODE_IX); -} - -TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeIS) { - _testRunDatabaseLockTask(*this, MODE_IS); -} - -void _testRunCollectionLockTask(DatabaseTaskTest& test, LockMode mode) { - stdx::mutex mutex; - bool called = false; - OperationContext* opCtx = nullptr; - bool isCollectionLockedForMode = false; - Status status = test.getDetectableErrorStatus(); - // Task returning 'void' implies NextAction::NoAction. 
- auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); - called = true; - opCtx = theTxn; - isCollectionLockedForMode = - opCtx->lockState()->isCollectionLockedForMode(nss.toString(), mode); - status = theStatus; - return TaskRunner::NextAction::kCancel; - }; - test.getTaskRunner().schedule(DatabaseTask::makeCollectionLockTask(task, nss, mode)); - test.getThreadPool().join(); - ASSERT_FALSE(test.getTaskRunner().isActive()); - - stdx::lock_guard<stdx::mutex> lk(mutex); - ASSERT_TRUE(called); - ASSERT(opCtx); - ASSERT_TRUE(isCollectionLockedForMode); - ASSERT_OK(status); -} - -TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeX) { - _testRunCollectionLockTask(*this, MODE_X); -} - -TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeS) { - _testRunCollectionLockTask(*this, MODE_S); -} - -TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeIX) { - _testRunCollectionLockTask(*this, MODE_IX); -} - -TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeIS) { - _testRunCollectionLockTask(*this, MODE_IS); -} - -} // namespace diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 775efcdf8fb..0ebb923a4ff 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -285,7 +285,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( ServiceContext* service, const ReplSettings& settings, std::unique_ptr<ReplicationCoordinatorExternalState> externalState, - std::unique_ptr<NetworkInterface> network, + std::unique_ptr<executor::TaskExecutor> executor, std::unique_ptr<TopologyCoordinator> topCoord, ReplicationProcess* replicationProcess, StorageInterface* storage, @@ -294,7 +294,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( _settings(settings), _replMode(getReplicationModeFromSettings(settings)), _topCoord(std::move(topCoord)), - 
_replExecutor(stdx::make_unique<ReplicationExecutor>(std::move(network), prngSeed)), + _replExecutor(std::move(executor)), _externalState(std::move(externalState)), _inShutdown(false), _memberState(MemberState::RS_STARTUP), @@ -438,16 +438,19 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) // Use a callback here, because _finishLoadLocalConfig calls isself() which requires // that the server's networking layer be up and running and accepting connections, which // doesn't happen until startReplication finishes. - auto handle = _scheduleDBWork(stdx::bind(&ReplicationCoordinatorImpl::_finishLoadLocalConfig, - this, - stdx::placeholders::_1, - localConfig, - lastOpTimeStatus, - lastVote)); - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _finishLoadLocalConfigCbh = handle; - } + auto handle = + _replExecutor->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_finishLoadLocalConfig, + this, + stdx::placeholders::_1, + localConfig, + lastOpTimeStatus, + lastVote)); + if (handle == ErrorCodes::ShutdownInProgress) { + handle = CallbackHandle{}; + } + fassertStatusOK(40446, handle); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _finishLoadLocalConfigCbh = std::move(handle.getValue()); return false; } @@ -536,8 +539,8 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( if (!isArbiter) { _externalState->startThreads(_settings); - invariant(cbData.opCtx); - _startDataReplication(cbData.opCtx); + auto opCtx = cc().makeOperationContext(); + _startDataReplication(opCtx.get()); } } @@ -690,8 +693,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) { // Shutdown must: // * prevent new threads from blocking in awaitReplication // * wake up all existing threads blocking in awaitReplication - // * tell the ReplicationExecutor to shut down - // * wait for the thread running the ReplicationExecutor to finish + // * Shut down and join the execution resources it owns. 
if (!_settings.usingReplSets()) { return; @@ -802,7 +804,7 @@ bool ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) { return false; } - if (auto electionFinishedEvent = _cancelElectionIfNeeded_inTopoLock()) { + if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) { // We were a candidate, which means _topCoord believed us to be in state RS_SECONDARY, and // we know that newState != RS_SECONDARY because we would have returned early, above if // the old and new state were equal. So, try again after the election is over to @@ -2161,53 +2163,51 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt } auto reconfigFinished = uassertStatusOK(_replExecutor->makeEvent()); - const auto reconfigFinishFn = - [ this, newConfig, myIndex = myIndex.getValue(), reconfigFinished ]( - const executor::TaskExecutor::CallbackArgs& cbData) { - - if (!cbData.status.isOK()) { - return; - } - _finishReplSetReconfig(newConfig, myIndex, reconfigFinished); - }; - - // If it's a force reconfig, the primary node may not be electable after the configuration - // change. In case we are that primary node, finish the reconfig under the global lock, - // so that the step down occurs safely. 
- StatusWith<CallbackHandle> cbhStatus(ErrorCodes::InternalError, - "reconfigFinishFn hasn't been scheduled"); - if (args.force) { - cbhStatus = _replExecutor->scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn); - } else { - cbhStatus = _replExecutor->scheduleWork(reconfigFinishFn); - } - if (cbhStatus.getStatus() == ErrorCodes::ShutdownInProgress) { - return cbhStatus.getStatus(); - } - - fassert(18824, cbhStatus.getStatus()); - + uassertStatusOK( + _replExecutor->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig, + this, + stdx::placeholders::_1, + newConfig, + args.force, + myIndex.getValue(), + reconfigFinished))); configStateGuard.Dismiss(); _replExecutor->waitForEvent(reconfigFinished); return Status::OK(); } void ReplicationCoordinatorImpl::_finishReplSetReconfig( + const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig, + const bool isForceReconfig, int myIndex, const executor::TaskExecutor::EventHandle& finishedEvent) { + + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + auto opCtx = cc().makeOperationContext(); + boost::optional<Lock::GlobalWrite> globalExclusiveLock; + if (isForceReconfig) { + // Since it's a force reconfig, the primary node may not be electable after the + // configuration change. In case we are that primary node, finish the reconfig under the + // global lock, so that the step down occurs safely. + globalExclusiveLock.emplace(opCtx.get()); + } stdx::unique_lock<stdx::mutex> lk(_mutex); invariant(_rsConfigState == kConfigReconfiguring); invariant(_rsConfig.isInitialized()); // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig. - if (auto electionFinishedEvent = _cancelElectionIfNeeded_inTopoLock()) { + if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) { // Wait for the election to complete and the node's Role to be set to follower. 
_replExecutor->onEvent(electionFinishedEvent, stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig, this, + stdx::placeholders::_1, newConfig, + isForceReconfig, myIndex, finishedEvent)); return; @@ -2220,15 +2220,9 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig( // For example, if we change the meaning of the "committed" snapshot from applied -> durable. _dropAllSnapshots_inlock(); - auto evh = _resetElectionInfoOnProtocolVersionUpgrade(oldConfig, newConfig); lk.unlock(); - if (evh) { - _replExecutor->onEvent(evh, [this, action](const CallbackArgs& cbArgs) { - _performPostMemberStateUpdateAction(action); - }); - } else { - _performPostMemberStateUpdateAction(action); - } + _resetElectionInfoOnProtocolVersionUpgrade(opCtx.get(), oldConfig, newConfig); + _performPostMemberStateUpdateAction(action); _replExecutor->signalEvent(finishedEvent); } @@ -3276,90 +3270,32 @@ void ReplicationCoordinatorImpl::waitForElectionDryRunFinish_forTest() { } } -EventHandle ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgrade( - const ReplSetConfig& oldConfig, const ReplSetConfig& newConfig) { +void ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgrade( + OperationContext* opCtx, const ReplSetConfig& oldConfig, const ReplSetConfig& newConfig) { + // On protocol version upgrade, reset last vote as if I just learned the term 0 from other // nodes. 
if (!oldConfig.isInitialized() || oldConfig.getProtocolVersion() >= newConfig.getProtocolVersion()) { - return {}; + return; } invariant(newConfig.getProtocolVersion() == 1); - // Write last vote - auto evhStatus = _replExecutor->makeEvent(); - if (evhStatus.getStatus() == ErrorCodes::ShutdownInProgress) { - return {}; - } - invariant(evhStatus.isOK()); - auto evh = evhStatus.getValue(); - - auto cbStatus = _replExecutor->scheduleDBWork([this, evh](const CallbackArgs& cbData) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - invariant(cbData.opCtx); - - LastVote lastVote{OpTime::kInitialTerm, -1}; - auto status = _externalState->storeLocalLastVoteDocument(cbData.opCtx, lastVote); - invariant(status.isOK()); - _replExecutor->signalEvent(evh); - }); - if (cbStatus.getStatus() == ErrorCodes::ShutdownInProgress) { - return {}; - } - invariant(cbStatus.isOK()); - return evh; -} - -CallbackHandle ReplicationCoordinatorImpl::_scheduleWork(const CallbackFn& work) { - auto scheduleFn = [this](const CallbackFn& workWrapped) { - return _replExecutor->scheduleWork(workWrapped); - }; - return _wrapAndScheduleWork(scheduleFn, work); + const LastVote lastVote{OpTime::kInitialTerm, -1}; + fassert(40445, _externalState->storeLocalLastVoteDocument(opCtx, lastVote)); } CallbackHandle ReplicationCoordinatorImpl::_scheduleWorkAt(Date_t when, const CallbackFn& work) { - auto scheduleFn = [this, when](const CallbackFn& workWrapped) { - return _replExecutor->scheduleWorkAt(when, workWrapped); - }; - return _wrapAndScheduleWork(scheduleFn, work); -} - -void ReplicationCoordinatorImpl::_scheduleWorkAndWaitForCompletion(const CallbackFn& work) { - if (auto handle = _scheduleWork(work)) { - _replExecutor->wait(handle); - } -} - -void ReplicationCoordinatorImpl::_scheduleWorkAtAndWaitForCompletion(Date_t when, - const CallbackFn& work) { - if (auto handle = _scheduleWorkAt(when, work)) { - _replExecutor->wait(handle); - } -} - -CallbackHandle 
ReplicationCoordinatorImpl::_scheduleDBWork(const CallbackFn& work) { - auto scheduleFn = [this](const CallbackFn& workWrapped) { - return _replExecutor->scheduleDBWork(workWrapped); - }; - return _wrapAndScheduleWork(scheduleFn, work); -} - -CallbackHandle ReplicationCoordinatorImpl::_wrapAndScheduleWork(ScheduleFn scheduleFn, - const CallbackFn& work) { - auto workWrapped = [this, work](const CallbackArgs& args) { + auto cbh = _replExecutor->scheduleWorkAt(when, [work](const CallbackArgs& args) { if (args.status == ErrorCodes::CallbackCanceled) { return; } work(args); - }; - auto cbh = scheduleFn(workWrapped); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return CallbackHandle(); + }); + if (cbh == ErrorCodes::ShutdownInProgress) { + return {}; } - fassert(28800, cbh.getStatus()); - return cbh.getValue(); + return fassertStatusOK(28800, cbh); } EventHandle ReplicationCoordinatorImpl::_makeEvent() { @@ -3429,8 +3365,7 @@ void ReplicationCoordinatorImpl::setIndexPrefetchConfig( _indexPrefetchConfig = cfg; } -executor::TaskExecutor::EventHandle -ReplicationCoordinatorImpl::_cancelElectionIfNeeded_inTopoLock() { +executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::_cancelElectionIfNeeded_inlock() { if (_topCoord->getRole() != TopologyCoordinator::Role::candidate) { return {}; } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 385413d8cb2..4bdb1cd4daf 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -42,7 +42,6 @@ #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_external_state.h" -#include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/sync_source_resolver.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/update_position_args.h" @@ -94,7 +93,7 @@ public: 
ReplicationCoordinatorImpl(ServiceContext* serviceContext, const ReplSettings& settings, std::unique_ptr<ReplicationCoordinatorExternalState> externalState, - std::unique_ptr<executor::NetworkInterface> network, + std::unique_ptr<executor::TaskExecutor> executor, std::unique_ptr<TopologyCoordinator> topoCoord, ReplicationProcess* replicationProcess, StorageInterface* storage, @@ -108,12 +107,6 @@ public: virtual void shutdown(OperationContext* opCtx) override; - ReplicationExecutor* getExecutor() { - return _replExecutor.get(); - } - - virtual void appendDiagnosticBSON(BSONObjBuilder* bob) override; - virtual const ReplSettings& getSettings() const override; virtual Mode getReplicationMode() const override; @@ -320,6 +313,8 @@ public: virtual void waitUntilSnapshotCommitted(OperationContext* opCtx, const SnapshotName& untilSnapshot) override; + virtual void appendDiagnosticBSON(BSONObjBuilder*) override; + virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) const override; virtual size_t getNumUncommittedSnapshots() override; @@ -569,7 +564,7 @@ private: private: ReplicationCoordinatorImpl* _repl; // Not owned. // Callback handle used to cancel a scheduled catchup timeout callback. - ReplicationExecutor::CallbackHandle _timeoutCbh; + executor::TaskExecutor::CallbackHandle _timeoutCbh; // Handle to a Waiter that contains the current target optime to reach after which // we can exit catchup mode. std::unique_ptr<CallbackWaiter> _waiter; @@ -798,7 +793,9 @@ private: * Finishes the work of processReplSetReconfig, in the event of * a successful quorum check. 
*/ - void _finishReplSetReconfig(const ReplSetConfig& newConfig, + void _finishReplSetReconfig(const executor::TaskExecutor::CallbackArgs& cbData, + const ReplSetConfig& newConfig, + bool isForceReconfig, int myIndex, const executor::TaskExecutor::EventHandle& finishedEvent); @@ -921,13 +918,6 @@ private: void _scheduleHeartbeatReconfig_inlock(const ReplSetConfig& newConfig); /** - * Callback that continues a heartbeat-initiated reconfig after a running election - * completes. - */ - void _heartbeatReconfigAfterElectionCanceled(const executor::TaskExecutor::CallbackArgs& cbData, - const ReplSetConfig& newConfig); - - /** * Method to write a configuration transmitted via heartbeat message to stable storage. */ void _heartbeatReconfigStore(const executor::TaskExecutor::CallbackArgs& cbd, @@ -1059,18 +1049,10 @@ private: /** * Resets the term of last vote to 0 to prevent any node from voting for term 0. - * Returns the event handle that indicates when last vote write finishes. */ - EventHandle _resetElectionInfoOnProtocolVersionUpgrade(const ReplSetConfig& oldConfig, - const ReplSetConfig& newConfig); - - /** - * Schedules work and returns handle to callback. - * If work cannot be scheduled due to shutdown, returns empty handle. - * All other non-shutdown scheduling failures will abort the process. - * Does not run 'work' if callback is canceled. - */ - CallbackHandle _scheduleWork(const CallbackFn& work); + void _resetElectionInfoOnProtocolVersionUpgrade(OperationContext* opCtx, + const ReplSetConfig& oldConfig, + const ReplSetConfig& newConfig); /** * Schedules work to be run no sooner than 'when' and returns handle to callback. @@ -1081,31 +1063,6 @@ private: CallbackHandle _scheduleWorkAt(Date_t when, const CallbackFn& work); /** - * Schedules work and waits for completion. - */ - void _scheduleWorkAndWaitForCompletion(const CallbackFn& work); - - /** - * Schedules work to be run no sooner than 'when' and waits for completion. 
- */ - void _scheduleWorkAtAndWaitForCompletion(Date_t when, const CallbackFn& work); - - /** - * Schedules DB work and returns handle to callback. - * If work cannot be scheduled due to shutdown, returns empty handle. - * All other non-shutdown scheduling failures will abort the process. - * Does not run 'work' if callback is canceled. - */ - CallbackHandle _scheduleDBWork(const CallbackFn& work); - - /** - * Does the actual work of scheduling the work with the executor. - * Used by _scheduleWork() and _scheduleWorkAt() only. - * Do not call this function directly. - */ - CallbackHandle _wrapAndScheduleWork(ScheduleFn scheduleFn, const CallbackFn& work); - - /** * Creates an event. * Returns invalid event handle if the executor is shutting down. * Otherwise aborts on non-shutdown error. @@ -1133,10 +1090,8 @@ private: * Cancels the running election, if any, and returns an event that will be signaled when the * canceled election completes. If there is no running election, returns an invalid event * handle. - * - * Caller must already have locked the _topoMutex. */ - executor::TaskExecutor::EventHandle _cancelElectionIfNeeded_inTopoLock(); + executor::TaskExecutor::EventHandle _cancelElectionIfNeeded_inlock(); /** * Waits until the optime of the current node is at least the opTime specified in 'readConcern'. @@ -1196,7 +1151,7 @@ private: std::unique_ptr<TopologyCoordinator> _topCoord; // (M) // Executor that drives the topology coordinator. - std::unique_ptr<ReplicationExecutor> _replExecutor; // (S) + std::unique_ptr<executor::TaskExecutor> _replExecutor; // (S) // Pointer to the ReplicationCoordinatorExternalState owned by this ReplicationCoordinator. 
std::unique_ptr<ReplicationCoordinatorExternalState> _externalState; // (PS) diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp index 4d42efe75eb..d227f78b76b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp @@ -189,7 +189,7 @@ void ReplicationCoordinatorImpl::_onDryRunComplete(long long originalTerm) { // Store the vote in persistent storage. LastVote lastVote{originalTerm + 1, _selfIndex}; - auto cbStatus = _replExecutor->scheduleDBWork( + auto cbStatus = _replExecutor->scheduleWork( [this, lastVote](const executor::TaskExecutor::CallbackArgs& cbData) { _writeLastVoteForMyElection(lastVote, cbData); }); @@ -206,21 +206,23 @@ void ReplicationCoordinatorImpl::_writeLastVoteForMyElection( // so _mutex must be unlocked here. However, we cannot return until we // lock it because we want to lose the election on cancel or error and // doing so requires _mutex. 
- Status status = Status::OK(); - if (cbData.status.isOK()) { - invariant(cbData.opCtx); - status = _externalState->storeLocalLastVoteDocument(cbData.opCtx, lastVote); - } + auto status = [&] { + if (!cbData.status.isOK()) { + return cbData.status; + } + auto opCtx = cc().makeOperationContext(); + return _externalState->storeLocalLastVoteDocument(opCtx.get(), lastVote); + }(); stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(_voteRequester); LoseElectionDryRunGuardV1 lossGuard(this); - if (!cbData.status.isOK()) { + if (status == ErrorCodes::CallbackCanceled) { return; } if (!status.isOK()) { - error() << "failed to store LastVote document when voting for myself: " << status; + log() << "failed to store LastVote document when voting for myself: " << status; return; } diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index e4417546af9..3c3e9263a36 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -1330,11 +1330,6 @@ protected: break; } net->runReadyNetworkOperations(); - // Successful elections need to write the last vote to disk, which is done by DB worker. - // Wait until DB worker finishes its job to ensure the synchronization with the - // executor. 
- getReplExec()->waitForDBWork_forTest(); - net->runReadyNetworkOperations(); net->exitNetwork(); } } diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 3b6836c45b9..04216de9202 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -337,7 +337,7 @@ executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() return finishEvent; } - _replExecutor->scheduleWorkWithGlobalExclusiveLock(stdx::bind( + _replExecutor->scheduleWork(stdx::bind( &ReplicationCoordinatorImpl::_stepDownFinish, this, stdx::placeholders::_1, finishEvent)); return finishEvent; } @@ -345,24 +345,15 @@ executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() void ReplicationCoordinatorImpl::_stepDownFinish( const executor::TaskExecutor::CallbackArgs& cbData, const executor::TaskExecutor::EventHandle& finishedEvent) { + if (cbData.status == ErrorCodes::CallbackCanceled) { return; } - if (MONGO_FAIL_POINT(blockHeartbeatStepdown)) { - // Must reschedule rather than block so we don't take up threads in the replication - // executor. - sleepmillis(10); - _replExecutor->scheduleWorkWithGlobalExclusiveLock( - stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish, - this, - stdx::placeholders::_1, - finishedEvent)); - - return; - } + MONGO_FAIL_POINT_PAUSE_WHILE_SET(blockHeartbeatStepdown); - invariant(cbData.opCtx); + auto opCtx = cc().makeOperationContext(); + Lock::GlobalWrite globalExclusiveLock(opCtx.get()); // TODO Add invariant that we've got global shared or global exclusive lock, when supported // by lock manager. 
stdx::unique_lock<stdx::mutex> lk(_mutex); @@ -402,44 +393,27 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig_inlock(const ReplSet _setConfigState_inlock(kConfigHBReconfiguring); invariant(!_rsConfig.isInitialized() || _rsConfig.getConfigVersion() < newConfig.getConfigVersion()); - if (auto electionFinishedEvent = _cancelElectionIfNeeded_inTopoLock()) { + if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) { LOG_FOR_HEARTBEATS(2) << "Rescheduling heartbeat reconfig to version " << newConfig.getConfigVersion() << " to be processed after election is cancelled."; - _replExecutor->onEvent( - electionFinishedEvent, - stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigAfterElectionCanceled, - this, - stdx::placeholders::_1, - newConfig)); - return; - } - _replExecutor->scheduleDBWork(stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigStore, - this, - stdx::placeholders::_1, - newConfig)); -} -void ReplicationCoordinatorImpl::_heartbeatReconfigAfterElectionCanceled( - const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - - fassert(18911, cbData.status); - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_inShutdown) { + _replExecutor->onEvent(electionFinishedEvent, + stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigStore, + this, + stdx::placeholders::_1, + newConfig)); return; } - - _replExecutor->scheduleDBWork(stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigStore, - this, - stdx::placeholders::_1, - newConfig)); + _replExecutor->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigStore, + this, + stdx::placeholders::_1, + newConfig)); } void ReplicationCoordinatorImpl::_heartbeatReconfigStore( const executor::TaskExecutor::CallbackArgs& cbd, const ReplSetConfig& newConfig) { + if (cbd.status.code() == ErrorCodes::CallbackCanceled) { log() << "The callback to persist the replica set 
configuration was canceled - " << "the configuration was not persisted but was used: " << newConfig.toBSON(); @@ -470,7 +444,9 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( } else { LOG_FOR_HEARTBEATS(2) << "Config with version " << newConfig.getConfigVersion() << " validated for reconfig; persisting to disk."; - Status status = _externalState->storeLocalConfigDocument(cbd.opCtx, newConfig.toBSON()); + + auto opCtx = cc().makeOperationContext(); + auto status = _externalState->storeLocalConfigDocument(opCtx.get(), newConfig.toBSON()); bool isFirstConfig; { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -493,33 +469,13 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( newConfig.getMemberAt(myIndex.getValue()).isArbiter(); if (!isArbiter && isFirstConfig) { _externalState->startThreads(_settings); - _startDataReplication(cbd.opCtx); + _startDataReplication(opCtx.get()); } } - LOG_FOR_HEARTBEATS(2) - << "New configuration with version " << newConfig.getConfigVersion() - << " persisted to local storage; scheduling work to install new config in memory."; - - const CallbackFn reconfigFinishFn( - stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish, - this, - stdx::placeholders::_1, - newConfig, - myIndex)); - - // Make sure that the reconfigFinishFn doesn't finish until we've reset - // _heartbeatReconfigThread. - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_memberState.primary()) { - // If the primary is receiving a heartbeat reconfig, that strongly suggests - // that there has been a force reconfiguration. In any event, it might lead - // to this node stepping down as primary, so we'd better do it with the global - // lock. 
- _replExecutor->scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn); - } else { - _replExecutor->scheduleWork(reconfigFinishFn); - } + LOG_FOR_HEARTBEATS(2) << "New configuration with version " << newConfig.getConfigVersion() + << " persisted to local storage; installing new config in memory"; + _heartbeatReconfigFinish(cbd, newConfig, myIndex); } void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( @@ -544,32 +500,24 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( return; } - stdx::unique_lock<stdx::mutex> lk(_mutex); + auto opCtx = cc().makeOperationContext(); + boost::optional<Lock::GlobalWrite> globalExclusiveLock; + stdx::unique_lock<stdx::mutex> lk{_mutex}; + if (_memberState.primary()) { + // If we are primary, we need the global lock in MODE_X to step down. If we somehow + // transition out of primary while waiting for the global lock, there's no harm in holding + // it. + lk.unlock(); + globalExclusiveLock.emplace(opCtx.get()); + lk.lock(); + } invariant(_rsConfigState == kConfigHBReconfiguring); invariant(!_rsConfig.isInitialized() || _rsConfig.getConfigVersion() < newConfig.getConfigVersion()); - if (_getMemberState_inlock().primary() && !cbData.opCtx) { - LOG_FOR_HEARTBEATS(2) << "Attempting to install new config without locks but we are " - "primary; Rescheduling work with global exclusive lock to finish " - "reconfig."; - // Not having an OperationContext in the CallbackData means we definitely aren't holding - // the global lock. Since we're primary and this reconfig could cause us to stepdown, - // reschedule this work with the global exclusive lock so the stepdown is safe. - // TODO(spencer): When we *do* have an OperationContext, consult it to confirm that - // we are indeed holding the global lock. 
- _replExecutor->scheduleWorkWithGlobalExclusiveLock( - stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish, - this, - stdx::placeholders::_1, - newConfig, - myIndex)); - return; - } - // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig. - if (auto electionFinishedEvent = _cancelElectionIfNeeded_inTopoLock()) { + if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) { LOG_FOR_HEARTBEATS(0) << "Waiting for election to complete before finishing reconfig to version " << newConfig.getConfigVersion(); @@ -607,16 +555,9 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( // the data structures inside of the TopologyCoordinator. const int myIndexValue = myIndex.getStatus().isOK() ? myIndex.getValue() : -1; const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndexValue); - auto evh = _resetElectionInfoOnProtocolVersionUpgrade(oldConfig, newConfig); lk.unlock(); - if (evh) { - _replExecutor->onEvent(evh, - [this, action](const executor::TaskExecutor::CallbackArgs& cbArgs) { - _performPostMemberStateUpdateAction(action); - }); - } else { - _performPostMemberStateUpdateAction(action); - } + _resetElectionInfoOnProtocolVersionUpgrade(opCtx.get(), oldConfig, newConfig); + _performPostMemberStateUpdateAction(action); } void ReplicationCoordinatorImpl::_trackHeartbeatHandle_inlock( diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index 7f4cf49eed8..291d6a7652f 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -44,6 +44,8 @@ #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_mock.h" +#include "mongo/executor/thread_pool_task_executor.h" #include 
"mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" @@ -57,8 +59,8 @@ using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; -ReplicationExecutor* ReplCoordTest::getReplExec() { - return _repl->getExecutor(); +executor::TaskExecutor* ReplCoordTest::getReplExec() { + return _replExec; } ReplSetConfig ReplCoordTest::assertMakeRSConfig(const BSONObj& configBson) { @@ -138,10 +140,16 @@ void ReplCoordTest::init() { _net = net.get(); auto externalState = stdx::make_unique<ReplicationCoordinatorExternalStateMock>(); _externalState = externalState.get(); + executor::ThreadPoolMock::Options tpOptions; + tpOptions.onCreateThread = []() { Client::initThread("replexec"); }; + auto pool = stdx::make_unique<executor::ThreadPoolMock>(_net, seed, tpOptions); + auto replExec = + stdx::make_unique<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); + _replExec = replExec.get(); _repl = stdx::make_unique<ReplicationCoordinatorImpl>(service, _settings, std::move(externalState), - std::move(net), + std::move(replExec), std::move(topo), replicationProcess, storageInterface, @@ -350,11 +358,6 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) { net->blackHole(noi); } net->runReadyNetworkOperations(); - // Successful elections need to write the last vote to disk, which is done by DB worker. - // Wait until DB worker finishes its job to ensure the synchronization with the - // executor. 
- getReplExec()->waitForDBWork_forTest(); - net->runReadyNetworkOperations(); hasReadyRequests = net->hasReadyRequests(); getNet()->exitNetwork(); } diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h index ab2653be11d..9f1b27aabea 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.h +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h @@ -33,7 +33,7 @@ #include "mongo/db/client.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/repl/replication_executor.h" +#include "mongo/executor/task_executor.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -102,7 +102,7 @@ protected: /** * Gets the replication executor under test. */ - ReplicationExecutor* getReplExec(); + executor::TaskExecutor* getReplExec(); /** * Gets the replication coordinator under test. @@ -261,10 +261,13 @@ private: std::unique_ptr<ReplicationCoordinatorImpl> _repl; // Owned by ReplicationCoordinatorImpl TopologyCoordinatorImpl* _topo = nullptr; - // Owned by ReplicationExecutor + // Owned by executor executor::NetworkInterfaceMock* _net = nullptr; // Owned by ReplicationCoordinatorImpl ReplicationCoordinatorExternalStateMock* _externalState = nullptr; + // Owned by ReplicationCoordinatorImpl + executor::TaskExecutor* _replExec = nullptr; + ReplSettings _settings; bool _callShutdown = false; ServiceContext::UniqueClient _client = getGlobalServiceContext()->makeClient("testClient"); diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp deleted file mode 100644 index cfdaa785b1a..00000000000 --- a/src/mongo/db/repl/replication_executor.cpp +++ /dev/null @@ -1,655 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. 
- * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. 
- */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/replication_executor.h" - -#include <limits> - -#include "mongo/db/client.h" -#include "mongo/db/repl/database_task.h" -#include "mongo/executor/network_interface.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/log.h" -#include "mongo/util/mongoutils/str.h" - -namespace mongo { -namespace repl { - -namespace { - -const char kReplicationExecutorThreadName[] = "ReplicationExecutor"; - -stdx::function<void()> makeNoExcept(const stdx::function<void()>& fn); - -} // namespace - -using executor::NetworkInterface; -using executor::RemoteCommandRequest; -using executor::RemoteCommandResponse; - -ReplicationExecutor::ReplicationExecutor(std::unique_ptr<NetworkInterface> netInterface, - int64_t prngSeed) - : _random(prngSeed), - _networkInterface(std::move(netInterface)), - _inShutdown(false), - _dblockWorkers(OldThreadPool::DoNotStartThreadsTag(), 3, "replExecDBWorker-"), - _dblockTaskRunner(&_dblockWorkers), - _dblockExclusiveLockTaskRunner(&_dblockWorkers) {} - -ReplicationExecutor::~ReplicationExecutor() { - // join must have been called - invariant(!_executorThread.joinable()); -} - -BSONObj ReplicationExecutor::getDiagnosticBSON() const { - BSONObjBuilder b; - appendDiagnosticBSON(&b); - return b.obj(); -} - -void ReplicationExecutor::appendDiagnosticBSON(BSONObjBuilder* builder) const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - // Counters - { - BSONObjBuilder counters(builder->subobjStart("counters")); - counters.appendIntOrLL("eventCreated", _counterCreatedEvents); - counters.appendIntOrLL("eventWait", _counterCreatedEvents); - counters.appendIntOrLL("cancels", _counterCancels); - counters.appendIntOrLL("waits", _counterWaits); - counters.appendIntOrLL("scheduledNetCmd", _counterScheduledCommands); - counters.appendIntOrLL("scheduledDBWork", _counterScheduledDBWorks); - 
counters.appendIntOrLL("scheduledXclWork", _counterScheduledExclusiveWorks); - counters.appendIntOrLL("scheduledWorkAt", _counterScheduledWorkAts); - counters.appendIntOrLL("scheduledWork", _counterScheduledWorks); - counters.appendIntOrLL("schedulingFailures", _counterSchedulingFailures); - } - - // Queues - { - BSONObjBuilder queues(builder->subobjStart("queues")); - queues.appendIntOrLL("networkInProgress", _networkInProgressQueue.size()); - queues.appendIntOrLL("dbWorkInProgress", _dbWorkInProgressQueue.size()); - queues.appendIntOrLL("exclusiveInProgress", _exclusiveLockInProgressQueue.size()); - queues.appendIntOrLL("sleepers", _sleepersQueue.size()); - queues.appendIntOrLL("ready", _readyQueue.size()); - queues.appendIntOrLL("free", _freeQueue.size()); - queues.done(); - } - - builder->appendIntOrLL("unsignaledEvents", _unsignaledEvents.size()); - builder->appendIntOrLL("eventWaiters", _totalEventWaiters); - builder->append("shuttingDown", _inShutdown); - builder->append("networkInterface", _networkInterface->getDiagnosticString()); -} - -Date_t ReplicationExecutor::now() { - return _networkInterface->now(); -} - -void ReplicationExecutor::run() { - setThreadName(kReplicationExecutorThreadName); - Client::initThread(kReplicationExecutorThreadName); - _networkInterface->startup(); - _dblockWorkers.startThreads(); - std::pair<WorkItem, CallbackHandle> work; - while ((work = getWork()).first.callback.isValid()) { - { - stdx::lock_guard<stdx::mutex> lk(_terribleExLockSyncMutex); - const Callback* callback = _getCallbackFromHandle(work.first.callback); - const Status inStatus = callback->_isCanceled - ? 
Status(ErrorCodes::CallbackCanceled, "Callback canceled") - : Status::OK(); - makeNoExcept( - stdx::bind(callback->_callbackFn, CallbackArgs(this, work.second, inStatus)))(); - } - signalEvent(work.first.finishedEvent); - } - finishShutdown(); - _networkInterface->shutdown(); -} - -void ReplicationExecutor::startup() { - // Ensure that thread has not yet been created - invariant(!_executorThread.joinable()); - - _executorThread = stdx::thread([this] { run(); }); -} - -void ReplicationExecutor::shutdown() { - // Correct shutdown needs to: - // * Disable future work queueing. - // * drain all of the unsignaled events, sleepers, and ready queue, by running those - // callbacks with a "shutdown" or "canceled" status. - // * Signal all threads blocked in waitForEvent, and wait for them to return from that method. - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_inShutdown) - return; - _inShutdown = true; - - _readyQueue.splice(_readyQueue.end(), _dbWorkInProgressQueue); - _readyQueue.splice(_readyQueue.end(), _exclusiveLockInProgressQueue); - _readyQueue.splice(_readyQueue.end(), _networkInProgressQueue); - _readyQueue.splice(_readyQueue.end(), _sleepersQueue); - for (auto event : _unsignaledEvents) { - _readyQueue.splice(_readyQueue.end(), _getEventFromHandle(event)->_waiters); - } - for (auto readyWork : _readyQueue) { - auto callback = _getCallbackFromHandle(readyWork.callback); - callback->_isCanceled = true; - callback->_isSleeper = false; - } - - _networkInterface->signalWorkAvailable(); -} - -void ReplicationExecutor::join() { - invariant(_executorThread.joinable()); - _executorThread.join(); -} - -void ReplicationExecutor::finishShutdown() { - _dblockExclusiveLockTaskRunner.cancel(); - _dblockTaskRunner.cancel(); - _dblockWorkers.join(); - stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_inShutdown); - invariant(_dbWorkInProgressQueue.empty()); - invariant(_exclusiveLockInProgressQueue.empty()); - invariant(_readyQueue.empty()); - 
invariant(_sleepersQueue.empty()); - - while (!_unsignaledEvents.empty()) { - EventList::iterator eventIter = _unsignaledEvents.begin(); - invariant(_getEventFromHandle(*eventIter)->_waiters.empty()); - signalEvent_inlock(*eventIter); - } - - while (_totalEventWaiters > 0) - _noMoreWaitingThreads.wait(lk); - - invariant(_dbWorkInProgressQueue.empty()); - invariant(_exclusiveLockInProgressQueue.empty()); - invariant(_readyQueue.empty()); - invariant(_sleepersQueue.empty()); - invariant(_unsignaledEvents.empty()); -} - -void ReplicationExecutor::maybeNotifyShutdownComplete_inlock() { - if (_totalEventWaiters == 0) - _noMoreWaitingThreads.notify_all(); -} - -StatusWith<ReplicationExecutor::EventHandle> ReplicationExecutor::makeEvent() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - ++_counterCreatedEvents; - return makeEvent_inlock(); -} - -StatusWith<ReplicationExecutor::EventHandle> ReplicationExecutor::makeEvent_inlock() { - if (_inShutdown) - return StatusWith<EventHandle>(ErrorCodes::ShutdownInProgress, "Shutdown in progress"); - - _unsignaledEvents.emplace_back(); - auto event = std::make_shared<Event>(this, --_unsignaledEvents.end()); - setEventForHandle(&_unsignaledEvents.back(), std::move(event)); - return _unsignaledEvents.back(); -} - -void ReplicationExecutor::signalEvent(const EventHandle& eventHandle) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - signalEvent_inlock(eventHandle); -} - -void ReplicationExecutor::signalEvent_inlock(const EventHandle& eventHandle) { - Event* event = _getEventFromHandle(eventHandle); - event->_signal_inlock(); - _unsignaledEvents.erase(event->_iter); -} - -void ReplicationExecutor::waitForEvent(const EventHandle& event) { - ++_counterWaitEvents; - _getEventFromHandle(event)->waitUntilSignaled(); -} - -void ReplicationExecutor::cancel(const CallbackHandle& cbHandle) { - ++_counterCancels; - _getCallbackFromHandle(cbHandle)->cancel(); -}; - -void ReplicationExecutor::wait(const CallbackHandle& cbHandle) { - ++_counterWaits; - 
_getCallbackFromHandle(cbHandle)->waitForCompletion(); -}; - -StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::onEvent( - const EventHandle& eventHandle, const CallbackFn& work) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - WorkQueue* queue = &_readyQueue; - Event* event = _getEventFromHandle(eventHandle); - if (!event->_isSignaled) { - queue = &event->_waiters; - } else { - queue = &_readyQueue; - _networkInterface->signalWorkAvailable(); - } - return enqueueWork_inlock(queue, work); -} - -static void remoteCommandFinished(const ReplicationExecutor::CallbackArgs& cbData, - const ReplicationExecutor::RemoteCommandCallbackFn& cb, - const RemoteCommandRequest& request, - const RemoteCommandResponse& response) { - if (cbData.status.isOK()) { - cb(ReplicationExecutor::RemoteCommandCallbackArgs( - cbData.executor, cbData.myHandle, request, response)); - } else { - cb(ReplicationExecutor::RemoteCommandCallbackArgs( - cbData.executor, cbData.myHandle, request, cbData.status)); - } -} - -static void remoteCommandFailedEarly(const ReplicationExecutor::CallbackArgs& cbData, - const ReplicationExecutor::RemoteCommandCallbackFn& cb, - const RemoteCommandRequest& request) { - invariant(!cbData.status.isOK()); - cb(ReplicationExecutor::RemoteCommandCallbackArgs( - cbData.executor, cbData.myHandle, request, cbData.status)); -} - -void ReplicationExecutor::_finishRemoteCommand(const RemoteCommandRequest& request, - const RemoteCommandResponse& response, - const CallbackHandle& cbHandle, - const uint64_t expectedHandleGeneration, - const RemoteCommandCallbackFn& cb) { - Callback* callback = _getCallbackFromHandle(cbHandle); - const WorkQueue::iterator iter = callback->_iter; - - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_inShutdown) { - return; - } - - if (expectedHandleGeneration != iter->generation) { - return; - } - - LOG(4) << "Received remote response: " - << (response.isOK() ? 
response.toString() : response.status.toString()); - - callback->_callbackFn = - stdx::bind(remoteCommandFinished, stdx::placeholders::_1, cb, request, response); - _readyQueue.splice(_readyQueue.end(), _networkInProgressQueue, iter); -} - -StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleRemoteCommand( - const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) { - RemoteCommandRequest scheduledRequest = request; - if (request.timeout == RemoteCommandRequest::kNoTimeout) { - scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate; - } else { - scheduledRequest.expirationDate = _networkInterface->now() + scheduledRequest.timeout; - } - stdx::lock_guard<stdx::mutex> lk(_mutex); - StatusWith<CallbackHandle> handle = enqueueWork_inlock( - &_networkInProgressQueue, - stdx::bind(remoteCommandFailedEarly, stdx::placeholders::_1, cb, scheduledRequest)); - if (handle.isOK()) { - _getCallbackFromHandle(handle.getValue())->_iter->isNetworkOperation = true; - - LOG(4) << "Scheduling remote request: " << request.toString(); - - _networkInterface->startCommand( - handle.getValue(), - scheduledRequest, - stdx::bind(&ReplicationExecutor::_finishRemoteCommand, - this, - scheduledRequest, - stdx::placeholders::_1, - handle.getValue(), - _getCallbackFromHandle(handle.getValue())->_iter->generation, - cb)); - } - ++_counterScheduledCommands; - return handle; -} - -StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleWork( - const CallbackFn& work) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _networkInterface->signalWorkAvailable(); - const auto status = enqueueWork_inlock(&_readyQueue, work); - if (status.isOK()) { - ++_counterScheduledWorks; - } - return status; -} - -StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleWorkAt( - Date_t when, const CallbackFn& work) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - WorkQueue temp; - StatusWith<CallbackHandle> cbHandle = 
enqueueWork_inlock(&temp, work); - if (!cbHandle.isOK()) - return cbHandle; - auto callback = _getCallbackFromHandle(cbHandle.getValue()); - callback->_iter->readyDate = when; - callback->_isSleeper = true; - WorkQueue::iterator insertBefore = _sleepersQueue.begin(); - while (insertBefore != _sleepersQueue.end() && insertBefore->readyDate <= when) - ++insertBefore; - _sleepersQueue.splice(insertBefore, temp, temp.begin()); - ++_counterScheduledWorkAts; - _networkInterface->signalWorkAvailable(); - return cbHandle; -} - -StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleDBWork( - const CallbackFn& work) { - return scheduleDBWork(work, NamespaceString(), MODE_NONE); -} - -StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleDBWork( - const CallbackFn& work, const NamespaceString& nss, LockMode mode) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - StatusWith<CallbackHandle> handle = enqueueWork_inlock(&_dbWorkInProgressQueue, work); - if (handle.isOK()) { - auto doOp = stdx::bind(&ReplicationExecutor::_doOperation, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - handle.getValue(), - &_dbWorkInProgressQueue, - nullptr); - auto task = [doOp](OperationContext* opCtx, const Status& status) { - makeNoExcept(stdx::bind(doOp, opCtx, status))(); - return TaskRunner::NextAction::kDisposeOperationContext; - }; - if (mode == MODE_NONE && nss.ns().empty()) { - _dblockTaskRunner.schedule(task); - } else { - _dblockTaskRunner.schedule(DatabaseTask::makeCollectionLockTask(task, nss, mode)); - } - } - ++_counterScheduledDBWorks; - return handle; -} - -void ReplicationExecutor::_doOperation(OperationContext* opCtx, - const Status& taskRunnerStatus, - const CallbackHandle& cbHandle, - WorkQueue* workQueue, - stdx::mutex* terribleExLockSyncMutex) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_inShutdown) - return; - Callback* callback = _getCallbackFromHandle(cbHandle); - const WorkQueue::iterator iter = 
callback->_iter; - callback->_isRemoved = true; - iter->callback = CallbackHandle(); - iter->isNetworkOperation = false; - _freeQueue.splice(_freeQueue.begin(), *workQueue, iter); - lk.unlock(); - { - std::unique_ptr<stdx::lock_guard<stdx::mutex>> terribleLock( - terribleExLockSyncMutex ? new stdx::lock_guard<stdx::mutex>(*terribleExLockSyncMutex) - : nullptr); - // Only possible task runner error status is CallbackCanceled. - callback->_callbackFn( - CallbackArgs(this, - cbHandle, - (callback->_isCanceled || !taskRunnerStatus.isOK() - ? Status(ErrorCodes::CallbackCanceled, "Callback canceled") - : Status::OK()), - opCtx)); - } - lk.lock(); - signalEvent_inlock(callback->_finishedEvent); -} - -StatusWith<ReplicationExecutor::CallbackHandle> -ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock(const CallbackFn& work) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - StatusWith<CallbackHandle> handle = enqueueWork_inlock(&_exclusiveLockInProgressQueue, work); - if (handle.isOK()) { - auto doOp = stdx::bind(&ReplicationExecutor::_doOperation, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - handle.getValue(), - &_exclusiveLockInProgressQueue, - &_terribleExLockSyncMutex); - _dblockExclusiveLockTaskRunner.schedule(DatabaseTask::makeGlobalExclusiveLockTask( - [doOp](OperationContext* opCtx, const Status& status) { - makeNoExcept(stdx::bind(doOp, opCtx, status))(); - return TaskRunner::NextAction::kDisposeOperationContext; - })); - } - ++_counterScheduledExclusiveWorks; - return handle; -} - -void ReplicationExecutor::appendConnectionStats(executor::ConnectionPoolStats* stats) const { - _networkInterface->appendConnectionStats(stats); -} - -std::pair<ReplicationExecutor::WorkItem, ReplicationExecutor::CallbackHandle> -ReplicationExecutor::getWork() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (true) { - const Date_t now = _networkInterface->now(); - Date_t nextWakeupDate = scheduleReadySleepers_inlock(now); - if (!_readyQueue.empty()) { - break; 
- } else if (_inShutdown) { - return std::make_pair(WorkItem(), CallbackHandle()); - } - lk.unlock(); - if (nextWakeupDate == Date_t::max()) { - _networkInterface->waitForWork(); - } else { - _networkInterface->waitForWorkUntil(nextWakeupDate); - } - lk.lock(); - } - const WorkItem work = *_readyQueue.begin(); - const CallbackHandle cbHandle = work.callback; - Callback* callback = _getCallbackFromHandle(cbHandle); - callback->_isRemoved = true; - _readyQueue.begin()->callback = CallbackHandle(); - _readyQueue.begin()->isNetworkOperation = false; - _freeQueue.splice(_freeQueue.begin(), _readyQueue, _readyQueue.begin()); - return std::make_pair(work, cbHandle); -} - -int64_t ReplicationExecutor::nextRandomInt64(int64_t limit) { - return _random.nextInt64(limit); -} - -Date_t ReplicationExecutor::scheduleReadySleepers_inlock(const Date_t now) { - WorkQueue::iterator iter = _sleepersQueue.begin(); - while ((iter != _sleepersQueue.end()) && (iter->readyDate <= now)) { - auto callback = ReplicationExecutor::_getCallbackFromHandle(iter->callback); - callback->_isSleeper = false; - ++iter; - } - _readyQueue.splice(_readyQueue.end(), _sleepersQueue, _sleepersQueue.begin(), iter); - if (iter == _sleepersQueue.end()) { - // indicate no sleeper to wait for - return Date_t::max(); - } - return iter->readyDate; -} - -StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::enqueueWork_inlock( - WorkQueue* queue, const CallbackFn& callbackFn) { - invariant(callbackFn); - StatusWith<EventHandle> event = makeEvent_inlock(); - if (!event.isOK()) - return StatusWith<CallbackHandle>(event.getStatus()); - - if (_freeQueue.empty()) - _freeQueue.push_front(WorkItem()); - const WorkQueue::iterator iter = _freeQueue.begin(); - WorkItem& work = *iter; - - invariant(!work.callback.isValid()); - setCallbackForHandle(&work.callback, - std::shared_ptr<executor::TaskExecutor::CallbackState>( - new Callback(this, callbackFn, iter, event.getValue()))); - - work.generation++; - 
work.finishedEvent = event.getValue(); - work.readyDate = Date_t(); - queue->splice(queue->end(), _freeQueue, iter); - return StatusWith<CallbackHandle>(work.callback); -} - -void ReplicationExecutor::waitForDBWork_forTest() { - _dblockTaskRunner.join(); -} - -ReplicationExecutor::WorkItem::WorkItem() : generation(0U), isNetworkOperation(false) {} - -ReplicationExecutor::Event::Event(ReplicationExecutor* executor, const EventList::iterator& iter) - : executor::TaskExecutor::EventState(), _executor(executor), _isSignaled(false), _iter(iter) {} - -ReplicationExecutor::Event::~Event() {} - -void ReplicationExecutor::Event::signal() { - // Must go through executor to signal so that this can be removed from the _unsignaledEvents - // EventList. - _executor->signalEvent(*_iter); -} - -void ReplicationExecutor::Event::_signal_inlock() { - invariant(!_isSignaled); - _isSignaled = true; - - if (!_waiters.empty()) { - _executor->_readyQueue.splice(_executor->_readyQueue.end(), _waiters); - _executor->_networkInterface->signalWorkAvailable(); - } - - _isSignaledCondition.notify_all(); -} - -void ReplicationExecutor::Event::waitUntilSignaled() { - stdx::unique_lock<stdx::mutex> lk(_executor->_mutex); - ++_executor->_totalEventWaiters; - while (!_isSignaled) { - _isSignaledCondition.wait(lk); - } - --_executor->_totalEventWaiters; - _executor->maybeNotifyShutdownComplete_inlock(); -} - -bool ReplicationExecutor::Event::isSignaled() { - stdx::lock_guard<stdx::mutex> lk(_executor->_mutex); - return _isSignaled; -} - -ReplicationExecutor::Callback::Callback(ReplicationExecutor* executor, - const CallbackFn callbackFn, - const WorkQueue::iterator& iter, - const EventHandle& finishedEvent) - : executor::TaskExecutor::CallbackState(), - _executor(executor), - _callbackFn(callbackFn), - _isCanceled(false), - _isSleeper(false), - _isRemoved(false), - _iter(iter), - _finishedEvent(finishedEvent) {} - -ReplicationExecutor::Callback::~Callback() {} - -bool 
ReplicationExecutor::Callback::isCanceled() const { - stdx::unique_lock<stdx::mutex> lk(_executor->_mutex); - return _isCanceled; -} - -void ReplicationExecutor::Callback::cancel() { - stdx::unique_lock<stdx::mutex> lk(_executor->_mutex); - // If this element has already been removed from the queues, - // the cancel is too late and has no effect. - if (_isRemoved) - return; - - _isCanceled = true; - - if (_isSleeper) { - _isSleeper = false; - _executor->_readyQueue.splice( - _executor->_readyQueue.end(), _executor->_sleepersQueue, _iter); - } - - if (_iter->isNetworkOperation) { - lk.unlock(); - _executor->_networkInterface->cancelCommand(_iter->callback); - } -} - -void ReplicationExecutor::Callback::waitForCompletion() { - _executor->waitForEvent(_finishedEvent); -} - -ReplicationExecutor::Event* ReplicationExecutor::_getEventFromHandle( - const EventHandle& eventHandle) { - return static_cast<Event*>(getEventFromHandle(eventHandle)); -} - -ReplicationExecutor::Callback* ReplicationExecutor::_getCallbackFromHandle( - const CallbackHandle& callbackHandle) { - return static_cast<Callback*>(getCallbackFromHandle(callbackHandle)); -} - -namespace { - -void callNoExcept(const stdx::function<void()>& fn) { - try { - fn(); - } catch (...) { - auto status = exceptionToStatus(); - log() << "Exception thrown in ReplicationExecutor callback: " << status; - std::terminate(); - } -} - -stdx::function<void()> makeNoExcept(const stdx::function<void()>& fn) { - return stdx::bind(callNoExcept, fn); -} - -} // namespace - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h deleted file mode 100644 index 2da3e3f3deb..00000000000 --- a/src/mongo/db/repl/replication_executor.h +++ /dev/null @@ -1,422 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. 
- * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. 
- */ - -#pragma once - -#include <string> - -#include "mongo/base/disallow_copying.h" -#include "mongo/base/status.h" -#include "mongo/base/status_with.h" -#include "mongo/base/string_data.h" -#include "mongo/db/concurrency/lock_manager_defs.h" -#include "mongo/db/repl/task_runner.h" -#include "mongo/executor/task_executor.h" -#include "mongo/platform/compiler.h" -#include "mongo/platform/random.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/functional.h" -#include "mongo/stdx/list.h" -#include "mongo/stdx/mutex.h" -#include "mongo/stdx/thread.h" -#include "mongo/util/concurrency/old_thread_pool.h" -#include "mongo/util/net/hostandport.h" -#include "mongo/util/time_support.h" - -namespace mongo { - -class BSONObjBuilder; -class NamespaceString; -class OperationContext; - -namespace executor { -struct ConnectionPoolStats; -class NetworkInterface; -} // namespace executor - -namespace repl { - -/** - * Implementation of the TaskExecutor interface for providing an event loop for driving state - * machines in replication. - * - * Usage: Instantiate an executor, schedule a work item, call run(). - * - * Implementation details: - * - * The executor is composed of several WorkQueues, which are queues of WorkItems. WorkItems - * describe units of work -- a callback and state needed to track its lifecycle. The iterators - * pointing to WorkItems are spliced between the WorkQueues, rather than copying WorkItems - * themselves. Further, those WorkQueue::iterators are never invalidated during the life of an - * executor. They may be recycled to represent new work items, but when that happens, a counter - * on the WorkItem is incremented, to disambiguate. - * - * All work executed by the run() method of the executor is popped off the front of the - * _readyQueue. Remote commands blocked on the network can be found in the - * _networkInProgressQueue. Callbacks waiting for a timer to expire are in the _sleepersQueue. 
- * When the network returns or the timer expires, items from these two queues are transferred to - * the back of the _readyQueue. - * - * The _exclusiveLockInProgressQueue, which represents work items to execute while holding the - * GlobalWrite lock, is exceptional. WorkItems in that queue execute in unspecified order with - * respect to work in the _readyQueue or other WorkItems in the _exclusiveLockInProgressQueue, - * but they are executed in a single serial order with respect to those other WorkItems. The - * _terribleExLockSyncMutex is used to provide this serialization, until such time as the global - * lock may be passed from one thread to another. - */ -class ReplicationExecutor final : public executor::TaskExecutor { - MONGO_DISALLOW_COPYING(ReplicationExecutor); - -public: - /** - * Constructs a new executor. - * - * Takes ownership of the passed NetworkInterface object. - */ - ReplicationExecutor(std::unique_ptr<executor::NetworkInterface> netInterface, int64_t pnrgSeed); - - /** - * Destroys an executor. 
- */ - virtual ~ReplicationExecutor(); - - BSONObj getDiagnosticBSON() const; - void appendDiagnosticBSON(BSONObjBuilder* b) const override; - Date_t now() override; - void startup() override; - void shutdown() override; - void join() override; - void signalEvent(const EventHandle& event) override; - StatusWith<EventHandle> makeEvent() override; - StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override; - void waitForEvent(const EventHandle& event) override; - StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; - StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; - StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb) override; - void cancel(const CallbackHandle& cbHandle) override; - void wait(const CallbackHandle& cbHandle) override; - - void appendConnectionStats(executor::ConnectionPoolStats* stats) const override; - - /** - * Executes the run loop. May be called up to one time. - * - * Doesn't need to be public, so do not call directly unless from unit-tests. - * - * Returns after the executor has been shutdown and is safe to delete. - */ - void run(); - - /** - * Schedules DB "work" to be run by the executor.. - * - * Takes no locks for caller - global, database or collection. - * - * The "work" will run exclusively with other DB work items. All DB work items - * are run the in order they are scheduled. - * - * The "work" may run concurrently with other non-DB work items, - * but there are no ordering guarantees provided with respect to - * any other work item. - * - * Returns a handle for waiting on or canceling the callback, or - * ErrorCodes::ShutdownInProgress. - * - * May be called by client threads or callbacks running in the executor. 
- */ - StatusWith<CallbackHandle> scheduleDBWork(const CallbackFn& work); - - /** - * Schedules DB "work" to be run by the executor while holding the collection lock. - * - * Takes collection lock in specified mode (and slightly more permissive lock for the - * database lock) but not the global exclusive lock. - * - * The "work" will run exclusively with other DB work items. All DB work items - * are run the in order they are scheduled. - * - * The "work" may run concurrently with other non-DB work items, - * but there are no ordering guarantees provided with respect to - * any other work item. - * - * Returns a handle for waiting on or canceling the callback, or - * ErrorCodes::ShutdownInProgress. - * - * May be called by client threads or callbacks running in the executor. - */ - StatusWith<CallbackHandle> scheduleDBWork(const CallbackFn& work, - const NamespaceString& nss, - LockMode mode); - - /** - * Schedules "work" to be run by the executor while holding the global exclusive lock. - * - * Takes collection lock in specified mode (and slightly more permissive lock for the - * database lock) but not the global exclusive lock. - * - * The "work" will run exclusively, as though it were executed by the main - * run loop, but there are no ordering guarantees provided with respect to - * any other work item. - * - * Returns a handle for waiting on or canceling the callback, or - * ErrorCodes::ShutdownInProgress. - * - * May be called by client threads or callbacks running in the executor. - */ - StatusWith<CallbackHandle> scheduleWorkWithGlobalExclusiveLock(const CallbackFn& work); - - /** - * Returns an int64_t generated by the prng with a max value of "limit". - */ - int64_t nextRandomInt64(int64_t limit); - - /** - * Wait until DB worker thread is not active. Test only. - * - * Usually NetworkInterfaceMock::runReadyNetworkOperations() is called before and after this - * function to ensure the synchronization of executor thread and DB worker thread. 
- */ - void waitForDBWork_forTest(); - -private: - class Callback; - class Event; - struct WorkItem; - friend class Callback; - friend class Event; - - - /** - * A linked list of WorkItem objects. - * - * WorkItems get moved among lists by splicing iterators of work lists together, - * not by copying underlying WorkItem objects. - */ - typedef stdx::list<WorkItem> WorkQueue; - - /** - * A linked list of EventHandles. - */ - typedef stdx::list<EventHandle> EventList; - - /** - * Implementation of makeEvent() for use when _mutex is already held. - */ - StatusWith<EventHandle> makeEvent_inlock(); - - /** - * Implementation of signalEvent() for use when _mutex is already held. - */ - void signalEvent_inlock(const EventHandle&); - - /** - * Gets a single piece of work to execute. - * - * If the "callback" member of the returned WorkItem is falsey, that is a signal - * to the run loop to wait for shutdown. - */ - std::pair<WorkItem, CallbackHandle> getWork(); - - /** - * Marks as runnable any sleepers whose ready date has passed as of "now". - * Returns the date when the next sleeper will be ready, or Date_t(~0ULL) if there are no - * remaining sleepers. - */ - Date_t scheduleReadySleepers_inlock(Date_t now); - - /** - * Enqueues "callback" into "queue". - */ - StatusWith<CallbackHandle> enqueueWork_inlock(WorkQueue* queue, const CallbackFn& callback); - - /** - * Notifies interested parties that shutdown has completed, if it has. - */ - void maybeNotifyShutdownComplete_inlock(); - - /** - * Completes the shutdown process. Called by run(). - */ - void finishShutdown(); - - void _finishRemoteCommand(const executor::RemoteCommandRequest& request, - const executor::RemoteCommandResponse& response, - const CallbackHandle& cbHandle, - const uint64_t expectedHandleGeneration, - const RemoteCommandCallbackFn& cb); - - /** - * Executes the callback referenced by "cbHandle", and moves the underlying - * WorkQueue::iterator from "workQueue" into the _freeQueue. 
- * - * "opCtx" is a pointer to the OperationContext. - * - * "status" is the callback status from the task runner. Only possible values are - * Status::OK and ErrorCodes::CallbackCanceled (when task runner is canceled). - * - * If "terribleExLockSyncMutex" is not null, serializes execution of "cbHandle" with the - * execution of other callbacks. - */ - void _doOperation(OperationContext* opCtx, - const Status& taskRunnerStatus, - const CallbackHandle& cbHandle, - WorkQueue* workQueue, - stdx::mutex* terribleExLockSyncMutex); - - /** - * Wrapper around TaskExecutor::getCallbackFromHandle that return an Event* instead of - * a generic EventState*. - */ - Event* _getEventFromHandle(const EventHandle& eventHandle); - - /** - * Wrapper around TaskExecutor::getCallbackFromHandle that return an Event* instead of - * a generic EventState*. - */ - Callback* _getCallbackFromHandle(const CallbackHandle& callbackHandle); - - // PRNG; seeded at class construction time. - PseudoRandom _random; - - std::unique_ptr<executor::NetworkInterface> _networkInterface; - - // Thread which executes the run method. Started by startup and must be jointed after shutdown. - stdx::thread _executorThread; - - mutable stdx::mutex _mutex; - stdx::mutex _terribleExLockSyncMutex; - stdx::condition_variable _noMoreWaitingThreads; - WorkQueue _freeQueue; - WorkQueue _readyQueue; - WorkQueue _dbWorkInProgressQueue; - WorkQueue _exclusiveLockInProgressQueue; - WorkQueue _networkInProgressQueue; - WorkQueue _sleepersQueue; - EventList _unsignaledEvents; - int64_t _totalEventWaiters = 0; - - // Counters for metrics, for the whole life of this instance, protected by _mutex. 
- int64_t _counterWaitEvents = 0; - int64_t _counterCreatedEvents = 0; - int64_t _counterScheduledCommands = 0; - int64_t _counterScheduledExclusiveWorks = 0; - int64_t _counterScheduledDBWorks = 0; - int64_t _counterScheduledWorks = 0; - int64_t _counterScheduledWorkAts = 0; - int64_t _counterSchedulingFailures = 0; - int64_t _counterCancels = 0; - int64_t _counterWaits = 0; - - bool _inShutdown; - OldThreadPool _dblockWorkers; - TaskRunner _dblockTaskRunner; - TaskRunner _dblockExclusiveLockTaskRunner; - uint64_t _nextId = 0; -}; - -class ReplicationExecutor::Callback : public executor::TaskExecutor::CallbackState { - friend class ReplicationExecutor; - -public: - Callback(ReplicationExecutor* executor, - const CallbackFn callbackFn, - const WorkQueue::iterator& iter, - const EventHandle& finishedEvent); - virtual ~Callback(); - - void cancel() override; - void waitForCompletion() override; - bool isCanceled() const override; - -private: - ReplicationExecutor* _executor; - - // All members other than _executor are protected by the executor's _mutex. - CallbackFn _callbackFn; - bool _isCanceled; - bool _isSleeper; - bool _isRemoved; - WorkQueue::iterator _iter; - EventHandle _finishedEvent; -}; - -/** - * Description of a scheduled but not-yet-run work item. - * - * Once created, WorkItem objects remain in scope until the executor is destroyed. - * However, over their lifetime, they may represent many different work items. This - * divorces the lifetime of CallbackHandles from the lifetime of WorkItem objects, but - * requires a unique generation identifier in CallbackHandles and WorkItem objects. - * - * WorkItem is copyable so that it may be stored in a list. However, in practice they - * should only be copied by getWork() and when allocating new entries into a WorkQueue (not - * when moving entries between work lists). 
- */ -struct ReplicationExecutor::WorkItem { - WorkItem(); - uint64_t generation; - CallbackHandle callback; - EventHandle finishedEvent; - Date_t readyDate; - bool isNetworkOperation; -}; - -/** - * Description of an event. - * - * Like WorkItem, above, but for events. On signaling, the executor removes the event from the - * "unsignaled" EventList and schedules all work items in the _waiters list. - */ -class ReplicationExecutor::Event : public executor::TaskExecutor::EventState { - friend class ReplicationExecutor; - -public: - Event(ReplicationExecutor* executor, const EventList::iterator& iter); - virtual ~Event(); - - void signal() override; - void waitUntilSignaled() override; - bool isSignaled() override; - -private: - // Note that the caller is responsible for removing any references to any EventHandles - // pointing to this event. - void _signal_inlock(); - - ReplicationExecutor* _executor; - - // All members other than _executor are protected by the executor's _mutex. - bool _isSignaled; - stdx::condition_variable _isSignaledCondition; - EventList::iterator _iter; - WorkQueue _waiters; -}; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/replication_executor_test.cpp b/src/mongo/db/repl/replication_executor_test.cpp deleted file mode 100644 index e0159598eb6..00000000000 --- a/src/mongo/db/repl/replication_executor_test.cpp +++ /dev/null @@ -1,373 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. 
- * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include <map> - -#include "mongo/base/init.h" -#include "mongo/db/bson/dotted_path_support.h" -#include "mongo/db/client.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/replication_executor_test_fixture.h" -#include "mongo/executor/network_interface_mock.h" -#include "mongo/executor/task_executor_test_common.h" -#include "mongo/stdx/functional.h" -#include "mongo/stdx/memory.h" -#include "mongo/stdx/thread.h" -#include "mongo/unittest/barrier.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/map_util.h" - -namespace mongo { -namespace repl { - -namespace { - -using executor::NetworkInterfaceMock; -using unittest::assertGet; - -namespace dps = ::mongo::dotted_path_support; - -const int64_t prngSeed = 1; - -MONGO_INITIALIZER(ReplExecutorCommonTests)(InitializerContext*) { - mongo::executor::addTestsForExecutor( - 
"ReplicationExecutorCommon", [](std::unique_ptr<executor::NetworkInterfaceMock> net) { - return stdx::make_unique<ReplicationExecutor>(std::move(net), prngSeed); - }); - return Status::OK(); -} - -TEST_F(ReplicationExecutorTest, ScheduleDBWorkAndExclusiveWorkConcurrently) { - unittest::Barrier barrier(2U); - NamespaceString nss("mydb", "mycoll"); - ReplicationExecutor& executor = getReplExecutor(); - Status status1 = getDetectableErrorStatus(); - OperationContext* opCtx = nullptr; - using CallbackData = ReplicationExecutor::CallbackArgs; - ASSERT_OK(executor - .scheduleDBWork([&](const CallbackData& cbData) { - status1 = cbData.status; - opCtx = cbData.opCtx; - barrier.countDownAndWait(); - if (cbData.status != ErrorCodes::CallbackCanceled) - cbData.executor->shutdown(); - }) - .getStatus()); - ASSERT_OK(executor - .scheduleWorkWithGlobalExclusiveLock( - [&](const CallbackData& cbData) { barrier.countDownAndWait(); }) - .getStatus()); - executor.startup(); - executor.join(); - ASSERT_OK(status1); - ASSERT(opCtx); -} - -TEST_F(ReplicationExecutorTest, ScheduleDBWorkWithCollectionLock) { - NamespaceString nss("mydb", "mycoll"); - ReplicationExecutor& executor = getReplExecutor(); - Status status1 = getDetectableErrorStatus(); - OperationContext* opCtx = nullptr; - bool collectionIsLocked = false; - using CallbackData = ReplicationExecutor::CallbackArgs; - ASSERT_OK(executor - .scheduleDBWork( - [&](const CallbackData& cbData) { - status1 = cbData.status; - opCtx = cbData.opCtx; - collectionIsLocked = opCtx - ? 
opCtx->lockState()->isCollectionLockedForMode(nss.ns(), MODE_X) - : false; - if (cbData.status != ErrorCodes::CallbackCanceled) - cbData.executor->shutdown(); - }, - nss, - MODE_X) - .getStatus()); - executor.startup(); - executor.join(); - ASSERT_OK(status1); - ASSERT(opCtx); - ASSERT_TRUE(collectionIsLocked); -} - -TEST_F(ReplicationExecutorTest, ScheduleExclusiveLockOperation) { - ReplicationExecutor& executor = getReplExecutor(); - Status status1 = getDetectableErrorStatus(); - OperationContext* opCtx = nullptr; - bool lockIsW = false; - using CallbackData = ReplicationExecutor::CallbackArgs; - ASSERT_OK(executor - .scheduleWorkWithGlobalExclusiveLock([&](const CallbackData& cbData) { - status1 = cbData.status; - opCtx = cbData.opCtx; - lockIsW = opCtx ? opCtx->lockState()->isW() : false; - if (cbData.status != ErrorCodes::CallbackCanceled) - cbData.executor->shutdown(); - }) - .getStatus()); - executor.startup(); - executor.join(); - ASSERT_OK(status1); - ASSERT(opCtx); - ASSERT_TRUE(lockIsW); -} - -TEST_F(ReplicationExecutorTest, ShutdownBeforeRunningSecondExclusiveLockOperation) { - ReplicationExecutor& executor = getReplExecutor(); - using CallbackData = ReplicationExecutor::CallbackArgs; - Status status1 = getDetectableErrorStatus(); - ASSERT_OK(executor - .scheduleWorkWithGlobalExclusiveLock([&](const CallbackData& cbData) { - status1 = cbData.status; - if (cbData.status != ErrorCodes::CallbackCanceled) - cbData.executor->shutdown(); - }) - .getStatus()); - // Second db work item is invoked by the main executor thread because the work item is - // moved from the exclusive lock queue to the ready work item queue when the first callback - // cancels the executor. 
- Status status2 = getDetectableErrorStatus(); - ASSERT_OK(executor - .scheduleWorkWithGlobalExclusiveLock([&](const CallbackData& cbData) { - status2 = cbData.status; - if (cbData.status != ErrorCodes::CallbackCanceled) - cbData.executor->shutdown(); - }) - .getStatus()); - executor.startup(); - executor.join(); - ASSERT_OK(status1); - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status2.code()); -} - -TEST_F(ReplicationExecutorTest, CancelBeforeRunningFutureWork) { - ReplicationExecutor& executor = getReplExecutor(); - using CallbackData = ReplicationExecutor::CallbackArgs; - Status status1 = getDetectableErrorStatus(); - auto cbhWithStatus = executor.scheduleWorkAt( - executor.now() + Milliseconds(1000), [&](const CallbackData& cbData) { - status1 = cbData.status; - if (cbData.status != ErrorCodes::CallbackCanceled) - cbData.executor->shutdown(); - }); - ASSERT_OK(cbhWithStatus.getStatus()); - - ASSERT_EQUALS(1, - dps::extractElementAtPath(executor.getDiagnosticBSON(), "queues.sleepers").Int()); - ASSERT_EQUALS(0, dps::extractElementAtPath(executor.getDiagnosticBSON(), "queues.ready").Int()); - executor.cancel(cbhWithStatus.getValue()); - - ASSERT_EQUALS(0, - dps::extractElementAtPath(executor.getDiagnosticBSON(), "queues.sleepers").Int()); - ASSERT_EQUALS(1, dps::extractElementAtPath(executor.getDiagnosticBSON(), "queues.ready").Int()); -} - -// Equivalent to EventChainAndWaitingTest::onGo -TEST_F(ReplicationExecutorTest, ScheduleCallbackOnFutureEvent) { - launchExecutorThread(); - getNet()->exitNetwork(); - - ReplicationExecutor& executor = getReplExecutor(); - // We signal this "ping" event and the executor will signal "pong" event in return. - auto ping = assertGet(executor.makeEvent()); - auto pong = assertGet(executor.makeEvent()); - auto fn = [&executor, pong](const ReplicationExecutor::CallbackArgs& cbData) { - ASSERT_OK(cbData.status); - executor.signalEvent(pong); - }; - - // Wait for a future event. 
- executor.onEvent(ping, fn); - ASSERT_EQUALS(0, dps::extractElementAtPath(executor.getDiagnosticBSON(), "queues.ready").Int()); - executor.signalEvent(ping); - executor.waitForEvent(pong); -} - -// Equivalent to EventChainAndWaitingTest::onGoAfterTriggered -TEST_F(ReplicationExecutorTest, ScheduleCallbackOnSignaledEvent) { - launchExecutorThread(); - getNet()->exitNetwork(); - - ReplicationExecutor& executor = getReplExecutor(); - // We signal this "ping" event and the executor will signal "pong" event in return. - auto ping = assertGet(executor.makeEvent()); - auto pong = assertGet(executor.makeEvent()); - auto fn = [&executor, pong](const ReplicationExecutor::CallbackArgs& cbData) { - ASSERT_OK(cbData.status); - executor.signalEvent(pong); - }; - - // Wait for a signaled event. - executor.signalEvent(ping); - executor.onEvent(ping, fn); - executor.waitForEvent(pong); -} - -TEST_F(ReplicationExecutorTest, ScheduleCallbackAtNow) { - launchExecutorThread(); - getNet()->exitNetwork(); - - ReplicationExecutor& executor = getReplExecutor(); - auto finishEvent = assertGet(executor.makeEvent()); - auto fn = [&executor, finishEvent](const ReplicationExecutor::CallbackArgs& cbData) { - ASSERT_OK(cbData.status); - executor.signalEvent(finishEvent); - }; - - auto cb = executor.scheduleWorkAt(getNet()->now(), fn); - executor.waitForEvent(finishEvent); -} - -TEST_F(ReplicationExecutorTest, ScheduleCallbackAtAFutureTime) { - launchExecutorThread(); - getNet()->exitNetwork(); - - ReplicationExecutor& executor = getReplExecutor(); - auto finishEvent = assertGet(executor.makeEvent()); - auto fn = [&executor, finishEvent](const ReplicationExecutor::CallbackArgs& cbData) { - ASSERT_OK(cbData.status); - executor.signalEvent(finishEvent); - }; - - auto now = getNet()->now(); - now += Milliseconds(1000); - auto cb = executor.scheduleWorkAt(now, fn); - - getNet()->enterNetwork(); - getNet()->runUntil(now); - getNet()->exitNetwork(); - - executor.waitForEvent(finishEvent); -} - 
-TEST_F(ReplicationExecutorTest, CallbacksAreInvokedOnClientThreads) { - launchExecutorThread(); - getNet()->exitNetwork(); - - ReplicationExecutor& executor = getReplExecutor(); - auto status = getDetectableErrorStatus(); - bool haveClientInCallback = false; - auto fn = [&haveClientInCallback, &status](const ReplicationExecutor::CallbackArgs& cbData) { - status = cbData.status; - haveClientInCallback = haveClient(); - }; - - ASSERT_NOT_OK(status); - auto cb = unittest::assertGet(executor.scheduleWork(fn)); - executor.wait(cb); - - ASSERT_OK(status); - ASSERT_TRUE(haveClientInCallback); -} - -TEST_F(ReplicationExecutorTest, TestForCancelRace) { - launchExecutorThread(); - getNet()->exitNetwork(); - - unittest::Barrier enterCallback(2U), runCallback(2U); - - ReplicationExecutor& executor = getReplExecutor(); - bool firstEventDone = false; - bool firstEventCanceled = false; - auto fn = [&executor, &enterCallback, &runCallback, &firstEventDone, &firstEventCanceled]( - const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { - // This barrier lets the test code wait until we're in the callback. - enterCallback.countDownAndWait(); - // This barrier lets the test code keep us in the callback until it has run the cancel. - runCallback.countDownAndWait(); - firstEventCanceled = !cbData.response.status.isOK(); - firstEventDone = true; - }; - - // First, schedule a network event to run. - const executor::RemoteCommandRequest request( - HostAndPort("test1", 1234), "mydb", BSON("nothing" << 0), nullptr); - auto firstCallback = assertGet(executor.scheduleRemoteCommand(request, fn)); - - // Now let the request happen. - // We need to run the network on another thread, because the test - // fixture will hang waiting for the callbacks to complete. 
- auto timeThread = stdx::thread([this] { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - ASSERT(getNet()->hasReadyRequests()); - auto noi = getNet()->getNextReadyRequest(); - getNet()->scheduleSuccessfulResponse(noi, {}); - getNet()->runReadyNetworkOperations(); - }); - - // Wait until we're in the callback. - enterCallback.countDownAndWait(); - - // Schedule a different network event to run. - bool secondEventDone = false; - bool secondEventCanceled = false; - auto fn2 = [&executor, &secondEventDone, &secondEventCanceled]( - const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { - secondEventCanceled = !cbData.response.status.isOK(); - secondEventDone = true; - }; - auto secondCallback = assertGet(executor.scheduleRemoteCommand(request, fn2)); - ASSERT_FALSE(firstEventDone); // The first event should be stuck at runCallback barrier. - // Cancel the first callback. This cancel should have no effect as the callback has - // already been started. - executor.cancel(firstCallback); - - // Let the first callback continue to completion. - runCallback.countDownAndWait(); - - // Now the time thread can continue. - timeThread.join(); - - // The first event should be done, the second event should be pending. - ASSERT(firstEventDone); - ASSERT_FALSE(secondEventDone); - - // Run the network thread, which should run the second request. - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - // The second request should be ready. - ASSERT(getNet()->hasReadyRequests()) << "Second request is not ready (cancelled?)"; - auto noi = getNet()->getNextReadyRequest(); - getNet()->scheduleSuccessfulResponse(noi, {}); - getNet()->runReadyNetworkOperations(); - } - - // The second callback should have run without being canceled. 
- ASSERT_TRUE(secondEventDone); - ASSERT_FALSE(secondEventCanceled); -} - -} // namespace -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/replication_executor_test_fixture.cpp b/src/mongo/db/repl/replication_executor_test_fixture.cpp deleted file mode 100644 index bdc1f976e9e..00000000000 --- a/src/mongo/db/repl/replication_executor_test_fixture.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. 
- */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/replication_executor_test_fixture.h" - -#include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/storage_interface_mock.h" -#include "mongo/executor/network_interface_mock.h" - -namespace mongo { -namespace repl { - -namespace { - -const int64_t prngSeed = 1; - -} // namespace - -ReplicationExecutor& ReplicationExecutorTest::getReplExecutor() { - return dynamic_cast<ReplicationExecutor&>(getExecutor()); -} - -void ReplicationExecutorTest::postExecutorThreadLaunch() { - getNet()->enterNetwork(); -} - -std::unique_ptr<executor::TaskExecutor> ReplicationExecutorTest::makeTaskExecutor( - std::unique_ptr<executor::NetworkInterfaceMock> net) { - return stdx::make_unique<ReplicationExecutor>(std::move(net), prngSeed); -} - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/replication_executor_test_fixture.h b/src/mongo/db/repl/replication_executor_test_fixture.h deleted file mode 100644 index 5d106f8b864..00000000000 --- a/src/mongo/db/repl/replication_executor_test_fixture.h +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. 
- * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/repl/replication_executor.h" -#include "mongo/executor/task_executor_test_fixture.h" -#include "mongo/stdx/memory.h" -#include "mongo/unittest/unittest.h" - -namespace mongo { - -namespace executor { -class NetworkInterfaceMock; -class TaskExecutor; -} // namespace executor - -namespace repl { - -class ReplicationExecutor; - -/** - * Test fixture for tests that require a ReplicationExecutor backed by - * a NetworkInterfaceMock. - */ -class ReplicationExecutorTest : public executor::TaskExecutorTest { -protected: - ReplicationExecutor& getReplExecutor(); - - /** - * Anything that needs to be done after launchExecutorThread should go in here. 
- */ - void postExecutorThreadLaunch() override; - -private: - std::unique_ptr<executor::TaskExecutor> makeTaskExecutor( - std::unique_ptr<executor::NetworkInterfaceMock> net) override; - - std::unique_ptr<ReplicationExecutor> _executor; - bool _executorStarted{false}; -}; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp index ab19d6994c8..b3e1b84a836 100644 --- a/src/mongo/executor/thread_pool_mock.cpp +++ b/src/mongo/executor/thread_pool_mock.cpp @@ -126,7 +126,7 @@ void ThreadPoolMock::consumeTasks(stdx::unique_lock<stdx::mutex>* lk) { invariant(_tasks.empty()); - while (!_joining) { + while (_started && !_joining) { lk->unlock(); _net->waitForWork(); lk->lock(); |