author     Matthew Russotto <matthew.russotto@mongodb.com>   2020-09-15 11:37:11 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-09-16 14:44:23 +0000
commit     728fc1c52300cd409dd0619ebe669f8d0bd1835a (patch)
tree       93a36bee14fec25258991a741f63b979b0788c96 /src/mongo/db
parent     5a1953c4ae4416a60c45c5c90a210bd84b352032 (diff)
download   mongo-728fc1c52300cd409dd0619ebe669f8d0bd1835a.tar.gz
SERVER-50320 Fix cancellation races in initial syncer
Diffstat (limited to 'src/mongo/db')
14 files changed, 139 insertions, 61 deletions
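The heart of this change is the introduction of per-attempt executors. Previously the initial syncer scheduled all of its work directly on a long-lived task executor, so callbacks belonging to a canceled attempt could race with the next attempt. After this patch, each attempt schedules through an executor::ScopedTaskExecutor layered over the shared executor, and shutting the scoped executor down cancels exactly that attempt's callbacks. The sketch below condenses the pattern from the startup(), _cancelRemainingWork_inlock(), and _finishInitialSyncAttempt() hunks that follow; the free function and its parameter are hypothetical illustrations, and the surrounding InitialSyncer state is elided.

    #include "mongo/executor/scoped_task_executor.h"

    namespace mongo {
    namespace {

    // Hypothetical stand-alone illustration of the per-attempt executor pattern.
    void perAttemptExecutorSketch(std::shared_ptr<executor::TaskExecutor> parent) {
        // Each attempt gets its own ScopedTaskExecutor layered over the long-lived
        // parent executor, mirroring the startup() hunk below.
        auto attemptExec = std::make_unique<executor::ScopedTaskExecutor>(
            parent, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled"));

        // All of the attempt's work is scheduled through the scoped wrapper.
        auto swCbHandle = (*attemptExec)->scheduleWork(
            [](const executor::TaskExecutor::CallbackArgs& args) { /* attempt work */ });
        invariant(swCbHandle.isOK());

        // Canceling the attempt shuts down only the scoped executor; the parent,
        // and anything scheduled directly on it, keeps running.
        (*attemptExec)->shutdown();

        // A retry constructs a fresh ScopedTaskExecutor over the same parent,
        // mirroring the _finishInitialSyncAttempt() hunk below.
        attemptExec = std::make_unique<executor::ScopedTaskExecutor>(
            parent, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled"));
    }

    }  // namespace
    }  // namespace mongo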
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index dd28cc7cf14..99b7db99e5f 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1182,6 +1182,7 @@ env.Library(
     LIBDEPS_PRIVATE=[
         '$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers',
         '$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
+        '$BUILD_DIR/mongo/executor/scoped_task_executor',
         'repl_server_parameters',
     ]
 )
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
index 2d2e56df3d1..067a131201b 100644
--- a/src/mongo/db/repl/data_replicator_external_state.h
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -78,6 +78,7 @@ public:
     /**
      * Returns task executor for scheduling tasks to be run asynchronously.
      */
     virtual executor::TaskExecutor* getTaskExecutor() const = 0;
+    virtual std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const = 0;
 
     /**
      * Returns the current term and last committed optime.
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 6849dd52418..31bb90785cd 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -73,6 +73,11 @@ executor::TaskExecutor* DataReplicatorExternalStateImpl::getTaskExecutor() const
     return _replicationCoordinatorExternalState->getTaskExecutor();
 }
 
+std::shared_ptr<executor::TaskExecutor> DataReplicatorExternalStateImpl::getSharedTaskExecutor()
+    const {
+    return _replicationCoordinatorExternalState->getSharedTaskExecutor();
+}
+
 OpTimeWithTerm DataReplicatorExternalStateImpl::getCurrentTermAndLastCommittedOpTime() {
     return {_replicationCoordinator->getTerm(), _replicationCoordinator->getLastCommittedOpTime()};
 }
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h
index 3741fbae7c4..aa62e77a7d0 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.h
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.h
@@ -48,6 +48,7 @@ public:
         ReplicationCoordinatorExternalState* replicationCoordinatorExternalState);
 
     executor::TaskExecutor* getTaskExecutor() const override;
+    std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const;
 
     OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override;
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index 305c4e1abd2..5e292f899d1 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -75,6 +75,10 @@ DataReplicatorExternalStateMock::DataReplicatorExternalStateMock()
                      OplogApplier::Observer*) { return ops.back().getOpTime(); }) {}
 
 executor::TaskExecutor* DataReplicatorExternalStateMock::getTaskExecutor() const {
+    return taskExecutor.get();
+}
+std::shared_ptr<executor::TaskExecutor> DataReplicatorExternalStateMock::getSharedTaskExecutor()
+    const {
     return taskExecutor;
 }
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h
index a9155c30b9e..b32ff68df1c 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.h
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.h
@@ -45,6 +45,7 @@ public:
     DataReplicatorExternalStateMock();
 
     executor::TaskExecutor* getTaskExecutor() const override;
+    std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const override;
 
     OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override;
 
@@ -69,8 +70,8 @@ public:
     StatusWith<ReplSetConfig> getCurrentConfig() const override;
 
-    // Task executor. Not owned by us.
-    executor::TaskExecutor* taskExecutor = nullptr;
+    // Task executor.
+    std::shared_ptr<executor::TaskExecutor> taskExecutor = nullptr;
 
     // Returned by getCurrentTermAndLastCommittedOpTime.
     long long currentTerm = OpTime::kUninitializedTerm;
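The accessor plumbing above exists to move the executor from borrowed to shared ownership. With a raw executor::TaskExecutor*, nothing stops the owner from destroying the executor while the initial syncer still has callbacks in flight; holding a std::shared_ptr keeps the executor alive for as long as the syncer needs it (the syncer releases its references in _finishCallback, shown later in this diff). A minimal sketch of the idea, with hypothetical names:

    #include <memory>

    struct Executor {};  // stand-in for executor::TaskExecutor

    struct Owner {
        std::shared_ptr<Executor> exec = std::make_shared<Executor>();

        // A borrower holding this pointer dangles if 'exec' is reset first.
        Executor* getTaskExecutor() const {
            return exec.get();
        }

        // A borrower holding this copy keeps the executor alive on its own.
        std::shared_ptr<Executor> getSharedTaskExecutor() const {
            return exec;
        }
    };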
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 0048ff09bf1..7c5c08f8096 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -88,6 +88,10 @@ MONGO_FAIL_POINT_DEFINE(failInitSyncWithBufferedEntriesLeft);
 // transaction timestamp from the sync source.
 MONGO_FAIL_POINT_DEFINE(initialSyncHangAfterGettingBeginFetchingTimestamp);
 
+// Failpoint which causes the initial sync function to hang before creating shared data and
+// splitting control flow between the oplog fetcher and the cloners.
+MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeSplittingControlFlow);
+
 // Failpoint which causes the initial sync function to hang before copying databases.
 MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeCopyingDatabases);
 
@@ -202,7 +206,7 @@ InitialSyncer::InitialSyncer(
     : _fetchCount(0),
       _opts(opts),
       _dataReplicatorExternalState(std::move(dataReplicatorExternalState)),
-      _exec(_dataReplicatorExternalState->getTaskExecutor()),
+      _exec(_dataReplicatorExternalState->getSharedTaskExecutor()),
       _clonerExec(_exec),
       _writerPool(writerPool),
       _storage(storage),
@@ -284,6 +288,10 @@ Status InitialSyncer::startup(OperationContext* opCtx,
 
     // Start first initial sync attempt.
     std::uint32_t initialSyncAttempt = 0;
+    _attemptExec = std::make_unique<executor::ScopedTaskExecutor>(
+        _exec, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled"));
+    _clonerAttemptExec = std::make_unique<executor::ScopedTaskExecutor>(
+        _clonerExec, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled"));
     auto status = _scheduleWorkAndSaveHandle_inlock(
         [=](const executor::TaskExecutor::CallbackArgs& args) {
             _startInitialSyncAttemptCallback(args, initialSyncAttempt, initialSyncMaxAttempts);
@@ -358,6 +366,9 @@ void InitialSyncer::_cancelRemainingWork_inlock() {
     _shutdownComponent_inlock(_fCVFetcher);
     _shutdownComponent_inlock(_lastOplogEntryFetcher);
     _shutdownComponent_inlock(_beginFetchingOpTimeFetcher);
+    (*_attemptExec)->shutdown();
+    (*_clonerAttemptExec)->shutdown();
+    _attemptCanceled = true;
 }
 
 void InitialSyncer::join() {
@@ -501,7 +512,7 @@ OplogFetcher* InitialSyncer::getOplogFetcher_forTest() const {
     return nullptr;
 }
 
-void InitialSyncer::setClonerExecutor_forTest(executor::TaskExecutor* clonerExec) {
+void InitialSyncer::setClonerExecutor_forTest(std::shared_ptr<executor::TaskExecutor> clonerExec) {
     _clonerExec = clonerExec;
 }
 
@@ -693,7 +704,7 @@ void InitialSyncer::_chooseSyncSourceCallback(
         return;
     }
 
-    auto when = _exec->now() + _opts.syncSourceRetryWait;
+    auto when = (*_attemptExec)->now() + _opts.syncSourceRetryWait;
     LOGV2_DEBUG(21169,
                 1,
                 "Error getting sync source: '{error}', trying again in "
@@ -747,7 +758,7 @@ void InitialSyncer::_chooseSyncSourceCallback(
     _syncSource = syncSource.getValue();
 
     // Schedule rollback ID checker.
-    _rollbackChecker = std::make_unique<RollbackChecker>(_exec, _syncSource);
+    _rollbackChecker = std::make_unique<RollbackChecker>(*_attemptExec, _syncSource);
     auto scheduleResult = _rollbackChecker->reset([=](const RollbackChecker::Result& result) {
         return _rollbackCheckerResetCallback(result, onCompletionGuard);
     });
@@ -898,7 +909,7 @@ Status InitialSyncer::_scheduleGetBeginFetchingOpTime_inlock(
     cmd.append("limit", 1);
 
     _beginFetchingOpTimeFetcher = std::make_unique<Fetcher>(
-        _exec,
+        *_attemptExec,
         _syncSource,
         NamespaceString::kSessionTransactionsTableNamespace.db().toString(),
         cmd.obj(),
@@ -1027,7 +1038,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginApplyingTimestamp(
     readConcernBob.done();
 
     _fCVFetcher = std::make_unique<Fetcher>(
-        _exec,
+        *_attemptExec,
         _syncSource,
         NamespaceString::kServerConfigurationNamespace.db().toString(),
         queryBob.obj(),
@@ -1100,6 +1111,18 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
         return;
     }
 
+    if (MONGO_unlikely(initialSyncHangBeforeSplittingControlFlow.shouldFail())) {
+        lock.unlock();
+        LOGV2(5032000,
+              "initial sync - initialSyncHangBeforeSplittingControlFlow fail point "
+              "enabled. Blocking until fail point is disabled.");
+        while (MONGO_unlikely(initialSyncHangBeforeSplittingControlFlow.shouldFail()) &&
+               !_isShuttingDown()) {
+            mongo::sleepsecs(1);
+        }
+        lock.lock();
+    }
+
     // This is where the flow of control starts to split into two parallel tracks:
     // - oplog fetcher
     // - data cloning and applier
@@ -1155,7 +1178,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
 
     const auto& config = configResult.getValue();
     _oplogFetcher = _createOplogFetcherFn(
-        _exec,
+        *_attemptExec,
         beginFetchingOpTime,
         _syncSource,
         config,
@@ -1209,7 +1232,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
               "allDatabaseCloner"_attr = _initialSyncState->allDatabaseCloner->toString());
 
     auto [startClonerFuture, startCloner] =
-        _initialSyncState->allDatabaseCloner->runOnExecutorEvent(_clonerExec);
+        _initialSyncState->allDatabaseCloner->runOnExecutorEvent(*_clonerAttemptExec);
     // runOnExecutorEvent ensures the future is not ready unless an error has occurred.
     if (startClonerFuture.isReady()) {
         status = startClonerFuture.getNoThrow();
@@ -1224,10 +1247,11 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
         // and in unit tests, if the cloner finishes very quickly, the callback could run
         // in-line and result in self-deadlock.
         stdx::unique_lock<Latch> lock(_mutex);
-        auto exec_status = _exec->scheduleWork(
-            [this, status, onCompletionGuard](executor::TaskExecutor::CallbackArgs args) {
-                _allDatabaseClonerCallback(status, onCompletionGuard);
-            });
+        auto exec_status = (*_attemptExec)
+                               ->scheduleWork([this, status, onCompletionGuard](
+                                                  executor::TaskExecutor::CallbackArgs args) {
+                                   _allDatabaseClonerCallback(status, onCompletionGuard);
+                               });
         if (!exec_status.isOK()) {
             onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock,
                                                                       exec_status.getStatus());
@@ -1244,7 +1268,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
     lock.unlock();
     // Start (and therefore finish) the cloners outside the lock. This ensures onCompletion
     // is not run with the mutex held, which would result in self-deadlock.
-    _clonerExec->signalEvent(startCloner);
+    (*_clonerAttemptExec)->signalEvent(startCloner);
 }
 
 void InitialSyncer::_oplogFetcherCallback(const Status& oplogFetcherFinishStatus,
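The new initialSyncHangBeforeSplittingControlFlow failpoint above follows the standard hang-loop shape: drop the mutex, poll shouldFail() once a second, and also watch for shutdown so the hang cannot outlive the syncer. In a C++ test it would typically be enabled with the RAII helper from mongo/util/fail_point.h; a hedged sketch, with the test body elided:

    #include "mongo/util/fail_point.h"

    // Enable the failpoint for the duration of a scope; the destructor disables
    // it, which the hang-loop above observes on its next one-second poll.
    {
        mongo::FailPointEnableBlock fp("initialSyncHangBeforeSplittingControlFlow");
        // ... start initial sync and assert on state while it is parked here ...
    }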
@@ -1343,29 +1367,31 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
     auto status = _checkForShutdownAndConvertStatus_inlock(
         result.getStatus(), "error fetching last oplog entry for stop timestamp");
     if (_shouldRetryError(lock, status)) {
-        auto scheduleStatus = _exec->scheduleWork(
-            [this, onCompletionGuard](executor::TaskExecutor::CallbackArgs args) {
-                // It is not valid to schedule the retry from within this callback,
-                // hence we schedule a lambda to schedule the retry.
-                stdx::lock_guard<Latch> lock(_mutex);
-                // Since the stopTimestamp is retrieved after we have done all the work of
-                // retrieving collection data, we handle retries within this class by retrying
-                // for 'initialSyncTransientErrorRetryPeriodSeconds' (default 24 hours). This
-                // is the same retry strategy used when retrieving collection data, and avoids
-                // retrieving all the data and then throwing it away due to a transient network
-                // outage.
-                auto status = _scheduleLastOplogEntryFetcher_inlock(
-                    [=](const StatusWith<mongo::Fetcher::QueryResponse>& status,
-                        mongo::Fetcher::NextAction*,
-                        mongo::BSONObjBuilder*) {
-                        _lastOplogEntryFetcherCallbackForStopTimestamp(status,
-                                                                       onCompletionGuard);
-                    },
-                    kInitialSyncerHandlesRetries);
-                if (!status.isOK()) {
-                    onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
-                }
-            });
+        auto scheduleStatus =
+            (*_attemptExec)
+                ->scheduleWork([this,
+                                onCompletionGuard](executor::TaskExecutor::CallbackArgs args) {
+                    // It is not valid to schedule the retry from within this callback,
+                    // hence we schedule a lambda to schedule the retry.
+                    stdx::lock_guard<Latch> lock(_mutex);
+                    // Since the stopTimestamp is retrieved after we have done all the work of
+                    // retrieving collection data, we handle retries within this class by
+                    // retrying for 'initialSyncTransientErrorRetryPeriodSeconds' (default 24
+                    // hours). This is the same retry strategy used when retrieving collection
+                    // data, and avoids retrieving all the data and then throwing it away due to
+                    // a transient network outage.
+                    auto status = _scheduleLastOplogEntryFetcher_inlock(
+                        [=](const StatusWith<mongo::Fetcher::QueryResponse>& status,
+                            mongo::Fetcher::NextAction*,
+                            mongo::BSONObjBuilder*) {
+                            _lastOplogEntryFetcherCallbackForStopTimestamp(status,
+                                                                           onCompletionGuard);
+                        },
+                        kInitialSyncerHandlesRetries);
+                    if (!status.isOK()) {
+                        onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+                    }
+                });
         if (scheduleStatus.isOK())
             return;
         // If scheduling failed, we're shutting down and cannot retry.
@@ -1494,7 +1520,7 @@ void InitialSyncer::_getNextApplierBatchCallback(
     };
 
     _applier = std::make_unique<MultiApplier>(
-        _exec, ops, std::move(applyBatchOfOperationsFn), std::move(onCompletionFn));
+        *_attemptExec, ops, std::move(applyBatchOfOperationsFn), std::move(onCompletionFn));
     status = _startupComponent_inlock(_applier);
     if (!status.isOK()) {
         onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
@@ -1527,7 +1553,7 @@ void InitialSyncer::_getNextApplierBatchCallback(
     // If there are no operations at the moment to apply and the oplog fetcher is still waiting on
     // the sync source, we'll check the oplog buffer again in
     // '_opts.getApplierBatchCallbackRetryWait' ms.
-    auto when = _exec->now() + _opts.getApplierBatchCallbackRetryWait;
+    auto when = (*_attemptExec)->now() + _opts.getApplierBatchCallbackRetryWait;
     status = _scheduleWorkAtAndSaveHandle_inlock(
         when,
         [=](const CallbackArgs& args) { _getNextApplierBatchCallback(args, onCompletionGuard); },
@@ -1721,7 +1747,12 @@ void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTimeAndWallTime
         return;
     }
 
-    auto when = _exec->now() + _opts.initialSyncRetryWait;
+    _attemptExec = std::make_unique<executor::ScopedTaskExecutor>(
+        _exec, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled"));
+    _clonerAttemptExec = std::make_unique<executor::ScopedTaskExecutor>(
+        _clonerExec, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled"));
+    _attemptCanceled = false;
+    auto when = (*_attemptExec)->now() + _opts.initialSyncRetryWait;
     auto status = _scheduleWorkAtAndSaveHandle_inlock(
         when,
         [=](const executor::TaskExecutor::CallbackArgs& args) {
@@ -1794,6 +1825,12 @@ void InitialSyncer::_finishCallback(StatusWith<OpTimeAndWallTime> lastApplied) {
     if (lastApplied.isOK() && !MONGO_unlikely(skipClearInitialSyncState.shouldFail())) {
         _initialSyncState.reset();
     }
+
+    // Destroy shared references to executors.
+    _attemptExec = nullptr;
+    _clonerAttemptExec = nullptr;
+    _clonerExec = nullptr;
+    _exec = nullptr;
 }
 
 Status InitialSyncer::_scheduleLastOplogEntryFetcher_inlock(
@@ -1803,7 +1840,7 @@ Status InitialSyncer::_scheduleLastOplogEntryFetcher_inlock(
                      << ReadConcernArgs::kImplicitDefault);
 
     _lastOplogEntryFetcher = std::make_unique<Fetcher>(
-        _exec,
+        *_attemptExec,
         _syncSource,
         _opts.remoteOplogNS.db().toString(),
         query,
@@ -1959,7 +1996,7 @@ Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock(
                       str::stream() << "failed to schedule work " << name
                                     << ": initial syncer is shutting down");
     }
-    auto result = _exec->scheduleWork(std::move(work));
+    auto result = (*_attemptExec)->scheduleWork(std::move(work));
     if (!result.isOK()) {
         return result.getStatus().withContext(str::stream() << "failed to schedule work " << name);
     }
@@ -1978,7 +2015,7 @@ Status InitialSyncer::_scheduleWorkAtAndSaveHandle_inlock(
                       str::stream() << "failed to schedule work " << name << " at "
                                     << when.toString() << ": initial syncer is shutting down");
     }
-    auto result = _exec->scheduleWorkAt(when, std::move(work));
+    auto result = (*_attemptExec)->scheduleWorkAt(when, std::move(work));
     if (!result.isOK()) {
         return result.getStatus().withContext(str::stream() << "failed to schedule work " << name
                                                             << " at " << when.toString());
@@ -1991,15 +2028,24 @@ void InitialSyncer::_cancelHandle_inlock(executor::TaskExecutor::CallbackHandle
     if (!handle) {
         return;
     }
-    _exec->cancel(handle);
+    (*_attemptExec)->cancel(handle);
 }
 
 template <typename Component>
 Status InitialSyncer::_startupComponent_inlock(Component& component) {
-    if (_isShuttingDown_inlock()) {
+    // It is necessary to check if shutdown or attempt cancelling happens before starting a
+    // component; otherwise the component may call a callback function in line which will
+    // cause a deadlock when the callback attempts to obtain the initial syncer mutex.
+    if (_isShuttingDown_inlock() || _attemptCanceled) {
         component.reset();
-        return Status(ErrorCodes::CallbackCanceled,
-                      "initial syncer shutdown while trying to call startup() on component");
+        if (_isShuttingDown_inlock()) {
+            return Status(ErrorCodes::CallbackCanceled,
+                          "initial syncer shutdown while trying to call startup() on component");
+        } else {
+            return Status(
+                ErrorCodes::CallbackCanceled,
+                "initial sync attempt canceled while trying to call startup() on component");
+        }
     }
     auto status = component->startup();
     if (!status.isOK()) {
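The _startupComponent_inlock change above closes the second race: a component whose startup() fails may invoke its completion callback in-line, and that callback re-acquires the syncer mutex, so startup must be refused while the mutex is still held if either shutdown or attempt cancellation has already begun. Condensed into a hypothetical free function (Status and ErrorCodes as in the tree, the other names illustrative):

    template <typename Component>
    Status startupComponentSketch(Component& component, bool shuttingDown, bool attemptCanceled) {
        if (shuttingDown || attemptCanceled) {
            component.reset();  // never call startup() on a dead attempt
            return Status(
                ErrorCodes::CallbackCanceled,
                shuttingDown
                    ? "initial syncer shutdown while trying to call startup() on component"
                    : "initial sync attempt canceled while trying to call startup() on component");
        }
        return component->startup();
    }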
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index 71063112642..ed5ab5dc5da 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -52,6 +52,7 @@
 #include "mongo/db/repl/rollback_checker.h"
 #include "mongo/db/repl/sync_source_selector.h"
 #include "mongo/dbtests/mock/mock_dbclient_connection.h"
+#include "mongo/executor/scoped_task_executor.h"
 #include "mongo/platform/mutex.h"
 #include "mongo/stdx/condition_variable.h"
 #include "mongo/util/concurrency/thread_pool.h"
@@ -295,7 +296,7 @@ public:
      *
      * For testing only
      */
-    void setClonerExecutor_forTest(executor::TaskExecutor* clonerExec);
+    void setClonerExecutor_forTest(std::shared_ptr<executor::TaskExecutor> clonerExec);
 
     /**
      *
@@ -710,15 +711,17 @@ private:
     mutable Mutex _mutex = MONGO_MAKE_LATCH("InitialSyncer::_mutex");  // (S)
     const InitialSyncerOptions _opts;  // (R)
     std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState;  // (R)
-    executor::TaskExecutor* _exec;  // (R)
+    std::shared_ptr<executor::TaskExecutor> _exec;  // (R)
+    std::unique_ptr<executor::ScopedTaskExecutor> _attemptExec;  // (X)
     // The executor that the Cloner thread runs on. In production code this is the same as _exec,
     // but for unit testing, _exec is single-threaded and our NetworkInterfaceMock runs it in
     // lockstep with the unit test code. If we pause the cloners using failpoints
     // NetworkInterfaceMock is unaware of this and this causes our unit tests to deadlock.
-    executor::TaskExecutor* _clonerExec;  // (R)
-    ThreadPool* _writerPool;  // (R)
-    StorageInterface* _storage;  // (R)
-    ReplicationProcess* _replicationProcess;  // (S)
+    std::shared_ptr<executor::TaskExecutor> _clonerExec;  // (R)
+    std::unique_ptr<executor::ScopedTaskExecutor> _clonerAttemptExec;  // (X)
+    ThreadPool* _writerPool;  // (R)
+    StorageInterface* _storage;  // (R)
+    ReplicationProcess* _replicationProcess;  // (S)
 
     // This is invoked with the final status of the initial sync. If startup() fails, this callback
     // is never invoked. The caller gets the last applied optime when the initial sync completes
@@ -784,6 +787,9 @@ private:
     // Amount of time an outage is allowed to continue before the initial sync attempt is marked
     // as failed.
     Milliseconds _allowedOutageDuration;  // (M)
+
+    // The initial sync attempt has been canceled
+    bool _attemptCanceled = false;  // (X)
 };
 
 }  // namespace repl
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index ff78d54ec78..1587a0fef0f 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -397,7 +397,7 @@ protected:
         };
 
         auto dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateMock>();
-        dataReplicatorExternalState->taskExecutor = _executorProxy.get();
+        dataReplicatorExternalState->taskExecutor = _executorProxy;
         dataReplicatorExternalState->currentTerm = 1LL;
         dataReplicatorExternalState->lastCommittedOpTime = _myLastOpTime;
         {
@@ -439,7 +439,7 @@ protected:
             return std::unique_ptr<DBClientConnection>(
                 new MockDBClientConnection(_mockServer.get()));
         });
-        _initialSyncer->setClonerExecutor_forTest(_clonerExecutor.get());
+        _initialSyncer->setClonerExecutor_forTest(_clonerExecutor);
         _initialSyncer->setCreateOplogFetcherFn_forTest(
             [](executor::TaskExecutor* executor,
                OpTime lastFetched,
@@ -514,8 +514,8 @@ protected:
     void doSuccessfulInitialSyncWithOneBatch();
     OplogEntry doInitialSyncWithOneBatch();
 
-    std::unique_ptr<TaskExecutorMock> _executorProxy;
-    std::unique_ptr<executor::ThreadPoolTaskExecutor> _clonerExecutor;
+    std::shared_ptr<TaskExecutorMock> _executorProxy;
+    std::shared_ptr<executor::ThreadPoolTaskExecutor> _clonerExecutor;
 
     InitialSyncerOptions _options;
     InitialSyncerOptions::SetMyLastOptimeFn _setMyLastOptime;
@@ -716,7 +716,7 @@ TEST_F(InitialSyncerTest, InvalidConstruction) {
     // Null callback function.
     {
         auto dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateMock>();
-        dataReplicatorExternalState->taskExecutor = &getExecutor();
+        dataReplicatorExternalState->taskExecutor = _executorProxy;
         ASSERT_THROWS_CODE_AND_WHAT(InitialSyncer(options,
                                                   std::move(dataReplicatorExternalState),
                                                   _dbWorkThreadPool.get(),
@@ -1058,7 +1058,7 @@ TEST_F(InitialSyncerTest, InitialSyncerResetsOnCompletionCallbackFunctionPointer
     decltype(_lastApplied) lastApplied = getDetectableErrorStatus();
 
     auto dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateMock>();
-    dataReplicatorExternalState->taskExecutor = &getExecutor();
+    dataReplicatorExternalState->taskExecutor = _executorProxy;
     auto initialSyncer = std::make_unique<InitialSyncer>(
         _options,
         std::move(dataReplicatorExternalState),
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 506b5caa11d..db66d215ece 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -114,6 +114,7 @@ public:
     /**
      * Returns task executor for scheduling tasks to be run asynchronously.
      */
     virtual executor::TaskExecutor* getTaskExecutor() const = 0;
+    virtual std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const = 0;
 
     /**
      * Returns shared db worker thread pool for collection cloning.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 3f03ceb9d28..08fe07b0ed2 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -426,6 +426,11 @@ executor::TaskExecutor* ReplicationCoordinatorExternalStateImpl::getTaskExecutor
     return _taskExecutor.get();
 }
 
+std::shared_ptr<executor::TaskExecutor>
+ReplicationCoordinatorExternalStateImpl::getSharedTaskExecutor() const {
+    return _taskExecutor;
+}
+
 ThreadPool* ReplicationCoordinatorExternalStateImpl::getDbWorkThreadPool() const {
     return _writerPool.get();
 }
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 3f638707bcc..74662f30fca 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -78,6 +78,7 @@ public:
     virtual void clearAppliedThroughIfCleanShutdown(OperationContext* opCtx);
 
     virtual executor::TaskExecutor* getTaskExecutor() const override;
+    std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const override;
     virtual ThreadPool* getDbWorkThreadPool() const override;
     virtual Status initializeReplSetStorage(OperationContext* opCtx, const BSONObj& config);
     void onDrainComplete(OperationContext* opCtx) override;
@@ -220,7 +221,7 @@ private:
     long long _nextThreadId = 0;
 
     // Task executor used to run replication tasks.
-    std::unique_ptr<executor::TaskExecutor> _taskExecutor;
+    std::shared_ptr<executor::TaskExecutor> _taskExecutor;
 
     // Used by repl::applyOplogBatch() to apply the sync source's operations in parallel.
     // Also used by database and collection cloners to perform storage operations.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index bc3cb98418c..27f569e592c 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -88,6 +88,11 @@ executor::TaskExecutor* ReplicationCoordinatorExternalStateMock::getTaskExecutor
     return nullptr;
 }
 
+std::shared_ptr<executor::TaskExecutor>
+ReplicationCoordinatorExternalStateMock::getSharedTaskExecutor() const {
+    return nullptr;
+}
+
 ThreadPool* ReplicationCoordinatorExternalStateMock::getDbWorkThreadPool() const {
     return nullptr;
 }
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 71a2f7becd9..f69b774b8e9 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -68,6 +68,7 @@ public:
     virtual void shutdown(OperationContext* opCtx);
     virtual void clearAppliedThroughIfCleanShutdown(OperationContext* opCtx);
     virtual executor::TaskExecutor* getTaskExecutor() const override;
+    virtual std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const override;
    virtual ThreadPool* getDbWorkThreadPool() const override;
     virtual Status initializeReplSetStorage(OperationContext* opCtx, const BSONObj& config);
     void onDrainComplete(OperationContext* opCtx) override;
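On the test side, the executor proxy and the cloner executor become shared_ptrs so the mock external state, the InitialSyncer, and the test fixture can all hold the same executor without lifetime hazards. A hedged sketch of the resulting wiring; the TaskExecutorMock constructor argument is assumed from its role as a proxy, and options, writerPool, storage, replicationProcess, and onCompletion stand in for fixture members:

    // Fixture and syncer share ownership of the same executor proxy.
    auto executorProxy = std::make_shared<TaskExecutorMock>(&getExecutor());

    auto externalState = std::make_unique<DataReplicatorExternalStateMock>();
    externalState->taskExecutor = executorProxy;  // shared_ptr member; no raw .get()

    auto syncer = std::make_unique<InitialSyncer>(
        options, std::move(externalState), writerPool, storage, replicationProcess, onCompletion);
    syncer->setClonerExecutor_forTest(clonerExecutor);  // also a shared_ptr now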