diff options
Diffstat (limited to 'src/mongo/db')
25 files changed, 670 insertions, 1239 deletions
diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp index ae1a1f9c7fa..583b3df37d8 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp @@ -282,8 +282,8 @@ Status checkQuorumGeneral(ReplicationExecutor* executor, const ReplicaSetConfig& rsConfig, const int myIndex) { QuorumChecker checker(&rsConfig, myIndex); - ScatterGatherRunner runner(&checker); - Status status = runner.run(executor); + ScatterGatherRunner runner(&checker, executor); + Status status = runner.run(); if (!status.isOK()) { return status; } diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp index 15bf827b1da..dc2bb2e240d 100644 --- a/src/mongo/db/repl/elect_cmd_runner.cpp +++ b/src/mongo/db/repl/elect_cmd_runner.cpp @@ -131,16 +131,15 @@ StatusWith<ReplicationExecutor::EventHandle> ElectCmdRunner::start( ReplicationExecutor* executor, const ReplicaSetConfig& currentConfig, int selfIndex, - const std::vector<HostAndPort>& targets, - const stdx::function<void()>& onCompletion) { + const std::vector<HostAndPort>& targets) { _algorithm.reset(new Algorithm(currentConfig, selfIndex, targets, OID::gen())); - _runner.reset(new ScatterGatherRunner(_algorithm.get())); - return _runner->start(executor, onCompletion); + _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor)); + return _runner->start(); } -void ElectCmdRunner::cancel(ReplicationExecutor* executor) { +void ElectCmdRunner::cancel() { _isCanceled = true; - _runner->cancel(executor); + _runner->cancel(); } int ElectCmdRunner::getReceivedVotes() const { diff --git a/src/mongo/db/repl/elect_cmd_runner.h b/src/mongo/db/repl/elect_cmd_runner.h index ad12703b68e..32539b2cd26 100644 --- a/src/mongo/db/repl/elect_cmd_runner.h +++ b/src/mongo/db/repl/elect_cmd_runner.h @@ -90,20 +90,15 @@ public: * * Returned handle can be used to schedule a callback when the process is complete. */ - StatusWith<ReplicationExecutor::EventHandle> start( - ReplicationExecutor* executor, - const ReplicaSetConfig& currentConfig, - int selfIndex, - const std::vector<HostAndPort>& targets, - const stdx::function<void()>& onCompletion = stdx::function<void()>()); + StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor, + const ReplicaSetConfig& currentConfig, + int selfIndex, + const std::vector<HostAndPort>& targets); /** - * Informs the ElectCmdRunner to cancel further processing. The "executor" - * argument must point to the same executor passed to "start()". - * - * Like start(), this method must run in the executor context. + * Informs the ElectCmdRunner to cancel further processing. */ - void cancel(ReplicationExecutor* executor); + void cancel(); /** * Returns the number of received votes. Only valid to call after diff --git a/src/mongo/db/repl/election_winner_declarer.cpp b/src/mongo/db/repl/election_winner_declarer.cpp index 9cb43a4c54b..6f2ed66e8a6 100644 --- a/src/mongo/db/repl/election_winner_declarer.cpp +++ b/src/mongo/db/repl/election_winner_declarer.cpp @@ -105,13 +105,13 @@ StatusWith<ReplicationExecutor::EventHandle> ElectionWinnerDeclarer::start( const std::vector<HostAndPort>& targets, const stdx::function<void()>& onCompletion) { _algorithm.reset(new Algorithm(setName, winnerId, term, targets)); - _runner.reset(new ScatterGatherRunner(_algorithm.get())); - return _runner->start(executor, onCompletion); + _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor)); + return _runner->start(); } -void ElectionWinnerDeclarer::cancel(ReplicationExecutor* executor) { +void ElectionWinnerDeclarer::cancel() { _isCanceled = true; - _runner->cancel(executor); + _runner->cancel(); } Status ElectionWinnerDeclarer::getStatus() const { diff --git a/src/mongo/db/repl/election_winner_declarer.h b/src/mongo/db/repl/election_winner_declarer.h index c73f2ca3419..346fc11d252 100644 --- a/src/mongo/db/repl/election_winner_declarer.h +++ b/src/mongo/db/repl/election_winner_declarer.h @@ -90,8 +90,6 @@ public: * in currentConfig, with the intention of alerting them of a new primary. * * evh can be used to schedule a callback when the process is complete. - * This function must be run in the executor, as it must be synchronous with the command - * callbacks that it schedules. * If this function returns Status::OK(), evh is then guaranteed to be signaled. **/ StatusWith<ReplicationExecutor::EventHandle> start( @@ -103,12 +101,9 @@ public: const stdx::function<void()>& onCompletion = stdx::function<void()>()); /** - * Informs the ElectionWinnerDeclarer to cancel further processing. The "executor" - * argument must point to the same executor passed to "start()". - * - * Like start(), this method must run in the executor context. + * Informs the ElectionWinnerDeclarer to cancel further processing. */ - void cancel(ReplicationExecutor* executor); + void cancel(); /** * Returns a Status from the ElectionWinnerDeclarer::algorithm which indicates what diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp index 71b7b2e419b..f5bd3963e63 100644 --- a/src/mongo/db/repl/freshness_checker.cpp +++ b/src/mongo/db/repl/freshness_checker.cpp @@ -208,17 +208,16 @@ StatusWith<ReplicationExecutor::EventHandle> FreshnessChecker::start( const Timestamp& lastOpTimeApplied, const ReplicaSetConfig& currentConfig, int selfIndex, - const std::vector<HostAndPort>& targets, - const stdx::function<void()>& onCompletion) { + const std::vector<HostAndPort>& targets) { _originalConfigVersion = currentConfig.getConfigVersion(); _algorithm.reset(new Algorithm(lastOpTimeApplied, currentConfig, selfIndex, targets)); - _runner.reset(new ScatterGatherRunner(_algorithm.get())); - return _runner->start(executor, onCompletion); + _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor)); + return _runner->start(); } -void FreshnessChecker::cancel(ReplicationExecutor* executor) { +void FreshnessChecker::cancel() { _isCanceled = true; - _runner->cancel(executor); + _runner->cancel(); } } // namespace repl diff --git a/src/mongo/db/repl/freshness_checker.h b/src/mongo/db/repl/freshness_checker.h index 43dd77eee69..6da248a203e 100644 --- a/src/mongo/db/repl/freshness_checker.h +++ b/src/mongo/db/repl/freshness_checker.h @@ -116,25 +116,18 @@ public: * in currentConfig, with the intention of determining whether the current node * is freshest. * evh can be used to schedule a callback when the process is complete. - * This function must be run in the executor, as it must be synchronous with the command - * callbacks that it schedules. * If this function returns Status::OK(), evh is then guaranteed to be signaled. **/ - StatusWith<ReplicationExecutor::EventHandle> start( - ReplicationExecutor* executor, - const Timestamp& lastOpTimeApplied, - const ReplicaSetConfig& currentConfig, - int selfIndex, - const std::vector<HostAndPort>& targets, - const stdx::function<void()>& onCompletion = stdx::function<void()>()); + StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor, + const Timestamp& lastOpTimeApplied, + const ReplicaSetConfig& currentConfig, + int selfIndex, + const std::vector<HostAndPort>& targets); /** - * Informs the freshness checker to cancel further processing. The "executor" - * argument must point to the same executor passed to "start()". - * - * Like start(), this method must run in the executor context. + * Informs the freshness checker to cancel further processing. */ - void cancel(ReplicationExecutor* executor); + void cancel(); /** * Returns true if cancel() was called on this instance. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 068bb49ef51..a387b98e863 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -80,6 +80,8 @@ #include "mongo/util/time_support.h" #include "mongo/util/timer.h" +#include "mongo/util/stacktrace.h" + namespace mongo { namespace repl { @@ -90,6 +92,9 @@ using EventHandle = executor::TaskExecutor::EventHandle; namespace { +Status shutDownInProgressStatus(ErrorCodes::ShutdownInProgress, + "replication system is shutting down"); + void lockAndCall(stdx::unique_lock<stdx::mutex>* lk, const stdx::function<void()>& fn) { if (!lk->owns_lock()) { lk->lock(); @@ -352,18 +357,13 @@ void ReplicationCoordinatorImpl::appendConnectionStats(executor::ConnectionPoolS _replExecutor.appendConnectionStats(stats); } -void ReplicationCoordinatorImpl::_updateLastVote(const LastVote& lastVote) { - _topCoord->loadLastVote(lastVote); -} - bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) { StatusWith<LastVote> lastVote = _externalState->loadLocalLastVoteDocument(txn); if (!lastVote.isOK()) { log() << "Did not find local voted for document at startup; " << lastVote.getStatus(); } else { - LastVote vote = lastVote.getValue(); - _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_updateLastVote, this, vote)); + LockGuard topoLock(_topoMutex); + _topCoord->loadLastVote(lastVote.getValue()); } StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(txn); @@ -413,6 +413,8 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( return; } + LockGuard topoLock(_topoMutex); + StatusWith<int> myIndex = validateConfigForStartUp(_externalState.get(), _rsConfig, localConfig); if (!myIndex.isOK()) { @@ -467,7 +469,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( stdx::unique_lock<stdx::mutex> lock(_mutex); invariant(_rsConfigState == kConfigStartingUp); const PostMemberStateUpdateAction action = - _setCurrentRSConfig_inlock(cbData, localConfig, myIndex.getValue()); + _setCurrentRSConfig_inlock(localConfig, myIndex.getValue()); _setMyLastAppliedOpTime_inlock(lastOpTime, false); _setMyLastDurableOpTime_inlock(lastOpTime, false); _reportUpstream_inlock(std::move(lock)); @@ -628,8 +630,8 @@ Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const { } void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() { - auto work = [this](const CallbackArgs&) { _topCoord->clearSyncSourceBlacklist(); }; - _scheduleWorkAndWaitForCompletion(work); + LockGuard topoLock(_topoMutex); + _topCoord->clearSyncSourceBlacklist(); } ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::setFollowerMode_nonBlocking( @@ -640,12 +642,7 @@ ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::setFollowerMode_non return ReplicationExecutor::EventHandle(); } fassert(18812, finishedSettingFollowerMode.getStatus()); - _scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish, - this, - stdx::placeholders::_1, - newState, - finishedSettingFollowerMode.getValue(), - success)); + _setFollowerModeFinish(newState, finishedSettingFollowerMode.getValue(), success); return finishedSettingFollowerMode.getValue(); } @@ -658,13 +655,11 @@ bool ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) { } void ReplicationCoordinatorImpl::_setFollowerModeFinish( - const ReplicationExecutor::CallbackArgs& cbData, const MemberState& newState, const ReplicationExecutor::EventHandle& finishedSettingFollowerMode, bool* success) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } + LockGuard topoLock(_topoMutex); + if (newState == _topCoord->getMemberState()) { *success = true; _replExecutor.signalEvent(finishedSettingFollowerMode); @@ -683,21 +678,21 @@ void ReplicationCoordinatorImpl::_setFollowerModeFinish( // finish setting the follower mode. if (isV1ElectionProtocol()) { invariant(_voteRequester); - _voteRequester->cancel(&_replExecutor); + _voteRequester->cancel(); } else { invariant(_freshnessChecker); - _freshnessChecker->cancel(&_replExecutor); + _freshnessChecker->cancel(); if (_electCmdRunner) { - _electCmdRunner->cancel(&_replExecutor); + _electCmdRunner->cancel(); } } - _replExecutor.onEvent(_electionFinishedEvent, - stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish, + _replExecutor.onEvent( + _electionFinishedEvent, + _wrapAsCallbackFn(stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish, this, - stdx::placeholders::_1, newState, finishedSettingFollowerMode, - success)); + success))); return; } @@ -945,8 +940,8 @@ Status ReplicationCoordinatorImpl::setLastOptimeForSlave(const OID& rid, const T } void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) { - _scheduleWorkAndWaitForCompletion(stdx::bind( - &TopologyCoordinator::setMyHeartbeatMessage, _topCoord.get(), _replExecutor.now(), msg)); + LockGuard topoLock(_topoMutex); + _topCoord->setMyHeartbeatMessage(_replExecutor.now(), msg); } void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opTime) { @@ -1313,47 +1308,57 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg } void ReplicationCoordinatorImpl::interrupt(unsigned opId) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - // Wake ops waiting for a new committed snapshot. - _currentCommittedSnapshotCond.notify_all(); + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + // Wake ops waiting for a new committed snapshot. + _currentCommittedSnapshotCond.notify_all(); - for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); - it != _replicationWaiterList.end(); - ++it) { - WaiterInfo* info = *it; - if (info->opID == opId) { - info->condVar->notify_all(); - return; + for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); + it != _replicationWaiterList.end(); + ++it) { + WaiterInfo* info = *it; + if (info->opID == opId) { + info->condVar->notify_all(); + return; + } } - } - for (auto& opTimeWaiter : _opTimeWaiterList) { - if (opTimeWaiter->opID == opId) { - opTimeWaiter->condVar->notify_all(); - return; + for (auto& opTimeWaiter : _opTimeWaiterList) { + if (opTimeWaiter->opID == opId) { + opTimeWaiter->condVar->notify_all(); + return; + } } } - _scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaiters, this)); + { + LockGuard topoLock(_topoMutex); + _signalStepDownWaiters(); + } } void ReplicationCoordinatorImpl::interruptAll() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - // Wake ops waiting for a new committed snapshot. - _currentCommittedSnapshotCond.notify_all(); + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + // Wake ops waiting for a new committed snapshot. + _currentCommittedSnapshotCond.notify_all(); - for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); - it != _replicationWaiterList.end(); - ++it) { - WaiterInfo* info = *it; - info->condVar->notify_all(); - } + for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); + it != _replicationWaiterList.end(); + ++it) { + WaiterInfo* info = *it; + info->condVar->notify_all(); + } - for (auto& opTimeWaiter : _opTimeWaiterList) { - opTimeWaiter->condVar->notify_all(); + for (auto& opTimeWaiter : _opTimeWaiterList) { + opTimeWaiter->condVar->notify_all(); + } } - _scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaiters, this)); + { + LockGuard topoLock(_topoMutex); + _signalStepDownWaiters(); + } } bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock( @@ -1611,24 +1616,20 @@ ReplicationCoordinatorImpl::stepDown_nonBlocking(OperationContext* txn, return StepDownNonBlockingResult(); } fassert(26000, finishedEvent.getStatus()); - CBHStatus cbh = - _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_stepDownContinue, - this, - stdx::placeholders::_1, - finishedEvent.getValue(), - txn, - waitUntil, - stepDownUntil, - force, - true, // restartHeartbeats - result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - *result = cbh.getStatus(); - return StepDownNonBlockingResult(); - } - fassert(18809, cbh.getStatus()); - _scheduleWorkAt(waitUntil, - stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaiters, this)); + _stepDownContinue(finishedEvent.getValue(), + txn, + waitUntil, + stepDownUntil, + force, + true, // restartHeartbeats + result); + + auto signalStepDownWaitersInLock = [this](const CallbackArgs&) { + LockGuard topoLock(_topoMutex); + _signalStepDownWaiters(); + }; + + _scheduleWorkAt(waitUntil, signalStepDownWaitersInLock); return std::make_pair(std::move(globalReadLock), finishedEvent.getValue()); } @@ -1655,7 +1656,6 @@ void ReplicationCoordinatorImpl::_signalStepDownWaiters() { } void ReplicationCoordinatorImpl::_stepDownContinue( - const ReplicationExecutor::CallbackArgs& cbData, const ReplicationExecutor::EventHandle finishedEvent, OperationContext* txn, const Date_t waitUntil, @@ -1663,18 +1663,10 @@ void ReplicationCoordinatorImpl::_stepDownContinue( bool force, bool restartHeartbeats, Status* result) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - // Cancelation only occurs on shutdown, which will also handle signaling the event. - *result = Status(ErrorCodes::ShutdownInProgress, "Shutting down replication"); - return; - } + LockGuard topoLock(_topoMutex); ScopeGuard allFinishedGuard = MakeGuard(stdx::bind(&ReplicationExecutor::signalEvent, &_replExecutor, finishedEvent)); - if (!cbData.status.isOK()) { - *result = cbData.status; - return; - } Status interruptedStatus = txn->checkForInterruptNoAssert(); if (!interruptedStatus.isOK()) { @@ -1698,10 +1690,10 @@ void ReplicationCoordinatorImpl::_stepDownContinue( bool forceNow = now >= waitUntil ? force : false; if (_topCoord->stepDown(stepDownUntil, forceNow, getMyLastAppliedOpTime())) { // Schedule work to (potentially) step back up once the stepdown period has ended. - _replExecutor.scheduleWorkAt(stepDownUntil, - stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing, - this, - stdx::placeholders::_1)); + _scheduleWorkAt(stepDownUntil, + stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing, + this, + stdx::placeholders::_1)); stdx::unique_lock<stdx::mutex> lk(_mutex); const PostMemberStateUpdateAction action = @@ -1732,7 +1724,6 @@ void ReplicationCoordinatorImpl::_stepDownContinue( CBHStatus cbh = _replExecutor.onEvent(_stepDownWaiters.back(), stdx::bind(&ReplicationCoordinatorImpl::_stepDownContinue, this, - stdx::placeholders::_1, finishedEvent, txn, waitUntil, @@ -1754,7 +1745,7 @@ void ReplicationCoordinatorImpl::_stepDownContinue( return; } stdx::lock_guard<stdx::mutex> lk(_mutex); - _restartHeartbeats_inlock(cbData); + _restartHeartbeats_inlock(); } void ReplicationCoordinatorImpl::_handleTimePassing( @@ -1763,6 +1754,7 @@ void ReplicationCoordinatorImpl::_handleTimePassing( return; } + LockGuard topoLock(_topoMutex); if (_topCoord->becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(_replExecutor.now())) { _performPostMemberStateUpdateAction(kActionWinElection); } @@ -1951,38 +1943,30 @@ StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionComm } Status ReplicationCoordinatorImpl::processReplSetGetStatus(BSONObjBuilder* response) { + LockGuard topoLock(_topoMutex); + Status result(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse"); - _scheduleWorkAndWaitForCompletion( - stdx::bind(&TopologyCoordinator::prepareStatusResponse, - _topCoord.get(), - stdx::placeholders::_1, - TopologyCoordinator::ReplSetStatusArgs{ - _replExecutor.now(), - static_cast<unsigned>(time(0) - serverGlobalParams.started), - getMyLastAppliedOpTime(), - getMyLastDurableOpTime(), - getLastCommittedOpTime(), - getCurrentCommittedSnapshotOpTime()}, - response, - &result)); + _topCoord->prepareStatusResponse( + TopologyCoordinator::ReplSetStatusArgs{ + _replExecutor.now(), + static_cast<unsigned>(time(0) - serverGlobalParams.started), + getMyLastAppliedOpTime(), + getMyLastDurableOpTime(), + getLastCommittedOpTime(), + getCurrentCommittedSnapshotOpTime()}, + response, + &result); return result; } void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) { invariant(getSettings().usingReplSets()); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_fillIsMasterForReplSet_finish, - this, - stdx::placeholders::_1, - response)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - response->markAsShutdownInProgress(); - return; + { + LockGuard topoLock(_topoMutex); + _topCoord->fillIsMasterForReplSet(response); } - fassert(28602, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); if (isWaitingForApplierToDrain()) { // Report that we are secondary to ismaster callers until drain completes. response->setIsMaster(false); @@ -1990,15 +1974,6 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* respon } } -void ReplicationCoordinatorImpl::_fillIsMasterForReplSet_finish( - const ReplicationExecutor::CallbackArgs& cbData, IsMasterResponse* response) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - response->markAsShutdownInProgress(); - return; - } - _topCoord->fillIsMasterForReplSet(response); -} - void ReplicationCoordinatorImpl::appendSlaveInfoData(BSONObjBuilder* result) { stdx::lock_guard<stdx::mutex> lock(_mutex); BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress")); @@ -2039,10 +2014,13 @@ void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result) void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) { EventHandle evh; - _scheduleWorkAndWaitForCompletion([this, &evh, &replMetadata](const CallbackArgs& args) { + + { + LockGuard topoLock(_topoMutex); evh = _processReplSetMetadata_incallback(replMetadata); - }); - if (evh.isValid()) { + } + + if (evh) { _replExecutor.waitForEvent(evh); } } @@ -2062,26 +2040,8 @@ EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_incallback( } bool ReplicationCoordinatorImpl::getMaintenanceMode() { - bool maintenanceMode(false); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_getMaintenanceMode_helper, - this, - stdx::placeholders::_1, - &maintenanceMode)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return false; - } - fassert(18811, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return maintenanceMode; -} - -void ReplicationCoordinatorImpl::_getMaintenanceMode_helper( - const ReplicationExecutor::CallbackArgs& cbData, bool* maintenanceMode) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - *maintenanceMode = _topCoord->getMaintenanceCount() > 0; + LockGuard topoLock(_topoMutex); + return _topCoord->getMaintenanceCount() > 0; } Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) { @@ -2090,37 +2050,15 @@ Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) { "can only set maintenance mode on replica set members"); } - Status result(ErrorCodes::InternalError, "didn't set status in _setMaintenanceMode_helper"); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_setMaintenanceMode_helper, - this, - stdx::placeholders::_1, - activate, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return cbh.getStatus(); - } - fassert(18698, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return result; -} - -void ReplicationCoordinatorImpl::_setMaintenanceMode_helper( - const ReplicationExecutor::CallbackArgs& cbData, bool activate, Status* result) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); - return; - } - + Status result(ErrorCodes::InternalError, "didn't set status"); + LockGuard topoLock(_topoMutex); if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) { - *result = Status(ErrorCodes::NotSecondary, "currently running for election"); - return; + return Status(ErrorCodes::NotSecondary, "currently running for election"); } stdx::unique_lock<stdx::mutex> lk(_mutex); if (_getMemberState_inlock().primary()) { - *result = Status(ErrorCodes::NotSecondary, "primaries can't modify maintenance mode"); - return; + return Status(ErrorCodes::NotSecondary, "primaries can't modify maintenance mode"); } int curMaintenanceCalls = _topCoord->getMaintenanceCount(); @@ -2137,63 +2075,28 @@ void ReplicationCoordinatorImpl::_setMaintenanceMode_helper( << " other maintenance mode tasks ongoing)" << rsLog; } else { warning() << "Attempted to leave maintenance mode but it is not currently active"; - *result = Status(ErrorCodes::OperationFailed, "already out of maintenance mode"); - return; + return Status(ErrorCodes::OperationFailed, "already out of maintenance mode"); } const PostMemberStateUpdateAction action = _updateMemberStateFromTopologyCoordinator_inlock(); - *result = Status::OK(); lk.unlock(); _performPostMemberStateUpdateAction(action); + return Status::OK(); } Status ReplicationCoordinatorImpl::processReplSetSyncFrom(const HostAndPort& target, BSONObjBuilder* resultObj) { Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse"); - CBHStatus cbh = - _replExecutor.scheduleWork(stdx::bind(&TopologyCoordinator::prepareSyncFromResponse, - _topCoord.get(), - stdx::placeholders::_1, - target, - _getMyLastAppliedOpTime_inlock(), - resultObj, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); - } - fassert(18649, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); + LockGuard topoLock(_topoMutex); + LockGuard lk(_mutex); + auto opTime = _getMyLastAppliedOpTime_inlock(); + _topCoord->prepareSyncFromResponse(target, opTime, resultObj, &result); return result; } Status ReplicationCoordinatorImpl::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) { - Status result(ErrorCodes::InternalError, "didn't set status in prepareFreezeResponse"); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_processReplSetFreeze_finish, - this, - stdx::placeholders::_1, - secs, - resultObj, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return cbh.getStatus(); - } - fassert(18641, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return result; -} - -void ReplicationCoordinatorImpl::_processReplSetFreeze_finish( - const ReplicationExecutor::CallbackArgs& cbData, - int secs, - BSONObjBuilder* response, - Status* result) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); - return; - } - - _topCoord->prepareFreezeResponse(_replExecutor.now(), secs, response); + LockGuard topoLock(_topoMutex); + _topCoord->prepareFreezeResponse(_replExecutor.now(), secs, resultObj); if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) { // If we just unfroze and ended our stepdown period and we are a one node replica set, @@ -2201,7 +2104,8 @@ void ReplicationCoordinatorImpl::_processReplSetFreeze_finish( // need to elect ourself. _performPostMemberStateUpdateAction(kActionWinElection); } - *result = Status::OK(); + return Status::OK(); + ; } Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs& args, @@ -2214,44 +2118,17 @@ Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs& } } - Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse"); - CBHStatus cbh = - _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_processHeartbeatFinish, - this, - stdx::placeholders::_1, - args, - response, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); - } - fassert(18508, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return result; -} - -void ReplicationCoordinatorImpl::_processHeartbeatFinish( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetHeartbeatArgs& args, - ReplSetHeartbeatResponse* response, - Status* outStatus) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - *outStatus = Status(ErrorCodes::ShutdownInProgress, "Replication shutdown in progress"); - return; - } - fassert(18910, cbData.status); - auto senderHost(args.getSenderHost()); + LockGuard topoLock(_topoMutex); const Date_t now = _replExecutor.now(); - *outStatus = _topCoord->prepareHeartbeatResponse(now, - args, - _settings.ourSetName(), - getMyLastAppliedOpTime(), - getMyLastDurableOpTime(), - response); - if ((outStatus->isOK() || *outStatus == ErrorCodes::InvalidReplicaSetConfig) && - _selfIndex < 0) { + Status result = _topCoord->prepareHeartbeatResponse(now, + args, + _settings.ourSetName(), + getMyLastAppliedOpTime(), + getMyLastDurableOpTime(), + response); + if ((result.isOK() || result == ErrorCodes::InvalidReplicaSetConfig) && _selfIndex < 0) { // If this node does not belong to the configuration it knows about, send heartbeats // back to any node that sends us a heartbeat, in case one of those remote nodes has // a configuration that contains us. Chances are excellent that it will, since that @@ -2259,7 +2136,7 @@ void ReplicationCoordinatorImpl::_processHeartbeatFinish( if (!senderHost.empty() && _seedList.insert(senderHost).second) { _scheduleHeartbeatToTarget(senderHost, -1, now); } - } else if (outStatus->isOK() && response->getConfigVersion() < args.getConfigVersion()) { + } else if (result.isOK() && response->getConfigVersion() < args.getConfigVersion()) { // Schedule a heartbeat to the sender to fetch the new config. // We cannot cancel the enqueued heartbeat, but either this one or the enqueued heartbeat // will trigger reconfig, which cancels and reschedules all heartbeats. @@ -2269,6 +2146,7 @@ void ReplicationCoordinatorImpl::_processHeartbeatFinish( _scheduleHeartbeatToTarget(senderHost, senderIndex, now); } } + return result; } Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn, @@ -2377,14 +2255,20 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn, // If it's a force reconfig, the primary node may not be electable after the configuration // change. In case we are that primary node, finish the reconfig under the global lock, // so that the step down occurs safely. - CBHStatus cbh = args.force ? _replExecutor.scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn) - : _replExecutor.scheduleWork(reconfigFinishFn); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return status; + CBHStatus cbhStatus(ErrorCodes::InternalError, "reconfigFinishFn hasn't been scheduled"); + if (args.force) { + cbhStatus = _replExecutor.scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn); + } else { + cbhStatus = _replExecutor.scheduleWork(reconfigFinishFn); + } + if (cbhStatus.getStatus() == ErrorCodes::ShutdownInProgress) { + return cbhStatus.getStatus(); } - fassert(18824, cbh.getStatus()); + + fassert(18824, cbhStatus.getStatus()); + configStateGuard.Dismiss(); - _replExecutor.wait(cbh.getValue()); + _replExecutor.wait(cbhStatus.getValue()); return Status::OK(); } @@ -2392,12 +2276,37 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig( const ReplicationExecutor::CallbackArgs& cbData, const ReplicaSetConfig& newConfig, int myIndex) { + LockGuard topoLock(_topoMutex); + stdx::unique_lock<stdx::mutex> lk(_mutex); invariant(_rsConfigState == kConfigReconfiguring); invariant(_rsConfig.isInitialized()); + + // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig. + if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) { + if (isV1ElectionProtocol()) { + invariant(_voteRequester); + _voteRequester->cancel(); + } else { + invariant(_freshnessChecker); + _freshnessChecker->cancel(); + if (_electCmdRunner) { + _electCmdRunner->cancel(); + } + } + // Wait for the election to complete and the node's Role to be set to follower. + _replExecutor.onEvent(_electionFinishedEvent, + stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig, + this, + stdx::placeholders::_1, + newConfig, + myIndex)); + return; + } + + const ReplicaSetConfig oldConfig = _rsConfig; - const PostMemberStateUpdateAction action = - _setCurrentRSConfig_inlock(cbData, newConfig, myIndex); + const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndex); // On a reconfig we drop all snapshots so we don't mistakenely read from the wrong one. // For example, if we change the meaning of the "committed" snapshot from applied -> durable. @@ -2497,18 +2406,11 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn, // Since the JournalListener has not yet been set up, we must manually set our // durableOpTime. setMyLastDurableOpTime(getMyLastAppliedOpTime()); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetInitiate, - this, - stdx::placeholders::_1, - newConfig, - myIndex.getValue())); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return status; + + { + LockGuard topoLock(_topoMutex); + _finishReplSetInitiate(newConfig, myIndex.getValue()); } - configStateGuard.Dismiss(); - fassert(18654, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); // A configuration passed to replSetInitiate() with the current node as an arbiter // will fail validation with a "replSet initiate got ... while validating" reason. @@ -2517,19 +2419,17 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn, _startDataReplication(); } + configStateGuard.Dismiss(); return Status::OK(); } -void ReplicationCoordinatorImpl::_finishReplSetInitiate( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplicaSetConfig& newConfig, - int myIndex) { +void ReplicationCoordinatorImpl::_finishReplSetInitiate(const ReplicaSetConfig& newConfig, + int myIndex) { stdx::unique_lock<stdx::mutex> lk(_mutex); invariant(_rsConfigState == kConfigInitiating); invariant(!_rsConfig.isInitialized()); const ReplicaSetConfig oldConfig = _rsConfig; - const PostMemberStateUpdateAction action = - _setCurrentRSConfig_inlock(cbData, newConfig, myIndex); + const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndex); lk.unlock(); _resetElectionInfoOnProtocolVersionUpgrade(oldConfig, newConfig); _performPostMemberStateUpdateAction(action); @@ -2701,94 +2601,31 @@ void ReplicationCoordinatorImpl::incrementRollbackID() { Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& args, BSONObjBuilder* resultObj) { + LockGuard topoLock(_topoMutex); Status result(ErrorCodes::InternalError, "didn't set status in prepareFreshResponse"); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_processReplSetFresh_finish, - this, - stdx::placeholders::_1, - args, - resultObj, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); - } - fassert(18652, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return result; -} - -void ReplicationCoordinatorImpl::_processReplSetFresh_finish( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetFreshArgs& args, - BSONObjBuilder* response, - Status* result) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - *result = Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); - return; - } - _topCoord->prepareFreshResponse( - args, _replExecutor.now(), getMyLastAppliedOpTime(), response, result); + args, _replExecutor.now(), getMyLastAppliedOpTime(), resultObj, &result); + return result; } Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& args, BSONObjBuilder* responseObj) { + LockGuard topoLock(_topoMutex); Status result = Status(ErrorCodes::InternalError, "status not set by callback"); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_processReplSetElect_finish, - this, - stdx::placeholders::_1, - args, - responseObj, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); - } - fassert(18657, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return result; -} - -void ReplicationCoordinatorImpl::_processReplSetElect_finish( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetElectArgs& args, - BSONObjBuilder* response, - Status* result) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - *result = Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); - return; - } - _topCoord->prepareElectResponse( - args, _replExecutor.now(), getMyLastAppliedOpTime(), response, result); + args, _replExecutor.now(), getMyLastAppliedOpTime(), responseObj, &result); + return result; } ReplicationCoordinatorImpl::PostMemberStateUpdateAction -ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplicaSetConfig& newConfig, - int myIndex) { +ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplicaSetConfig& newConfig, + int myIndex) { invariant(_settings.usingReplSets()); _cancelHeartbeats_inlock(); _setConfigState_inlock(kConfigSteady); // Must get this before changing our config. OpTime myOptime = _getMyLastAppliedOpTime_inlock(); - // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig. - if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) { - if (isV1ElectionProtocol()) { - invariant(_voteRequester); - _voteRequester->cancel(&_replExecutor); - } else { - invariant(_freshnessChecker); - _freshnessChecker->cancel(&_replExecutor); - if (_electCmdRunner) { - _electCmdRunner->cancel(&_replExecutor); - } - } - // Wait for the election to complete and the node's Role to be set to follower. - _replExecutor.waitForEvent(_electionFinishedEvent); - } _topCoord->updateConfig(newConfig, myIndex, _replExecutor.now(), myOptime); _cachedTerm = _topCoord->getTerm(); const ReplicaSetConfig oldConfig = _rsConfig; @@ -2811,7 +2648,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock( if (_selfIndex >= 0) { // Don't send heartbeats if we're not in the config, if we get re-added one of the // nodes in the set will contact us. - _startHeartbeats_inlock(cbData); + _startHeartbeats_inlock(); } _updateLastCommittedOpTime_inlock(); @@ -3028,81 +2865,41 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const { return getReplicationMode() != modeNone; } -void ReplicationCoordinatorImpl::_chooseNewSyncSource( - const ReplicationExecutor::CallbackArgs& cbData, - const Timestamp& lastTimestampFetched, - HostAndPort* newSyncSource) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } +HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const Timestamp& lastTimestampFetched) { + LockGuard topoLock(_topoMutex); HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress(); - - *newSyncSource = _topCoord->chooseNewSyncSource(_replExecutor.now(), lastTimestampFetched); + HostAndPort newSyncSource = + _topCoord->chooseNewSyncSource(_replExecutor.now(), lastTimestampFetched); stdx::lock_guard<stdx::mutex> lock(_mutex); // If we lost our sync source, schedule new heartbeats immediately to update our knowledge // of other members's state, allowing us to make informed sync source decisions. - if (newSyncSource->empty() && !oldSyncSource.empty() && _selfIndex >= 0 && + if (newSyncSource.empty() && !oldSyncSource.empty() && _selfIndex >= 0 && !_getMemberState_inlock().primary()) { - _restartHeartbeats_inlock(cbData); + _restartHeartbeats_inlock(); } -} -HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const Timestamp& lastTimestampFetched) { - HostAndPort newSyncSource; - CBHStatus cbh = - _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_chooseNewSyncSource, - this, - stdx::placeholders::_1, - lastTimestampFetched, - &newSyncSource)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return newSyncSource; // empty - } - fassert(18740, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); return newSyncSource; } -void ReplicationCoordinatorImpl::_blacklistSyncSource( - const ReplicationExecutor::CallbackArgs& cbData, const HostAndPort& host, Date_t until) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - _topCoord->blacklistSyncSource(host, until); - - CBHStatus cbh = - _replExecutor.scheduleWorkAt(until, - stdx::bind(&ReplicationCoordinatorImpl::_unblacklistSyncSource, - this, - stdx::placeholders::_1, - host)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return; - } - fassert(28610, cbh.getStatus()); -} - void ReplicationCoordinatorImpl::_unblacklistSyncSource( const ReplicationExecutor::CallbackArgs& cbData, const HostAndPort& host) { if (cbData.status == ErrorCodes::CallbackCanceled) return; + + LockGuard topoLock(_topoMutex); _topCoord->unblacklistSyncSource(host, _replExecutor.now()); } void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) { - CBHStatus cbh = - _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_blacklistSyncSource, - this, - stdx::placeholders::_1, - host, - until)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return; - } - fassert(18741, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); + LockGuard topoLock(_topoMutex); + _topCoord->blacklistSyncSource(host, until); + _scheduleWorkAt(until, + stdx::bind(&ReplicationCoordinatorImpl::_unblacklistSyncSource, + this, + stdx::placeholders::_1, + host)); } void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn) { @@ -3124,41 +2921,15 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn _externalState->setGlobalTimestamp(lastOpTime.getTimestamp()); } -void ReplicationCoordinatorImpl::_shouldChangeSyncSource( - const ReplicationExecutor::CallbackArgs& cbData, - const HostAndPort& currentSource, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource, - bool* shouldChange) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - - *shouldChange = _topCoord->shouldChangeSyncSource(currentSource, - getMyLastAppliedOpTime(), - syncSourceLastOpTime, - syncSourceHasSyncSource, - _replExecutor.now()); -} - bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource, const OpTime& syncSourceLastOpTime, bool syncSourceHasSyncSource) { - bool shouldChange(false); - CBHStatus cbh = - _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_shouldChangeSyncSource, - this, - stdx::placeholders::_1, - currentSource, - syncSourceLastOpTime, - syncSourceHasSyncSource, - &shouldChange)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return false; - } - fassert(18906, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return shouldChange; + LockGuard topoLock(_topoMutex); + return _topCoord->shouldChangeSyncSource(currentSource, + getMyLastAppliedOpTime(), + syncSourceLastOpTime, + syncSourceHasSyncSource, + _replExecutor.now()); } SyncSourceResolverResponse ReplicationCoordinatorImpl::selectSyncSource( @@ -3351,18 +3122,12 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes( if (!termStatus.isOK() && termStatus.code() != ErrorCodes::StaleTerm) return termStatus; - Status result{ErrorCodes::InternalError, "didn't set status in processReplSetRequestVotes"}; - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish, - this, - stdx::placeholders::_1, - args, - response, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return cbh.getStatus(); + { + LockGuard topoLock(_topoMutex); + LockGuard lk(_mutex); + _topCoord->processReplSetRequestVotes(args, response, _getMyLastAppliedOpTime_inlock()); } - _replExecutor.wait(cbh.getValue()); + if (response->getVoteGranted()) { LastVote lastVote; lastVote.setTerm(args.getTerm()); @@ -3374,22 +3139,7 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes( return status; } } - return result; -} - -void ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetRequestVotesArgs& args, - ReplSetRequestVotesResponse* response, - Status* result) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); - return; - } - - stdx::unique_lock<stdx::mutex> lk(_mutex); - _topCoord->processReplSetRequestVotes(args, response, _getMyLastAppliedOpTime_inlock()); - *result = Status::OK(); + return Status::OK(); } Status ReplicationCoordinatorImpl::processReplSetDeclareElectionWinner( @@ -3400,33 +3150,9 @@ Status ReplicationCoordinatorImpl::processReplSetDeclareElectionWinner( // TODO(sz) Remove processReplSetDeclareElectionWinner rathen than passing nullptr. updateTerm(nullptr, args.getTerm()); - - Status result{ErrorCodes::InternalError, - "didn't set status in processReplSetDeclareElectionWinner"}; - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish, - this, - stdx::placeholders::_1, - args, - responseTerm, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return cbh.getStatus(); - } - _replExecutor.wait(cbh.getValue()); - return result; -} - -void ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetDeclareElectionWinnerArgs& args, - long long* responseTerm, - Status* result) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); - return; - } - *result = _topCoord->processReplSetDeclareElectionWinner(args, responseTerm); + LockGuard topoLock(_topoMutex); + return _topCoord->processReplSetDeclareElectionWinner(args, responseTerm); + ; } void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestInterface& request, @@ -3434,34 +3160,15 @@ void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestI BSONObjBuilder* builder) { if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) { rpc::ReplSetMetadata metadata; + LockGuard topoLock(_topoMutex); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish, - this, - stdx::placeholders::_1, - lastOpTimeFromClient, - &metadata)); - - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return; - } - - fassert(28709, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - + OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime(); + OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime); + _topCoord->prepareReplResponseMetadata(&metadata, lastVisibleOpTime, _lastCommittedOpTime); metadata.writeToMetadata(builder); } } -void ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish( - const ReplicationExecutor::CallbackArgs& cbData, - const OpTime& lastOpTimeFromClient, - rpc::ReplSetMetadata* metadata) { - OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime(); - OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime); - _topCoord->prepareReplResponseMetadata(metadata, lastVisibleOpTime, _lastCommittedOpTime); -} - bool ReplicationCoordinatorImpl::isV1ElectionProtocol() const { return _protVersion.load() == 1; } @@ -3485,44 +3192,18 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs } Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse"); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_processHeartbeatFinishV1, - this, - stdx::placeholders::_1, - args, - response, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return {ErrorCodes::ShutdownInProgress, "replication shutdown in progress"}; - } - fassert(28645, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - - return result; -} - -void ReplicationCoordinatorImpl::_processHeartbeatFinishV1( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetHeartbeatArgsV1& args, - ReplSetHeartbeatResponse* response, - Status* outStatus) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - *outStatus = {ErrorCodes::ShutdownInProgress, "Replication shutdown in progress"}; - return; - } - fassert(28655, cbData.status); + LockGuard topoLock(_topoMutex); auto senderHost(args.getSenderHost()); const Date_t now = _replExecutor.now(); - *outStatus = _topCoord->prepareHeartbeatResponseV1(now, - args, - _settings.ourSetName(), - getMyLastAppliedOpTime(), - getMyLastDurableOpTime(), - response); - - if ((outStatus->isOK() || *outStatus == ErrorCodes::InvalidReplicaSetConfig) && - _selfIndex < 0) { + result = _topCoord->prepareHeartbeatResponseV1(now, + args, + _settings.ourSetName(), + getMyLastAppliedOpTime(), + getMyLastDurableOpTime(), + response); + + if ((result.isOK() || result == ErrorCodes::InvalidReplicaSetConfig) && _selfIndex < 0) { // If this node does not belong to the configuration it knows about, send heartbeats // back to any node that sends us a heartbeat, in case one of those remote nodes has // a configuration that contains us. Chances are excellent that it will, since that @@ -3530,7 +3211,7 @@ void ReplicationCoordinatorImpl::_processHeartbeatFinishV1( if (!senderHost.empty() && _seedList.insert(senderHost).second) { _scheduleHeartbeatToTarget(senderHost, -1, now); } - } else if (outStatus->isOK() && response->getConfigVersion() < args.getConfigVersion()) { + } else if (result.isOK() && response->getConfigVersion() < args.getConfigVersion()) { // Schedule a heartbeat to the sender to fetch the new config. // We cannot cancel the enqueued heartbeat, but either this one or the enqueued heartbeat // will trigger reconfig, which cancels and reschedules all heartbeats. @@ -3538,36 +3219,21 @@ void ReplicationCoordinatorImpl::_processHeartbeatFinishV1( int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost); _scheduleHeartbeatToTarget(senderHost, senderIndex, now); } - } else if (outStatus->isOK()) { + } else if (result.isOK()) { // Update liveness for sending node. stdx::lock_guard<stdx::mutex> lk(_mutex); auto slaveInfo = _findSlaveInfoByMemberID_inlock(args.getSenderId()); if (!slaveInfo) { - return; + return result; } slaveInfo->lastUpdate = _replExecutor.now(); slaveInfo->down = false; } + return result; } void ReplicationCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) { - CBHStatus cbh = - _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_summarizeAsHtml_finish, - this, - stdx::placeholders::_1, - output)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return; - } - fassert(28638, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); -} - -void ReplicationCoordinatorImpl::_summarizeAsHtml_finish(const CallbackArgs& cbData, - ReplSetHtmlSummary* output) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } + LockGuard topoLock(_topoMutex); // TODO(dannenberg) consider putting both optimes into the htmlsummary. output->setSelfOptime(getMyLastAppliedOpTime()); @@ -3582,30 +3248,18 @@ long long ReplicationCoordinatorImpl::getTerm() { return _cachedTerm; } -void ReplicationCoordinatorImpl::_getTerm_helper(const ReplicationExecutor::CallbackArgs& cbData, - long long* term) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - *term = _topCoord->getTerm(); -} - EventHandle ReplicationCoordinatorImpl::updateTerm_forTest( long long term, TopologyCoordinator::UpdateTermResult* updateResult) { - auto finishEvhStatus = _replExecutor.makeEvent(); - invariantOK(finishEvhStatus.getStatus()); - EventHandle finishEvh = finishEvhStatus.getValue(); - auto signalFinishEvent = - [this, finishEvh](const CallbackArgs&) { this->_replExecutor.signalEvent(finishEvh); }; - auto work = [this, term, updateResult, signalFinishEvent](const CallbackArgs& args) { - auto evh = _updateTerm_incallback(term, updateResult); - if (evh.isValid()) { - _replExecutor.onEvent(evh, signalFinishEvent); - } else { - signalFinishEvent(args); - } - }; - _scheduleWork(work); + LockGuard topoLock(_topoMutex); + + EventHandle finishEvh; + finishEvh = _updateTerm_incallback(term, updateResult); + if (!finishEvh) { + auto finishEvhStatus = _replExecutor.makeEvent(); + invariantOK(finishEvhStatus.getStatus()); + finishEvh = finishEvhStatus.getValue(); + _replExecutor.signalEvent(finishEvh); + } return finishEvh; } @@ -3624,10 +3278,12 @@ Status ReplicationCoordinatorImpl::updateTerm(OperationContext* txn, long long t dassert(!txn->lockState()->isLocked()); TopologyCoordinator::UpdateTermResult updateTermResult; EventHandle finishEvh; - auto work = [this, term, &updateTermResult, &finishEvh](const CallbackArgs&) { + + { + LockGuard topoLock(_topoMutex); finishEvh = _updateTerm_incallback(term, &updateTermResult); - }; - _scheduleWorkAndWaitForCompletion(work); + } + // Wait for potential stepdown to finish. if (finishEvh.isValid()) { _replExecutor.waitForEvent(finishEvh); @@ -3829,10 +3485,9 @@ void ReplicationCoordinatorImpl::_scheduleWorkAtAndWaitForCompletion(Date_t when } } -// static CallbackHandle ReplicationCoordinatorImpl::_wrapAndScheduleWork(ScheduleFn scheduleFn, const CallbackFn& work) { - auto workWrapped = [work](const CallbackArgs& args) { + auto workWrapped = [this, work](const CallbackArgs& args) { if (args.status == ErrorCodes::CallbackCanceled) { return; } @@ -3856,23 +3511,12 @@ EventHandle ReplicationCoordinatorImpl::_makeEvent() { } void ReplicationCoordinatorImpl::_scheduleElectionWinNotification() { - auto electionWinNotificationCallback = [this](const CallbackArgs& cbData) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - - stdx::lock_guard<stdx::mutex> lock(_mutex); - if (!_getMemberState_inlock().primary()) { - return; - } - - _restartHeartbeats_inlock(cbData); - }; - - auto cbStatus = _replExecutor.scheduleWork(electionWinNotificationCallback); - if (!cbStatus.getStatus().isOK()) { - warning() << "Error in scheduling notification of election win: " << cbStatus.getStatus(); + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (!_getMemberState_inlock().primary()) { + return; } + + _restartHeartbeats_inlock(); } WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptionsSyncMode( @@ -3889,5 +3533,16 @@ WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptions return writeConcern; } +CallbackFn ReplicationCoordinatorImpl::_wrapAsCallbackFn(const stdx::function<void()>& work) { + return [work](const CallbackArgs& cbData) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + + work(); + }; +} + + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 461d3ababad..bd2e06b8a7c 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -587,10 +587,8 @@ private: * Returns an action to be performed after unlocking _mutex, via * _performPostMemberStateUpdateAction. */ - PostMemberStateUpdateAction _setCurrentRSConfig_inlock( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplicaSetConfig& newConfig, - int myIndex); + PostMemberStateUpdateAction _setCurrentRSConfig_inlock(const ReplicaSetConfig& newConfig, + int myIndex); /** * Updates the last committed OpTime to be "committedOpTime" if it is more recent than the @@ -605,74 +603,6 @@ private: void _wakeReadyWaiters_inlock(); /** - * Helper method for setting/unsetting maintenance mode. Scheduled by setMaintenanceMode() - * to run in a global write lock in the replication executor thread. - */ - void _setMaintenanceMode_helper(const ReplicationExecutor::CallbackArgs& cbData, - bool activate, - Status* result); - - /** - * Helper method for retrieving maintenance mode. Scheduled by getMaintenanceMode() to run - * in the replication executor thread. - */ - void _getMaintenanceMode_helper(const ReplicationExecutor::CallbackArgs& cbData, - bool* maintenanceMode); - - /** - * Bottom half of fillIsMasterForReplSet. - */ - void _fillIsMasterForReplSet_finish(const ReplicationExecutor::CallbackArgs& cbData, - IsMasterResponse* result); - - /** - * Bottom half of processReplSetFresh. - */ - void _processReplSetFresh_finish(const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetFreshArgs& args, - BSONObjBuilder* response, - Status* result); - - /** - * Bottom half of processReplSetElect. - */ - void _processReplSetElect_finish(const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetElectArgs& args, - BSONObjBuilder* response, - Status* result); - - /** - * Bottom half of processReplSetFreeze. - */ - void _processReplSetFreeze_finish(const ReplicationExecutor::CallbackArgs& cbData, - int secs, - BSONObjBuilder* response, - Status* result); - - /** - * Bottom half of processReplSetDeclareElectionWinner. - */ - void _processReplSetDeclareElectionWinner_finish( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetDeclareElectionWinnerArgs& args, - long long* responseTerm, - Status* result); - - /** - * Bottom half of processReplSetRequestVotes. - */ - void _processReplSetRequestVotes_finish(const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetRequestVotesArgs& args, - ReplSetRequestVotesResponse* response, - Status* result); - - /** - * Bottom half of prepareReplResponseMetadata. - */ - void _prepareReplResponseMetadata_finish(const ReplicationExecutor::CallbackArgs& cbData, - const OpTime& lastOpTimeFromClient, - rpc::ReplSetMetadata* metadata); - /** * Scheduled to cause the ReplicationCoordinator to reconsider any state that might * need to change as a result of time passing - for instance becoming PRIMARY when a single * node replica set member's stepDown period ends. @@ -722,7 +652,7 @@ private: /** * Triggers all callbacks that are blocked waiting for new heartbeat data * to decide whether or not to finish a step down. - * Should only be called from executor callbacks. + * Should only be called with _topoMutex held. */ void _signalStepDownWaiters(); @@ -731,8 +661,7 @@ private: * it is running within a global shared lock, and thus that no writes are going on at the * same time. */ - void _stepDownContinue(const ReplicationExecutor::CallbackArgs& cbData, - const ReplicationExecutor::EventHandle finishedEvent, + void _stepDownContinue(const ReplicationExecutor::EventHandle finishedEvent, OperationContext* txn, Date_t waitUntil, Date_t stepdownUntil, @@ -755,8 +684,7 @@ private: * supply an event, "finishedSettingFollowerMode", and wait for that event to * be signaled. Do not observe "*success" until after the event is signaled. */ - void _setFollowerModeFinish(const ReplicationExecutor::CallbackArgs& cbData, - const MemberState& newState, + void _setFollowerModeFinish(const MemberState& newState, const ReplicationExecutor::EventHandle& finishedSettingFollowerMode, bool* success); @@ -822,21 +750,21 @@ private: const OpTime& appliedOpTime); /** - * Starts a heartbeat for each member in the current config. Called within the executor - * context. + * Starts a heartbeat for each member in the current config. Called while holding _topoMutex + * and replCoord _mutex. */ - void _startHeartbeats_inlock(const ReplicationExecutor::CallbackArgs& cbData); + void _startHeartbeats_inlock(); /** - * Cancels all heartbeats. Called within executor context. + * Cancels all heartbeats. Called while holding _topoMutex and replCoord _mutex. */ void _cancelHeartbeats_inlock(); /** * Cancels all heartbeats, then starts a heartbeat for each member in the current config. - * Called within the executor context. + * Called while holding _topoMutex and replCoord _mutex. */ - void _restartHeartbeats_inlock(const ReplicationExecutor::CallbackArgs& cbData); + void _restartHeartbeats_inlock(); /** * Asynchronously sends a heartbeat to "target". "targetIndex" is the index @@ -853,15 +781,6 @@ private: MemberState _getMemberState_inlock() const; /** - * Callback that gives the TopologyCoordinator an initial LastVote document from - * local storage. - * - * Called only during replication startup. All other updates come from the - * TopologyCoordinator itself. - */ - void _updateLastVote(const LastVote& lastVote); - - /** * Starts loading the replication configuration from local storage, and if it is valid, * schedules a callback (of _finishLoadLocalConfig) to set it as the current replica set * config (sets _rsConfig and _thisMembersConfigIndex). @@ -892,16 +811,14 @@ private: void _stopDataReplication(); /** - * Callback that finishes the work of processReplSetInitiate() inside the replication - * executor context, in the event of a successful quorum check. + * Finishes the work of processReplSetInitiate() while holding _topoMutex, in the event of + * a successful quorum check. */ - void _finishReplSetInitiate(const ReplicationExecutor::CallbackArgs& cbData, - const ReplicaSetConfig& newConfig, - int myIndex); + void _finishReplSetInitiate(const ReplicaSetConfig& newConfig, int myIndex); /** - * Callback that finishes the work of processReplSetReconfig inside the replication - * executor context, in the event of a successful quorum check. + * Finishes the work of processReplSetReconfig while holding _topoMutex, in the event of + * a successful quorum check. */ void _finishReplSetReconfig(const ReplicationExecutor::CallbackArgs& cbData, const ReplicaSetConfig& newConfig, @@ -931,7 +848,7 @@ private: * Begins an attempt to elect this node. * Called after an incoming heartbeat changes this node's view of the set such that it * believes it can be elected PRIMARY. - * For proper concurrency, must be called via a ReplicationExecutor callback. + * For proper concurrency, must be called while holding _topoMutex. * * For old style elections the election path is: * _startElectSelf() @@ -994,27 +911,6 @@ private: void _recoverFromElectionTie(const ReplicationExecutor::CallbackArgs& cbData); /** - * Chooses a new sync source. Must be scheduled as a callback. - * - * Calls into the Topology Coordinator, which uses its current view of the set to choose - * the most appropriate sync source. - */ - void _chooseNewSyncSource(const ReplicationExecutor::CallbackArgs& cbData, - const Timestamp& lastTimestampFetched, - HostAndPort* newSyncSource); - - /** - * Adds 'host' to the sync source blacklist until 'until'. A blacklisted source cannot - * be chosen as a sync source. Schedules a callback to unblacklist the sync source to be - * run at 'until'. - * - * Must be scheduled as a callback. - */ - void _blacklistSyncSource(const ReplicationExecutor::CallbackArgs& cbData, - const HostAndPort& host, - Date_t until); - - /** * Removes 'host' from the sync source blacklist. If 'host' isn't found, it's simply * ignored and no error is thrown. * @@ -1024,17 +920,6 @@ private: const HostAndPort& host); /** - * Determines if a new sync source should be considered. - * - * Must be scheduled as a callback. - */ - void _shouldChangeSyncSource(const ReplicationExecutor::CallbackArgs& cbData, - const HostAndPort& currentSource, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource, - bool* shouldChange); - - /** * Schedules a request that the given host step down; logs any errors. */ void _requestRemotePrimaryStepdown(const HostAndPort& target); @@ -1083,34 +968,11 @@ private: const StatusWith<ReplSetHeartbeatResponse>& responseStatus); /** - * Bottom half of processHeartbeat(), which runs in the replication executor. - */ - void _processHeartbeatFinish(const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetHeartbeatArgs& args, - ReplSetHeartbeatResponse* response, - Status* outStatus); - - /** - * Bottom half of processHeartbeatV1(), which runs in the replication executor. - */ - void _processHeartbeatFinishV1(const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetHeartbeatArgsV1& args, - ReplSetHeartbeatResponse* response, - Status* outStatus); - /** * Scan the SlaveInfoVector and determine the highest OplogEntry present on a majority of * servers; set _lastCommittedOpTime to this new entry, if greater than the current entry. */ void _updateLastCommittedOpTime_inlock(); - void _summarizeAsHtml_finish(const ReplicationExecutor::CallbackArgs& cbData, - ReplSetHtmlSummary* output); - - /** - * Callback that gets the current term from topology coordinator. - */ - void _getTerm_helper(const ReplicationExecutor::CallbackArgs& cbData, long long* term); - /** * This is used to set a floor of "newOpTime" on the OpTimes we will consider committed. * This prevents entries from before our election from counting as committed in our view, @@ -1147,16 +1009,10 @@ private: void _dropAllSnapshots_inlock(); /** - * Callback which schedules "_handleLivenessTimeout" to be run whenever the liveness timeout - * for the node who was least recently reported to be alive occurs. - */ - void _scheduleNextLivenessUpdate(const ReplicationExecutor::CallbackArgs& cbData); - - /** * Bottom half of _scheduleNextLivenessUpdate. - * Must be called from within a callback. + * Must be called with _topoMutex held. */ - void _scheduleNextLivenessUpdate_inlock(const ReplicationExecutor::CallbackArgs& cbData); + void _scheduleNextLivenessUpdate_inlock(); /** * Callback which marks downed nodes as down, triggers a stepdown if a majority of nodes are no @@ -1227,7 +1083,7 @@ private: * Used by _scheduleWork() and _scheduleWorkAt() only. * Do not call this function directly. */ - static CallbackHandle _wrapAndScheduleWork(ScheduleFn scheduleFn, const CallbackFn& work); + CallbackHandle _wrapAndScheduleWork(ScheduleFn scheduleFn, const CallbackFn& work); /** * Creates an event. @@ -1241,6 +1097,12 @@ private: */ void _scheduleElectionWinNotification(); + /** + * Wrap a function into executor callback. + * If the callback is cancelled, the given function won't run. + */ + executor::TaskExecutor::CallbackFn _wrapAsCallbackFn(const stdx::function<void()>& work); + // // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. @@ -1250,17 +1112,24 @@ private: // (PS) Pointer is read-only in concurrent operation, item pointed to is self-synchronizing; // Access in any context. // (M) Reads and writes guarded by _mutex - // (X) Reads and writes must be performed in a callback in _replExecutor - // (MX) Must hold _mutex and be in a callback in _replExecutor to write; must either hold - // _mutex or be in a callback in _replExecutor to read. + // (X) Reads and writes guarded by _topoMutex + // (MX) Must hold _mutex and _topoMutex to write; must either hold _mutex or _topoMutex + // to read. // (GX) Readable under a global intent lock. Must either hold global lock in exclusive - // mode (MODE_X) or both hold global lock in shared mode (MODE_S) and be in executor - // context to write. + // mode (MODE_X) or both hold global lock in shared mode (MODE_S) and hold _topoMutex + // to write. // (I) Independently synchronized, see member variable comment. + // When both _mutex and _topoMutex are needed, the caller must follow the strict locking order + // to avoid deadlock: _topoMutex must be held before locking _mutex. + // In other words, _topoMutex can never be locked while holding _mutex. + // Protects member data of this ReplicationCoordinator. mutable stdx::mutex _mutex; // (S) + // Protects member data of the TopologyCoordinator. + mutable stdx::mutex _topoMutex; // (S) + // Handles to actively queued heartbeats. HeartbeatHandles _heartbeatHandles; // (X) diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp index a2cd0eb386e..a70c8963af8 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp @@ -137,17 +137,18 @@ void ReplicationCoordinatorImpl::_startElectSelf() { // _mutex again. lk.unlock(); - StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _freshnessChecker->start( - &_replExecutor, - lastOpTimeApplied.getTimestamp(), - _rsConfig, - _selfIndex, - _topCoord->getMaybeUpHostAndPorts(), - stdx::bind(&ReplicationCoordinatorImpl::_onFreshnessCheckComplete, this)); + StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = + _freshnessChecker->start(&_replExecutor, + lastOpTimeApplied.getTimestamp(), + _rsConfig, + _selfIndex, + _topCoord->getMaybeUpHostAndPorts()); if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; } fassert(18681, nextPhaseEvh.getStatus()); + _replExecutor.onEvent(nextPhaseEvh.getValue(), + stdx::bind(&ReplicationCoordinatorImpl::_onFreshnessCheckComplete, this)); lossGuard.dismiss(); } @@ -159,6 +160,7 @@ void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() { &_freshnessChecker, &_electCmdRunner, &_electionFinishedEvent); + LockGuard lk(_topoMutex); if (_freshnessChecker->isCanceled()) { LOG(2) << "Election canceled during freshness check phase"; @@ -180,11 +182,10 @@ void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() { log() << "possible election tie; sleeping " << ms << " until " << dateToISOStringLocal(nextCandidateTime); _topCoord->setElectionSleepUntil(nextCandidateTime); - _replExecutor.scheduleWorkAt( - nextCandidateTime, - stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie, - this, - stdx::placeholders::_1)); + _scheduleWorkAt(nextCandidateTime, + stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie, + this, + stdx::placeholders::_1)); _sleptLastElection = true; return; } @@ -210,15 +211,14 @@ void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() { _electCmdRunner.reset(new ElectCmdRunner); StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _electCmdRunner->start( - &_replExecutor, - _rsConfig, - _selfIndex, - _topCoord->getMaybeUpHostAndPorts(), - stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete, this)); + &_replExecutor, _rsConfig, _selfIndex, _topCoord->getMaybeUpHostAndPorts()); if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; } fassert(18685, nextPhaseEvh.getStatus()); + + _replExecutor.onEvent(nextPhaseEvh.getValue(), + stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete, this)); lossGuard.dismiss(); } @@ -228,6 +228,7 @@ void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() { &_freshnessChecker, &_electCmdRunner, &_electionFinishedEvent); + LockGuard lk(_topoMutex); invariant(_freshnessChecker); invariant(_electCmdRunner); @@ -248,11 +249,10 @@ void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() { const Date_t nextCandidateTime = now + ms; log() << "waiting until " << nextCandidateTime << " before standing for election again"; _topCoord->setElectionSleepUntil(nextCandidateTime); - _replExecutor.scheduleWorkAt( - nextCandidateTime, - stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie, - this, - stdx::placeholders::_1)); + _scheduleWorkAt(nextCandidateTime, + stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie, + this, + stdx::placeholders::_1)); return; } @@ -272,9 +272,8 @@ void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() { void ReplicationCoordinatorImpl::_recoverFromElectionTie( const ReplicationExecutor::CallbackArgs& cbData) { - if (!cbData.status.isOK()) { - return; - } + LockGuard topoLock(_topoMutex); + auto now = _replExecutor.now(); auto lastOpApplied = getMyLastAppliedOpTime(); const auto status = _topCoord->checkShouldStandForElection(now, lastOpApplied); diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp index ca98c3cb471..84d00015fbd 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp @@ -547,6 +547,10 @@ TEST_F(ReplCoordElectTest, NodeCancelsElectionUponReceivingANewConfigDuringFresh BSONObjBuilder result; ASSERT_OK(getReplCoord()->processReplSetReconfig(&txn, config, &result)); + // Wait until election cancels. + net->enterNetwork(); + net->runReadyNetworkOperations(); + net->exitNetwork(); ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole()); } @@ -583,6 +587,10 @@ TEST_F(ReplCoordElectTest, NodeCancelsElectionUponReceivingANewConfigDuringElect BSONObjBuilder result; ASSERT_OK(getReplCoord()->processReplSetReconfig(&txn, config, &result)); + // Wait until election cancels. + getNet()->enterNetwork(); + getNet()->runReadyNetworkOperations(); + getNet()->exitNetwork(); ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole()); } diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp index acc9f8cf2ec..b0ac53d9491 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp @@ -143,18 +143,19 @@ void ReplicationCoordinatorImpl::_startElectSelfV1() { lk.unlock(); long long term = _topCoord->getTerm(); - StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _voteRequester->start( - &_replExecutor, - _rsConfig, - _selfIndex, - _topCoord->getTerm(), - true, // dry run - lastOpTime, - stdx::bind(&ReplicationCoordinatorImpl::_onDryRunComplete, this, term)); + StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = + _voteRequester->start(&_replExecutor, + _rsConfig, + _selfIndex, + _topCoord->getTerm(), + true, // dry run + lastOpTime); if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; } fassert(28685, nextPhaseEvh.getStatus()); + _replExecutor.onEvent(nextPhaseEvh.getValue(), + stdx::bind(&ReplicationCoordinatorImpl::_onDryRunComplete, this, term)); lossGuard.dismiss(); } @@ -163,6 +164,8 @@ void ReplicationCoordinatorImpl::_onDryRunComplete(long long originalTerm) { invariant(!_electionWinnerDeclarer); LoseElectionDryRunGuardV1 lossGuard(this); + LockGuard lk(_topoMutex); + if (_topCoord->getTerm() != originalTerm) { log() << "not running for primary, we have been superceded already"; return; @@ -222,15 +225,8 @@ void ReplicationCoordinatorImpl::_writeLastVoteForMyElection( return; } - auto cbStatus = _replExecutor.scheduleWork( - [this, lastVote](const ReplicationExecutor::CallbackArgs& cbData) { - _replExecutor.signalEvent(_electionDryRunFinishedEvent); - _startVoteRequester(lastVote.getTerm()); - }); - if (cbStatus.getStatus() == ErrorCodes::ShutdownInProgress) { - return; - } - fassert(28768, cbStatus.getStatus()); + _startVoteRequester(lastVote.getTerm()); + _replExecutor.signalEvent(_electionDryRunFinishedEvent); lossGuard.dismiss(); } @@ -240,22 +236,21 @@ void ReplicationCoordinatorImpl::_startVoteRequester(long long newTerm) { invariant(!_electionWinnerDeclarer); LoseElectionGuardV1 lossGuard(this); + LockGuard lk(_topoMutex); + const auto lastOpTime = _isDurableStorageEngine() ? getMyLastDurableOpTime() : getMyLastAppliedOpTime(); _voteRequester.reset(new VoteRequester); StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _voteRequester->start( - &_replExecutor, - _rsConfig, - _selfIndex, - _topCoord->getTerm(), - false, - lastOpTime, - stdx::bind(&ReplicationCoordinatorImpl::_onVoteRequestComplete, this, newTerm)); + &_replExecutor, _rsConfig, _selfIndex, _topCoord->getTerm(), false, lastOpTime); if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; } fassert(28643, nextPhaseEvh.getStatus()); + _replExecutor.onEvent( + nextPhaseEvh.getValue(), + stdx::bind(&ReplicationCoordinatorImpl::_onVoteRequestComplete, this, newTerm)); lossGuard.dismiss(); } @@ -265,6 +260,8 @@ void ReplicationCoordinatorImpl::_onVoteRequestComplete(long long originalTerm) invariant(!_electionWinnerDeclarer); LoseElectionGuardV1 lossGuard(this); + LockGuard lk(_topoMutex); + if (_topCoord->getTerm() != originalTerm) { log() << "not becoming primary, we have been superceded already"; return; diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index 881e3ef69bc..4679d17004c 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -846,6 +846,10 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringDryRun) { BSONObjBuilder result; ASSERT_OK(getReplCoord()->processReplSetReconfig(&txn, config, &result)); + // Wait until election cancels. + net->enterNetwork(); + net->runReadyNetworkOperations(); + net->exitNetwork(); ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole()); } @@ -882,6 +886,10 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase) BSONObjBuilder result; ASSERT_OK(getReplCoord()->processReplSetReconfig(&txn, config, &result)); + // Wait until election cancels. + getNet()->enterNetwork(); + getNet()->runReadyNetworkOperations(); + getNet()->exitNetwork(); ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole()); } diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index df03ebe37b7..2be0f71b5d8 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -44,6 +44,7 @@ #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/topology_coordinator.h" +#include "mongo/db/repl/vote_requester.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/stdx/functional.h" @@ -67,6 +68,8 @@ using executor::RemoteCommandRequest; void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData, const HostAndPort& target, int targetIndex) { + LockGuard topoLock(_topoMutex); + _untrackHeartbeatHandle(cbData.myHandle); if (cbData.status == ErrorCodes::CallbackCanceled) { return; @@ -113,6 +116,8 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget(const HostAndPort& t void ReplicationCoordinatorImpl::_handleHeartbeatResponse( const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) { + LockGuard topoLock(_topoMutex); + // remove handle from queued heartbeats _untrackHeartbeatHandle(cbData.myHandle); @@ -320,6 +325,9 @@ void ReplicationCoordinatorImpl::_stepDownFinish( if (cbData.status == ErrorCodes::CallbackCanceled) { return; } + + LockGuard topoLock(_topoMutex); + invariant(cbData.txn); // TODO Add invariant that we've got global shared or global exclusive lock, when supported // by lock manager. @@ -360,9 +368,9 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(const ReplicaSetConf invariant(!_rsConfig.isInitialized() || _rsConfig.getConfigVersion() < newConfig.getConfigVersion()); if (_freshnessChecker) { - _freshnessChecker->cancel(&_replExecutor); + _freshnessChecker->cancel(); if (_electCmdRunner) { - _electCmdRunner->cancel(&_replExecutor); + _electCmdRunner->cancel(); } _replExecutor.onEvent( _electionFinishedEvent, @@ -383,6 +391,8 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigAfterElectionCanceled( if (cbData.status == ErrorCodes::CallbackCanceled) { return; } + + LockGuard topoLock(_topoMutex); fassert(18911, cbData.status); stdx::lock_guard<stdx::mutex> lk(_mutex); if (_inShutdown) { @@ -452,7 +462,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( } } - const stdx::function<void(const ReplicationExecutor::CallbackArgs&)> reconfigFinishFn( + const CallbackFn reconfigFinishFn( stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish, this, stdx::placeholders::_1, @@ -481,6 +491,8 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( return; } + LockGuard topoLock(_topoMutex); + stdx::unique_lock<stdx::mutex> lk(_mutex); invariant(_rsConfigState == kConfigHBReconfiguring); invariant(!_rsConfig.isInitialized() || @@ -501,6 +513,28 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( return; } + // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig. + if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) { + if (isV1ElectionProtocol()) { + invariant(_voteRequester); + _voteRequester->cancel(); + } else { + invariant(_freshnessChecker); + _freshnessChecker->cancel(); + if (_electCmdRunner) { + _electCmdRunner->cancel(); + } + } + // Wait for the election to complete and the node's Role to be set to follower. + _replExecutor.onEvent(_electionFinishedEvent, + stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish, + this, + stdx::placeholders::_1, + newConfig, + myIndex)); + return; + } + if (!myIndex.isOK()) { switch (myIndex.getStatus().code()) { case ErrorCodes::NodeNotFound: @@ -524,8 +558,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( // If we do not have an index, we should pass -1 as our index to avoid falsely adding ourself to // the data structures inside of the TopologyCoordinator. const int myIndexValue = myIndex.getStatus().isOK() ? myIndex.getValue() : -1; - const PostMemberStateUpdateAction action = - _setCurrentRSConfig_inlock(cbData, newConfig, myIndexValue); + const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndexValue); lk.unlock(); _resetElectionInfoOnProtocolVersionUpgrade(oldConfig, newConfig); _performPostMemberStateUpdateAction(action); @@ -558,14 +591,12 @@ void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() { } } -void ReplicationCoordinatorImpl::_restartHeartbeats_inlock( - const ReplicationExecutor::CallbackArgs& cbData) { +void ReplicationCoordinatorImpl::_restartHeartbeats_inlock() { _cancelHeartbeats_inlock(); - _startHeartbeats_inlock(cbData); + _startHeartbeats_inlock(); } -void ReplicationCoordinatorImpl::_startHeartbeats_inlock( - const ReplicationExecutor::CallbackArgs& cbData) { +void ReplicationCoordinatorImpl::_startHeartbeats_inlock() { const Date_t now = _replExecutor.now(); _seedList.clear(); for (int i = 0; i < _rsConfig.getNumMembers(); ++i) { @@ -579,12 +610,13 @@ void ReplicationCoordinatorImpl::_startHeartbeats_inlock( slaveInfo.lastUpdate = _replExecutor.now(); slaveInfo.down = false; } - _scheduleNextLivenessUpdate_inlock(cbData); + _scheduleNextLivenessUpdate_inlock(); } } void ReplicationCoordinatorImpl::_handleLivenessTimeout( const ReplicationExecutor::CallbackArgs& cbData) { + LockGuard topoLock(_topoMutex); stdx::lock_guard<stdx::mutex> lk(_mutex); // Only reset the callback handle if it matches, otherwise more will be coming through if (cbData.myHandle == _handleLivenessTimeoutCbh) { @@ -628,20 +660,10 @@ void ReplicationCoordinatorImpl::_handleLivenessTimeout( } } } - _scheduleNextLivenessUpdate_inlock(cbData); -} - -void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate( - const ReplicationExecutor::CallbackArgs& cbData) { - if (cbData.status == ErrorCodes::CallbackCanceled) - return; - - stdx::lock_guard<stdx::mutex> lock(_mutex); - _scheduleNextLivenessUpdate_inlock(cbData); + _scheduleNextLivenessUpdate_inlock(); } -void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock( - const ReplicationExecutor::CallbackArgs& cbData) { +void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock() { if (!isV1ElectionProtocol()) { return; } @@ -678,15 +700,14 @@ void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock( auto nextTimeout = earliestDate + _rsConfig.getElectionTimeoutPeriod(); if (nextTimeout > _replExecutor.now()) { LOG(3) << "scheduling next check at " << nextTimeout; - auto cbh = _replExecutor.scheduleWorkAt( - nextTimeout, - stdx::bind( - &ReplicationCoordinatorImpl::_handleLivenessTimeout, this, stdx::placeholders::_1)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + auto cbh = _scheduleWorkAt(nextTimeout, + stdx::bind(&ReplicationCoordinatorImpl::_handleLivenessTimeout, + this, + stdx::placeholders::_1)); + if (!cbh) { return; } - fassert(22002, cbh.getStatus()); - _handleLivenessTimeoutCbh = cbh.getValue(); + _handleLivenessTimeoutCbh = cbh; _earliestMemberId = earliestMemberId; } } @@ -698,8 +719,7 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleLivenessUpdate_inlock(int u if (_handleLivenessTimeoutCbh.isValid()) { _replExecutor.cancel(_handleLivenessTimeoutCbh); } - _replExecutor.scheduleWork(stdx::bind( - &ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate, this, stdx::placeholders::_1)); + _scheduleNextLivenessUpdate_inlock(); } void ReplicationCoordinatorImpl::_cancelPriorityTakeover_inlock() { @@ -748,6 +768,8 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock() { } void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1(bool isPriorityTakeOver) { + LockGuard topoLock(_topoMutex); + if (!isV1ElectionProtocol()) { return; } diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index de8a175d01c..960987a5b46 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -1692,6 +1692,7 @@ TEST_F(StepDownTest, // Make a secondary actually catch up enterNetwork(); + getNet()->runUntil(getNet()->now() + Milliseconds(1000)); ASSERT(getNet()->hasReadyRequests()); NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); RemoteCommandRequest request = noi->getRequest(); @@ -1750,6 +1751,7 @@ TEST_F(StepDownTest, // Secondary has not caught up on first round of heartbeats. enterNetwork(); + getNet()->runUntil(getNet()->now() + Milliseconds(1000)); ASSERT(getNet()->hasReadyRequests()); NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); RemoteCommandRequest request = noi->getRequest(); @@ -4218,12 +4220,12 @@ TEST_F(ReplCoordTest, WaitForMemberState) { replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0)); ASSERT_TRUE(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); - // Successful dry run election increases term. - ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm()); - // Single node cluster - this node should start election on setFollowerMode() completion. replCoord->waitForElectionFinish_forTest(); + // Successful dry run election increases term. + ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm()); + auto timeout = Milliseconds(1); ASSERT_OK(replCoord->waitForMemberState(MemberState::RS_PRIMARY, timeout)); ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, @@ -4253,12 +4255,12 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) { replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0)); ASSERT_TRUE(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); - // Successful dry run election increases term. - ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm()); - // Single node cluster - this node should start election on setFollowerMode() completion. replCoord->waitForElectionFinish_forTest(); + // Successful dry run election increases term. + ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm()); + auto timeout = Milliseconds(1); ASSERT_OK(replCoord->waitForMemberState(MemberState::RS_PRIMARY, timeout)); diff --git a/src/mongo/db/repl/scatter_gather_runner.cpp b/src/mongo/db/repl/scatter_gather_runner.cpp index 16c778399d0..042bb0f1c10 100644 --- a/src/mongo/db/repl/scatter_gather_runner.cpp +++ b/src/mongo/db/repl/scatter_gather_runner.cpp @@ -42,59 +42,63 @@ namespace mongo { namespace repl { using executor::RemoteCommandRequest; - -ScatterGatherRunner::ScatterGatherRunner(ScatterGatherAlgorithm* algorithm) - : _algorithm(algorithm), _started(false) {} - -ScatterGatherRunner::~ScatterGatherRunner() {} - -static void startTrampoline(const ReplicationExecutor::CallbackArgs& cbData, - ScatterGatherRunner* runner, - StatusWith<ReplicationExecutor::EventHandle>* result) { - // TODO: remove static cast once ScatterGatherRunner is designed to work with a generic - // TaskExecutor. - ReplicationExecutor* executor = static_cast<ReplicationExecutor*>(cbData.executor); - *result = runner->start(executor); -} - -Status ScatterGatherRunner::run(ReplicationExecutor* executor) { - StatusWith<ReplicationExecutor::EventHandle> finishEvh(ErrorCodes::InternalError, "Not set"); - StatusWith<ReplicationExecutor::CallbackHandle> startCBH = executor->scheduleWork( - stdx::bind(startTrampoline, stdx::placeholders::_1, this, &finishEvh)); - if (!startCBH.isOK()) { - return startCBH.getStatus(); - } - executor->wait(startCBH.getValue()); +using LockGuard = stdx::lock_guard<stdx::mutex>; +using CallbackHandle = ReplicationExecutor::CallbackHandle; +using EventHandle = ReplicationExecutor::EventHandle; +using RemoteCommandCallbackArgs = ReplicationExecutor::RemoteCommandCallbackArgs; +using RemoteCommandCallbackFn = ReplicationExecutor::RemoteCommandCallbackFn; + +ScatterGatherRunner::ScatterGatherRunner(ScatterGatherAlgorithm* algorithm, + ReplicationExecutor* executor) + : _executor(executor), _impl(std::make_shared<RunnerImpl>(algorithm, executor)) {} + +Status ScatterGatherRunner::run() { + auto finishEvh = start(); if (!finishEvh.isOK()) { return finishEvh.getStatus(); } - executor->waitForEvent(finishEvh.getValue()); + _executor->waitForEvent(finishEvh.getValue()); return Status::OK(); } -StatusWith<ReplicationExecutor::EventHandle> ScatterGatherRunner::start( - ReplicationExecutor* executor, const stdx::function<void()>& onCompletion) { +StatusWith<EventHandle> ScatterGatherRunner::start() { + // Callback has a shared pointer to the RunnerImpl, so it's always safe to + // access the RunnerImpl. + std::shared_ptr<RunnerImpl>& impl = _impl; + auto cb = [impl](const RemoteCommandCallbackArgs& cbData) { impl->processResponse(cbData); }; + return _impl->start(cb); +} + +void ScatterGatherRunner::cancel() { + _impl->cancel(); +} + +/** + * Scatter gather runner implementation. + */ +ScatterGatherRunner::RunnerImpl::RunnerImpl(ScatterGatherAlgorithm* algorithm, + ReplicationExecutor* executor) + : _executor(executor), _algorithm(algorithm) {} + +StatusWith<EventHandle> ScatterGatherRunner::RunnerImpl::start( + const RemoteCommandCallbackFn processResponseCB) { + LockGuard lk(_mutex); + invariant(!_started); _started = true; - _actualResponses = 0; - _onCompletion = onCompletion; - StatusWith<ReplicationExecutor::EventHandle> evh = executor->makeEvent(); + StatusWith<EventHandle> evh = _executor->makeEvent(); if (!evh.isOK()) { return evh; } _sufficientResponsesReceived = evh.getValue(); - ScopeGuard earlyReturnGuard = - MakeGuard(&ScatterGatherRunner::_signalSufficientResponsesReceived, this, executor); - - const ReplicationExecutor::RemoteCommandCallbackFn cb = - stdx::bind(&ScatterGatherRunner::_processResponse, stdx::placeholders::_1, this); + ScopeGuard earlyReturnGuard = MakeGuard(&RunnerImpl::_signalSufficientResponsesReceived, this); std::vector<RemoteCommandRequest> requests = _algorithm->getRequests(); for (size_t i = 0; i < requests.size(); ++i) { - const StatusWith<ReplicationExecutor::CallbackHandle> cbh = - executor->scheduleRemoteCommand(requests[i], cb); + const StatusWith<CallbackHandle> cbh = + _executor->scheduleRemoteCommand(requests[i], processResponseCB); if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return StatusWith<ReplicationExecutor::EventHandle>(cbh.getStatus()); + return StatusWith<EventHandle>(cbh.getStatus()); } fassert(18743, cbh.getStatus()); _callbacks.push_back(cbh.getValue()); @@ -102,50 +106,47 @@ StatusWith<ReplicationExecutor::EventHandle> ScatterGatherRunner::start( if (_callbacks.empty() || _algorithm->hasReceivedSufficientResponses()) { invariant(_algorithm->hasReceivedSufficientResponses()); - _signalSufficientResponsesReceived(executor); + _signalSufficientResponsesReceived(); } earlyReturnGuard.Dismiss(); return evh; } -void ScatterGatherRunner::cancel(ReplicationExecutor* executor) { +void ScatterGatherRunner::RunnerImpl::cancel() { + LockGuard lk(_mutex); + invariant(_started); - _signalSufficientResponsesReceived(executor); + _signalSufficientResponsesReceived(); } -void ScatterGatherRunner::_processResponse( - const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, ScatterGatherRunner* runner) { - // It is possible that the ScatterGatherRunner has already gone out of scope, if the - // response indicates the callback was canceled. In that case, do not access any members - // of "runner" and return immediately. +void ScatterGatherRunner::RunnerImpl::processResponse( + const ReplicationExecutor::RemoteCommandCallbackArgs& cbData) { if (cbData.response.getStatus() == ErrorCodes::CallbackCanceled) { return; } + LockGuard lk(_mutex); + if (!_sufficientResponsesReceived.isValid()) { + // We've received sufficient responses and it's not safe to access the algorithm any more. + return; + } - ++runner->_actualResponses; - runner->_algorithm->processResponse(cbData.request, cbData.response); - if (runner->_algorithm->hasReceivedSufficientResponses()) { - // TODO: remove static cast once ScatterGatherRunner is designed to work with a generic - // TaskExecutor. - ReplicationExecutor* executor = static_cast<ReplicationExecutor*>(cbData.executor); - runner->_signalSufficientResponsesReceived(executor); + ++_actualResponses; + _algorithm->processResponse(cbData.request, cbData.response); + if (_algorithm->hasReceivedSufficientResponses()) { + _signalSufficientResponsesReceived(); } else { - invariant(runner->_actualResponses < runner->_callbacks.size()); + invariant(_actualResponses < _callbacks.size()); } } -void ScatterGatherRunner::_signalSufficientResponsesReceived(ReplicationExecutor* executor) { +void ScatterGatherRunner::RunnerImpl::_signalSufficientResponsesReceived() { if (_sufficientResponsesReceived.isValid()) { std::for_each(_callbacks.begin(), _callbacks.end(), - stdx::bind(&ReplicationExecutor::cancel, executor, stdx::placeholders::_1)); - const ReplicationExecutor::EventHandle h = _sufficientResponsesReceived; - _sufficientResponsesReceived = ReplicationExecutor::EventHandle(); - if (_onCompletion) { - _onCompletion(); - } - executor->signalEvent(h); + stdx::bind(&ReplicationExecutor::cancel, _executor, stdx::placeholders::_1)); + _executor->signalEvent(_sufficientResponsesReceived); + _sufficientResponsesReceived = EventHandle(); } } diff --git a/src/mongo/db/repl/scatter_gather_runner.h b/src/mongo/db/repl/scatter_gather_runner.h index ad7b8d93aa5..e4e6e40f404 100644 --- a/src/mongo/db/repl/scatter_gather_runner.h +++ b/src/mongo/db/repl/scatter_gather_runner.h @@ -33,6 +33,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/mutex.h" namespace mongo { @@ -44,7 +45,7 @@ namespace repl { class ScatterGatherAlgorithm; /** - * Implementation of a scatter-gather behavior using a ReplicationExecutor. + * Interface of a scatter-gather behavior. */ class ScatterGatherRunner { MONGO_DISALLOW_COPYING(ScatterGatherRunner); @@ -53,14 +54,12 @@ public: /** * Constructs a new runner whose underlying algorithm is "algorithm". * - * "algorithm" must remain in scope until the runner's destructor completes. + * "algorithm" and "executor" must remain in scope until the runner's destructor completes. */ - explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm); - - ~ScatterGatherRunner(); + explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm, ReplicationExecutor* executor); /** - * Runs the scatter-gather process using "executor", and blocks until it completes. + * Runs the scatter-gather process and blocks until it completes. * * Must _not_ be run from inside the executor context. * @@ -70,55 +69,71 @@ public: * is scheduled but before it completes, this method will return Status::OK(), * just as it does when it runs successfully to completion. */ - Status run(ReplicationExecutor* executor); + Status run(); /** - * Starts executing the scatter-gather process using "executor". - * * On success, returns an event handle that will be signaled when the runner has * finished executing the scatter-gather process. After that event has been * signaled, it is safe for the caller to examine any state on "algorithm". * - * This method must be called inside the executor context. - * - * onCompletion is an optional callback that will be executed in executor context - * immediately prior to signaling the event handle returned here. It must never - * throw exceptions. It may examine the state of the algorithm object. - * - * NOTE: If the executor starts to shut down before onCompletion executes, onCompletion may - * never execute, even though the returned event will eventually be signaled. + * The returned event will eventually be signaled. */ - StatusWith<ReplicationExecutor::EventHandle> start( - ReplicationExecutor* executor, - const stdx::function<void()>& onCompletion = stdx::function<void()>()); + StatusWith<ReplicationExecutor::EventHandle> start(); /** - * Informs the runner to cancel further processing. The "executor" argument - * must point to the same executor passed to "start()". - * - * Like start, this method must be called from within the executor context. + * Informs the runner to cancel further processing. */ - void cancel(ReplicationExecutor* executor); + void cancel(); private: /** - * Callback invoked once for every response from the network. - */ - static void _processResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, - ScatterGatherRunner* runner); - - /** - * Method that performs all actions required when _algorithm indicates a sufficient - * number of respones have been received. + * Implementation of a scatter-gather behavior using a ReplicationExecutor. */ - void _signalSufficientResponsesReceived(ReplicationExecutor* executor); - - ScatterGatherAlgorithm* _algorithm; - stdx::function<void()> _onCompletion; - ReplicationExecutor::EventHandle _sufficientResponsesReceived; - std::vector<ReplicationExecutor::CallbackHandle> _callbacks; - size_t _actualResponses; - bool _started; + class RunnerImpl { + public: + explicit RunnerImpl(ScatterGatherAlgorithm* algorithm, ReplicationExecutor* executor); + + /** + * On success, returns an event handle that will be signaled when the runner has + * finished executing the scatter-gather process. After that event has been + * signaled, it is safe for the caller to examine any state on "algorithm". + * + * The returned event will eventually be signaled. + */ + StatusWith<ReplicationExecutor::EventHandle> start( + const ReplicationExecutor::RemoteCommandCallbackFn cb); + + /** + * Informs the runner to cancel further processing. + */ + void cancel(); + + /** + * Callback invoked once for every response from the network. + */ + void processResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData); + + private: + /** + * Method that performs all actions required when _algorithm indicates a sufficient + * number of responses have been received. + */ + void _signalSufficientResponsesReceived(); + + ReplicationExecutor* _executor; // Not owned here. + ScatterGatherAlgorithm* _algorithm; // Not owned here. + ReplicationExecutor::EventHandle _sufficientResponsesReceived; + std::vector<ReplicationExecutor::CallbackHandle> _callbacks; + size_t _actualResponses = 0; + bool _started = false; + stdx::mutex _mutex; + }; + + ReplicationExecutor* _executor; // Not owned here. + + // This pointer of RunnerImpl will be shared with remote command callbacks to make sure + // callbacks can access the members safely. + std::shared_ptr<RunnerImpl> _impl; }; } // namespace repl diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp index 6e4c11a65e0..98c5032bc6c 100644 --- a/src/mongo/db/repl/scatter_gather_test.cpp +++ b/src/mongo/db/repl/scatter_gather_test.cpp @@ -45,6 +45,8 @@ using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; +static const int kTotalRequests = 3; + /** * Algorithm for testing the ScatterGatherRunner, which will finish running when finish() is * called, or upon receiving responses from two nodes. Creates a three requests algorithm @@ -57,7 +59,7 @@ public: virtual std::vector<RemoteCommandRequest> getRequests() const { std::vector<RemoteCommandRequest> requests; - for (int i = 0; i < 3; i++) { + for (int i = 0; i < kTotalRequests; i++) { requests.push_back(RemoteCommandRequest( HostAndPort("hostname", i), "admin", BSONObj(), Milliseconds(30 * 1000))); } @@ -150,7 +152,7 @@ public: private: void _run(ReplicationExecutor* executor) { - _result = _sgr->run(_executor); + _result = _sgr->run(); } ScatterGatherRunner* _sgr; @@ -161,10 +163,17 @@ private: // Simple onCompletion function which will toggle a bool, so that we can check the logs to // ensure the onCompletion function ran when expected. -void onCompletionTestFunction(bool* ran) { - *ran = true; +executor::TaskExecutor::CallbackFn getOnCompletionTestFunction(bool* ran) { + auto cb = [ran](const ReplicationExecutor::CallbackArgs& cbData) { + if (!cbData.status.isOK()) { + return; + } + *ran = true; + }; + return cb; } + // Confirm that running via start() will finish and run the onComplete function once sufficient // responses have been received. // Confirm that deleting both the ScatterGatherTestAlgorithm and ScatterGatherRunner while @@ -172,10 +181,10 @@ void onCompletionTestFunction(bool* ran) { // completed. TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) { ScatterGatherTestAlgorithm* sga = new ScatterGatherTestAlgorithm(); - ScatterGatherRunner* sgr = new ScatterGatherRunner(sga); + ScatterGatherRunner* sgr = new ScatterGatherRunner(sga, getExecutor()); bool ranCompletion = false; - StatusWith<ReplicationExecutor::EventHandle> status = - sgr->start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion)); + StatusWith<ReplicationExecutor::EventHandle> status = sgr->start(); + getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)); ASSERT_OK(status.getStatus()); ASSERT_FALSE(ranCompletion); @@ -217,10 +226,10 @@ TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) { // to return ErrorCodes::ShutdownInProgress. TEST_F(ScatterGatherTest, ShutdownExecutorBeforeRun) { ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga); + ScatterGatherRunner sgr(&sga, getExecutor()); getExecutor()->shutdown(); sga.finish(); - Status status = sgr.run(getExecutor()); + Status status = sgr.run(); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status); } @@ -228,14 +237,17 @@ TEST_F(ScatterGatherTest, ShutdownExecutorBeforeRun) { // finishes will cause run() to return Status::OK(). TEST_F(ScatterGatherTest, ShutdownExecutorAfterRun) { ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga); + ScatterGatherRunner sgr(&sga, getExecutor()); ScatterGatherRunnerRunner sgrr(&sgr, getExecutor()); sgrr.run(); // need to wait for the scatter-gather to be scheduled in the executor NetworkInterfaceMock* net = getNet(); net->enterNetwork(); - NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - net->blackHole(noi); + // Black hole all requests before shutdown, so that scheduleRemoteCommand will succeed. + for (int i = 0; i < kTotalRequests; i++) { + NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + net->blackHole(noi); + } net->exitNetwork(); getExecutor()->shutdown(); Status status = sgrr.getResult(); @@ -246,11 +258,10 @@ TEST_F(ScatterGatherTest, ShutdownExecutorAfterRun) { // to return ErrorCodes::ShutdownInProgress and should not run onCompletion(). TEST_F(ScatterGatherTest, ShutdownExecutorBeforeStart) { ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga); + ScatterGatherRunner sgr(&sga, getExecutor()); getExecutor()->shutdown(); bool ranCompletion = false; - StatusWith<ReplicationExecutor::EventHandle> status = - sgr.start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion)); + StatusWith<ReplicationExecutor::EventHandle> status = sgr.start(); sga.finish(); ASSERT_FALSE(ranCompletion); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.getStatus()); @@ -260,10 +271,10 @@ TEST_F(ScatterGatherTest, ShutdownExecutorBeforeStart) { // to return Status::OK and should not run onCompletion(). TEST_F(ScatterGatherTest, ShutdownExecutorAfterStart) { ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga); + ScatterGatherRunner sgr(&sga, getExecutor()); bool ranCompletion = false; - StatusWith<ReplicationExecutor::EventHandle> status = - sgr.start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion)); + StatusWith<ReplicationExecutor::EventHandle> status = sgr.start(); + getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)); getExecutor()->shutdown(); sga.finish(); ASSERT_FALSE(ranCompletion); @@ -273,10 +284,10 @@ TEST_F(ScatterGatherTest, ShutdownExecutorAfterStart) { // Confirm that responses are not processed once sufficient responses have been received. TEST_F(ScatterGatherTest, DoNotProcessMoreThanSufficientResponses) { ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga); + ScatterGatherRunner sgr(&sga, getExecutor()); bool ranCompletion = false; - StatusWith<ReplicationExecutor::EventHandle> status = - sgr.start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion)); + StatusWith<ReplicationExecutor::EventHandle> status = sgr.start(); + getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)); ASSERT_OK(status.getStatus()); ASSERT_FALSE(ranCompletion); @@ -318,17 +329,17 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru ScatterGatherTestAlgorithm sga; // set hasReceivedSufficientResponses to return true before the run starts sga.finish(); - ScatterGatherRunner sgr(&sga); + ScatterGatherRunner sgr(&sga, getExecutor()); bool ranCompletion = false; - StatusWith<ReplicationExecutor::EventHandle> status = - sgr.start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion)); + StatusWith<ReplicationExecutor::EventHandle> status = sgr.start(); + getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)); ASSERT_OK(status.getStatus()); - ASSERT_TRUE(ranCompletion); - + // Wait until callback finishes. NetworkInterfaceMock* net = getNet(); net->enterNetwork(); - ASSERT_FALSE(net->hasReadyRequests()); + net->runReadyNetworkOperations(); net->exitNetwork(); + ASSERT_TRUE(ranCompletion); } #if 0 @@ -340,7 +351,7 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru ScatterGatherRunner sgr(&sga); bool ranCompletion = false; StatusWith<ReplicationExecutor::EventHandle> status = sgr.start(getExecutor(), - stdx::bind(&onCompletionTestFunction, &ranCompletion)); + getOnCompletionTestFunction(&ranCompletion)); ASSERT_OK(status.getStatus()); ASSERT_FALSE(ranCompletion); @@ -379,7 +390,7 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru // Confirm that running via run() will finish once sufficient responses have been received. TEST_F(ScatterGatherTest, SuccessfulScatterGatherViaRun) { ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga); + ScatterGatherRunner sgr(&sga, getExecutor()); ScatterGatherRunnerRunner sgrr(&sgr, getExecutor()); sgrr.run(); diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index e6217295b95..1809d906c20 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -208,8 +208,7 @@ public: //////////////////////////////////////////////////////////// // produces a reply to a replSetSyncFrom command - virtual void prepareSyncFromResponse(const ReplicationExecutor::CallbackArgs& data, - const HostAndPort& target, + virtual void prepareSyncFromResponse(const HostAndPort& target, const OpTime& lastOpApplied, BSONObjBuilder* response, Status* result) = 0; @@ -254,8 +253,7 @@ public: }; // produce a reply to a status request - virtual void prepareStatusResponse(const ReplicationExecutor::CallbackArgs& data, - const ReplSetStatusArgs& rsStatusArgs, + virtual void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs, BSONObjBuilder* response, Status* result) = 0; diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index c03a883010d..da60dd90e1e 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -350,16 +350,10 @@ void TopologyCoordinatorImpl::clearSyncSourceBlacklist() { _syncSourceBlacklist.clear(); } -void TopologyCoordinatorImpl::prepareSyncFromResponse(const ReplicationExecutor::CallbackArgs& data, - const HostAndPort& target, +void TopologyCoordinatorImpl::prepareSyncFromResponse(const HostAndPort& target, const OpTime& lastOpApplied, BSONObjBuilder* response, Status* result) { - if (data.status == ErrorCodes::CallbackCanceled) { - *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); - return; - } - response->append("syncFromRequested", target.toString()); if (_selfIndex == -1) { @@ -1507,15 +1501,9 @@ const MemberConfig* TopologyCoordinatorImpl::_currentPrimaryMember() const { return &(_rsConfig.getMemberAt(_currentPrimaryIndex)); } -void TopologyCoordinatorImpl::prepareStatusResponse(const ReplicationExecutor::CallbackArgs& data, - const ReplSetStatusArgs& rsStatusArgs, +void TopologyCoordinatorImpl::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs, BSONObjBuilder* response, Status* result) { - if (data.status == ErrorCodes::CallbackCanceled) { - *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); - return; - } - // output for each member vector<BSONObj> membersOut; const MemberState myState = getMemberState(); diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index f86c0e8d928..597a1cc6bc3 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -164,8 +164,7 @@ public: virtual void setElectionSleepUntil(Date_t newTime); virtual void setFollowerMode(MemberState::MS newMode); virtual void adjustMaintenanceCountBy(int inc); - virtual void prepareSyncFromResponse(const ReplicationExecutor::CallbackArgs& data, - const HostAndPort& target, + virtual void prepareSyncFromResponse(const HostAndPort& target, const OpTime& lastOpApplied, BSONObjBuilder* response, Status* result); @@ -191,8 +190,7 @@ public: const OpTime& lastOpApplied, const OpTime& lastOpDurable, ReplSetHeartbeatResponse* response); - virtual void prepareStatusResponse(const ReplicationExecutor::CallbackArgs& data, - const ReplSetStatusArgs& rsStatusArgs, + virtual void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs, BSONObjBuilder* response, Status* result); virtual void fillIsMasterForReplSet(IsMasterResponse* response); diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp index 27711e3fcf7..24c49837c2c 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp @@ -829,8 +829,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunPriorToHavingAConf BSONObjBuilder response; // if we do not have an index in the config, we should get ErrorCodes::NotSecondary - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h1"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::NotSecondary, result); ASSERT_EQUALS("Removed and uninitialized nodes do not sync", result.reason()); } @@ -854,8 +853,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunAgainstArbiter) { << "h1"))), 0); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h1"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::NotSecondary, result); ASSERT_EQUALS("arbiters don't sync", result.reason()); } @@ -891,8 +889,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunAgainstPrimary) { makeSelfPrimary(); ASSERT_EQUALS(0, getCurrentPrimaryIndex()); getTopoCoord()._setCurrentPrimaryForTest(0); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h3"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h3"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::NotSecondary, result); ASSERT_EQUALS("primaries don't sync", result.reason()); ASSERT_EQUALS("h3:27017", response.obj()["syncFromRequested"].String()); @@ -926,7 +923,7 @@ TEST_F(TopoCoordTest, NodeReturnsNodeNotFoundWhenSyncFromRequestsANodeNotInConfi setSelfMemberState(MemberState::RS_SECONDARY); getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("fakemember"), ourOpTime, &response, &result); + HostAndPort("fakemember"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::NodeNotFound, result); ASSERT_EQUALS("Could not find member \"fakemember:27017\" in replica set", result.reason()); } @@ -959,8 +956,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsSelf) { setSelfMemberState(MemberState::RS_SECONDARY); // Try to sync from self - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("hself"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("hself"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::InvalidOptions, result); ASSERT_EQUALS("I cannot sync from myself", result.reason()); } @@ -994,8 +990,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsArbiter) { // Try to sync from an arbiter - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h1"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::InvalidOptions, result); ASSERT_EQUALS("Cannot sync from \"h1:27017\" because it is an arbiter", result.reason()); } @@ -1028,8 +1023,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsAnIndexNonbui setSelfMemberState(MemberState::RS_SECONDARY); // Try to sync from a node that doesn't build indexes - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h2"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h2"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::InvalidOptions, result); ASSERT_EQUALS("Cannot sync from \"h2:27017\" because it does not build indexes", result.reason()); @@ -1065,8 +1059,7 @@ TEST_F(TopoCoordTest, NodeReturnsHostUnreachableWhenSyncFromRequestsADownNode) { // Try to sync from a member that is down receiveDownHeartbeat(HostAndPort("h4"), "rs0", OpTime()); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h4"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h4"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::HostUnreachable, result); ASSERT_EQUALS("I cannot reach the requested member: h4:27017", result.reason()); } @@ -1102,8 +1095,7 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAStaleNode) { heartbeatFromMember( HostAndPort("h5"), "rs0", MemberState::RS_SECONDARY, staleOpTime, Milliseconds(100)); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h5"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result); ASSERT_OK(result); ASSERT_EQUALS("requested member \"h5:27017\" is more than 10 seconds behind us", response.obj()["warning"].String()); @@ -1142,8 +1134,7 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAValidNode) { heartbeatFromMember( HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100)); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h6"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response, &result); ASSERT_OK(result); BSONObj responseObj = response.obj(); ASSERT_FALSE(responseObj.hasField("warning")); @@ -1183,8 +1174,7 @@ TEST_F(TopoCoordTest, HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100)); // node goes down between forceSync and chooseNewSyncSource - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h6"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response, &result); BSONObj responseObj = response.obj(); ASSERT_FALSE(responseObj.hasField("warning")); receiveDownHeartbeat(HostAndPort("h6"), "rs0", OpTime()); @@ -1222,8 +1212,7 @@ TEST_F(TopoCoordTest, NodeReturnsUnauthorizedWhenSyncFromRequestsANodeWeAreNotAu // Try to sync from a member that is unauth'd receiveDownHeartbeat(HostAndPort("h5"), "rs0", OpTime(), ErrorCodes::Unauthorized); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h5"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result); ASSERT_NOT_OK(result); ASSERT_EQUALS(ErrorCodes::Unauthorized, result.code()); ASSERT_EQUALS("not authorized to communicate with h5:27017", result.reason()); @@ -1245,8 +1234,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenAskedToSyncFromANonVoterAsAVo "]}"), 0); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h2"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h2"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::InvalidOptions, result); ASSERT_EQUALS("Cannot sync from \"h2:27017\" because it is not a voter", result.reason()); } @@ -1284,8 +1272,7 @@ TEST_F(TopoCoordTest, heartbeatFromMember( HostAndPort("h5"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100)); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h5"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result); ASSERT_OK(result); BSONObj responseObj = response.obj(); ASSERT_FALSE(responseObj.hasField("warning")); @@ -1297,8 +1284,7 @@ TEST_F(TopoCoordTest, HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100)); // Sync successfully from another up-to-date member. - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h6"), ourOpTime, &response2, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response2, &result); BSONObj response2Obj = response2.obj(); ASSERT_FALSE(response2Obj.hasField("warning")); ASSERT_EQUALS(HostAndPort("h5").toString(), response2Obj["prevSyncTarget"].String()); @@ -1372,7 +1358,6 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) { BSONObjBuilder statusBuilder; Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result"); getTopoCoord().prepareStatusResponse( - cbData(), TopologyCoordinator::ReplSetStatusArgs{ curTime, static_cast<unsigned>(durationCount<Seconds>(uptimeSecs)), @@ -1488,7 +1473,6 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidReplicaSetConfigInResponseToGetStatusWhe BSONObjBuilder statusBuilder; Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result"); getTopoCoord().prepareStatusResponse( - cbData(), TopologyCoordinator::ReplSetStatusArgs{ curTime, static_cast<unsigned>(durationCount<Seconds>(uptimeSecs)), @@ -2187,7 +2171,6 @@ public: BSONObjBuilder statusBuilder; Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result"); getTopoCoord().prepareStatusResponse( - cbData(), TopologyCoordinator::ReplSetStatusArgs{_firstRequestDate + Milliseconds(4000), 10, OpTime(Timestamp(100, 0), 0), @@ -2251,7 +2234,6 @@ public: BSONObjBuilder statusBuilder; Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result"); getTopoCoord().prepareStatusResponse( - cbData(), TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Seconds(4), 10, OpTime(Timestamp(100, 0), 0), @@ -2568,7 +2550,6 @@ TEST_F(HeartbeatResponseTestTwoRetries, NodeDoesNotRetryHeartbeatsAfterFailingTw BSONObjBuilder statusBuilder; Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result"); getTopoCoord().prepareStatusResponse( - cbData(), TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Milliseconds(4900), 10, OpTime(Timestamp(100, 0), 0), @@ -2810,7 +2791,6 @@ TEST_F(HeartbeatResponseTestTwoRetries, BSONObjBuilder statusBuilder; Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result"); getTopoCoord().prepareStatusResponse( - cbData(), TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Milliseconds(7000), 600, OpTime(Timestamp(100, 0), 0), @@ -4238,42 +4218,6 @@ TEST_F(TopoCoordTest, ASSERT(TopologyCoordinator::Role::candidate == getTopoCoord().getRole()); } -class ShutdownInProgressTest : public TopoCoordTest { -public: - ShutdownInProgressTest() - : ourCbData(NULL, - ReplicationExecutor::CallbackHandle(), - Status(ErrorCodes::CallbackCanceled, "")) {} - - virtual ReplicationExecutor::CallbackArgs cbData() { - return ourCbData; - } - -private: - ReplicationExecutor::CallbackArgs ourCbData; -}; - -TEST_F(ShutdownInProgressTest, NodeReturnsShutdownInProgressWhenSyncFromCallbackCanceled) { - Status result = Status::OK(); - BSONObjBuilder response; - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("host2:27017"), OpTime(), &response, &result); - ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result); - ASSERT_TRUE(response.obj().isEmpty()); -} - -TEST_F(ShutdownInProgressTest, NodeReturnsShutDownInProgressWhenGetReplSetStatusCallbackCanceled) { - Status result = Status::OK(); - BSONObjBuilder response; - getTopoCoord().prepareStatusResponse( - cbData(), - TopologyCoordinator::ReplSetStatusArgs{Date_t(), 0, OpTime(), OpTime(), OpTime(), OpTime()}, - &response, - &result); - ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result); - ASSERT_TRUE(response.obj().isEmpty()); -} - class PrepareHeartbeatResponseTest : public TopoCoordTest { public: virtual void setUp() { diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp index 70cbf8b97aa..6adf5542212 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp @@ -822,8 +822,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunPriorToHavingAConf BSONObjBuilder response; // if we do not have an index in the config, we should get ErrorCodes::NotSecondary - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h1"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::NotSecondary, result); ASSERT_EQUALS("Removed and uninitialized nodes do not sync", result.reason()); } @@ -847,8 +846,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunAgainstArbiter) { << "h1"))), 0); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h1"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::NotSecondary, result); ASSERT_EQUALS("arbiters don't sync", result.reason()); } @@ -884,8 +882,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunAgainstPrimary) { makeSelfPrimary(); ASSERT_EQUALS(0, getCurrentPrimaryIndex()); getTopoCoord()._setCurrentPrimaryForTest(0); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h3"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h3"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::NotSecondary, result); ASSERT_EQUALS("primaries don't sync", result.reason()); ASSERT_EQUALS("h3:27017", response.obj()["syncFromRequested"].String()); @@ -919,7 +916,7 @@ TEST_F(TopoCoordTest, NodeReturnsNodeNotFoundWhenSyncFromRequestsANodeNotInConfi setSelfMemberState(MemberState::RS_SECONDARY); getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("fakemember"), ourOpTime, &response, &result); + HostAndPort("fakemember"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::NodeNotFound, result); ASSERT_EQUALS("Could not find member \"fakemember:27017\" in replica set", result.reason()); } @@ -952,8 +949,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsSelf) { setSelfMemberState(MemberState::RS_SECONDARY); // Try to sync from self - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("hself"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("hself"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::InvalidOptions, result); ASSERT_EQUALS("I cannot sync from myself", result.reason()); } @@ -987,8 +983,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsArbiter) { // Try to sync from an arbiter - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h1"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::InvalidOptions, result); ASSERT_EQUALS("Cannot sync from \"h1:27017\" because it is an arbiter", result.reason()); } @@ -1021,8 +1016,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsAnIndexNonbui setSelfMemberState(MemberState::RS_SECONDARY); // Try to sync from a node that doesn't build indexes - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h2"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h2"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::InvalidOptions, result); ASSERT_EQUALS("Cannot sync from \"h2:27017\" because it does not build indexes", result.reason()); @@ -1058,8 +1052,7 @@ TEST_F(TopoCoordTest, NodeReturnsHostUnreachableWhenSyncFromRequestsADownNode) { // Try to sync from a member that is down receiveDownHeartbeat(HostAndPort("h4"), "rs0", OpTime()); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h4"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h4"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::HostUnreachable, result); ASSERT_EQUALS("I cannot reach the requested member: h4:27017", result.reason()); } @@ -1095,8 +1088,7 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAStaleNode) { heartbeatFromMember( HostAndPort("h5"), "rs0", MemberState::RS_SECONDARY, staleOpTime, Milliseconds(100)); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h5"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result); ASSERT_OK(result); ASSERT_EQUALS("requested member \"h5:27017\" is more than 10 seconds behind us", response.obj()["warning"].String()); @@ -1135,8 +1127,7 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAValidNode) { heartbeatFromMember( HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100)); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h6"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response, &result); ASSERT_OK(result); BSONObj responseObj = response.obj(); ASSERT_FALSE(responseObj.hasField("warning")); @@ -1176,8 +1167,7 @@ TEST_F(TopoCoordTest, HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100)); // node goes down between forceSync and chooseNewSyncSource - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h6"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response, &result); BSONObj responseObj = response.obj(); ASSERT_FALSE(responseObj.hasField("warning")); receiveDownHeartbeat(HostAndPort("h6"), "rs0", OpTime()); @@ -1215,8 +1205,7 @@ TEST_F(TopoCoordTest, NodeReturnsUnauthorizedWhenSyncFromRequestsANodeWeAreNotAu // Try to sync from a member that is unauth'd receiveDownHeartbeat(HostAndPort("h5"), "rs0", OpTime(), ErrorCodes::Unauthorized); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h5"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result); ASSERT_NOT_OK(result); ASSERT_EQUALS(ErrorCodes::Unauthorized, result.code()); ASSERT_EQUALS("not authorized to communicate with h5:27017", result.reason()); @@ -1238,8 +1227,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenAskedToSyncFromANonVoterAsAVo "]}"), 0); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h2"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h2"), ourOpTime, &response, &result); ASSERT_EQUALS(ErrorCodes::InvalidOptions, result); ASSERT_EQUALS("Cannot sync from \"h2:27017\" because it is not a voter", result.reason()); } @@ -1277,8 +1265,7 @@ TEST_F(TopoCoordTest, heartbeatFromMember( HostAndPort("h5"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100)); - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h5"), ourOpTime, &response, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result); ASSERT_OK(result); BSONObj responseObj = response.obj(); ASSERT_FALSE(responseObj.hasField("warning")); @@ -1290,8 +1277,7 @@ TEST_F(TopoCoordTest, HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100)); // Sync successfully from another up-to-date member. - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("h6"), ourOpTime, &response2, &result); + getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response2, &result); BSONObj response2Obj = response2.obj(); ASSERT_FALSE(response2Obj.hasField("warning")); ASSERT_EQUALS(HostAndPort("h5").toString(), response2Obj["prevSyncTarget"].String()); @@ -1365,7 +1351,6 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) { BSONObjBuilder statusBuilder; Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result"); getTopoCoord().prepareStatusResponse( - cbData(), TopologyCoordinator::ReplSetStatusArgs{ curTime, static_cast<unsigned>(durationCount<Seconds>(uptimeSecs)), @@ -1481,7 +1466,6 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidReplicaSetConfigInResponseToGetStatusWhe BSONObjBuilder statusBuilder; Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result"); getTopoCoord().prepareStatusResponse( - cbData(), TopologyCoordinator::ReplSetStatusArgs{ curTime, static_cast<unsigned>(durationCount<Seconds>(uptimeSecs)), @@ -1518,42 +1502,6 @@ TEST_F(TopoCoordTest, HeartbeatFrequencyShouldBeHalfElectionTimeoutWhenArbiter) ASSERT_EQUALS(expected, action.getNextHeartbeatStartDate()); } -class ShutdownInProgressTest : public TopoCoordTest { -public: - ShutdownInProgressTest() - : ourCbData(NULL, - ReplicationExecutor::CallbackHandle(), - Status(ErrorCodes::CallbackCanceled, "")) {} - - virtual ReplicationExecutor::CallbackArgs cbData() { - return ourCbData; - } - -private: - ReplicationExecutor::CallbackArgs ourCbData; -}; - -TEST_F(ShutdownInProgressTest, NodeReturnsShutdownInProgressWhenSyncFromCallbackCanceled) { - Status result = Status::OK(); - BSONObjBuilder response; - getTopoCoord().prepareSyncFromResponse( - cbData(), HostAndPort("host2:27017"), OpTime(), &response, &result); - ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result); - ASSERT_TRUE(response.obj().isEmpty()); -} - -TEST_F(ShutdownInProgressTest, NodeReturnsShutDownInProgressWhenGetReplSetStatusCallbackCanceled) { - Status result = Status::OK(); - BSONObjBuilder response; - getTopoCoord().prepareStatusResponse( - cbData(), - TopologyCoordinator::ReplSetStatusArgs{Date_t(), 0, OpTime(), OpTime(), OpTime(), OpTime()}, - &response, - &result); - ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result); - ASSERT_TRUE(response.obj().isEmpty()); -} - class PrepareHeartbeatResponseV1Test : public TopoCoordTest { public: virtual void setUp() { @@ -4051,7 +3999,6 @@ public: BSONObjBuilder statusBuilder; Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result"); getTopoCoord().prepareStatusResponse( - cbData(), TopologyCoordinator::ReplSetStatusArgs{_firstRequestDate + Milliseconds(4000), 10, OpTime(Timestamp(100, 0), 0), @@ -4135,7 +4082,6 @@ public: BSONObjBuilder statusBuilder; Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result"); getTopoCoord().prepareStatusResponse( - cbData(), TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Seconds(4), 10, OpTime(Timestamp(100, 0), 0), @@ -4184,7 +4130,6 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, NodeDoesNotRetryHeartbeatsAfterFailing BSONObjBuilder statusBuilder; Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result"); getTopoCoord().prepareStatusResponse( - cbData(), TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Milliseconds(4900), 10, OpTime(Timestamp(100, 0), 0), @@ -4245,7 +4190,6 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, HeartbeatThreeNonconsecutiveFailures) BSONObjBuilder statusBuilder; Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result"); getTopoCoord().prepareStatusResponse( - cbData(), TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Milliseconds(7000), 600, OpTime(Timestamp(100, 0), 0), diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp index eafa68715e2..836617e5493 100644 --- a/src/mongo/db/repl/vote_requester.cpp +++ b/src/mongo/db/repl/vote_requester.cpp @@ -141,22 +141,20 @@ unordered_set<HostAndPort> VoteRequester::Algorithm::getResponders() const { VoteRequester::VoteRequester() : _isCanceled(false) {} VoteRequester::~VoteRequester() {} -StatusWith<ReplicationExecutor::EventHandle> VoteRequester::start( - ReplicationExecutor* executor, - const ReplicaSetConfig& rsConfig, - long long candidateIndex, - long long term, - bool dryRun, - OpTime lastDurableOpTime, - const stdx::function<void()>& onCompletion) { +StatusWith<ReplicationExecutor::EventHandle> VoteRequester::start(ReplicationExecutor* executor, + const ReplicaSetConfig& rsConfig, + long long candidateIndex, + long long term, + bool dryRun, + OpTime lastDurableOpTime) { _algorithm.reset(new Algorithm(rsConfig, candidateIndex, term, dryRun, lastDurableOpTime)); - _runner.reset(new ScatterGatherRunner(_algorithm.get())); - return _runner->start(executor, onCompletion); + _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor)); + return _runner->start(); } -void VoteRequester::cancel(ReplicationExecutor* executor) { +void VoteRequester::cancel() { _isCanceled = true; - _runner->cancel(executor); + _runner->cancel(); } VoteRequester::Result VoteRequester::getResult() const { diff --git a/src/mongo/db/repl/vote_requester.h b/src/mongo/db/repl/vote_requester.h index 433facf345c..cb0be7fc888 100644 --- a/src/mongo/db/repl/vote_requester.h +++ b/src/mongo/db/repl/vote_requester.h @@ -105,26 +105,19 @@ public: * in currentConfig, in attempt to receive sufficient votes to win the election. * * evh can be used to schedule a callback when the process is complete. - * This function must be run in the executor, as it must be synchronous with the command - * callbacks that it schedules. * If this function returns Status::OK(), evh is then guaranteed to be signaled. **/ - StatusWith<ReplicationExecutor::EventHandle> start( - ReplicationExecutor* executor, - const ReplicaSetConfig& rsConfig, - long long candidateIndex, - long long term, - bool dryRun, - OpTime lastDurableOpTime, - const stdx::function<void()>& onCompletion = stdx::function<void()>()); + StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor, + const ReplicaSetConfig& rsConfig, + long long candidateIndex, + long long term, + bool dryRun, + OpTime lastDurableOpTime); /** - * Informs the VoteRequester to cancel further processing. The "executor" - * argument must point to the same executor passed to "start()". - * - * Like start(), this method must run in the executor context. + * Informs the VoteRequester to cancel further processing. */ - void cancel(ReplicationExecutor* executor); + void cancel(); Result getResult() const; unordered_set<HostAndPort> getResponders() const; |