diff options
author | Benety Goh <benety@mongodb.com> | 2016-04-15 11:45:36 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-04-18 15:46:12 -0400 |
commit | 2780aef9a4f76ab54f00bb7000e4ecc0219143ec (patch) | |
tree | 4653546be915f6a05f45fd5264e0513420a3db95 | |
parent | 6811d7c70d13559c3162e8f4333c3c061e5a5220 (diff) | |
download | mongo-2780aef9a4f76ab54f00bb7000e4ecc0219143ec.tar.gz |
SERVER-18038 ReplicationExecutor no longer requires StorageInterface to create operation context.
25 files changed, 58 insertions, 209 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 01578147cc8..21e657c9200 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -905,7 +905,6 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, getGlobalReplSettings(), new repl::ReplicationCoordinatorExternalStateImpl, executor::makeNetworkInterface("NetworkInterfaceASIO-Replication").release(), - new repl::StorageInterfaceImpl{}, new repl::TopologyCoordinatorImpl(topoCoordOptions), static_cast<int64_t>(curTimeMillis64())); auto serviceContext = getGlobalServiceContext(); diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp index 493cbbe2380..b8da58e4372 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp @@ -37,11 +37,11 @@ #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/storage_interface_mock.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/platform/unordered_set.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" #include "mongo/unittest/unittest.h" #include "mongo/util/net/hostandport.h" @@ -77,7 +77,6 @@ protected: bool isQuorumCheckDone(); NetworkInterfaceMock* _net; - StorageInterfaceMock* _storage; std::unique_ptr<ReplicationExecutor> _executor; private: @@ -99,8 +98,7 @@ CheckQuorumTest::CheckQuorumTest() void CheckQuorumTest::setUp() { _net = new NetworkInterfaceMock; - _storage = new StorageInterfaceMock; - _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng */)); + _executor = stdx::make_unique<ReplicationExecutor>(_net, 1 /* prng seed */); _executorThread.reset(new stdx::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); } diff --git a/src/mongo/db/repl/database_task_test.cpp b/src/mongo/db/repl/database_task_test.cpp index f156cdeb62c..962226b4e36 100644 --- a/src/mongo/db/repl/database_task_test.cpp +++ b/src/mongo/db/repl/database_task_test.cpp @@ -45,15 +45,7 @@ const std::string databaseName = "mydb"; const std::string collectionName = "mycoll"; const NamespaceString nss(databaseName, collectionName); -class DatabaseTaskTest : public TaskRunnerTest { -public: - ServiceContext::UniqueOperationContext createOperationContext(Client* client) const override; -}; - -ServiceContext::UniqueOperationContext DatabaseTaskTest::createOperationContext( - Client* client) const { - return client->makeOperationContext(); -} +class DatabaseTaskTest : public TaskRunnerTest {}; TEST_F(DatabaseTaskTest, TaskRunnerErrorStatus) { // Should not attempt to acquire lock on error status from task runner. diff --git a/src/mongo/db/repl/elect_cmd_runner_test.cpp b/src/mongo/db/repl/elect_cmd_runner_test.cpp index 5635f177cc0..1b4a82902c2 100644 --- a/src/mongo/db/repl/elect_cmd_runner_test.cpp +++ b/src/mongo/db/repl/elect_cmd_runner_test.cpp @@ -34,9 +34,9 @@ #include "mongo/db/repl/member_heartbeat_data.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/storage_interface_mock.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" #include "mongo/unittest/unittest.h" @@ -68,7 +68,6 @@ public: const std::vector<HostAndPort>& hosts); NetworkInterfaceMock* _net; - StorageInterfaceMock* _storage; std::unique_ptr<ReplicationExecutor> _executor; std::unique_ptr<stdx::thread> _executorThread; @@ -81,8 +80,7 @@ private: void ElectCmdRunnerTest::setUp() { _net = new NetworkInterfaceMock; - _storage = new StorageInterfaceMock; - _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng seed */)); + _executor = stdx::make_unique<ReplicationExecutor>(_net, 1 /* prng seed */); _executorThread.reset(new stdx::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); } diff --git a/src/mongo/db/repl/freshness_checker_test.cpp b/src/mongo/db/repl/freshness_checker_test.cpp index 9f20ec0101c..45fde9ef816 100644 --- a/src/mongo/db/repl/freshness_checker_test.cpp +++ b/src/mongo/db/repl/freshness_checker_test.cpp @@ -34,10 +34,10 @@ #include "mongo/db/repl/member_heartbeat_data.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/storage_interface_mock.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/platform/unordered_set.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" #include "mongo/unittest/unittest.h" #include "mongo/util/mongoutils/str.h" @@ -73,7 +73,6 @@ protected: } NetworkInterfaceMock* _net; - StorageInterfaceMock* _storage; std::unique_ptr<ReplicationExecutor> _executor; std::unique_ptr<stdx::thread> _executorThread; @@ -92,8 +91,7 @@ private: void FreshnessCheckerTest::setUp() { _net = new NetworkInterfaceMock; - _storage = new StorageInterfaceMock; - _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng seed */)); + _executor = stdx::make_unique<ReplicationExecutor>(_net, 1 /* prng seed */); _executorThread.reset(new stdx::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); _checker.reset(new FreshnessChecker); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index a387b98e863..d3b3c74e095 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -241,14 +241,12 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( TopologyCoordinator* topCoord, int64_t prngSeed, NetworkInterface* network, - StorageInterface* storage, ReplicationExecutor* replExec, stdx::function<bool()>* isDurableStorageEngineFn) : _settings(settings), _replMode(getReplicationModeFromSettings(settings)), _topCoord(topCoord), - _replExecutorIfOwned(replExec ? nullptr - : new ReplicationExecutor(network, storage, prngSeed)), + _replExecutorIfOwned(replExec ? nullptr : new ReplicationExecutor(network, prngSeed)), _replExecutor(replExec ? *replExec : *_replExecutorIfOwned), _externalState(externalState), _inShutdown(false), @@ -286,11 +284,10 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( const ReplSettings& settings, ReplicationCoordinatorExternalState* externalState, NetworkInterface* network, - StorageInterface* storage, TopologyCoordinator* topCoord, int64_t prngSeed) : ReplicationCoordinatorImpl( - settings, externalState, topCoord, prngSeed, network, storage, nullptr, nullptr) {} + settings, externalState, topCoord, prngSeed, network, nullptr, nullptr) {} ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( const ReplSettings& settings, @@ -304,7 +301,6 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( topCoord, prngSeed, nullptr, - nullptr, replExec, isDurableStorageEngineFn) {} diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index bd2e06b8a7c..089b43ff194 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -44,7 +44,6 @@ #include "mongo/db/repl/replication_coordinator_external_state.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/sync_source_resolver.h" -#include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/update_position_args.h" #include "mongo/db/service_context.h" @@ -98,7 +97,6 @@ public: ReplicationCoordinatorImpl(const ReplSettings& settings, ReplicationCoordinatorExternalState* externalState, executor::NetworkInterface* network, - StorageInterface* storage, TopologyCoordinator* topoCoord, int64_t prngSeed); // Takes ownership of the "externalState" and "topCoord" objects. @@ -455,7 +453,6 @@ private: TopologyCoordinator* topCoord, int64_t prngSeed, executor::NetworkInterface* network, - StorageInterface* storage, ReplicationExecutor* replExec, stdx::function<bool()>* isDurableStorageEngineFn); /** diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index 1e7f8f07e71..9258f540305 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -44,6 +44,7 @@ #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -115,8 +116,7 @@ void ReplCoordTest::init() { _topo = new TopologyCoordinatorImpl(settings); stdx::function<bool()> _durablityLambda = [this]() -> bool { return _isStorageEngineDurable; }; _net = new NetworkInterfaceMock; - _storage = new StorageInterfaceMock; - _replExec.reset(new ReplicationExecutor(_net, _storage, seed)); + _replExec = stdx::make_unique<ReplicationExecutor>(_net, seed); _externalState = new ReplicationCoordinatorExternalStateMock; _repl.reset(new ReplicationCoordinatorImpl( _settings, _externalState, _topo, _replExec.get(), seed, &_durablityLambda)); diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h index 1dd995003a4..60a6a21d42b 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.h +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h @@ -242,8 +242,6 @@ private: TopologyCoordinatorImpl* _topo = nullptr; // Owned by ReplicationExecutor executor::NetworkInterfaceMock* _net = nullptr; - // Owned by ReplicationExecutor - StorageInterfaceMock* _storage = nullptr; std::unique_ptr<ReplicationExecutor> _replExec; // Owned by ReplicationCoordinatorImpl ReplicationCoordinatorExternalStateMock* _externalState = nullptr; diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp index 61d20c7fc60..0caabfe808d 100644 --- a/src/mongo/db/repl/replication_executor.cpp +++ b/src/mongo/db/repl/replication_executor.cpp @@ -35,7 +35,6 @@ #include <limits> #include "mongo/db/repl/database_task.h" -#include "mongo/db/repl/storage_interface.h" #include "mongo/executor/network_interface.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -52,22 +51,13 @@ using executor::NetworkInterface; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; -ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface, - StorageInterface* storageInterface, - int64_t prngSeed) +ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface, int64_t prngSeed) : _random(prngSeed), _networkInterface(netInterface), - _storageInterface(storageInterface), _inShutdown(false), _dblockWorkers(OldThreadPool::DoNotStartThreadsTag(), 3, "replExecDBWorker-"), - _dblockTaskRunner(&_dblockWorkers, - stdx::bind(&StorageInterface::createOperationContext, - storageInterface, - stdx::placeholders::_1)), - _dblockExclusiveLockTaskRunner(&_dblockWorkers, - stdx::bind(&StorageInterface::createOperationContext, - storageInterface, - stdx::placeholders::_1)) {} + _dblockTaskRunner(&_dblockWorkers), + _dblockExclusiveLockTaskRunner(&_dblockWorkers) {} ReplicationExecutor::~ReplicationExecutor() { // join must have been called diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h index c46ef4a659a..ffb90fb78ea 100644 --- a/src/mongo/db/repl/replication_executor.h +++ b/src/mongo/db/repl/replication_executor.h @@ -61,8 +61,6 @@ class NetworkInterface; namespace repl { -class StorageInterface; - /** * Implementation of the TaskExecutor interface for providing an event loop for driving state * machines in replication. @@ -100,9 +98,7 @@ public: * * Takes ownership of the passed NetworkInterface object. */ - ReplicationExecutor(executor::NetworkInterface* netInterface, - StorageInterface* storageInterface, - int64_t pnrgSeed); + ReplicationExecutor(executor::NetworkInterface* netInterface, int64_t pnrgSeed); /** * Destroys an executor. @@ -306,7 +302,6 @@ private: PseudoRandom _random; std::unique_ptr<executor::NetworkInterface> _networkInterface; - std::unique_ptr<StorageInterface> _storageInterface; // Thread which executes the run method. Started by startup and must be jointed after shutdown. stdx::thread _executorThread; diff --git a/src/mongo/db/repl/replication_executor_test.cpp b/src/mongo/db/repl/replication_executor_test.cpp index 5bcf567b0af..ceb4ec89beb 100644 --- a/src/mongo/db/repl/replication_executor_test.cpp +++ b/src/mongo/db/repl/replication_executor_test.cpp @@ -36,7 +36,6 @@ #include "mongo/db/operation_context.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/replication_executor_test_fixture.h" -#include "mongo/db/repl/storage_interface_mock.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/functional.h" @@ -57,12 +56,11 @@ using unittest::assertGet; const int64_t prngSeed = 1; MONGO_INITIALIZER(ReplExecutorCommonTests)(InitializerContext*) { - mongo::executor::addTestsForExecutor( - "ReplicationExecutorCommon", - [](std::unique_ptr<executor::NetworkInterfaceMock>* net) { - return stdx::make_unique<ReplicationExecutor>( - net->release(), new StorageInterfaceMock(), prngSeed); - }); + mongo::executor::addTestsForExecutor("ReplicationExecutorCommon", + [](std::unique_ptr<executor::NetworkInterfaceMock>* net) { + return stdx::make_unique<ReplicationExecutor>( + net->release(), prngSeed); + }); return Status::OK(); } diff --git a/src/mongo/db/repl/replication_executor_test_fixture.cpp b/src/mongo/db/repl/replication_executor_test_fixture.cpp index 2ee87f372de..f245e4becc8 100644 --- a/src/mongo/db/repl/replication_executor_test_fixture.cpp +++ b/src/mongo/db/repl/replication_executor_test_fixture.cpp @@ -53,8 +53,7 @@ void ReplicationExecutorTest::postExecutorThreadLaunch() { std::unique_ptr<executor::TaskExecutor> ReplicationExecutorTest::makeTaskExecutor( std::unique_ptr<executor::NetworkInterfaceMock> net) { - _storage = new StorageInterfaceMock(); - return stdx::make_unique<ReplicationExecutor>(net.release(), _storage, prngSeed); + return stdx::make_unique<ReplicationExecutor>(net.release(), prngSeed); } } // namespace repl diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp index 98c5032bc6c..05edf467fbd 100644 --- a/src/mongo/db/repl/scatter_gather_test.cpp +++ b/src/mongo/db/repl/scatter_gather_test.cpp @@ -31,9 +31,9 @@ #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/scatter_gather_algorithm.h" #include "mongo/db/repl/scatter_gather_runner.h" -#include "mongo/db/repl/storage_interface_mock.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" #include "mongo/unittest/unittest.h" @@ -113,15 +113,13 @@ private: // owned by _executor NetworkInterfaceMock* _net; - StorageInterfaceMock* _storage; std::unique_ptr<ReplicationExecutor> _executor; std::unique_ptr<stdx::thread> _executorThread; }; void ScatterGatherTest::setUp() { _net = new NetworkInterfaceMock; - _storage = new StorageInterfaceMock; - _executor.reset(new ReplicationExecutor(_net, _storage, 1 /* prng seed */)); + _executor = stdx::make_unique<ReplicationExecutor>(_net, 1 /* prng seed */); _executorThread.reset(new stdx::thread(stdx::bind(&ReplicationExecutor::run, _executor.get()))); } diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index 71540ab10d6..6ac1dc82c9c 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -38,7 +38,6 @@ namespace mongo { -class Client; class OperationContext; namespace repl { @@ -97,11 +96,6 @@ public: virtual ~StorageInterface() = default; /** - * Creates an operation context for running database operations. - */ - virtual ServiceContext::UniqueOperationContext createOperationContext(Client* client) = 0; - - /** * Returns true if initial sync was started but has not not completed. */ virtual bool getInitialSyncFlag(OperationContext* txn) const = 0; diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 140ed0fb2b4..23f9d4ce0b7 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -67,11 +67,6 @@ NamespaceString StorageInterfaceImpl::getMinValidNss() const { return _minValidNss; } -ServiceContext::UniqueOperationContext StorageInterfaceImpl::createOperationContext( - Client* client) { - return client->makeOperationContext(); -} - bool StorageInterfaceImpl::getInitialSyncFlag(OperationContext* txn) const { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { ScopedTransaction transaction(txn, MODE_IS); diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index 917b288776b..b2174e4bd8a 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -52,8 +52,6 @@ public: */ NamespaceString getMinValidNss() const; - ServiceContext::UniqueOperationContext createOperationContext(Client* client) override; - bool getInitialSyncFlag(OperationContext* txn) const override; void setInitialSyncFlag(OperationContext* txn) override; diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp index e0c72cd4e93..087780f8afa 100644 --- a/src/mongo/db/repl/storage_interface_impl_test.cpp +++ b/src/mongo/db/repl/storage_interface_impl_test.cpp @@ -113,10 +113,6 @@ TEST_F(StorageInterfaceImplTest, ServiceContextDecorator) { ASSERT_TRUE(storageInterface == StorageInterface::get(serviceContext)); } -TEST_F(StorageInterfaceImplTest, CreateOperationContextShouldNotReturnNull) { - ASSERT_TRUE(StorageInterfaceImpl().createOperationContext(getClient())); -} - TEST_F(StorageInterfaceImplTest, DefaultMinValidNamespace) { ASSERT_EQUALS(NamespaceString(StorageInterfaceImpl::kDefaultMinValidNamespace), StorageInterfaceImpl().getMinValidNss()); @@ -126,7 +122,7 @@ TEST_F(StorageInterfaceImplTest, InitialSyncFlag) { NamespaceString nss("local.StorageInterfaceImplTest_InitialSyncFlag"); StorageInterfaceImpl storageInterface(nss); - auto txn = storageInterface.createOperationContext(getClient()); + auto txn = getClient()->makeOperationContext(); // Initial sync flag should be unset after initializing a new storage engine. ASSERT_FALSE(storageInterface.getInitialSyncFlag(txn.get())); @@ -149,7 +145,7 @@ TEST_F(StorageInterfaceImplTest, MinValid) { NamespaceString nss("local.StorageInterfaceImplTest_MinValid"); StorageInterfaceImpl storageInterface(nss); - auto txn = storageInterface.createOperationContext(getClient()); + auto txn = getClient()->makeOperationContext(); // MinValid boundaries should be {null optime, null optime} after initializing a new storage // engine. diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp index fcb16f71d84..73a4fa64053 100644 --- a/src/mongo/db/repl/storage_interface_mock.cpp +++ b/src/mongo/db/repl/storage_interface_mock.cpp @@ -35,11 +35,6 @@ namespace mongo { namespace repl { -ServiceContext::UniqueOperationContext StorageInterfaceMock::createOperationContext( - Client* client) { - return client->makeOperationContext(); -} - bool StorageInterfaceMock::getInitialSyncFlag(OperationContext* txn) const { stdx::lock_guard<stdx::mutex> lock(_initialSyncFlagMutex); return _initialSyncFlag; diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index 958adb98d3b..7e4e13af7a9 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -42,8 +42,6 @@ class StorageInterfaceMock : public StorageInterface { public: StorageInterfaceMock() = default; - ServiceContext::UniqueOperationContext createOperationContext(Client* client) override; - bool getInitialSyncFlag(OperationContext* txn) const override; void setInitialSyncFlag(OperationContext* txn) override; void clearInitialSyncFlag(OperationContext* txn) override; diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp index 1459f4e3bc7..37d05af51a6 100644 --- a/src/mongo/db/repl/task_runner.cpp +++ b/src/mongo/db/repl/task_runner.cpp @@ -73,30 +73,16 @@ TaskRunner::Task TaskRunner::makeCancelTask() { return [](OperationContext* txn, const Status& status) { return NextAction::kCancel; }; } -TaskRunner::TaskRunner(OldThreadPool* threadPool, - const CreateOperationContextFn& createOperationContext) - : _threadPool(threadPool), - _createOperationContext(createOperationContext), - _active(false), - _cancelRequested(false) { +TaskRunner::TaskRunner(OldThreadPool* threadPool) + : _threadPool(threadPool), _active(false), _cancelRequested(false) { uassert(ErrorCodes::BadValue, "null thread pool", threadPool); - uassert(ErrorCodes::BadValue, "null operation context factory", createOperationContext); } TaskRunner::~TaskRunner() { - try { - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (!_active) { - return; - } - _cancelRequested = true; - _condition.notify_all(); - while (_active) { - _condition.wait(lk); - } - } catch (...) { - error() << "unexpected exception destroying task runner: " << exceptionToStatus(); - } + DESTRUCTOR_GUARD(stdx::unique_lock<stdx::mutex> lk(_mutex); + if (!_active) { return; } _cancelRequested = true; + _condition.notify_all(); + while (_active) { _condition.wait(lk); }); } std::string TaskRunner::getDiagnosticString() const { @@ -153,7 +139,7 @@ void TaskRunner::_runTasks() { AuthorizationSession::get(client)->grantInternalAuthorization(); } } - txn = _createOperationContext(client); + txn = client->makeOperationContext(); } NextAction nextAction = runSingleTask(task, txn.get(), Status::OK()); diff --git a/src/mongo/db/repl/task_runner.h b/src/mongo/db/repl/task_runner.h index b8bf39ef57b..80fbd12d337 100644 --- a/src/mongo/db/repl/task_runner.h +++ b/src/mongo/db/repl/task_runner.h @@ -38,7 +38,6 @@ namespace mongo { -class Client; class Status; class OldThreadPool; class OperationContext; @@ -59,8 +58,6 @@ public: kCancel = 3, }; - using CreateOperationContextFn = - stdx::function<ServiceContext::UniqueOperationContext(Client*)>; using Task = stdx::function<NextAction(OperationContext*, const Status&)>; /** @@ -72,7 +69,7 @@ public: */ static Task makeCancelTask(); - TaskRunner(OldThreadPool* threadPool, const CreateOperationContextFn& createOperationContext); + explicit TaskRunner(OldThreadPool* threadPool); virtual ~TaskRunner(); @@ -92,7 +89,7 @@ public: * * This transitions the task runner to an active state. * - * The task runner creates an operation context using '_createOperationContext' + * The task runner creates an operation context using the current client * prior to running a scheduled task. Depending on the NextAction returned from the * task, operation contexts may be shared between consecutive tasks invoked by the task * runner. @@ -145,7 +142,6 @@ private: Task _waitForNextTask(); OldThreadPool* _threadPool; - CreateOperationContextFn _createOperationContext; // Protects member data of this TaskRunner. mutable stdx::mutex _mutex; diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp index ae385374eac..0279955ab06 100644 --- a/src/mongo/db/repl/task_runner_test.cpp +++ b/src/mongo/db/repl/task_runner_test.cpp @@ -47,15 +47,8 @@ using Task = TaskRunner::Task; TEST_F(TaskRunnerTest, InvalidConstruction) { // Null thread pool. - ASSERT_THROWS_CODE( - TaskRunner(nullptr, [](Client*) { return ServiceContext::UniqueOperationContext(); }), - UserException, - ErrorCodes::BadValue); - - // Null function for creating operation contexts. - ASSERT_THROWS_CODE(TaskRunner(&getThreadPool(), TaskRunner::CreateOperationContextFn()), - UserException, - ErrorCodes::BadValue); + ASSERT_THROWS_CODE_AND_WHAT( + TaskRunner(nullptr), UserException, ErrorCodes::BadValue, "null thread pool"); } TEST_F(TaskRunnerTest, GetDiagnosticString) { @@ -84,48 +77,24 @@ TEST_F(TaskRunnerTest, CallbackValues) { ASSERT_OK(status); } -TEST_F(TaskRunnerTest, OperationContextFactoryReturnsNull) { - resetTaskRunner(new TaskRunner( - &getThreadPool(), [](Client*) { return ServiceContext::UniqueOperationContext(); })); - stdx::mutex mutex; - bool called = false; - OperationContextNoop opCtxNoop; - OperationContext* txn = &opCtxNoop; - Status status = getDetectableErrorStatus(); - auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); - called = true; - txn = theTxn; - status = theStatus; - return TaskRunner::NextAction::kCancel; - }; - getTaskRunner().schedule(task); - getThreadPool().join(); - ASSERT_FALSE(getTaskRunner().isActive()); +using OpIdVector = std::vector<unsigned int>; - stdx::lock_guard<stdx::mutex> lk(mutex); - ASSERT_TRUE(called); - ASSERT_FALSE(txn); - ASSERT_OK(status); -} - -std::vector<int> _testRunTaskTwice(TaskRunnerTest& test, - TaskRunner::NextAction nextAction, - stdx::function<void(const Task& task)> schedule) { +OpIdVector _testRunTaskTwice(TaskRunnerTest& test, + TaskRunner::NextAction nextAction, + stdx::function<void(const Task& task)> schedule) { unittest::Barrier barrier(2U); stdx::mutex mutex; - int i = 0; - OperationContext* txn[2] = {nullptr, nullptr}; - int txnId[2] = {-100, -100}; + std::vector<OperationContext*> txns; + OpIdVector txnIds; auto task = [&](OperationContext* theTxn, const Status& theStatus) { stdx::lock_guard<stdx::mutex> lk(mutex); - int j = i++; - if (j >= 2) { + if (txns.size() >= 2U) { return TaskRunner::NextAction::kInvalid; } - txn[j] = theTxn; - txnId[j] = TaskRunnerTest::getOperationContextId(txn[j]); - TaskRunner::NextAction result = j == 0 ? nextAction : TaskRunner::NextAction::kCancel; + TaskRunner::NextAction result = + txns.size() == 0 ? nextAction : TaskRunner::NextAction::kCancel; + txns.push_back(theTxn); + txnIds.push_back(theTxn->getOpID()); barrier.countDownAndWait(); return result; }; @@ -142,22 +111,20 @@ std::vector<int> _testRunTaskTwice(TaskRunnerTest& test, ASSERT_FALSE(test.getTaskRunner().isActive()); stdx::lock_guard<stdx::mutex> lk(mutex); - ASSERT_EQUALS(2, i); - ASSERT(txn[0]); - ASSERT(txn[1]); - ASSERT_NOT_LESS_THAN(txnId[0], 0); - ASSERT_NOT_LESS_THAN(txnId[1], 0); - return {txnId[0], txnId[1]}; + ASSERT_EQUALS(2U, txns.size()); + ASSERT(txns[0]); + ASSERT(txns[1]); + return txnIds; } -std::vector<int> _testRunTaskTwice(TaskRunnerTest& test, TaskRunner::NextAction nextAction) { +std::vector<unsigned int> _testRunTaskTwice(TaskRunnerTest& test, + TaskRunner::NextAction nextAction) { auto schedule = [&](const Task& task) { test.getTaskRunner().schedule(task); }; return _testRunTaskTwice(test, nextAction, schedule); } TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContext) { - std::vector<int> txnId = - _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext); + auto txnId = _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext); ASSERT_NOT_EQUALS(txnId[0], txnId[1]); } @@ -169,14 +136,13 @@ TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContextJoinThreadPoolBeforeSc getThreadPool().join(); getTaskRunner().schedule(task); }; - std::vector<int> txnId = + auto txnId = _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext, schedule); ASSERT_NOT_EQUALS(txnId[0], txnId[1]); } TEST_F(TaskRunnerTest, RunTaskTwiceKeepOperationContext) { - std::vector<int> txnId = - _testRunTaskTwice(*this, TaskRunner::NextAction::kKeepOperationContext); + auto txnId = _testRunTaskTwice(*this, TaskRunner::NextAction::kKeepOperationContext); ASSERT_EQUALS(txnId[0], txnId[1]); } diff --git a/src/mongo/db/repl/task_runner_test_fixture.cpp b/src/mongo/db/repl/task_runner_test_fixture.cpp index 0e24000c1c8..25360440871 100644 --- a/src/mongo/db/repl/task_runner_test_fixture.cpp +++ b/src/mongo/db/repl/task_runner_test_fixture.cpp @@ -52,18 +52,6 @@ Status TaskRunnerTest::getDetectableErrorStatus() { return Status(ErrorCodes::InternalError, "Not mutated"); } -int TaskRunnerTest::getOperationContextId(OperationContext* txn) { - if (!txn) { - return -1; - } - return int(txn->getOpID()); -} - -ServiceContext::UniqueOperationContext TaskRunnerTest::createOperationContext( - Client* client) const { - return client->makeOperationContext(); -} - TaskRunner& TaskRunnerTest::getTaskRunner() const { ASSERT(_taskRunner.get()); return *_taskRunner; @@ -74,19 +62,13 @@ OldThreadPool& TaskRunnerTest::getThreadPool() const { return *_threadPool; } -void TaskRunnerTest::resetTaskRunner(TaskRunner* taskRunner) { - _taskRunner.reset(taskRunner); -} - void TaskRunnerTest::destroyTaskRunner() { _taskRunner.reset(); } void TaskRunnerTest::setUp() { - _threadPool.reset(new OldThreadPool(kNumThreads, "TaskRunnerTest-")); - resetTaskRunner(new TaskRunner( - _threadPool.get(), - stdx::bind(&TaskRunnerTest::createOperationContext, this, stdx::placeholders::_1))); + _threadPool = stdx::make_unique<OldThreadPool>(kNumThreads, "TaskRunnerTest-"); + _taskRunner = stdx::make_unique<TaskRunner>(_threadPool.get()); } void TaskRunnerTest::tearDown() { diff --git a/src/mongo/db/repl/task_runner_test_fixture.h b/src/mongo/db/repl/task_runner_test_fixture.h index 85dfe418734..84351c01926 100644 --- a/src/mongo/db/repl/task_runner_test_fixture.h +++ b/src/mongo/db/repl/task_runner_test_fixture.h @@ -52,22 +52,9 @@ class TaskRunnerTest : public unittest::Test { public: static Status getDetectableErrorStatus(); - /** - * Returns ID of mock operation context returned from createOperationContext(). - * Returns -1 if txn is null. - * Returns -2 if txn cannot be converted to a mock operation context containing an ID. - */ - static int getOperationContextId(OperationContext* txn); - - /** - * Returns an noop operation context with an embedded numerical ID. - */ - virtual ServiceContext::UniqueOperationContext createOperationContext(Client*) const; - OldThreadPool& getThreadPool() const; TaskRunner& getTaskRunner() const; - void resetTaskRunner(TaskRunner* taskRunner); void destroyTaskRunner(); protected: |