diff options
author | Randolph Tan <randolph@10gen.com> | 2019-06-27 15:41:10 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2019-07-17 11:43:27 -0400 |
commit | ed935ffffae8dd373063da350e4cb61547e20688 (patch) | |
tree | fb277c512fddb87a44790cf34a0deaae0837dec0 /src/mongo | |
parent | 998640bbece98f31a92bba8c02785c818545ba84 (diff) | |
download | mongo-ed935ffffae8dd373063da350e4cb61547e20688.tar.gz |
SERVER-40785 Change sharding fixed and arbitrary executors from unique_ptr to shared_ptr
(cherry picked from commit d15b1f2e036e262f8ea976e04780aa366fa20ad4)
Diffstat (limited to 'src/mongo')
43 files changed, 150 insertions, 117 deletions
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp index 9c409334636..b131bbafde7 100644 --- a/src/mongo/db/s/balancer/migration_manager.cpp +++ b/src/mongo/db/s/balancer/migration_manager.cpp @@ -381,8 +381,7 @@ void MigrationManager::finishRecovery(OperationContext* opCtx, } void MigrationManager::interruptAndDisableMigrations() { - executor::TaskExecutor* const executor = - Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor(); + auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor(); stdx::lock_guard<stdx::mutex> lock(_mutex); invariant(_state == State::kEnabled || _state == State::kRecovering); @@ -479,8 +478,7 @@ void MigrationManager::_schedule(WithLock lock, OperationContext* opCtx, const HostAndPort& targetHost, Migration migration) { - executor::TaskExecutor* const executor = - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); const NamespaceString nss(migration.nss); diff --git a/src/mongo/db/s/collection_metadata_filtering_test.cpp b/src/mongo/db/s/collection_metadata_filtering_test.cpp index 6463636c22a..d125e651adc 100644 --- a/src/mongo/db/s/collection_metadata_filtering_test.cpp +++ b/src/mongo/db/s/collection_metadata_filtering_test.cpp @@ -44,7 +44,7 @@ class CollectionMetadataFilteringTest : public ShardServerTestFixture { protected: void setUp() override { ShardServerTestFixture::setUp(); - _manager = std::make_shared<MetadataManager>(getServiceContext(), kNss, executor()); + _manager = std::make_shared<MetadataManager>(getServiceContext(), kNss, executor().get()); } /** diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp index db701ec0a60..8563070c7cf 100644 --- a/src/mongo/db/s/metadata_manager_test.cpp +++ b/src/mongo/db/s/metadata_manager_test.cpp @@ -64,7 +64,7 @@ class MetadataManagerTest : public ShardServerTestFixture { protected: void setUp() override { ShardServerTestFixture::setUp(); - _manager = std::make_shared<MetadataManager>(getServiceContext(), kNss, executor()); + _manager = std::make_shared<MetadataManager>(getServiceContext(), kNss, executor().get()); } /** diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 74a3e7707fd..ab4d76a8952 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -108,8 +108,7 @@ void refreshRecipientRoutingTable(OperationContext* opCtx, opCtx, executor::RemoteCommandRequest::kNoTimeout); - executor::TaskExecutor* const executor = - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto noOp = [](const executor::TaskExecutor::RemoteCommandCallbackArgs&) {}; executor->scheduleRemoteCommand(request, noOp).getStatus().ignore(); } diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h index ee35d8e4db3..1c654d8707f 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.h +++ b/src/mongo/db/s/transaction_coordinator_futures_util.h @@ -29,6 +29,7 @@ #pragma once +#include <memory> #include <vector> #include "mongo/client/read_preference.h" @@ -192,8 +193,8 @@ private: // Service context under which this executor runs ServiceContext* const _serviceContext; - // Cached reference to the executor to use - executor::TaskExecutor* const _executor; + // Executor for performing async tasks. + std::shared_ptr<executor::TaskExecutor> _executor; // If this work scheduler was constructed through 'makeChildScheduler', points to the parent // scheduler and contains the iterator from the parent, which needs to be removed on destruction diff --git a/src/mongo/executor/async_multicaster.cpp b/src/mongo/executor/async_multicaster.cpp index f1c97acf5a3..24e72527d51 100644 --- a/src/mongo/executor/async_multicaster.cpp +++ b/src/mongo/executor/async_multicaster.cpp @@ -44,8 +44,9 @@ namespace mongo { namespace executor { -AsyncMulticaster::AsyncMulticaster(executor::TaskExecutor* executor, Options options) - : _options(options), _executor(executor) {} +AsyncMulticaster::AsyncMulticaster(std::shared_ptr<executor::TaskExecutor> executor, + Options options) + : _options(options), _executor(std::move(executor)) {} std::vector<AsyncMulticaster::Reply> AsyncMulticaster::multicast( const std::vector<HostAndPort> servers, diff --git a/src/mongo/executor/async_multicaster.h b/src/mongo/executor/async_multicaster.h index 2433f1fd45b..c2bc9e0be93 100644 --- a/src/mongo/executor/async_multicaster.h +++ b/src/mongo/executor/async_multicaster.h @@ -29,6 +29,7 @@ #pragma once +#include <memory> #include <vector> #include "mongo/executor/remote_command_response.h" @@ -54,7 +55,7 @@ public: size_t maxConcurrency = kMaxConcurrency; }; - AsyncMulticaster(executor::TaskExecutor* executor, Options options); + AsyncMulticaster(std::shared_ptr<executor::TaskExecutor> executor, Options options); /** * Sends the cmd out to all passed servers (via the executor), observing the multicaster's @@ -73,7 +74,7 @@ public: private: Options _options; - executor::TaskExecutor* _executor; + std::shared_ptr<executor::TaskExecutor> _executor; }; } // namespace executor diff --git a/src/mongo/executor/scoped_task_executor.cpp b/src/mongo/executor/scoped_task_executor.cpp index e891d3c69a8..0f718242163 100644 --- a/src/mongo/executor/scoped_task_executor.cpp +++ b/src/mongo/executor/scoped_task_executor.cpp @@ -49,7 +49,7 @@ class ScopedTaskExecutor::Impl : public std::enable_shared_from_this<ScopedTaskE Status(ErrorCodes::ShutdownInProgress, "Shutting down ScopedTaskExecutor::Impl"); public: - explicit Impl(TaskExecutor* executor) : _executor(executor) {} + explicit Impl(std::shared_ptr<TaskExecutor> executor) : _executor(std::move(executor)) {} ~Impl() { // The ScopedTaskExecutor dtor calls shutdown, so this is guaranteed. @@ -304,7 +304,7 @@ private: stdx::mutex _mutex; bool _inShutdown = false; - TaskExecutor* const _executor; + std::shared_ptr<TaskExecutor> _executor; size_t _id = 0; stdx::unordered_map<size_t, CallbackHandle> _cbHandles; @@ -313,8 +313,8 @@ private: stdx::condition_variable _cv; }; -ScopedTaskExecutor::ScopedTaskExecutor(TaskExecutor* executor) - : _executor(std::make_shared<Impl>(executor)) {} +ScopedTaskExecutor::ScopedTaskExecutor(std::shared_ptr<TaskExecutor> executor) + : _executor(std::make_shared<Impl>(std::move(executor))) {} ScopedTaskExecutor::~ScopedTaskExecutor() { _executor->shutdown(); diff --git a/src/mongo/executor/scoped_task_executor.h b/src/mongo/executor/scoped_task_executor.h index 122308e489a..1c42e867fc7 100644 --- a/src/mongo/executor/scoped_task_executor.h +++ b/src/mongo/executor/scoped_task_executor.h @@ -83,7 +83,7 @@ namespace executor { */ class ScopedTaskExecutor { public: - explicit ScopedTaskExecutor(TaskExecutor* executor); + explicit ScopedTaskExecutor(std::shared_ptr<TaskExecutor> executor); // Delete all move/copy-ability ScopedTaskExecutor(TaskExecutor&&) = delete; diff --git a/src/mongo/executor/scoped_task_executor_test.cpp b/src/mongo/executor/scoped_task_executor_test.cpp index 5ce217ecddf..ec078cb4dde 100644 --- a/src/mongo/executor/scoped_task_executor_test.cpp +++ b/src/mongo/executor/scoped_task_executor_test.cpp @@ -46,10 +46,10 @@ public: void setUp() override { auto net = std::make_unique<NetworkInterfaceMock>(); _net = net.get(); - _tpte.emplace(std::make_unique<ThreadPoolMock>(_net, 1, ThreadPoolMock::Options{}), - std::move(net)); + _tpte = std::make_shared<ThreadPoolTaskExecutor>( + std::make_unique<ThreadPoolMock>(_net, 1, ThreadPoolMock::Options{}), std::move(net)); _tpte->startup(); - _executor.emplace(_tpte.get_ptr()); + _executor.emplace(_tpte); } void tearDown() override { @@ -118,7 +118,7 @@ public: private: NetworkInterfaceMock* _net; - boost::optional<ThreadPoolTaskExecutor> _tpte; + std::shared_ptr<ThreadPoolTaskExecutor> _tpte; boost::optional<ScopedTaskExecutor> _executor; }; diff --git a/src/mongo/executor/task_executor_pool.cpp b/src/mongo/executor/task_executor_pool.cpp index e77e5eb665a..2f7bd5d95d8 100644 --- a/src/mongo/executor/task_executor_pool.cpp +++ b/src/mongo/executor/task_executor_pool.cpp @@ -73,8 +73,8 @@ void TaskExecutorPool::shutdownAndJoin() { } } -void TaskExecutorPool::addExecutors(std::vector<std::unique_ptr<TaskExecutor>> executors, - std::unique_ptr<TaskExecutor> fixedExecutor) { +void TaskExecutorPool::addExecutors(std::vector<std::shared_ptr<TaskExecutor>> executors, + std::shared_ptr<TaskExecutor> fixedExecutor) { invariant(_executors.empty()); invariant(fixedExecutor); invariant(!_fixedExecutor); @@ -83,15 +83,15 @@ void TaskExecutorPool::addExecutors(std::vector<std::unique_ptr<TaskExecutor>> e _executors = std::move(executors); } -TaskExecutor* TaskExecutorPool::getArbitraryExecutor() { +const std::shared_ptr<TaskExecutor>& TaskExecutorPool::getArbitraryExecutor() { invariant(!_executors.empty()); uint64_t idx = (_counter.fetchAndAdd(1) % _executors.size()); - return _executors[idx].get(); + return _executors[idx]; } -TaskExecutor* TaskExecutorPool::getFixedExecutor() { +const std::shared_ptr<TaskExecutor>& TaskExecutorPool::getFixedExecutor() { invariant(_fixedExecutor); - return _fixedExecutor.get(); + return _fixedExecutor; } void TaskExecutorPool::appendConnectionStats(ConnectionPoolStats* stats) const { diff --git a/src/mongo/executor/task_executor_pool.h b/src/mongo/executor/task_executor_pool.h index b70a24c705f..05b307a9099 100644 --- a/src/mongo/executor/task_executor_pool.h +++ b/src/mongo/executor/task_executor_pool.h @@ -78,8 +78,8 @@ public: * Adds 'executors' and 'fixedExecutor' to the pool. May be called at most once to initialize an * empty pool. */ - void addExecutors(std::vector<std::unique_ptr<TaskExecutor>> executors, - std::unique_ptr<TaskExecutor> fixedExecutor); + void addExecutors(std::vector<std::shared_ptr<TaskExecutor>> executors, + std::shared_ptr<TaskExecutor> fixedExecutor); /** * Returns a pointer to one of the executors in the pool. Two calls to this method may return @@ -90,7 +90,7 @@ public: * * Thread-safe. */ - TaskExecutor* getArbitraryExecutor(); + const std::shared_ptr<TaskExecutor>& getArbitraryExecutor(); /** * Returns a pointer to the pool's fixed executor. Every call to this method will return the @@ -101,7 +101,7 @@ public: * * Thread-safe. */ - TaskExecutor* getFixedExecutor(); + const std::shared_ptr<TaskExecutor>& getFixedExecutor(); /** * Appends connection information from all of the executors in the pool. @@ -115,9 +115,9 @@ public: private: AtomicWord<unsigned> _counter; - std::vector<std::unique_ptr<TaskExecutor>> _executors; + std::vector<std::shared_ptr<TaskExecutor>> _executors; - std::unique_ptr<TaskExecutor> _fixedExecutor; + std::shared_ptr<TaskExecutor> _fixedExecutor; }; } // namespace executor diff --git a/src/mongo/executor/thread_pool_task_executor_test_fixture.cpp b/src/mongo/executor/thread_pool_task_executor_test_fixture.cpp index ba35b1963e9..4be2c7265fb 100644 --- a/src/mongo/executor/thread_pool_task_executor_test_fixture.cpp +++ b/src/mongo/executor/thread_pool_task_executor_test_fixture.cpp @@ -44,6 +44,13 @@ std::unique_ptr<ThreadPoolTaskExecutor> makeThreadPoolTestExecutor( stdx::make_unique<ThreadPoolMock>(netPtr, 1, std::move(options)), std::move(net)); } +std::shared_ptr<ThreadPoolTaskExecutor> makeSharedThreadPoolTestExecutor( + std::unique_ptr<NetworkInterfaceMock> net, ThreadPoolMock::Options options) { + auto netPtr = net.get(); + return std::make_shared<ThreadPoolTaskExecutor>( + std::make_unique<ThreadPoolMock>(netPtr, 1, std::move(options)), std::move(net)); +} + ThreadPoolExecutorTest::ThreadPoolExecutorTest() {} ThreadPoolExecutorTest::ThreadPoolExecutorTest(ThreadPoolMock::Options options) diff --git a/src/mongo/executor/thread_pool_task_executor_test_fixture.h b/src/mongo/executor/thread_pool_task_executor_test_fixture.h index 2519de104f1..07d593b244f 100644 --- a/src/mongo/executor/thread_pool_task_executor_test_fixture.h +++ b/src/mongo/executor/thread_pool_task_executor_test_fixture.h @@ -46,6 +46,10 @@ std::unique_ptr<ThreadPoolTaskExecutor> makeThreadPoolTestExecutor( std::unique_ptr<NetworkInterfaceMock> net, executor::ThreadPoolMock::Options options = executor::ThreadPoolMock::Options()); +std::shared_ptr<ThreadPoolTaskExecutor> makeSharedThreadPoolTestExecutor( + std::unique_ptr<NetworkInterfaceMock> net, + executor::ThreadPoolMock::Options options = executor::ThreadPoolMock::Options()); + /** * Useful fixture class for tests that use a ThreadPoolTaskExecutor. */ diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp index 86fa3af6300..609b8db39fb 100644 --- a/src/mongo/s/async_requests_sender.cpp +++ b/src/mongo/s/async_requests_sender.cpp @@ -59,7 +59,7 @@ const int kMaxNumFailedHostRetryAttempts = 3; } // namespace AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, StringData dbName, const std::vector<AsyncRequestsSender::Request>& requests, const ReadPreferenceSetting& readPreference, @@ -68,7 +68,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx, _db(dbName.toString()), _readPreference(readPreference), _retryPolicy(retryPolicy), - _subExecutor(executor), + _subExecutor(std::move(executor)), _subBaton(opCtx->getBaton()->makeSubBaton()) { _remotesLeft = requests.size(); diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h index 95c457e2e7f..3e14bec56d3 100644 --- a/src/mongo/s/async_requests_sender.h +++ b/src/mongo/s/async_requests_sender.h @@ -131,7 +131,7 @@ public: * valid for the lifetime of the ARS. */ AsyncRequestsSender(OperationContext* opCtx, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, StringData dbName, const std::vector<AsyncRequestsSender::Request>& requests, const ReadPreferenceSetting& readPreference, diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp index 5e5b5b3ecb9..362160babec 100644 --- a/src/mongo/s/client/shard_remote.cpp +++ b/src/mongo/s/client/shard_remote.cpp @@ -301,7 +301,8 @@ StatusWith<Shard::QueryResponse> ShardRemote::_runExhaustiveCursorCommand( const Milliseconds requestTimeout = std::min(opCtx->getRemainingMaxTimeMillis(), maxTimeMSOverride); - Fetcher fetcher(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), + auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + Fetcher fetcher(executor.get(), host.getValue(), dbName.toString(), cmdObj, diff --git a/src/mongo/s/multi_statement_transaction_requests_sender.cpp b/src/mongo/s/multi_statement_transaction_requests_sender.cpp index 9671914d315..c302ac28d2b 100644 --- a/src/mongo/s/multi_statement_transaction_requests_sender.cpp +++ b/src/mongo/s/multi_statement_transaction_requests_sender.cpp @@ -74,15 +74,18 @@ void processReplyMetadata(OperationContext* opCtx, const AsyncRequestsSender::Re MultiStatementTransactionRequestsSender::MultiStatementTransactionRequestsSender( OperationContext* opCtx, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, StringData dbName, const std::vector<AsyncRequestsSender::Request>& requests, const ReadPreferenceSetting& readPreference, Shard::RetryPolicy retryPolicy) : _opCtx(opCtx), - _ars( - opCtx, executor, dbName, attachTxnDetails(opCtx, requests), readPreference, retryPolicy) { -} + _ars(opCtx, + std::move(executor), + dbName, + attachTxnDetails(opCtx, requests), + readPreference, + retryPolicy) {} bool MultiStatementTransactionRequestsSender::done() { return _ars.done(); diff --git a/src/mongo/s/multi_statement_transaction_requests_sender.h b/src/mongo/s/multi_statement_transaction_requests_sender.h index af19762ea6f..0d93d031b99 100644 --- a/src/mongo/s/multi_statement_transaction_requests_sender.h +++ b/src/mongo/s/multi_statement_transaction_requests_sender.h @@ -47,7 +47,7 @@ public: */ MultiStatementTransactionRequestsSender( OperationContext* opCtx, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, StringData dbName, const std::vector<AsyncRequestsSender::Request>& requests, const ReadPreferenceSetting& readPreference, diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 444c3968ee5..0f76cdd3f67 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -82,10 +82,10 @@ int compareSortKeys(BSONObj leftSortKey, BSONObj rightSortKey, BSONObj sortKeyPa } // namespace AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, AsyncResultsMergerParams params) : _opCtx(opCtx), - _executor(executor), + _executor(std::move(executor)), // This strange initialization is to work around the fact that the IDL does not currently // support a default value for an enum. The default tailable mode should be 'kNormal', but // since that is not supported we treat boost::none (unspecified) to mean 'kNormal'. diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 92cf05e66f5..3cf357dca6b 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -98,7 +98,7 @@ public: * with a new, valid OperationContext before the next use. */ AsyncResultsMerger(OperationContext* opCtx, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, AsyncResultsMergerParams params); /** @@ -446,7 +446,7 @@ private: void _updateRemoteMetadata(WithLock, size_t remoteIndex, const CursorResponse& response); OperationContext* _opCtx; - executor::TaskExecutor* _executor; + std::shared_ptr<executor::TaskExecutor> _executor; TailableModeEnum _tailableMode; AsyncResultsMergerParams _params; diff --git a/src/mongo/s/query/blocking_results_merger.cpp b/src/mongo/s/query/blocking_results_merger.cpp index c8eda4d0253..7d01175c5c5 100644 --- a/src/mongo/s/query/blocking_results_merger.cpp +++ b/src/mongo/s/query/blocking_results_merger.cpp @@ -38,11 +38,11 @@ namespace mongo { BlockingResultsMerger::BlockingResultsMerger(OperationContext* opCtx, AsyncResultsMergerParams&& armParams, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, std::unique_ptr<ResourceYielder> resourceYielder) : _tailableMode(armParams.getTailableMode().value_or(TailableModeEnum::kNormal)), _executor(executor), - _arm(opCtx, executor, std::move(armParams)), + _arm(opCtx, std::move(executor), std::move(armParams)), _resourceYielder(std::move(resourceYielder)) {} StatusWith<stdx::cv_status> BlockingResultsMerger::doWaiting( diff --git a/src/mongo/s/query/blocking_results_merger.h b/src/mongo/s/query/blocking_results_merger.h index 75da6fcc26f..563a1e36010 100644 --- a/src/mongo/s/query/blocking_results_merger.h +++ b/src/mongo/s/query/blocking_results_merger.h @@ -29,6 +29,8 @@ #pragma once +#include <memory> + #include "mongo/s/query/async_results_merger.h" #include "mongo/s/query/router_exec_stage.h" @@ -42,7 +44,7 @@ class BlockingResultsMerger { public: BlockingResultsMerger(OperationContext* opCtx, AsyncResultsMergerParams&& arm, - executor::TaskExecutor*, + std::shared_ptr<executor::TaskExecutor> executor, std::unique_ptr<ResourceYielder> resourceYielder); /** @@ -113,7 +115,7 @@ private: const std::function<StatusWith<stdx::cv_status>()>& waitFn) noexcept; TailableModeEnum _tailableMode; - executor::TaskExecutor* _executor; + std::shared_ptr<executor::TaskExecutor> _executor; // In a case where we have a tailable, awaitData cursor, a call to 'next()' will block waiting // for an event generated by '_arm', but may time out waiting for this event to be triggered. diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 165140819ed..da55bf71dad 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -483,7 +483,7 @@ void addMergeCursorsSource(Pipeline* mergePipeline, std::vector<OwnedRemoteCursor> ownedCursors, const std::vector<ShardId>& targetedShards, boost::optional<BSONObj> shardCursorsSortSpec, - executor::TaskExecutor* executor) { + std::shared_ptr<executor::TaskExecutor> executor) { auto* opCtx = mergePipeline->getContext()->opCtx; AsyncResultsMergerParams armParams; armParams.setSort(shardCursorsSortSpec); @@ -521,7 +521,7 @@ void addMergeCursorsSource(Pipeline* mergePipeline, // For change streams, we need to set up a custom stage to establish cursors on new shards when // they are added, to ensure we don't miss results from the new shards. auto mergeCursorsStage = DocumentSourceMergeCursors::create( - executor, std::move(armParams), mergePipeline->getContext()); + std::move(executor), std::move(armParams), mergePipeline->getContext()); if (liteParsedPipeline.hasChangeStream()) { mergePipeline->addInitialSource(DocumentSourceUpdateOnAddShard::create( diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h index 1d5ed1e13b5..95158f8a81f 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.h +++ b/src/mongo/s/query/cluster_aggregation_planner.h @@ -29,6 +29,8 @@ #pragma once +#include <memory> + #include "mongo/db/pipeline/exchange_spec_gen.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/pipeline.h" @@ -82,7 +84,7 @@ void addMergeCursorsSource(Pipeline* mergePipeline, std::vector<OwnedRemoteCursor> remoteCursors, const std::vector<ShardId>& targetedShards, boost::optional<BSONObj> shardCursorsSortSpec, - executor::TaskExecutor*); + std::shared_ptr<executor::TaskExecutor> executor); /** * Builds a ClusterClientCursor which will execute 'pipeline'. If 'pipeline' consists entirely of diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 9644abfc432..c56dfc593fe 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -57,11 +57,12 @@ std::unique_ptr<ClusterClientCursor> ClusterClientCursorGuard::releaseCursor() { return std::move(_ccc); } -ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams&& params) { +ClusterClientCursorGuard ClusterClientCursorImpl::make( + OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + ClusterClientCursorParams&& params) { std::unique_ptr<ClusterClientCursor> cursor(new ClusterClientCursorImpl( - opCtx, executor, std::move(params), opCtx->getLogicalSessionId())); + opCtx, std::move(executor), std::move(params), opCtx->getLogicalSessionId())); return ClusterClientCursorGuard(opCtx, std::move(cursor)); } @@ -74,11 +75,11 @@ ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx, } ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, ClusterClientCursorParams&& params, boost::optional<LogicalSessionId> lsid) : _params(std::move(params)), - _root(buildMergerPlan(opCtx, executor, &_params)), + _root(buildMergerPlan(opCtx, std::move(executor), &_params)), _lsid(lsid), _opCtx(opCtx), _createdDate(opCtx->getServiceContext()->getPreciseClockSource()->now()), @@ -222,7 +223,9 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc } std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( - OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) { + OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + ClusterClientCursorParams* params) { const auto skip = params->skip; const auto limit = params->limit; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 50d36853cf8..2ec439e9627 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -90,7 +90,7 @@ public: * ensured by an RAII object. */ static ClusterClientCursorGuard make(OperationContext* opCtx, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, ClusterClientCursorParams&& params); /** @@ -161,7 +161,7 @@ public: * Constructs a cluster client cursor. */ ClusterClientCursorImpl(OperationContext* opCtx, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, ClusterClientCursorParams&& params, boost::optional<LogicalSessionId> lsid); @@ -169,9 +169,10 @@ private: /** * Constructs the pipeline of MergerPlanStages which will be used to answer the query. */ - std::unique_ptr<RouterExecStage> buildMergerPlan(OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params); + std::unique_ptr<RouterExecStage> buildMergerPlan( + OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + ClusterClientCursorParams* params); ClusterClientCursorParams _params; diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp index fb90fdb1478..16cd7f8e2d3 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp @@ -208,13 +208,15 @@ TEST_F(ClusterClientCursorImplTest, LogicalSessionIdsOnCursors) { } TEST_F(ClusterClientCursorImplTest, ShouldStoreLSIDIfSetOnOpCtx) { + std::shared_ptr<executor::TaskExecutor> nullExecutor; + { // Make a cursor with no lsid or txnNumber. ClusterClientCursorParams params(NamespaceString("test"), {}); params.lsid = _opCtx->getLogicalSessionId(); params.txnNumber = _opCtx->getTxnNumber(); - auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullptr, std::move(params)); + auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullExecutor, std::move(params)); ASSERT_FALSE(cursor->getLsid()); ASSERT_FALSE(cursor->getTxnNumber()); } @@ -228,7 +230,7 @@ TEST_F(ClusterClientCursorImplTest, ShouldStoreLSIDIfSetOnOpCtx) { params.lsid = _opCtx->getLogicalSessionId(); params.txnNumber = _opCtx->getTxnNumber(); - auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullptr, std::move(params)); + auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullExecutor, std::move(params)); ASSERT_EQ(*cursor->getLsid(), lsid); ASSERT_FALSE(cursor->getTxnNumber()); } @@ -242,7 +244,7 @@ TEST_F(ClusterClientCursorImplTest, ShouldStoreLSIDIfSetOnOpCtx) { params.lsid = _opCtx->getLogicalSessionId(); params.txnNumber = _opCtx->getTxnNumber(); - auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullptr, std::move(params)); + auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullExecutor, std::move(params)); ASSERT_EQ(*cursor->getLsid(), lsid); ASSERT_EQ(*cursor->getTxnNumber(), txnNumber); } diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp index 79634ac15c5..4d4cbebb7a8 100644 --- a/src/mongo/s/query/document_source_merge_cursors.cpp +++ b/src/mongo/s/query/document_source_merge_cursors.cpp @@ -46,13 +46,13 @@ REGISTER_DOCUMENT_SOURCE(mergeCursors, constexpr StringData DocumentSourceMergeCursors::kStageName; DocumentSourceMergeCursors::DocumentSourceMergeCursors( - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, AsyncResultsMergerParams armParams, const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<BSONObj> ownedParamsSpec) : DocumentSource(expCtx), _armParamsObj(std::move(ownedParamsSpec)), - _executor(executor), + _executor(std::move(executor)), _armParams(std::move(armParams)) {} std::size_t DocumentSourceMergeCursors::getNumRemotes() const { @@ -129,10 +129,10 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson( } boost::intrusive_ptr<DocumentSourceMergeCursors> DocumentSourceMergeCursors::create( - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, AsyncResultsMergerParams params, const boost::intrusive_ptr<ExpressionContext>& expCtx) { - return new DocumentSourceMergeCursors(executor, std::move(params), expCtx); + return new DocumentSourceMergeCursors(std::move(executor), std::move(params), expCtx); } void DocumentSourceMergeCursors::detachFromOperationContext() { diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h index 60cdf7d25f6..f8e25299260 100644 --- a/src/mongo/s/query/document_source_merge_cursors.h +++ b/src/mongo/s/query/document_source_merge_cursors.h @@ -29,6 +29,8 @@ #pragma once +#include <memory> + #include "mongo/db/pipeline/document_source.h" #include "mongo/executor/task_executor.h" #include "mongo/s/query/blocking_results_merger.h" @@ -60,7 +62,7 @@ public: * Creates a new DocumentSourceMergeCursors from the given parameters. */ static boost::intrusive_ptr<DocumentSourceMergeCursors> create( - executor::TaskExecutor*, + std::shared_ptr<executor::TaskExecutor>, AsyncResultsMergerParams, const boost::intrusive_ptr<ExpressionContext>&); @@ -149,7 +151,7 @@ protected: void doDispose() final; private: - DocumentSourceMergeCursors(executor::TaskExecutor*, + DocumentSourceMergeCursors(std::shared_ptr<executor::TaskExecutor>, AsyncResultsMergerParams, const boost::intrusive_ptr<ExpressionContext>&, boost::optional<BSONObj> ownedParamsSpec = boost::none); @@ -164,7 +166,7 @@ private: // params are in use. We store them here. boost::optional<BSONObj> _armParamsObj; - executor::TaskExecutor* _executor; + std::shared_ptr<executor::TaskExecutor> _executor; // '_blockingResultsMerger' is lazily populated. Until we need to use it, '_armParams' will be // populated with the parameters. Once we start using '_blockingResultsMerger', '_armParams' diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/s/query/document_source_update_on_add_shard.cpp index 3a295f200f7..00a8971454c 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.cpp +++ b/src/mongo/s/query/document_source_update_on_add_shard.cpp @@ -49,22 +49,25 @@ bool needsUpdate(const Document& childResult) { boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> DocumentSourceUpdateOnAddShard::create( const boost::intrusive_ptr<ExpressionContext>& expCtx, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors, std::vector<ShardId> shardsWithCursors, BSONObj cmdToRunOnNewShards) { - return new DocumentSourceUpdateOnAddShard( - expCtx, executor, mergeCursors, std::move(shardsWithCursors), cmdToRunOnNewShards); + return new DocumentSourceUpdateOnAddShard(expCtx, + std::move(executor), + mergeCursors, + std::move(shardsWithCursors), + cmdToRunOnNewShards); } DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard( const boost::intrusive_ptr<ExpressionContext>& expCtx, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors, std::vector<ShardId>&& shardsWithCursors, BSONObj cmdToRunOnNewShards) : DocumentSource(expCtx), - _executor(executor), + _executor(std::move(executor)), _mergeCursors(mergeCursors), _shardsWithCursors(std::move(shardsWithCursors)), _cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {} diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h index 5b00ece5e52..e8f2473b432 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.h +++ b/src/mongo/s/query/document_source_update_on_add_shard.h @@ -29,6 +29,8 @@ #pragma once +#include <memory> + #include "mongo/db/pipeline/document_source.h" #include "mongo/executor/task_executor.h" #include "mongo/s/query/document_source_merge_cursors.h" @@ -51,7 +53,7 @@ public: */ static boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> create( const boost::intrusive_ptr<ExpressionContext>&, - executor::TaskExecutor*, + std::shared_ptr<executor::TaskExecutor> executor, const boost::intrusive_ptr<DocumentSourceMergeCursors>&, std::vector<ShardId> shardsWithCursors, BSONObj cmdToRunOnNewShards); @@ -81,7 +83,7 @@ public: private: DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&, - executor::TaskExecutor*, + std::shared_ptr<executor::TaskExecutor> executor, const boost::intrusive_ptr<DocumentSourceMergeCursors>&, std::vector<ShardId>&& shardsWithCursors, BSONObj cmdToRunOnNewShards); @@ -96,7 +98,7 @@ private: */ std::vector<RemoteCursor> establishShardCursorsOnNewShards(const Document& newShardDetectedObj); - executor::TaskExecutor* _executor; + std::shared_ptr<executor::TaskExecutor> _executor; boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursors; std::vector<ShardId> _shardsWithCursors; BSONObj _cmdToRunOnNewShards; diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp index 614672813a1..b97f9026d03 100644 --- a/src/mongo/s/query/establish_cursors.cpp +++ b/src/mongo/s/query/establish_cursors.cpp @@ -50,7 +50,7 @@ namespace mongo { std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, const NamespaceString& nss, const ReadPreferenceSetting readPref, const std::vector<std::pair<ShardId, BSONObj>>& remotes, @@ -136,7 +136,7 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, } // Schedule killCursors against all cursors that were established. - killRemoteCursors(opCtx, executor, std::move(remoteCursors), nss); + killRemoteCursors(opCtx, executor.get(), std::move(remoteCursors), nss); } catch (const DBException&) { // Ignore the new error and rethrow the original one. } diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h index be7eac8d869..97e72225072 100644 --- a/src/mongo/s/query/establish_cursors.h +++ b/src/mongo/s/query/establish_cursors.h @@ -30,6 +30,7 @@ #pragma once #include <boost/optional.hpp> +#include <memory> #include <vector> #include "mongo/base/status_with.h" @@ -64,7 +65,7 @@ class CursorResponse; */ std::vector<RemoteCursor> establishCursors( OperationContext* opCtx, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, const NamespaceString& nss, const ReadPreferenceSetting readPref, const std::vector<std::pair<ShardId, BSONObj>>& remotes, diff --git a/src/mongo/s/query/owned_remote_cursor.h b/src/mongo/s/query/owned_remote_cursor.h index 473196e4de5..0e4e03103fe 100644 --- a/src/mongo/s/query/owned_remote_cursor.h +++ b/src/mongo/s/query/owned_remote_cursor.h @@ -50,10 +50,8 @@ public: ~OwnedRemoteCursor() { if (_remoteCursor) { - killRemoteCursor(_opCtx, - Grid::get(_opCtx)->getExecutorPool()->getArbitraryExecutor(), - releaseCursor(), - _nss); + auto executor = Grid::get(_opCtx)->getExecutorPool()->getArbitraryExecutor(); + killRemoteCursor(_opCtx, executor.get(), releaseCursor(), _nss); } } diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index 574f39a280d..a7cead17d89 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -29,6 +29,8 @@ #pragma once +#include <memory> + #include "mongo/executor/task_executor.h" #include "mongo/s/query/blocking_results_merger.h" #include "mongo/s/query/cluster_client_cursor_params.h" @@ -44,9 +46,10 @@ namespace mongo { class RouterStageMerge final : public RouterExecStage { public: RouterStageMerge(OperationContext* opCtx, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, AsyncResultsMergerParams&& armParams) - : RouterExecStage(opCtx), _resultsMerger(opCtx, std::move(armParams), executor, nullptr) {} + : RouterExecStage(opCtx), + _resultsMerger(opCtx, std::move(armParams), std::move(executor), nullptr) {} StatusWith<ClusterQueryResult> next(ExecContext execCtx) final { return _resultsMerger.next(getOpCtx(), execCtx); diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index e2edf321a4e..9877af451a2 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -74,7 +74,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, const HostAndPort& server, const BSONObj& cmdResult, const NamespaceString& requestedNss, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, ClusterCursorManager* cursorManager, PrivilegeVector privileges, TailableModeEnum tailableMode) { @@ -117,7 +117,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, params.isAutoCommit = false; } - auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params)); + auto ccc = ClusterClientCursorImpl::make(opCtx, std::move(executor), std::move(params)); // We don't expect to use this cursor until a subsequent getMore, so detach from the current // OperationContext until then. diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h index 5202935df32..38b13b4ea7a 100644 --- a/src/mongo/s/query/store_possible_cursor.h +++ b/src/mongo/s/query/store_possible_cursor.h @@ -78,7 +78,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, const HostAndPort& server, const BSONObj& cmdResult, const NamespaceString& requestedNss, - executor::TaskExecutor* executor, + std::shared_ptr<executor::TaskExecutor> executor, ClusterCursorManager* cursorManager, PrivilegeVector privileges, TailableModeEnum tailableMode = TailableModeEnum::kNormal); diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 0b542f920dc..a18d35ca6ed 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -98,7 +98,7 @@ std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager)); } -std::unique_ptr<executor::TaskExecutor> makeShardingFixedTaskExecutor( +std::shared_ptr<executor::TaskExecutor> makeShardingFixedTaskExecutor( std::unique_ptr<NetworkInterface> net) { auto executor = stdx::make_unique<ThreadPoolTaskExecutor>(stdx::make_unique<ThreadPool>([] { @@ -110,7 +110,7 @@ std::unique_ptr<executor::TaskExecutor> makeShardingFixedTaskExecutor( }()), std::move(net)); - return stdx::make_unique<executor::ShardingTaskExecutor>(std::move(executor)); + return std::make_shared<executor::ShardingTaskExecutor>(std::move(executor)); } std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool( @@ -118,7 +118,7 @@ std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool( rpc::ShardingEgressMetadataHookBuilder metadataHookBuilder, ConnectionPool::Options connPoolOptions, boost::optional<size_t> taskExecutorPoolSize) { - std::vector<std::unique_ptr<executor::TaskExecutor>> executors; + std::vector<std::shared_ptr<executor::TaskExecutor>> executors; const auto poolSize = taskExecutorPoolSize.value_or(TaskExecutorPool::getSuggestedPoolSize()); diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp index 06081591f0c..92a27af4c92 100644 --- a/src/mongo/s/sharding_mongod_test_fixture.cpp +++ b/src/mongo/s/sharding_mongod_test_fixture.cpp @@ -171,13 +171,13 @@ std::unique_ptr<executor::TaskExecutorPool> ShardingMongodTestFixture::makeTaskE // note that the ThreadPoolMock uses the NetworkInterfaceMock's threads to run tasks, which is // again just the (single) thread the unit test is running on. Therefore, all tasks, local and // remote, must be carried out synchronously by the test thread. - auto fixedTaskExecutor = makeThreadPoolTestExecutor(std::move(netForFixedTaskExecutor)); + auto fixedTaskExecutor = makeSharedThreadPoolTestExecutor(std::move(netForFixedTaskExecutor)); _networkTestEnv = stdx::make_unique<NetworkTestEnv>(fixedTaskExecutor.get(), _mockNetwork); // Set up (one) TaskExecutor for the set of arbitrary TaskExecutors. - std::vector<std::unique_ptr<executor::TaskExecutor>> arbitraryExecutorsForExecutorPool; + std::vector<std::shared_ptr<executor::TaskExecutor>> arbitraryExecutorsForExecutorPool; arbitraryExecutorsForExecutorPool.emplace_back( - makeThreadPoolTestExecutor(stdx::make_unique<executor::NetworkInterfaceMock>())); + makeSharedThreadPoolTestExecutor(stdx::make_unique<executor::NetworkInterfaceMock>())); // Set up the TaskExecutorPool with the fixed TaskExecutor and set of arbitrary TaskExecutors. auto executorPool = stdx::make_unique<executor::TaskExecutorPool>(); @@ -338,7 +338,7 @@ void ShardingMongodTestFixture::shutdownExecutorPool() { _executorPoolShutDown = true; } -executor::TaskExecutor* ShardingMongodTestFixture::executor() const { +std::shared_ptr<executor::TaskExecutor> ShardingMongodTestFixture::executor() const { invariant(Grid::get(operationContext())->getExecutorPool()); return Grid::get(operationContext())->getExecutorPool()->getFixedExecutor(); } diff --git a/src/mongo/s/sharding_mongod_test_fixture.h b/src/mongo/s/sharding_mongod_test_fixture.h index ee29f7de8b6..b7cadc17792 100644 --- a/src/mongo/s/sharding_mongod_test_fixture.h +++ b/src/mongo/s/sharding_mongod_test_fixture.h @@ -76,7 +76,7 @@ public: CatalogCache* catalogCache() const; ShardRegistry* shardRegistry() const; RemoteCommandTargeterFactoryMock* targeterFactory() const; - executor::TaskExecutor* executor() const; + std::shared_ptr<executor::TaskExecutor> executor() const; DistLockManager* distLock() const; ClusterCursorManager* clusterCursorManager() const; executor::TaskExecutorPool* executorPool() const; diff --git a/src/mongo/s/sharding_router_test_fixture.cpp b/src/mongo/s/sharding_router_test_fixture.cpp index 2b4f11a1216..0261a54f8ba 100644 --- a/src/mongo/s/sharding_router_test_fixture.cpp +++ b/src/mongo/s/sharding_router_test_fixture.cpp @@ -118,9 +118,8 @@ ShardingTestFixture::ShardingTestFixture() { auto fixedNet = stdx::make_unique<executor::NetworkInterfaceMock>(); fixedNet->setEgressMetadataHook(makeMetadataHookList()); _mockNetwork = fixedNet.get(); - auto fixedExec = makeShardingTestExecutor(std::move(fixedNet)); - _networkTestEnv = stdx::make_unique<NetworkTestEnv>(fixedExec.get(), _mockNetwork); - _executor = fixedExec.get(); + _fixedExecutor = makeShardingTestExecutor(std::move(fixedNet)); + _networkTestEnv = stdx::make_unique<NetworkTestEnv>(_fixedExecutor.get(), _mockNetwork); auto netForPool = stdx::make_unique<executor::NetworkInterfaceMock>(); netForPool->setEgressMetadataHook(makeMetadataHookList()); @@ -128,11 +127,11 @@ ShardingTestFixture::ShardingTestFixture() { auto execForPool = makeShardingTestExecutor(std::move(netForPool)); _networkTestEnvForPool = stdx::make_unique<NetworkTestEnv>(execForPool.get(), _mockNetworkForPool); - std::vector<std::unique_ptr<executor::TaskExecutor>> executorsForPool; + std::vector<std::shared_ptr<executor::TaskExecutor>> executorsForPool; executorsForPool.emplace_back(std::move(execForPool)); auto executorPool = stdx::make_unique<executor::TaskExecutorPool>(); - executorPool->addExecutors(std::move(executorsForPool), std::move(fixedExec)); + executorPool->addExecutors(std::move(executorsForPool), _fixedExecutor); auto uniqueDistLockManager = stdx::make_unique<DistLockManagerMock>(nullptr); _distLockManager = uniqueDistLockManager.get(); @@ -195,8 +194,8 @@ ShardingTestFixture::~ShardingTestFixture() { } void ShardingTestFixture::shutdownExecutor() { - if (_executor) - _executor->shutdown(); + if (_fixedExecutor) + _fixedExecutor->shutdown(); } ShardingCatalogClient* ShardingTestFixture::catalogClient() const { @@ -219,10 +218,10 @@ RemoteCommandTargeterMock* ShardingTestFixture::configTargeter() const { return _configTargeter; } -executor::TaskExecutor* ShardingTestFixture::executor() const { - invariant(_executor); +std::shared_ptr<executor::TaskExecutor> ShardingTestFixture::executor() const { + invariant(_fixedExecutor); - return _executor; + return _fixedExecutor; } DistLockManagerMock* ShardingTestFixture::distLock() const { diff --git a/src/mongo/s/sharding_router_test_fixture.h b/src/mongo/s/sharding_router_test_fixture.h index 05fa042a8a7..6f3ca156e12 100644 --- a/src/mongo/s/sharding_router_test_fixture.h +++ b/src/mongo/s/sharding_router_test_fixture.h @@ -62,7 +62,7 @@ public: ShardingCatalogClient* catalogClient() const; ShardRegistry* shardRegistry() const; RemoteCommandTargeterFactoryMock* targeterFactory() const; - executor::TaskExecutor* executor() const; + std::shared_ptr<executor::TaskExecutor> executor() const; DistLockManagerMock* distLock() const; RemoteCommandTargeterMock* configTargeter() const; @@ -180,7 +180,7 @@ private: RemoteCommandTargeterMock* _configTargeter; // For the Grid's fixed executor. - executor::TaskExecutor* _executor; + std::shared_ptr<executor::TaskExecutor> _fixedExecutor; // For the Grid's arbitrary executor in its executorPool. std::unique_ptr<executor::NetworkTestEnv> _networkTestEnvForPool; |