diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-04-20 15:28:38 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-04-20 22:36:43 -0400 |
commit | 4680351e3fe6f8f47c04440f1c5d1232a4ab7b2d (patch) | |
tree | dae7b10842b2e1899a683adf4a545759182ce2ab /src/mongo/db/repl/replication_coordinator_impl.cpp | |
parent | 8b437e7a762e3ef99848659dc0d68df1e2add0a4 (diff) | |
download | mongo-4680351e3fe6f8f47c04440f1c5d1232a4ab7b2d.tar.gz |
SERVER-26848 Exit catchup mode when not syncing more data.
This reverts commit c08590a6ac9dc54c9d910822d47ea17140b56f89.
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 283 |
1 file changed, 195 insertions(+), 88 deletions(-)
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index f42e712ffd3..301f2c92c7c 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -155,54 +155,45 @@ std::string ReplicationCoordinatorImpl::SlaveInfo::toString() const { return toBSON().toString(); } -struct ReplicationCoordinatorImpl::WaiterInfo { +ReplicationCoordinatorImpl::Waiter::Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern) + : opTime(std::move(_opTime)), writeConcern(_writeConcern) {} - using FinishFunc = stdx::function<void()>; +BSONObj ReplicationCoordinatorImpl::Waiter::toBSON() const { + BSONObjBuilder bob; + bob.append("opTime", opTime.toBSON()); + if (writeConcern) { + bob.append("writeConcern", writeConcern->toBSON()); + } + return bob.obj(); +}; - WaiterInfo(unsigned int _opID, - const OpTime _opTime, - const WriteConcernOptions* _writeConcern, - stdx::condition_variable* _condVar) - : opID(_opID), opTime(_opTime), writeConcern(_writeConcern), condVar(_condVar) {} +std::string ReplicationCoordinatorImpl::Waiter::toString() const { + return toBSON().toString(); +}; - // When waiter is signaled, finishCallback will be called while holding replCoord _mutex - // since WaiterLists are protected by _mutex. 
- WaiterInfo(const OpTime _opTime, FinishFunc _finishCallback) - : opTime(_opTime), finishCallback(_finishCallback) {} - BSONObj toBSON() const { - BSONObjBuilder bob; - bob.append("opId", opID); - bob.append("opTime", opTime.toBSON()); - if (writeConcern) { - bob.append("writeConcern", writeConcern->toBSON()); - } - return bob.obj(); - }; +ReplicationCoordinatorImpl::ThreadWaiter::ThreadWaiter(OpTime _opTime, + const WriteConcernOptions* _writeConcern, + stdx::condition_variable* _condVar) + : Waiter(_opTime, _writeConcern), condVar(_condVar) {} - std::string toString() const { - return toBSON().toString(); - }; +void ReplicationCoordinatorImpl::ThreadWaiter::notify_inlock() { + invariant(condVar); + condVar->notify_all(); +} - // It is invalid to call notify() unless holding ReplicationCoordinatorImpl::_mutex. - void notify() { - if (condVar) { - condVar->notify_all(); - } - if (finishCallback) { - finishCallback(); - } - } +ReplicationCoordinatorImpl::CallbackWaiter::CallbackWaiter(OpTime _opTime, + FinishFunc _finishCallback) + : Waiter(_opTime, nullptr), finishCallback(std::move(_finishCallback)) {} + +void ReplicationCoordinatorImpl::CallbackWaiter::notify_inlock() { + invariant(finishCallback); + finishCallback(); +} - const unsigned int opID = 0; - const OpTime opTime; - const WriteConcernOptions* writeConcern = nullptr; - stdx::condition_variable* condVar = nullptr; - // The callback that will be called when this waiter is notified. - FinishFunc finishCallback = nullptr; -}; -struct ReplicationCoordinatorImpl::WaiterInfoGuard { +class ReplicationCoordinatorImpl::WaiterGuard { +public: /** * Constructor takes the list of waiters and enqueues itself on the list, removing itself * in the destructor. 
@@ -214,23 +205,17 @@ struct ReplicationCoordinatorImpl::WaiterInfoGuard { * _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one * of these without holding _mutex */ - WaiterInfoGuard(WaiterList* list, - unsigned int opID, - const OpTime opTime, - const WriteConcernOptions* writeConcern, - stdx::condition_variable* condVar) - : waiter(opID, opTime, writeConcern, condVar), _list(list) { - list->add_inlock(&waiter); + WaiterGuard(WaiterList* list, Waiter* waiter) : _list(list), _waiter(waiter) { + list->add_inlock(_waiter); } - ~WaiterInfoGuard() { - _list->remove_inlock(&waiter); + ~WaiterGuard() { + _list->remove_inlock(_waiter); } - WaiterInfo waiter; - private: WaiterList* _list; + Waiter* _waiter; }; void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) { @@ -239,33 +224,46 @@ void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) { void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveIf_inlock( stdx::function<bool(WaiterType)> func) { - std::vector<WaiterType>::iterator it = _list.end(); - while (true) { - it = std::find_if(_list.begin(), _list.end(), func); - if (it == _list.end()) { - break; + // Only advance iterator when the element doesn't match. + for (auto it = _list.begin(); it != _list.end();) { + if (!func(*it)) { + ++it; + continue; + } + + WaiterType waiter = std::move(*it); + if (it == std::prev(_list.end())) { + // Iterator will be invalid after erasing the last element, so set it to the + // next one (i.e. end()). + it = _list.erase(it); + } else { + // Iterator is still valid after pop_back(). + std::swap(*it, _list.back()); + _list.pop_back(); } - (*it)->notify(); - std::swap(*it, _list.back()); - _list.pop_back(); + + // It's important to call notify() after the waiter has been removed from the list + // since notify() might remove the waiter itself. 
+ waiter->notify_inlock(); } } void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveAll_inlock() { - for (auto& waiter : _list) { - waiter->notify(); + std::vector<WaiterType> list = std::move(_list); + // Call notify() after removing the waiters from the list. + for (auto& waiter : list) { + waiter->notify_inlock(); } - _list.clear(); } bool ReplicationCoordinatorImpl::WaiterList::remove_inlock(WaiterType waiter) { auto it = std::find(_list.begin(), _list.end(), waiter); - if (it != _list.end()) { - std::swap(*it, _list.back()); - _list.pop_back(); - return true; + if (it == _list.end()) { + return false; } - return false; + std::swap(*it, _list.back()); + _list.pop_back(); + return true; } namespace { @@ -1186,7 +1184,7 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& op _updateSlaveInfoAppliedOpTime_inlock(mySlaveInfo, opTime); _opTimeWaiterList.signalAndRemoveIf_inlock( - [opTime](WaiterInfo* waiter) { return waiter->opTime <= opTime; }); + [opTime](Waiter* waiter) { return waiter->opTime <= opTime; }); } void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime, @@ -1352,11 +1350,11 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( // We just need to wait for the opTime to catch up to what we need (not majority RC). 
stdx::condition_variable condVar; - WaiterInfoGuard waitInfo( - &_opTimeWaiterList, opCtx->getOpID(), targetOpTime, nullptr, &condVar); + ThreadWaiter waiter(targetOpTime, nullptr, &condVar); + WaiterGuard guard(&_opTimeWaiterList, &waiter); - LOG(3) << "waituntilOpTime: waiting for OpTime " << waitInfo.waiter << " until " - << opCtx->getDeadline(); + LOG(3) << "waituntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime " + << waiter << " until " << opCtx->getDeadline(); auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock); if (!waitStatus.isOK()) { @@ -1744,8 +1742,8 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( // Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList stdx::condition_variable condVar; - WaiterInfoGuard waitInfo( - &_replicationWaiterList, opCtx->getOpID(), opTime, &writeConcern, &condVar); + ThreadWaiter waiter(opTime, &writeConcern, &condVar); + WaiterGuard guard(&_replicationWaiterList, &waiter); while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) { if (_inShutdown) { @@ -1763,7 +1761,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( BSONObjBuilder progress; _appendSlaveInfoData_inlock(&progress); log() << "Replication for failed WC: " << writeConcern.toBSON() - << ", waitInfo:" << waitInfo.waiter.toBSON() + << ", waitInfo: " << waiter << ", opID: " << opCtx->getOpID() << ", progress: " << progress.done(); } return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"}; @@ -2660,8 +2658,11 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() { result = kActionFollowerModeStateChange; } - // Enable replication producer and applier on stepdown. + // Exit catchup mode if we're in it and enable replication producer and applier on stepdown. 
if (_memberState.primary()) { + if (_catchupState) { + _catchupState->abort_inlock(); + } _applierState = ApplierState::Running; _externalState->startProducerIfStopped(); } @@ -2765,13 +2766,18 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( invariant(nextAction != kActionWinElection); lk.unlock(); _performPostMemberStateUpdateAction(nextAction); - // Notify all secondaries of the election win. lk.lock(); - _scheduleElectionWinNotification_inlock(); + if (!_getMemberState_inlock().primary()) { + break; + } + // Notify all secondaries of the election win. + _restartHeartbeats_inlock(); if (isV1ElectionProtocol()) { - _scanOpTimeForCatchUp_inlock(); + invariant(!_catchupState); + _catchupState = stdx::make_unique<CatchupState>(this); + _catchupState->start_inlock(); } else { - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); } break; } @@ -2786,13 +2792,114 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( } } +void ReplicationCoordinatorImpl::CatchupState::start_inlock() { + log() << "Entering primary catch-up mode."; + + // No catchup in single node replica set. + if (_repl->_rsConfig.getNumMembers() == 1) { + abort_inlock(); + return; + } + + auto timeoutCB = [this](const CallbackArgs& cbData) { + if (!cbData.status.isOK()) { + return; + } + log() << "Catchup timed out after becoming primary."; + stdx::lock_guard<stdx::mutex> lk(_repl->_mutex); + abort_inlock(); + }; + + // Schedule timeout callback. + auto catchupTimeout = _repl->_rsConfig.getCatchUpTimeoutPeriod(); + // Deal with infinity and overflow - no timeout. 
+ if (catchupTimeout == ReplSetConfig::kInfiniteCatchUpTimeout || + Date_t::max() - _repl->_replExecutor->now() <= catchupTimeout) { + return; + } + auto timeoutDate = _repl->_replExecutor->now() + catchupTimeout; + auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, timeoutCB); + if (!status.isOK()) { + log() << "Failed to schedule catchup timeout work."; + abort_inlock(); + return; + } + _timeoutCbh = status.getValue(); +} + +void ReplicationCoordinatorImpl::CatchupState::abort_inlock() { + invariant(_repl->_getMemberState_inlock().primary()); + + log() << "Exited primary catch-up mode."; + // Clean up its own members. + if (_timeoutCbh) { + _repl->_replExecutor->cancel(_timeoutCbh); + } + if (_waiter) { + _repl->_opTimeWaiterList.remove_inlock(_waiter.get()); + } + + // Enter primary drain mode. + _repl->_enterDrainMode_inlock(); + // Destruct the state itself. + _repl->_catchupState.reset(nullptr); +} + +void ReplicationCoordinatorImpl::CatchupState::signalHeartbeatUpdate_inlock() { + auto targetOpTime = _repl->_topCoord->latestKnownOpTimeSinceHeartbeatRestart(); + // Haven't collected all heartbeat responses. + if (!targetOpTime) { + return; + } + + // We've caught up. + if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) { + log() << "Caught up to the latest known optime via heartbeats after becoming primary."; + abort_inlock(); + return; + } + + // Reset the target optime if it has changed. + if (_waiter && _waiter->opTime == *targetOpTime) { + return; + } + + log() << "Heartbeats updated catchup target optime to " << *targetOpTime; + if (_waiter) { + _repl->_opTimeWaiterList.remove_inlock(_waiter.get()); + } + auto targetOpTimeCB = [this, targetOpTime]() { + // Double check the target time since stepdown may signal us too. 
+ if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) { + log() << "Caught up to the latest known optime successfully after becoming primary."; + abort_inlock(); + } + }; + _waiter = stdx::make_unique<CallbackWaiter>(*targetOpTime, targetOpTimeCB); + _repl->_opTimeWaiterList.add_inlock(_waiter.get()); +} + +Status ReplicationCoordinatorImpl::abortCatchupIfNeeded() { + if (!isV1ElectionProtocol()) { + return Status(ErrorCodes::CommandNotSupported, + "Primary catch-up is only supported by Protocol Version 1"); + } + + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_catchupState) { + _catchupState->abort_inlock(); + return Status::OK(); + } + return Status(ErrorCodes::IllegalOperation, "The node is not in catch-up mode."); +} + void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { auto scanner = std::make_shared<FreshnessScanner>(); auto scanStartTime = _replExecutor->now(); auto evhStatus = scanner->start( _replExecutor.get(), _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod()); if (evhStatus == ErrorCodes::ShutdownInProgress) { - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); return; } fassertStatusOK(40254, evhStatus.getStatus()); @@ -2801,7 +2908,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) { stdx::lock_guard<stdx::mutex> lk(_mutex); if (cbData.status == ErrorCodes::CallbackCanceled) { - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); return; } auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod(); @@ -2830,7 +2937,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca log() << "Could not access any nodes within timeout when checking for " << "additional ops to apply before finishing transition to primary. 
" << "Will move forward with becoming primary anyway."; - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); return; } @@ -2839,7 +2946,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) { log() << "My optime is most up-to-date, skipping catch-up " << "and completing transition to primary."; - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); return; } @@ -2852,9 +2959,9 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca log() << "Finished catch-up oplog after becoming primary."; } - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); }; - auto waiterInfo = std::make_shared<WaiterInfo>(freshnessInfo.opTime, finishCB); + auto waiterInfo = std::make_shared<CallbackWaiter>(freshnessInfo.opTime, finishCB); _opTimeWaiterList.add_inlock(waiterInfo.get()); auto timeoutCB = [this, waiterInfo, finishCB](const CallbackArgs& cbData) { @@ -2867,7 +2974,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca _replExecutor->scheduleWorkAt(_replExecutor->now() + timeout, timeoutCB); } -void ReplicationCoordinatorImpl::_finishCatchingUpOplog_inlock() { +void ReplicationCoordinatorImpl::_enterDrainMode_inlock() { _applierState = ApplierState::Draining; _externalState->stopProducer(); } @@ -2958,7 +3065,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC } void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() { - _replicationWaiterList.signalAndRemoveIf_inlock([this](WaiterInfo* waiter) { + _replicationWaiterList.signalAndRemoveIf_inlock([this](Waiter* waiter) { return _doneWaitingForReplication_inlock( waiter->opTime, SnapshotName::min(), *waiter->writeConcern); }); |