From 4680351e3fe6f8f47c04440f1c5d1232a4ab7b2d Mon Sep 17 00:00:00 2001 From: Siyuan Zhou Date: Thu, 20 Apr 2017 15:28:38 -0400 Subject: SERVER-26848 Exit catchup mode when not syncing more data. This reverts commit c08590a6ac9dc54c9d910822d47ea17140b56f89. --- src/mongo/db/repl/bgsync.cpp | 8 +- src/mongo/db/repl/member_heartbeat_data.cpp | 4 + src/mongo/db/repl/member_heartbeat_data.h | 14 + src/mongo/db/repl/repl_set_commands.cpp | 35 ++- src/mongo/db/repl/repl_set_config.cpp | 9 +- src/mongo/db/repl/repl_set_config.h | 1 + src/mongo/db/repl/replication_coordinator.h | 5 + src/mongo/db/repl/replication_coordinator_impl.cpp | 283 ++++++++++++------ src/mongo/db/repl/replication_coordinator_impl.h | 97 +++++- .../replication_coordinator_impl_elect_v1_test.cpp | 326 ++++++++++++++------- .../replication_coordinator_impl_heartbeat.cpp | 8 + .../db/repl/replication_coordinator_impl_test.cpp | 6 - src/mongo/db/repl/replication_coordinator_mock.cpp | 4 + src/mongo/db/repl/replication_coordinator_mock.h | 2 + .../repl/replication_coordinator_test_fixture.cpp | 22 +- .../db/repl/replication_coordinator_test_fixture.h | 4 +- src/mongo/db/repl/topology_coordinator.h | 15 + src/mongo/db/repl/topology_coordinator_impl.cpp | 30 ++ src/mongo/db/repl/topology_coordinator_impl.h | 4 + src/mongo/shell/replsettest.js | 11 +- 20 files changed, 661 insertions(+), 227 deletions(-) (limited to 'src/mongo') diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index df792fa6fdc..9d129284de0 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -313,13 +313,12 @@ void BackgroundSync::_produce(OperationContext* opCtx) { if (syncSourceResp.syncSourceStatus == ErrorCodes::OplogStartMissing) { // All (accessible) sync sources were too stale. - // TODO: End catchup mode early if we are too stale. if (_replCoord->getMemberState().primary()) { warning() << "Too stale to catch up."; log() << "Our newest OpTime : " << lastOpTimeFetched; log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen << " from " << syncSourceResp.getSyncSource(); - sleepsecs(1); + _replCoord->abortCatchupIfNeeded(); return; } @@ -568,9 +567,8 @@ void BackgroundSync::_runRollback(OperationContext* opCtx, int requiredRBID, StorageInterface* storageInterface) { if (_replCoord->getMemberState().primary()) { - // TODO: Abort catchup mode early if rollback detected. - warning() << "Rollback situation detected in catch-up mode; catch-up mode will end."; - sleepsecs(1); + warning() << "Rollback situation detected in catch-up mode. Aborting catch-up mode."; + _replCoord->abortCatchupIfNeeded(); return; } diff --git a/src/mongo/db/repl/member_heartbeat_data.cpp b/src/mongo/db/repl/member_heartbeat_data.cpp index c267a6ba8ed..1b9b9ea3f13 100644 --- a/src/mongo/db/repl/member_heartbeat_data.cpp +++ b/src/mongo/db/repl/member_heartbeat_data.cpp @@ -54,6 +54,8 @@ void MemberHeartbeatData::setUpValues(Date_t now, } _authIssue = false; _lastHeartbeat = now; + _updatedSinceRestart = true; + if (!hbResponse.hasState()) { hbResponse.setState(MemberState::RS_UNKNOWN); } @@ -77,6 +79,7 @@ void MemberHeartbeatData::setDownValues(Date_t now, const std::string& heartbeat _upSince = Date_t(); _lastHeartbeat = now; _authIssue = false; + _updatedSinceRestart = true; _lastResponse = ReplSetHeartbeatResponse(); _lastResponse.setState(MemberState::RS_DOWN); @@ -91,6 +94,7 @@ void MemberHeartbeatData::setAuthIssue(Date_t now) { _upSince = Date_t(); _lastHeartbeat = now; _authIssue = true; + _updatedSinceRestart = true; _lastResponse = ReplSetHeartbeatResponse(); _lastResponse.setState(MemberState::RS_UNKNOWN); diff --git a/src/mongo/db/repl/member_heartbeat_data.h b/src/mongo/db/repl/member_heartbeat_data.h index 0f5dacd9081..f67a0a87757 100644 --- a/src/mongo/db/repl/member_heartbeat_data.h +++ b/src/mongo/db/repl/member_heartbeat_data.h @@ -123,6 +123,17 @@ public: */ void setAuthIssue(Date_t now); + /** + * Reset the boolean to record the last restart. + */ + void restart() { + _updatedSinceRestart = false; + } + + bool isUpdatedSinceRestart() const { + return _updatedSinceRestart; + } + private: // -1 = not checked yet, 0 = member is down/unreachable, 1 = member is up int _health; @@ -139,6 +150,9 @@ private: // The last heartbeat response we received. ReplSetHeartbeatResponse _lastResponse; + + // Have we received heartbeats since the last restart? + bool _updatedSinceRestart = false; }; } // namespace repl diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp index 6348f72bfbd..09226ba0237 100644 --- a/src/mongo/db/repl/repl_set_commands.cpp +++ b/src/mongo/db/repl/repl_set_commands.cpp @@ -868,7 +868,7 @@ public: status = getGlobalReplicationCoordinator()->stepUpIfEligible(); if (!status.isOK()) { - log() << "replSetStepUp request failed " << causedBy(status); + log() << "replSetStepUp request failed" << causedBy(status); } return appendCommandStatus(result, status); @@ -880,5 +880,38 @@ private: } } cmdReplSetStepUp; +class CmdReplSetAbortPrimaryCatchUp : public ReplSetCommand { +public: + virtual void help(stringstream& help) const { + help << "{ CmdReplSetAbortPrimaryCatchUp : 1 }\n"; + help << "Abort primary catch-up mode; immediately finish the transition to primary " + "without fetching any further unreplicated writes from any other online nodes"; + } + + CmdReplSetAbortPrimaryCatchUp() : ReplSetCommand("replSetAbortPrimaryCatchUp") {} + + virtual bool run(OperationContext* opCtx, + const string&, + BSONObj& cmdObj, + string& errmsg, + BSONObjBuilder& result) override { + Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + if (!status.isOK()) + return appendCommandStatus(result, status); + log() << "Received replSetAbortPrimaryCatchUp request"; + + status = getGlobalReplicationCoordinator()->abortCatchupIfNeeded(); + if (!status.isOK()) { + log() << "replSetAbortPrimaryCatchUp request failed" << causedBy(status); + } + return appendCommandStatus(result, status); + } + +private: + ActionSet getAuthActionSet() const override { + return ActionSet{ActionType::replSetStateChange}; + } +} cmdReplSetAbortPrimaryCatchUp; + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_set_config.cpp b/src/mongo/db/repl/repl_set_config.cpp index f7140cc56bf..2fbbd7cad52 100644 --- a/src/mongo/db/repl/repl_set_config.cpp +++ b/src/mongo/db/repl/repl_set_config.cpp @@ -44,6 +44,7 @@ namespace repl { const size_t ReplSetConfig::kMaxMembers; const size_t ReplSetConfig::kMaxVotingMembers; +const Milliseconds ReplSetConfig::kInfiniteCatchUpTimeout(-1); const std::string ReplSetConfig::kConfigServerFieldName = "configsvr"; const std::string ReplSetConfig::kVersionFieldName = "version"; @@ -51,7 +52,7 @@ const std::string ReplSetConfig::kMajorityWriteConcernModeName = "$majority"; const Milliseconds ReplSetConfig::kDefaultHeartbeatInterval(2000); const Seconds ReplSetConfig::kDefaultHeartbeatTimeoutPeriod(10); const Milliseconds ReplSetConfig::kDefaultElectionTimeoutPeriod(10000); -const Milliseconds ReplSetConfig::kDefaultCatchUpTimeoutPeriod(2000); +const Milliseconds ReplSetConfig::kDefaultCatchUpTimeoutPeriod(kInfiniteCatchUpTimeout); const bool ReplSetConfig::kDefaultChainingAllowed(true); namespace { @@ -270,14 +271,14 @@ Status ReplSetConfig::_parseSettingsSubdocument(const BSONObj& settings) { // // Parse catchUpTimeoutMillis // - auto notLessThanZero = stdx::bind(std::greater_equal(), stdx::placeholders::_1, 0); + auto validCatchUpTimeout = [](long long timeout) { return timeout >= 0LL || timeout == -1LL; }; long long catchUpTimeoutMillis; Status catchUpTimeoutStatus = bsonExtractIntegerFieldWithDefaultIf( settings, kCatchUpTimeoutFieldName, durationCount(kDefaultCatchUpTimeoutPeriod), - notLessThanZero, - "catch-up timeout must be greater than or equal to 0", + validCatchUpTimeout, + "catch-up timeout must be positive, 0 (no catch-up) or -1 (infinite catch-up).", &catchUpTimeoutMillis); if (!catchUpTimeoutStatus.isOK()) { return catchUpTimeoutStatus; diff --git a/src/mongo/db/repl/repl_set_config.h b/src/mongo/db/repl/repl_set_config.h index e44e51b12a7..63d5bcb80e8 100644 --- a/src/mongo/db/repl/repl_set_config.h +++ b/src/mongo/db/repl/repl_set_config.h @@ -59,6 +59,7 @@ public: static const size_t kMaxMembers = 50; static const size_t kMaxVotingMembers = 7; + static const Milliseconds kInfiniteCatchUpTimeout; static const Milliseconds kDefaultElectionTimeoutPeriod; static const Milliseconds kDefaultHeartbeatInterval; diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 8446e718d30..c53f5a7b655 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -880,6 +880,11 @@ public: virtual ServiceContext* getServiceContext() = 0; + /** + * Abort catchup if the node is in catchup mode. + */ + virtual Status abortCatchupIfNeeded() = 0; + protected: ReplicationCoordinator(); }; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index f42e712ffd3..301f2c92c7c 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -155,54 +155,45 @@ std::string ReplicationCoordinatorImpl::SlaveInfo::toString() const { return toBSON().toString(); } -struct ReplicationCoordinatorImpl::WaiterInfo { +ReplicationCoordinatorImpl::Waiter::Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern) + : opTime(std::move(_opTime)), writeConcern(_writeConcern) {} - using FinishFunc = stdx::function; +BSONObj ReplicationCoordinatorImpl::Waiter::toBSON() const { + BSONObjBuilder bob; + bob.append("opTime", opTime.toBSON()); + if (writeConcern) { + bob.append("writeConcern", writeConcern->toBSON()); + } + return bob.obj(); +}; - WaiterInfo(unsigned int _opID, - const OpTime _opTime, - const WriteConcernOptions* _writeConcern, - stdx::condition_variable* _condVar) - : opID(_opID), opTime(_opTime), writeConcern(_writeConcern), condVar(_condVar) {} +std::string ReplicationCoordinatorImpl::Waiter::toString() const { + return toBSON().toString(); +}; - // When waiter is signaled, finishCallback will be called while holding replCoord _mutex - // since WaiterLists are protected by _mutex. - WaiterInfo(const OpTime _opTime, FinishFunc _finishCallback) - : opTime(_opTime), finishCallback(_finishCallback) {} - BSONObj toBSON() const { - BSONObjBuilder bob; - bob.append("opId", opID); - bob.append("opTime", opTime.toBSON()); - if (writeConcern) { - bob.append("writeConcern", writeConcern->toBSON()); - } - return bob.obj(); - }; +ReplicationCoordinatorImpl::ThreadWaiter::ThreadWaiter(OpTime _opTime, + const WriteConcernOptions* _writeConcern, + stdx::condition_variable* _condVar) + : Waiter(_opTime, _writeConcern), condVar(_condVar) {} - std::string toString() const { - return toBSON().toString(); - }; +void ReplicationCoordinatorImpl::ThreadWaiter::notify_inlock() { + invariant(condVar); + condVar->notify_all(); +} - // It is invalid to call notify() unless holding ReplicationCoordinatorImpl::_mutex. - void notify() { - if (condVar) { - condVar->notify_all(); - } - if (finishCallback) { - finishCallback(); - } - } +ReplicationCoordinatorImpl::CallbackWaiter::CallbackWaiter(OpTime _opTime, + FinishFunc _finishCallback) + : Waiter(_opTime, nullptr), finishCallback(std::move(_finishCallback)) {} + +void ReplicationCoordinatorImpl::CallbackWaiter::notify_inlock() { + invariant(finishCallback); + finishCallback(); +} - const unsigned int opID = 0; - const OpTime opTime; - const WriteConcernOptions* writeConcern = nullptr; - stdx::condition_variable* condVar = nullptr; - // The callback that will be called when this waiter is notified. - FinishFunc finishCallback = nullptr; -}; -struct ReplicationCoordinatorImpl::WaiterInfoGuard { +class ReplicationCoordinatorImpl::WaiterGuard { +public: /** * Constructor takes the list of waiters and enqueues itself on the list, removing itself * in the destructor. @@ -214,23 +205,17 @@ struct ReplicationCoordinatorImpl::WaiterInfoGuard { * _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one * of these without holding _mutex */ - WaiterInfoGuard(WaiterList* list, - unsigned int opID, - const OpTime opTime, - const WriteConcernOptions* writeConcern, - stdx::condition_variable* condVar) - : waiter(opID, opTime, writeConcern, condVar), _list(list) { - list->add_inlock(&waiter); + WaiterGuard(WaiterList* list, Waiter* waiter) : _list(list), _waiter(waiter) { + list->add_inlock(_waiter); } - ~WaiterInfoGuard() { - _list->remove_inlock(&waiter); + ~WaiterGuard() { + _list->remove_inlock(_waiter); } - WaiterInfo waiter; - private: WaiterList* _list; + Waiter* _waiter; }; void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) { @@ -239,33 +224,46 @@ void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) { void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveIf_inlock( stdx::function func) { - std::vector::iterator it = _list.end(); - while (true) { - it = std::find_if(_list.begin(), _list.end(), func); - if (it == _list.end()) { - break; + // Only advance iterator when the element doesn't match. + for (auto it = _list.begin(); it != _list.end();) { + if (!func(*it)) { + ++it; + continue; + } + + WaiterType waiter = std::move(*it); + if (it == std::prev(_list.end())) { + // Iterator will be invalid after erasing the last element, so set it to the + // next one (i.e. end()). + it = _list.erase(it); + } else { + // Iterator is still valid after pop_back(). + std::swap(*it, _list.back()); + _list.pop_back(); } - (*it)->notify(); - std::swap(*it, _list.back()); - _list.pop_back(); + + // It's important to call notify() after the waiter has been removed from the list + // since notify() might remove the waiter itself. + waiter->notify_inlock(); } } void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveAll_inlock() { - for (auto& waiter : _list) { - waiter->notify(); + std::vector list = std::move(_list); + // Call notify() after removing the waiters from the list. + for (auto& waiter : list) { + waiter->notify_inlock(); } - _list.clear(); } bool ReplicationCoordinatorImpl::WaiterList::remove_inlock(WaiterType waiter) { auto it = std::find(_list.begin(), _list.end(), waiter); - if (it != _list.end()) { - std::swap(*it, _list.back()); - _list.pop_back(); - return true; + if (it == _list.end()) { + return false; } - return false; + std::swap(*it, _list.back()); + _list.pop_back(); + return true; } namespace { @@ -1186,7 +1184,7 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& op _updateSlaveInfoAppliedOpTime_inlock(mySlaveInfo, opTime); _opTimeWaiterList.signalAndRemoveIf_inlock( - [opTime](WaiterInfo* waiter) { return waiter->opTime <= opTime; }); + [opTime](Waiter* waiter) { return waiter->opTime <= opTime; }); } void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime, @@ -1352,11 +1350,11 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( // We just need to wait for the opTime to catch up to what we need (not majority RC). stdx::condition_variable condVar; - WaiterInfoGuard waitInfo( - &_opTimeWaiterList, opCtx->getOpID(), targetOpTime, nullptr, &condVar); + ThreadWaiter waiter(targetOpTime, nullptr, &condVar); + WaiterGuard guard(&_opTimeWaiterList, &waiter); - LOG(3) << "waituntilOpTime: waiting for OpTime " << waitInfo.waiter << " until " - << opCtx->getDeadline(); + LOG(3) << "waituntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime " + << waiter << " until " << opCtx->getDeadline(); auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock); if (!waitStatus.isOK()) { @@ -1744,8 +1742,8 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( // Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList stdx::condition_variable condVar; - WaiterInfoGuard waitInfo( - &_replicationWaiterList, opCtx->getOpID(), opTime, &writeConcern, &condVar); + ThreadWaiter waiter(opTime, &writeConcern, &condVar); + WaiterGuard guard(&_replicationWaiterList, &waiter); while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) { if (_inShutdown) { @@ -1763,7 +1761,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( BSONObjBuilder progress; _appendSlaveInfoData_inlock(&progress); log() << "Replication for failed WC: " << writeConcern.toBSON() - << ", waitInfo:" << waitInfo.waiter.toBSON() + << ", waitInfo: " << waiter << ", opID: " << opCtx->getOpID() << ", progress: " << progress.done(); } return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"}; @@ -2660,8 +2658,11 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() { result = kActionFollowerModeStateChange; } - // Enable replication producer and applier on stepdown. + // Exit catchup mode if we're in it and enable replication producer and applier on stepdown. if (_memberState.primary()) { + if (_catchupState) { + _catchupState->abort_inlock(); + } _applierState = ApplierState::Running; _externalState->startProducerIfStopped(); } @@ -2765,13 +2766,18 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( invariant(nextAction != kActionWinElection); lk.unlock(); _performPostMemberStateUpdateAction(nextAction); - // Notify all secondaries of the election win. lk.lock(); - _scheduleElectionWinNotification_inlock(); + if (!_getMemberState_inlock().primary()) { + break; + } + // Notify all secondaries of the election win. + _restartHeartbeats_inlock(); if (isV1ElectionProtocol()) { - _scanOpTimeForCatchUp_inlock(); + invariant(!_catchupState); + _catchupState = stdx::make_unique(this); + _catchupState->start_inlock(); } else { - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); } break; } @@ -2786,13 +2792,114 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( } } +void ReplicationCoordinatorImpl::CatchupState::start_inlock() { + log() << "Entering primary catch-up mode."; + + // No catchup in single node replica set. + if (_repl->_rsConfig.getNumMembers() == 1) { + abort_inlock(); + return; + } + + auto timeoutCB = [this](const CallbackArgs& cbData) { + if (!cbData.status.isOK()) { + return; + } + log() << "Catchup timed out after becoming primary."; + stdx::lock_guard lk(_repl->_mutex); + abort_inlock(); + }; + + // Schedule timeout callback. + auto catchupTimeout = _repl->_rsConfig.getCatchUpTimeoutPeriod(); + // Deal with infinity and overflow - no timeout. + if (catchupTimeout == ReplSetConfig::kInfiniteCatchUpTimeout || + Date_t::max() - _repl->_replExecutor->now() <= catchupTimeout) { + return; + } + auto timeoutDate = _repl->_replExecutor->now() + catchupTimeout; + auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, timeoutCB); + if (!status.isOK()) { + log() << "Failed to schedule catchup timeout work."; + abort_inlock(); + return; + } + _timeoutCbh = status.getValue(); +} + +void ReplicationCoordinatorImpl::CatchupState::abort_inlock() { + invariant(_repl->_getMemberState_inlock().primary()); + + log() << "Exited primary catch-up mode."; + // Clean up its own members. + if (_timeoutCbh) { + _repl->_replExecutor->cancel(_timeoutCbh); + } + if (_waiter) { + _repl->_opTimeWaiterList.remove_inlock(_waiter.get()); + } + + // Enter primary drain mode. + _repl->_enterDrainMode_inlock(); + // Destruct the state itself. + _repl->_catchupState.reset(nullptr); +} + +void ReplicationCoordinatorImpl::CatchupState::signalHeartbeatUpdate_inlock() { + auto targetOpTime = _repl->_topCoord->latestKnownOpTimeSinceHeartbeatRestart(); + // Haven't collected all heartbeat responses. + if (!targetOpTime) { + return; + } + + // We've caught up. + if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) { + log() << "Caught up to the latest known optime via heartbeats after becoming primary."; + abort_inlock(); + return; + } + + // Reset the target optime if it has changed. + if (_waiter && _waiter->opTime == *targetOpTime) { + return; + } + + log() << "Heartbeats updated catchup target optime to " << *targetOpTime; + if (_waiter) { + _repl->_opTimeWaiterList.remove_inlock(_waiter.get()); + } + auto targetOpTimeCB = [this, targetOpTime]() { + // Double check the target time since stepdown may signal us too. + if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) { + log() << "Caught up to the latest known optime successfully after becoming primary."; + abort_inlock(); + } + }; + _waiter = stdx::make_unique(*targetOpTime, targetOpTimeCB); + _repl->_opTimeWaiterList.add_inlock(_waiter.get()); +} + +Status ReplicationCoordinatorImpl::abortCatchupIfNeeded() { + if (!isV1ElectionProtocol()) { + return Status(ErrorCodes::CommandNotSupported, + "Primary catch-up is only supported by Protocol Version 1"); + } + + stdx::lock_guard lk(_mutex); + if (_catchupState) { + _catchupState->abort_inlock(); + return Status::OK(); + } + return Status(ErrorCodes::IllegalOperation, "The node is not in catch-up mode."); +} + void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { auto scanner = std::make_shared(); auto scanStartTime = _replExecutor->now(); auto evhStatus = scanner->start( _replExecutor.get(), _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod()); if (evhStatus == ErrorCodes::ShutdownInProgress) { - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); return; } fassertStatusOK(40254, evhStatus.getStatus()); @@ -2801,7 +2908,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) { stdx::lock_guard lk(_mutex); if (cbData.status == ErrorCodes::CallbackCanceled) { - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); return; } auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod(); @@ -2830,7 +2937,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca log() << "Could not access any nodes within timeout when checking for " << "additional ops to apply before finishing transition to primary. " << "Will move forward with becoming primary anyway."; - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); return; } @@ -2839,7 +2946,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) { log() << "My optime is most up-to-date, skipping catch-up " << "and completing transition to primary."; - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); return; } @@ -2852,9 +2959,9 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca log() << "Finished catch-up oplog after becoming primary."; } - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); }; - auto waiterInfo = std::make_shared(freshnessInfo.opTime, finishCB); + auto waiterInfo = std::make_shared(freshnessInfo.opTime, finishCB); _opTimeWaiterList.add_inlock(waiterInfo.get()); auto timeoutCB = [this, waiterInfo, finishCB](const CallbackArgs& cbData) { @@ -2867,7 +2974,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca _replExecutor->scheduleWorkAt(_replExecutor->now() + timeout, timeoutCB); } -void ReplicationCoordinatorImpl::_finishCatchingUpOplog_inlock() { +void ReplicationCoordinatorImpl::_enterDrainMode_inlock() { _applierState = ApplierState::Draining; _externalState->stopProducer(); } @@ -2958,7 +3065,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC } void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() { - _replicationWaiterList.signalAndRemoveIf_inlock([this](WaiterInfo* waiter) { + _replicationWaiterList.signalAndRemoveIf_inlock([this](Waiter* waiter) { return _doneWaitingForReplication_inlock( waiter->opTime, SnapshotName::min(), *waiter->writeConcern); }); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index c5f94f32e97..404bf0e16b9 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -333,6 +333,8 @@ public: virtual Status stepUpIfEligible() override; + virtual Status abortCatchupIfNeeded() override; + // ================== Test support API =================== /** @@ -482,16 +484,55 @@ private: kActionStartSingleNodeElection }; - // Struct that holds information about clients waiting for replication. - struct WaiterInfo; - struct WaiterInfoGuard; + // Abstract struct that holds information about clients waiting for replication. + // Subclasses need to define how to notify them. + struct Waiter { + Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern); + virtual ~Waiter() = default; + + BSONObj toBSON() const; + std::string toString() const; + // It is invalid to call notify_inlock() unless holding ReplicationCoordinatorImpl::_mutex. + virtual void notify_inlock() = 0; + + const OpTime opTime; + const WriteConcernOptions* writeConcern = nullptr; + }; + + // When ThreadWaiter gets notified, it will signal the conditional variable. + // + // This is used when a thread wants to block inline until the opTime is reached with the given + // writeConcern. + struct ThreadWaiter : public Waiter { + ThreadWaiter(OpTime _opTime, + const WriteConcernOptions* _writeConcern, + stdx::condition_variable* _condVar); + void notify_inlock() override; + + stdx::condition_variable* condVar = nullptr; + }; + + // When the waiter is notified, finishCallback will be called while holding replCoord _mutex + // since WaiterLists are protected by _mutex. + // + // This is used when we want to run a callback when the opTime is reached. + struct CallbackWaiter : public Waiter { + using FinishFunc = stdx::function; + + CallbackWaiter(OpTime _opTime, FinishFunc _finishCallback); + void notify_inlock() override; + + // The callback that will be called when this waiter is notified. + FinishFunc finishCallback = nullptr; + }; + + class WaiterGuard; class WaiterList { public: - using WaiterType = WaiterInfo*; + using WaiterType = Waiter*; - // Adds waiter into the list. Usually, the waiter will be signaled only once and then - // removed. + // Adds waiter into the list. void add_inlock(WaiterType waiter); // Returns whether waiter is found and removed. bool remove_inlock(WaiterType waiter); @@ -528,6 +569,44 @@ private: typedef std::vector HeartbeatHandles; + // The state and logic of primary catchup. + // + // When start() is called, CatchupState will schedule the timeout callback. When we get + // responses of the latest heartbeats from all nodes, the target time (opTime of _waiter) is + // set. + // The primary exits catchup mode when any of the following happens. + // 1) My last applied optime reaches the target optime, if we've received a heartbeat from all + // nodes. + // 2) Catchup timeout expires. + // 3) Primary steps down. + // 4) The primary has to roll back to catch up. + // 5) The primary is too stale to catch up. + // + // On abort, the state resets the pointer to itself in ReplCoordImpl. In other words, the + // life cycle of the state object aligns with the conceptual state. + // In shutdown, the timeout callback will be canceled by the executor and the state is safe to + // destroy. + // + // Any function of the state must be called while holding _mutex. + class CatchupState { + public: + CatchupState(ReplicationCoordinatorImpl* repl) : _repl(repl) {} + // start() can only be called once. + void start_inlock(); + // Reset the state itself to destruct the state. + void abort_inlock(); + // Heartbeat calls this function to update the target optime. + void signalHeartbeatUpdate_inlock(); + + private: + ReplicationCoordinatorImpl* _repl; // Not owned. + // Callback handle used to cancel a scheduled catchup timeout callback. + ReplicationExecutor::CallbackHandle _timeoutCbh; + // Handle to a Waiter that contains the current target optime to reach after which + // we can exit catchup mode. + std::unique_ptr _waiter; + }; + /** * Appends a "replicationProgress" section with data for each member in set. */ @@ -1168,7 +1247,7 @@ private: /** * Finish catch-up mode and start drain mode. */ - void _finishCatchingUpOplog_inlock(); + void _enterDrainMode_inlock(); /** * Waits for the config state to leave kConfigStartingUp, which indicates that start() has @@ -1404,6 +1483,10 @@ private: mutable stdx::mutex _indexPrefetchMutex; ReplSettings::IndexPrefetchConfig _indexPrefetchConfig = ReplSettings::IndexPrefetchConfig::PREFETCH_ALL; // (I) + + // The catchup state including all catchup logic. The presence of a non-null pointer indicates + // that the node is currently in catchup mode. + std::unique_ptr _catchupState; // (X) }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index 1075a7e9232..7588fb166d5 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -160,7 +160,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); - simulateCatchUpTimeout(); + simulateCatchUpAbort(); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); const auto opCtxPtr = makeOperationContext(); @@ -223,8 +223,6 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) { getReplCoord()->waitForElectionFinish_forTest(); ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); - // Wait for catchup check to finish. - simulateCatchUpTimeout(); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); const auto opCtxPtr = makeOperationContext(); @@ -1280,15 +1278,19 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase) class PrimaryCatchUpTest : public ReplCoordTest { protected: using NetworkOpIter = NetworkInterfaceMock::NetworkOperationIterator; - using FreshnessScanFn = stdx::function; + using NetworkRequestFn = stdx::function; - void replyToHeartbeatRequestAsSecondaries(const NetworkOpIter noi) { + const Timestamp smallTimestamp{1, 1}; + + executor::RemoteCommandResponse makeHeartbeatResponse(OpTime opTime) { ReplSetConfig rsConfig = getReplCoord()->getReplicaSetConfig_forTest(); ReplSetHeartbeatResponse hbResp; hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); hbResp.setConfigVersion(rsConfig.getConfigVersion()); - getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(hbResp.toBSON(true))); + hbResp.setAppliedOpTime(opTime); + hbResp.setDurableOpTime(opTime); + return makeResponseStatus(hbResp.toBSON(true)); } void simulateSuccessfulV1Voting() { @@ -1300,10 +1302,9 @@ protected: log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)"; ASSERT(replCoord->getMemberState().secondary()) << replCoord->getMemberState().toString(); - bool hasReadyRequests = true; - // Process requests until we're primary and consume the heartbeats for the notification + // Process requests until we're primary but leave the heartbeats for the notification // of election win. Exit immediately on unexpected requests. - while (!replCoord->getMemberState().primary() || hasReadyRequests) { + while (!replCoord->getMemberState().primary()) { log() << "Waiting on network in state " << replCoord->getMemberState(); net->enterNetwork(); if (net->now() < electionTimeoutWhen) { @@ -1314,7 +1315,9 @@ protected: const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { - replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest()); + OpTime opTime(Timestamp(), getReplCoord()->getTerm()); + net->scheduleResponse( + net->getNextReadyRequest(), net->now(), makeHeartbeatResponse(opTime)); } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetRequestVotes") { net->scheduleResponse(net->getNextReadyRequest(), @@ -1336,12 +1339,11 @@ protected: // executor. getReplExec()->waitForDBWork_forTest(); net->runReadyNetworkOperations(); - hasReadyRequests = net->hasReadyRequests(); net->exitNetwork(); } } - ReplSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime) { + ReplSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime, bool infiniteTimeout = false) { BSONObj configObj = BSON("_id" << "mySet" << "version" @@ -1356,7 +1358,8 @@ protected: << "protocolVersion" << 1 << "settings" - << BSON("catchUpTimeoutMillis" << 5000)); + << BSON("heartbeatTimeoutSecs" << 1 << "catchUpTimeoutMillis" + << (infiniteTimeout ? -1 : 5000))); assertStartSuccess(configObj, HostAndPort("node1", 12345)); ReplSetConfig config = assertMakeRSConfig(configObj); @@ -1378,17 +1381,15 @@ protected: return makeResponseStatus(BSON("optimes" << BSON("appliedOpTime" << opTime.toBSON()))); } - void processFreshnessScanRequests(FreshnessScanFn onFreshnessScanRequest) { + void processHeartbeatRequests(NetworkRequestFn onHeartbeatRequest) { NetworkInterfaceMock* net = getNet(); net->enterNetwork(); while (net->hasReadyRequests()) { const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; - if (request.cmdObj.firstElement().fieldNameStringData() == "replSetGetStatus") { - onFreshnessScanRequest(noi); - } else if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { - replyToHeartbeatRequestAsSecondaries(noi); + if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { + onHeartbeatRequest(noi); } else { log() << "Black holing unexpected request to " << request.target << ": " << request.cmdObj; @@ -1399,7 +1400,8 @@ protected: net->exitNetwork(); } - void replyHeartbeatsAndRunUntil(Date_t until) { + // Response heartbeats with opTime until the given time. Exit if it sees any other request. + void replyHeartbeatsAndRunUntil(Date_t until, NetworkRequestFn onHeartbeatRequest) { auto net = getNet(); net->enterNetwork(); while (net->now() < until) { @@ -1407,9 +1409,10 @@ protected: // Peek the next request auto noi = net->getFrontOfUnscheduledQueue(); auto& request = noi->getRequest(); + log() << request.target << " at " << net->now() << " processing " << request.cmdObj; if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { // Consume the next request - replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest()); + onHeartbeatRequest(net->getNextReadyRequest()); } else { // Cannot consume other requests than heartbeats. net->exitNetwork(); @@ -1420,126 +1423,153 @@ protected: } net->exitNetwork(); } + + // Simulate the work done by bgsync and applier threads. setMyLastAppliedOpTime() will signal + // the optime waiter. + void advanceMyLastAppliedOpTime(OpTime opTime) { + getReplCoord()->setMyLastAppliedOpTime(opTime); + getNet()->enterNetwork(); + getNet()->runReadyNetworkOperations(); + getNet()->exitNetwork(); + } }; -TEST_F(PrimaryCatchUpTest, PrimaryDoNotNeedToCatchUp) { +// The first round of heartbeats indicates we are the most up-to-date. +TEST_F(PrimaryCatchUpTest, PrimaryDoesNotNeedToCatchUp) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - processFreshnessScanRequests([this](const NetworkOpIter noi) { - getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime())); + int count = 0; + processHeartbeatRequests([this, time1, &count](const NetworkOpIter noi) { + count++; + auto net = getNet(); + // The old primary accepted one more op and all nodes caught up after voting for me. + net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time1)); }); + + // Get 2 heartbeats from secondaries. + ASSERT_EQUALS(2, count); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("My optime is most up-to-date, skipping catch-up")); + ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats")); auto opCtx = makeOperationContext(); getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } -TEST_F(PrimaryCatchUpTest, PrimaryFreshnessScanTimeout) { +// Heartbeats set a future target OpTime and we reached that successfully. +TEST_F(PrimaryCatchUpTest, CatchupSucceeds) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); + OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - - processFreshnessScanRequests([this](const NetworkOpIter noi) { - auto request = noi->getRequest(); - log() << "Black holing request to " << request.target << ": " << request.cmdObj; - getNet()->blackHole(noi); + processHeartbeatRequests([this, time2](const NetworkOpIter noi) { + auto net = getNet(); + // The old primary accepted one more op and all nodes caught up after voting for me. + net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2)); }); - - auto net = getNet(); - replyHeartbeatsAndRunUntil(net->now() + config.getCatchUpTimeoutPeriod()); - ASSERT_EQ((int)getReplCoord()->getApplierState(), (int)ApplierState::Draining); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + advanceMyLastAppliedOpTime(time2); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Could not access any nodes within timeout")); + ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime successfully")); auto opCtx = makeOperationContext(); getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } -TEST_F(PrimaryCatchUpTest, PrimaryCatchUpSucceeds) { +TEST_F(PrimaryCatchUpTest, CatchupTimeout) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - - processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { - auto net = getNet(); - // The old primary accepted one more op and all nodes caught up after voting for me. - net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2)); + auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod(); + replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time2](const NetworkOpIter noi) { + // Other nodes are ahead of me. + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2)); }); - - NetworkInterfaceMock* net = getNet(); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - // Simulate the work done by bgsync and applier threads. - // setMyLastAppliedOpTime() will signal the optime waiter. - getReplCoord()->setMyLastAppliedOpTime(time2); - net->enterNetwork(); - net->runReadyNetworkOperations(); - net->exitNetwork(); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary.")); + ASSERT_EQUALS(1, countLogLinesContaining("Catchup timed out")); auto opCtx = makeOperationContext(); getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } -TEST_F(PrimaryCatchUpTest, PrimaryCatchUpTimeout) { +TEST_F(PrimaryCatchUpTest, CannotSeeAllNodes) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); - OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - - // The new primary learns of the latest OpTime. - processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { - auto net = getNet(); - net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2)); + // We should get caught up by the timeout time. + auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod(); + replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time1](const NetworkOpIter noi) { + const RemoteCommandRequest& request = noi->getRequest(); + if (request.target.host() == "node2") { + auto status = Status(ErrorCodes::HostUnreachable, "Can't reach remote host"); + getNet()->scheduleResponse(noi, getNet()->now(), status); + } else { + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1)); + } }); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + stopCapturingLogMessages(); + ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats")); + auto opCtx = makeOperationContext(); + getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); + ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); +} - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod()); +TEST_F(PrimaryCatchUpTest, HeartbeatTimeout) { + startCapturingLogMessages(); + + OpTime time1(Timestamp(100, 1), 0); + ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); + // We should get caught up by the timeout time. + auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod(); + replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time1](const NetworkOpIter noi) { + const RemoteCommandRequest& request = noi->getRequest(); + if (request.target.host() == "node2") { + log() << "Black holing heartbeat from " << request.target.host(); + getNet()->blackHole(noi); + } else { + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1)); + } + }); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary")); + ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats")); auto opCtx = makeOperationContext(); getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } -TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) { +TEST_F(PrimaryCatchUpTest, PrimaryStepsDownBeforeHeartbeatRefreshing) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - - processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { - auto request = noi->getRequest(); - log() << "Black holing request to " << request.target << ": " << request.cmdObj; - getNet()->blackHole(noi); - }); + // Step down immediately. ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - TopologyCoordinator::UpdateTermResult updateTermResult; auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult); ASSERT_TRUE(evh.isValid()); getReplExec()->waitForEvent(evh); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); - replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod()); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary")); + ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); + ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime")); + ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out")); auto opCtx = makeOperationContext(); Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); @@ -1551,30 +1581,25 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) { OpTime time1(Timestamp(100, 1), 0); OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - - processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { - auto net = getNet(); - // The old primary accepted one more op and all nodes caught up after voting for me. - net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2)); + // Step down in the middle of catchup. + auto abortTime = getNet()->now() + config.getCatchUpTimeoutPeriod() / 2; + replyHeartbeatsAndRunUntil(abortTime, [this, time2](const NetworkOpIter noi) { + // Other nodes are ahead of me. + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2)); }); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - TopologyCoordinator::UpdateTermResult updateTermResult; auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult); ASSERT_TRUE(evh.isValid()); getReplExec()->waitForEvent(evh); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); - auto net = getNet(); - net->enterNetwork(); - net->runReadyNetworkOperations(); - net->exitNetwork(); - auto opCtx = makeOperationContext(); - // Simulate the applier signaling replCoord to exit drain mode. - // At this point, we see the stepdown and reset the states. - getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + // replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod()); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary")); + ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); + ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime")); + ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out")); + auto opCtx = makeOperationContext(); Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } @@ -1586,24 +1611,17 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { + processHeartbeatRequests([this, time2](const NetworkOpIter noi) { auto net = getNet(); // The old primary accepted one more op and all nodes caught up after voting for me. - net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2)); + net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2)); }); - - NetworkInterfaceMock* net = getNet(); ReplicationCoordinatorImpl* replCoord = getReplCoord(); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - // Simulate the work done by bgsync and applier threads. - // setMyLastAppliedOpTime() will signal the optime waiter. - replCoord->setMyLastAppliedOpTime(time2); - net->enterNetwork(); - net->runReadyNetworkOperations(); - net->exitNetwork(); + advanceMyLastAppliedOpTime(time2); ASSERT(replCoord->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary.")); + ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime")); // Step down during drain mode. TopologyCoordinator::UpdateTermResult updateTermResult; @@ -1617,9 +1635,10 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { simulateSuccessfulV1Voting(); ASSERT_TRUE(replCoord->getMemberState().primary()); - // No need to catch-up, so we enter drain mode. - processFreshnessScanRequests([this](const NetworkOpIter noi) { - getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime())); + // No need to catch up, so we enter drain mode. + processHeartbeatRequests([this, time2](const NetworkOpIter noi) { + auto net = getNet(); + net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2)); }); ASSERT(replCoord->getApplierState() == ApplierState::Draining); auto opCtx = makeOperationContext(); @@ -1633,6 +1652,113 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { ASSERT_TRUE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test")); } +TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) { + OpTime time1(Timestamp(100, 1), 0); + OpTime time2(Timestamp(200, 1), 0); + OpTime time3(Timestamp(300, 1), 0); + OpTime time4(Timestamp(400, 1), 0); + + // 1) The primary is at time 1 at the beginning. + ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); + + // 2) It cannot see all nodes. It learns of time 3 from one node, but the other isn't available. + // So the target optime is time 3. + startCapturingLogMessages(); + auto oneThirdOfTimeout = getNet()->now() + config.getCatchUpTimeoutPeriod() / 3; + replyHeartbeatsAndRunUntil(oneThirdOfTimeout, [this, time3](const NetworkOpIter noi) { + const RemoteCommandRequest& request = noi->getRequest(); + if (request.target.host() == "node2") { + auto status = Status(ErrorCodes::HostUnreachable, "Can't reach remote host"); + getNet()->scheduleResponse(noi, getNet()->now(), status); + } else { + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time3)); + } + }); + // The node is still in catchup mode, but the target optime has been set. + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + stopCapturingLogMessages(); + ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime")); + + // 3) Advancing its applied optime to time 2 isn't enough. + advanceMyLastAppliedOpTime(time2); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + + // 4) After a while, the other node at time 4 becomes available. Time 4 becomes the new target. + startCapturingLogMessages(); + auto twoThirdsOfTimeout = getNet()->now() + config.getCatchUpTimeoutPeriod() * 2 / 3; + replyHeartbeatsAndRunUntil(twoThirdsOfTimeout, [this, time3, time4](const NetworkOpIter noi) { + const RemoteCommandRequest& request = noi->getRequest(); + if (request.target.host() == "node2") { + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time4)); + } else { + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time3)); + } + }); + // The node is still in catchup mode, but the target optime has been updated. + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + stopCapturingLogMessages(); + ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime")); + + // 5) Advancing to time 3 isn't enough now. + advanceMyLastAppliedOpTime(time3); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + + // 6) The node catches up time 4 eventually. + startCapturingLogMessages(); + advanceMyLastAppliedOpTime(time4); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + stopCapturingLogMessages(); + ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime")); + auto opCtx = makeOperationContext(); + getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); + ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); +} + +TEST_F(PrimaryCatchUpTest, InfiniteTimeoutAndAbort) { + startCapturingLogMessages(); + + OpTime time1(Timestamp(100, 1), 0); + OpTime time2(Timestamp(100, 2), 0); + ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1, true); + + // Run time far forward and ensure we are still in catchup mode. + // This is an arbitrary time 'far' into the future. + auto later = getNet()->now() + config.getElectionTimeoutPeriod() * 10; + replyHeartbeatsAndRunUntil(later, [this, &config, time2](const NetworkOpIter noi) { + // Other nodes are ahead of me. + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2)); + + // Simulate the heartbeats from secondaries to primary to update liveness info. + // TODO(sz): Remove this after merging liveness info and heartbeats. + const RemoteCommandRequest& request = noi->getRequest(); + ReplSetHeartbeatArgsV1 hbArgs; + hbArgs.setConfigVersion(config.getConfigVersion()); + hbArgs.setSetName(config.getReplSetName()); + hbArgs.setSenderHost(request.target); + hbArgs.setSenderId(config.findMemberByHostAndPort(request.target)->getId()); + hbArgs.setTerm(getReplCoord()->getTerm()); + ASSERT(hbArgs.isInitialized()); + ReplSetHeartbeatResponse response; + ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response)); + }); + ASSERT_TRUE(getReplCoord()->getMemberState().primary()); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + + // Simulate a user initiated abort. + ASSERT_OK(getReplCoord()->abortCatchupIfNeeded()); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); + ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime")); + ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out")); + auto opCtx = makeOperationContext(); + getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); + ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); +} + } // namespace } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 0495e843590..68e1a61c4ac 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -234,6 +234,11 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( // Wake the stepdown waiter when our updated OpTime allows it to finish stepping down. _signalStepDownWaiter_inlock(); + // Abort catchup if we have caught up to the latest known optime after heartbeat refreshing. + if (_catchupState) { + _catchupState->signalHeartbeatUpdate_inlock(); + } + _scheduleHeartbeatToTarget_inlock( target, targetIndex, std::max(now, action.getNextHeartbeatStartDate())); @@ -665,6 +670,9 @@ void ReplicationCoordinatorImpl::_startHeartbeats_inlock() { } _scheduleHeartbeatToTarget_inlock(_rsConfig.getMemberAt(i).getHostAndPort(), i, now); } + + _topCoord->restartHeartbeats(); + if (isV1ElectionProtocol()) { for (auto&& slaveInfo : _slaveInfo) { slaveInfo.lastUpdate = _replExecutor->now(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index e76c37898a3..1f3b0881d5c 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -107,11 +107,6 @@ void runSingleNodeElection(ServiceContext::UniqueOperationContext opCtx, replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0)); ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); replCoord->waitForElectionFinish_forTest(); - // Wait for primary catch-up - net->enterNetwork(); - net->runReadyNetworkOperations(); - net->exitNetwork(); - ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining); ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); @@ -5057,7 +5052,6 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) { // Single node cluster - this node should start election on setFollowerMode() completion. replCoord->waitForElectionFinish_forTest(); - simulateCatchUpTimeout(); // Successful dry run election increases term. ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm()); diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 27d62c0af1e..18397d23a23 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -497,5 +497,9 @@ void ReplicationCoordinatorMock::setMaster(bool isMaster) { _settings.setMaster(isMaster); } +Status ReplicationCoordinatorMock::abortCatchupIfNeeded() { + return Status::OK(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index fdfa491f0ad..72bc120f226 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -284,6 +284,8 @@ public: return _service; } + virtual Status abortCatchupIfNeeded() override; + private: AtomicUInt64 _snapshotNameGenerator; ServiceContext* const _service; diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index e5a17dd5c1a..aaa00fcdae4 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -323,6 +323,10 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) { ReplSetHeartbeatResponse hbResp; hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); + // The smallest valid optime in PV1. + OpTime opTime(Timestamp(), 0); + hbResp.setAppliedOpTime(opTime); + hbResp.setDurableOpTime(opTime); hbResp.setConfigVersion(rsConfig.getConfigVersion()); net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true))); } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetRequestVotes") { @@ -488,32 +492,30 @@ void ReplCoordTest::disableSnapshots() { _externalState->setAreSnapshotsEnabled(false); } -void ReplCoordTest::simulateCatchUpTimeout() { +void ReplCoordTest::simulateCatchUpAbort() { NetworkInterfaceMock* net = getNet(); - auto catchUpTimeoutWhen = net->now() + getReplCoord()->getConfig().getCatchUpTimeoutPeriod(); + auto heartbeatTimeoutWhen = + net->now() + getReplCoord()->getConfig().getHeartbeatTimeoutPeriodMillis(); bool hasRequest = false; net->enterNetwork(); - if (net->now() < catchUpTimeoutWhen) { - net->runUntil(catchUpTimeoutWhen); + if (net->now() < heartbeatTimeoutWhen) { + net->runUntil(heartbeatTimeoutWhen); } hasRequest = net->hasReadyRequests(); - net->exitNetwork(); - while (hasRequest) { - net->enterNetwork(); auto noi = net->getNextReadyRequest(); auto request = noi->getRequest(); // Black hole heartbeat requests caused by time advance. log() << "Black holing request to " << request.target.toString() << " : " << request.cmdObj; net->blackHole(noi); - if (net->now() < catchUpTimeoutWhen) { - net->runUntil(catchUpTimeoutWhen); + if (net->now() < heartbeatTimeoutWhen) { + net->runUntil(heartbeatTimeoutWhen); } else { net->runReadyNetworkOperations(); } hasRequest = net->hasReadyRequests(); - net->exitNetwork(); } + net->exitNetwork(); } } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h index 972e2f503ae..ab2653be11d 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.h +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h @@ -253,9 +253,9 @@ protected: void disableSnapshots(); /** - * Timeout all freshness scan request for primary catch-up. + * Timeout all heartbeat requests for primary catch-up. */ - void simulateCatchUpTimeout(); + void simulateCatchUpAbort(); private: std::unique_ptr _repl; diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 2a2fc1259fb..37aad7459fa 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -499,6 +499,21 @@ public: */ virtual void setStorageEngineSupportsReadCommitted(bool supported) = 0; + /** + * Reset the booleans to record the last heartbeat restart. + */ + virtual void restartHeartbeats() = 0; + + /** + * Scans through all members that are 'up' and return the latest known optime, if we have + * received (successful or failed) heartbeats from all nodes since heartbeat restart. + * + * Returns boost::none if any node hasn't responded to a heartbeat since we last restarted + * heartbeats. + * Returns OpTime(Timestamp(0, 0), 0), the smallest OpTime in PV1, if other nodes are all down. + */ + virtual boost::optional latestKnownOpTimeSinceHeartbeatRestart() const = 0; + protected: TopologyCoordinator() {} }; diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 1c1f53f89a1..4211dee4860 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -2648,5 +2648,35 @@ void TopologyCoordinatorImpl::setStorageEngineSupportsReadCommitted(bool support supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo; } +void TopologyCoordinatorImpl::restartHeartbeats() { + for (auto& hb : _hbdata) { + hb.restart(); + } +} + +boost::optional TopologyCoordinatorImpl::latestKnownOpTimeSinceHeartbeatRestart() const { + // The smallest OpTime in PV1. + OpTime latest(Timestamp(0, 0), 0); + for (size_t i = 0; i < _hbdata.size(); i++) { + auto& peer = _hbdata[i]; + + if (static_cast(i) == _selfIndex) { + continue; + } + // If any heartbeat is not fresh enough, return none. + if (!peer.isUpdatedSinceRestart()) { + return boost::none; + } + // Ignore down members + if (!peer.up()) { + continue; + } + if (peer.getAppliedOpTime() > latest) { + latest = peer.getAppliedOpTime(); + } + } + return latest; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index a52758ab8ed..0c268b67347 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -246,6 +246,10 @@ public: bool isPriorityTakeover); virtual void setStorageEngineSupportsReadCommitted(bool supported); + virtual void restartHeartbeats(); + + virtual boost::optional latestKnownOpTimeSinceHeartbeatRestart() const; + //////////////////////////////////////////////////////////// // // Test support methods diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js index 8f2b9c17be9..661c4e0b479 100644 --- a/src/mongo/shell/replsettest.js +++ b/src/mongo/shell/replsettest.js @@ -681,7 +681,8 @@ var ReplSetTest = function(opts) { return config; } - if (_isRunningWithoutJournaling(replNode)) { + // Check journaling by sending commands through the bridge if it's used. + if (_isRunningWithoutJournaling(this.nodes[0])) { config[wcMajorityJournalField] = false; } @@ -873,9 +874,11 @@ var ReplSetTest = function(opts) { }; this.reInitiate = function() { - var config = this.getReplSetConfig(); - var newVersion = this.getReplSetConfigFromNode().version + 1; - config.version = newVersion; + var config = this.getReplSetConfigFromNode(); + var newConfig = this.getReplSetConfig(); + // Only reset members. + config.members = newConfig.members; + config.version += 1; this._setDefaultConfigOptions(config); -- cgit v1.2.1