diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 559 |
1 files changed, 99 insertions, 460 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index b4138baa290..7591746797d 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -135,25 +135,6 @@ BSONObj incrementConfigVersionByRandom(BSONObj config) { const Seconds kNoopWriterPeriod(10); } // namespace -BSONObj ReplicationCoordinatorImpl::SlaveInfo::toBSON() const { - BSONObjBuilder bo; - bo.append("id", memberId); - bo.append("rid", rid); - bo.append("host", hostAndPort.toString()); - bo.append("lastDurableOpTime", lastDurableOpTime.toBSON()); - bo.append("lastAppliedOpTime", lastAppliedOpTime.toBSON()); - if (self) - bo.append("self", true); - if (down) - bo.append("down", true); - bo.append("lastUpdated", lastUpdate); - return bo.obj(); -} - -std::string ReplicationCoordinatorImpl::SlaveInfo::toString() const { - return toBSON().toString(); -} - ReplicationCoordinatorImpl::Waiter::Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern) : opTime(std::move(_opTime)), writeConcern(_writeConcern) {} @@ -331,11 +312,6 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( return; } - // Make sure there is always an entry in _slaveInfo for ourself. - SlaveInfo selfInfo; - selfInfo.self = true; - _slaveInfo.push_back(selfInfo); - _externalState->setupNoopWriter(kNoopWriterPeriod); } @@ -681,7 +657,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) { fassert(18822, !_inShutdown); _setConfigState_inlock(kConfigStartingUp); _myRID = rid; - _slaveInfo[_getMyIndexInSlaveInfo_inlock()].rid = rid; + _topCoord->getMyMemberHeartbeatData()->setRid(rid); } if (!_settings.usingReplSets()) { @@ -917,7 +893,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, lk.unlock(); OpTime firstOpTime = _externalState->onTransitionToPrimary(opCtx, isV1ElectionProtocol()); lk.lock(); - _setFirstOpTimeOfMyTerm_inlock(firstOpTime); + _topCoord->setFirstOpTimeOfMyTerm(firstOpTime); // Must calculate the commit level again because firstOpTimeOfMyTerm wasn't set when we logged // our election in onTransitionToPrimary(), above. @@ -946,127 +922,6 @@ void ReplicationCoordinatorImpl::signalUpstreamUpdater() { _externalState->forwardSlaveProgress(); } -ReplicationCoordinatorImpl::SlaveInfo* ReplicationCoordinatorImpl::_findSlaveInfoByMemberID_inlock( - int memberId) { - for (SlaveInfoVector::iterator it = _slaveInfo.begin(); it != _slaveInfo.end(); ++it) { - if (it->memberId == memberId) { - return &(*it); - } - } - return NULL; -} - -ReplicationCoordinatorImpl::SlaveInfo* ReplicationCoordinatorImpl::_findSlaveInfoByRID_inlock( - const OID& rid) { - for (SlaveInfoVector::iterator it = _slaveInfo.begin(); it != _slaveInfo.end(); ++it) { - if (it->rid == rid) { - return &(*it); - } - } - return NULL; -} - -void ReplicationCoordinatorImpl::_addSlaveInfo_inlock(const SlaveInfo& slaveInfo) { - invariant(getReplicationMode() == modeMasterSlave); - _slaveInfo.push_back(slaveInfo); - - _updateLastCommittedOpTime_inlock(); - // Wake up any threads waiting for replication that now have their replication - // check satisfied - _wakeReadyWaiters_inlock(); -} - -void ReplicationCoordinatorImpl::_updateSlaveInfoAppliedOpTime_inlock(SlaveInfo* slaveInfo, - const OpTime& opTime) { - slaveInfo->lastAppliedOpTime = opTime; - slaveInfo->lastUpdate = _replExecutor->now(); - slaveInfo->down = false; - - _updateLastCommittedOpTime_inlock(); - // Wake up any threads waiting for replication that now have their replication - // check satisfied - _wakeReadyWaiters_inlock(); -} - -void ReplicationCoordinatorImpl::_updateSlaveInfoDurableOpTime_inlock(SlaveInfo* slaveInfo, - const OpTime& opTime) { - // lastAppliedOpTime cannot be behind lastDurableOpTime. - if (slaveInfo->lastAppliedOpTime < opTime) { - log() << "Durable progress (" << opTime << ") is ahead of the applied progress (" - << slaveInfo->lastAppliedOpTime << ". This is likely due to a " - "rollback. slaveInfo: " - << slaveInfo->toString(); - return; - } - slaveInfo->lastDurableOpTime = opTime; - slaveInfo->lastUpdate = _replExecutor->now(); - slaveInfo->down = false; - - _updateLastCommittedOpTime_inlock(); - // Wake up any threads waiting for replication that now have their replication - // check satisfied - _wakeReadyWaiters_inlock(); -} - -void ReplicationCoordinatorImpl::_updateSlaveInfoFromConfig_inlock() { - invariant(_settings.usingReplSets()); - - SlaveInfoVector oldSlaveInfos; - _slaveInfo.swap(oldSlaveInfos); - - if (_selfIndex == -1) { - // If we aren't in the config then the only data we care about is for ourself - for (SlaveInfoVector::const_iterator it = oldSlaveInfos.begin(); it != oldSlaveInfos.end(); - ++it) { - if (it->self) { - SlaveInfo slaveInfo = *it; - slaveInfo.memberId = -1; - _slaveInfo.push_back(slaveInfo); - return; - } - } - invariant(false); // There should always have been an entry for ourself - } - - for (int i = 0; i < _rsConfig.getNumMembers(); ++i) { - const MemberConfig& memberConfig = _rsConfig.getMemberAt(i); - int memberId = memberConfig.getId(); - const HostAndPort& memberHostAndPort = memberConfig.getHostAndPort(); - - SlaveInfo slaveInfo; - - // Check if the node existed with the same member ID and hostname in the old data - for (SlaveInfoVector::const_iterator it = oldSlaveInfos.begin(); it != oldSlaveInfos.end(); - ++it) { - if ((it->memberId == memberId && it->hostAndPort == memberHostAndPort) || - (i == _selfIndex && it->self)) { - slaveInfo = *it; - } - } - - // Make sure you have the most up-to-date info for member ID and hostAndPort. - slaveInfo.memberId = memberId; - slaveInfo.hostAndPort = memberHostAndPort; - _slaveInfo.push_back(slaveInfo); - } - invariant(static_cast<int>(_slaveInfo.size()) == _rsConfig.getNumMembers()); -} - -size_t ReplicationCoordinatorImpl::_getMyIndexInSlaveInfo_inlock() const { - if (getReplicationMode() == modeMasterSlave) { - // Self data always lives in the first entry in _slaveInfo for master/slave - return 0; - } else { - invariant(_settings.usingReplSets()); - if (_selfIndex == -1) { - invariant(_slaveInfo.size() == 1); - return 0; - } else { - return _selfIndex; - } - } -} - Status ReplicationCoordinatorImpl::setLastOptimeForSlave(const OID& rid, const Timestamp& ts) { stdx::unique_lock<stdx::mutex> lock(_mutex); massert(28576, @@ -1077,17 +932,14 @@ Status ReplicationCoordinatorImpl::setLastOptimeForSlave(const OID& rid, const T // term == -1 for master-slave OpTime opTime(ts, OpTime::kUninitializedTerm); - SlaveInfo* slaveInfo = _findSlaveInfoByRID_inlock(rid); - if (slaveInfo) { - if (slaveInfo->lastAppliedOpTime < opTime) { - _updateSlaveInfoAppliedOpTime_inlock(slaveInfo, opTime); - } + MemberHeartbeatData* memberHeartbeatData = _topCoord->findMemberHeartbeatDataByRid(rid); + if (memberHeartbeatData) { + memberHeartbeatData->advanceLastAppliedOpTime(opTime, _replExecutor->now()); } else { - SlaveInfo newSlaveInfo; - newSlaveInfo.rid = rid; - newSlaveInfo.lastAppliedOpTime = opTime; - _addSlaveInfo_inlock(newSlaveInfo); + auto* memberHeartbeatData = _topCoord->addSlaveMemberData(rid); + memberHeartbeatData->setLastAppliedOpTime(opTime, _replExecutor->now()); } + _updateLastCommittedOpTime_inlock(); return Status::OK(); } @@ -1156,27 +1008,20 @@ void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<stdx:: void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& opTime, bool isRollbackAllowed) { - SlaveInfo* mySlaveInfo = &_slaveInfo[_getMyIndexInSlaveInfo_inlock()]; - invariant(isRollbackAllowed || mySlaveInfo->lastAppliedOpTime <= opTime); - _updateSlaveInfoAppliedOpTime_inlock(mySlaveInfo, opTime); - + auto* myMemberHeartbeatData = _topCoord->getMyMemberHeartbeatData(); + invariant(isRollbackAllowed || myMemberHeartbeatData->getLastAppliedOpTime() <= opTime); + myMemberHeartbeatData->setLastAppliedOpTime(opTime, _replExecutor->now()); + _updateLastCommittedOpTime_inlock(); _opTimeWaiterList.signalAndRemoveIf_inlock( [opTime](Waiter* waiter) { return waiter->opTime <= opTime; }); } void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime, bool isRollbackAllowed) { - SlaveInfo* mySlaveInfo = &_slaveInfo[_getMyIndexInSlaveInfo_inlock()]; - invariant(isRollbackAllowed || mySlaveInfo->lastDurableOpTime <= opTime); - // lastAppliedOpTime cannot be behind lastDurableOpTime. - if (mySlaveInfo->lastAppliedOpTime < opTime) { - log() << "My durable progress (" << opTime << ") is ahead of my applied progress (" - << mySlaveInfo->lastAppliedOpTime << ". This is likely due to a " - "rollback. slaveInfo: " - << mySlaveInfo->toString(); - return; - } - _updateSlaveInfoDurableOpTime_inlock(mySlaveInfo, opTime); + auto* myMemberHeartbeatData = _topCoord->getMyMemberHeartbeatData(); + invariant(isRollbackAllowed || myMemberHeartbeatData->getLastDurableOpTime() <= opTime); + myMemberHeartbeatData->setLastDurableOpTime(opTime, _replExecutor->now()); + _updateLastCommittedOpTime_inlock(); } OpTime ReplicationCoordinatorImpl::getMyLastAppliedOpTime() const { @@ -1343,11 +1188,11 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( } OpTime ReplicationCoordinatorImpl::_getMyLastAppliedOpTime_inlock() const { - return _slaveInfo[_getMyIndexInSlaveInfo_inlock()].lastAppliedOpTime; + return _topCoord->getMyLastAppliedOpTime(); } OpTime ReplicationCoordinatorImpl::_getMyLastDurableOpTime_inlock() const { - return _slaveInfo[_getMyIndexInSlaveInfo_inlock()].lastDurableOpTime; + return _topCoord->getMyLastDurableOpTime(); } Status ReplicationCoordinatorImpl::setLastDurableOptime_forTest(long long cfgVer, @@ -1402,7 +1247,6 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock( << " in config with version " << args.cfgver << " has durably reached optime: " << args.ts; - SlaveInfo* slaveInfo = NULL; if (args.cfgver != _rsConfig.getConfigVersion()) { std::string errmsg = str::stream() << "Received replSetUpdatePosition for node with memberId " << args.memberId @@ -1413,8 +1257,8 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock( return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg); } - slaveInfo = _findSlaveInfoByMemberID_inlock(args.memberId); - if (!slaveInfo) { + auto* memberHeartbeatData = _topCoord->findMemberHeartbeatDataByMemberId(args.memberId); + if (!memberHeartbeatData) { invariant(!_rsConfig.findMemberByID(args.memberId)); std::string errmsg = str::stream() @@ -1424,25 +1268,22 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock( return Status(ErrorCodes::NodeNotFound, errmsg); } - invariant(args.memberId == slaveInfo->memberId); + invariant(args.memberId == memberHeartbeatData->getMemberId()); - LOG(3) << "Node with memberID " << args.memberId << " has durably applied operationss through " - << slaveInfo->lastDurableOpTime << " and has applied operations through " - << slaveInfo->lastAppliedOpTime << "; updating to new durable operation with timestamp " - << args.ts; + LOG(3) << "Node with memberID " << args.memberId << " has durably applied operations through " + << memberHeartbeatData->getLastDurableOpTime() << " and has applied operations through " + << memberHeartbeatData->getLastAppliedOpTime() + << "; updating to new durable operation with timestamp " << args.ts; - // Only update remote optimes if they increase. - if (slaveInfo->lastAppliedOpTime < args.ts) { - _updateSlaveInfoAppliedOpTime_inlock(slaveInfo, args.ts); - } - if (slaveInfo->lastDurableOpTime < args.ts) { - _updateSlaveInfoDurableOpTime_inlock(slaveInfo, args.ts); - } + auto now(_replExecutor->now()); + bool advancedOpTime = memberHeartbeatData->advanceLastAppliedOpTime(args.ts, now); + advancedOpTime = memberHeartbeatData->advanceLastDurableOpTime(args.ts, now) || advancedOpTime; + // Only update committed optime if the remote optimes increased. + if (advancedOpTime) { + _updateLastCommittedOpTime_inlock(); + } - // Update liveness for this node. - slaveInfo->lastUpdate = _replExecutor->now(); - slaveInfo->down = false; _cancelAndRescheduleLivenessUpdate_inlock(args.memberId); return Status::OK(); } @@ -1474,7 +1315,6 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg << " has reached optime: " << args.appliedOpTime << " and is durable through: " << args.durableOpTime; - SlaveInfo* slaveInfo = NULL; if (args.cfgver != _rsConfig.getConfigVersion()) { std::string errmsg = str::stream() << "Received replSetUpdatePosition for node with memberId " << args.memberId @@ -1485,8 +1325,8 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg); } - slaveInfo = _findSlaveInfoByMemberID_inlock(args.memberId); - if (!slaveInfo) { + auto* memberHeartbeatData = _topCoord->findMemberHeartbeatDataByMemberId(args.memberId); + if (!memberHeartbeatData) { invariant(!_rsConfig.findMemberByID(args.memberId)); std::string errmsg = str::stream() @@ -1496,25 +1336,24 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg return Status(ErrorCodes::NodeNotFound, errmsg); } - invariant(args.memberId == slaveInfo->memberId); + invariant(args.memberId == memberHeartbeatData->getMemberId()); LOG(3) << "Node with memberID " << args.memberId << " currently has optime " - << slaveInfo->lastAppliedOpTime << " durable through " << slaveInfo->lastDurableOpTime - << "; updating to optime " << args.appliedOpTime << " and durable through " - << args.durableOpTime; + << memberHeartbeatData->getLastAppliedOpTime() << " durable through " + << memberHeartbeatData->getLastDurableOpTime() << "; updating to optime " + << args.appliedOpTime << " and durable through " << args.durableOpTime; - // Only update remote optimes if they increase. - if (slaveInfo->lastAppliedOpTime < args.appliedOpTime) { - _updateSlaveInfoAppliedOpTime_inlock(slaveInfo, args.appliedOpTime); - } - if (slaveInfo->lastDurableOpTime < args.durableOpTime) { - _updateSlaveInfoDurableOpTime_inlock(slaveInfo, args.durableOpTime); + auto now(_replExecutor->now()); + bool advancedOpTime = memberHeartbeatData->advanceLastAppliedOpTime(args.appliedOpTime, now); + advancedOpTime = + memberHeartbeatData->advanceLastDurableOpTime(args.durableOpTime, now) || advancedOpTime; + + // Only update committed optime if the remote optimes increased. + if (advancedOpTime) { + _updateLastCommittedOpTime_inlock(); } - // Update liveness for this node. - slaveInfo->lastUpdate = _replExecutor->now(); - slaveInfo->down = false; _cancelAndRescheduleLivenessUpdate_inlock(args.memberId); return Status::OK(); } @@ -1531,7 +1370,8 @@ bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock( const bool useDurableOpTime = writeConcern.syncMode == WriteConcernOptions::SyncMode::JOURNAL; if (writeConcern.wMode.empty()) { - return _haveNumNodesReachedOpTime_inlock(opTime, writeConcern.wNumNodes, useDurableOpTime); + return _topCoord->haveNumNodesReachedOpTime( + opTime, writeConcern.wNumNodes, useDurableOpTime); } StringData patternName; @@ -1564,53 +1404,7 @@ bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock( if (!tagPattern.isOK()) { return true; } - return _haveTaggedNodesReachedOpTime_inlock(opTime, tagPattern.getValue(), useDurableOpTime); -} - -bool ReplicationCoordinatorImpl::_haveNumNodesReachedOpTime_inlock(const OpTime& targetOpTime, - int numNodes, - bool durablyWritten) { - // Replication progress that is for some reason ahead of us should not allow us to - // satisfy a write concern if we aren't caught up ourselves. - OpTime myOpTime = - durablyWritten ? _getMyLastDurableOpTime_inlock() : _getMyLastAppliedOpTime_inlock(); - if (myOpTime < targetOpTime) { - return false; - } - - for (SlaveInfoVector::iterator it = _slaveInfo.begin(); it != _slaveInfo.end(); ++it) { - const OpTime& slaveTime = durablyWritten ? it->lastDurableOpTime : it->lastAppliedOpTime; - if (slaveTime >= targetOpTime) { - --numNodes; - } - - if (numNodes <= 0) { - return true; - } - } - return false; -} - -bool ReplicationCoordinatorImpl::_haveTaggedNodesReachedOpTime_inlock( - const OpTime& opTime, const ReplSetTagPattern& tagPattern, bool durablyWritten) { - ReplSetTagMatch matcher(tagPattern); - for (SlaveInfoVector::iterator it = _slaveInfo.begin(); it != _slaveInfo.end(); ++it) { - const OpTime& slaveTime = durablyWritten ? it->lastDurableOpTime : it->lastAppliedOpTime; - if (slaveTime >= opTime) { - // This node has reached the desired optime, now we need to check if it is a part - // of the tagPattern. - const MemberConfig* memberConfig = _rsConfig.findMemberByID(it->memberId); - invariant(memberConfig); - for (MemberConfig::TagIterator it = memberConfig->tagsBegin(); - it != memberConfig->tagsEnd(); - ++it) { - if (matcher.update(*it)) { - return true; - } - } - } - } - return false; + return _topCoord->haveTaggedNodesReachedOpTime(opTime, tagPattern.getValue(), useDurableOpTime); } ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplication( @@ -1736,7 +1530,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( if (Command::testCommandsEnabled) { // log state of replica set on timeout to help with diagnosis. BSONObjBuilder progress; - _appendSlaveInfoData_inlock(&progress); + _topCoord->fillMemberData(&progress); log() << "Replication for failed WC: " << writeConcern.toBSON() << ", waitInfo: " << waiter << ", opID: " << opCtx->getOpID() << ", progress: " << progress.done(); @@ -1835,7 +1629,7 @@ bool ReplicationCoordinatorImpl::_tryToStepDown_inlock(const Date_t waitUntil, OpTime lastApplied = _getMyLastAppliedOpTime_inlock(); if (forceNow) { - return _topCoord->stepDown(stepDownUntil, forceNow, lastApplied); + return _topCoord->stepDown(stepDownUntil, forceNow); } auto tagStatus = _rsConfig.findCustomWriteMode(ReplSetConfig::kMajorityWriteConcernModeName); @@ -1843,8 +1637,8 @@ bool ReplicationCoordinatorImpl::_tryToStepDown_inlock(const Date_t waitUntil, // Check if a majority of nodes have reached the last applied optime // and there exist an electable node that has my last applied optime. - if (_haveTaggedNodesReachedOpTime_inlock(lastApplied, tagStatus.getValue(), false) && - _topCoord->stepDown(stepDownUntil, forceNow, lastApplied)) { + if (_topCoord->haveTaggedNodesReachedOpTime(lastApplied, tagStatus.getValue(), false) && + _topCoord->stepDown(stepDownUntil, forceNow)) { return true; } @@ -2060,57 +1854,9 @@ Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool wait StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand( ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) const { - BSONObjBuilder cmdBuilder; - { - stdx::lock_guard<stdx::mutex> lock(_mutex); - invariant(_rsConfig.isInitialized()); - // Do not send updates if we have been removed from the config. - if (_selfIndex == -1) { - return Status(ErrorCodes::NodeNotFound, - "This node is not in the current replset configuration."); - } - cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1); - // Create an array containing objects each live member connected to us and for ourself. - BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes")); - for (const auto& slaveInfo : _slaveInfo) { - if (slaveInfo.lastAppliedOpTime.isNull()) { - // Don't include info on members we haven't heard from yet. - continue; - } - // Don't include members we think are down. - if (!slaveInfo.self && slaveInfo.down) { - continue; - } - - BSONObjBuilder entry(arrayBuilder.subobjStart()); - switch (commandStyle) { - case ReplSetUpdatePositionCommandStyle::kNewStyle: - slaveInfo.lastDurableOpTime.append(&entry, - UpdatePositionArgs::kDurableOpTimeFieldName); - slaveInfo.lastAppliedOpTime.append(&entry, - UpdatePositionArgs::kAppliedOpTimeFieldName); - break; - case ReplSetUpdatePositionCommandStyle::kOldStyle: - entry.append("_id", slaveInfo.rid); - if (isV1ElectionProtocol()) { - slaveInfo.lastDurableOpTime.append(&entry, "optime"); - } else { - entry.append("optime", slaveInfo.lastDurableOpTime.getTimestamp()); - } - break; - } - entry.append(UpdatePositionArgs::kMemberIdFieldName, slaveInfo.memberId); - entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion()); - } - arrayBuilder.done(); - } - - // Add metadata to command. Old style parsing logic will reject the metadata. - if (commandStyle == ReplSetUpdatePositionCommandStyle::kNewStyle) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - _prepareReplSetMetadata_inlock(OpTime(), &cmdBuilder); - } - return cmdBuilder.obj(); + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _topCoord->prepareReplSetUpdatePositionCommand( + commandStyle, _getCurrentCommittedSnapshotOpTime_inlock()); } Status ReplicationCoordinatorImpl::processReplSetGetStatus( @@ -2130,9 +1876,6 @@ Status ReplicationCoordinatorImpl::processReplSetGetStatus( TopologyCoordinator::ReplSetStatusArgs{ _replExecutor->now(), static_cast<unsigned>(time(0) - serverGlobalParams.started), - _getMyLastAppliedOpTime_inlock(), - _getMyLastDurableOpTime_inlock(), - _lastCommittedOpTime, _getCurrentCommittedSnapshotOpTime_inlock(), initialSyncProgress}, response, @@ -2162,34 +1905,7 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* respon void ReplicationCoordinatorImpl::appendSlaveInfoData(BSONObjBuilder* result) { stdx::lock_guard<stdx::mutex> lock(_mutex); - _appendSlaveInfoData_inlock(result); -} - -void ReplicationCoordinatorImpl::_appendSlaveInfoData_inlock(BSONObjBuilder* result) { - BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress")); - { - for (SlaveInfoVector::const_iterator itr = _slaveInfo.begin(); itr != _slaveInfo.end(); - ++itr) { - BSONObjBuilder entry(replicationProgress.subobjStart()); - entry.append("rid", itr->rid); - if (isV1ElectionProtocol()) { - BSONObjBuilder opTime(entry.subobjStart("optime")); - opTime.append("ts", itr->lastDurableOpTime.getTimestamp()); - opTime.append("term", itr->lastDurableOpTime.getTerm()); - opTime.done(); - } else { - entry.append("optime", itr->lastDurableOpTime.getTimestamp()); - } - entry.append("host", itr->hostAndPort.toString()); - if (getReplicationMode() == modeReplSet) { - if (_selfIndex == -1) { - continue; - } - invariant(itr->memberId >= 0); - entry.append("memberId", itr->memberId); - } - } - } + _topCoord->fillMemberData(result); } ReplSetConfig ReplicationCoordinatorImpl::getConfig() const { @@ -2278,8 +1994,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCt auto doResync = false; { stdx::lock_guard<stdx::mutex> lk(_mutex); - auto opTime = _getMyLastAppliedOpTime_inlock(); - _topCoord->prepareSyncFromResponse(target, opTime, resultObj, &result); + _topCoord->prepareSyncFromResponse(target, resultObj, &result); // If we are in the middle of an initial sync, do a resync. doResync = result.isOK() && _initialSyncer && _initialSyncer->isActive(); } @@ -2325,12 +2040,8 @@ Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs& stdx::lock_guard<stdx::mutex> lk(_mutex); const Date_t now = _replExecutor->now(); - Status result = _topCoord->prepareHeartbeatResponse(now, - args, - _settings.ourSetName(), - _getMyLastAppliedOpTime_inlock(), - _getMyLastDurableOpTime_inlock(), - response); + Status result = + _topCoord->prepareHeartbeatResponse(now, args, _settings.ourSetName(), response); if ((result.isOK() || result == ErrorCodes::InvalidReplicaSetConfig) && _selfIndex < 0) { // If this node does not belong to the configuration it knows about, send heartbeats // back to any node that sends us a heartbeat, in case one of those remote nodes has @@ -2893,8 +2604,7 @@ Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& a BSONObjBuilder* resultObj) { stdx::lock_guard<stdx::mutex> lk(_mutex); Status result(ErrorCodes::InternalError, "didn't set status in prepareFreshResponse"); - _topCoord->prepareFreshResponse( - args, _replExecutor->now(), _getMyLastAppliedOpTime_inlock(), resultObj, &result); + _topCoord->prepareFreshResponse(args, _replExecutor->now(), resultObj, &result); return result; } @@ -2902,8 +2612,7 @@ Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& a BSONObjBuilder* responseObj) { stdx::lock_guard<stdx::mutex> lk(_mutex); Status result = Status(ErrorCodes::InternalError, "status not set by callback"); - _topCoord->prepareElectResponse( - args, _replExecutor->now(), _getMyLastAppliedOpTime_inlock(), responseObj, &result); + _topCoord->prepareElectResponse(args, _replExecutor->now(), responseObj, &result); return result; } @@ -2914,9 +2623,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC _cancelHeartbeats_inlock(); _setConfigState_inlock(kConfigSteady); - // Must get this before changing our config. - OpTime myOptime = _getMyLastAppliedOpTime_inlock(); - _topCoord->updateConfig(newConfig, myIndex, _replExecutor->now(), myOptime); + _topCoord->updateConfig(newConfig, myIndex, _replExecutor->now()); const ReplSetConfig oldConfig = _rsConfig; _rsConfig = newConfig; _protVersion.store(_rsConfig.getProtocolVersion()); @@ -2933,7 +2640,6 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC _cancelAndRescheduleElectionTimeout_inlock(); const PostMemberStateUpdateAction action = _updateMemberStateFromTopologyCoordinator_inlock(); - _updateSlaveInfoFromConfig_inlock(); if (_selfIndex >= 0) { // Don't send heartbeats if we're not in the config, if we get re-added one of the // nodes in the set will contact us. @@ -2959,7 +2665,6 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC } } - _wakeReadyWaiters_inlock(); return action; } @@ -3020,24 +2725,20 @@ Status ReplicationCoordinatorImpl::processHandshake(OperationContext* opCtx, const HandshakeArgs& handshake) { LOG(2) << "Received handshake " << handshake.toBSON(); - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::lock_guard<stdx::mutex> lock(_mutex); if (getReplicationMode() != modeMasterSlave) { return Status(ErrorCodes::IllegalOperation, "The handshake command is only used for master/slave replication"); } - SlaveInfo* slaveInfo = _findSlaveInfoByRID_inlock(handshake.getRid()); - if (slaveInfo) { + auto* memberHeartbeatData = _topCoord->findMemberHeartbeatDataByRid(handshake.getRid()); + if (memberHeartbeatData) { return Status::OK(); // nothing to do } - SlaveInfo newSlaveInfo; - newSlaveInfo.rid = handshake.getRid(); - newSlaveInfo.memberId = -1; - newSlaveInfo.hostAndPort = _externalState->getClientHostAndPort(opCtx); - // Don't call _addSlaveInfo_inlock as that would wake sleepers unnecessarily. - _slaveInfo.push_back(newSlaveInfo); + memberHeartbeatData = _topCoord->addSlaveMemberData(handshake.getRid()); + memberHeartbeatData->setHostAndPort(_externalState->getClientHostAndPort(opCtx)); return Status::OK(); } @@ -3053,26 +2754,10 @@ bool ReplicationCoordinatorImpl::buildsIndexes() { std::vector<HostAndPort> ReplicationCoordinatorImpl::getHostsWrittenTo(const OpTime& op, bool durablyWritten) { - std::vector<HostAndPort> hosts; stdx::lock_guard<stdx::mutex> lk(_mutex); - for (size_t i = 0; i < _slaveInfo.size(); ++i) { - const SlaveInfo& slaveInfo = _slaveInfo[i]; - if (getReplicationMode() == modeMasterSlave && slaveInfo.rid == _getMyRID_inlock()) { - // Master-slave doesn't know the HostAndPort for itself at this point. - continue; - } - - if (durablyWritten) { - if (slaveInfo.lastDurableOpTime < op) { - continue; - } - } else if (slaveInfo.lastAppliedOpTime < op) { - continue; - } - - hosts.push_back(slaveInfo.hostAndPort); - } - return hosts; + /* skip self in master-slave mode because our own HostAndPort is unknown */ + const bool skipSelf = getReplicationMode() == modeMasterSlave; + return _topCoord->getHostsWrittenTo(op, durablyWritten, skipSelf); } std::vector<HostAndPort> ReplicationCoordinatorImpl::getOtherNodesInReplSet() const { @@ -3211,42 +2896,19 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource( const rpc::ReplSetMetadata& replMetadata, boost::optional<rpc::OplogQueryMetadata> oqMetadata) { stdx::lock_guard<stdx::mutex> lock(_mutex); - return _topCoord->shouldChangeSyncSource(currentSource, - _getMyLastAppliedOpTime_inlock(), - replMetadata, - oqMetadata, - _replExecutor->now()); + return _topCoord->shouldChangeSyncSource( + currentSource, replMetadata, oqMetadata, _replExecutor->now()); } void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() { - if (!_getMemberState_inlock().primary() || _topCoord->isStepDownPending()) { - return; + if (_topCoord->updateLastCommittedOpTime()) { + _updateCommitPoint_inlock(); } - - std::vector<OpTime> votingNodesOpTimes; - - // Whether we use the applied or durable OpTime for the commit point is decided here. - const bool useDurableOpTime = getWriteConcernMajorityShouldJournal_inlock(); - - for (const auto& sI : _slaveInfo) { - auto memberConfig = _rsConfig.findMemberByID(sI.memberId); - invariant(memberConfig); - if (memberConfig->isVoter()) { - const auto opTime = useDurableOpTime ? sI.lastDurableOpTime : sI.lastAppliedOpTime; - votingNodesOpTimes.push_back(opTime); - } - } - - invariant(votingNodesOpTimes.size() > 0); - if (votingNodesOpTimes.size() < static_cast<unsigned long>(_rsConfig.getWriteMajority())) { - return; - } - std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end()); - - // need the majority to have this OpTime - OpTime committedOpTime = - votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()]; - _advanceCommitPoint_inlock(committedOpTime); + // Wake up any threads waiting for replication that now have their replication + // check satisfied. We must do this regardless of whether we updated the lastCommittedOpTime, + // as lastCommittedOpTime may be based on durable optimes whereas some waiters may be + // waiting on applied (but not necessarily durable) optimes. + _wakeReadyWaiters_inlock(); } void ReplicationCoordinatorImpl::advanceCommitPoint(const OpTime& committedOpTime) { @@ -3255,28 +2917,17 @@ void ReplicationCoordinatorImpl::advanceCommitPoint(const OpTime& committedOpTim } void ReplicationCoordinatorImpl::_advanceCommitPoint_inlock(const OpTime& committedOpTime) { - if (committedOpTime == _lastCommittedOpTime) { - return; // Hasn't changed, so ignore it. - } else if (committedOpTime < _lastCommittedOpTime) { - LOG(1) << "Ignoring older committed snapshot optime: " << committedOpTime - << ", currentCommittedOpTime: " << _lastCommittedOpTime; - return; // This may have come from an out-of-order heartbeat. Ignore it. - } - - // This check is performed to ensure primaries do not commit an OpTime from a previous term. - if (_getMemberState_inlock().primary() && committedOpTime < _firstOpTimeOfMyTerm) { - LOG(1) << "Ignoring older committed snapshot from before I became primary, optime: " - << committedOpTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm; - return; - } + if (_topCoord->advanceLastCommittedOpTime(committedOpTime)) { + if (_getMemberState_inlock().arbiter()) { + _setMyLastAppliedOpTime_inlock(committedOpTime, false); + } - if (_getMemberState_inlock().arbiter()) { - _setMyLastAppliedOpTime_inlock(committedOpTime, false); + _updateCommitPoint_inlock(); } +} - LOG(2) << "Updating _lastCommittedOpTime to " << committedOpTime; - _lastCommittedOpTime = committedOpTime; - +void ReplicationCoordinatorImpl::_updateCommitPoint_inlock() { + auto committedOpTime = _topCoord->getLastCommittedOpTime(); _externalState->notifyOplogMetadataWaiters(); auto maxSnapshotForOpTime = SnapshotInfo{committedOpTime, SnapshotName::max()}; @@ -3302,13 +2953,9 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint_inlock(const OpTime& commit } } -void ReplicationCoordinatorImpl::_setFirstOpTimeOfMyTerm_inlock(const OpTime& newOpTime) { - _firstOpTimeOfMyTerm = newOpTime; -} - OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const { stdx::unique_lock<stdx::mutex> lk(_mutex); - return _lastCommittedOpTime; + return _topCoord->getLastCommittedOpTime(); } Status ReplicationCoordinatorImpl::processReplSetRequestVotes( @@ -3325,7 +2972,7 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes( { stdx::lock_guard<stdx::mutex> lk(_mutex); - _topCoord->processReplSetRequestVotes(args, response, _getMyLastAppliedOpTime_inlock()); + _topCoord->processReplSetRequestVotes(args, response); } if (!args.isADryRun() && response->getVoteGranted()) { @@ -3374,16 +3021,13 @@ void ReplicationCoordinatorImpl::_prepareReplSetMetadata_inlock(const OpTime& la BSONObjBuilder* builder) const { OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, _getCurrentCommittedSnapshotOpTime_inlock()); - auto metadata = _topCoord->prepareReplSetMetadata(lastVisibleOpTime, _lastCommittedOpTime); + auto metadata = _topCoord->prepareReplSetMetadata(lastVisibleOpTime); metadata.writeToMetadata(builder); } void ReplicationCoordinatorImpl::_prepareOplogQueryMetadata_inlock(int rbid, BSONObjBuilder* builder) const { - OpTime lastAppliedOpTime = _getMyLastAppliedOpTime_inlock(); - auto metadata = - _topCoord->prepareOplogQueryMetadata(_lastCommittedOpTime, lastAppliedOpTime, rbid); - metadata.writeToMetadata(builder); + _topCoord->prepareOplogQueryMetadata(rbid).writeToMetadata(builder); } bool ReplicationCoordinatorImpl::isV1ElectionProtocol() const { @@ -3413,12 +3057,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs auto senderHost(args.getSenderHost()); const Date_t now = _replExecutor->now(); - result = _topCoord->prepareHeartbeatResponseV1(now, - args, - _settings.ourSetName(), - _getMyLastAppliedOpTime_inlock(), - _getMyLastDurableOpTime_inlock(), - response); + result = _topCoord->prepareHeartbeatResponseV1(now, args, _settings.ourSetName(), response); if ((result.isOK() || result == ErrorCodes::InvalidReplicaSetConfig) && _selfIndex < 0) { // If this node does not belong to the configuration it knows about, send heartbeats @@ -3438,12 +3077,12 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs } } else if (result.isOK()) { // Update liveness for sending node. - auto slaveInfo = _findSlaveInfoByMemberID_inlock(args.getSenderId()); - if (!slaveInfo) { + auto* memberHeartbeatData = + _topCoord->findMemberHeartbeatDataByMemberId(args.getSenderId()); + if (!memberHeartbeatData) { return result; } - slaveInfo->lastUpdate = _replExecutor->now(); - slaveInfo->down = false; + memberHeartbeatData->updateLiveness(_replExecutor->now()); } return result; } @@ -3574,7 +3213,7 @@ void ReplicationCoordinatorImpl::createSnapshot(OperationContext* opCtx, _externalState->createSnapshot(opCtx, name); auto snapshotInfo = SnapshotInfo{timeOfSnapshot, name}; - if (timeOfSnapshot <= _lastCommittedOpTime) { + if (timeOfSnapshot <= _topCoord->getLastCommittedOpTime()) { // This snapshot is ready to be marked as committed. invariant(_uncommittedSnapshots.empty()); _updateCommittedSnapshot_inlock(snapshotInfo); @@ -3596,7 +3235,7 @@ void ReplicationCoordinatorImpl::createSnapshot(OperationContext* opCtx, void ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock( SnapshotInfo newCommittedSnapshot) { invariant(!newCommittedSnapshot.opTime.isNull()); - invariant(newCommittedSnapshot.opTime <= _lastCommittedOpTime); + invariant(newCommittedSnapshot.opTime <= _topCoord->getLastCommittedOpTime()); if (_currentCommittedSnapshot) { invariant(newCommittedSnapshot.opTime >= _currentCommittedSnapshot->opTime); invariant(newCommittedSnapshot.name > _currentCommittedSnapshot->name); |