summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/topology_coordinator_impl.cpp
diff options
context:
space:
mode:
authormatt dannenberg <matt.dannenberg@10gen.com>2016-01-05 10:29:01 -0500
committerScott Hernandez <scotthernandez@tart.local>2016-02-24 09:55:46 -0500
commit7bc59dac4f46e8f59786130262fb1dfea68fb605 (patch)
tree0ad5c45f99655a7cc60abc6162993ec1eab0070d /src/mongo/db/repl/topology_coordinator_impl.cpp
parentb5a76e83860d0cff964af4989d798f19ffce4aae (diff)
downloadmongo-7bc59dac4f46e8f59786130262fb1dfea68fb605.tar.gz
SERVER-22276 SERVER-22277 implement "j" flag in write concern apply to secondary as well as primary
(cherry picked from commit 2c2e6a38f559f25559c2b24eff51511c6fbc4a5b)
Diffstat (limited to 'src/mongo/db/repl/topology_coordinator_impl.cpp')
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp52
1 files changed, 29 insertions, 23 deletions
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 0c4169fe4b9..32d857be5c8 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -204,7 +204,7 @@ HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
// Find primary's oplog time. Reject sync candidates that are more than
// _options.maxSyncSourceLagSecs seconds behind.
if (_currentPrimaryIndex != -1) {
- OpTime primaryOpTime = _hbdata[_currentPrimaryIndex].getOpTime();
+ OpTime primaryOpTime = _hbdata[_currentPrimaryIndex].getAppliedOpTime();
// Check if primaryOpTime is still close to 0 because we haven't received
// our first heartbeat from a new primary yet.
@@ -257,7 +257,7 @@ HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
continue;
}
// Candidates cannot be excessively behind.
- if (it->getOpTime() < oldestSyncOpTime) {
+ if (it->getAppliedOpTime() < oldestSyncOpTime) {
continue;
}
// Candidate must not have a configured delay larger than ours.
@@ -272,7 +272,7 @@ HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
}
}
// only consider candidates that are ahead of where we are
- if (it->getOpTime().getTimestamp() <= lastTimestampApplied) {
+ if (it->getAppliedOpTime().getTimestamp() <= lastTimestampApplied) {
continue;
}
// Candidate cannot be more latent than anything we've already considered.
@@ -421,10 +421,10 @@ void TopologyCoordinatorImpl::prepareSyncFromResponse(const ReplicationExecutor:
str::stream() << "I cannot reach the requested member: " << target.toString());
return;
}
- if (hbdata.getOpTime().getSecs() + 10 < lastOpApplied.getSecs()) {
+ if (hbdata.getAppliedOpTime().getSecs() + 10 < lastOpApplied.getSecs()) {
warning() << "attempting to sync from " << target << ", but its latest opTime is "
- << hbdata.getOpTime().getSecs() << " and ours is " << lastOpApplied.getSecs()
- << " so this may not work";
+ << hbdata.getAppliedOpTime().getSecs() << " and ours is "
+ << lastOpApplied.getSecs() << " so this may not work";
response->append("warning",
str::stream() << "requested member \"" << target.toString()
<< "\" is more than 10 seconds behind us");
@@ -518,7 +518,7 @@ bool TopologyCoordinatorImpl::_shouldVetoMember(
return true;
}
- if (_iAmPrimary() && lastOpApplied >= _hbdata[hopefulIndex].getOpTime()) {
+ if (_iAmPrimary() && lastOpApplied >= _hbdata[hopefulIndex].getAppliedOpTime()) {
// hbinfo is not updated for ourself, so if we are primary we have to check the
// primary's last optime separately
*errmsg = str::stream() << "I am already primary, "
@@ -528,7 +528,8 @@ bool TopologyCoordinatorImpl::_shouldVetoMember(
}
if (_currentPrimaryIndex != -1 && (hopefulIndex != _currentPrimaryIndex) &&
- (_hbdata[_currentPrimaryIndex].getOpTime() >= _hbdata[hopefulIndex].getOpTime())) {
+ (_hbdata[_currentPrimaryIndex].getAppliedOpTime() >=
+ _hbdata[hopefulIndex].getAppliedOpTime())) {
// other members might be aware of more up-to-date nodes
*errmsg =
str::stream() << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
@@ -646,6 +647,7 @@ Status TopologyCoordinatorImpl::prepareHeartbeatResponse(Date_t now,
const ReplSetHeartbeatArgs& args,
const std::string& ourSetName,
const OpTime& lastOpApplied,
+ const OpTime& lastOpDurable,
ReplSetHeartbeatResponse* response) {
if (args.getProtocolVersion() != 1) {
return Status(ErrorCodes::BadValue,
@@ -694,7 +696,8 @@ Status TopologyCoordinatorImpl::prepareHeartbeatResponse(Date_t now,
// Heartbeat status message
response->setHbMsg(_getHbmsg(now));
response->setTime(duration_cast<Seconds>(now - Date_t{}));
- response->setOpTime(lastOpApplied);
+ response->setAppliedOpTime(lastOpApplied);
+ response->setDurableOpTime(lastOpDurable);
if (!_syncSource.empty()) {
response->setSyncingTo(_syncSource);
@@ -737,6 +740,7 @@ Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(Date_t now,
const ReplSetHeartbeatArgsV1& args,
const std::string& ourSetName,
const OpTime& lastOpApplied,
+ const OpTime& lastOpDurable,
ReplSetHeartbeatResponse* response) {
// Verify that replica set names match
const std::string rshb = args.getSetName();
@@ -770,7 +774,8 @@ Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(Date_t now,
response->setElectionTime(_electionTime);
}
- response->setOpTime(lastOpApplied);
+ response->setAppliedOpTime(lastOpApplied);
+ response->setDurableOpTime(lastOpDurable);
if (_currentPrimaryIndex != -1) {
response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId());
@@ -1148,7 +1153,7 @@ HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBData(
const MemberConfig& highestPriorityMember = _rsConfig.getMemberAt(highestPriorityIndex);
const OpTime highestPriorityMemberOptime = highestPriorityIndex == _selfIndex
? lastOpApplied
- : _hbdata[highestPriorityIndex].getOpTime();
+ : _hbdata[highestPriorityIndex].getAppliedOpTime();
if ((highestPriorityMember.getPriority() > currentPrimaryMember.getPriority()) &&
_isOpTimeCloseEnoughToLatestToElect(highestPriorityMemberOptime, lastOpApplied)) {
@@ -1378,7 +1383,7 @@ OpTime TopologyCoordinatorImpl::_latestKnownOpTime(const OpTime& ourLastOpApplie
continue;
}
- OpTime optime = it->getOpTime();
+ OpTime optime = it->getAppliedOpTime();
if (optime > latest) {
latest = optime;
@@ -1467,7 +1472,7 @@ void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) {
ReplSetHeartbeatResponse hbResponse;
hbResponse.setState(MemberState::RS_PRIMARY);
hbResponse.setElectionTime(Timestamp());
- hbResponse.setOpTime(_hbdata[primaryIndex].getOpTime());
+ hbResponse.setAppliedOpTime(_hbdata[primaryIndex].getAppliedOpTime());
hbResponse.setSyncingTo(HostAndPort());
hbResponse.setHbMsg("");
_hbdata[primaryIndex].setUpValues(_hbdata[primaryIndex].getLastHeartbeat(),
@@ -1598,15 +1603,16 @@ void TopologyCoordinatorImpl::prepareStatusResponse(const ReplicationExecutor::C
if (!itConfig.isArbiter()) {
if (_rsConfig.getProtocolVersion() == 1) {
BSONObjBuilder opTime(bb.subobjStart("optime"));
- opTime.append("ts", it->getOpTime().getTimestamp());
- opTime.append("t", it->getOpTime().getTerm());
+ opTime.append("ts", it->getAppliedOpTime().getTimestamp());
+ opTime.append("t", it->getAppliedOpTime().getTerm());
opTime.done();
} else {
- bb.append("optime", it->getOpTime().getTimestamp());
+ bb.append("optime", it->getAppliedOpTime().getTimestamp());
}
- bb.appendDate("optimeDate",
- Date_t::fromDurationSinceEpoch(Seconds(it->getOpTime().getSecs())));
+ bb.appendDate(
+ "optimeDate",
+ Date_t::fromDurationSinceEpoch(Seconds(it->getAppliedOpTime().getSecs())));
}
bb.appendDate("lastHeartbeat", it->getLastHeartbeat());
bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv());
@@ -1914,7 +1920,7 @@ TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getUnel
result |= NotSecondary;
}
if (_rsConfig.getProtocolVersion() == 0 &&
- !_isOpTimeCloseEnoughToLatestToElect(hbData.getOpTime(), lastOpApplied)) {
+ !_isOpTimeCloseEnoughToLatestToElect(hbData.getAppliedOpTime(), lastOpApplied)) {
result |= NotCloseEnoughToLatestOptime;
}
if (hbData.up() && hbData.isUnelectable()) {
@@ -2175,7 +2181,7 @@ bool TopologyCoordinatorImpl::stepDown(Date_t until, bool force, const OpTime& l
continue;
}
UnelectableReasonMask reason = _getUnelectableReason(i, lastOpApplied);
- if (!reason && _hbdata[i].getOpTime() >= lastOpApplied) {
+ if (!reason && _hbdata[i].getAppliedOpTime() >= lastOpApplied) {
canStepDown = true;
}
}
@@ -2309,7 +2315,7 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
invariant(currentSourceIndex != _selfIndex);
OpTime currentSourceOpTime =
- std::max(syncSourceLastOpTime, _hbdata[currentSourceIndex].getOpTime());
+ std::max(syncSourceLastOpTime, _hbdata[currentSourceIndex].getAppliedOpTime());
if (currentSourceOpTime.isNull()) {
// Haven't received a heartbeat from the sync source yet, so can't tell if we should
@@ -2333,12 +2339,12 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
if (it->up() && (candidateConfig.isVoter() || !_selfConfig().isVoter()) &&
(candidateConfig.shouldBuildIndexes() || !_selfConfig().shouldBuildIndexes()) &&
it->getState().readable() && !_memberIsBlacklisted(candidateConfig, now) &&
- goalSecs < it->getOpTime().getSecs()) {
+ goalSecs < it->getAppliedOpTime().getSecs()) {
log() << "re-evaluating sync source because our current sync source's most recent "
<< "OpTime is " << currentSourceOpTime.toString() << " which is more than "
<< _options.maxSyncSourceLagSecs << " behind member "
<< candidateConfig.getHostAndPort().toString() << " whose most recent OpTime is "
- << it->getOpTime().toString();
+ << it->getAppliedOpTime().toString();
invariant(itIndex != _selfIndex);
return true;
}