diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2014-07-21 11:15:20 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2014-07-22 09:08:23 -0400 |
commit | fb270d89cbcfdb98c3cee3e631c76ca035c7b4f0 (patch) | |
tree | 4c638460603110e4f4c96d6d4d75e89e73bd892a /src | |
parent | d8f96453ea6e86c9fbe020766edda85b44823201 (diff) | |
download | mongo-fb270d89cbcfdb98c3cee3e631c76ca035c7b4f0.tar.gz |
SERVER-14517: Start of heartbeat
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.cpp | 139 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.h | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 2 |
5 files changed, 144 insertions, 17 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 9b3aeaf776f..f98d2bf694f 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -28,6 +28,7 @@ env.Library('topology_coordinator_impl', env.Library('repl_coordinator_impl', 'repl_coordinator_impl.cpp', LIBDEPS=['$BUILD_DIR/mongo/db/common', + '$BUILD_DIR/mongo/fail_point', '$BUILD_DIR/mongo/foundation', '$BUILD_DIR/mongo/server_options_core', 'repl_coordinator_interface', diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index 3f38d0d381e..b1cc99d2b02 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -41,12 +41,17 @@ #include "mongo/db/write_concern_options.h" #include "mongo/stdx/functional.h" #include "mongo/util/assert_util.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/time_support.h" #include "mongo/util/timer.h" namespace mongo { namespace repl { + namespace { + typedef StatusWith<ReplicationExecutor::CallbackHandle> CBHStatus; + } //namespace + struct ReplicationCoordinatorImpl::WaiterInfo { /** @@ -74,8 +79,14 @@ namespace repl { }; ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( - const ReplSettings& settings, ReplicationCoordinatorExternalState* externalState) : - _inShutdown(false), _settings(settings), _externalState(externalState) {} + const ReplSettings& settings, + ReplicationCoordinatorExternalState* externalState) : + _inShutdown(false), + _settings(settings), + _externalState(externalState), + _thisMembersConfigIndex(-1) { + + } ReplicationCoordinatorImpl::~ReplicationCoordinatorImpl() {} @@ -92,7 +103,8 @@ namespace repl { _topCoord->registerConfigChangeCallback( stdx::bind(&ReplicationCoordinatorImpl::setCurrentReplicaSetConfig, this, - stdx::placeholders::_1)); + stdx::placeholders::_1, + stdx::placeholders::_2)); _topCoord->registerStateChangeCallback( 
stdx::bind(&ReplicationCoordinatorImpl::setCurrentMemberState, this, @@ -371,7 +383,7 @@ namespace repl { Status ReplicationCoordinatorImpl::processHeartbeat(const BSONObj& cmdObj, BSONObjBuilder* resultObj) { Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse"); - StatusWith<ReplicationExecutor::CallbackHandle> cbh = _replExecutor->scheduleWork( + CBHStatus cbh = _replExecutor->scheduleWork( stdx::bind(&TopologyCoordinator::prepareHeartbeatResponse, _topCoord.get(), stdx::placeholders::_1, @@ -420,12 +432,15 @@ namespace repl { return Status::OK(); } - void ReplicationCoordinatorImpl::setCurrentReplicaSetConfig(const ReplicaSetConfig& newConfig) { + void ReplicationCoordinatorImpl::setCurrentReplicaSetConfig(const ReplicaSetConfig& newConfig, + int myIndex) { invariant(getReplicationMode() == modeReplSet); boost::lock_guard<boost::mutex> lk(_mutex); _rsConfig = newConfig; + _thisMembersConfigIndex = myIndex; - // TODO: Cancel heartbeats, start new ones + cancelHeartbeats(); + _startHeartbeats(); // TODO(SERVER-14591): instead of this, use WriteConcernOptions and store in replcoord; // in getLastError command, fetch the defaults via a getter in replcoord. @@ -478,15 +493,98 @@ namespace repl { return Status::OK(); } - void ReplicationCoordinatorImpl::doMemberHeartbeat(ReplicationExecutor* executor, - const Status& inStatus, + MONGO_FP_DECLARE(rsHeartbeatRequestNoopByMember); + + namespace { + // decide where these live, see TopologyCoordinator::HeartbeatOptions + const int heartbeatFrequencyMillis = 2 * 1000; // 2 seconds + const int heartbeatTimeoutDefaultMillis = 10 * 1000; // 10 seconds + const int heartbeatRetries = 2; + } //namespace + + + void ReplicationCoordinatorImpl::doMemberHeartbeat(ReplicationExecutor::CallbackData cbData, const HostAndPort& hap) { - // TODO + + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + + // Are we blind, or do we have a failpoint setup to ignore this member? 
+ bool dontHeartbeatMember = false; // TODO: replSetBlind should be here as the default + + MONGO_FAIL_POINT_BLOCK(rsHeartbeatRequestNoopByMember, member) { + const StringData& stopMember = member.getData()["member"].valueStringData(); + HostAndPort ignoreHAP; + Status status = ignoreHAP.initialize(stopMember); + // Ignore + if (status.isOK()) { + if (hap == ignoreHAP) { + dontHeartbeatMember = true; + } + } else { + log() << "replset: Bad member for rsHeartbeatRequestNoopByMember failpoint " + << member.getData() << ". 'member' failed to parse into HostAndPort -- " + << status; + } + } + + if (dontHeartbeatMember) { + // Don't issue real heartbeats, just call start again after the timeout. + ReplicationExecutor::CallbackFn restartCB = stdx::bind( + &ReplicationCoordinatorImpl::doMemberHeartbeat, + this, + stdx::placeholders::_1, + hap); + CBHStatus status = _replExecutor->scheduleWorkAt( + Date_t(curTimeMillis64() + heartbeatFrequencyMillis), + restartCB); + if (!status.isOK()) { + log() << "replset: aborting heartbeats for " << hap << " due to scheduling error" + << " -- "<< status; + return; + } + _trackHeartbeatHandle(status.getValue()); + return; + } + + // Compose heartbeat command message + BSONObj hbCommandBSON; + { + // take lock to build request + boost::lock_guard<boost::mutex> lock(_mutex); + BSONObjBuilder cmdBuilder; + const MemberConfig me = _rsConfig.getMemberAt(_thisMembersConfigIndex); + cmdBuilder.append("replSetHeartbeat", _rsConfig.getReplSetName()); + cmdBuilder.append("v", _rsConfig.getConfigVersion()); + cmdBuilder.append("pv", 1); + cmdBuilder.append("checkEmpty", false); + cmdBuilder.append("from", me.getHostAndPort().toString()); + cmdBuilder.append("fromId", me.getId()); + hbCommandBSON = cmdBuilder.done(); + } + const ReplicationExecutor::RemoteCommandRequest request(hap, "admin", hbCommandBSON); + + ReplicationExecutor::RemoteCommandCallbackFn callback = stdx::bind( + &ReplicationCoordinatorImpl::_handleHeartbeatResponse, + this, + 
stdx::placeholders::_1, + hap, + curTimeMillis64(), + heartbeatRetries); + + + CBHStatus status = _replExecutor->scheduleRemoteCommand(request, callback); + if (!status.isOK()) { + log() << "replset: aborting heartbeats for " << hap << " due to scheduling error" + << " -- " << status; + return; + } + _trackHeartbeatHandle(status.getValue()); } void ReplicationCoordinatorImpl::_handleHeartbeatResponse( const ReplicationExecutor::RemoteCommandCallbackData& cbData, - StatusWith<BSONObj>* outStatus, const HostAndPort& hap, Date_t firstCallDate, int retriesLeft) { @@ -525,5 +623,26 @@ namespace repl { _heartbeatHandles.clear(); } + void ReplicationCoordinatorImpl::_startHeartbeats() { + ReplicaSetConfig::MemberIterator it = _rsConfig.membersBegin(); + ReplicaSetConfig::MemberIterator end = _rsConfig.membersEnd(); + + for(;it != end; it++) { + HostAndPort host = it->getHostAndPort(); + CBHStatus status = _replExecutor->scheduleWork( + stdx::bind( + &ReplicationCoordinatorImpl::doMemberHeartbeat, + this, + stdx::placeholders::_1, + host)); + if (!status.isOK()) { + log() << "replset: cannot start heartbeats for " + << host << " due to scheduling error -- "<< status; + continue; + } + _trackHeartbeatHandle(status.getValue()); + } + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h index dcf635edebc..3365126546a 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.h +++ b/src/mongo/db/repl/repl_coordinator_impl.h @@ -160,14 +160,13 @@ namespace repl { void setCurrentMemberState(const MemberState& newState); // Called by the TopologyCoordinator whenever the replica set configuration is updated - void setCurrentReplicaSetConfig(const ReplicaSetConfig& newConfig); + void setCurrentReplicaSetConfig(const ReplicaSetConfig& newConfig, int myIndex); /** * Does a heartbeat for a member of the replica set. * Should be started during (re)configuration or in the heartbeat callback only. 
*/ - void doMemberHeartbeat(ReplicationExecutor* executor, - const Status& inStatus, + void doMemberHeartbeat(ReplicationExecutor::CallbackData cbData, const HostAndPort& hap); /** @@ -196,7 +195,6 @@ namespace repl { * and on success. */ void _handleHeartbeatResponse(const ReplicationExecutor::RemoteCommandCallbackData& cbData, - StatusWith<BSONObj>* outStatus, const HostAndPort& hap, Date_t firstCallDate, int retriesLeft); @@ -205,7 +203,12 @@ namespace repl { void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle); - // Handles to actively queued heartbeats. + /** + * Start a heartbeat for each member in the current config + */ + void _startHeartbeats(); + + // Handles to actively queued heartbeats typedef std::vector<ReplicationExecutor::CallbackHandle> HeartbeatHandles; HeartbeatHandles _heartbeatHandles; @@ -248,6 +251,9 @@ namespace repl { // The current ReplicaSet configuration object, including the information about tag groups // that is used to satisfy write concern requests with named gle modes. ReplicaSetConfig _rsConfig; + + // This member's index position in the current config. + int _thisMembersConfigIndex; }; } // namespace repl diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 6ca7bb12fa9..cac580dafaa 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -89,7 +89,8 @@ namespace repl { // Add function pointer to callback list; call function when config changes // Applier needs to know when things like chainingAllowed or slaveDelay change. // ReplCoord needs to know when things like the tag sets change. 
- typedef stdx::function<void (const ReplicaSetConfig& config)> ConfigChangeCallbackFn; + typedef stdx::function<void (const ReplicaSetConfig& config, int myIndex)> + ConfigChangeCallbackFn; virtual void registerConfigChangeCallback(const ConfigChangeCallbackFn& fn) = 0; // ReplCoord needs to know the state to implement certain public functions typedef stdx::function<void (const MemberState& newMemberState)> StateChangeCallbackFn; diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index fa7fa3ba2be..e4b20a40781 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -1051,7 +1051,7 @@ namespace repl { for (std::vector<ConfigChangeCallbackFn>::const_iterator it = _configChangeCallbacks.begin(); it != _configChangeCallbacks.end(); ++it) { - (*it)(_currentConfig); + (*it)(_currentConfig, selfIndex); } } |