Diffstat (limited to 'src/mongo/db/repl/initial_syncer.cpp')
 src/mongo/db/repl/initial_syncer.cpp | 114 ++++++++++++++++++-----------------
 1 file changed, 59 insertions(+), 55 deletions(-)
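
Every hunk below makes the same mechanical change: lambdas that implicitly captured `this` through a `[=]` capture-default now name it explicitly as `[=, this]`. C++20 deprecates the implicit capture of `this` via `[=]`, so compiling this file in C++20 mode emits a deprecation warning for each such lambda (Clang reports it under -Wdeprecated-this-capture); the explicit spelling keeps the semantics identical and silences the warning. A minimal standalone sketch of the problem and the fix, using a hypothetical Widget class rather than InitialSyncer itself:

    // Hypothetical example, not part of this patch. Build with -std=c++20.
    #include <functional>
    #include <iostream>

    class Widget {
    public:
        std::function<void()> makeCallback() {
            // Deprecated in C++20, because [=] silently captures the `this`
            // pointer (not a copy of the object):
            //     return [=] { std::cout << _value << '\n'; };
            //
            // The fix applied throughout this patch: capture `this` explicitly.
            // Note that [=, this] is itself only well-formed from C++20 onward.
            return [=, this] { std::cout << _value << '\n'; };
        }

    private:
        int _value = 42;
    };

    int main() {
        Widget w;
        auto cb = w.makeCallback();
        cb();  // prints 42; `w` must outlive the call, since only `this` was captured
    }
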
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index cd4bba6c986..21d12d6ecb9 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -284,7 +284,7 @@ Status InitialSyncer::startup(OperationContext* opCtx,
_clonerAttemptExec = std::make_unique<executor::ScopedTaskExecutor>(
_clonerExec, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled"));
auto status = _scheduleWorkAndSaveHandle_inlock(
- [=](const executor::TaskExecutor::CallbackArgs& args) {
+ [=, this](const executor::TaskExecutor::CallbackArgs& args) {
_startInitialSyncAttemptCallback(args, initialSyncAttempt, initialSyncMaxAttempts);
},
&_startInitialSyncAttemptHandle,
@@ -681,7 +681,7 @@ void InitialSyncer::_startInitialSyncAttemptCallback(
// _scheduleWorkAndSaveHandle_inlock() is shutdown-aware.
status = _scheduleWorkAndSaveHandle_inlock(
- [=](const executor::TaskExecutor::CallbackArgs& args) {
+ [=, this](const executor::TaskExecutor::CallbackArgs& args) {
_chooseSyncSourceCallback(
args, chooseSyncSourceAttempt, chooseSyncSourceMaxAttempts, onCompletionGuard);
},
@@ -745,7 +745,7 @@ void InitialSyncer::_chooseSyncSourceCallback(
"numInitialSyncConnectAttempts"_attr = numInitialSyncConnectAttempts.load());
auto status = _scheduleWorkAtAndSaveHandle_inlock(
when,
- [=](const executor::TaskExecutor::CallbackArgs& args) {
+ [=, this](const executor::TaskExecutor::CallbackArgs& args) {
_chooseSyncSourceCallback(args,
chooseSyncSourceAttempt + 1,
chooseSyncSourceMaxAttempts,
@@ -786,7 +786,7 @@ void InitialSyncer::_chooseSyncSourceCallback(
// Schedule rollback ID checker.
_rollbackChecker = std::make_unique<RollbackChecker>(*_attemptExec, _syncSource);
- auto scheduleResult = _rollbackChecker->reset([=](const RollbackChecker::Result& result) {
+ auto scheduleResult = _rollbackChecker->reset([=, this](const RollbackChecker::Result& result) {
return _rollbackCheckerResetCallback(result, onCompletionGuard);
});
status = scheduleResult.getStatus();
@@ -868,9 +868,9 @@ void InitialSyncer::_rollbackCheckerResetCallback(
// which retries up to 'numInitialSyncOplogFindAttempts' times'. This will fail relatively
// quickly in the presence of network errors, allowing us to choose a different sync source.
status = _scheduleLastOplogEntryFetcher_inlock(
- [=](const StatusWith<mongo::Fetcher::QueryResponse>& response,
- mongo::Fetcher::NextAction*,
- mongo::BSONObjBuilder*) mutable {
+ [=, this](const StatusWith<mongo::Fetcher::QueryResponse>& response,
+ mongo::Fetcher::NextAction*,
+ mongo::BSONObjBuilder*) mutable {
_lastOplogEntryFetcherCallbackForDefaultBeginFetchingOpTime(response,
onCompletionGuard);
},
@@ -947,9 +947,9 @@ Status InitialSyncer::_scheduleGetBeginFetchingOpTime_inlock(
_syncSource,
NamespaceString::kSessionTransactionsTableNamespace.db().toString(),
cmd.obj(),
- [=](const StatusWith<mongo::Fetcher::QueryResponse>& response,
- mongo::Fetcher::NextAction*,
- mongo::BSONObjBuilder*) mutable {
+ [=, this](const StatusWith<mongo::Fetcher::QueryResponse>& response,
+ mongo::Fetcher::NextAction*,
+ mongo::BSONObjBuilder*) mutable {
_getBeginFetchingOpTimeCallback(
response, onCompletionGuard, defaultBeginFetchingOpTime);
},
@@ -1019,9 +1019,9 @@ void InitialSyncer::_getBeginFetchingOpTimeCallback(
// which retries up to 'numInitialSyncOplogFindAttempts' times'. This will fail relatively
// quickly in the presence of network errors, allowing us to choose a different sync source.
status = _scheduleLastOplogEntryFetcher_inlock(
- [=](const StatusWith<mongo::Fetcher::QueryResponse>& response,
- mongo::Fetcher::NextAction*,
- mongo::BSONObjBuilder*) mutable {
+ [=, this](const StatusWith<mongo::Fetcher::QueryResponse>& response,
+ mongo::Fetcher::NextAction*,
+ mongo::BSONObjBuilder*) mutable {
_lastOplogEntryFetcherCallbackForBeginApplyingTimestamp(
response, onCompletionGuard, beginFetchingOpTime);
},
@@ -1075,9 +1075,9 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginApplyingTimestamp(
_syncSource,
NamespaceString::kServerConfigurationNamespace.db().toString(),
queryBob.obj(),
- [=](const StatusWith<mongo::Fetcher::QueryResponse>& response,
- mongo::Fetcher::NextAction*,
- mongo::BSONObjBuilder*) mutable {
+ [=, this](const StatusWith<mongo::Fetcher::QueryResponse>& response,
+ mongo::Fetcher::NextAction*,
+ mongo::BSONObjBuilder*) mutable {
_fcvFetcherCallback(response, onCompletionGuard, lastOpTime, beginFetchingOpTime);
},
ReadPreferenceSetting::secondaryPreferredMetadata(),
@@ -1227,12 +1227,12 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
std::make_unique<OplogFetcherRestartDecisionInitialSyncer>(
_sharedData.get(), _opts.oplogFetcherMaxFetcherRestarts),
_dataReplicatorExternalState.get(),
- [=](OplogFetcher::Documents::const_iterator first,
- OplogFetcher::Documents::const_iterator last,
- const OplogFetcher::DocumentsInfo& info) {
+ [=, this](OplogFetcher::Documents::const_iterator first,
+ OplogFetcher::Documents::const_iterator last,
+ const OplogFetcher::DocumentsInfo& info) {
return _enqueueDocuments(first, last, info);
},
- [=](const Status& s, int rbid) { _oplogFetcherCallback(s, onCompletionGuard); },
+ [=, this](const Status& s, int rbid) { _oplogFetcherCallback(s, onCompletionGuard); },
std::move(oplogFetcherConfig));
LOGV2_DEBUG(21178,
@@ -1385,9 +1385,9 @@ void InitialSyncer::_allDatabaseClonerCallback(
// strategy used when retrieving collection data, and avoids retrieving all the data and then
// throwing it away due to a transient network outage.
status = _scheduleLastOplogEntryFetcher_inlock(
- [=](const StatusWith<mongo::Fetcher::QueryResponse>& status,
- mongo::Fetcher::NextAction*,
- mongo::BSONObjBuilder*) {
+ [=, this](const StatusWith<mongo::Fetcher::QueryResponse>& status,
+ mongo::Fetcher::NextAction*,
+ mongo::BSONObjBuilder*) {
_lastOplogEntryFetcherCallbackForStopTimestamp(status, onCompletionGuard);
},
kInitialSyncerHandlesRetries);
@@ -1409,31 +1409,31 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
if (_shouldRetryError(lock, status)) {
auto scheduleStatus =
(*_attemptExec)
- ->scheduleWork(
- [this, onCompletionGuard](executor::TaskExecutor::CallbackArgs args) {
- // It is not valid to schedule the retry from within this callback,
- // hence we schedule a lambda to schedule the retry.
- stdx::lock_guard<Latch> lock(_mutex);
- // Since the stopTimestamp is retrieved after we have done all the
- // work of retrieving collection data, we handle retries within this
- // class by retrying for
- // 'initialSyncTransientErrorRetryPeriodSeconds' (default 24 hours).
- // This is the same retry strategy used when retrieving collection
- // data, and avoids retrieving all the data and then throwing it
- // away due to a transient network outage.
- auto status = _scheduleLastOplogEntryFetcher_inlock(
- [=](const StatusWith<mongo::Fetcher::QueryResponse>& status,
- mongo::Fetcher::NextAction*,
- mongo::BSONObjBuilder*) {
- _lastOplogEntryFetcherCallbackForStopTimestamp(
- status, onCompletionGuard);
- },
- kInitialSyncerHandlesRetries);
- if (!status.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(
- lock, status);
- }
- });
+ ->scheduleWork([this, onCompletionGuard](
+ executor::TaskExecutor::CallbackArgs args) {
+ // It is not valid to schedule the retry from within this callback,
+ // hence we schedule a lambda to schedule the retry.
+ stdx::lock_guard<Latch> lock(_mutex);
+ // Since the stopTimestamp is retrieved after we have done all the
+ // work of retrieving collection data, we handle retries within this
+ // class by retrying for
+ // 'initialSyncTransientErrorRetryPeriodSeconds' (default 24 hours).
+ // This is the same retry strategy used when retrieving collection
+ // data, and avoids retrieving all the data and then throwing it
+ // away due to a transient network outage.
+ auto status = _scheduleLastOplogEntryFetcher_inlock(
+ [=, this](const StatusWith<mongo::Fetcher::QueryResponse>& status,
+ mongo::Fetcher::NextAction*,
+ mongo::BSONObjBuilder*) {
+ _lastOplogEntryFetcherCallbackForStopTimestamp(
+ status, onCompletionGuard);
+ },
+ kInitialSyncerHandlesRetries);
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock,
+ status);
+ }
+ });
if (scheduleStatus.isOK())
return;
// If scheduling failed, we're shutting down and cannot retry.
@@ -1569,7 +1569,7 @@ void InitialSyncer::_getNextApplierBatchCallback(
Date_t lastAppliedWall = ops.back().getWallClockTime();
auto numApplied = ops.size();
- MultiApplier::CallbackFn onCompletionFn = [=](const Status& s) {
+ MultiApplier::CallbackFn onCompletionFn = [=, this](const Status& s) {
return _multiApplierCallback(
s, {lastApplied, lastAppliedWall}, numApplied, onCompletionGuard);
};
@@ -1611,7 +1611,9 @@ void InitialSyncer::_getNextApplierBatchCallback(
auto when = (*_attemptExec)->now() + _opts.getApplierBatchCallbackRetryWait;
status = _scheduleWorkAtAndSaveHandle_inlock(
when,
- [=](const CallbackArgs& args) { _getNextApplierBatchCallback(args, onCompletionGuard); },
+ [=, this](const CallbackArgs& args) {
+ _getNextApplierBatchCallback(args, onCompletionGuard);
+ },
&_getNextApplierBatchHandle,
"_getNextApplierBatchCallback");
if (!status.isOK()) {
@@ -1722,8 +1724,10 @@ void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTimeAndWallTime
// declare the scope guard before the lock guard.
auto result = lastApplied;
ScopeGuard finishCallbackGuard([this, &result] {
- auto scheduleResult = _exec->scheduleWork(
- [=](const mongo::executor::TaskExecutor::CallbackArgs&) { _finishCallback(result); });
+ auto scheduleResult =
+ _exec->scheduleWork([=, this](const mongo::executor::TaskExecutor::CallbackArgs&) {
+ _finishCallback(result);
+ });
if (!scheduleResult.isOK()) {
LOGV2_WARNING(21197,
"Unable to schedule initial syncer completion task due to "
@@ -1811,7 +1815,7 @@ void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTimeAndWallTime
auto when = (*_attemptExec)->now() + _opts.initialSyncRetryWait;
auto status = _scheduleWorkAtAndSaveHandle_inlock(
when,
- [=](const executor::TaskExecutor::CallbackArgs& args) {
+ [=, this](const executor::TaskExecutor::CallbackArgs& args) {
_startInitialSyncAttemptCallback(
args, _stats.failedInitialSyncAttempts, _stats.maxFailedInitialSyncAttempts);
},
@@ -1987,7 +1991,7 @@ void InitialSyncer::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(
// Get another batch to apply.
// _scheduleWorkAndSaveHandle_inlock() is shutdown-aware.
auto status = _scheduleWorkAndSaveHandle_inlock(
- [=](const executor::TaskExecutor::CallbackArgs& args) {
+ [=, this](const executor::TaskExecutor::CallbackArgs& args) {
return _getNextApplierBatchCallback(args, onCompletionGuard);
},
&_getNextApplierBatchHandle,
@@ -2013,7 +2017,7 @@ void InitialSyncer::_scheduleRollbackCheckerCheckForRollback_inlock(
}
auto scheduleResult =
- _rollbackChecker->checkForRollback([=](const RollbackChecker::Result& result) {
+ _rollbackChecker->checkForRollback([=, this](const RollbackChecker::Result& result) {
_rollbackCheckerCheckForRollbackCallback(result, onCompletionGuard);
});
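
One design note on the spelling chosen: of the two explicit forms, `[=, this]` preserves the old behavior exactly, since each callback continues to hold a pointer to the live InitialSyncer, whose lifetime is guarded by the executor shutdown and cancellation paths visible in the hunks above. The alternative `[=, *this]` (valid since C++17) would instead copy the object into each lambda, silently changing the semantics. A hypothetical side-by-side sketch (Counter is illustrative, not from the patch):

    #include <iostream>

    struct Counter {
        int n = 0;
        void demo() {
            auto byPointer = [=, this] { std::cout << n << '\n'; };  // sees the live object
            auto byCopy = [=, *this] { std::cout << n << '\n'; };    // sees a copy made here
            n = 1;
            byPointer();  // prints 1: reads through the captured `this`
            byCopy();     // prints 0: reads the copy taken before the update
        }
    };

    int main() {
        Counter{}.demo();
    }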