diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 90 |
1 files changed, 0 insertions, 90 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 301f2c92c7c..e5a0d5bf393 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -48,7 +48,6 @@ #include "mongo/db/repl/data_replicator_external_state_initial_sync.h" #include "mongo/db/repl/elect_cmd_runner.h" #include "mongo/db/repl/freshness_checker.h" -#include "mongo/db/repl/freshness_scanner.h" #include "mongo/db/repl/handshake_args.h" #include "mongo/db/repl/is_master_response.h" #include "mongo/db/repl/last_vote.h" @@ -2893,87 +2892,6 @@ Status ReplicationCoordinatorImpl::abortCatchupIfNeeded() { return Status(ErrorCodes::IllegalOperation, "The node is not in catch-up mode."); } -void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { - auto scanner = std::make_shared<FreshnessScanner>(); - auto scanStartTime = _replExecutor->now(); - auto evhStatus = scanner->start( - _replExecutor.get(), _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod()); - if (evhStatus == ErrorCodes::ShutdownInProgress) { - _enterDrainMode_inlock(); - return; - } - fassertStatusOK(40254, evhStatus.getStatus()); - long long term = _topCoord->getTerm(); - _replExecutor->onEvent( - evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (cbData.status == ErrorCodes::CallbackCanceled) { - _enterDrainMode_inlock(); - return; - } - auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod(); - auto catchUpTimeout = totalTimeout - (_replExecutor->now() - scanStartTime); - _catchUpOplogToLatest_inlock(*scanner, catchUpTimeout, term); - }); -} - -void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessScanner& scanner, - Milliseconds timeout, - long long originalTerm) { - // On stepping down, the node doesn't update its term immediately due to SERVER-21425. - // Term is also checked in case the catchup timeout is so long that the node becomes primary - // again. - if (!_memberState.primary() || originalTerm != _topCoord->getTerm()) { - // If the node steps down during the catch-up, we don't go into drain mode. - log() << "Stopped transition to primary of term " << originalTerm - << " because I've already stepped down."; - return; - } - - auto result = scanner.getResult(); - - // Cannot access any nodes within timeout. - if (result.size() == 0) { - log() << "Could not access any nodes within timeout when checking for " - << "additional ops to apply before finishing transition to primary. " - << "Will move forward with becoming primary anyway."; - _enterDrainMode_inlock(); - return; - } - - // I'm most up-to-date as far as I know. - auto freshnessInfo = result.front(); - if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) { - log() << "My optime is most up-to-date, skipping catch-up " - << "and completing transition to primary."; - _enterDrainMode_inlock(); - return; - } - - // Wait for the replication level to reach the latest opTime within timeout. - auto latestOpTime = freshnessInfo.opTime; - auto finishCB = [this, latestOpTime]() { - if (latestOpTime > _getMyLastAppliedOpTime_inlock()) { - log() << "Cannot catch up oplog after becoming primary."; - } else { - log() << "Finished catch-up oplog after becoming primary."; - } - - _enterDrainMode_inlock(); - }; - auto waiterInfo = std::make_shared<CallbackWaiter>(freshnessInfo.opTime, finishCB); - - _opTimeWaiterList.add_inlock(waiterInfo.get()); - auto timeoutCB = [this, waiterInfo, finishCB](const CallbackArgs& cbData) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_opTimeWaiterList.remove_inlock(waiterInfo.get())) { - finishCB(); - } - }; - // Schedule the timeout callback. It may signal after we have already caught up. - _replExecutor->scheduleWorkAt(_replExecutor->now() + timeout, timeoutCB); -} - void ReplicationCoordinatorImpl::_enterDrainMode_inlock() { _applierState = ApplierState::Draining; _externalState->stopProducer(); @@ -3824,14 +3742,6 @@ EventHandle ReplicationCoordinatorImpl::_makeEvent() { return eventResult.getValue(); } -void ReplicationCoordinatorImpl::_scheduleElectionWinNotification_inlock() { - if (!_getMemberState_inlock().primary()) { - return; - } - - _restartHeartbeats_inlock(); -} - WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptionsSyncMode( WriteConcernOptions wc) { |