summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/s/balancer/migration_manager.cpp6
-rw-r--r--src/mongo/db/s/collection_metadata_filtering_test.cpp2
-rw-r--r--src/mongo/db/s/metadata_manager_test.cpp2
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp3
-rw-r--r--src/mongo/db/s/transaction_coordinator_futures_util.h5
-rw-r--r--src/mongo/executor/async_multicaster.cpp5
-rw-r--r--src/mongo/executor/async_multicaster.h5
-rw-r--r--src/mongo/executor/scoped_task_executor.cpp8
-rw-r--r--src/mongo/executor/scoped_task_executor.h2
-rw-r--r--src/mongo/executor/scoped_task_executor_test.cpp8
-rw-r--r--src/mongo/executor/task_executor_pool.cpp12
-rw-r--r--src/mongo/executor/task_executor_pool.h12
-rw-r--r--src/mongo/executor/thread_pool_task_executor_test_fixture.cpp7
-rw-r--r--src/mongo/executor/thread_pool_task_executor_test_fixture.h4
-rw-r--r--src/mongo/s/async_requests_sender.cpp4
-rw-r--r--src/mongo/s/async_requests_sender.h2
-rw-r--r--src/mongo/s/client/shard_remote.cpp3
-rw-r--r--src/mongo/s/multi_statement_transaction_requests_sender.cpp11
-rw-r--r--src/mongo/s/multi_statement_transaction_requests_sender.h2
-rw-r--r--src/mongo/s/query/async_results_merger.cpp4
-rw-r--r--src/mongo/s/query/async_results_merger.h4
-rw-r--r--src/mongo/s/query/blocking_results_merger.cpp4
-rw-r--r--src/mongo/s/query/blocking_results_merger.h6
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp4
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.h4
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp17
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h11
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl_test.cpp8
-rw-r--r--src/mongo/s/query/document_source_merge_cursors.cpp8
-rw-r--r--src/mongo/s/query/document_source_merge_cursors.h8
-rw-r--r--src/mongo/s/query/document_source_update_on_add_shard.cpp13
-rw-r--r--src/mongo/s/query/document_source_update_on_add_shard.h8
-rw-r--r--src/mongo/s/query/establish_cursors.cpp4
-rw-r--r--src/mongo/s/query/establish_cursors.h3
-rw-r--r--src/mongo/s/query/owned_remote_cursor.h6
-rw-r--r--src/mongo/s/query/router_stage_merge.h7
-rw-r--r--src/mongo/s/query/store_possible_cursor.cpp4
-rw-r--r--src/mongo/s/query/store_possible_cursor.h2
-rw-r--r--src/mongo/s/sharding_initialization.cpp6
-rw-r--r--src/mongo/s/sharding_mongod_test_fixture.cpp8
-rw-r--r--src/mongo/s/sharding_mongod_test_fixture.h2
-rw-r--r--src/mongo/s/sharding_router_test_fixture.cpp19
-rw-r--r--src/mongo/s/sharding_router_test_fixture.h4
43 files changed, 150 insertions, 117 deletions
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp
index 9c409334636..b131bbafde7 100644
--- a/src/mongo/db/s/balancer/migration_manager.cpp
+++ b/src/mongo/db/s/balancer/migration_manager.cpp
@@ -381,8 +381,7 @@ void MigrationManager::finishRecovery(OperationContext* opCtx,
}
void MigrationManager::interruptAndDisableMigrations() {
- executor::TaskExecutor* const executor =
- Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();
+ auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();
stdx::lock_guard<stdx::mutex> lock(_mutex);
invariant(_state == State::kEnabled || _state == State::kRecovering);
@@ -479,8 +478,7 @@ void MigrationManager::_schedule(WithLock lock,
OperationContext* opCtx,
const HostAndPort& targetHost,
Migration migration) {
- executor::TaskExecutor* const executor =
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
const NamespaceString nss(migration.nss);
diff --git a/src/mongo/db/s/collection_metadata_filtering_test.cpp b/src/mongo/db/s/collection_metadata_filtering_test.cpp
index 6463636c22a..d125e651adc 100644
--- a/src/mongo/db/s/collection_metadata_filtering_test.cpp
+++ b/src/mongo/db/s/collection_metadata_filtering_test.cpp
@@ -44,7 +44,7 @@ class CollectionMetadataFilteringTest : public ShardServerTestFixture {
protected:
void setUp() override {
ShardServerTestFixture::setUp();
- _manager = std::make_shared<MetadataManager>(getServiceContext(), kNss, executor());
+ _manager = std::make_shared<MetadataManager>(getServiceContext(), kNss, executor().get());
}
/**
diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp
index 99c6ca25c5f..1e367cf7aea 100644
--- a/src/mongo/db/s/metadata_manager_test.cpp
+++ b/src/mongo/db/s/metadata_manager_test.cpp
@@ -64,7 +64,7 @@ class MetadataManagerTest : public ShardServerTestFixture {
protected:
void setUp() override {
ShardServerTestFixture::setUp();
- _manager = std::make_shared<MetadataManager>(getServiceContext(), kNss, executor());
+ _manager = std::make_shared<MetadataManager>(getServiceContext(), kNss, executor().get());
}
/**
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 65d557688d6..4e9fb2fd0ff 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -109,8 +109,7 @@ void refreshRecipientRoutingTable(OperationContext* opCtx,
opCtx,
executor::RemoteCommandRequest::kNoTimeout);
- executor::TaskExecutor* const executor =
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto noOp = [](const executor::TaskExecutor::RemoteCommandCallbackArgs&) {};
executor->scheduleRemoteCommand(request, noOp).getStatus().ignore();
}
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h
index ee35d8e4db3..1c654d8707f 100644
--- a/src/mongo/db/s/transaction_coordinator_futures_util.h
+++ b/src/mongo/db/s/transaction_coordinator_futures_util.h
@@ -29,6 +29,7 @@
#pragma once
+#include <memory>
#include <vector>
#include "mongo/client/read_preference.h"
@@ -192,8 +193,8 @@ private:
// Service context under which this executor runs
ServiceContext* const _serviceContext;
- // Cached reference to the executor to use
- executor::TaskExecutor* const _executor;
+ // Executor for performing async tasks.
+ std::shared_ptr<executor::TaskExecutor> _executor;
// If this work scheduler was constructed through 'makeChildScheduler', points to the parent
// scheduler and contains the iterator from the parent, which needs to be removed on destruction
diff --git a/src/mongo/executor/async_multicaster.cpp b/src/mongo/executor/async_multicaster.cpp
index f1c97acf5a3..24e72527d51 100644
--- a/src/mongo/executor/async_multicaster.cpp
+++ b/src/mongo/executor/async_multicaster.cpp
@@ -44,8 +44,9 @@
namespace mongo {
namespace executor {
-AsyncMulticaster::AsyncMulticaster(executor::TaskExecutor* executor, Options options)
- : _options(options), _executor(executor) {}
+AsyncMulticaster::AsyncMulticaster(std::shared_ptr<executor::TaskExecutor> executor,
+ Options options)
+ : _options(options), _executor(std::move(executor)) {}
std::vector<AsyncMulticaster::Reply> AsyncMulticaster::multicast(
const std::vector<HostAndPort> servers,
diff --git a/src/mongo/executor/async_multicaster.h b/src/mongo/executor/async_multicaster.h
index 2433f1fd45b..c2bc9e0be93 100644
--- a/src/mongo/executor/async_multicaster.h
+++ b/src/mongo/executor/async_multicaster.h
@@ -29,6 +29,7 @@
#pragma once
+#include <memory>
#include <vector>
#include "mongo/executor/remote_command_response.h"
@@ -54,7 +55,7 @@ public:
size_t maxConcurrency = kMaxConcurrency;
};
- AsyncMulticaster(executor::TaskExecutor* executor, Options options);
+ AsyncMulticaster(std::shared_ptr<executor::TaskExecutor> executor, Options options);
/**
* Sends the cmd out to all passed servers (via the executor), observing the multicaster's
@@ -73,7 +74,7 @@ public:
private:
Options _options;
- executor::TaskExecutor* _executor;
+ std::shared_ptr<executor::TaskExecutor> _executor;
};
} // namespace executor
diff --git a/src/mongo/executor/scoped_task_executor.cpp b/src/mongo/executor/scoped_task_executor.cpp
index e891d3c69a8..0f718242163 100644
--- a/src/mongo/executor/scoped_task_executor.cpp
+++ b/src/mongo/executor/scoped_task_executor.cpp
@@ -49,7 +49,7 @@ class ScopedTaskExecutor::Impl : public std::enable_shared_from_this<ScopedTaskE
Status(ErrorCodes::ShutdownInProgress, "Shutting down ScopedTaskExecutor::Impl");
public:
- explicit Impl(TaskExecutor* executor) : _executor(executor) {}
+ explicit Impl(std::shared_ptr<TaskExecutor> executor) : _executor(std::move(executor)) {}
~Impl() {
// The ScopedTaskExecutor dtor calls shutdown, so this is guaranteed.
@@ -304,7 +304,7 @@ private:
stdx::mutex _mutex;
bool _inShutdown = false;
- TaskExecutor* const _executor;
+ std::shared_ptr<TaskExecutor> _executor;
size_t _id = 0;
stdx::unordered_map<size_t, CallbackHandle> _cbHandles;
@@ -313,8 +313,8 @@ private:
stdx::condition_variable _cv;
};
-ScopedTaskExecutor::ScopedTaskExecutor(TaskExecutor* executor)
- : _executor(std::make_shared<Impl>(executor)) {}
+ScopedTaskExecutor::ScopedTaskExecutor(std::shared_ptr<TaskExecutor> executor)
+ : _executor(std::make_shared<Impl>(std::move(executor))) {}
ScopedTaskExecutor::~ScopedTaskExecutor() {
_executor->shutdown();
diff --git a/src/mongo/executor/scoped_task_executor.h b/src/mongo/executor/scoped_task_executor.h
index 122308e489a..1c42e867fc7 100644
--- a/src/mongo/executor/scoped_task_executor.h
+++ b/src/mongo/executor/scoped_task_executor.h
@@ -83,7 +83,7 @@ namespace executor {
*/
class ScopedTaskExecutor {
public:
- explicit ScopedTaskExecutor(TaskExecutor* executor);
+ explicit ScopedTaskExecutor(std::shared_ptr<TaskExecutor> executor);
// Delete all move/copy-ability
ScopedTaskExecutor(TaskExecutor&&) = delete;
diff --git a/src/mongo/executor/scoped_task_executor_test.cpp b/src/mongo/executor/scoped_task_executor_test.cpp
index 5ce217ecddf..ec078cb4dde 100644
--- a/src/mongo/executor/scoped_task_executor_test.cpp
+++ b/src/mongo/executor/scoped_task_executor_test.cpp
@@ -46,10 +46,10 @@ public:
void setUp() override {
auto net = std::make_unique<NetworkInterfaceMock>();
_net = net.get();
- _tpte.emplace(std::make_unique<ThreadPoolMock>(_net, 1, ThreadPoolMock::Options{}),
- std::move(net));
+ _tpte = std::make_shared<ThreadPoolTaskExecutor>(
+ std::make_unique<ThreadPoolMock>(_net, 1, ThreadPoolMock::Options{}), std::move(net));
_tpte->startup();
- _executor.emplace(_tpte.get_ptr());
+ _executor.emplace(_tpte);
}
void tearDown() override {
@@ -118,7 +118,7 @@ public:
private:
NetworkInterfaceMock* _net;
- boost::optional<ThreadPoolTaskExecutor> _tpte;
+ std::shared_ptr<ThreadPoolTaskExecutor> _tpte;
boost::optional<ScopedTaskExecutor> _executor;
};
diff --git a/src/mongo/executor/task_executor_pool.cpp b/src/mongo/executor/task_executor_pool.cpp
index e77e5eb665a..2f7bd5d95d8 100644
--- a/src/mongo/executor/task_executor_pool.cpp
+++ b/src/mongo/executor/task_executor_pool.cpp
@@ -73,8 +73,8 @@ void TaskExecutorPool::shutdownAndJoin() {
}
}
-void TaskExecutorPool::addExecutors(std::vector<std::unique_ptr<TaskExecutor>> executors,
- std::unique_ptr<TaskExecutor> fixedExecutor) {
+void TaskExecutorPool::addExecutors(std::vector<std::shared_ptr<TaskExecutor>> executors,
+ std::shared_ptr<TaskExecutor> fixedExecutor) {
invariant(_executors.empty());
invariant(fixedExecutor);
invariant(!_fixedExecutor);
@@ -83,15 +83,15 @@ void TaskExecutorPool::addExecutors(std::vector<std::unique_ptr<TaskExecutor>> e
_executors = std::move(executors);
}
-TaskExecutor* TaskExecutorPool::getArbitraryExecutor() {
+const std::shared_ptr<TaskExecutor>& TaskExecutorPool::getArbitraryExecutor() {
invariant(!_executors.empty());
uint64_t idx = (_counter.fetchAndAdd(1) % _executors.size());
- return _executors[idx].get();
+ return _executors[idx];
}
-TaskExecutor* TaskExecutorPool::getFixedExecutor() {
+const std::shared_ptr<TaskExecutor>& TaskExecutorPool::getFixedExecutor() {
invariant(_fixedExecutor);
- return _fixedExecutor.get();
+ return _fixedExecutor;
}
void TaskExecutorPool::appendConnectionStats(ConnectionPoolStats* stats) const {
diff --git a/src/mongo/executor/task_executor_pool.h b/src/mongo/executor/task_executor_pool.h
index b70a24c705f..05b307a9099 100644
--- a/src/mongo/executor/task_executor_pool.h
+++ b/src/mongo/executor/task_executor_pool.h
@@ -78,8 +78,8 @@ public:
* Adds 'executors' and 'fixedExecutor' to the pool. May be called at most once to initialize an
* empty pool.
*/
- void addExecutors(std::vector<std::unique_ptr<TaskExecutor>> executors,
- std::unique_ptr<TaskExecutor> fixedExecutor);
+ void addExecutors(std::vector<std::shared_ptr<TaskExecutor>> executors,
+ std::shared_ptr<TaskExecutor> fixedExecutor);
/**
* Returns a pointer to one of the executors in the pool. Two calls to this method may return
@@ -90,7 +90,7 @@ public:
*
* Thread-safe.
*/
- TaskExecutor* getArbitraryExecutor();
+ const std::shared_ptr<TaskExecutor>& getArbitraryExecutor();
/**
* Returns a pointer to the pool's fixed executor. Every call to this method will return the
@@ -101,7 +101,7 @@ public:
*
* Thread-safe.
*/
- TaskExecutor* getFixedExecutor();
+ const std::shared_ptr<TaskExecutor>& getFixedExecutor();
/**
* Appends connection information from all of the executors in the pool.
@@ -115,9 +115,9 @@ public:
private:
AtomicWord<unsigned> _counter;
- std::vector<std::unique_ptr<TaskExecutor>> _executors;
+ std::vector<std::shared_ptr<TaskExecutor>> _executors;
- std::unique_ptr<TaskExecutor> _fixedExecutor;
+ std::shared_ptr<TaskExecutor> _fixedExecutor;
};
} // namespace executor
diff --git a/src/mongo/executor/thread_pool_task_executor_test_fixture.cpp b/src/mongo/executor/thread_pool_task_executor_test_fixture.cpp
index ce0efbda2da..4498692f0a5 100644
--- a/src/mongo/executor/thread_pool_task_executor_test_fixture.cpp
+++ b/src/mongo/executor/thread_pool_task_executor_test_fixture.cpp
@@ -45,6 +45,13 @@ std::unique_ptr<ThreadPoolTaskExecutor> makeThreadPoolTestExecutor(
std::make_unique<ThreadPoolMock>(netPtr, 1, std::move(options)), std::move(net));
}
+std::shared_ptr<ThreadPoolTaskExecutor> makeSharedThreadPoolTestExecutor(
+ std::unique_ptr<NetworkInterfaceMock> net, ThreadPoolMock::Options options) {
+ auto netPtr = net.get();
+ return std::make_shared<ThreadPoolTaskExecutor>(
+ std::make_unique<ThreadPoolMock>(netPtr, 1, std::move(options)), std::move(net));
+}
+
ThreadPoolExecutorTest::ThreadPoolExecutorTest() {}
ThreadPoolExecutorTest::ThreadPoolExecutorTest(ThreadPoolMock::Options options)
diff --git a/src/mongo/executor/thread_pool_task_executor_test_fixture.h b/src/mongo/executor/thread_pool_task_executor_test_fixture.h
index 2519de104f1..07d593b244f 100644
--- a/src/mongo/executor/thread_pool_task_executor_test_fixture.h
+++ b/src/mongo/executor/thread_pool_task_executor_test_fixture.h
@@ -46,6 +46,10 @@ std::unique_ptr<ThreadPoolTaskExecutor> makeThreadPoolTestExecutor(
std::unique_ptr<NetworkInterfaceMock> net,
executor::ThreadPoolMock::Options options = executor::ThreadPoolMock::Options());
+std::shared_ptr<ThreadPoolTaskExecutor> makeSharedThreadPoolTestExecutor(
+ std::unique_ptr<NetworkInterfaceMock> net,
+ executor::ThreadPoolMock::Options options = executor::ThreadPoolMock::Options());
+
/**
* Useful fixture class for tests that use a ThreadPoolTaskExecutor.
*/
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 494dd4edb8f..0d14cf81015 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -59,7 +59,7 @@ const int kMaxNumFailedHostRetryAttempts = 3;
} // namespace
AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
StringData dbName,
const std::vector<AsyncRequestsSender::Request>& requests,
const ReadPreferenceSetting& readPreference,
@@ -68,7 +68,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
_db(dbName.toString()),
_readPreference(readPreference),
_retryPolicy(retryPolicy),
- _subExecutor(executor),
+ _subExecutor(std::move(executor)),
_subBaton(opCtx->getBaton()->makeSubBaton()) {
_remotesLeft = requests.size();
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 95c457e2e7f..3e14bec56d3 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -131,7 +131,7 @@ public:
* valid for the lifetime of the ARS.
*/
AsyncRequestsSender(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
StringData dbName,
const std::vector<AsyncRequestsSender::Request>& requests,
const ReadPreferenceSetting& readPreference,
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index 5e5b5b3ecb9..362160babec 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -301,7 +301,8 @@ StatusWith<Shard::QueryResponse> ShardRemote::_runExhaustiveCursorCommand(
const Milliseconds requestTimeout =
std::min(opCtx->getRemainingMaxTimeMillis(), maxTimeMSOverride);
- Fetcher fetcher(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
+ auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ Fetcher fetcher(executor.get(),
host.getValue(),
dbName.toString(),
cmdObj,
diff --git a/src/mongo/s/multi_statement_transaction_requests_sender.cpp b/src/mongo/s/multi_statement_transaction_requests_sender.cpp
index 9671914d315..c302ac28d2b 100644
--- a/src/mongo/s/multi_statement_transaction_requests_sender.cpp
+++ b/src/mongo/s/multi_statement_transaction_requests_sender.cpp
@@ -74,15 +74,18 @@ void processReplyMetadata(OperationContext* opCtx, const AsyncRequestsSender::Re
MultiStatementTransactionRequestsSender::MultiStatementTransactionRequestsSender(
OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
StringData dbName,
const std::vector<AsyncRequestsSender::Request>& requests,
const ReadPreferenceSetting& readPreference,
Shard::RetryPolicy retryPolicy)
: _opCtx(opCtx),
- _ars(
- opCtx, executor, dbName, attachTxnDetails(opCtx, requests), readPreference, retryPolicy) {
-}
+ _ars(opCtx,
+ std::move(executor),
+ dbName,
+ attachTxnDetails(opCtx, requests),
+ readPreference,
+ retryPolicy) {}
bool MultiStatementTransactionRequestsSender::done() {
return _ars.done();
diff --git a/src/mongo/s/multi_statement_transaction_requests_sender.h b/src/mongo/s/multi_statement_transaction_requests_sender.h
index af19762ea6f..0d93d031b99 100644
--- a/src/mongo/s/multi_statement_transaction_requests_sender.h
+++ b/src/mongo/s/multi_statement_transaction_requests_sender.h
@@ -47,7 +47,7 @@ public:
*/
MultiStatementTransactionRequestsSender(
OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
StringData dbName,
const std::vector<AsyncRequestsSender::Request>& requests,
const ReadPreferenceSetting& readPreference,
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 71701cfa4a9..8ee68933d49 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -82,10 +82,10 @@ int compareSortKeys(BSONObj leftSortKey, BSONObj rightSortKey, BSONObj sortKeyPa
} // namespace
AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
AsyncResultsMergerParams params)
: _opCtx(opCtx),
- _executor(executor),
+ _executor(std::move(executor)),
// This strange initialization is to work around the fact that the IDL does not currently
// support a default value for an enum. The default tailable mode should be 'kNormal', but
// since that is not supported we treat boost::none (unspecified) to mean 'kNormal'.
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 92cf05e66f5..3cf357dca6b 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -98,7 +98,7 @@ public:
* with a new, valid OperationContext before the next use.
*/
AsyncResultsMerger(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
AsyncResultsMergerParams params);
/**
@@ -446,7 +446,7 @@ private:
void _updateRemoteMetadata(WithLock, size_t remoteIndex, const CursorResponse& response);
OperationContext* _opCtx;
- executor::TaskExecutor* _executor;
+ std::shared_ptr<executor::TaskExecutor> _executor;
TailableModeEnum _tailableMode;
AsyncResultsMergerParams _params;
diff --git a/src/mongo/s/query/blocking_results_merger.cpp b/src/mongo/s/query/blocking_results_merger.cpp
index c8eda4d0253..7d01175c5c5 100644
--- a/src/mongo/s/query/blocking_results_merger.cpp
+++ b/src/mongo/s/query/blocking_results_merger.cpp
@@ -38,11 +38,11 @@ namespace mongo {
BlockingResultsMerger::BlockingResultsMerger(OperationContext* opCtx,
AsyncResultsMergerParams&& armParams,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
std::unique_ptr<ResourceYielder> resourceYielder)
: _tailableMode(armParams.getTailableMode().value_or(TailableModeEnum::kNormal)),
_executor(executor),
- _arm(opCtx, executor, std::move(armParams)),
+ _arm(opCtx, std::move(executor), std::move(armParams)),
_resourceYielder(std::move(resourceYielder)) {}
StatusWith<stdx::cv_status> BlockingResultsMerger::doWaiting(
diff --git a/src/mongo/s/query/blocking_results_merger.h b/src/mongo/s/query/blocking_results_merger.h
index 75da6fcc26f..563a1e36010 100644
--- a/src/mongo/s/query/blocking_results_merger.h
+++ b/src/mongo/s/query/blocking_results_merger.h
@@ -29,6 +29,8 @@
#pragma once
+#include <memory>
+
#include "mongo/s/query/async_results_merger.h"
#include "mongo/s/query/router_exec_stage.h"
@@ -42,7 +44,7 @@ class BlockingResultsMerger {
public:
BlockingResultsMerger(OperationContext* opCtx,
AsyncResultsMergerParams&& arm,
- executor::TaskExecutor*,
+ std::shared_ptr<executor::TaskExecutor> executor,
std::unique_ptr<ResourceYielder> resourceYielder);
/**
@@ -113,7 +115,7 @@ private:
const std::function<StatusWith<stdx::cv_status>()>& waitFn) noexcept;
TailableModeEnum _tailableMode;
- executor::TaskExecutor* _executor;
+ std::shared_ptr<executor::TaskExecutor> _executor;
// In a case where we have a tailable, awaitData cursor, a call to 'next()' will block waiting
// for an event generated by '_arm', but may time out waiting for this event to be triggered.
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index c11be89a9be..989231ce951 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -485,7 +485,7 @@ void addMergeCursorsSource(Pipeline* mergePipeline,
std::vector<OwnedRemoteCursor> ownedCursors,
const std::vector<ShardId>& targetedShards,
boost::optional<BSONObj> shardCursorsSortSpec,
- executor::TaskExecutor* executor) {
+ std::shared_ptr<executor::TaskExecutor> executor) {
auto* opCtx = mergePipeline->getContext()->opCtx;
AsyncResultsMergerParams armParams;
armParams.setSort(shardCursorsSortSpec);
@@ -523,7 +523,7 @@ void addMergeCursorsSource(Pipeline* mergePipeline,
// For change streams, we need to set up a custom stage to establish cursors on new shards when
// they are added, to ensure we don't miss results from the new shards.
auto mergeCursorsStage = DocumentSourceMergeCursors::create(
- executor, std::move(armParams), mergePipeline->getContext());
+ std::move(executor), std::move(armParams), mergePipeline->getContext());
if (liteParsedPipeline.hasChangeStream()) {
mergePipeline->addInitialSource(DocumentSourceUpdateOnAddShard::create(
diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h
index 1d5ed1e13b5..95158f8a81f 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.h
+++ b/src/mongo/s/query/cluster_aggregation_planner.h
@@ -29,6 +29,8 @@
#pragma once
+#include <memory>
+
#include "mongo/db/pipeline/exchange_spec_gen.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/pipeline.h"
@@ -82,7 +84,7 @@ void addMergeCursorsSource(Pipeline* mergePipeline,
std::vector<OwnedRemoteCursor> remoteCursors,
const std::vector<ShardId>& targetedShards,
boost::optional<BSONObj> shardCursorsSortSpec,
- executor::TaskExecutor*);
+ std::shared_ptr<executor::TaskExecutor> executor);
/**
* Builds a ClusterClientCursor which will execute 'pipeline'. If 'pipeline' consists entirely of
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index ecfb31a2a46..8737dea92ea 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -58,11 +58,12 @@ std::unique_ptr<ClusterClientCursor> ClusterClientCursorGuard::releaseCursor() {
return std::move(_ccc);
}
-ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams&& params) {
+ClusterClientCursorGuard ClusterClientCursorImpl::make(
+ OperationContext* opCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ ClusterClientCursorParams&& params) {
std::unique_ptr<ClusterClientCursor> cursor(new ClusterClientCursorImpl(
- opCtx, executor, std::move(params), opCtx->getLogicalSessionId()));
+ opCtx, std::move(executor), std::move(params), opCtx->getLogicalSessionId()));
return ClusterClientCursorGuard(opCtx, std::move(cursor));
}
@@ -75,11 +76,11 @@ ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx,
}
ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
ClusterClientCursorParams&& params,
boost::optional<LogicalSessionId> lsid)
: _params(std::move(params)),
- _root(buildMergerPlan(opCtx, executor, &_params)),
+ _root(buildMergerPlan(opCtx, std::move(executor), &_params)),
_lsid(lsid),
_opCtx(opCtx),
_createdDate(opCtx->getServiceContext()->getPreciseClockSource()->now()),
@@ -223,7 +224,9 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc
}
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
- OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) {
+ OperationContext* opCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ ClusterClientCursorParams* params) {
const auto skip = params->skip;
const auto limit = params->limit;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 50d36853cf8..2ec439e9627 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -90,7 +90,7 @@ public:
* ensured by an RAII object.
*/
static ClusterClientCursorGuard make(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
ClusterClientCursorParams&& params);
/**
@@ -161,7 +161,7 @@ public:
* Constructs a cluster client cursor.
*/
ClusterClientCursorImpl(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
ClusterClientCursorParams&& params,
boost::optional<LogicalSessionId> lsid);
@@ -169,9 +169,10 @@ private:
/**
* Constructs the pipeline of MergerPlanStages which will be used to answer the query.
*/
- std::unique_ptr<RouterExecStage> buildMergerPlan(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params);
+ std::unique_ptr<RouterExecStage> buildMergerPlan(
+ OperationContext* opCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ ClusterClientCursorParams* params);
ClusterClientCursorParams _params;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
index 215ac79365e..0119abcbd68 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
@@ -209,13 +209,15 @@ TEST_F(ClusterClientCursorImplTest, LogicalSessionIdsOnCursors) {
}
TEST_F(ClusterClientCursorImplTest, ShouldStoreLSIDIfSetOnOpCtx) {
+ std::shared_ptr<executor::TaskExecutor> nullExecutor;
+
{
// Make a cursor with no lsid or txnNumber.
ClusterClientCursorParams params(NamespaceString("test"), {});
params.lsid = _opCtx->getLogicalSessionId();
params.txnNumber = _opCtx->getTxnNumber();
- auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullptr, std::move(params));
+ auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullExecutor, std::move(params));
ASSERT_FALSE(cursor->getLsid());
ASSERT_FALSE(cursor->getTxnNumber());
}
@@ -229,7 +231,7 @@ TEST_F(ClusterClientCursorImplTest, ShouldStoreLSIDIfSetOnOpCtx) {
params.lsid = _opCtx->getLogicalSessionId();
params.txnNumber = _opCtx->getTxnNumber();
- auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullptr, std::move(params));
+ auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullExecutor, std::move(params));
ASSERT_EQ(*cursor->getLsid(), lsid);
ASSERT_FALSE(cursor->getTxnNumber());
}
@@ -243,7 +245,7 @@ TEST_F(ClusterClientCursorImplTest, ShouldStoreLSIDIfSetOnOpCtx) {
params.lsid = _opCtx->getLogicalSessionId();
params.txnNumber = _opCtx->getTxnNumber();
- auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullptr, std::move(params));
+ auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullExecutor, std::move(params));
ASSERT_EQ(*cursor->getLsid(), lsid);
ASSERT_EQ(*cursor->getTxnNumber(), txnNumber);
}
diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp
index 0959e8f6efb..d48c6ca804a 100644
--- a/src/mongo/s/query/document_source_merge_cursors.cpp
+++ b/src/mongo/s/query/document_source_merge_cursors.cpp
@@ -46,13 +46,13 @@ REGISTER_DOCUMENT_SOURCE(mergeCursors,
constexpr StringData DocumentSourceMergeCursors::kStageName;
DocumentSourceMergeCursors::DocumentSourceMergeCursors(
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
AsyncResultsMergerParams armParams,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<BSONObj> ownedParamsSpec)
: DocumentSource(expCtx),
_armParamsObj(std::move(ownedParamsSpec)),
- _executor(executor),
+ _executor(std::move(executor)),
_armParams(std::move(armParams)) {}
std::size_t DocumentSourceMergeCursors::getNumRemotes() const {
@@ -129,10 +129,10 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson(
}
boost::intrusive_ptr<DocumentSourceMergeCursors> DocumentSourceMergeCursors::create(
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
AsyncResultsMergerParams params,
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return new DocumentSourceMergeCursors(executor, std::move(params), expCtx);
+ return new DocumentSourceMergeCursors(std::move(executor), std::move(params), expCtx);
}
void DocumentSourceMergeCursors::detachFromOperationContext() {
diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h
index 60cdf7d25f6..f8e25299260 100644
--- a/src/mongo/s/query/document_source_merge_cursors.h
+++ b/src/mongo/s/query/document_source_merge_cursors.h
@@ -29,6 +29,8 @@
#pragma once
+#include <memory>
+
#include "mongo/db/pipeline/document_source.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/query/blocking_results_merger.h"
@@ -60,7 +62,7 @@ public:
* Creates a new DocumentSourceMergeCursors from the given parameters.
*/
static boost::intrusive_ptr<DocumentSourceMergeCursors> create(
- executor::TaskExecutor*,
+ std::shared_ptr<executor::TaskExecutor>,
AsyncResultsMergerParams,
const boost::intrusive_ptr<ExpressionContext>&);
@@ -149,7 +151,7 @@ protected:
void doDispose() final;
private:
- DocumentSourceMergeCursors(executor::TaskExecutor*,
+ DocumentSourceMergeCursors(std::shared_ptr<executor::TaskExecutor>,
AsyncResultsMergerParams,
const boost::intrusive_ptr<ExpressionContext>&,
boost::optional<BSONObj> ownedParamsSpec = boost::none);
@@ -164,7 +166,7 @@ private:
// params are in use. We store them here.
boost::optional<BSONObj> _armParamsObj;
- executor::TaskExecutor* _executor;
+ std::shared_ptr<executor::TaskExecutor> _executor;
// '_blockingResultsMerger' is lazily populated. Until we need to use it, '_armParams' will be
// populated with the parameters. Once we start using '_blockingResultsMerger', '_armParams'
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/s/query/document_source_update_on_add_shard.cpp
index 3a295f200f7..00a8971454c 100644
--- a/src/mongo/s/query/document_source_update_on_add_shard.cpp
+++ b/src/mongo/s/query/document_source_update_on_add_shard.cpp
@@ -49,22 +49,25 @@ bool needsUpdate(const Document& childResult) {
boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> DocumentSourceUpdateOnAddShard::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors,
std::vector<ShardId> shardsWithCursors,
BSONObj cmdToRunOnNewShards) {
- return new DocumentSourceUpdateOnAddShard(
- expCtx, executor, mergeCursors, std::move(shardsWithCursors), cmdToRunOnNewShards);
+ return new DocumentSourceUpdateOnAddShard(expCtx,
+ std::move(executor),
+ mergeCursors,
+ std::move(shardsWithCursors),
+ cmdToRunOnNewShards);
}
DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors,
std::vector<ShardId>&& shardsWithCursors,
BSONObj cmdToRunOnNewShards)
: DocumentSource(expCtx),
- _executor(executor),
+ _executor(std::move(executor)),
_mergeCursors(mergeCursors),
_shardsWithCursors(std::move(shardsWithCursors)),
_cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {}
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h
index 5b00ece5e52..e8f2473b432 100644
--- a/src/mongo/s/query/document_source_update_on_add_shard.h
+++ b/src/mongo/s/query/document_source_update_on_add_shard.h
@@ -29,6 +29,8 @@
#pragma once
+#include <memory>
+
#include "mongo/db/pipeline/document_source.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/query/document_source_merge_cursors.h"
@@ -51,7 +53,7 @@ public:
*/
static boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> create(
const boost::intrusive_ptr<ExpressionContext>&,
- executor::TaskExecutor*,
+ std::shared_ptr<executor::TaskExecutor> executor,
const boost::intrusive_ptr<DocumentSourceMergeCursors>&,
std::vector<ShardId> shardsWithCursors,
BSONObj cmdToRunOnNewShards);
@@ -81,7 +83,7 @@ public:
private:
DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&,
- executor::TaskExecutor*,
+ std::shared_ptr<executor::TaskExecutor> executor,
const boost::intrusive_ptr<DocumentSourceMergeCursors>&,
std::vector<ShardId>&& shardsWithCursors,
BSONObj cmdToRunOnNewShards);
@@ -96,7 +98,7 @@ private:
*/
std::vector<RemoteCursor> establishShardCursorsOnNewShards(const Document& newShardDetectedObj);
- executor::TaskExecutor* _executor;
+ std::shared_ptr<executor::TaskExecutor> _executor;
boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursors;
std::vector<ShardId> _shardsWithCursors;
BSONObj _cmdToRunOnNewShards;
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index 614672813a1..b97f9026d03 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -50,7 +50,7 @@
namespace mongo {
std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
const NamespaceString& nss,
const ReadPreferenceSetting readPref,
const std::vector<std::pair<ShardId, BSONObj>>& remotes,
@@ -136,7 +136,7 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
}
// Schedule killCursors against all cursors that were established.
- killRemoteCursors(opCtx, executor, std::move(remoteCursors), nss);
+ killRemoteCursors(opCtx, executor.get(), std::move(remoteCursors), nss);
} catch (const DBException&) {
// Ignore the new error and rethrow the original one.
}
diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h
index be7eac8d869..97e72225072 100644
--- a/src/mongo/s/query/establish_cursors.h
+++ b/src/mongo/s/query/establish_cursors.h
@@ -30,6 +30,7 @@
#pragma once
#include <boost/optional.hpp>
+#include <memory>
#include <vector>
#include "mongo/base/status_with.h"
@@ -64,7 +65,7 @@ class CursorResponse;
*/
std::vector<RemoteCursor> establishCursors(
OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
const NamespaceString& nss,
const ReadPreferenceSetting readPref,
const std::vector<std::pair<ShardId, BSONObj>>& remotes,
diff --git a/src/mongo/s/query/owned_remote_cursor.h b/src/mongo/s/query/owned_remote_cursor.h
index 473196e4de5..0e4e03103fe 100644
--- a/src/mongo/s/query/owned_remote_cursor.h
+++ b/src/mongo/s/query/owned_remote_cursor.h
@@ -50,10 +50,8 @@ public:
~OwnedRemoteCursor() {
if (_remoteCursor) {
- killRemoteCursor(_opCtx,
- Grid::get(_opCtx)->getExecutorPool()->getArbitraryExecutor(),
- releaseCursor(),
- _nss);
+ auto executor = Grid::get(_opCtx)->getExecutorPool()->getArbitraryExecutor();
+ killRemoteCursor(_opCtx, executor.get(), releaseCursor(), _nss);
}
}
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index 574f39a280d..a7cead17d89 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -29,6 +29,8 @@
#pragma once
+#include <memory>
+
#include "mongo/executor/task_executor.h"
#include "mongo/s/query/blocking_results_merger.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
@@ -44,9 +46,10 @@ namespace mongo {
class RouterStageMerge final : public RouterExecStage {
public:
RouterStageMerge(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
AsyncResultsMergerParams&& armParams)
- : RouterExecStage(opCtx), _resultsMerger(opCtx, std::move(armParams), executor, nullptr) {}
+ : RouterExecStage(opCtx),
+ _resultsMerger(opCtx, std::move(armParams), std::move(executor), nullptr) {}
StatusWith<ClusterQueryResult> next(ExecContext execCtx) final {
return _resultsMerger.next(getOpCtx(), execCtx);
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index e2edf321a4e..9877af451a2 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -74,7 +74,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const HostAndPort& server,
const BSONObj& cmdResult,
const NamespaceString& requestedNss,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
ClusterCursorManager* cursorManager,
PrivilegeVector privileges,
TailableModeEnum tailableMode) {
@@ -117,7 +117,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
params.isAutoCommit = false;
}
- auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params));
+ auto ccc = ClusterClientCursorImpl::make(opCtx, std::move(executor), std::move(params));
// We don't expect to use this cursor until a subsequent getMore, so detach from the current
// OperationContext until then.
diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h
index 5202935df32..38b13b4ea7a 100644
--- a/src/mongo/s/query/store_possible_cursor.h
+++ b/src/mongo/s/query/store_possible_cursor.h
@@ -78,7 +78,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const HostAndPort& server,
const BSONObj& cmdResult,
const NamespaceString& requestedNss,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
ClusterCursorManager* cursorManager,
PrivilegeVector privileges,
TailableModeEnum tailableMode = TailableModeEnum::kNormal);
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index ed9364ea133..944ff7d94f4 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -98,7 +98,7 @@ std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service
return std::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager));
}
-std::unique_ptr<executor::TaskExecutor> makeShardingFixedTaskExecutor(
+std::shared_ptr<executor::TaskExecutor> makeShardingFixedTaskExecutor(
std::unique_ptr<NetworkInterface> net) {
auto executor =
std::make_unique<ThreadPoolTaskExecutor>(std::make_unique<ThreadPool>([] {
@@ -110,7 +110,7 @@ std::unique_ptr<executor::TaskExecutor> makeShardingFixedTaskExecutor(
}()),
std::move(net));
- return std::make_unique<executor::ShardingTaskExecutor>(std::move(executor));
+ return std::make_shared<executor::ShardingTaskExecutor>(std::move(executor));
}
std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool(
@@ -118,7 +118,7 @@ std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool(
rpc::ShardingEgressMetadataHookBuilder metadataHookBuilder,
ConnectionPool::Options connPoolOptions,
boost::optional<size_t> taskExecutorPoolSize) {
- std::vector<std::unique_ptr<executor::TaskExecutor>> executors;
+ std::vector<std::shared_ptr<executor::TaskExecutor>> executors;
const auto poolSize = taskExecutorPoolSize.value_or(TaskExecutorPool::getSuggestedPoolSize());
diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp
index 7080a938399..29f029933dd 100644
--- a/src/mongo/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/s/sharding_mongod_test_fixture.cpp
@@ -171,13 +171,13 @@ std::unique_ptr<executor::TaskExecutorPool> ShardingMongodTestFixture::makeTaskE
// note that the ThreadPoolMock uses the NetworkInterfaceMock's threads to run tasks, which is
// again just the (single) thread the unit test is running on. Therefore, all tasks, local and
// remote, must be carried out synchronously by the test thread.
- auto fixedTaskExecutor = makeThreadPoolTestExecutor(std::move(netForFixedTaskExecutor));
+ auto fixedTaskExecutor = makeSharedThreadPoolTestExecutor(std::move(netForFixedTaskExecutor));
_networkTestEnv = std::make_unique<NetworkTestEnv>(fixedTaskExecutor.get(), _mockNetwork);
// Set up (one) TaskExecutor for the set of arbitrary TaskExecutors.
- std::vector<std::unique_ptr<executor::TaskExecutor>> arbitraryExecutorsForExecutorPool;
+ std::vector<std::shared_ptr<executor::TaskExecutor>> arbitraryExecutorsForExecutorPool;
arbitraryExecutorsForExecutorPool.emplace_back(
- makeThreadPoolTestExecutor(std::make_unique<executor::NetworkInterfaceMock>()));
+ makeSharedThreadPoolTestExecutor(std::make_unique<executor::NetworkInterfaceMock>()));
// Set up the TaskExecutorPool with the fixed TaskExecutor and set of arbitrary TaskExecutors.
auto executorPool = std::make_unique<executor::TaskExecutorPool>();
@@ -336,7 +336,7 @@ void ShardingMongodTestFixture::shutdownExecutorPool() {
_executorPoolShutDown = true;
}
-executor::TaskExecutor* ShardingMongodTestFixture::executor() const {
+std::shared_ptr<executor::TaskExecutor> ShardingMongodTestFixture::executor() const {
invariant(Grid::get(operationContext())->getExecutorPool());
return Grid::get(operationContext())->getExecutorPool()->getFixedExecutor();
}
diff --git a/src/mongo/s/sharding_mongod_test_fixture.h b/src/mongo/s/sharding_mongod_test_fixture.h
index ee29f7de8b6..b7cadc17792 100644
--- a/src/mongo/s/sharding_mongod_test_fixture.h
+++ b/src/mongo/s/sharding_mongod_test_fixture.h
@@ -76,7 +76,7 @@ public:
CatalogCache* catalogCache() const;
ShardRegistry* shardRegistry() const;
RemoteCommandTargeterFactoryMock* targeterFactory() const;
- executor::TaskExecutor* executor() const;
+ std::shared_ptr<executor::TaskExecutor> executor() const;
DistLockManager* distLock() const;
ClusterCursorManager* clusterCursorManager() const;
executor::TaskExecutorPool* executorPool() const;
diff --git a/src/mongo/s/sharding_router_test_fixture.cpp b/src/mongo/s/sharding_router_test_fixture.cpp
index 189b76e73d6..c0b5e8d4a62 100644
--- a/src/mongo/s/sharding_router_test_fixture.cpp
+++ b/src/mongo/s/sharding_router_test_fixture.cpp
@@ -118,9 +118,8 @@ ShardingTestFixture::ShardingTestFixture() {
auto fixedNet = std::make_unique<executor::NetworkInterfaceMock>();
fixedNet->setEgressMetadataHook(makeMetadataHookList());
_mockNetwork = fixedNet.get();
- auto fixedExec = makeShardingTestExecutor(std::move(fixedNet));
- _networkTestEnv = std::make_unique<NetworkTestEnv>(fixedExec.get(), _mockNetwork);
- _executor = fixedExec.get();
+ _fixedExecutor = makeShardingTestExecutor(std::move(fixedNet));
+ _networkTestEnv = std::make_unique<NetworkTestEnv>(_fixedExecutor.get(), _mockNetwork);
auto netForPool = std::make_unique<executor::NetworkInterfaceMock>();
netForPool->setEgressMetadataHook(makeMetadataHookList());
@@ -128,11 +127,11 @@ ShardingTestFixture::ShardingTestFixture() {
auto execForPool = makeShardingTestExecutor(std::move(netForPool));
_networkTestEnvForPool =
std::make_unique<NetworkTestEnv>(execForPool.get(), _mockNetworkForPool);
- std::vector<std::unique_ptr<executor::TaskExecutor>> executorsForPool;
+ std::vector<std::shared_ptr<executor::TaskExecutor>> executorsForPool;
executorsForPool.emplace_back(std::move(execForPool));
auto executorPool = std::make_unique<executor::TaskExecutorPool>();
- executorPool->addExecutors(std::move(executorsForPool), std::move(fixedExec));
+ executorPool->addExecutors(std::move(executorsForPool), _fixedExecutor);
auto uniqueDistLockManager = std::make_unique<DistLockManagerMock>(nullptr);
_distLockManager = uniqueDistLockManager.get();
@@ -193,8 +192,8 @@ ShardingTestFixture::~ShardingTestFixture() {
}
void ShardingTestFixture::shutdownExecutor() {
- if (_executor)
- _executor->shutdown();
+ if (_fixedExecutor)
+ _fixedExecutor->shutdown();
}
ShardingCatalogClient* ShardingTestFixture::catalogClient() const {
@@ -217,10 +216,10 @@ RemoteCommandTargeterMock* ShardingTestFixture::configTargeter() const {
return _configTargeter;
}
-executor::TaskExecutor* ShardingTestFixture::executor() const {
- invariant(_executor);
+std::shared_ptr<executor::TaskExecutor> ShardingTestFixture::executor() const {
+ invariant(_fixedExecutor);
- return _executor;
+ return _fixedExecutor;
}
DistLockManagerMock* ShardingTestFixture::distLock() const {
diff --git a/src/mongo/s/sharding_router_test_fixture.h b/src/mongo/s/sharding_router_test_fixture.h
index 05fa042a8a7..6f3ca156e12 100644
--- a/src/mongo/s/sharding_router_test_fixture.h
+++ b/src/mongo/s/sharding_router_test_fixture.h
@@ -62,7 +62,7 @@ public:
ShardingCatalogClient* catalogClient() const;
ShardRegistry* shardRegistry() const;
RemoteCommandTargeterFactoryMock* targeterFactory() const;
- executor::TaskExecutor* executor() const;
+ std::shared_ptr<executor::TaskExecutor> executor() const;
DistLockManagerMock* distLock() const;
RemoteCommandTargeterMock* configTargeter() const;
@@ -180,7 +180,7 @@ private:
RemoteCommandTargeterMock* _configTargeter;
// For the Grid's fixed executor.
- executor::TaskExecutor* _executor;
+ std::shared_ptr<executor::TaskExecutor> _fixedExecutor;
// For the Grid's arbitrary executor in its executorPool.
std::unique_ptr<executor::NetworkTestEnv> _networkTestEnvForPool;