summary | refs | log | tree | commit | diff
path: root/src/mongo/db/repl/replication_coordinator_impl.cpp
diff options
context:
space:
mode:
author    Andy Schwerin <schwerin@mongodb.com>  2017-04-13 17:05:04 -0400
committer Andy Schwerin <schwerin@mongodb.com>  2017-04-19 11:50:48 -0400
commit    8cd017f699acba04886b1a7b39f2f15d93c6ff20 (patch)
tree      eccab0ae05bb711c36e5dd541f21e7b84de683a2 /src/mongo/db/repl/replication_coordinator_impl.cpp
parent    d54dabeb93b36e4ce2a67ea35dba1f57e1298f4f (diff)
download  mongo-8cd017f699acba04886b1a7b39f2f15d93c6ff20.tar.gz
SERVER-28865 Keep a unique pointer to the executor instead of a value.
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp | 146
1 file changed, 73 insertions(+), 73 deletions(-)
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index c6213308b17..f42e712ffd3 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -314,7 +314,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
_settings(settings),
_replMode(getReplicationModeFromSettings(settings)),
_topCoord(std::move(topCoord)),
- _replExecutor(std::move(network), prngSeed),
+ _replExecutor(stdx::make_unique<ReplicationExecutor>(std::move(network), prngSeed)),
_externalState(std::move(externalState)),
_inShutdown(false),
_memberState(MemberState::RS_STARTUP),
@@ -362,7 +362,7 @@ void ReplicationCoordinatorImpl::_waitForStartUpComplete() {
handle = _finishLoadLocalConfigCbh;
}
if (handle.isValid()) {
- _replExecutor.wait(handle);
+ _replExecutor->wait(handle);
}
}
@@ -409,11 +409,11 @@ LogicalTime ReplicationCoordinatorImpl::_getCurrentCommittedLogicalTime_inlock()
void ReplicationCoordinatorImpl::appendDiagnosticBSON(mongo::BSONObjBuilder* bob) {
BSONObjBuilder eBuilder(bob->subobjStart("executor"));
- _replExecutor.appendDiagnosticBSON(&eBuilder);
+ _replExecutor->appendDiagnosticBSON(&eBuilder);
}
void ReplicationCoordinatorImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) const {
- _replExecutor.appendConnectionStats(stats);
+ _replExecutor->appendConnectionStats(stats);
}
bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) {
@@ -684,7 +684,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) {
return;
}
- _replExecutor.startup();
+ _replExecutor->startup();
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -753,8 +753,8 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
initialSyncerCopy.reset();
}
_externalState->shutdown(opCtx);
- _replExecutor.shutdown();
- _replExecutor.join();
+ _replExecutor->shutdown();
+ _replExecutor->join();
}
const ReplSettings& ReplicationCoordinatorImpl::getSettings() const {
@@ -811,7 +811,7 @@ void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() {
executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::setFollowerMode_nonBlocking(
const MemberState& newState, bool* success) {
- auto finishedSettingFollowerMode = _replExecutor.makeEvent();
+ auto finishedSettingFollowerMode = _replExecutor->makeEvent();
if (finishedSettingFollowerMode.getStatus() == ErrorCodes::ShutdownInProgress) {
return {};
}
@@ -823,7 +823,7 @@ executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::setFollowerMode_
bool ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) {
bool success = false;
if (auto eventHandle = setFollowerMode_nonBlocking(newState, &success)) {
- _replExecutor.waitForEvent(eventHandle);
+ _replExecutor->waitForEvent(eventHandle);
}
return success;
}
@@ -836,12 +836,12 @@ void ReplicationCoordinatorImpl::_setFollowerModeFinish(
if (newState == _topCoord->getMemberState()) {
*success = true;
- _replExecutor.signalEvent(finishedSettingFollowerMode);
+ _replExecutor->signalEvent(finishedSettingFollowerMode);
return;
}
if (_topCoord->getRole() == TopologyCoordinator::Role::leader) {
*success = false;
- _replExecutor.signalEvent(finishedSettingFollowerMode);
+ _replExecutor->signalEvent(finishedSettingFollowerMode);
return;
}
@@ -850,7 +850,7 @@ void ReplicationCoordinatorImpl::_setFollowerModeFinish(
// we know that newState != RS_SECONDARY because we would have returned early, above if
// the old and new state were equal. So, try again after the election is over to
// finish setting the follower mode.
- _replExecutor.onEvent(
+ _replExecutor->onEvent(
electionFinishedEvent,
_wrapAsCallbackFn(stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish,
this,
@@ -866,7 +866,7 @@ void ReplicationCoordinatorImpl::_setFollowerModeFinish(
lk.unlock();
_performPostMemberStateUpdateAction(action);
*success = true;
- _replExecutor.signalEvent(finishedSettingFollowerMode);
+ _replExecutor->signalEvent(finishedSettingFollowerMode);
}
ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState() {
@@ -1004,7 +1004,7 @@ void ReplicationCoordinatorImpl::_addSlaveInfo_inlock(const SlaveInfo& slaveInfo
void ReplicationCoordinatorImpl::_updateSlaveInfoAppliedOpTime_inlock(SlaveInfo* slaveInfo,
const OpTime& opTime) {
slaveInfo->lastAppliedOpTime = opTime;
- slaveInfo->lastUpdate = _replExecutor.now();
+ slaveInfo->lastUpdate = _replExecutor->now();
slaveInfo->down = false;
_updateLastCommittedOpTime_inlock();
@@ -1024,7 +1024,7 @@ void ReplicationCoordinatorImpl::_updateSlaveInfoDurableOpTime_inlock(SlaveInfo*
return;
}
slaveInfo->lastDurableOpTime = opTime;
- slaveInfo->lastUpdate = _replExecutor.now();
+ slaveInfo->lastUpdate = _replExecutor->now();
slaveInfo->down = false;
_updateLastCommittedOpTime_inlock();
@@ -1118,7 +1118,7 @@ Status ReplicationCoordinatorImpl::setLastOptimeForSlave(const OID& rid, const T
void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
- _topCoord->setMyHeartbeatMessage(_replExecutor.now(), msg);
+ _topCoord->setMyHeartbeatMessage(_replExecutor->now(), msg);
}
void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opTime) {
@@ -1466,7 +1466,7 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(
// Update liveness for this node.
- slaveInfo->lastUpdate = _replExecutor.now();
+ slaveInfo->lastUpdate = _replExecutor->now();
slaveInfo->down = false;
_cancelAndRescheduleLivenessUpdate_inlock(args.memberId);
return Status::OK();
@@ -1538,7 +1538,7 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg
}
// Update liveness for this node.
- slaveInfo->lastUpdate = _replExecutor.now();
+ slaveInfo->lastUpdate = _replExecutor->now();
slaveInfo->down = false;
_cancelAndRescheduleLivenessUpdate_inlock(args.memberId);
return Status::OK();
@@ -1783,7 +1783,7 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
const Milliseconds& waitTime,
const Milliseconds& stepdownTime) {
- const Date_t startTime = _replExecutor.now();
+ const Date_t startTime = _replExecutor->now();
const Date_t stepDownUntil = startTime + stepdownTime;
const Date_t waitUntil = startTime + waitTime;
@@ -1842,7 +1842,7 @@ bool ReplicationCoordinatorImpl::_tryToStepDown(const Date_t waitUntil,
uasserted(ErrorCodes::NotMaster,
"Already stepped down from primary while processing step down request");
}
- const Date_t now = _replExecutor.now();
+ const Date_t now = _replExecutor->now();
if (now >= stepDownUntil) {
uasserted(ErrorCodes::ExceededTimeLimit,
"By the time we were ready to step down, we were already past the "
@@ -1880,7 +1880,7 @@ void ReplicationCoordinatorImpl::_handleTimePassing(
bool wonSingleNodeElection = [this]() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _topCoord->becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(_replExecutor.now());
+ return _topCoord->becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(_replExecutor->now());
}();
if (wonSingleNodeElection) {
@@ -2052,10 +2052,10 @@ int ReplicationCoordinatorImpl::_getMyId_inlock() const {
Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool waitUntilCompleted) {
_stopDataReplication(opCtx);
- auto finishedEvent = uassertStatusOK(_replExecutor.makeEvent());
+ auto finishedEvent = uassertStatusOK(_replExecutor->makeEvent());
stdx::function<void()> f;
if (waitUntilCompleted)
- f = [&finishedEvent, this]() { _replExecutor.signalEvent(finishedEvent); };
+ f = [&finishedEvent, this]() { _replExecutor->signalEvent(finishedEvent); };
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -2064,7 +2064,7 @@ Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool wait
// unlock before calling _startDataReplication().
_startDataReplication(opCtx, f);
if (waitUntilCompleted) {
- _replExecutor.waitForEvent(finishedEvent);
+ _replExecutor->waitForEvent(finishedEvent);
}
return Status::OK();
}
@@ -2139,7 +2139,7 @@ Status ReplicationCoordinatorImpl::processReplSetGetStatus(
Status result(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse");
_topCoord->prepareStatusResponse(
TopologyCoordinator::ReplSetStatusArgs{
- _replExecutor.now(),
+ _replExecutor->now(),
static_cast<unsigned>(time(0) - serverGlobalParams.started),
_getMyLastAppliedOpTime_inlock(),
_getMyLastDurableOpTime_inlock(),
@@ -2222,7 +2222,7 @@ void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetada
}
if (evh) {
- _replExecutor.waitForEvent(evh);
+ _replExecutor->waitForEvent(evh);
}
}
@@ -2305,7 +2305,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCt
Status ReplicationCoordinatorImpl::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) {
auto result = [=]() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _topCoord->prepareFreezeResponse(_replExecutor.now(), secs, resultObj);
+ return _topCoord->prepareFreezeResponse(_replExecutor->now(), secs, resultObj);
}();
if (!result.isOK()) {
return result.getStatus();
@@ -2335,7 +2335,7 @@ Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs&
stdx::lock_guard<stdx::mutex> lk(_mutex);
- const Date_t now = _replExecutor.now();
+ const Date_t now = _replExecutor->now();
Status result = _topCoord->prepareHeartbeatResponse(now,
args,
_settings.ourSetName(),
@@ -2446,7 +2446,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt
<< " members parses ok";
if (!args.force) {
- status = checkQuorumForReconfig(&_replExecutor, newConfig, myIndex.getValue());
+ status = checkQuorumForReconfig(_replExecutor.get(), newConfig, myIndex.getValue());
if (!status.isOK()) {
error() << "replSetReconfig failed; " << status;
return status;
@@ -2459,7 +2459,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt
return status;
}
- auto reconfigFinished = uassertStatusOK(_replExecutor.makeEvent());
+ auto reconfigFinished = uassertStatusOK(_replExecutor->makeEvent());
const auto reconfigFinishFn =
[ this, newConfig, myIndex = myIndex.getValue(), reconfigFinished ](
const executor::TaskExecutor::CallbackArgs& cbData) {
@@ -2476,9 +2476,9 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt
StatusWith<CallbackHandle> cbhStatus(ErrorCodes::InternalError,
"reconfigFinishFn hasn't been scheduled");
if (args.force) {
- cbhStatus = _replExecutor.scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn);
+ cbhStatus = _replExecutor->scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn);
} else {
- cbhStatus = _replExecutor.scheduleWork(reconfigFinishFn);
+ cbhStatus = _replExecutor->scheduleWork(reconfigFinishFn);
}
if (cbhStatus.getStatus() == ErrorCodes::ShutdownInProgress) {
return cbhStatus.getStatus();
@@ -2487,7 +2487,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt
fassert(18824, cbhStatus.getStatus());
configStateGuard.Dismiss();
- _replExecutor.waitForEvent(reconfigFinished);
+ _replExecutor->waitForEvent(reconfigFinished);
return Status::OK();
}
@@ -2503,12 +2503,12 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(
// Do not conduct an election during a reconfig, as the node may not be electable post-reconfig.
if (auto electionFinishedEvent = _cancelElectionIfNeeded_inTopoLock()) {
// Wait for the election to complete and the node's Role to be set to follower.
- _replExecutor.onEvent(electionFinishedEvent,
- stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig,
- this,
- newConfig,
- myIndex,
- finishedEvent));
+ _replExecutor->onEvent(electionFinishedEvent,
+ stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig,
+ this,
+ newConfig,
+ myIndex,
+ finishedEvent));
return;
}
@@ -2522,13 +2522,13 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(
auto evh = _resetElectionInfoOnProtocolVersionUpgrade(oldConfig, newConfig);
lk.unlock();
if (evh) {
- _replExecutor.onEvent(evh, [this, action](const CallbackArgs& cbArgs) {
+ _replExecutor->onEvent(evh, [this, action](const CallbackArgs& cbArgs) {
_performPostMemberStateUpdateAction(action);
});
} else {
_performPostMemberStateUpdateAction(action);
}
- _replExecutor.signalEvent(finishedEvent);
+ _replExecutor->signalEvent(finishedEvent);
}
Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCtx,
@@ -2584,7 +2584,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt
log() << "replSetInitiate config object with " << newConfig.getNumMembers()
<< " members parses ok";
- status = checkQuorumForInitiate(&_replExecutor, newConfig, myIndex.getValue());
+ status = checkQuorumForInitiate(_replExecutor.get(), newConfig, myIndex.getValue());
if (!status.isOK()) {
error() << "replSetInitiate failed; " << status;
@@ -2788,16 +2788,16 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
auto scanner = std::make_shared<FreshnessScanner>();
- auto scanStartTime = _replExecutor.now();
- auto evhStatus =
- scanner->start(&_replExecutor, _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
+ auto scanStartTime = _replExecutor->now();
+ auto evhStatus = scanner->start(
+ _replExecutor.get(), _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
if (evhStatus == ErrorCodes::ShutdownInProgress) {
_finishCatchingUpOplog_inlock();
return;
}
fassertStatusOK(40254, evhStatus.getStatus());
long long term = _topCoord->getTerm();
- _replExecutor.onEvent(
+ _replExecutor->onEvent(
evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (cbData.status == ErrorCodes::CallbackCanceled) {
@@ -2805,7 +2805,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
return;
}
auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
- auto catchUpTimeout = totalTimeout - (_replExecutor.now() - scanStartTime);
+ auto catchUpTimeout = totalTimeout - (_replExecutor->now() - scanStartTime);
_catchUpOplogToLatest_inlock(*scanner, catchUpTimeout, term);
});
}
@@ -2864,7 +2864,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
}
};
// Schedule the timeout callback. It may signal after we have already caught up.
- _replExecutor.scheduleWorkAt(_replExecutor.now() + timeout, timeoutCB);
+ _replExecutor->scheduleWorkAt(_replExecutor->now() + timeout, timeoutCB);
}
void ReplicationCoordinatorImpl::_finishCatchingUpOplog_inlock() {
@@ -2888,7 +2888,7 @@ Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& a
stdx::lock_guard<stdx::mutex> lk(_mutex);
Status result(ErrorCodes::InternalError, "didn't set status in prepareFreshResponse");
_topCoord->prepareFreshResponse(
- args, _replExecutor.now(), _getMyLastAppliedOpTime_inlock(), resultObj, &result);
+ args, _replExecutor->now(), _getMyLastAppliedOpTime_inlock(), resultObj, &result);
return result;
}
@@ -2897,7 +2897,7 @@ Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& a
stdx::lock_guard<stdx::mutex> lk(_mutex);
Status result = Status(ErrorCodes::InternalError, "status not set by callback");
_topCoord->prepareElectResponse(
- args, _replExecutor.now(), _getMyLastAppliedOpTime_inlock(), responseObj, &result);
+ args, _replExecutor->now(), _getMyLastAppliedOpTime_inlock(), responseObj, &result);
return result;
}
@@ -2910,7 +2910,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC
// Must get this before changing our config.
OpTime myOptime = _getMyLastAppliedOpTime_inlock();
- _topCoord->updateConfig(newConfig, myIndex, _replExecutor.now(), myOptime);
+ _topCoord->updateConfig(newConfig, myIndex, _replExecutor->now(), myOptime);
const ReplSetConfig oldConfig = _rsConfig;
_rsConfig = newConfig;
_protVersion.store(_rsConfig.getProtocolVersion());
@@ -3150,7 +3150,7 @@ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOp
? TopologyCoordinator::ChainingPreference::kAllowChaining
: TopologyCoordinator::ChainingPreference::kUseConfiguration;
HostAndPort newSyncSource =
- _topCoord->chooseNewSyncSource(_replExecutor.now(), lastOpTimeFetched, chainingPreference);
+ _topCoord->chooseNewSyncSource(_replExecutor->now(), lastOpTimeFetched, chainingPreference);
// If we lost our sync source, schedule new heartbeats immediately to update our knowledge
// of other members's state, allowing us to make informed sync source decisions.
@@ -3168,7 +3168,7 @@ void ReplicationCoordinatorImpl::_unblacklistSyncSource(
return;
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _topCoord->unblacklistSyncSource(host, _replExecutor.now());
+ _topCoord->unblacklistSyncSource(host, _replExecutor->now());
}
void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) {
@@ -3209,7 +3209,7 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource(
_getMyLastAppliedOpTime_inlock(),
replMetadata,
oqMetadata,
- _replExecutor.now());
+ _replExecutor->now());
}
void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() {
@@ -3397,7 +3397,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs
stdx::lock_guard<stdx::mutex> lk(_mutex);
auto senderHost(args.getSenderHost());
- const Date_t now = _replExecutor.now();
+ const Date_t now = _replExecutor->now();
result = _topCoord->prepareHeartbeatResponseV1(now,
args,
_settings.ourSetName(),
@@ -3427,7 +3427,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs
if (!slaveInfo) {
return result;
}
- slaveInfo->lastUpdate = _replExecutor.now();
+ slaveInfo->lastUpdate = _replExecutor->now();
slaveInfo->down = false;
}
return result;
@@ -3439,7 +3439,7 @@ void ReplicationCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) {
// TODO(dannenberg) consider putting both optimes into the htmlsummary.
output->setSelfOptime(_getMyLastAppliedOpTime_inlock());
output->setSelfUptime(time(0) - serverGlobalParams.started);
- output->setNow(_replExecutor.now());
+ output->setNow(_replExecutor->now());
_topCoord->summarizeAsHtml(output);
}
@@ -3456,10 +3456,10 @@ EventHandle ReplicationCoordinatorImpl::updateTerm_forTest(
EventHandle finishEvh;
finishEvh = _updateTerm_inlock(term, updateResult);
if (!finishEvh) {
- auto finishEvhStatus = _replExecutor.makeEvent();
+ auto finishEvhStatus = _replExecutor->makeEvent();
invariantOK(finishEvhStatus.getStatus());
finishEvh = finishEvhStatus.getValue();
- _replExecutor.signalEvent(finishEvh);
+ _replExecutor->signalEvent(finishEvh);
}
return finishEvh;
}
@@ -3487,7 +3487,7 @@ Status ReplicationCoordinatorImpl::updateTerm(OperationContext* opCtx, long long
// Wait for potential stepdown to finish.
if (finishEvh.isValid()) {
- _replExecutor.waitForEvent(finishEvh);
+ _replExecutor->waitForEvent(finishEvh);
}
if (updateTermResult == TopologyCoordinator::UpdateTermResult::kUpdatedTerm ||
updateTermResult == TopologyCoordinator::UpdateTermResult::kTriggerStepDown) {
@@ -3504,7 +3504,7 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_inlock(
return EventHandle();
}
- auto now = _replExecutor.now();
+ auto now = _replExecutor->now();
TopologyCoordinator::UpdateTermResult localUpdateTermResult = _topCoord->updateTerm(term, now);
{
if (localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kUpdatedTerm) {
@@ -3612,13 +3612,13 @@ void ReplicationCoordinatorImpl::_dropAllSnapshots_inlock() {
void ReplicationCoordinatorImpl::waitForElectionFinish_forTest() {
if (_electionFinishedEvent.isValid()) {
- _replExecutor.waitForEvent(_electionFinishedEvent);
+ _replExecutor->waitForEvent(_electionFinishedEvent);
}
}
void ReplicationCoordinatorImpl::waitForElectionDryRunFinish_forTest() {
if (_electionDryRunFinishedEvent.isValid()) {
- _replExecutor.waitForEvent(_electionDryRunFinishedEvent);
+ _replExecutor->waitForEvent(_electionDryRunFinishedEvent);
}
}
@@ -3633,14 +3633,14 @@ EventHandle ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgra
invariant(newConfig.getProtocolVersion() == 1);
// Write last vote
- auto evhStatus = _replExecutor.makeEvent();
+ auto evhStatus = _replExecutor->makeEvent();
if (evhStatus.getStatus() == ErrorCodes::ShutdownInProgress) {
return {};
}
invariant(evhStatus.isOK());
auto evh = evhStatus.getValue();
- auto cbStatus = _replExecutor.scheduleDBWork([this, evh](const CallbackArgs& cbData) {
+ auto cbStatus = _replExecutor->scheduleDBWork([this, evh](const CallbackArgs& cbData) {
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
@@ -3649,7 +3649,7 @@ EventHandle ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgra
LastVote lastVote{OpTime::kInitialTerm, -1};
auto status = _externalState->storeLocalLastVoteDocument(cbData.opCtx, lastVote);
invariant(status.isOK());
- _replExecutor.signalEvent(evh);
+ _replExecutor->signalEvent(evh);
});
if (cbStatus.getStatus() == ErrorCodes::ShutdownInProgress) {
return {};
@@ -3660,34 +3660,34 @@ EventHandle ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgra
CallbackHandle ReplicationCoordinatorImpl::_scheduleWork(const CallbackFn& work) {
auto scheduleFn = [this](const CallbackFn& workWrapped) {
- return _replExecutor.scheduleWork(workWrapped);
+ return _replExecutor->scheduleWork(workWrapped);
};
return _wrapAndScheduleWork(scheduleFn, work);
}
CallbackHandle ReplicationCoordinatorImpl::_scheduleWorkAt(Date_t when, const CallbackFn& work) {
auto scheduleFn = [this, when](const CallbackFn& workWrapped) {
- return _replExecutor.scheduleWorkAt(when, workWrapped);
+ return _replExecutor->scheduleWorkAt(when, workWrapped);
};
return _wrapAndScheduleWork(scheduleFn, work);
}
void ReplicationCoordinatorImpl::_scheduleWorkAndWaitForCompletion(const CallbackFn& work) {
if (auto handle = _scheduleWork(work)) {
- _replExecutor.wait(handle);
+ _replExecutor->wait(handle);
}
}
void ReplicationCoordinatorImpl::_scheduleWorkAtAndWaitForCompletion(Date_t when,
const CallbackFn& work) {
if (auto handle = _scheduleWorkAt(when, work)) {
- _replExecutor.wait(handle);
+ _replExecutor->wait(handle);
}
}
CallbackHandle ReplicationCoordinatorImpl::_scheduleDBWork(const CallbackFn& work) {
auto scheduleFn = [this](const CallbackFn& workWrapped) {
- return _replExecutor.scheduleDBWork(workWrapped);
+ return _replExecutor->scheduleDBWork(workWrapped);
};
return _wrapAndScheduleWork(scheduleFn, work);
}
@@ -3709,7 +3709,7 @@ CallbackHandle ReplicationCoordinatorImpl::_wrapAndScheduleWork(ScheduleFn sched
}
EventHandle ReplicationCoordinatorImpl::_makeEvent() {
- auto eventResult = this->_replExecutor.makeEvent();
+ auto eventResult = this->_replExecutor->makeEvent();
if (eventResult.getStatus() == ErrorCodes::ShutdownInProgress) {
return EventHandle();
}
@@ -3763,7 +3763,7 @@ Status ReplicationCoordinatorImpl::stepUpIfEligible() {
finishEvent = _electionFinishedEvent;
}
if (finishEvent.isValid()) {
- _replExecutor.waitForEvent(finishEvent);
+ _replExecutor->waitForEvent(finishEvent);
}
auto state = getMemberState();
if (state.primary()) {