summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2014-10-16 13:26:54 -0400
committerEric Milkie <milkie@10gen.com>2014-10-16 13:59:44 -0400
commitca1a8d5bb453a645b61a8ebcee8b40b7a51bdfdf (patch)
treeb1785488c090a104dcc35b5c97bcb7037b17be51 /src
parent9ccfb17324b76539401e3eeb7cbb8f3f0e2140d5 (diff)
downloadmongo-ca1a8d5bb453a645b61a8ebcee8b40b7a51bdfdf.tar.gz
SERVER-15535 update slavemap optimes on heartbeats
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.h7
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp32
2 files changed, 39 insertions, 0 deletions
diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h
index 9c280b12303..7a30fbd6687 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.h
+++ b/src/mongo/db/repl/repl_coordinator_impl.h
@@ -465,6 +465,13 @@ namespace repl {
void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle);
/**
+ * Helper for _handleHeartbeatResponse.
+ *
+ * Looks up target in the slave map and updates its optime if found.
+ */
+ void _updateOpTimeFromHeartbeat(const HostAndPort& target, OpTime optime);
+
+ /**
* Starts a heartbeat for each member in the current config. Called within the executor
* context.
*/
diff --git a/src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp
index b7c67d97426..8cdafe23bd7 100644
--- a/src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp
@@ -149,6 +149,12 @@ namespace {
hbStatusResponse,
lastApplied);
+ if (action.getAction() == HeartbeatResponseAction::NoAction &&
+ hbStatusResponse.isOK() &&
+ hbStatusResponse.getValue().hasOpTime()) {
+ _updateOpTimeFromHeartbeat(target, hbStatusResponse.getValue().getOpTime());
+ }
+
_scheduleHeartbeatToTarget(
target,
std::max(now, action.getNextHeartbeatStartDate()));
@@ -156,6 +162,32 @@ namespace {
_handleHeartbeatResponseAction(action, hbStatusResponse);
}
+ void ReplicationCoordinatorImpl::_updateOpTimeFromHeartbeat(const HostAndPort& target,
+ OpTime optime) {
+ boost::unique_lock<boost::mutex> lk(_mutex);
+ const MemberConfig* targetMember = _rsConfig.findMemberByHostAndPort(target);
+ if (!targetMember) {
+ return;
+ }
+ int targetId = targetMember->getId();
+ // Find targetId in map
+ for (SlaveInfoMap::const_iterator it = _slaveInfoMap.begin();
+ it != _slaveInfoMap.end(); ++it) {
+ const OID& rid = it->first;
+ const SlaveInfo& slaveInfo = it->second;
+ if (slaveInfo.memberID == targetId) {
+ Status status = _setLastOptime_inlock(&lk,
+ rid,
+ optime);
+ if (!status.isOK()) {
+ LOG(1) << "Could not update optime from node " << target.toString() <<
+ ": " << status;
+ }
+ return;
+ }
+ }
+ }
+
void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction(
const HeartbeatResponseAction& action,
const StatusWith<ReplSetHeartbeatResponse>& responseStatus) {