summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/replication_coordinator_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp90
1 files changed, 0 insertions, 90 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 301f2c92c7c..e5a0d5bf393 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -48,7 +48,6 @@
#include "mongo/db/repl/data_replicator_external_state_initial_sync.h"
#include "mongo/db/repl/elect_cmd_runner.h"
#include "mongo/db/repl/freshness_checker.h"
-#include "mongo/db/repl/freshness_scanner.h"
#include "mongo/db/repl/handshake_args.h"
#include "mongo/db/repl/is_master_response.h"
#include "mongo/db/repl/last_vote.h"
@@ -2893,87 +2892,6 @@ Status ReplicationCoordinatorImpl::abortCatchupIfNeeded() {
return Status(ErrorCodes::IllegalOperation, "The node is not in catch-up mode.");
}
-void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
- auto scanner = std::make_shared<FreshnessScanner>();
- auto scanStartTime = _replExecutor->now();
- auto evhStatus = scanner->start(
- _replExecutor.get(), _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
- if (evhStatus == ErrorCodes::ShutdownInProgress) {
- _enterDrainMode_inlock();
- return;
- }
- fassertStatusOK(40254, evhStatus.getStatus());
- long long term = _topCoord->getTerm();
- _replExecutor->onEvent(
- evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- _enterDrainMode_inlock();
- return;
- }
- auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
- auto catchUpTimeout = totalTimeout - (_replExecutor->now() - scanStartTime);
- _catchUpOplogToLatest_inlock(*scanner, catchUpTimeout, term);
- });
-}
-
-void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessScanner& scanner,
- Milliseconds timeout,
- long long originalTerm) {
- // On stepping down, the node doesn't update its term immediately due to SERVER-21425.
- // Term is also checked in case the catchup timeout is so long that the node becomes primary
- // again.
- if (!_memberState.primary() || originalTerm != _topCoord->getTerm()) {
- // If the node steps down during the catch-up, we don't go into drain mode.
- log() << "Stopped transition to primary of term " << originalTerm
- << " because I've already stepped down.";
- return;
- }
-
- auto result = scanner.getResult();
-
- // Cannot access any nodes within timeout.
- if (result.size() == 0) {
- log() << "Could not access any nodes within timeout when checking for "
- << "additional ops to apply before finishing transition to primary. "
- << "Will move forward with becoming primary anyway.";
- _enterDrainMode_inlock();
- return;
- }
-
- // I'm most up-to-date as far as I know.
- auto freshnessInfo = result.front();
- if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) {
- log() << "My optime is most up-to-date, skipping catch-up "
- << "and completing transition to primary.";
- _enterDrainMode_inlock();
- return;
- }
-
- // Wait for the replication level to reach the latest opTime within timeout.
- auto latestOpTime = freshnessInfo.opTime;
- auto finishCB = [this, latestOpTime]() {
- if (latestOpTime > _getMyLastAppliedOpTime_inlock()) {
- log() << "Cannot catch up oplog after becoming primary.";
- } else {
- log() << "Finished catch-up oplog after becoming primary.";
- }
-
- _enterDrainMode_inlock();
- };
- auto waiterInfo = std::make_shared<CallbackWaiter>(freshnessInfo.opTime, finishCB);
-
- _opTimeWaiterList.add_inlock(waiterInfo.get());
- auto timeoutCB = [this, waiterInfo, finishCB](const CallbackArgs& cbData) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_opTimeWaiterList.remove_inlock(waiterInfo.get())) {
- finishCB();
- }
- };
- // Schedule the timeout callback. It may signal after we have already caught up.
- _replExecutor->scheduleWorkAt(_replExecutor->now() + timeout, timeoutCB);
-}
-
void ReplicationCoordinatorImpl::_enterDrainMode_inlock() {
_applierState = ApplierState::Draining;
_externalState->stopProducer();
@@ -3824,14 +3742,6 @@ EventHandle ReplicationCoordinatorImpl::_makeEvent() {
return eventResult.getValue();
}
-void ReplicationCoordinatorImpl::_scheduleElectionWinNotification_inlock() {
- if (!_getMemberState_inlock().primary()) {
- return;
- }
-
- _restartHeartbeats_inlock();
-}
-
WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptionsSyncMode(
WriteConcernOptions wc) {