summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/data_replicator_external_state.h1
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.cpp5
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.h1
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.cpp4
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.h5
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp138
-rw-r--r--src/mongo/db/repl/initial_syncer.h18
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp12
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp5
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp5
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h1
14 files changed, 139 insertions, 61 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index dd28cc7cf14..99b7db99e5f 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1182,6 +1182,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
+ '$BUILD_DIR/mongo/executor/scoped_task_executor',
'repl_server_parameters',
]
)
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
index 2d2e56df3d1..067a131201b 100644
--- a/src/mongo/db/repl/data_replicator_external_state.h
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -78,6 +78,7 @@ public:
* Returns task executor for scheduling tasks to be run asynchronously.
*/
virtual executor::TaskExecutor* getTaskExecutor() const = 0;
+ virtual std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const = 0;
/**
* Returns the current term and last committed optime.
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 6849dd52418..31bb90785cd 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -73,6 +73,11 @@ executor::TaskExecutor* DataReplicatorExternalStateImpl::getTaskExecutor() const
return _replicationCoordinatorExternalState->getTaskExecutor();
}
+std::shared_ptr<executor::TaskExecutor> DataReplicatorExternalStateImpl::getSharedTaskExecutor()
+ const {
+ return _replicationCoordinatorExternalState->getSharedTaskExecutor();
+}
+
OpTimeWithTerm DataReplicatorExternalStateImpl::getCurrentTermAndLastCommittedOpTime() {
return {_replicationCoordinator->getTerm(), _replicationCoordinator->getLastCommittedOpTime()};
}
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h
index 3741fbae7c4..aa62e77a7d0 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.h
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.h
@@ -48,6 +48,7 @@ public:
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState);
executor::TaskExecutor* getTaskExecutor() const override;
+ std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const;
OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override;
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index 305c4e1abd2..5e292f899d1 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -75,6 +75,10 @@ DataReplicatorExternalStateMock::DataReplicatorExternalStateMock()
OplogApplier::Observer*) { return ops.back().getOpTime(); }) {}
executor::TaskExecutor* DataReplicatorExternalStateMock::getTaskExecutor() const {
+ return taskExecutor.get();
+}
+std::shared_ptr<executor::TaskExecutor> DataReplicatorExternalStateMock::getSharedTaskExecutor()
+ const {
return taskExecutor;
}
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h
index a9155c30b9e..b32ff68df1c 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.h
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.h
@@ -45,6 +45,7 @@ public:
DataReplicatorExternalStateMock();
executor::TaskExecutor* getTaskExecutor() const override;
+ std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const override;
OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override;
@@ -69,8 +70,8 @@ public:
StatusWith<ReplSetConfig> getCurrentConfig() const override;
- // Task executor. Not owned by us.
- executor::TaskExecutor* taskExecutor = nullptr;
+ // Task executor.
+ std::shared_ptr<executor::TaskExecutor> taskExecutor = nullptr;
// Returned by getCurrentTermAndLastCommittedOpTime.
long long currentTerm = OpTime::kUninitializedTerm;
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 0048ff09bf1..7c5c08f8096 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -88,6 +88,10 @@ MONGO_FAIL_POINT_DEFINE(failInitSyncWithBufferedEntriesLeft);
// transaction timestamp from the sync source.
MONGO_FAIL_POINT_DEFINE(initialSyncHangAfterGettingBeginFetchingTimestamp);
+// Failpoint which causes the initial sync function to hang before creating shared data and
+// splitting control flow between the oplog fetcher and the cloners.
+MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeSplittingControlFlow);
+
// Failpoint which causes the initial sync function to hang before copying databases.
MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeCopyingDatabases);
@@ -202,7 +206,7 @@ InitialSyncer::InitialSyncer(
: _fetchCount(0),
_opts(opts),
_dataReplicatorExternalState(std::move(dataReplicatorExternalState)),
- _exec(_dataReplicatorExternalState->getTaskExecutor()),
+ _exec(_dataReplicatorExternalState->getSharedTaskExecutor()),
_clonerExec(_exec),
_writerPool(writerPool),
_storage(storage),
@@ -284,6 +288,10 @@ Status InitialSyncer::startup(OperationContext* opCtx,
// Start first initial sync attempt.
std::uint32_t initialSyncAttempt = 0;
+ _attemptExec = std::make_unique<executor::ScopedTaskExecutor>(
+ _exec, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled"));
+ _clonerAttemptExec = std::make_unique<executor::ScopedTaskExecutor>(
+ _clonerExec, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled"));
auto status = _scheduleWorkAndSaveHandle_inlock(
[=](const executor::TaskExecutor::CallbackArgs& args) {
_startInitialSyncAttemptCallback(args, initialSyncAttempt, initialSyncMaxAttempts);
@@ -358,6 +366,9 @@ void InitialSyncer::_cancelRemainingWork_inlock() {
_shutdownComponent_inlock(_fCVFetcher);
_shutdownComponent_inlock(_lastOplogEntryFetcher);
_shutdownComponent_inlock(_beginFetchingOpTimeFetcher);
+ (*_attemptExec)->shutdown();
+ (*_clonerAttemptExec)->shutdown();
+ _attemptCanceled = true;
}
void InitialSyncer::join() {
@@ -501,7 +512,7 @@ OplogFetcher* InitialSyncer::getOplogFetcher_forTest() const {
return nullptr;
}
-void InitialSyncer::setClonerExecutor_forTest(executor::TaskExecutor* clonerExec) {
+void InitialSyncer::setClonerExecutor_forTest(std::shared_ptr<executor::TaskExecutor> clonerExec) {
_clonerExec = clonerExec;
}
@@ -693,7 +704,7 @@ void InitialSyncer::_chooseSyncSourceCallback(
return;
}
- auto when = _exec->now() + _opts.syncSourceRetryWait;
+ auto when = (*_attemptExec)->now() + _opts.syncSourceRetryWait;
LOGV2_DEBUG(21169,
1,
"Error getting sync source: '{error}', trying again in "
@@ -747,7 +758,7 @@ void InitialSyncer::_chooseSyncSourceCallback(
_syncSource = syncSource.getValue();
// Schedule rollback ID checker.
- _rollbackChecker = std::make_unique<RollbackChecker>(_exec, _syncSource);
+ _rollbackChecker = std::make_unique<RollbackChecker>(*_attemptExec, _syncSource);
auto scheduleResult = _rollbackChecker->reset([=](const RollbackChecker::Result& result) {
return _rollbackCheckerResetCallback(result, onCompletionGuard);
});
@@ -898,7 +909,7 @@ Status InitialSyncer::_scheduleGetBeginFetchingOpTime_inlock(
cmd.append("limit", 1);
_beginFetchingOpTimeFetcher = std::make_unique<Fetcher>(
- _exec,
+ *_attemptExec,
_syncSource,
NamespaceString::kSessionTransactionsTableNamespace.db().toString(),
cmd.obj(),
@@ -1027,7 +1038,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginApplyingTimestamp(
readConcernBob.done();
_fCVFetcher = std::make_unique<Fetcher>(
- _exec,
+ *_attemptExec,
_syncSource,
NamespaceString::kServerConfigurationNamespace.db().toString(),
queryBob.obj(),
@@ -1100,6 +1111,18 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
return;
}
+ if (MONGO_unlikely(initialSyncHangBeforeSplittingControlFlow.shouldFail())) {
+ lock.unlock();
+ LOGV2(5032000,
+ "initial sync - initialSyncHangBeforeSplittingControlFlow fail point "
+ "enabled. Blocking until fail point is disabled.");
+ while (MONGO_unlikely(initialSyncHangBeforeSplittingControlFlow.shouldFail()) &&
+ !_isShuttingDown()) {
+ mongo::sleepsecs(1);
+ }
+ lock.lock();
+ }
+
// This is where the flow of control starts to split into two parallel tracks:
// - oplog fetcher
// - data cloning and applier
@@ -1155,7 +1178,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
const auto& config = configResult.getValue();
_oplogFetcher = _createOplogFetcherFn(
- _exec,
+ *_attemptExec,
beginFetchingOpTime,
_syncSource,
config,
@@ -1209,7 +1232,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
"allDatabaseCloner"_attr = _initialSyncState->allDatabaseCloner->toString());
auto [startClonerFuture, startCloner] =
- _initialSyncState->allDatabaseCloner->runOnExecutorEvent(_clonerExec);
+ _initialSyncState->allDatabaseCloner->runOnExecutorEvent(*_clonerAttemptExec);
// runOnExecutorEvent ensures the future is not ready unless an error has occurred.
if (startClonerFuture.isReady()) {
status = startClonerFuture.getNoThrow();
@@ -1224,10 +1247,11 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
// and in unit tests, if the cloner finishes very quickly, the callback could run
// in-line and result in self-deadlock.
stdx::unique_lock<Latch> lock(_mutex);
- auto exec_status = _exec->scheduleWork(
- [this, status, onCompletionGuard](executor::TaskExecutor::CallbackArgs args) {
- _allDatabaseClonerCallback(status, onCompletionGuard);
- });
+ auto exec_status = (*_attemptExec)
+ ->scheduleWork([this, status, onCompletionGuard](
+ executor::TaskExecutor::CallbackArgs args) {
+ _allDatabaseClonerCallback(status, onCompletionGuard);
+ });
if (!exec_status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock,
exec_status.getStatus());
@@ -1244,7 +1268,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
lock.unlock();
// Start (and therefore finish) the cloners outside the lock. This ensures onCompletion
// is not run with the mutex held, which would result in self-deadlock.
- _clonerExec->signalEvent(startCloner);
+ (*_clonerAttemptExec)->signalEvent(startCloner);
}
void InitialSyncer::_oplogFetcherCallback(const Status& oplogFetcherFinishStatus,
@@ -1343,29 +1367,31 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
auto status = _checkForShutdownAndConvertStatus_inlock(
result.getStatus(), "error fetching last oplog entry for stop timestamp");
if (_shouldRetryError(lock, status)) {
- auto scheduleStatus = _exec->scheduleWork(
- [this, onCompletionGuard](executor::TaskExecutor::CallbackArgs args) {
- // It is not valid to schedule the retry from within this callback,
- // hence we schedule a lambda to schedule the retry.
- stdx::lock_guard<Latch> lock(_mutex);
- // Since the stopTimestamp is retrieved after we have done all the work of
- // retrieving collection data, we handle retries within this class by retrying
- // for 'initialSyncTransientErrorRetryPeriodSeconds' (default 24 hours). This
- // is the same retry strategy used when retrieving collection data, and avoids
- // retrieving all the data and then throwing it away due to a transient network
- // outage.
- auto status = _scheduleLastOplogEntryFetcher_inlock(
- [=](const StatusWith<mongo::Fetcher::QueryResponse>& status,
- mongo::Fetcher::NextAction*,
- mongo::BSONObjBuilder*) {
- _lastOplogEntryFetcherCallbackForStopTimestamp(status,
- onCompletionGuard);
- },
- kInitialSyncerHandlesRetries);
- if (!status.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
- }
- });
+ auto scheduleStatus =
+ (*_attemptExec)
+ ->scheduleWork([this,
+ onCompletionGuard](executor::TaskExecutor::CallbackArgs args) {
+ // It is not valid to schedule the retry from within this callback,
+ // hence we schedule a lambda to schedule the retry.
+ stdx::lock_guard<Latch> lock(_mutex);
+ // Since the stopTimestamp is retrieved after we have done all the work of
+ // retrieving collection data, we handle retries within this class by
+ // retrying for 'initialSyncTransientErrorRetryPeriodSeconds' (default 24
+ // hours). This is the same retry strategy used when retrieving collection
+ // data, and avoids retrieving all the data and then throwing it away due to
+ // a transient network outage.
+ auto status = _scheduleLastOplogEntryFetcher_inlock(
+ [=](const StatusWith<mongo::Fetcher::QueryResponse>& status,
+ mongo::Fetcher::NextAction*,
+ mongo::BSONObjBuilder*) {
+ _lastOplogEntryFetcherCallbackForStopTimestamp(status,
+ onCompletionGuard);
+ },
+ kInitialSyncerHandlesRetries);
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ }
+ });
if (scheduleStatus.isOK())
return;
// If scheduling failed, we're shutting down and cannot retry.
@@ -1494,7 +1520,7 @@ void InitialSyncer::_getNextApplierBatchCallback(
};
_applier = std::make_unique<MultiApplier>(
- _exec, ops, std::move(applyBatchOfOperationsFn), std::move(onCompletionFn));
+ *_attemptExec, ops, std::move(applyBatchOfOperationsFn), std::move(onCompletionFn));
status = _startupComponent_inlock(_applier);
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
@@ -1527,7 +1553,7 @@ void InitialSyncer::_getNextApplierBatchCallback(
// If there are no operations at the moment to apply and the oplog fetcher is still waiting on
// the sync source, we'll check the oplog buffer again in
// '_opts.getApplierBatchCallbackRetryWait' ms.
- auto when = _exec->now() + _opts.getApplierBatchCallbackRetryWait;
+ auto when = (*_attemptExec)->now() + _opts.getApplierBatchCallbackRetryWait;
status = _scheduleWorkAtAndSaveHandle_inlock(
when,
[=](const CallbackArgs& args) { _getNextApplierBatchCallback(args, onCompletionGuard); },
@@ -1721,7 +1747,12 @@ void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTimeAndWallTime
return;
}
- auto when = _exec->now() + _opts.initialSyncRetryWait;
+ _attemptExec = std::make_unique<executor::ScopedTaskExecutor>(
+ _exec, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled"));
+ _clonerAttemptExec = std::make_unique<executor::ScopedTaskExecutor>(
+ _clonerExec, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled"));
+ _attemptCanceled = false;
+ auto when = (*_attemptExec)->now() + _opts.initialSyncRetryWait;
auto status = _scheduleWorkAtAndSaveHandle_inlock(
when,
[=](const executor::TaskExecutor::CallbackArgs& args) {
@@ -1794,6 +1825,12 @@ void InitialSyncer::_finishCallback(StatusWith<OpTimeAndWallTime> lastApplied) {
if (lastApplied.isOK() && !MONGO_unlikely(skipClearInitialSyncState.shouldFail())) {
_initialSyncState.reset();
}
+
+ // Destroy shared references to executors.
+ _attemptExec = nullptr;
+ _clonerAttemptExec = nullptr;
+ _clonerExec = nullptr;
+ _exec = nullptr;
}
Status InitialSyncer::_scheduleLastOplogEntryFetcher_inlock(
@@ -1803,7 +1840,7 @@ Status InitialSyncer::_scheduleLastOplogEntryFetcher_inlock(
<< ReadConcernArgs::kImplicitDefault);
_lastOplogEntryFetcher = std::make_unique<Fetcher>(
- _exec,
+ *_attemptExec,
_syncSource,
_opts.remoteOplogNS.db().toString(),
query,
@@ -1959,7 +1996,7 @@ Status InitialSyncer::_scheduleWorkAndSaveHandle_inlock(
str::stream() << "failed to schedule work " << name
<< ": initial syncer is shutting down");
}
- auto result = _exec->scheduleWork(std::move(work));
+ auto result = (*_attemptExec)->scheduleWork(std::move(work));
if (!result.isOK()) {
return result.getStatus().withContext(str::stream() << "failed to schedule work " << name);
}
@@ -1978,7 +2015,7 @@ Status InitialSyncer::_scheduleWorkAtAndSaveHandle_inlock(
str::stream() << "failed to schedule work " << name << " at "
<< when.toString() << ": initial syncer is shutting down");
}
- auto result = _exec->scheduleWorkAt(when, std::move(work));
+ auto result = (*_attemptExec)->scheduleWorkAt(when, std::move(work));
if (!result.isOK()) {
return result.getStatus().withContext(str::stream() << "failed to schedule work " << name
<< " at " << when.toString());
@@ -1991,15 +2028,24 @@ void InitialSyncer::_cancelHandle_inlock(executor::TaskExecutor::CallbackHandle
if (!handle) {
return;
}
- _exec->cancel(handle);
+ (*_attemptExec)->cancel(handle);
}
template <typename Component>
Status InitialSyncer::_startupComponent_inlock(Component& component) {
- if (_isShuttingDown_inlock()) {
+ // It is necessary to check if shutdown or attempt cancelling happens before starting a
+ // component; otherwise the component may call a callback function in line which will
+ // cause a deadlock when the callback attempts to obtain the initial syncer mutex.
+ if (_isShuttingDown_inlock() || _attemptCanceled) {
component.reset();
- return Status(ErrorCodes::CallbackCanceled,
- "initial syncer shutdown while trying to call startup() on component");
+ if (_isShuttingDown_inlock()) {
+ return Status(ErrorCodes::CallbackCanceled,
+ "initial syncer shutdown while trying to call startup() on component");
+ } else {
+ return Status(
+ ErrorCodes::CallbackCanceled,
+ "initial sync attempt canceled while trying to call startup() on component");
+ }
}
auto status = component->startup();
if (!status.isOK()) {
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index 71063112642..ed5ab5dc5da 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -52,6 +52,7 @@
#include "mongo/db/repl/rollback_checker.h"
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/dbtests/mock/mock_dbclient_connection.h"
+#include "mongo/executor/scoped_task_executor.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -295,7 +296,7 @@ public:
*
* For testing only
*/
- void setClonerExecutor_forTest(executor::TaskExecutor* clonerExec);
+ void setClonerExecutor_forTest(std::shared_ptr<executor::TaskExecutor> clonerExec);
/**
*
@@ -710,15 +711,17 @@ private:
mutable Mutex _mutex = MONGO_MAKE_LATCH("InitialSyncer::_mutex"); // (S)
const InitialSyncerOptions _opts; // (R)
std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState; // (R)
- executor::TaskExecutor* _exec; // (R)
+ std::shared_ptr<executor::TaskExecutor> _exec; // (R)
+ std::unique_ptr<executor::ScopedTaskExecutor> _attemptExec; // (X)
// The executor that the Cloner thread runs on. In production code this is the same as _exec,
// but for unit testing, _exec is single-threaded and our NetworkInterfaceMock runs it in
// lockstep with the unit test code. If we pause the cloners using failpoints
// NetworkInterfaceMock is unaware of this and this causes our unit tests to deadlock.
- executor::TaskExecutor* _clonerExec; // (R)
- ThreadPool* _writerPool; // (R)
- StorageInterface* _storage; // (R)
- ReplicationProcess* _replicationProcess; // (S)
+ std::shared_ptr<executor::TaskExecutor> _clonerExec; // (R)
+ std::unique_ptr<executor::ScopedTaskExecutor> _clonerAttemptExec; // (X)
+ ThreadPool* _writerPool; // (R)
+ StorageInterface* _storage; // (R)
+ ReplicationProcess* _replicationProcess; // (S)
// This is invoked with the final status of the initial sync. If startup() fails, this callback
// is never invoked. The caller gets the last applied optime when the initial sync completes
@@ -784,6 +787,9 @@ private:
// Amount of time an outage is allowed to continue before the initial sync attempt is marked
// as failed.
Milliseconds _allowedOutageDuration; // (M)
+
+ // The initial sync attempt has been canceled
+ bool _attemptCanceled = false; // (X)
};
} // namespace repl
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index ff78d54ec78..1587a0fef0f 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -397,7 +397,7 @@ protected:
};
auto dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateMock>();
- dataReplicatorExternalState->taskExecutor = _executorProxy.get();
+ dataReplicatorExternalState->taskExecutor = _executorProxy;
dataReplicatorExternalState->currentTerm = 1LL;
dataReplicatorExternalState->lastCommittedOpTime = _myLastOpTime;
{
@@ -439,7 +439,7 @@ protected:
return std::unique_ptr<DBClientConnection>(
new MockDBClientConnection(_mockServer.get()));
});
- _initialSyncer->setClonerExecutor_forTest(_clonerExecutor.get());
+ _initialSyncer->setClonerExecutor_forTest(_clonerExecutor);
_initialSyncer->setCreateOplogFetcherFn_forTest(
[](executor::TaskExecutor* executor,
OpTime lastFetched,
@@ -514,8 +514,8 @@ protected:
void doSuccessfulInitialSyncWithOneBatch();
OplogEntry doInitialSyncWithOneBatch();
- std::unique_ptr<TaskExecutorMock> _executorProxy;
- std::unique_ptr<executor::ThreadPoolTaskExecutor> _clonerExecutor;
+ std::shared_ptr<TaskExecutorMock> _executorProxy;
+ std::shared_ptr<executor::ThreadPoolTaskExecutor> _clonerExecutor;
InitialSyncerOptions _options;
InitialSyncerOptions::SetMyLastOptimeFn _setMyLastOptime;
@@ -716,7 +716,7 @@ TEST_F(InitialSyncerTest, InvalidConstruction) {
// Null callback function.
{
auto dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateMock>();
- dataReplicatorExternalState->taskExecutor = &getExecutor();
+ dataReplicatorExternalState->taskExecutor = _executorProxy;
ASSERT_THROWS_CODE_AND_WHAT(InitialSyncer(options,
std::move(dataReplicatorExternalState),
_dbWorkThreadPool.get(),
@@ -1058,7 +1058,7 @@ TEST_F(InitialSyncerTest, InitialSyncerResetsOnCompletionCallbackFunctionPointer
decltype(_lastApplied) lastApplied = getDetectableErrorStatus();
auto dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateMock>();
- dataReplicatorExternalState->taskExecutor = &getExecutor();
+ dataReplicatorExternalState->taskExecutor = _executorProxy;
auto initialSyncer = std::make_unique<InitialSyncer>(
_options,
std::move(dataReplicatorExternalState),
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 506b5caa11d..db66d215ece 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -114,6 +114,7 @@ public:
* Returns task executor for scheduling tasks to be run asynchronously.
*/
virtual executor::TaskExecutor* getTaskExecutor() const = 0;
+ virtual std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const = 0;
/**
* Returns shared db worker thread pool for collection cloning.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 3f03ceb9d28..08fe07b0ed2 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -426,6 +426,11 @@ executor::TaskExecutor* ReplicationCoordinatorExternalStateImpl::getTaskExecutor
return _taskExecutor.get();
}
+std::shared_ptr<executor::TaskExecutor>
+ReplicationCoordinatorExternalStateImpl::getSharedTaskExecutor() const {
+ return _taskExecutor;
+}
+
ThreadPool* ReplicationCoordinatorExternalStateImpl::getDbWorkThreadPool() const {
return _writerPool.get();
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 3f638707bcc..74662f30fca 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -78,6 +78,7 @@ public:
virtual void clearAppliedThroughIfCleanShutdown(OperationContext* opCtx);
virtual executor::TaskExecutor* getTaskExecutor() const override;
+ std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const override;
virtual ThreadPool* getDbWorkThreadPool() const override;
virtual Status initializeReplSetStorage(OperationContext* opCtx, const BSONObj& config);
void onDrainComplete(OperationContext* opCtx) override;
@@ -220,7 +221,7 @@ private:
long long _nextThreadId = 0;
// Task executor used to run replication tasks.
- std::unique_ptr<executor::TaskExecutor> _taskExecutor;
+ std::shared_ptr<executor::TaskExecutor> _taskExecutor;
// Used by repl::applyOplogBatch() to apply the sync source's operations in parallel.
// Also used by database and collection cloners to perform storage operations.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index bc3cb98418c..27f569e592c 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -88,6 +88,11 @@ executor::TaskExecutor* ReplicationCoordinatorExternalStateMock::getTaskExecutor
return nullptr;
}
+std::shared_ptr<executor::TaskExecutor>
+ReplicationCoordinatorExternalStateMock::getSharedTaskExecutor() const {
+ return nullptr;
+}
+
ThreadPool* ReplicationCoordinatorExternalStateMock::getDbWorkThreadPool() const {
return nullptr;
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 71a2f7becd9..f69b774b8e9 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -68,6 +68,7 @@ public:
virtual void shutdown(OperationContext* opCtx);
virtual void clearAppliedThroughIfCleanShutdown(OperationContext* opCtx);
virtual executor::TaskExecutor* getTaskExecutor() const override;
+ virtual std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const override;
virtual ThreadPool* getDbWorkThreadPool() const override;
virtual Status initializeReplSetStorage(OperationContext* opCtx, const BSONObj& config);
void onDrainComplete(OperationContext* opCtx) override;