summary refs log tree commit diff
path: root/src/mongo
diff options
context:
space:
mode:
author    Matthew Saltz <matthew.saltz@mongodb.com>  2020-02-20 13:46:31 -0500
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-02-20 20:49:40 +0000
commit    a8fdddf98dd47b65b05ce28d41214ba87446f626 (patch)
tree      8c0e94d19f73e60c7c58ff4bf662a42ae8554638 /src/mongo
parent    c73b7af4489ed839ab9f337bad08d9a6c93220e4 (diff)
download  mongo-a8fdddf98dd47b65b05ce28d41214ba87446f626.tar.gz
SERVER-46075 Make new executor for submitting range deletion tasks
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/s/collection_sharding_state_factory_shard.cpp | 20
-rw-r--r--  src/mongo/db/s/collection_sharding_state_factory_shard.h   |  4
-rw-r--r--  src/mongo/db/s/migration_util.cpp                          | 50
3 files changed, 48 insertions(+), 26 deletions(-)
diff --git a/src/mongo/db/s/collection_sharding_state_factory_shard.cpp b/src/mongo/db/s/collection_sharding_state_factory_shard.cpp
index 15a4d035873..626564fff79 100644
--- a/src/mongo/db/s/collection_sharding_state_factory_shard.cpp
+++ b/src/mongo/db/s/collection_sharding_state_factory_shard.cpp
@@ -48,20 +48,22 @@ CollectionShardingStateFactoryShard::~CollectionShardingStateFactoryShard() {
}
void CollectionShardingStateFactoryShard::join() {
- if (_taskExecutor) {
- _taskExecutor->shutdown();
- _taskExecutor->join();
+ if (_rangeDeletionExecutor) {
+ _rangeDeletionExecutor->shutdown();
+ _rangeDeletionExecutor->join();
}
}
std::unique_ptr<CollectionShardingState> CollectionShardingStateFactoryShard::make(
const NamespaceString& nss) {
- return std::make_unique<CollectionShardingRuntime>(_serviceContext, nss, _getExecutor());
+ return std::make_unique<CollectionShardingRuntime>(
+ _serviceContext, nss, _getRangeDeletionExecutor());
}
-std::shared_ptr<executor::TaskExecutor> CollectionShardingStateFactoryShard::_getExecutor() {
+std::shared_ptr<executor::TaskExecutor>
+CollectionShardingStateFactoryShard::_getRangeDeletionExecutor() {
stdx::lock_guard<Latch> lg(_mutex);
- if (!_taskExecutor) {
+ if (!_rangeDeletionExecutor) {
const std::string kExecName("CollectionRangeDeleter-TaskExecutor");
auto net = executor::makeNetworkInterface(kExecName);
@@ -70,11 +72,11 @@ std::shared_ptr<executor::TaskExecutor> CollectionShardingStateFactoryShard::_ge
std::make_shared<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
taskExecutor->startup();
- _taskExecutor = std::move(taskExecutor);
+ _rangeDeletionExecutor = std::move(taskExecutor);
}
- return _taskExecutor;
+ return _rangeDeletionExecutor;
}
-} // namespace mongo \ No newline at end of file
+} // namespace mongo
diff --git a/src/mongo/db/s/collection_sharding_state_factory_shard.h b/src/mongo/db/s/collection_sharding_state_factory_shard.h
index f24929b4694..a3a0c04e82a 100644
--- a/src/mongo/db/s/collection_sharding_state_factory_shard.h
+++ b/src/mongo/db/s/collection_sharding_state_factory_shard.h
@@ -45,13 +45,13 @@ public:
std::unique_ptr<CollectionShardingState> make(const NamespaceString& nss) override;
private:
- std::shared_ptr<executor::TaskExecutor> _getExecutor();
+ std::shared_ptr<executor::TaskExecutor> _getRangeDeletionExecutor();
// Serializes the instantiation of the task executor
Mutex _mutex = MONGO_MAKE_LATCH("CollectionShardingStateFactoryShard::_mutex");
// Required to be a shared_ptr since it is used as an executor for ExecutorFutures.
- std::shared_ptr<executor::TaskExecutor> _taskExecutor = {nullptr};
+ std::shared_ptr<executor::TaskExecutor> _rangeDeletionExecutor = {nullptr};
};
} // namespace mongo
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index c7eb2183d85..760f058ad52 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -59,6 +59,7 @@
#include "mongo/s/client/shard.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/ensure_chunk_version_is_greater_than_gen.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
@@ -103,6 +104,27 @@ void sendToRecipient(OperationContext* opCtx,
uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(response));
}
+// Returns an executor to be used to run commands related to submitting tasks to the range deleter.
+// The executor is initialized on the first call to this function. Uses a shared_ptr
+// because a shared_ptr is required to work with ExecutorFutures.
+static std::shared_ptr<ThreadPool> getMigrationUtilExecutor() {
+ static Mutex mutex = MONGO_MAKE_LATCH("MigrationUtilExecutor::_mutex");
+ static std::shared_ptr<ThreadPool> executor;
+
+ stdx::lock_guard<Latch> lg(mutex);
+ if (!executor) {
+ ThreadPool::Options options;
+ options.poolName = "MoveChunk";
+ options.minThreads = 0;
+ options.maxThreads = 16;
+ executor = std::make_shared<ThreadPool>(std::move(options));
+ executor->startup();
+ }
+
+ return executor;
+}
+
+
} // namespace
BSONObj makeMigrationStatusDocument(const NamespaceString& nss,
@@ -138,7 +160,7 @@ bool checkForConflictingDeletions(OperationContext* opCtx,
ExecutorFuture<void> submitRangeDeletionTask(OperationContext* opCtx,
const RangeDeletionTask& deletionTask) {
const auto serviceContext = opCtx->getServiceContext();
- return ExecutorFuture<void>(Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor())
+ return ExecutorFuture<void>(getMigrationUtilExecutor())
.then([=] {
ThreadClient tc(kRangeDeletionThreadName, serviceContext);
{
@@ -242,19 +264,18 @@ void submitPendingDeletions(OperationContext* opCtx) {
void resubmitRangeDeletionsOnStepUp(ServiceContext* serviceContext) {
LOGV2(22028, "Starting pending deletion submission thread.");
- auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor();
-
- ExecutorFuture<void>(executor).getAsync([serviceContext](const Status& status) {
- ThreadClient tc("ResubmitRangeDeletions", serviceContext);
- {
- stdx::lock_guard<Client> lk(*tc.get());
- tc->setSystemOperationKillable(lk);
- }
+ ExecutorFuture<void>(getMigrationUtilExecutor())
+ .getAsync([serviceContext](const Status& status) {
+ ThreadClient tc("ResubmitRangeDeletions", serviceContext);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillable(lk);
+ }
- auto opCtx = tc->makeOperationContext();
+ auto opCtx = tc->makeOperationContext();
- submitPendingDeletions(opCtx.get());
- });
+ submitPendingDeletions(opCtx.get());
+ });
}
void dropRangeDeletionsCollection(OperationContext* opCtx) {
@@ -626,8 +647,7 @@ void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const Namespa
void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
LOGV2(22037, "Starting migration coordinator stepup recovery thread.");
- auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor();
- ExecutorFuture<void>(executor)
+ ExecutorFuture<void>(getMigrationUtilExecutor())
.then([serviceContext] {
ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext);
{
@@ -659,7 +679,7 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
"lastOpTime"_attr = lastOpTime);
return WaitForMajorityService::get(serviceContext).waitUntilMajority(lastOpTime);
})
- .thenRunOn(executor)
+ .thenRunOn(getMigrationUtilExecutor())
.then([serviceContext]() {
ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext);
{