diff options
author | matt dannenberg <matt.dannenberg@10gen.com> | 2016-01-05 10:29:01 -0500 |
---|---|---|
committer | Scott Hernandez <scotthernandez@tart.local> | 2016-02-24 09:55:46 -0500 |
commit | 7bc59dac4f46e8f59786130262fb1dfea68fb605 (patch) | |
tree | 0ad5c45f99655a7cc60abc6162993ec1eab0070d /src/mongo/db/repl/topology_coordinator_impl.cpp | |
parent | b5a76e83860d0cff964af4989d798f19ffce4aae (diff) | |
download | mongo-7bc59dac4f46e8f59786130262fb1dfea68fb605.tar.gz |
SERVER-22276 SERVER-22277 implement "j" flag in write concern apply to secondary as well as primary
(cherry picked from commit 2c2e6a38f559f25559c2b24eff51511c6fbc4a5b)
Diffstat (limited to 'src/mongo/db/repl/topology_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 52 |
1 files changed, 29 insertions, 23 deletions
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 0c4169fe4b9..32d857be5c8 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -204,7 +204,7 @@ HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, // Find primary's oplog time. Reject sync candidates that are more than // _options.maxSyncSourceLagSecs seconds behind. if (_currentPrimaryIndex != -1) { - OpTime primaryOpTime = _hbdata[_currentPrimaryIndex].getOpTime(); + OpTime primaryOpTime = _hbdata[_currentPrimaryIndex].getAppliedOpTime(); // Check if primaryOpTime is still close to 0 because we haven't received // our first heartbeat from a new primary yet. @@ -257,7 +257,7 @@ HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, continue; } // Candidates cannot be excessively behind. - if (it->getOpTime() < oldestSyncOpTime) { + if (it->getAppliedOpTime() < oldestSyncOpTime) { continue; } // Candidate must not have a configured delay larger than ours. @@ -272,7 +272,7 @@ HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, } } // only consider candidates that are ahead of where we are - if (it->getOpTime().getTimestamp() <= lastTimestampApplied) { + if (it->getAppliedOpTime().getTimestamp() <= lastTimestampApplied) { continue; } // Candidate cannot be more latent than anything we've already considered. @@ -421,10 +421,10 @@ void TopologyCoordinatorImpl::prepareSyncFromResponse(const ReplicationExecutor: str::stream() << "I cannot reach the requested member: " << target.toString()); return; } - if (hbdata.getOpTime().getSecs() + 10 < lastOpApplied.getSecs()) { + if (hbdata.getAppliedOpTime().getSecs() + 10 < lastOpApplied.getSecs()) { warning() << "attempting to sync from " << target << ", but its latest opTime is " - << hbdata.getOpTime().getSecs() << " and ours is " << lastOpApplied.getSecs() - << " so this may not work"; + << hbdata.getAppliedOpTime().getSecs() << " and ours is " + << lastOpApplied.getSecs() << " so this may not work"; response->append("warning", str::stream() << "requested member \"" << target.toString() << "\" is more than 10 seconds behind us"); @@ -518,7 +518,7 @@ bool TopologyCoordinatorImpl::_shouldVetoMember( return true; } - if (_iAmPrimary() && lastOpApplied >= _hbdata[hopefulIndex].getOpTime()) { + if (_iAmPrimary() && lastOpApplied >= _hbdata[hopefulIndex].getAppliedOpTime()) { // hbinfo is not updated for ourself, so if we are primary we have to check the // primary's last optime separately *errmsg = str::stream() << "I am already primary, " @@ -528,7 +528,8 @@ bool TopologyCoordinatorImpl::_shouldVetoMember( } if (_currentPrimaryIndex != -1 && (hopefulIndex != _currentPrimaryIndex) && - (_hbdata[_currentPrimaryIndex].getOpTime() >= _hbdata[hopefulIndex].getOpTime())) { + (_hbdata[_currentPrimaryIndex].getAppliedOpTime() >= + _hbdata[hopefulIndex].getAppliedOpTime())) { // other members might be aware of more up-to-date nodes *errmsg = str::stream() << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() @@ -646,6 +647,7 @@ Status TopologyCoordinatorImpl::prepareHeartbeatResponse(Date_t now, const ReplSetHeartbeatArgs& args, const std::string& ourSetName, const OpTime& lastOpApplied, + const OpTime& lastOpDurable, ReplSetHeartbeatResponse* response) { if (args.getProtocolVersion() != 1) { return Status(ErrorCodes::BadValue, @@ -694,7 +696,8 @@ Status TopologyCoordinatorImpl::prepareHeartbeatResponse(Date_t now, // Heartbeat status message response->setHbMsg(_getHbmsg(now)); response->setTime(duration_cast<Seconds>(now - Date_t{})); - response->setOpTime(lastOpApplied); + response->setAppliedOpTime(lastOpApplied); + response->setDurableOpTime(lastOpDurable); if (!_syncSource.empty()) { response->setSyncingTo(_syncSource); @@ -737,6 +740,7 @@ Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(Date_t now, const ReplSetHeartbeatArgsV1& args, const std::string& ourSetName, const OpTime& lastOpApplied, + const OpTime& lastOpDurable, ReplSetHeartbeatResponse* response) { // Verify that replica set names match const std::string rshb = args.getSetName(); @@ -770,7 +774,8 @@ Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(Date_t now, response->setElectionTime(_electionTime); } - response->setOpTime(lastOpApplied); + response->setAppliedOpTime(lastOpApplied); + response->setDurableOpTime(lastOpDurable); if (_currentPrimaryIndex != -1) { response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId()); @@ -1148,7 +1153,7 @@ HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBData( const MemberConfig& highestPriorityMember = _rsConfig.getMemberAt(highestPriorityIndex); const OpTime highestPriorityMemberOptime = highestPriorityIndex == _selfIndex ? lastOpApplied - : _hbdata[highestPriorityIndex].getOpTime(); + : _hbdata[highestPriorityIndex].getAppliedOpTime(); if ((highestPriorityMember.getPriority() > currentPrimaryMember.getPriority()) && _isOpTimeCloseEnoughToLatestToElect(highestPriorityMemberOptime, lastOpApplied)) { @@ -1378,7 +1383,7 @@ OpTime TopologyCoordinatorImpl::_latestKnownOpTime(const OpTime& ourLastOpApplie continue; } - OpTime optime = it->getOpTime(); + OpTime optime = it->getAppliedOpTime(); if (optime > latest) { latest = optime; @@ -1467,7 +1472,7 @@ void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) { ReplSetHeartbeatResponse hbResponse; hbResponse.setState(MemberState::RS_PRIMARY); hbResponse.setElectionTime(Timestamp()); - hbResponse.setOpTime(_hbdata[primaryIndex].getOpTime()); + hbResponse.setAppliedOpTime(_hbdata[primaryIndex].getAppliedOpTime()); hbResponse.setSyncingTo(HostAndPort()); hbResponse.setHbMsg(""); _hbdata[primaryIndex].setUpValues(_hbdata[primaryIndex].getLastHeartbeat(), @@ -1598,15 +1603,16 @@ void TopologyCoordinatorImpl::prepareStatusResponse(const ReplicationExecutor::C if (!itConfig.isArbiter()) { if (_rsConfig.getProtocolVersion() == 1) { BSONObjBuilder opTime(bb.subobjStart("optime")); - opTime.append("ts", it->getOpTime().getTimestamp()); - opTime.append("t", it->getOpTime().getTerm()); + opTime.append("ts", it->getAppliedOpTime().getTimestamp()); + opTime.append("t", it->getAppliedOpTime().getTerm()); opTime.done(); } else { - bb.append("optime", it->getOpTime().getTimestamp()); + bb.append("optime", it->getAppliedOpTime().getTimestamp()); } - bb.appendDate("optimeDate", - Date_t::fromDurationSinceEpoch(Seconds(it->getOpTime().getSecs()))); + bb.appendDate( + "optimeDate", + Date_t::fromDurationSinceEpoch(Seconds(it->getAppliedOpTime().getSecs()))); } bb.appendDate("lastHeartbeat", it->getLastHeartbeat()); bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv()); @@ -1914,7 +1920,7 @@ TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getUnel result |= NotSecondary; } if (_rsConfig.getProtocolVersion() == 0 && - !_isOpTimeCloseEnoughToLatestToElect(hbData.getOpTime(), lastOpApplied)) { + !_isOpTimeCloseEnoughToLatestToElect(hbData.getAppliedOpTime(), lastOpApplied)) { result |= NotCloseEnoughToLatestOptime; } if (hbData.up() && hbData.isUnelectable()) { @@ -2175,7 +2181,7 @@ bool TopologyCoordinatorImpl::stepDown(Date_t until, bool force, const OpTime& l continue; } UnelectableReasonMask reason = _getUnelectableReason(i, lastOpApplied); - if (!reason && _hbdata[i].getOpTime() >= lastOpApplied) { + if (!reason && _hbdata[i].getAppliedOpTime() >= lastOpApplied) { canStepDown = true; } } @@ -2309,7 +2315,7 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS invariant(currentSourceIndex != _selfIndex); OpTime currentSourceOpTime = - std::max(syncSourceLastOpTime, _hbdata[currentSourceIndex].getOpTime()); + std::max(syncSourceLastOpTime, _hbdata[currentSourceIndex].getAppliedOpTime()); if (currentSourceOpTime.isNull()) { // Haven't received a heartbeat from the sync source yet, so can't tell if we should @@ -2333,12 +2339,12 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS if (it->up() && (candidateConfig.isVoter() || !_selfConfig().isVoter()) && (candidateConfig.shouldBuildIndexes() || !_selfConfig().shouldBuildIndexes()) && it->getState().readable() && !_memberIsBlacklisted(candidateConfig, now) && - goalSecs < it->getOpTime().getSecs()) { + goalSecs < it->getAppliedOpTime().getSecs()) { log() << "re-evaluating sync source because our current sync source's most recent " << "OpTime is " << currentSourceOpTime.toString() << " which is more than " << _options.maxSyncSourceLagSecs << " behind member " << candidateConfig.getHostAndPort().toString() << " whose most recent OpTime is " - << it->getOpTime().toString(); + << it->getAppliedOpTime().toString(); invariant(itIndex != _selfIndex); return true; } |