diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 283 |
1 files changed, 88 insertions, 195 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 301f2c92c7c..f42e712ffd3 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -155,45 +155,54 @@ std::string ReplicationCoordinatorImpl::SlaveInfo::toString() const { return toBSON().toString(); } -ReplicationCoordinatorImpl::Waiter::Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern) - : opTime(std::move(_opTime)), writeConcern(_writeConcern) {} +struct ReplicationCoordinatorImpl::WaiterInfo { -BSONObj ReplicationCoordinatorImpl::Waiter::toBSON() const { - BSONObjBuilder bob; - bob.append("opTime", opTime.toBSON()); - if (writeConcern) { - bob.append("writeConcern", writeConcern->toBSON()); - } - return bob.obj(); -}; - -std::string ReplicationCoordinatorImpl::Waiter::toString() const { - return toBSON().toString(); -}; + using FinishFunc = stdx::function<void()>; + WaiterInfo(unsigned int _opID, + const OpTime _opTime, + const WriteConcernOptions* _writeConcern, + stdx::condition_variable* _condVar) + : opID(_opID), opTime(_opTime), writeConcern(_writeConcern), condVar(_condVar) {} -ReplicationCoordinatorImpl::ThreadWaiter::ThreadWaiter(OpTime _opTime, - const WriteConcernOptions* _writeConcern, - stdx::condition_variable* _condVar) - : Waiter(_opTime, _writeConcern), condVar(_condVar) {} + // When waiter is signaled, finishCallback will be called while holding replCoord _mutex + // since WaiterLists are protected by _mutex. + WaiterInfo(const OpTime _opTime, FinishFunc _finishCallback) + : opTime(_opTime), finishCallback(_finishCallback) {} -void ReplicationCoordinatorImpl::ThreadWaiter::notify_inlock() { - invariant(condVar); - condVar->notify_all(); -} + BSONObj toBSON() const { + BSONObjBuilder bob; + bob.append("opId", opID); + bob.append("opTime", opTime.toBSON()); + if (writeConcern) { + bob.append("writeConcern", writeConcern->toBSON()); + } + return bob.obj(); + }; -ReplicationCoordinatorImpl::CallbackWaiter::CallbackWaiter(OpTime _opTime, - FinishFunc _finishCallback) - : Waiter(_opTime, nullptr), finishCallback(std::move(_finishCallback)) {} + std::string toString() const { + return toBSON().toString(); + }; -void ReplicationCoordinatorImpl::CallbackWaiter::notify_inlock() { - invariant(finishCallback); - finishCallback(); -} + // It is invalid to call notify() unless holding ReplicationCoordinatorImpl::_mutex. + void notify() { + if (condVar) { + condVar->notify_all(); + } + if (finishCallback) { + finishCallback(); + } + } + const unsigned int opID = 0; + const OpTime opTime; + const WriteConcernOptions* writeConcern = nullptr; + stdx::condition_variable* condVar = nullptr; + // The callback that will be called when this waiter is notified. + FinishFunc finishCallback = nullptr; +}; -class ReplicationCoordinatorImpl::WaiterGuard { -public: +struct ReplicationCoordinatorImpl::WaiterInfoGuard { /** * Constructor takes the list of waiters and enqueues itself on the list, removing itself * in the destructor. @@ -205,17 +214,23 @@ public: * _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one * of these without holding _mutex */ - WaiterGuard(WaiterList* list, Waiter* waiter) : _list(list), _waiter(waiter) { - list->add_inlock(_waiter); + WaiterInfoGuard(WaiterList* list, + unsigned int opID, + const OpTime opTime, + const WriteConcernOptions* writeConcern, + stdx::condition_variable* condVar) + : waiter(opID, opTime, writeConcern, condVar), _list(list) { + list->add_inlock(&waiter); } - ~WaiterGuard() { - _list->remove_inlock(_waiter); + ~WaiterInfoGuard() { + _list->remove_inlock(&waiter); } + WaiterInfo waiter; + private: WaiterList* _list; - Waiter* _waiter; }; void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) { @@ -224,46 +239,33 @@ void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) { void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveIf_inlock( stdx::function<bool(WaiterType)> func) { - // Only advance iterator when the element doesn't match. - for (auto it = _list.begin(); it != _list.end();) { - if (!func(*it)) { - ++it; - continue; - } - - WaiterType waiter = std::move(*it); - if (it == std::prev(_list.end())) { - // Iterator will be invalid after erasing the last element, so set it to the - // next one (i.e. end()). - it = _list.erase(it); - } else { - // Iterator is still valid after pop_back(). - std::swap(*it, _list.back()); - _list.pop_back(); + std::vector<WaiterType>::iterator it = _list.end(); + while (true) { + it = std::find_if(_list.begin(), _list.end(), func); + if (it == _list.end()) { + break; } - - // It's important to call notify() after the waiter has been removed from the list - // since notify() might remove the waiter itself. - waiter->notify_inlock(); + (*it)->notify(); + std::swap(*it, _list.back()); + _list.pop_back(); } } void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveAll_inlock() { - std::vector<WaiterType> list = std::move(_list); - // Call notify() after removing the waiters from the list. - for (auto& waiter : list) { - waiter->notify_inlock(); + for (auto& waiter : _list) { + waiter->notify(); } + _list.clear(); } bool ReplicationCoordinatorImpl::WaiterList::remove_inlock(WaiterType waiter) { auto it = std::find(_list.begin(), _list.end(), waiter); - if (it == _list.end()) { - return false; + if (it != _list.end()) { + std::swap(*it, _list.back()); + _list.pop_back(); + return true; } - std::swap(*it, _list.back()); - _list.pop_back(); - return true; + return false; } namespace { @@ -1184,7 +1186,7 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& op _updateSlaveInfoAppliedOpTime_inlock(mySlaveInfo, opTime); _opTimeWaiterList.signalAndRemoveIf_inlock( - [opTime](Waiter* waiter) { return waiter->opTime <= opTime; }); + [opTime](WaiterInfo* waiter) { return waiter->opTime <= opTime; }); } void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime, @@ -1350,11 +1352,11 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( // We just need to wait for the opTime to catch up to what we need (not majority RC). stdx::condition_variable condVar; - ThreadWaiter waiter(targetOpTime, nullptr, &condVar); - WaiterGuard guard(&_opTimeWaiterList, &waiter); + WaiterInfoGuard waitInfo( + &_opTimeWaiterList, opCtx->getOpID(), targetOpTime, nullptr, &condVar); - LOG(3) << "waituntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime " - << waiter << " until " << opCtx->getDeadline(); + LOG(3) << "waituntilOpTime: waiting for OpTime " << waitInfo.waiter << " until " + << opCtx->getDeadline(); auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock); if (!waitStatus.isOK()) { @@ -1742,8 +1744,8 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( // Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList stdx::condition_variable condVar; - ThreadWaiter waiter(opTime, &writeConcern, &condVar); - WaiterGuard guard(&_replicationWaiterList, &waiter); + WaiterInfoGuard waitInfo( + &_replicationWaiterList, opCtx->getOpID(), opTime, &writeConcern, &condVar); while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) { if (_inShutdown) { @@ -1761,7 +1763,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( BSONObjBuilder progress; _appendSlaveInfoData_inlock(&progress); log() << "Replication for failed WC: " << writeConcern.toBSON() - << ", waitInfo: " << waiter << ", opID: " << opCtx->getOpID() + << ", waitInfo:" << waitInfo.waiter.toBSON() << ", progress: " << progress.done(); } return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"}; @@ -2658,11 +2660,8 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() { result = kActionFollowerModeStateChange; } - // Exit catchup mode if we're in it and enable replication producer and applier on stepdown. + // Enable replication producer and applier on stepdown. if (_memberState.primary()) { - if (_catchupState) { - _catchupState->abort_inlock(); - } _applierState = ApplierState::Running; _externalState->startProducerIfStopped(); } @@ -2766,18 +2765,13 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( invariant(nextAction != kActionWinElection); lk.unlock(); _performPostMemberStateUpdateAction(nextAction); - lk.lock(); - if (!_getMemberState_inlock().primary()) { - break; - } // Notify all secondaries of the election win. - _restartHeartbeats_inlock(); + lk.lock(); + _scheduleElectionWinNotification_inlock(); if (isV1ElectionProtocol()) { - invariant(!_catchupState); - _catchupState = stdx::make_unique<CatchupState>(this); - _catchupState->start_inlock(); + _scanOpTimeForCatchUp_inlock(); } else { - _enterDrainMode_inlock(); + _finishCatchingUpOplog_inlock(); } break; } @@ -2792,114 +2786,13 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( } } -void ReplicationCoordinatorImpl::CatchupState::start_inlock() { - log() << "Entering primary catch-up mode."; - - // No catchup in single node replica set. - if (_repl->_rsConfig.getNumMembers() == 1) { - abort_inlock(); - return; - } - - auto timeoutCB = [this](const CallbackArgs& cbData) { - if (!cbData.status.isOK()) { - return; - } - log() << "Catchup timed out after becoming primary."; - stdx::lock_guard<stdx::mutex> lk(_repl->_mutex); - abort_inlock(); - }; - - // Schedule timeout callback. - auto catchupTimeout = _repl->_rsConfig.getCatchUpTimeoutPeriod(); - // Deal with infinity and overflow - no timeout. - if (catchupTimeout == ReplSetConfig::kInfiniteCatchUpTimeout || - Date_t::max() - _repl->_replExecutor->now() <= catchupTimeout) { - return; - } - auto timeoutDate = _repl->_replExecutor->now() + catchupTimeout; - auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, timeoutCB); - if (!status.isOK()) { - log() << "Failed to schedule catchup timeout work."; - abort_inlock(); - return; - } - _timeoutCbh = status.getValue(); -} - -void ReplicationCoordinatorImpl::CatchupState::abort_inlock() { - invariant(_repl->_getMemberState_inlock().primary()); - - log() << "Exited primary catch-up mode."; - // Clean up its own members. - if (_timeoutCbh) { - _repl->_replExecutor->cancel(_timeoutCbh); - } - if (_waiter) { - _repl->_opTimeWaiterList.remove_inlock(_waiter.get()); - } - - // Enter primary drain mode. - _repl->_enterDrainMode_inlock(); - // Destruct the state itself. - _repl->_catchupState.reset(nullptr); -} - -void ReplicationCoordinatorImpl::CatchupState::signalHeartbeatUpdate_inlock() { - auto targetOpTime = _repl->_topCoord->latestKnownOpTimeSinceHeartbeatRestart(); - // Haven't collected all heartbeat responses. - if (!targetOpTime) { - return; - } - - // We've caught up. - if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) { - log() << "Caught up to the latest known optime via heartbeats after becoming primary."; - abort_inlock(); - return; - } - - // Reset the target optime if it has changed. - if (_waiter && _waiter->opTime == *targetOpTime) { - return; - } - - log() << "Heartbeats updated catchup target optime to " << *targetOpTime; - if (_waiter) { - _repl->_opTimeWaiterList.remove_inlock(_waiter.get()); - } - auto targetOpTimeCB = [this, targetOpTime]() { - // Double check the target time since stepdown may signal us too. - if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) { - log() << "Caught up to the latest known optime successfully after becoming primary."; - abort_inlock(); - } - }; - _waiter = stdx::make_unique<CallbackWaiter>(*targetOpTime, targetOpTimeCB); - _repl->_opTimeWaiterList.add_inlock(_waiter.get()); -} - -Status ReplicationCoordinatorImpl::abortCatchupIfNeeded() { - if (!isV1ElectionProtocol()) { - return Status(ErrorCodes::CommandNotSupported, - "Primary catch-up is only supported by Protocol Version 1"); - } - - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_catchupState) { - _catchupState->abort_inlock(); - return Status::OK(); - } - return Status(ErrorCodes::IllegalOperation, "The node is not in catch-up mode."); -} - void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { auto scanner = std::make_shared<FreshnessScanner>(); auto scanStartTime = _replExecutor->now(); auto evhStatus = scanner->start( _replExecutor.get(), _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod()); if (evhStatus == ErrorCodes::ShutdownInProgress) { - _enterDrainMode_inlock(); + _finishCatchingUpOplog_inlock(); return; } fassertStatusOK(40254, evhStatus.getStatus()); @@ -2908,7 +2801,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) { stdx::lock_guard<stdx::mutex> lk(_mutex); if (cbData.status == ErrorCodes::CallbackCanceled) { - _enterDrainMode_inlock(); + _finishCatchingUpOplog_inlock(); return; } auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod(); @@ -2937,7 +2830,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca log() << "Could not access any nodes within timeout when checking for " << "additional ops to apply before finishing transition to primary. " << "Will move forward with becoming primary anyway."; - _enterDrainMode_inlock(); + _finishCatchingUpOplog_inlock(); return; } @@ -2946,7 +2839,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) { log() << "My optime is most up-to-date, skipping catch-up " << "and completing transition to primary."; - _enterDrainMode_inlock(); + _finishCatchingUpOplog_inlock(); return; } @@ -2959,9 +2852,9 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca log() << "Finished catch-up oplog after becoming primary."; } - _enterDrainMode_inlock(); + _finishCatchingUpOplog_inlock(); }; - auto waiterInfo = std::make_shared<CallbackWaiter>(freshnessInfo.opTime, finishCB); + auto waiterInfo = std::make_shared<WaiterInfo>(freshnessInfo.opTime, finishCB); _opTimeWaiterList.add_inlock(waiterInfo.get()); auto timeoutCB = [this, waiterInfo, finishCB](const CallbackArgs& cbData) { @@ -2974,7 +2867,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca _replExecutor->scheduleWorkAt(_replExecutor->now() + timeout, timeoutCB); } -void ReplicationCoordinatorImpl::_enterDrainMode_inlock() { +void ReplicationCoordinatorImpl::_finishCatchingUpOplog_inlock() { _applierState = ApplierState::Draining; _externalState->stopProducer(); } @@ -3065,7 +2958,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC } void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() { - _replicationWaiterList.signalAndRemoveIf_inlock([this](Waiter* waiter) { + _replicationWaiterList.signalAndRemoveIf_inlock([this](WaiterInfo* waiter) { return _doneWaitingForReplication_inlock( waiter->opTime, SnapshotName::min(), *waiter->writeConcern); }); |