summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.cpp10
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.h4
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp2
-rw-r--r--src/mongo/db/repl/collection_cloner.h9
-rw-r--r--src/mongo/db/repl/data_replicator_external_state.h5
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.cpp2
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.h2
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.cpp2
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.h4
-rw-r--r--src/mongo/db/repl/database_cloner.cpp2
-rw-r--r--src/mongo/db/repl/database_cloner.h8
-rw-r--r--src/mongo/db/repl/databases_cloner.cpp2
-rw-r--r--src/mongo/db/repl/databases_cloner.h10
-rw-r--r--src/mongo/db/repl/databases_cloner_test.cpp12
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp10
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h4
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp5
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h9
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h2
-rw-r--r--src/mongo/db/repl/task_runner.cpp5
-rw-r--r--src/mongo/db/repl/task_runner.h6
-rw-r--r--src/mongo/db/repl/task_runner_test.cpp16
-rw-r--r--src/mongo/db/repl/task_runner_test_fixture.cpp15
-rw-r--r--src/mongo/db/repl/task_runner_test_fixture.h6
25 files changed, 72 insertions, 82 deletions
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp
index fa418d338fb..c38663a54cf 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp
@@ -111,7 +111,13 @@ void BaseClonerTest::setUp() {
executor::ThreadPoolExecutorTest::setUp();
clear();
launchExecutorThread();
- dbWorkThreadPool = stdx::make_unique<OldThreadPool>(1);
+
+ ThreadPool::Options options;
+ options.minThreads = 1U;
+ options.maxThreads = 1U;
+ dbWorkThreadPool = stdx::make_unique<ThreadPool>(options);
+ dbWorkThreadPool->startup();
+
storageInterface.reset(new StorageInterfaceMock());
}
@@ -120,7 +126,7 @@ void BaseClonerTest::tearDown() {
getExecutor().join();
storageInterface.reset();
- dbWorkThreadPool->join();
+
dbWorkThreadPool.reset();
}
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h
index e5615806fc6..7e6900a2785 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.h
+++ b/src/mongo/db/repl/base_cloner_test_fixture.h
@@ -41,7 +41,7 @@
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
-#include "mongo/util/concurrency/old_thread_pool.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
@@ -128,7 +128,7 @@ protected:
void tearDown() override;
std::unique_ptr<StorageInterfaceMock> storageInterface;
- std::unique_ptr<OldThreadPool> dbWorkThreadPool;
+ std::unique_ptr<ThreadPool> dbWorkThreadPool;
private:
// Protects member data of this base cloner fixture.
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 159a664ccb8..773f54a0b8b 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -99,7 +99,7 @@ BSONObj makeCommandWithUUIDorCollectionName(StringData command,
}
CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
- OldThreadPool* dbWorkThreadPool,
+ ThreadPool* dbWorkThreadPool,
const HostAndPort& source,
const NamespaceString& sourceNss,
const CollectionOptions& options,
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 46dd6d7903f..e79258e3734 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -50,14 +50,11 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
-#include "mongo/util/concurrency/old_thread_pool.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/progress_meter.h"
namespace mongo {
-
-class OldThreadPool;
-
namespace repl {
class StorageInterface;
@@ -106,7 +103,7 @@ public:
* Takes ownership of the passed StorageInterface object.
*/
CollectionCloner(executor::TaskExecutor* executor,
- OldThreadPool* dbWorkThreadPool,
+ ThreadPool* dbWorkThreadPool,
const HostAndPort& source,
const NamespaceString& sourceNss,
const CollectionOptions& options,
@@ -278,7 +275,7 @@ private:
mutable stdx::mutex _mutex;
mutable stdx::condition_variable _condition; // (M)
executor::TaskExecutor* _executor; // (R) Not owned by us.
- OldThreadPool* _dbWorkThreadPool; // (R) Not owned by us.
+ ThreadPool* _dbWorkThreadPool; // (R) Not owned by us.
HostAndPort _source; // (R)
NamespaceString _sourceNss; // (R)
NamespaceString _destNss; // (R)
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
index 818a9fdd38a..442022eeeeb 100644
--- a/src/mongo/db/repl/data_replicator_external_state.h
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -37,13 +37,12 @@
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
namespace mongo {
-class OldThreadPool;
-
namespace executor {
class TaskExecutor;
} // namespace executor
@@ -79,7 +78,7 @@ public:
/**
* Returns shared db worker thread pool for collection cloning.
*/
- virtual OldThreadPool* getDbWorkThreadPool() const = 0;
+ virtual ThreadPool* getDbWorkThreadPool() const = 0;
/**
* Returns the current term and last committed optime.
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 49ea11001b1..1d93a2fb5cf 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -49,7 +49,7 @@ executor::TaskExecutor* DataReplicatorExternalStateImpl::getTaskExecutor() const
return _replicationCoordinatorExternalState->getTaskExecutor();
}
-OldThreadPool* DataReplicatorExternalStateImpl::getDbWorkThreadPool() const {
+ThreadPool* DataReplicatorExternalStateImpl::getDbWorkThreadPool() const {
return _replicationCoordinatorExternalState->getDbWorkThreadPool();
}
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h
index 586409b2b50..b66dbaff53b 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.h
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.h
@@ -48,7 +48,7 @@ public:
executor::TaskExecutor* getTaskExecutor() const override;
- OldThreadPool* getDbWorkThreadPool() const override;
+ ThreadPool* getDbWorkThreadPool() const override;
OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override;
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index de00a40efbf..71ad72d166d 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -45,7 +45,7 @@ executor::TaskExecutor* DataReplicatorExternalStateMock::getTaskExecutor() const
return taskExecutor;
}
-OldThreadPool* DataReplicatorExternalStateMock::getDbWorkThreadPool() const {
+ThreadPool* DataReplicatorExternalStateMock::getDbWorkThreadPool() const {
return dbWorkThreadPool;
}
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h
index c556b822315..1b93307f2fa 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.h
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.h
@@ -45,7 +45,7 @@ public:
executor::TaskExecutor* getTaskExecutor() const override;
- OldThreadPool* getDbWorkThreadPool() const override;
+ ThreadPool* getDbWorkThreadPool() const override;
OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override;
@@ -66,7 +66,7 @@ public:
executor::TaskExecutor* taskExecutor = nullptr;
// DB worker thread pool. Not owned by us.
- OldThreadPool* dbWorkThreadPool = nullptr;
+ ThreadPool* dbWorkThreadPool = nullptr;
// Returned by getCurrentTermAndLastCommittedOpTime.
long long currentTerm = OpTime::kUninitializedTerm;
diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp
index 71d11c1436a..74da2fe22be 100644
--- a/src/mongo/db/repl/database_cloner.cpp
+++ b/src/mongo/db/repl/database_cloner.cpp
@@ -101,7 +101,7 @@ BSONObj createListCollectionsCommandObject(const BSONObj& filter) {
} // namespace
DatabaseCloner::DatabaseCloner(executor::TaskExecutor* executor,
- OldThreadPool* dbWorkThreadPool,
+ ThreadPool* dbWorkThreadPool,
const HostAndPort& source,
const std::string& dbname,
const BSONObj& listCollectionsFilter,
diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h
index e6692428efb..b32ce95f584 100644
--- a/src/mongo/db/repl/database_cloner.h
+++ b/src/mongo/db/repl/database_cloner.h
@@ -43,12 +43,10 @@
#include "mongo/executor/task_executor.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
-
-class OldThreadPool;
-
namespace repl {
class StorageInterface;
@@ -109,7 +107,7 @@ public:
* 'listCollectionsFilter' will be extended to include collections only, filtering out views.
*/
DatabaseCloner(executor::TaskExecutor* executor,
- OldThreadPool* dbWorkThreadPool,
+ ThreadPool* dbWorkThreadPool,
const HostAndPort& source,
const std::string& dbname,
const BSONObj& listCollectionsFilter,
@@ -213,7 +211,7 @@ private:
mutable stdx::mutex _mutex;
mutable stdx::condition_variable _condition; // (M)
executor::TaskExecutor* _executor; // (R)
- OldThreadPool* _dbWorkThreadPool; // (R)
+ ThreadPool* _dbWorkThreadPool; // (R)
const HostAndPort _source; // (R)
const std::string _dbname; // (R)
const BSONObj _listCollectionsFilter; // (R)
diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp
index 2a4e47edbf2..e4cad653e4f 100644
--- a/src/mongo/db/repl/databases_cloner.cpp
+++ b/src/mongo/db/repl/databases_cloner.cpp
@@ -66,7 +66,7 @@ MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncListDatabasesAttempts, int, 3);
DatabasesCloner::DatabasesCloner(StorageInterface* si,
executor::TaskExecutor* exec,
- OldThreadPool* dbWorkThreadPool,
+ ThreadPool* dbWorkThreadPool,
HostAndPort source,
IncludeDbFilterFn includeDbPred,
OnFinishFn finishFn)
diff --git a/src/mongo/db/repl/databases_cloner.h b/src/mongo/db/repl/databases_cloner.h
index f47ef569614..afb69c4ade9 100644
--- a/src/mongo/db/repl/databases_cloner.h
+++ b/src/mongo/db/repl/databases_cloner.h
@@ -44,12 +44,10 @@
#include "mongo/executor/task_executor.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
-
-class OldThreadPool;
-
namespace repl {
/**
@@ -70,7 +68,7 @@ public:
using OnFinishFn = stdx::function<void(const Status&)>;
DatabasesCloner(StorageInterface* si,
executor::TaskExecutor* exec,
- OldThreadPool* dbWorkThreadPool,
+ ThreadPool* dbWorkThreadPool,
HostAndPort source,
IncludeDbFilterFn includeDbPred,
OnFinishFn finishFn);
@@ -169,8 +167,8 @@ private:
mutable stdx::mutex _mutex; // (S)
Status _status{ErrorCodes::NotYetInitialized, ""}; // (M) If it is not OK, we stop everything.
executor::TaskExecutor* _exec; // (R) executor to schedule things with
- OldThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning.
- const HostAndPort _source; // (R) The source to use.
+ ThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning.
+ const HostAndPort _source; // (R) The source to use.
CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M)
const IncludeDbFilterFn _includeDbFn; // (R) function which decides which dbs are cloned.
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index 3705afed383..369b8a27c38 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -43,8 +43,8 @@
#include "mongo/stdx/mutex.h"
#include "mongo/unittest/task_executor_proxy.h"
#include "mongo/unittest/unittest.h"
-#include "mongo/util/concurrency/old_thread_pool.h"
#include "mongo/util/concurrency/thread_name.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/scopeguard.h"
@@ -79,14 +79,13 @@ struct StorageInterfaceResults {
class DBsClonerTest : public executor::ThreadPoolExecutorTest {
public:
- DBsClonerTest()
- : _storageInterface{}, _dbWorkThreadPool{OldThreadPool::DoNotStartThreadsTag(), 1} {}
+ DBsClonerTest() : _storageInterface{}, _dbWorkThreadPool(ThreadPool::Options()) {}
StorageInterface& getStorage() {
return _storageInterface;
}
- OldThreadPool& getDbWorkThreadPool() {
+ ThreadPool& getDbWorkThreadPool() {
return _dbWorkThreadPool;
}
@@ -186,13 +185,12 @@ protected:
std::unique_ptr<CollectionBulkLoader>(collInfo->loader));
};
- _dbWorkThreadPool.startThreads();
+ _dbWorkThreadPool.startup();
}
void tearDown() override {
getExecutor().shutdown();
getExecutor().join();
- _dbWorkThreadPool.join();
}
/**
@@ -330,7 +328,7 @@ protected:
StorageInterfaceMock _storageInterface;
private:
- OldThreadPool _dbWorkThreadPool;
+ ThreadPool _dbWorkThreadPool;
std::map<NamespaceString, CollectionMockStats> _collectionStats;
std::map<NamespaceString, CollectionCloneInfo> _collections;
StorageInterfaceResults _storageInterfaceWorkDone;
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 428be3cdb14..9d01a75c1c9 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -59,8 +59,8 @@
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/stdx/mutex.h"
-#include "mongo/util/concurrency/old_thread_pool.h"
#include "mongo/util/concurrency/thread_name.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/scopeguard.h"
@@ -227,7 +227,7 @@ public:
return *_storageInterface;
}
- OldThreadPool& getDbWorkThreadPool() {
+ ThreadPool& getDbWorkThreadPool() {
return *_dbWorkThreadPool;
}
@@ -305,7 +305,8 @@ protected:
std::unique_ptr<CollectionBulkLoader>(collInfo->loader));
};
- _dbWorkThreadPool = stdx::make_unique<OldThreadPool>(1);
+ _dbWorkThreadPool = stdx::make_unique<ThreadPool>(ThreadPool::Options());
+ _dbWorkThreadPool->startup();
Client::initThreadIfNotAlready();
reset();
@@ -403,7 +404,6 @@ protected:
void tearDown() override {
tearDownExecutorThread();
_initialSyncer.reset();
- _dbWorkThreadPool->join();
_dbWorkThreadPool.reset();
_replicationProcess.reset();
_storageInterface.reset();
@@ -439,7 +439,7 @@ protected:
std::unique_ptr<SyncSourceSelectorMock> _syncSourceSelector;
std::unique_ptr<StorageInterfaceMock> _storageInterface;
std::unique_ptr<ReplicationProcess> _replicationProcess;
- std::unique_ptr<OldThreadPool> _dbWorkThreadPool;
+ std::unique_ptr<ThreadPool> _dbWorkThreadPool;
std::map<NamespaceString, CollectionMockStats> _collectionStats;
std::map<NamespaceString, CollectionCloneInfo> _collections;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 0730e1bed42..ec457252d2b 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -38,13 +38,13 @@
#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/optime.h"
#include "mongo/stdx/functional.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/time_support.h"
namespace mongo {
class BSONObj;
class OID;
-class OldThreadPool;
class OperationContext;
class ServiceContext;
class Status;
@@ -115,7 +115,7 @@ public:
/**
* Returns shared db worker thread pool for collection cloning.
*/
- virtual OldThreadPool* getDbWorkThreadPool() const = 0;
+ virtual ThreadPool* getDbWorkThreadPool() const = 0;
/**
* Runs the repair database command on the "local" db, if the storage engine is MMapV1.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 0bced2a2351..81e4f554b22 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -312,7 +312,6 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s
executor::makeNetworkInterface("NetworkInterfaceASIO-RS", nullptr, std::move(hookList)));
_taskExecutor->startup();
- _dbWorkerPool = std::make_unique<OldThreadPool>(16, "db worker");
_writerPool = SyncTail::makeWriterPool();
_startedThreads = true;
@@ -360,8 +359,8 @@ executor::TaskExecutor* ReplicationCoordinatorExternalStateImpl::getTaskExecutor
return _taskExecutor.get();
}
-OldThreadPool* ReplicationCoordinatorExternalStateImpl::getDbWorkThreadPool() const {
- return _dbWorkerPool.get();
+ThreadPool* ReplicationCoordinatorExternalStateImpl::getDbWorkThreadPool() const {
+ return _writerPool.get();
}
Status ReplicationCoordinatorExternalStateImpl::runRepairOnLocalDB(OperationContext* opCtx) {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 291e29ec7af..a303c88e990 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -41,7 +41,6 @@
#include "mongo/db/storage/journal_listener.h"
#include "mongo/db/storage/snapshot_manager.h"
#include "mongo/stdx/mutex.h"
-#include "mongo/util/concurrency/old_thread_pool.h"
#include "mongo/util/concurrency/thread_pool.h"
namespace mongo {
@@ -78,7 +77,7 @@ public:
virtual void startMasterSlave(OperationContext* opCtx);
virtual void shutdown(OperationContext* opCtx);
virtual executor::TaskExecutor* getTaskExecutor() const override;
- virtual OldThreadPool* getDbWorkThreadPool() const override;
+ virtual ThreadPool* getDbWorkThreadPool() const override;
virtual Status runRepairOnLocalDB(OperationContext* opCtx) override;
virtual Status initializeReplSetStorage(OperationContext* opCtx, const BSONObj& config);
virtual void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx);
@@ -199,10 +198,10 @@ private:
// Task executor used to run replication tasks.
std::unique_ptr<executor::TaskExecutor> _taskExecutor;
- // Used by database and collection cloners to perform storage operations.
- std::unique_ptr<OldThreadPool> _dbWorkerPool;
-
// Used by repl::multiApply() to apply the sync source's operations in parallel.
+ // Also used by database and collection cloners to perform storage operations.
+ // Cloners and oplog application run in separate phases of initial sync so it is fine to share
+ // this thread pool.
std::unique_ptr<ThreadPool> _writerPool;
// Writes a noop every 10 seconds.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 0cec9f77524..bf908281c33 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -92,7 +92,7 @@ executor::TaskExecutor* ReplicationCoordinatorExternalStateMock::getTaskExecutor
return nullptr;
}
-OldThreadPool* ReplicationCoordinatorExternalStateMock::getDbWorkThreadPool() const {
+ThreadPool* ReplicationCoordinatorExternalStateMock::getDbWorkThreadPool() const {
return nullptr;
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index f227fef5489..675b9ca94fe 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -65,7 +65,7 @@ public:
virtual void startMasterSlave(OperationContext*);
virtual void shutdown(OperationContext* opCtx);
virtual executor::TaskExecutor* getTaskExecutor() const override;
- virtual OldThreadPool* getDbWorkThreadPool() const override;
+ virtual ThreadPool* getDbWorkThreadPool() const override;
virtual Status runRepairOnLocalDB(OperationContext* opCtx) override;
virtual Status initializeReplSetStorage(OperationContext* opCtx, const BSONObj& config);
virtual void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx);
diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp
index 909bb594ad2..8d08c21b746 100644
--- a/src/mongo/db/repl/task_runner.cpp
+++ b/src/mongo/db/repl/task_runner.cpp
@@ -40,7 +40,6 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/util/assert_util.h"
-#include "mongo/util/concurrency/old_thread_pool.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
@@ -77,7 +76,7 @@ TaskRunner::Task TaskRunner::makeCancelTask() {
return [](OperationContext* opCtx, const Status& status) { return NextAction::kCancel; };
}
-TaskRunner::TaskRunner(OldThreadPool* threadPool)
+TaskRunner::TaskRunner(ThreadPool* threadPool)
: _threadPool(threadPool), _active(false), _cancelRequested(false) {
uassert(ErrorCodes::BadValue, "null thread pool", threadPool);
}
@@ -113,7 +112,7 @@ void TaskRunner::schedule(const Task& task) {
return;
}
- _threadPool->schedule([this] { _runTasks(); });
+ invariantOK(_threadPool->schedule([this] { _runTasks(); }));
_active = true;
_cancelRequested = false;
diff --git a/src/mongo/db/repl/task_runner.h b/src/mongo/db/repl/task_runner.h
index b7dcf4c05d6..310f432379d 100644
--- a/src/mongo/db/repl/task_runner.h
+++ b/src/mongo/db/repl/task_runner.h
@@ -35,11 +35,11 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/thread_pool.h"
namespace mongo {
class Status;
-class OldThreadPool;
class OperationContext;
namespace repl {
@@ -79,7 +79,7 @@ public:
*/
static Task makeCancelTask();
- explicit TaskRunner(OldThreadPool* threadPool);
+ explicit TaskRunner(ThreadPool* threadPool);
virtual ~TaskRunner();
@@ -156,7 +156,7 @@ private:
*/
Task _waitForNextTask();
- OldThreadPool* _threadPool;
+ ThreadPool* _threadPool;
// Protects member data of this TaskRunner.
mutable stdx::mutex _mutex;
diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp
index d2f9d3440e0..0be2eebb0cd 100644
--- a/src/mongo/db/repl/task_runner_test.cpp
+++ b/src/mongo/db/repl/task_runner_test.cpp
@@ -36,7 +36,7 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/unittest/barrier.h"
-#include "mongo/util/concurrency/old_thread_pool.h"
+#include "mongo/util/concurrency/thread_pool.h"
namespace {
@@ -68,7 +68,7 @@ TEST_F(TaskRunnerTest, CallbackValues) {
return TaskRunner::NextAction::kCancel;
};
getTaskRunner().schedule(task);
- getThreadPool().join();
+ getThreadPool().waitForIdle();
ASSERT_FALSE(getTaskRunner().isActive());
stdx::lock_guard<stdx::mutex> lk(mutex);
@@ -107,7 +107,7 @@ OpIdVector _testRunTaskTwice(TaskRunnerTest& test,
ASSERT_TRUE(test.getTaskRunner().isActive());
barrier.countDownAndWait();
- test.getThreadPool().join();
+ test.getThreadPool().waitForIdle();
ASSERT_FALSE(test.getTaskRunner().isActive());
stdx::lock_guard<stdx::mutex> lk(mutex);
@@ -133,7 +133,7 @@ TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContext) {
// thread back to pool after disposing of operation context.
TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContextJoinThreadPoolBeforeScheduling) {
auto schedule = [this](const Task& task) {
- getThreadPool().join();
+ getThreadPool().waitForIdle();
getTaskRunner().schedule(task);
};
auto txnId =
@@ -177,7 +177,7 @@ TEST_F(TaskRunnerTest, SkipSecondTask) {
schedulingDone = true;
condition.notify_all();
}
- getThreadPool().join();
+ getThreadPool().waitForIdle();
ASSERT_FALSE(getTaskRunner().isActive());
stdx::lock_guard<stdx::mutex> lk(mutex);
@@ -226,7 +226,7 @@ TEST_F(TaskRunnerTest, FirstTaskThrowsException) {
schedulingDone = true;
condition.notify_all();
}
- getThreadPool().join();
+ getThreadPool().waitForIdle();
ASSERT_FALSE(getTaskRunner().isActive());
stdx::lock_guard<stdx::mutex> lk(mutex);
@@ -270,7 +270,7 @@ TEST_F(TaskRunnerTest, Cancel) {
getTaskRunner().cancel();
getTaskRunner().cancel();
- getThreadPool().join();
+ getThreadPool().waitForIdle();
ASSERT_FALSE(getTaskRunner().isActive());
// This status will not be OK if canceling the task runner
@@ -345,7 +345,7 @@ TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) {
destroyTaskRunner();
- getThreadPool().join();
+ getThreadPool().waitForIdle();
// This status will not be OK if canceling the task runner
// before scheduling the task results in the task being canceled.
diff --git a/src/mongo/db/repl/task_runner_test_fixture.cpp b/src/mongo/db/repl/task_runner_test_fixture.cpp
index 25360440871..f16bfa67be0 100644
--- a/src/mongo/db/repl/task_runner_test_fixture.cpp
+++ b/src/mongo/db/repl/task_runner_test_fixture.cpp
@@ -34,7 +34,6 @@
#include "mongo/db/repl/task_runner.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
-#include "mongo/util/concurrency/old_thread_pool.h"
namespace mongo {
namespace repl {
@@ -42,12 +41,6 @@ namespace repl {
using namespace mongo;
using namespace mongo::repl;
-namespace {
-
-const int kNumThreads = 3;
-
-} // namespace
-
Status TaskRunnerTest::getDetectableErrorStatus() {
return Status(ErrorCodes::InternalError, "Not mutated");
}
@@ -57,7 +50,7 @@ TaskRunner& TaskRunnerTest::getTaskRunner() const {
return *_taskRunner;
}
-OldThreadPool& TaskRunnerTest::getThreadPool() const {
+ThreadPool& TaskRunnerTest::getThreadPool() const {
ASSERT(_threadPool.get());
return *_threadPool;
}
@@ -67,7 +60,11 @@ void TaskRunnerTest::destroyTaskRunner() {
}
void TaskRunnerTest::setUp() {
- _threadPool = stdx::make_unique<OldThreadPool>(kNumThreads, "TaskRunnerTest-");
+ ThreadPool::Options options;
+ options.poolName = "TaskRunnerTest";
+ _threadPool = stdx::make_unique<ThreadPool>(options);
+ _threadPool->startup();
+
_taskRunner = stdx::make_unique<TaskRunner>(_threadPool.get());
}
diff --git a/src/mongo/db/repl/task_runner_test_fixture.h b/src/mongo/db/repl/task_runner_test_fixture.h
index 84351c01926..2787419a1e5 100644
--- a/src/mongo/db/repl/task_runner_test_fixture.h
+++ b/src/mongo/db/repl/task_runner_test_fixture.h
@@ -33,11 +33,11 @@
#include "mongo/base/status.h"
#include "mongo/db/service_context.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/thread_pool.h"
namespace mongo {
class Client;
-class OldThreadPool;
class OperationContext;
namespace repl {
@@ -52,7 +52,7 @@ class TaskRunnerTest : public unittest::Test {
public:
static Status getDetectableErrorStatus();
- OldThreadPool& getThreadPool() const;
+ ThreadPool& getThreadPool() const;
TaskRunner& getTaskRunner() const;
void destroyTaskRunner();
@@ -62,7 +62,7 @@ protected:
void tearDown() override;
private:
- std::unique_ptr<OldThreadPool> _threadPool;
+ std::unique_ptr<ThreadPool> _threadPool;
std::unique_ptr<TaskRunner> _taskRunner;
};