diff options
Diffstat (limited to 'src/mongo/db/repl/initial_syncer.cpp')
-rw-r--r-- | src/mongo/db/repl/initial_syncer.cpp | 114 |
1 files changed, 59 insertions, 55 deletions
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index cd4bba6c986..21d12d6ecb9 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -284,7 +284,7 @@ Status InitialSyncer::startup(OperationContext* opCtx, _clonerAttemptExec = std::make_unique<executor::ScopedTaskExecutor>( _clonerExec, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled")); auto status = _scheduleWorkAndSaveHandle_inlock( - [=](const executor::TaskExecutor::CallbackArgs& args) { + [=, this](const executor::TaskExecutor::CallbackArgs& args) { _startInitialSyncAttemptCallback(args, initialSyncAttempt, initialSyncMaxAttempts); }, &_startInitialSyncAttemptHandle, @@ -681,7 +681,7 @@ void InitialSyncer::_startInitialSyncAttemptCallback( // _scheduleWorkAndSaveHandle_inlock() is shutdown-aware. status = _scheduleWorkAndSaveHandle_inlock( - [=](const executor::TaskExecutor::CallbackArgs& args) { + [=, this](const executor::TaskExecutor::CallbackArgs& args) { _chooseSyncSourceCallback( args, chooseSyncSourceAttempt, chooseSyncSourceMaxAttempts, onCompletionGuard); }, @@ -745,7 +745,7 @@ void InitialSyncer::_chooseSyncSourceCallback( "numInitialSyncConnectAttempts"_attr = numInitialSyncConnectAttempts.load()); auto status = _scheduleWorkAtAndSaveHandle_inlock( when, - [=](const executor::TaskExecutor::CallbackArgs& args) { + [=, this](const executor::TaskExecutor::CallbackArgs& args) { _chooseSyncSourceCallback(args, chooseSyncSourceAttempt + 1, chooseSyncSourceMaxAttempts, @@ -786,7 +786,7 @@ void InitialSyncer::_chooseSyncSourceCallback( // Schedule rollback ID checker. _rollbackChecker = std::make_unique<RollbackChecker>(*_attemptExec, _syncSource); - auto scheduleResult = _rollbackChecker->reset([=](const RollbackChecker::Result& result) { + auto scheduleResult = _rollbackChecker->reset([=, this](const RollbackChecker::Result& result) { return _rollbackCheckerResetCallback(result, onCompletionGuard); }); status = scheduleResult.getStatus(); @@ -868,9 +868,9 @@ void InitialSyncer::_rollbackCheckerResetCallback( // which retries up to 'numInitialSyncOplogFindAttempts' times'. This will fail relatively // quickly in the presence of network errors, allowing us to choose a different sync source. status = _scheduleLastOplogEntryFetcher_inlock( - [=](const StatusWith<mongo::Fetcher::QueryResponse>& response, - mongo::Fetcher::NextAction*, - mongo::BSONObjBuilder*) mutable { + [=, this](const StatusWith<mongo::Fetcher::QueryResponse>& response, + mongo::Fetcher::NextAction*, + mongo::BSONObjBuilder*) mutable { _lastOplogEntryFetcherCallbackForDefaultBeginFetchingOpTime(response, onCompletionGuard); }, @@ -947,9 +947,9 @@ Status InitialSyncer::_scheduleGetBeginFetchingOpTime_inlock( _syncSource, NamespaceString::kSessionTransactionsTableNamespace.db().toString(), cmd.obj(), - [=](const StatusWith<mongo::Fetcher::QueryResponse>& response, - mongo::Fetcher::NextAction*, - mongo::BSONObjBuilder*) mutable { + [=, this](const StatusWith<mongo::Fetcher::QueryResponse>& response, + mongo::Fetcher::NextAction*, + mongo::BSONObjBuilder*) mutable { _getBeginFetchingOpTimeCallback( response, onCompletionGuard, defaultBeginFetchingOpTime); }, @@ -1019,9 +1019,9 @@ void InitialSyncer::_getBeginFetchingOpTimeCallback( // which retries up to 'numInitialSyncOplogFindAttempts' times'. This will fail relatively // quickly in the presence of network errors, allowing us to choose a different sync source. status = _scheduleLastOplogEntryFetcher_inlock( - [=](const StatusWith<mongo::Fetcher::QueryResponse>& response, - mongo::Fetcher::NextAction*, - mongo::BSONObjBuilder*) mutable { + [=, this](const StatusWith<mongo::Fetcher::QueryResponse>& response, + mongo::Fetcher::NextAction*, + mongo::BSONObjBuilder*) mutable { _lastOplogEntryFetcherCallbackForBeginApplyingTimestamp( response, onCompletionGuard, beginFetchingOpTime); }, @@ -1075,9 +1075,9 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginApplyingTimestamp( _syncSource, NamespaceString::kServerConfigurationNamespace.db().toString(), queryBob.obj(), - [=](const StatusWith<mongo::Fetcher::QueryResponse>& response, - mongo::Fetcher::NextAction*, - mongo::BSONObjBuilder*) mutable { + [=, this](const StatusWith<mongo::Fetcher::QueryResponse>& response, + mongo::Fetcher::NextAction*, + mongo::BSONObjBuilder*) mutable { _fcvFetcherCallback(response, onCompletionGuard, lastOpTime, beginFetchingOpTime); }, ReadPreferenceSetting::secondaryPreferredMetadata(), @@ -1227,12 +1227,12 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> std::make_unique<OplogFetcherRestartDecisionInitialSyncer>( _sharedData.get(), _opts.oplogFetcherMaxFetcherRestarts), _dataReplicatorExternalState.get(), - [=](OplogFetcher::Documents::const_iterator first, - OplogFetcher::Documents::const_iterator last, - const OplogFetcher::DocumentsInfo& info) { + [=, this](OplogFetcher::Documents::const_iterator first, + OplogFetcher::Documents::const_iterator last, + const OplogFetcher::DocumentsInfo& info) { return _enqueueDocuments(first, last, info); }, - [=](const Status& s, int rbid) { _oplogFetcherCallback(s, onCompletionGuard); }, + [=, this](const Status& s, int rbid) { _oplogFetcherCallback(s, onCompletionGuard); }, std::move(oplogFetcherConfig)); LOGV2_DEBUG(21178, @@ -1385,9 +1385,9 @@ void InitialSyncer::_allDatabaseClonerCallback( // strategy used when retrieving collection data, and avoids retrieving all the data and then // throwing it away due to a transient network outage. status = _scheduleLastOplogEntryFetcher_inlock( - [=](const StatusWith<mongo::Fetcher::QueryResponse>& status, - mongo::Fetcher::NextAction*, - mongo::BSONObjBuilder*) { + [=, this](const StatusWith<mongo::Fetcher::QueryResponse>& status, + mongo::Fetcher::NextAction*, + mongo::BSONObjBuilder*) { _lastOplogEntryFetcherCallbackForStopTimestamp(status, onCompletionGuard); }, kInitialSyncerHandlesRetries); @@ -1409,31 +1409,31 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp( if (_shouldRetryError(lock, status)) { auto scheduleStatus = (*_attemptExec) - ->scheduleWork( - [this, onCompletionGuard](executor::TaskExecutor::CallbackArgs args) { - // It is not valid to schedule the retry from within this callback, - // hence we schedule a lambda to schedule the retry. - stdx::lock_guard<Latch> lock(_mutex); - // Since the stopTimestamp is retrieved after we have done all the - // work of retrieving collection data, we handle retries within this - // class by retrying for - // 'initialSyncTransientErrorRetryPeriodSeconds' (default 24 hours). - // This is the same retry strategy used when retrieving collection - // data, and avoids retrieving all the data and then throwing it - // away due to a transient network outage. - auto status = _scheduleLastOplogEntryFetcher_inlock( - [=](const StatusWith<mongo::Fetcher::QueryResponse>& status, - mongo::Fetcher::NextAction*, - mongo::BSONObjBuilder*) { - _lastOplogEntryFetcherCallbackForStopTimestamp( - status, onCompletionGuard); - }, - kInitialSyncerHandlesRetries); - if (!status.isOK()) { - onCompletionGuard->setResultAndCancelRemainingWork_inlock( - lock, status); - } - }); + ->scheduleWork([this, onCompletionGuard]( + executor::TaskExecutor::CallbackArgs args) { + // It is not valid to schedule the retry from within this callback, + // hence we schedule a lambda to schedule the retry. + stdx::lock_guard<Latch> lock(_mutex); + // Since the stopTimestamp is retrieved after we have done all the + // work of retrieving collection data, we handle retries within this + // class by retrying for + // 'initialSyncTransientErrorRetryPeriodSeconds' (default 24 hours). + // This is the same retry strategy used when retrieving collection + // data, and avoids retrieving all the data and then throwing it + // away due to a transient network outage. + auto status = _scheduleLastOplogEntryFetcher_inlock( + [=, this](const StatusWith<mongo::Fetcher::QueryResponse>& status, + mongo::Fetcher::NextAction*, + mongo::BSONObjBuilder*) { + _lastOplogEntryFetcherCallbackForStopTimestamp( + status, onCompletionGuard); + }, + kInitialSyncerHandlesRetries); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, + status); + } + }); if (scheduleStatus.isOK()) return; // If scheduling failed, we're shutting down and cannot retry. @@ -1569,7 +1569,7 @@ void InitialSyncer::_getNextApplierBatchCallback( Date_t lastAppliedWall = ops.back().getWallClockTime(); auto numApplied = ops.size(); - MultiApplier::CallbackFn onCompletionFn = [=](const Status& s) { + MultiApplier::CallbackFn onCompletionFn = [=, this](const Status& s) { return _multiApplierCallback( s, {lastApplied, lastAppliedWall}, numApplied, onCompletionGuard); }; @@ -1611,7 +1611,9 @@ void InitialSyncer::_getNextApplierBatchCallback( auto when = (*_attemptExec)->now() + _opts.getApplierBatchCallbackRetryWait; status = _scheduleWorkAtAndSaveHandle_inlock( when, - [=](const CallbackArgs& args) { _getNextApplierBatchCallback(args, onCompletionGuard); }, + [=, this](const CallbackArgs& args) { + _getNextApplierBatchCallback(args, onCompletionGuard); + }, &_getNextApplierBatchHandle, "_getNextApplierBatchCallback"); if (!status.isOK()) { @@ -1722,8 +1724,10 @@ void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTimeAndWallTime // declare the scope guard before the lock guard. auto result = lastApplied; ScopeGuard finishCallbackGuard([this, &result] { - auto scheduleResult = _exec->scheduleWork( - [=](const mongo::executor::TaskExecutor::CallbackArgs&) { _finishCallback(result); }); + auto scheduleResult = + _exec->scheduleWork([=, this](const mongo::executor::TaskExecutor::CallbackArgs&) { + _finishCallback(result); + }); if (!scheduleResult.isOK()) { LOGV2_WARNING(21197, "Unable to schedule initial syncer completion task due to " @@ -1811,7 +1815,7 @@ void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTimeAndWallTime auto when = (*_attemptExec)->now() + _opts.initialSyncRetryWait; auto status = _scheduleWorkAtAndSaveHandle_inlock( when, - [=](const executor::TaskExecutor::CallbackArgs& args) { + [=, this](const executor::TaskExecutor::CallbackArgs& args) { _startInitialSyncAttemptCallback( args, _stats.failedInitialSyncAttempts, _stats.maxFailedInitialSyncAttempts); }, @@ -1987,7 +1991,7 @@ void InitialSyncer::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock( // Get another batch to apply. // _scheduleWorkAndSaveHandle_inlock() is shutdown-aware. auto status = _scheduleWorkAndSaveHandle_inlock( - [=](const executor::TaskExecutor::CallbackArgs& args) { + [=, this](const executor::TaskExecutor::CallbackArgs& args) { return _getNextApplierBatchCallback(args, onCompletionGuard); }, &_getNextApplierBatchHandle, @@ -2013,7 +2017,7 @@ void InitialSyncer::_scheduleRollbackCheckerCheckForRollback_inlock( } auto scheduleResult = - _rollbackChecker->checkForRollback([=](const RollbackChecker::Result& result) { + _rollbackChecker->checkForRollback([=, this](const RollbackChecker::Result& result) { _rollbackCheckerCheckForRollbackCallback(result, onCompletionGuard); }); |