diff options
author | Benety Goh <benety@mongodb.com> | 2018-03-06 20:17:44 -0500 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2018-03-06 20:17:44 -0500 |
commit | 01a89a37332c7c5bd9892f6fbd4082c9f9e9462d (patch) | |
tree | d7285545300c88e822502cbc091f087e52da5128 /src/mongo/db | |
parent | d368f071651856039f2637202c122b1a55a44142 (diff) | |
download | mongo-01a89a37332c7c5bd9892f6fbd4082c9f9e9462d.tar.gz |
SERVER-33625 replace use of OldThreadPool in cloners and TaskRunner with ThreadPool
Diffstat (limited to 'src/mongo/db')
25 files changed, 72 insertions, 82 deletions
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index fa418d338fb..c38663a54cf 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -111,7 +111,13 @@ void BaseClonerTest::setUp() { executor::ThreadPoolExecutorTest::setUp(); clear(); launchExecutorThread(); - dbWorkThreadPool = stdx::make_unique<OldThreadPool>(1); + + ThreadPool::Options options; + options.minThreads = 1U; + options.maxThreads = 1U; + dbWorkThreadPool = stdx::make_unique<ThreadPool>(options); + dbWorkThreadPool->startup(); + storageInterface.reset(new StorageInterfaceMock()); } @@ -120,7 +126,7 @@ void BaseClonerTest::tearDown() { getExecutor().join(); storageInterface.reset(); - dbWorkThreadPool->join(); + dbWorkThreadPool.reset(); } diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h index e5615806fc6..7e6900a2785 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.h +++ b/src/mongo/db/repl/base_cloner_test_fixture.h @@ -41,7 +41,7 @@ #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" -#include "mongo/util/concurrency/old_thread_pool.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/net/hostandport.h" namespace mongo { @@ -128,7 +128,7 @@ protected: void tearDown() override; std::unique_ptr<StorageInterfaceMock> storageInterface; - std::unique_ptr<OldThreadPool> dbWorkThreadPool; + std::unique_ptr<ThreadPool> dbWorkThreadPool; private: // Protects member data of this base cloner fixture. diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 159a664ccb8..773f54a0b8b 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -99,7 +99,7 @@ BSONObj makeCommandWithUUIDorCollectionName(StringData command, } CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, - OldThreadPool* dbWorkThreadPool, + ThreadPool* dbWorkThreadPool, const HostAndPort& source, const NamespaceString& sourceNss, const CollectionOptions& options, diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index 46dd6d7903f..e79258e3734 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -50,14 +50,11 @@ #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" -#include "mongo/util/concurrency/old_thread_pool.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/progress_meter.h" namespace mongo { - -class OldThreadPool; - namespace repl { class StorageInterface; @@ -106,7 +103,7 @@ public: * Takes ownership of the passed StorageInterface object. */ CollectionCloner(executor::TaskExecutor* executor, - OldThreadPool* dbWorkThreadPool, + ThreadPool* dbWorkThreadPool, const HostAndPort& source, const NamespaceString& sourceNss, const CollectionOptions& options, @@ -278,7 +275,7 @@ private: mutable stdx::mutex _mutex; mutable stdx::condition_variable _condition; // (M) executor::TaskExecutor* _executor; // (R) Not owned by us. - OldThreadPool* _dbWorkThreadPool; // (R) Not owned by us. + ThreadPool* _dbWorkThreadPool; // (R) Not owned by us. HostAndPort _source; // (R) NamespaceString _sourceNss; // (R) NamespaceString _destNss; // (R) diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h index 818a9fdd38a..442022eeeeb 100644 --- a/src/mongo/db/repl/data_replicator_external_state.h +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -37,13 +37,12 @@ #include "mongo/db/repl/repl_set_config.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" namespace mongo { -class OldThreadPool; - namespace executor { class TaskExecutor; } // namespace executor @@ -79,7 +78,7 @@ public: /** * Returns shared db worker thread pool for collection cloning. */ - virtual OldThreadPool* getDbWorkThreadPool() const = 0; + virtual ThreadPool* getDbWorkThreadPool() const = 0; /** * Returns the current term and last committed optime. diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 49ea11001b1..1d93a2fb5cf 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -49,7 +49,7 @@ executor::TaskExecutor* DataReplicatorExternalStateImpl::getTaskExecutor() const return _replicationCoordinatorExternalState->getTaskExecutor(); } -OldThreadPool* DataReplicatorExternalStateImpl::getDbWorkThreadPool() const { +ThreadPool* DataReplicatorExternalStateImpl::getDbWorkThreadPool() const { return _replicationCoordinatorExternalState->getDbWorkThreadPool(); } diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h index 586409b2b50..b66dbaff53b 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.h +++ b/src/mongo/db/repl/data_replicator_external_state_impl.h @@ -48,7 +48,7 @@ public: executor::TaskExecutor* getTaskExecutor() const override; - OldThreadPool* getDbWorkThreadPool() const override; + ThreadPool* getDbWorkThreadPool() const override; OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override; diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index de00a40efbf..71ad72d166d 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -45,7 +45,7 @@ executor::TaskExecutor* DataReplicatorExternalStateMock::getTaskExecutor() const return taskExecutor; } -OldThreadPool* DataReplicatorExternalStateMock::getDbWorkThreadPool() const { +ThreadPool* DataReplicatorExternalStateMock::getDbWorkThreadPool() const { return dbWorkThreadPool; } diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index c556b822315..1b93307f2fa 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -45,7 +45,7 @@ public: executor::TaskExecutor* getTaskExecutor() const override; - OldThreadPool* getDbWorkThreadPool() const override; + ThreadPool* getDbWorkThreadPool() const override; OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override; @@ -66,7 +66,7 @@ public: executor::TaskExecutor* taskExecutor = nullptr; // DB worker thread pool. Not owned by us. - OldThreadPool* dbWorkThreadPool = nullptr; + ThreadPool* dbWorkThreadPool = nullptr; // Returned by getCurrentTermAndLastCommittedOpTime. long long currentTerm = OpTime::kUninitializedTerm; diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp index 71d11c1436a..74da2fe22be 100644 --- a/src/mongo/db/repl/database_cloner.cpp +++ b/src/mongo/db/repl/database_cloner.cpp @@ -101,7 +101,7 @@ BSONObj createListCollectionsCommandObject(const BSONObj& filter) { } // namespace DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor, - OldThreadPool* dbWorkThreadPool, + ThreadPool* dbWorkThreadPool, const HostAndPort& source, const std::string& dbname, const BSONObj& listCollectionsFilter, diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h index e6692428efb..b32ce95f584 100644 --- a/src/mongo/db/repl/database_cloner.h +++ b/src/mongo/db/repl/database_cloner.h @@ -43,12 +43,10 @@ #include "mongo/executor/task_executor.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/net/hostandport.h" namespace mongo { - -class OldThreadPool; - namespace repl { class StorageInterface; @@ -109,7 +107,7 @@ public: * 'listCollectionsFilter' will be extended to include collections only, filtering out views. */ DatabaseCloner(executor::TaskExecutor* executor, - OldThreadPool* dbWorkThreadPool, + ThreadPool* dbWorkThreadPool, const HostAndPort& source, const std::string& dbname, const BSONObj& listCollectionsFilter, @@ -213,7 +211,7 @@ private: mutable stdx::mutex _mutex; mutable stdx::condition_variable _condition; // (M) executor::TaskExecutor* _executor; // (R) - OldThreadPool* _dbWorkThreadPool; // (R) + ThreadPool* _dbWorkThreadPool; // (R) const HostAndPort _source; // (R) const std::string _dbname; // (R) const BSONObj _listCollectionsFilter; // (R) diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp index 2a4e47edbf2..e4cad653e4f 100644 --- a/src/mongo/db/repl/databases_cloner.cpp +++ b/src/mongo/db/repl/databases_cloner.cpp @@ -66,7 +66,7 @@ MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncListDatabasesAttempts, int, 3); DatabasesCloner::DatabasesCloner(StorageInterface* si, executor::TaskExecutor* exec, - OldThreadPool* dbWorkThreadPool, + ThreadPool* dbWorkThreadPool, HostAndPort source, IncludeDbFilterFn includeDbPred, OnFinishFn finishFn) diff --git a/src/mongo/db/repl/databases_cloner.h b/src/mongo/db/repl/databases_cloner.h index f47ef569614..afb69c4ade9 100644 --- a/src/mongo/db/repl/databases_cloner.h +++ b/src/mongo/db/repl/databases_cloner.h @@ -44,12 +44,10 @@ #include "mongo/executor/task_executor.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/net/hostandport.h" namespace mongo { - -class OldThreadPool; - namespace repl { /** @@ -70,7 +68,7 @@ public: using OnFinishFn = stdx::function<void(const Status&)>; DatabasesCloner(StorageInterface* si, executor::TaskExecutor* exec, - OldThreadPool* dbWorkThreadPool, + ThreadPool* dbWorkThreadPool, HostAndPort source, IncludeDbFilterFn includeDbPred, OnFinishFn finishFn); @@ -169,8 +167,8 @@ private: mutable stdx::mutex _mutex; // (S) Status _status{ErrorCodes::NotYetInitialized, ""}; // (M) If it is not OK, we stop everything. executor::TaskExecutor* _exec; // (R) executor to schedule things with - OldThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning. - const HostAndPort _source; // (R) The source to use. + ThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning. + const HostAndPort _source; // (R) The source to use. CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M) const IncludeDbFilterFn _includeDbFn; // (R) function which decides which dbs are cloned. diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp index 3705afed383..369b8a27c38 100644 --- a/src/mongo/db/repl/databases_cloner_test.cpp +++ b/src/mongo/db/repl/databases_cloner_test.cpp @@ -43,8 +43,8 @@ #include "mongo/stdx/mutex.h" #include "mongo/unittest/task_executor_proxy.h" #include "mongo/unittest/unittest.h" -#include "mongo/util/concurrency/old_thread_pool.h" #include "mongo/util/concurrency/thread_name.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/scopeguard.h" @@ -79,14 +79,13 @@ struct StorageInterfaceResults { class DBsClonerTest : public executor::ThreadPoolExecutorTest { public: - DBsClonerTest() - : _storageInterface{}, _dbWorkThreadPool{OldThreadPool::DoNotStartThreadsTag(), 1} {} + DBsClonerTest() : _storageInterface{}, _dbWorkThreadPool(ThreadPool::Options()) {} StorageInterface& getStorage() { return _storageInterface; } - OldThreadPool& getDbWorkThreadPool() { + ThreadPool& getDbWorkThreadPool() { return _dbWorkThreadPool; } @@ -186,13 +185,12 @@ protected: std::unique_ptr<CollectionBulkLoader>(collInfo->loader)); }; - _dbWorkThreadPool.startThreads(); + _dbWorkThreadPool.startup(); } void tearDown() override { getExecutor().shutdown(); getExecutor().join(); - _dbWorkThreadPool.join(); } /** @@ -330,7 +328,7 @@ protected: StorageInterfaceMock _storageInterface; private: - OldThreadPool _dbWorkThreadPool; + ThreadPool _dbWorkThreadPool; std::map<NamespaceString, CollectionMockStats> _collectionStats; std::map<NamespaceString, CollectionCloneInfo> _collections; StorageInterfaceResults _storageInterfaceWorkDone; diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 428be3cdb14..9d01a75c1c9 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -59,8 +59,8 @@ #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/stdx/mutex.h" -#include "mongo/util/concurrency/old_thread_pool.h" #include "mongo/util/concurrency/thread_name.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/scopeguard.h" @@ -227,7 +227,7 @@ public: return *_storageInterface; } - OldThreadPool& getDbWorkThreadPool() { + ThreadPool& getDbWorkThreadPool() { return *_dbWorkThreadPool; } @@ -305,7 +305,8 @@ protected: std::unique_ptr<CollectionBulkLoader>(collInfo->loader)); }; - _dbWorkThreadPool = stdx::make_unique<OldThreadPool>(1); + _dbWorkThreadPool = stdx::make_unique<ThreadPool>(ThreadPool::Options()); + _dbWorkThreadPool->startup(); Client::initThreadIfNotAlready(); reset(); @@ -403,7 +404,6 @@ protected: void tearDown() override { tearDownExecutorThread(); _initialSyncer.reset(); - _dbWorkThreadPool->join(); _dbWorkThreadPool.reset(); _replicationProcess.reset(); _storageInterface.reset(); @@ -439,7 +439,7 @@ protected: std::unique_ptr<SyncSourceSelectorMock> _syncSourceSelector; std::unique_ptr<StorageInterfaceMock> _storageInterface; std::unique_ptr<ReplicationProcess> _replicationProcess; - std::unique_ptr<OldThreadPool> _dbWorkThreadPool; + std::unique_ptr<ThreadPool> _dbWorkThreadPool; std::map<NamespaceString, CollectionMockStats> _collectionStats; std::map<NamespaceString, CollectionCloneInfo> _collections; diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 0730e1bed42..ec457252d2b 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -38,13 +38,13 @@ #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/optime.h" #include "mongo/stdx/functional.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/time_support.h" namespace mongo { class BSONObj; class OID; -class OldThreadPool; class OperationContext; class ServiceContext; class Status; @@ -115,7 +115,7 @@ public: /** * Returns shared db worker thread pool for collection cloning. */ - virtual OldThreadPool* getDbWorkThreadPool() const = 0; + virtual ThreadPool* getDbWorkThreadPool() const = 0; /** * Runs the repair database command on the "local" db, if the storage engine is MMapV1. diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 0bced2a2351..81e4f554b22 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -312,7 +312,6 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s executor::makeNetworkInterface("NetworkInterfaceASIO-RS", nullptr, std::move(hookList))); _taskExecutor->startup(); - _dbWorkerPool = std::make_unique<OldThreadPool>(16, "db worker"); _writerPool = SyncTail::makeWriterPool(); _startedThreads = true; @@ -360,8 +359,8 @@ executor::TaskExecutor* ReplicationCoordinatorExternalStateImpl::getTaskExecutor return _taskExecutor.get(); } -OldThreadPool* ReplicationCoordinatorExternalStateImpl::getDbWorkThreadPool() const { - return _dbWorkerPool.get(); +ThreadPool* ReplicationCoordinatorExternalStateImpl::getDbWorkThreadPool() const { + return _writerPool.get(); } Status ReplicationCoordinatorExternalStateImpl::runRepairOnLocalDB(OperationContext* opCtx) { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 291e29ec7af..a303c88e990 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -41,7 +41,6 @@ #include "mongo/db/storage/journal_listener.h" #include "mongo/db/storage/snapshot_manager.h" #include "mongo/stdx/mutex.h" -#include "mongo/util/concurrency/old_thread_pool.h" #include "mongo/util/concurrency/thread_pool.h" namespace mongo { @@ -78,7 +77,7 @@ public: virtual void startMasterSlave(OperationContext* opCtx); virtual void shutdown(OperationContext* opCtx); virtual executor::TaskExecutor* getTaskExecutor() const override; - virtual OldThreadPool* getDbWorkThreadPool() const override; + virtual ThreadPool* getDbWorkThreadPool() const override; virtual Status runRepairOnLocalDB(OperationContext* opCtx) override; virtual Status initializeReplSetStorage(OperationContext* opCtx, const BSONObj& config); virtual void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx); @@ -199,10 +198,10 @@ private: // Task executor used to run replication tasks. std::unique_ptr<executor::TaskExecutor> _taskExecutor; - // Used by database and collection cloners to perform storage operations. - std::unique_ptr<OldThreadPool> _dbWorkerPool; - // Used by repl::multiApply() to apply the sync source's operations in parallel. + // Also used by database and collection cloners to perform storage operations. + // Cloners and oplog application run in separate phases of initial sync so it is fine to share + // this thread pool. std::unique_ptr<ThreadPool> _writerPool; // Writes a noop every 10 seconds. diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 0cec9f77524..bf908281c33 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -92,7 +92,7 @@ executor::TaskExecutor* ReplicationCoordinatorExternalStateMock::getTaskExecutor return nullptr; } -OldThreadPool* ReplicationCoordinatorExternalStateMock::getDbWorkThreadPool() const { +ThreadPool* ReplicationCoordinatorExternalStateMock::getDbWorkThreadPool() const { return nullptr; } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index f227fef5489..675b9ca94fe 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -65,7 +65,7 @@ public: virtual void startMasterSlave(OperationContext*); virtual void shutdown(OperationContext* opCtx); virtual executor::TaskExecutor* getTaskExecutor() const override; - virtual OldThreadPool* getDbWorkThreadPool() const override; + virtual ThreadPool* getDbWorkThreadPool() const override; virtual Status runRepairOnLocalDB(OperationContext* opCtx) override; virtual Status initializeReplSetStorage(OperationContext* opCtx, const BSONObj& config); virtual void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx); diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp index 909bb594ad2..8d08c21b746 100644 --- a/src/mongo/db/repl/task_runner.cpp +++ b/src/mongo/db/repl/task_runner.cpp @@ -40,7 +40,6 @@ #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" #include "mongo/util/assert_util.h" -#include "mongo/util/concurrency/old_thread_pool.h" #include "mongo/util/concurrency/thread_name.h" #include "mongo/util/destructor_guard.h" #include "mongo/util/log.h" @@ -77,7 +76,7 @@ TaskRunner::Task TaskRunner::makeCancelTask() { return [](OperationContext* opCtx, const Status& status) { return NextAction::kCancel; }; } -TaskRunner::TaskRunner(OldThreadPool* threadPool) +TaskRunner::TaskRunner(ThreadPool* threadPool) : _threadPool(threadPool), _active(false), _cancelRequested(false) { uassert(ErrorCodes::BadValue, "null thread pool", threadPool); } @@ -113,7 +112,7 @@ void TaskRunner::schedule(const Task& task) { return; } - _threadPool->schedule([this] { _runTasks(); }); + invariantOK(_threadPool->schedule([this] { _runTasks(); })); _active = true; _cancelRequested = false; diff --git a/src/mongo/db/repl/task_runner.h b/src/mongo/db/repl/task_runner.h index b7dcf4c05d6..310f432379d 100644 --- a/src/mongo/db/repl/task_runner.h +++ b/src/mongo/db/repl/task_runner.h @@ -35,11 +35,11 @@ #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" +#include "mongo/util/concurrency/thread_pool.h" namespace mongo { class Status; -class OldThreadPool; class OperationContext; namespace repl { @@ -79,7 +79,7 @@ public: */ static Task makeCancelTask(); - explicit TaskRunner(OldThreadPool* threadPool); + explicit TaskRunner(ThreadPool* threadPool); virtual ~TaskRunner(); @@ -156,7 +156,7 @@ private: */ Task _waitForNextTask(); - OldThreadPool* _threadPool; + ThreadPool* _threadPool; // Protects member data of this TaskRunner. mutable stdx::mutex _mutex; diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp index d2f9d3440e0..0be2eebb0cd 100644 --- a/src/mongo/db/repl/task_runner_test.cpp +++ b/src/mongo/db/repl/task_runner_test.cpp @@ -36,7 +36,7 @@ #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/unittest/barrier.h" -#include "mongo/util/concurrency/old_thread_pool.h" +#include "mongo/util/concurrency/thread_pool.h" namespace { @@ -68,7 +68,7 @@ TEST_F(TaskRunnerTest, CallbackValues) { return TaskRunner::NextAction::kCancel; }; getTaskRunner().schedule(task); - getThreadPool().join(); + getThreadPool().waitForIdle(); ASSERT_FALSE(getTaskRunner().isActive()); stdx::lock_guard<stdx::mutex> lk(mutex); @@ -107,7 +107,7 @@ OpIdVector _testRunTaskTwice(TaskRunnerTest& test, ASSERT_TRUE(test.getTaskRunner().isActive()); barrier.countDownAndWait(); - test.getThreadPool().join(); + test.getThreadPool().waitForIdle(); ASSERT_FALSE(test.getTaskRunner().isActive()); stdx::lock_guard<stdx::mutex> lk(mutex); @@ -133,7 +133,7 @@ TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContext) { // thread back to pool after disposing of operation context. TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContextJoinThreadPoolBeforeScheduling) { auto schedule = [this](const Task& task) { - getThreadPool().join(); + getThreadPool().waitForIdle(); getTaskRunner().schedule(task); }; auto txnId = @@ -177,7 +177,7 @@ TEST_F(TaskRunnerTest, SkipSecondTask) { schedulingDone = true; condition.notify_all(); } - getThreadPool().join(); + getThreadPool().waitForIdle(); ASSERT_FALSE(getTaskRunner().isActive()); stdx::lock_guard<stdx::mutex> lk(mutex); @@ -226,7 +226,7 @@ TEST_F(TaskRunnerTest, FirstTaskThrowsException) { schedulingDone = true; condition.notify_all(); } - getThreadPool().join(); + getThreadPool().waitForIdle(); ASSERT_FALSE(getTaskRunner().isActive()); stdx::lock_guard<stdx::mutex> lk(mutex); @@ -270,7 +270,7 @@ TEST_F(TaskRunnerTest, Cancel) { getTaskRunner().cancel(); getTaskRunner().cancel(); - getThreadPool().join(); + getThreadPool().waitForIdle(); ASSERT_FALSE(getTaskRunner().isActive()); // This status will not be OK if canceling the task runner @@ -345,7 +345,7 @@ TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) { destroyTaskRunner(); - getThreadPool().join(); + getThreadPool().waitForIdle(); // This status will not be OK if canceling the task runner // before scheduling the task results in the task being canceled. diff --git a/src/mongo/db/repl/task_runner_test_fixture.cpp b/src/mongo/db/repl/task_runner_test_fixture.cpp index 25360440871..f16bfa67be0 100644 --- a/src/mongo/db/repl/task_runner_test_fixture.cpp +++ b/src/mongo/db/repl/task_runner_test_fixture.cpp @@ -34,7 +34,6 @@ #include "mongo/db/repl/task_runner.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" -#include "mongo/util/concurrency/old_thread_pool.h" namespace mongo { namespace repl { @@ -42,12 +41,6 @@ namespace repl { using namespace mongo; using namespace mongo::repl; -namespace { - -const int kNumThreads = 3; - -} // namespace - Status TaskRunnerTest::getDetectableErrorStatus() { return Status(ErrorCodes::InternalError, "Not mutated"); } @@ -57,7 +50,7 @@ TaskRunner& TaskRunnerTest::getTaskRunner() const { return *_taskRunner; } -OldThreadPool& TaskRunnerTest::getThreadPool() const { +ThreadPool& TaskRunnerTest::getThreadPool() const { ASSERT(_threadPool.get()); return *_threadPool; } @@ -67,7 +60,11 @@ void TaskRunnerTest::destroyTaskRunner() { } void TaskRunnerTest::setUp() { - _threadPool = stdx::make_unique<OldThreadPool>(kNumThreads, "TaskRunnerTest-"); + ThreadPool::Options options; + options.poolName = "TaskRunnerTest"; + _threadPool = stdx::make_unique<ThreadPool>(options); + _threadPool->startup(); + _taskRunner = stdx::make_unique<TaskRunner>(_threadPool.get()); } diff --git a/src/mongo/db/repl/task_runner_test_fixture.h b/src/mongo/db/repl/task_runner_test_fixture.h index 84351c01926..2787419a1e5 100644 --- a/src/mongo/db/repl/task_runner_test_fixture.h +++ b/src/mongo/db/repl/task_runner_test_fixture.h @@ -33,11 +33,11 @@ #include "mongo/base/status.h" #include "mongo/db/service_context.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/thread_pool.h" namespace mongo { class Client; -class OldThreadPool; class OperationContext; namespace repl { @@ -52,7 +52,7 @@ class TaskRunnerTest : public unittest::Test { public: static Status getDetectableErrorStatus(); - OldThreadPool& getThreadPool() const; + ThreadPool& getThreadPool() const; TaskRunner& getTaskRunner() const; void destroyTaskRunner(); @@ -62,7 +62,7 @@ protected: void tearDown() override; private: - std::unique_ptr<OldThreadPool> _threadPool; + std::unique_ptr<ThreadPool> _threadPool; std::unique_ptr<TaskRunner> _taskRunner; }; |