diff options
author | Andy Schwerin <Andy Schwerin schwerin@mongodb.com> | 2017-04-13 17:05:04 -0400 |
---|---|---|
committer | Andy Schwerin <Andy Schwerin schwerin@mongodb.com> | 2017-04-19 11:50:48 -0400 |
commit | 8cd017f699acba04886b1a7b39f2f15d93c6ff20 (patch) | |
tree | eccab0ae05bb711c36e5dd541f21e7b84de683a2 /src/mongo/db/repl/replication_coordinator_impl.cpp | |
parent | d54dabeb93b36e4ce2a67ea35dba1f57e1298f4f (diff) | |
download | mongo-8cd017f699acba04886b1a7b39f2f15d93c6ff20.tar.gz |
SERVER-28865 Keep a unique pointer to the executor instead of a value.
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 146 |
1 files changed, 73 insertions, 73 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index c6213308b17..f42e712ffd3 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -314,7 +314,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( _settings(settings), _replMode(getReplicationModeFromSettings(settings)), _topCoord(std::move(topCoord)), - _replExecutor(std::move(network), prngSeed), + _replExecutor(stdx::make_unique<ReplicationExecutor>(std::move(network), prngSeed)), _externalState(std::move(externalState)), _inShutdown(false), _memberState(MemberState::RS_STARTUP), @@ -362,7 +362,7 @@ void ReplicationCoordinatorImpl::_waitForStartUpComplete() { handle = _finishLoadLocalConfigCbh; } if (handle.isValid()) { - _replExecutor.wait(handle); + _replExecutor->wait(handle); } } @@ -409,11 +409,11 @@ LogicalTime ReplicationCoordinatorImpl::_getCurrentCommittedLogicalTime_inlock() void ReplicationCoordinatorImpl::appendDiagnosticBSON(mongo::BSONObjBuilder* bob) { BSONObjBuilder eBuilder(bob->subobjStart("executor")); - _replExecutor.appendDiagnosticBSON(&eBuilder); + _replExecutor->appendDiagnosticBSON(&eBuilder); } void ReplicationCoordinatorImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) const { - _replExecutor.appendConnectionStats(stats); + _replExecutor->appendConnectionStats(stats); } bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) { @@ -684,7 +684,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) { return; } - _replExecutor.startup(); + _replExecutor->startup(); { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -753,8 +753,8 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) { initialSyncerCopy.reset(); } _externalState->shutdown(opCtx); - _replExecutor.shutdown(); - _replExecutor.join(); + _replExecutor->shutdown(); + _replExecutor->join(); } const ReplSettings& ReplicationCoordinatorImpl::getSettings() const { @@ -811,7 +811,7 @@ void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() { executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::setFollowerMode_nonBlocking( const MemberState& newState, bool* success) { - auto finishedSettingFollowerMode = _replExecutor.makeEvent(); + auto finishedSettingFollowerMode = _replExecutor->makeEvent(); if (finishedSettingFollowerMode.getStatus() == ErrorCodes::ShutdownInProgress) { return {}; } @@ -823,7 +823,7 @@ executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::setFollowerMode_ bool ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) { bool success = false; if (auto eventHandle = setFollowerMode_nonBlocking(newState, &success)) { - _replExecutor.waitForEvent(eventHandle); + _replExecutor->waitForEvent(eventHandle); } return success; } @@ -836,12 +836,12 @@ void ReplicationCoordinatorImpl::_setFollowerModeFinish( if (newState == _topCoord->getMemberState()) { *success = true; - _replExecutor.signalEvent(finishedSettingFollowerMode); + _replExecutor->signalEvent(finishedSettingFollowerMode); return; } if (_topCoord->getRole() == TopologyCoordinator::Role::leader) { *success = false; - _replExecutor.signalEvent(finishedSettingFollowerMode); + _replExecutor->signalEvent(finishedSettingFollowerMode); return; } @@ -850,7 +850,7 @@ void ReplicationCoordinatorImpl::_setFollowerModeFinish( // we know that newState != RS_SECONDARY because we would have returned early, above if // the old and new state were equal. So, try again after the election is over to // finish setting the follower mode. - _replExecutor.onEvent( + _replExecutor->onEvent( electionFinishedEvent, _wrapAsCallbackFn(stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish, this, @@ -866,7 +866,7 @@ void ReplicationCoordinatorImpl::_setFollowerModeFinish( lk.unlock(); _performPostMemberStateUpdateAction(action); *success = true; - _replExecutor.signalEvent(finishedSettingFollowerMode); + _replExecutor->signalEvent(finishedSettingFollowerMode); } ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState() { @@ -1004,7 +1004,7 @@ void ReplicationCoordinatorImpl::_addSlaveInfo_inlock(const SlaveInfo& slaveInfo void ReplicationCoordinatorImpl::_updateSlaveInfoAppliedOpTime_inlock(SlaveInfo* slaveInfo, const OpTime& opTime) { slaveInfo->lastAppliedOpTime = opTime; - slaveInfo->lastUpdate = _replExecutor.now(); + slaveInfo->lastUpdate = _replExecutor->now(); slaveInfo->down = false; _updateLastCommittedOpTime_inlock(); @@ -1024,7 +1024,7 @@ void ReplicationCoordinatorImpl::_updateSlaveInfoDurableOpTime_inlock(SlaveInfo* return; } slaveInfo->lastDurableOpTime = opTime; - slaveInfo->lastUpdate = _replExecutor.now(); + slaveInfo->lastUpdate = _replExecutor->now(); slaveInfo->down = false; _updateLastCommittedOpTime_inlock(); @@ -1118,7 +1118,7 @@ Status ReplicationCoordinatorImpl::setLastOptimeForSlave(const OID& rid, const T void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) { stdx::unique_lock<stdx::mutex> lock(_mutex); - _topCoord->setMyHeartbeatMessage(_replExecutor.now(), msg); + _topCoord->setMyHeartbeatMessage(_replExecutor->now(), msg); } void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opTime) { @@ -1466,7 +1466,7 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock( // Update liveness for this node. - slaveInfo->lastUpdate = _replExecutor.now(); + slaveInfo->lastUpdate = _replExecutor->now(); slaveInfo->down = false; _cancelAndRescheduleLivenessUpdate_inlock(args.memberId); return Status::OK(); @@ -1538,7 +1538,7 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg } // Update liveness for this node. - slaveInfo->lastUpdate = _replExecutor.now(); + slaveInfo->lastUpdate = _replExecutor->now(); slaveInfo->down = false; _cancelAndRescheduleLivenessUpdate_inlock(args.memberId); return Status::OK(); @@ -1783,7 +1783,7 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, const Milliseconds& waitTime, const Milliseconds& stepdownTime) { - const Date_t startTime = _replExecutor.now(); + const Date_t startTime = _replExecutor->now(); const Date_t stepDownUntil = startTime + stepdownTime; const Date_t waitUntil = startTime + waitTime; @@ -1842,7 +1842,7 @@ bool ReplicationCoordinatorImpl::_tryToStepDown(const Date_t waitUntil, uasserted(ErrorCodes::NotMaster, "Already stepped down from primary while processing step down request"); } - const Date_t now = _replExecutor.now(); + const Date_t now = _replExecutor->now(); if (now >= stepDownUntil) { uasserted(ErrorCodes::ExceededTimeLimit, "By the time we were ready to step down, we were already past the " @@ -1880,7 +1880,7 @@ void ReplicationCoordinatorImpl::_handleTimePassing( bool wonSingleNodeElection = [this]() { stdx::lock_guard<stdx::mutex> lk(_mutex); - return _topCoord->becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(_replExecutor.now()); + return _topCoord->becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(_replExecutor->now()); }(); if (wonSingleNodeElection) { @@ -2052,10 +2052,10 @@ int ReplicationCoordinatorImpl::_getMyId_inlock() const { Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool waitUntilCompleted) { _stopDataReplication(opCtx); - auto finishedEvent = uassertStatusOK(_replExecutor.makeEvent()); + auto finishedEvent = uassertStatusOK(_replExecutor->makeEvent()); stdx::function<void()> f; if (waitUntilCompleted) - f = [&finishedEvent, this]() { _replExecutor.signalEvent(finishedEvent); }; + f = [&finishedEvent, this]() { _replExecutor->signalEvent(finishedEvent); }; { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -2064,7 +2064,7 @@ Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool wait // unlock before calling _startDataReplication(). _startDataReplication(opCtx, f); if (waitUntilCompleted) { - _replExecutor.waitForEvent(finishedEvent); + _replExecutor->waitForEvent(finishedEvent); } return Status::OK(); } @@ -2139,7 +2139,7 @@ Status ReplicationCoordinatorImpl::processReplSetGetStatus( Status result(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse"); _topCoord->prepareStatusResponse( TopologyCoordinator::ReplSetStatusArgs{ - _replExecutor.now(), + _replExecutor->now(), static_cast<unsigned>(time(0) - serverGlobalParams.started), _getMyLastAppliedOpTime_inlock(), _getMyLastDurableOpTime_inlock(), @@ -2222,7 +2222,7 @@ void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetada } if (evh) { - _replExecutor.waitForEvent(evh); + _replExecutor->waitForEvent(evh); } } @@ -2305,7 +2305,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCt Status ReplicationCoordinatorImpl::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) { auto result = [=]() { stdx::lock_guard<stdx::mutex> lock(_mutex); - return _topCoord->prepareFreezeResponse(_replExecutor.now(), secs, resultObj); + return _topCoord->prepareFreezeResponse(_replExecutor->now(), secs, resultObj); }(); if (!result.isOK()) { return result.getStatus(); @@ -2335,7 +2335,7 @@ Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs& stdx::lock_guard<stdx::mutex> lk(_mutex); - const Date_t now = _replExecutor.now(); + const Date_t now = _replExecutor->now(); Status result = _topCoord->prepareHeartbeatResponse(now, args, _settings.ourSetName(), @@ -2446,7 +2446,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt << " members parses ok"; if (!args.force) { - status = checkQuorumForReconfig(&_replExecutor, newConfig, myIndex.getValue()); + status = checkQuorumForReconfig(_replExecutor.get(), newConfig, myIndex.getValue()); if (!status.isOK()) { error() << "replSetReconfig failed; " << status; return status; @@ -2459,7 +2459,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt return status; } - auto reconfigFinished = uassertStatusOK(_replExecutor.makeEvent()); + auto reconfigFinished = uassertStatusOK(_replExecutor->makeEvent()); const auto reconfigFinishFn = [ this, newConfig, myIndex = myIndex.getValue(), reconfigFinished ]( const executor::TaskExecutor::CallbackArgs& cbData) { @@ -2476,9 +2476,9 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt StatusWith<CallbackHandle> cbhStatus(ErrorCodes::InternalError, "reconfigFinishFn hasn't been scheduled"); if (args.force) { - cbhStatus = _replExecutor.scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn); + cbhStatus = _replExecutor->scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn); } else { - cbhStatus = _replExecutor.scheduleWork(reconfigFinishFn); + cbhStatus = _replExecutor->scheduleWork(reconfigFinishFn); } if (cbhStatus.getStatus() == ErrorCodes::ShutdownInProgress) { return cbhStatus.getStatus(); @@ -2487,7 +2487,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt fassert(18824, cbhStatus.getStatus()); configStateGuard.Dismiss(); - _replExecutor.waitForEvent(reconfigFinished); + _replExecutor->waitForEvent(reconfigFinished); return Status::OK(); } @@ -2503,12 +2503,12 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig( // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig. if (auto electionFinishedEvent = _cancelElectionIfNeeded_inTopoLock()) { // Wait for the election to complete and the node's Role to be set to follower. - _replExecutor.onEvent(electionFinishedEvent, - stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig, - this, - newConfig, - myIndex, - finishedEvent)); + _replExecutor->onEvent(electionFinishedEvent, + stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig, + this, + newConfig, + myIndex, + finishedEvent)); return; } @@ -2522,13 +2522,13 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig( auto evh = _resetElectionInfoOnProtocolVersionUpgrade(oldConfig, newConfig); lk.unlock(); if (evh) { - _replExecutor.onEvent(evh, [this, action](const CallbackArgs& cbArgs) { + _replExecutor->onEvent(evh, [this, action](const CallbackArgs& cbArgs) { _performPostMemberStateUpdateAction(action); }); } else { _performPostMemberStateUpdateAction(action); } - _replExecutor.signalEvent(finishedEvent); + _replExecutor->signalEvent(finishedEvent); } Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCtx, @@ -2584,7 +2584,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt log() << "replSetInitiate config object with " << newConfig.getNumMembers() << " members parses ok"; - status = checkQuorumForInitiate(&_replExecutor, newConfig, myIndex.getValue()); + status = checkQuorumForInitiate(_replExecutor.get(), newConfig, myIndex.getValue()); if (!status.isOK()) { error() << "replSetInitiate failed; " << status; @@ -2788,16 +2788,16 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { auto scanner = std::make_shared<FreshnessScanner>(); - auto scanStartTime = _replExecutor.now(); - auto evhStatus = - scanner->start(&_replExecutor, _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod()); + auto scanStartTime = _replExecutor->now(); + auto evhStatus = scanner->start( + _replExecutor.get(), _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod()); if (evhStatus == ErrorCodes::ShutdownInProgress) { _finishCatchingUpOplog_inlock(); return; } fassertStatusOK(40254, evhStatus.getStatus()); long long term = _topCoord->getTerm(); - _replExecutor.onEvent( + _replExecutor->onEvent( evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) { stdx::lock_guard<stdx::mutex> lk(_mutex); if (cbData.status == ErrorCodes::CallbackCanceled) { @@ -2805,7 +2805,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { return; } auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod(); - auto catchUpTimeout = totalTimeout - (_replExecutor.now() - scanStartTime); + auto catchUpTimeout = totalTimeout - (_replExecutor->now() - scanStartTime); _catchUpOplogToLatest_inlock(*scanner, catchUpTimeout, term); }); } @@ -2864,7 +2864,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca } }; // Schedule the timeout callback. It may signal after we have already caught up. - _replExecutor.scheduleWorkAt(_replExecutor.now() + timeout, timeoutCB); + _replExecutor->scheduleWorkAt(_replExecutor->now() + timeout, timeoutCB); } void ReplicationCoordinatorImpl::_finishCatchingUpOplog_inlock() { @@ -2888,7 +2888,7 @@ Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& a stdx::lock_guard<stdx::mutex> lk(_mutex); Status result(ErrorCodes::InternalError, "didn't set status in prepareFreshResponse"); _topCoord->prepareFreshResponse( - args, _replExecutor.now(), _getMyLastAppliedOpTime_inlock(), resultObj, &result); + args, _replExecutor->now(), _getMyLastAppliedOpTime_inlock(), resultObj, &result); return result; } @@ -2897,7 +2897,7 @@ Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& a stdx::lock_guard<stdx::mutex> lk(_mutex); Status result = Status(ErrorCodes::InternalError, "status not set by callback"); _topCoord->prepareElectResponse( - args, _replExecutor.now(), _getMyLastAppliedOpTime_inlock(), responseObj, &result); + args, _replExecutor->now(), _getMyLastAppliedOpTime_inlock(), responseObj, &result); return result; } @@ -2910,7 +2910,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC // Must get this before changing our config. OpTime myOptime = _getMyLastAppliedOpTime_inlock(); - _topCoord->updateConfig(newConfig, myIndex, _replExecutor.now(), myOptime); + _topCoord->updateConfig(newConfig, myIndex, _replExecutor->now(), myOptime); const ReplSetConfig oldConfig = _rsConfig; _rsConfig = newConfig; _protVersion.store(_rsConfig.getProtocolVersion()); @@ -3150,7 +3150,7 @@ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOp ? TopologyCoordinator::ChainingPreference::kAllowChaining : TopologyCoordinator::ChainingPreference::kUseConfiguration; HostAndPort newSyncSource = - _topCoord->chooseNewSyncSource(_replExecutor.now(), lastOpTimeFetched, chainingPreference); + _topCoord->chooseNewSyncSource(_replExecutor->now(), lastOpTimeFetched, chainingPreference); // If we lost our sync source, schedule new heartbeats immediately to update our knowledge // of other members's state, allowing us to make informed sync source decisions. @@ -3168,7 +3168,7 @@ void ReplicationCoordinatorImpl::_unblacklistSyncSource( return; stdx::lock_guard<stdx::mutex> lock(_mutex); - _topCoord->unblacklistSyncSource(host, _replExecutor.now()); + _topCoord->unblacklistSyncSource(host, _replExecutor->now()); } void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) { @@ -3209,7 +3209,7 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource( _getMyLastAppliedOpTime_inlock(), replMetadata, oqMetadata, - _replExecutor.now()); + _replExecutor->now()); } void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() { @@ -3397,7 +3397,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs stdx::lock_guard<stdx::mutex> lk(_mutex); auto senderHost(args.getSenderHost()); - const Date_t now = _replExecutor.now(); + const Date_t now = _replExecutor->now(); result = _topCoord->prepareHeartbeatResponseV1(now, args, _settings.ourSetName(), @@ -3427,7 +3427,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs if (!slaveInfo) { return result; } - slaveInfo->lastUpdate = _replExecutor.now(); + slaveInfo->lastUpdate = _replExecutor->now(); slaveInfo->down = false; } return result; @@ -3439,7 +3439,7 @@ void ReplicationCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) { // TODO(dannenberg) consider putting both optimes into the htmlsummary. output->setSelfOptime(_getMyLastAppliedOpTime_inlock()); output->setSelfUptime(time(0) - serverGlobalParams.started); - output->setNow(_replExecutor.now()); + output->setNow(_replExecutor->now()); _topCoord->summarizeAsHtml(output); } @@ -3456,10 +3456,10 @@ EventHandle ReplicationCoordinatorImpl::updateTerm_forTest( EventHandle finishEvh; finishEvh = _updateTerm_inlock(term, updateResult); if (!finishEvh) { - auto finishEvhStatus = _replExecutor.makeEvent(); + auto finishEvhStatus = _replExecutor->makeEvent(); invariantOK(finishEvhStatus.getStatus()); finishEvh = finishEvhStatus.getValue(); - _replExecutor.signalEvent(finishEvh); + _replExecutor->signalEvent(finishEvh); } return finishEvh; } @@ -3487,7 +3487,7 @@ Status ReplicationCoordinatorImpl::updateTerm(OperationContext* opCtx, long long // Wait for potential stepdown to finish. if (finishEvh.isValid()) { - _replExecutor.waitForEvent(finishEvh); + _replExecutor->waitForEvent(finishEvh); } if (updateTermResult == TopologyCoordinator::UpdateTermResult::kUpdatedTerm || updateTermResult == TopologyCoordinator::UpdateTermResult::kTriggerStepDown) { @@ -3504,7 +3504,7 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_inlock( return EventHandle(); } - auto now = _replExecutor.now(); + auto now = _replExecutor->now(); TopologyCoordinator::UpdateTermResult localUpdateTermResult = _topCoord->updateTerm(term, now); { if (localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kUpdatedTerm) { @@ -3612,13 +3612,13 @@ void ReplicationCoordinatorImpl::_dropAllSnapshots_inlock() { void ReplicationCoordinatorImpl::waitForElectionFinish_forTest() { if (_electionFinishedEvent.isValid()) { - _replExecutor.waitForEvent(_electionFinishedEvent); + _replExecutor->waitForEvent(_electionFinishedEvent); } } void ReplicationCoordinatorImpl::waitForElectionDryRunFinish_forTest() { if (_electionDryRunFinishedEvent.isValid()) { - _replExecutor.waitForEvent(_electionDryRunFinishedEvent); + _replExecutor->waitForEvent(_electionDryRunFinishedEvent); } } @@ -3633,14 +3633,14 @@ EventHandle ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgra invariant(newConfig.getProtocolVersion() == 1); // Write last vote - auto evhStatus = _replExecutor.makeEvent(); + auto evhStatus = _replExecutor->makeEvent(); if (evhStatus.getStatus() == ErrorCodes::ShutdownInProgress) { return {}; } invariant(evhStatus.isOK()); auto evh = evhStatus.getValue(); - auto cbStatus = _replExecutor.scheduleDBWork([this, evh](const CallbackArgs& cbData) { + auto cbStatus = _replExecutor->scheduleDBWork([this, evh](const CallbackArgs& cbData) { if (cbData.status == ErrorCodes::CallbackCanceled) { return; } @@ -3649,7 +3649,7 @@ EventHandle ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgra LastVote lastVote{OpTime::kInitialTerm, -1}; auto status = _externalState->storeLocalLastVoteDocument(cbData.opCtx, lastVote); invariant(status.isOK()); - _replExecutor.signalEvent(evh); + _replExecutor->signalEvent(evh); }); if (cbStatus.getStatus() == ErrorCodes::ShutdownInProgress) { return {}; @@ -3660,34 +3660,34 @@ EventHandle ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgra CallbackHandle ReplicationCoordinatorImpl::_scheduleWork(const CallbackFn& work) { auto scheduleFn = [this](const CallbackFn& workWrapped) { - return _replExecutor.scheduleWork(workWrapped); + return _replExecutor->scheduleWork(workWrapped); }; return _wrapAndScheduleWork(scheduleFn, work); } CallbackHandle ReplicationCoordinatorImpl::_scheduleWorkAt(Date_t when, const CallbackFn& work) { auto scheduleFn = [this, when](const CallbackFn& workWrapped) { - return _replExecutor.scheduleWorkAt(when, workWrapped); + return _replExecutor->scheduleWorkAt(when, workWrapped); }; return _wrapAndScheduleWork(scheduleFn, work); } void ReplicationCoordinatorImpl::_scheduleWorkAndWaitForCompletion(const CallbackFn& work) { if (auto handle = _scheduleWork(work)) { - _replExecutor.wait(handle); + _replExecutor->wait(handle); } } void ReplicationCoordinatorImpl::_scheduleWorkAtAndWaitForCompletion(Date_t when, const CallbackFn& work) { if (auto handle = _scheduleWorkAt(when, work)) { - _replExecutor.wait(handle); + _replExecutor->wait(handle); } } CallbackHandle ReplicationCoordinatorImpl::_scheduleDBWork(const CallbackFn& work) { auto scheduleFn = [this](const CallbackFn& workWrapped) { - return _replExecutor.scheduleDBWork(workWrapped); + return _replExecutor->scheduleDBWork(workWrapped); }; return _wrapAndScheduleWork(scheduleFn, work); } @@ -3709,7 +3709,7 @@ CallbackHandle ReplicationCoordinatorImpl::_wrapAndScheduleWork(ScheduleFn sched } EventHandle ReplicationCoordinatorImpl::_makeEvent() { - auto eventResult = this->_replExecutor.makeEvent(); + auto eventResult = this->_replExecutor->makeEvent(); if (eventResult.getStatus() == ErrorCodes::ShutdownInProgress) { return EventHandle(); } @@ -3763,7 +3763,7 @@ Status ReplicationCoordinatorImpl::stepUpIfEligible() { finishEvent = _electionFinishedEvent; } if (finishEvent.isValid()) { - _replExecutor.waitForEvent(finishEvent); + _replExecutor->waitForEvent(finishEvent); } auto state = getMemberState(); if (state.primary()) { |