author    Matthew Russotto <matthew.russotto@10gen.com>  2017-05-15 14:33:22 -0400
committer Matthew Russotto <matthew.russotto@10gen.com>  2017-05-15 15:26:27 -0400
commit    c88c4809c2440d286ed0fc29e1e8d684f015e563 (patch)
tree      71962294640d0ddecacda316e8e1eda2323c9333 /src/mongo/db/repl/replication_coordinator_impl.cpp
parent    b69aed9d10ef66de42880fd379b0a593419b6e47 (diff)
download  mongo-c88c4809c2440d286ed0fc29e1e8d684f015e563.tar.gz
SERVER-26990 Unify tracking of secondary state between replication and topology coordinators
This fixes a bug introduced in commit 6adc71f6cf069803f9c1288aef88ffe0d21c6ffe that caused crashes when a sync source change was requested of a node not in the configuration. It also fixes a dependency problem affecting the shared library build.
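Editor's note: the commit moves per-member replication progress out of ReplicationCoordinatorImpl's private _slaveInfo vector and into MemberHeartbeatData objects owned by the TopologyCoordinator. The sketch below is illustrative only, not the actual mongo headers: OpTime and Date_t are simplified stand-ins, and advanceLastAppliedOpTime/updateLiveness are modeled on the calls visible in the diff that follows. Its point is the shape of the consolidated API: the "only advance if it increased" check lives inside the member-data object, and the bool return tells the caller whether the commit point needs recalculating.

#include <cstdint>

// Simplified stand-in for mongo's repl::OpTime.
struct OpTime {
    std::uint64_t ts = 0;
    long long term = -1;
    bool operator<(const OpTime& rhs) const {
        return ts < rhs.ts || (ts == rhs.ts && term < rhs.term);
    }
};

using Date_t = std::uint64_t;  // stand-in for mongo's Date_t

class MemberHeartbeatData {
public:
    // Advances the applied optime only if it actually moved forward, refreshing
    // liveness either way the update applies; returns true on advancement so the
    // caller can decide whether to recompute the last committed optime.
    bool advanceLastAppliedOpTime(const OpTime& opTime, Date_t now) {
        if (_lastApplied < opTime) {
            _lastApplied = opTime;
            updateLiveness(now);
            return true;
        }
        return false;
    }

    // Marks the member as alive as of 'now' (replaces the ad hoc
    // lastUpdate/down bookkeeping the old SlaveInfo struct carried).
    void updateLiveness(Date_t now) {
        _lastUpdate = now;
        _down = false;
    }

    const OpTime& getLastAppliedOpTime() const { return _lastApplied; }

private:
    OpTime _lastApplied;
    Date_t _lastUpdate = 0;
    bool _down = false;
};

int main() {
    MemberHeartbeatData member;
    // First update advances from the null optime; a stale optime would return false.
    bool advanced = member.advanceLastAppliedOpTime({/*ts=*/42, /*term=*/1}, /*now=*/1000);
    return advanced ? 0 : 1;
}

With this shape, the coordinator no longer duplicates the "only update remote optimes if they increase" logic that the removed _updateSlaveInfo*_inlock helpers each carried; compare the _setLastOptime_inlock hunks below.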
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp | 559
1 file changed, 99 insertions(+), 460 deletions(-)
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index b4138baa290..7591746797d 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -135,25 +135,6 @@ BSONObj incrementConfigVersionByRandom(BSONObj config) {
const Seconds kNoopWriterPeriod(10);
} // namespace
-BSONObj ReplicationCoordinatorImpl::SlaveInfo::toBSON() const {
- BSONObjBuilder bo;
- bo.append("id", memberId);
- bo.append("rid", rid);
- bo.append("host", hostAndPort.toString());
- bo.append("lastDurableOpTime", lastDurableOpTime.toBSON());
- bo.append("lastAppliedOpTime", lastAppliedOpTime.toBSON());
- if (self)
- bo.append("self", true);
- if (down)
- bo.append("down", true);
- bo.append("lastUpdated", lastUpdate);
- return bo.obj();
-}
-
-std::string ReplicationCoordinatorImpl::SlaveInfo::toString() const {
- return toBSON().toString();
-}
-
ReplicationCoordinatorImpl::Waiter::Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern)
: opTime(std::move(_opTime)), writeConcern(_writeConcern) {}
@@ -331,11 +312,6 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
return;
}
- // Make sure there is always an entry in _slaveInfo for ourself.
- SlaveInfo selfInfo;
- selfInfo.self = true;
- _slaveInfo.push_back(selfInfo);
-
_externalState->setupNoopWriter(kNoopWriterPeriod);
}
@@ -681,7 +657,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) {
fassert(18822, !_inShutdown);
_setConfigState_inlock(kConfigStartingUp);
_myRID = rid;
- _slaveInfo[_getMyIndexInSlaveInfo_inlock()].rid = rid;
+ _topCoord->getMyMemberHeartbeatData()->setRid(rid);
}
if (!_settings.usingReplSets()) {
@@ -917,7 +893,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
lk.unlock();
OpTime firstOpTime = _externalState->onTransitionToPrimary(opCtx, isV1ElectionProtocol());
lk.lock();
- _setFirstOpTimeOfMyTerm_inlock(firstOpTime);
+ _topCoord->setFirstOpTimeOfMyTerm(firstOpTime);
// Must calculate the commit level again because firstOpTimeOfMyTerm wasn't set when we logged
// our election in onTransitionToPrimary(), above.
@@ -946,127 +922,6 @@ void ReplicationCoordinatorImpl::signalUpstreamUpdater() {
_externalState->forwardSlaveProgress();
}
-ReplicationCoordinatorImpl::SlaveInfo* ReplicationCoordinatorImpl::_findSlaveInfoByMemberID_inlock(
- int memberId) {
- for (SlaveInfoVector::iterator it = _slaveInfo.begin(); it != _slaveInfo.end(); ++it) {
- if (it->memberId == memberId) {
- return &(*it);
- }
- }
- return NULL;
-}
-
-ReplicationCoordinatorImpl::SlaveInfo* ReplicationCoordinatorImpl::_findSlaveInfoByRID_inlock(
- const OID& rid) {
- for (SlaveInfoVector::iterator it = _slaveInfo.begin(); it != _slaveInfo.end(); ++it) {
- if (it->rid == rid) {
- return &(*it);
- }
- }
- return NULL;
-}
-
-void ReplicationCoordinatorImpl::_addSlaveInfo_inlock(const SlaveInfo& slaveInfo) {
- invariant(getReplicationMode() == modeMasterSlave);
- _slaveInfo.push_back(slaveInfo);
-
- _updateLastCommittedOpTime_inlock();
- // Wake up any threads waiting for replication that now have their replication
- // check satisfied
- _wakeReadyWaiters_inlock();
-}
-
-void ReplicationCoordinatorImpl::_updateSlaveInfoAppliedOpTime_inlock(SlaveInfo* slaveInfo,
- const OpTime& opTime) {
- slaveInfo->lastAppliedOpTime = opTime;
- slaveInfo->lastUpdate = _replExecutor->now();
- slaveInfo->down = false;
-
- _updateLastCommittedOpTime_inlock();
- // Wake up any threads waiting for replication that now have their replication
- // check satisfied
- _wakeReadyWaiters_inlock();
-}
-
-void ReplicationCoordinatorImpl::_updateSlaveInfoDurableOpTime_inlock(SlaveInfo* slaveInfo,
- const OpTime& opTime) {
- // lastAppliedOpTime cannot be behind lastDurableOpTime.
- if (slaveInfo->lastAppliedOpTime < opTime) {
- log() << "Durable progress (" << opTime << ") is ahead of the applied progress ("
- << slaveInfo->lastAppliedOpTime << ". This is likely due to a "
- "rollback. slaveInfo: "
- << slaveInfo->toString();
- return;
- }
- slaveInfo->lastDurableOpTime = opTime;
- slaveInfo->lastUpdate = _replExecutor->now();
- slaveInfo->down = false;
-
- _updateLastCommittedOpTime_inlock();
- // Wake up any threads waiting for replication that now have their replication
- // check satisfied
- _wakeReadyWaiters_inlock();
-}
-
-void ReplicationCoordinatorImpl::_updateSlaveInfoFromConfig_inlock() {
- invariant(_settings.usingReplSets());
-
- SlaveInfoVector oldSlaveInfos;
- _slaveInfo.swap(oldSlaveInfos);
-
- if (_selfIndex == -1) {
- // If we aren't in the config then the only data we care about is for ourself
- for (SlaveInfoVector::const_iterator it = oldSlaveInfos.begin(); it != oldSlaveInfos.end();
- ++it) {
- if (it->self) {
- SlaveInfo slaveInfo = *it;
- slaveInfo.memberId = -1;
- _slaveInfo.push_back(slaveInfo);
- return;
- }
- }
- invariant(false); // There should always have been an entry for ourself
- }
-
- for (int i = 0; i < _rsConfig.getNumMembers(); ++i) {
- const MemberConfig& memberConfig = _rsConfig.getMemberAt(i);
- int memberId = memberConfig.getId();
- const HostAndPort& memberHostAndPort = memberConfig.getHostAndPort();
-
- SlaveInfo slaveInfo;
-
- // Check if the node existed with the same member ID and hostname in the old data
- for (SlaveInfoVector::const_iterator it = oldSlaveInfos.begin(); it != oldSlaveInfos.end();
- ++it) {
- if ((it->memberId == memberId && it->hostAndPort == memberHostAndPort) ||
- (i == _selfIndex && it->self)) {
- slaveInfo = *it;
- }
- }
-
- // Make sure you have the most up-to-date info for member ID and hostAndPort.
- slaveInfo.memberId = memberId;
- slaveInfo.hostAndPort = memberHostAndPort;
- _slaveInfo.push_back(slaveInfo);
- }
- invariant(static_cast<int>(_slaveInfo.size()) == _rsConfig.getNumMembers());
-}
-
-size_t ReplicationCoordinatorImpl::_getMyIndexInSlaveInfo_inlock() const {
- if (getReplicationMode() == modeMasterSlave) {
- // Self data always lives in the first entry in _slaveInfo for master/slave
- return 0;
- } else {
- invariant(_settings.usingReplSets());
- if (_selfIndex == -1) {
- invariant(_slaveInfo.size() == 1);
- return 0;
- } else {
- return _selfIndex;
- }
- }
-}
-
Status ReplicationCoordinatorImpl::setLastOptimeForSlave(const OID& rid, const Timestamp& ts) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
massert(28576,
@@ -1077,17 +932,14 @@ Status ReplicationCoordinatorImpl::setLastOptimeForSlave(const OID& rid, const T
// term == -1 for master-slave
OpTime opTime(ts, OpTime::kUninitializedTerm);
- SlaveInfo* slaveInfo = _findSlaveInfoByRID_inlock(rid);
- if (slaveInfo) {
- if (slaveInfo->lastAppliedOpTime < opTime) {
- _updateSlaveInfoAppliedOpTime_inlock(slaveInfo, opTime);
- }
+ MemberHeartbeatData* memberHeartbeatData = _topCoord->findMemberHeartbeatDataByRid(rid);
+ if (memberHeartbeatData) {
+ memberHeartbeatData->advanceLastAppliedOpTime(opTime, _replExecutor->now());
} else {
- SlaveInfo newSlaveInfo;
- newSlaveInfo.rid = rid;
- newSlaveInfo.lastAppliedOpTime = opTime;
- _addSlaveInfo_inlock(newSlaveInfo);
+ auto* memberHeartbeatData = _topCoord->addSlaveMemberData(rid);
+ memberHeartbeatData->setLastAppliedOpTime(opTime, _replExecutor->now());
}
+ _updateLastCommittedOpTime_inlock();
return Status::OK();
}
@@ -1156,27 +1008,20 @@ void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<stdx::
void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& opTime,
bool isRollbackAllowed) {
- SlaveInfo* mySlaveInfo = &_slaveInfo[_getMyIndexInSlaveInfo_inlock()];
- invariant(isRollbackAllowed || mySlaveInfo->lastAppliedOpTime <= opTime);
- _updateSlaveInfoAppliedOpTime_inlock(mySlaveInfo, opTime);
-
+ auto* myMemberHeartbeatData = _topCoord->getMyMemberHeartbeatData();
+ invariant(isRollbackAllowed || myMemberHeartbeatData->getLastAppliedOpTime() <= opTime);
+ myMemberHeartbeatData->setLastAppliedOpTime(opTime, _replExecutor->now());
+ _updateLastCommittedOpTime_inlock();
_opTimeWaiterList.signalAndRemoveIf_inlock(
[opTime](Waiter* waiter) { return waiter->opTime <= opTime; });
}
void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime,
bool isRollbackAllowed) {
- SlaveInfo* mySlaveInfo = &_slaveInfo[_getMyIndexInSlaveInfo_inlock()];
- invariant(isRollbackAllowed || mySlaveInfo->lastDurableOpTime <= opTime);
- // lastAppliedOpTime cannot be behind lastDurableOpTime.
- if (mySlaveInfo->lastAppliedOpTime < opTime) {
- log() << "My durable progress (" << opTime << ") is ahead of my applied progress ("
- << mySlaveInfo->lastAppliedOpTime << ". This is likely due to a "
- "rollback. slaveInfo: "
- << mySlaveInfo->toString();
- return;
- }
- _updateSlaveInfoDurableOpTime_inlock(mySlaveInfo, opTime);
+ auto* myMemberHeartbeatData = _topCoord->getMyMemberHeartbeatData();
+ invariant(isRollbackAllowed || myMemberHeartbeatData->getLastDurableOpTime() <= opTime);
+ myMemberHeartbeatData->setLastDurableOpTime(opTime, _replExecutor->now());
+ _updateLastCommittedOpTime_inlock();
}
OpTime ReplicationCoordinatorImpl::getMyLastAppliedOpTime() const {
@@ -1343,11 +1188,11 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
}
OpTime ReplicationCoordinatorImpl::_getMyLastAppliedOpTime_inlock() const {
- return _slaveInfo[_getMyIndexInSlaveInfo_inlock()].lastAppliedOpTime;
+ return _topCoord->getMyLastAppliedOpTime();
}
OpTime ReplicationCoordinatorImpl::_getMyLastDurableOpTime_inlock() const {
- return _slaveInfo[_getMyIndexInSlaveInfo_inlock()].lastDurableOpTime;
+ return _topCoord->getMyLastDurableOpTime();
}
Status ReplicationCoordinatorImpl::setLastDurableOptime_forTest(long long cfgVer,
@@ -1402,7 +1247,6 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(
<< " in config with version " << args.cfgver
<< " has durably reached optime: " << args.ts;
- SlaveInfo* slaveInfo = NULL;
if (args.cfgver != _rsConfig.getConfigVersion()) {
std::string errmsg = str::stream()
<< "Received replSetUpdatePosition for node with memberId " << args.memberId
@@ -1413,8 +1257,8 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(
return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg);
}
- slaveInfo = _findSlaveInfoByMemberID_inlock(args.memberId);
- if (!slaveInfo) {
+ auto* memberHeartbeatData = _topCoord->findMemberHeartbeatDataByMemberId(args.memberId);
+ if (!memberHeartbeatData) {
invariant(!_rsConfig.findMemberByID(args.memberId));
std::string errmsg = str::stream()
@@ -1424,25 +1268,22 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(
return Status(ErrorCodes::NodeNotFound, errmsg);
}
- invariant(args.memberId == slaveInfo->memberId);
+ invariant(args.memberId == memberHeartbeatData->getMemberId());
- LOG(3) << "Node with memberID " << args.memberId << " has durably applied operationss through "
- << slaveInfo->lastDurableOpTime << " and has applied operations through "
- << slaveInfo->lastAppliedOpTime << "; updating to new durable operation with timestamp "
- << args.ts;
+ LOG(3) << "Node with memberID " << args.memberId << " has durably applied operations through "
+ << memberHeartbeatData->getLastDurableOpTime() << " and has applied operations through "
+ << memberHeartbeatData->getLastAppliedOpTime()
+ << "; updating to new durable operation with timestamp " << args.ts;
- // Only update remote optimes if they increase.
- if (slaveInfo->lastAppliedOpTime < args.ts) {
- _updateSlaveInfoAppliedOpTime_inlock(slaveInfo, args.ts);
- }
- if (slaveInfo->lastDurableOpTime < args.ts) {
- _updateSlaveInfoDurableOpTime_inlock(slaveInfo, args.ts);
- }
+ auto now(_replExecutor->now());
+ bool advancedOpTime = memberHeartbeatData->advanceLastAppliedOpTime(args.ts, now);
+ advancedOpTime = memberHeartbeatData->advanceLastDurableOpTime(args.ts, now) || advancedOpTime;
+ // Only update committed optime if the remote optimes increased.
+ if (advancedOpTime) {
+ _updateLastCommittedOpTime_inlock();
+ }
- // Update liveness for this node.
- slaveInfo->lastUpdate = _replExecutor->now();
- slaveInfo->down = false;
_cancelAndRescheduleLivenessUpdate_inlock(args.memberId);
return Status::OK();
}
@@ -1474,7 +1315,6 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg
<< " has reached optime: " << args.appliedOpTime
<< " and is durable through: " << args.durableOpTime;
- SlaveInfo* slaveInfo = NULL;
if (args.cfgver != _rsConfig.getConfigVersion()) {
std::string errmsg = str::stream()
<< "Received replSetUpdatePosition for node with memberId " << args.memberId
@@ -1485,8 +1325,8 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg
return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg);
}
- slaveInfo = _findSlaveInfoByMemberID_inlock(args.memberId);
- if (!slaveInfo) {
+ auto* memberHeartbeatData = _topCoord->findMemberHeartbeatDataByMemberId(args.memberId);
+ if (!memberHeartbeatData) {
invariant(!_rsConfig.findMemberByID(args.memberId));
std::string errmsg = str::stream()
@@ -1496,25 +1336,24 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg
return Status(ErrorCodes::NodeNotFound, errmsg);
}
- invariant(args.memberId == slaveInfo->memberId);
+ invariant(args.memberId == memberHeartbeatData->getMemberId());
LOG(3) << "Node with memberID " << args.memberId << " currently has optime "
- << slaveInfo->lastAppliedOpTime << " durable through " << slaveInfo->lastDurableOpTime
- << "; updating to optime " << args.appliedOpTime << " and durable through "
- << args.durableOpTime;
+ << memberHeartbeatData->getLastAppliedOpTime() << " durable through "
+ << memberHeartbeatData->getLastDurableOpTime() << "; updating to optime "
+ << args.appliedOpTime << " and durable through " << args.durableOpTime;
- // Only update remote optimes if they increase.
- if (slaveInfo->lastAppliedOpTime < args.appliedOpTime) {
- _updateSlaveInfoAppliedOpTime_inlock(slaveInfo, args.appliedOpTime);
- }
- if (slaveInfo->lastDurableOpTime < args.durableOpTime) {
- _updateSlaveInfoDurableOpTime_inlock(slaveInfo, args.durableOpTime);
+ auto now(_replExecutor->now());
+ bool advancedOpTime = memberHeartbeatData->advanceLastAppliedOpTime(args.appliedOpTime, now);
+ advancedOpTime =
+ memberHeartbeatData->advanceLastDurableOpTime(args.durableOpTime, now) || advancedOpTime;
+
+ // Only update committed optime if the remote optimes increased.
+ if (advancedOpTime) {
+ _updateLastCommittedOpTime_inlock();
}
- // Update liveness for this node.
- slaveInfo->lastUpdate = _replExecutor->now();
- slaveInfo->down = false;
_cancelAndRescheduleLivenessUpdate_inlock(args.memberId);
return Status::OK();
}
@@ -1531,7 +1370,8 @@ bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock(
const bool useDurableOpTime = writeConcern.syncMode == WriteConcernOptions::SyncMode::JOURNAL;
if (writeConcern.wMode.empty()) {
- return _haveNumNodesReachedOpTime_inlock(opTime, writeConcern.wNumNodes, useDurableOpTime);
+ return _topCoord->haveNumNodesReachedOpTime(
+ opTime, writeConcern.wNumNodes, useDurableOpTime);
}
StringData patternName;
@@ -1564,53 +1404,7 @@ bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock(
if (!tagPattern.isOK()) {
return true;
}
- return _haveTaggedNodesReachedOpTime_inlock(opTime, tagPattern.getValue(), useDurableOpTime);
-}
-
-bool ReplicationCoordinatorImpl::_haveNumNodesReachedOpTime_inlock(const OpTime& targetOpTime,
- int numNodes,
- bool durablyWritten) {
- // Replication progress that is for some reason ahead of us should not allow us to
- // satisfy a write concern if we aren't caught up ourselves.
- OpTime myOpTime =
- durablyWritten ? _getMyLastDurableOpTime_inlock() : _getMyLastAppliedOpTime_inlock();
- if (myOpTime < targetOpTime) {
- return false;
- }
-
- for (SlaveInfoVector::iterator it = _slaveInfo.begin(); it != _slaveInfo.end(); ++it) {
- const OpTime& slaveTime = durablyWritten ? it->lastDurableOpTime : it->lastAppliedOpTime;
- if (slaveTime >= targetOpTime) {
- --numNodes;
- }
-
- if (numNodes <= 0) {
- return true;
- }
- }
- return false;
-}
-
-bool ReplicationCoordinatorImpl::_haveTaggedNodesReachedOpTime_inlock(
- const OpTime& opTime, const ReplSetTagPattern& tagPattern, bool durablyWritten) {
- ReplSetTagMatch matcher(tagPattern);
- for (SlaveInfoVector::iterator it = _slaveInfo.begin(); it != _slaveInfo.end(); ++it) {
- const OpTime& slaveTime = durablyWritten ? it->lastDurableOpTime : it->lastAppliedOpTime;
- if (slaveTime >= opTime) {
- // This node has reached the desired optime, now we need to check if it is a part
- // of the tagPattern.
- const MemberConfig* memberConfig = _rsConfig.findMemberByID(it->memberId);
- invariant(memberConfig);
- for (MemberConfig::TagIterator it = memberConfig->tagsBegin();
- it != memberConfig->tagsEnd();
- ++it) {
- if (matcher.update(*it)) {
- return true;
- }
- }
- }
- }
- return false;
+ return _topCoord->haveTaggedNodesReachedOpTime(opTime, tagPattern.getValue(), useDurableOpTime);
}
ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplication(
@@ -1736,7 +1530,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
if (Command::testCommandsEnabled) {
// log state of replica set on timeout to help with diagnosis.
BSONObjBuilder progress;
- _appendSlaveInfoData_inlock(&progress);
+ _topCoord->fillMemberData(&progress);
log() << "Replication for failed WC: " << writeConcern.toBSON()
<< ", waitInfo: " << waiter << ", opID: " << opCtx->getOpID()
<< ", progress: " << progress.done();
@@ -1835,7 +1629,7 @@ bool ReplicationCoordinatorImpl::_tryToStepDown_inlock(const Date_t waitUntil,
OpTime lastApplied = _getMyLastAppliedOpTime_inlock();
if (forceNow) {
- return _topCoord->stepDown(stepDownUntil, forceNow, lastApplied);
+ return _topCoord->stepDown(stepDownUntil, forceNow);
}
auto tagStatus = _rsConfig.findCustomWriteMode(ReplSetConfig::kMajorityWriteConcernModeName);
@@ -1843,8 +1637,8 @@ bool ReplicationCoordinatorImpl::_tryToStepDown_inlock(const Date_t waitUntil,
// Check if a majority of nodes have reached the last applied optime
// and there exist an electable node that has my last applied optime.
- if (_haveTaggedNodesReachedOpTime_inlock(lastApplied, tagStatus.getValue(), false) &&
- _topCoord->stepDown(stepDownUntil, forceNow, lastApplied)) {
+ if (_topCoord->haveTaggedNodesReachedOpTime(lastApplied, tagStatus.getValue(), false) &&
+ _topCoord->stepDown(stepDownUntil, forceNow)) {
return true;
}
@@ -2060,57 +1854,9 @@ Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool wait
StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand(
ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) const {
- BSONObjBuilder cmdBuilder;
- {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- invariant(_rsConfig.isInitialized());
- // Do not send updates if we have been removed from the config.
- if (_selfIndex == -1) {
- return Status(ErrorCodes::NodeNotFound,
- "This node is not in the current replset configuration.");
- }
- cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1);
- // Create an array containing objects each live member connected to us and for ourself.
- BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes"));
- for (const auto& slaveInfo : _slaveInfo) {
- if (slaveInfo.lastAppliedOpTime.isNull()) {
- // Don't include info on members we haven't heard from yet.
- continue;
- }
- // Don't include members we think are down.
- if (!slaveInfo.self && slaveInfo.down) {
- continue;
- }
-
- BSONObjBuilder entry(arrayBuilder.subobjStart());
- switch (commandStyle) {
- case ReplSetUpdatePositionCommandStyle::kNewStyle:
- slaveInfo.lastDurableOpTime.append(&entry,
- UpdatePositionArgs::kDurableOpTimeFieldName);
- slaveInfo.lastAppliedOpTime.append(&entry,
- UpdatePositionArgs::kAppliedOpTimeFieldName);
- break;
- case ReplSetUpdatePositionCommandStyle::kOldStyle:
- entry.append("_id", slaveInfo.rid);
- if (isV1ElectionProtocol()) {
- slaveInfo.lastDurableOpTime.append(&entry, "optime");
- } else {
- entry.append("optime", slaveInfo.lastDurableOpTime.getTimestamp());
- }
- break;
- }
- entry.append(UpdatePositionArgs::kMemberIdFieldName, slaveInfo.memberId);
- entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion());
- }
- arrayBuilder.done();
- }
-
- // Add metadata to command. Old style parsing logic will reject the metadata.
- if (commandStyle == ReplSetUpdatePositionCommandStyle::kNewStyle) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- _prepareReplSetMetadata_inlock(OpTime(), &cmdBuilder);
- }
- return cmdBuilder.obj();
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _topCoord->prepareReplSetUpdatePositionCommand(
+ commandStyle, _getCurrentCommittedSnapshotOpTime_inlock());
}
Status ReplicationCoordinatorImpl::processReplSetGetStatus(
@@ -2130,9 +1876,6 @@ Status ReplicationCoordinatorImpl::processReplSetGetStatus(
TopologyCoordinator::ReplSetStatusArgs{
_replExecutor->now(),
static_cast<unsigned>(time(0) - serverGlobalParams.started),
- _getMyLastAppliedOpTime_inlock(),
- _getMyLastDurableOpTime_inlock(),
- _lastCommittedOpTime,
_getCurrentCommittedSnapshotOpTime_inlock(),
initialSyncProgress},
response,
@@ -2162,34 +1905,7 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* respon
void ReplicationCoordinatorImpl::appendSlaveInfoData(BSONObjBuilder* result) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _appendSlaveInfoData_inlock(result);
-}
-
-void ReplicationCoordinatorImpl::_appendSlaveInfoData_inlock(BSONObjBuilder* result) {
- BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress"));
- {
- for (SlaveInfoVector::const_iterator itr = _slaveInfo.begin(); itr != _slaveInfo.end();
- ++itr) {
- BSONObjBuilder entry(replicationProgress.subobjStart());
- entry.append("rid", itr->rid);
- if (isV1ElectionProtocol()) {
- BSONObjBuilder opTime(entry.subobjStart("optime"));
- opTime.append("ts", itr->lastDurableOpTime.getTimestamp());
- opTime.append("term", itr->lastDurableOpTime.getTerm());
- opTime.done();
- } else {
- entry.append("optime", itr->lastDurableOpTime.getTimestamp());
- }
- entry.append("host", itr->hostAndPort.toString());
- if (getReplicationMode() == modeReplSet) {
- if (_selfIndex == -1) {
- continue;
- }
- invariant(itr->memberId >= 0);
- entry.append("memberId", itr->memberId);
- }
- }
- }
+ _topCoord->fillMemberData(result);
}
ReplSetConfig ReplicationCoordinatorImpl::getConfig() const {
@@ -2278,8 +1994,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCt
auto doResync = false;
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
- auto opTime = _getMyLastAppliedOpTime_inlock();
- _topCoord->prepareSyncFromResponse(target, opTime, resultObj, &result);
+ _topCoord->prepareSyncFromResponse(target, resultObj, &result);
// If we are in the middle of an initial sync, do a resync.
doResync = result.isOK() && _initialSyncer && _initialSyncer->isActive();
}
@@ -2325,12 +2040,8 @@ Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs&
stdx::lock_guard<stdx::mutex> lk(_mutex);
const Date_t now = _replExecutor->now();
- Status result = _topCoord->prepareHeartbeatResponse(now,
- args,
- _settings.ourSetName(),
- _getMyLastAppliedOpTime_inlock(),
- _getMyLastDurableOpTime_inlock(),
- response);
+ Status result =
+ _topCoord->prepareHeartbeatResponse(now, args, _settings.ourSetName(), response);
if ((result.isOK() || result == ErrorCodes::InvalidReplicaSetConfig) && _selfIndex < 0) {
// If this node does not belong to the configuration it knows about, send heartbeats
// back to any node that sends us a heartbeat, in case one of those remote nodes has
@@ -2893,8 +2604,7 @@ Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& a
BSONObjBuilder* resultObj) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
Status result(ErrorCodes::InternalError, "didn't set status in prepareFreshResponse");
- _topCoord->prepareFreshResponse(
- args, _replExecutor->now(), _getMyLastAppliedOpTime_inlock(), resultObj, &result);
+ _topCoord->prepareFreshResponse(args, _replExecutor->now(), resultObj, &result);
return result;
}
@@ -2902,8 +2612,7 @@ Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& a
BSONObjBuilder* responseObj) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
Status result = Status(ErrorCodes::InternalError, "status not set by callback");
- _topCoord->prepareElectResponse(
- args, _replExecutor->now(), _getMyLastAppliedOpTime_inlock(), responseObj, &result);
+ _topCoord->prepareElectResponse(args, _replExecutor->now(), responseObj, &result);
return result;
}
@@ -2914,9 +2623,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC
_cancelHeartbeats_inlock();
_setConfigState_inlock(kConfigSteady);
- // Must get this before changing our config.
- OpTime myOptime = _getMyLastAppliedOpTime_inlock();
- _topCoord->updateConfig(newConfig, myIndex, _replExecutor->now(), myOptime);
+ _topCoord->updateConfig(newConfig, myIndex, _replExecutor->now());
const ReplSetConfig oldConfig = _rsConfig;
_rsConfig = newConfig;
_protVersion.store(_rsConfig.getProtocolVersion());
@@ -2933,7 +2640,6 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC
_cancelAndRescheduleElectionTimeout_inlock();
const PostMemberStateUpdateAction action = _updateMemberStateFromTopologyCoordinator_inlock();
- _updateSlaveInfoFromConfig_inlock();
if (_selfIndex >= 0) {
// Don't send heartbeats if we're not in the config, if we get re-added one of the
// nodes in the set will contact us.
@@ -2959,7 +2665,6 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC
}
}
- _wakeReadyWaiters_inlock();
return action;
}
@@ -3020,24 +2725,20 @@ Status ReplicationCoordinatorImpl::processHandshake(OperationContext* opCtx,
const HandshakeArgs& handshake) {
LOG(2) << "Received handshake " << handshake.toBSON();
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
if (getReplicationMode() != modeMasterSlave) {
return Status(ErrorCodes::IllegalOperation,
"The handshake command is only used for master/slave replication");
}
- SlaveInfo* slaveInfo = _findSlaveInfoByRID_inlock(handshake.getRid());
- if (slaveInfo) {
+ auto* memberHeartbeatData = _topCoord->findMemberHeartbeatDataByRid(handshake.getRid());
+ if (memberHeartbeatData) {
return Status::OK(); // nothing to do
}
- SlaveInfo newSlaveInfo;
- newSlaveInfo.rid = handshake.getRid();
- newSlaveInfo.memberId = -1;
- newSlaveInfo.hostAndPort = _externalState->getClientHostAndPort(opCtx);
- // Don't call _addSlaveInfo_inlock as that would wake sleepers unnecessarily.
- _slaveInfo.push_back(newSlaveInfo);
+ memberHeartbeatData = _topCoord->addSlaveMemberData(handshake.getRid());
+ memberHeartbeatData->setHostAndPort(_externalState->getClientHostAndPort(opCtx));
return Status::OK();
}
@@ -3053,26 +2754,10 @@ bool ReplicationCoordinatorImpl::buildsIndexes() {
std::vector<HostAndPort> ReplicationCoordinatorImpl::getHostsWrittenTo(const OpTime& op,
bool durablyWritten) {
- std::vector<HostAndPort> hosts;
stdx::lock_guard<stdx::mutex> lk(_mutex);
- for (size_t i = 0; i < _slaveInfo.size(); ++i) {
- const SlaveInfo& slaveInfo = _slaveInfo[i];
- if (getReplicationMode() == modeMasterSlave && slaveInfo.rid == _getMyRID_inlock()) {
- // Master-slave doesn't know the HostAndPort for itself at this point.
- continue;
- }
-
- if (durablyWritten) {
- if (slaveInfo.lastDurableOpTime < op) {
- continue;
- }
- } else if (slaveInfo.lastAppliedOpTime < op) {
- continue;
- }
-
- hosts.push_back(slaveInfo.hostAndPort);
- }
- return hosts;
+ /* skip self in master-slave mode because our own HostAndPort is unknown */
+ const bool skipSelf = getReplicationMode() == modeMasterSlave;
+ return _topCoord->getHostsWrittenTo(op, durablyWritten, skipSelf);
}
std::vector<HostAndPort> ReplicationCoordinatorImpl::getOtherNodesInReplSet() const {
@@ -3211,42 +2896,19 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource(
const rpc::ReplSetMetadata& replMetadata,
boost::optional<rpc::OplogQueryMetadata> oqMetadata) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _topCoord->shouldChangeSyncSource(currentSource,
- _getMyLastAppliedOpTime_inlock(),
- replMetadata,
- oqMetadata,
- _replExecutor->now());
+ return _topCoord->shouldChangeSyncSource(
+ currentSource, replMetadata, oqMetadata, _replExecutor->now());
}
void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() {
- if (!_getMemberState_inlock().primary() || _topCoord->isStepDownPending()) {
- return;
+ if (_topCoord->updateLastCommittedOpTime()) {
+ _updateCommitPoint_inlock();
}
-
- std::vector<OpTime> votingNodesOpTimes;
-
- // Whether we use the applied or durable OpTime for the commit point is decided here.
- const bool useDurableOpTime = getWriteConcernMajorityShouldJournal_inlock();
-
- for (const auto& sI : _slaveInfo) {
- auto memberConfig = _rsConfig.findMemberByID(sI.memberId);
- invariant(memberConfig);
- if (memberConfig->isVoter()) {
- const auto opTime = useDurableOpTime ? sI.lastDurableOpTime : sI.lastAppliedOpTime;
- votingNodesOpTimes.push_back(opTime);
- }
- }
-
- invariant(votingNodesOpTimes.size() > 0);
- if (votingNodesOpTimes.size() < static_cast<unsigned long>(_rsConfig.getWriteMajority())) {
- return;
- }
- std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end());
-
- // need the majority to have this OpTime
- OpTime committedOpTime =
- votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()];
- _advanceCommitPoint_inlock(committedOpTime);
+ // Wake up any threads waiting for replication that now have their replication
+ // check satisfied. We must do this regardless of whether we updated the lastCommittedOpTime,
+ // as lastCommittedOpTime may be based on durable optimes whereas some waiters may be
+ // waiting on applied (but not necessarily durable) optimes.
+ _wakeReadyWaiters_inlock();
}
void ReplicationCoordinatorImpl::advanceCommitPoint(const OpTime& committedOpTime) {
@@ -3255,28 +2917,17 @@ void ReplicationCoordinatorImpl::advanceCommitPoint(const OpTime& committedOpTim
}
void ReplicationCoordinatorImpl::_advanceCommitPoint_inlock(const OpTime& committedOpTime) {
- if (committedOpTime == _lastCommittedOpTime) {
- return; // Hasn't changed, so ignore it.
- } else if (committedOpTime < _lastCommittedOpTime) {
- LOG(1) << "Ignoring older committed snapshot optime: " << committedOpTime
- << ", currentCommittedOpTime: " << _lastCommittedOpTime;
- return; // This may have come from an out-of-order heartbeat. Ignore it.
- }
-
- // This check is performed to ensure primaries do not commit an OpTime from a previous term.
- if (_getMemberState_inlock().primary() && committedOpTime < _firstOpTimeOfMyTerm) {
- LOG(1) << "Ignoring older committed snapshot from before I became primary, optime: "
- << committedOpTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm;
- return;
- }
+ if (_topCoord->advanceLastCommittedOpTime(committedOpTime)) {
+ if (_getMemberState_inlock().arbiter()) {
+ _setMyLastAppliedOpTime_inlock(committedOpTime, false);
+ }
- if (_getMemberState_inlock().arbiter()) {
- _setMyLastAppliedOpTime_inlock(committedOpTime, false);
+ _updateCommitPoint_inlock();
}
+}
- LOG(2) << "Updating _lastCommittedOpTime to " << committedOpTime;
- _lastCommittedOpTime = committedOpTime;
-
+void ReplicationCoordinatorImpl::_updateCommitPoint_inlock() {
+ auto committedOpTime = _topCoord->getLastCommittedOpTime();
_externalState->notifyOplogMetadataWaiters();
auto maxSnapshotForOpTime = SnapshotInfo{committedOpTime, SnapshotName::max()};
@@ -3302,13 +2953,9 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint_inlock(const OpTime& commit
}
}
-void ReplicationCoordinatorImpl::_setFirstOpTimeOfMyTerm_inlock(const OpTime& newOpTime) {
- _firstOpTimeOfMyTerm = newOpTime;
-}
-
OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- return _lastCommittedOpTime;
+ return _topCoord->getLastCommittedOpTime();
}
Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
@@ -3325,7 +2972,7 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _topCoord->processReplSetRequestVotes(args, response, _getMyLastAppliedOpTime_inlock());
+ _topCoord->processReplSetRequestVotes(args, response);
}
if (!args.isADryRun() && response->getVoteGranted()) {
@@ -3374,16 +3021,13 @@ void ReplicationCoordinatorImpl::_prepareReplSetMetadata_inlock(const OpTime& la
BSONObjBuilder* builder) const {
OpTime lastVisibleOpTime =
std::max(lastOpTimeFromClient, _getCurrentCommittedSnapshotOpTime_inlock());
- auto metadata = _topCoord->prepareReplSetMetadata(lastVisibleOpTime, _lastCommittedOpTime);
+ auto metadata = _topCoord->prepareReplSetMetadata(lastVisibleOpTime);
metadata.writeToMetadata(builder);
}
void ReplicationCoordinatorImpl::_prepareOplogQueryMetadata_inlock(int rbid,
BSONObjBuilder* builder) const {
- OpTime lastAppliedOpTime = _getMyLastAppliedOpTime_inlock();
- auto metadata =
- _topCoord->prepareOplogQueryMetadata(_lastCommittedOpTime, lastAppliedOpTime, rbid);
- metadata.writeToMetadata(builder);
+ _topCoord->prepareOplogQueryMetadata(rbid).writeToMetadata(builder);
}
bool ReplicationCoordinatorImpl::isV1ElectionProtocol() const {
@@ -3413,12 +3057,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs
auto senderHost(args.getSenderHost());
const Date_t now = _replExecutor->now();
- result = _topCoord->prepareHeartbeatResponseV1(now,
- args,
- _settings.ourSetName(),
- _getMyLastAppliedOpTime_inlock(),
- _getMyLastDurableOpTime_inlock(),
- response);
+ result = _topCoord->prepareHeartbeatResponseV1(now, args, _settings.ourSetName(), response);
if ((result.isOK() || result == ErrorCodes::InvalidReplicaSetConfig) && _selfIndex < 0) {
// If this node does not belong to the configuration it knows about, send heartbeats
@@ -3438,12 +3077,12 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs
}
} else if (result.isOK()) {
// Update liveness for sending node.
- auto slaveInfo = _findSlaveInfoByMemberID_inlock(args.getSenderId());
- if (!slaveInfo) {
+ auto* memberHeartbeatData =
+ _topCoord->findMemberHeartbeatDataByMemberId(args.getSenderId());
+ if (!memberHeartbeatData) {
return result;
}
- slaveInfo->lastUpdate = _replExecutor->now();
- slaveInfo->down = false;
+ memberHeartbeatData->updateLiveness(_replExecutor->now());
}
return result;
}
@@ -3574,7 +3213,7 @@ void ReplicationCoordinatorImpl::createSnapshot(OperationContext* opCtx,
_externalState->createSnapshot(opCtx, name);
auto snapshotInfo = SnapshotInfo{timeOfSnapshot, name};
- if (timeOfSnapshot <= _lastCommittedOpTime) {
+ if (timeOfSnapshot <= _topCoord->getLastCommittedOpTime()) {
// This snapshot is ready to be marked as committed.
invariant(_uncommittedSnapshots.empty());
_updateCommittedSnapshot_inlock(snapshotInfo);
@@ -3596,7 +3235,7 @@ void ReplicationCoordinatorImpl::createSnapshot(OperationContext* opCtx,
void ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock(
SnapshotInfo newCommittedSnapshot) {
invariant(!newCommittedSnapshot.opTime.isNull());
- invariant(newCommittedSnapshot.opTime <= _lastCommittedOpTime);
+ invariant(newCommittedSnapshot.opTime <= _topCoord->getLastCommittedOpTime());
if (_currentCommittedSnapshot) {
invariant(newCommittedSnapshot.opTime >= _currentCommittedSnapshot->opTime);
invariant(newCommittedSnapshot.name > _currentCommittedSnapshot->name);