summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author Scott Hernandez <scotthernandez@gmail.com> 2014-07-21 11:15:20 -0400
committer Scott Hernandez <scotthernandez@gmail.com> 2014-07-22 09:08:23 -0400
commit fb270d89cbcfdb98c3cee3e631c76ca035c7b4f0 (patch)
tree 4c638460603110e4f4c96d6d4d75e89e73bd892a /src
parent d8f96453ea6e86c9fbe020766edda85b44823201 (diff)
download mongo-fb270d89cbcfdb98c3cee3e631c76ca035c7b4f0.tar.gz
SERVER-14517: Start of heartbeat
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/repl/SConscript | 1
-rw-r--r-- src/mongo/db/repl/repl_coordinator_impl.cpp | 139
-rw-r--r-- src/mongo/db/repl/repl_coordinator_impl.h | 16
-rw-r--r-- src/mongo/db/repl/topology_coordinator.h | 3
-rw-r--r-- src/mongo/db/repl/topology_coordinator_impl.cpp | 2
5 files changed, 144 insertions, 17 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 9b3aeaf776f..f98d2bf694f 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -28,6 +28,7 @@ env.Library('topology_coordinator_impl',
env.Library('repl_coordinator_impl',
'repl_coordinator_impl.cpp',
LIBDEPS=['$BUILD_DIR/mongo/db/common',
+ '$BUILD_DIR/mongo/fail_point',
'$BUILD_DIR/mongo/foundation',
'$BUILD_DIR/mongo/server_options_core',
'repl_coordinator_interface',
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp
index 3f38d0d381e..b1cc99d2b02 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl.cpp
@@ -41,12 +41,17 @@
#include "mongo/db/write_concern_options.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
namespace mongo {
namespace repl {
+ namespace {
+ typedef StatusWith<ReplicationExecutor::CallbackHandle> CBHStatus; // shorthand for the result of the executor's scheduleWork/scheduleWorkAt/scheduleRemoteCommand calls in this file
+ } //namespace
+
struct ReplicationCoordinatorImpl::WaiterInfo {
/**
@@ -74,8 +79,14 @@ namespace repl {
};
ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
- const ReplSettings& settings, ReplicationCoordinatorExternalState* externalState) :
- _inShutdown(false), _settings(settings), _externalState(externalState) {}
+ const ReplSettings& settings,
+ ReplicationCoordinatorExternalState* externalState) :
+ _inShutdown(false),
+ _settings(settings),
+ _externalState(externalState),
+ _thisMembersConfigIndex(-1) { // starts at -1; overwritten with 'myIndex' in setCurrentReplicaSetConfig
+
+ }
ReplicationCoordinatorImpl::~ReplicationCoordinatorImpl() {}
@@ -92,7 +103,8 @@ namespace repl {
_topCoord->registerConfigChangeCallback(
stdx::bind(&ReplicationCoordinatorImpl::setCurrentReplicaSetConfig,
this,
- stdx::placeholders::_1));
+ stdx::placeholders::_1,
+ stdx::placeholders::_2));
_topCoord->registerStateChangeCallback(
stdx::bind(&ReplicationCoordinatorImpl::setCurrentMemberState,
this,
@@ -371,7 +383,7 @@ namespace repl {
Status ReplicationCoordinatorImpl::processHeartbeat(const BSONObj& cmdObj,
BSONObjBuilder* resultObj) {
Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse");
- StatusWith<ReplicationExecutor::CallbackHandle> cbh = _replExecutor->scheduleWork(
+ CBHStatus cbh = _replExecutor->scheduleWork(
stdx::bind(&TopologyCoordinator::prepareHeartbeatResponse,
_topCoord.get(),
stdx::placeholders::_1,
@@ -420,12 +432,15 @@ namespace repl {
return Status::OK();
}
- void ReplicationCoordinatorImpl::setCurrentReplicaSetConfig(const ReplicaSetConfig& newConfig) {
+ void ReplicationCoordinatorImpl::setCurrentReplicaSetConfig(const ReplicaSetConfig& newConfig,
+ int myIndex) {
invariant(getReplicationMode() == modeReplSet);
boost::lock_guard<boost::mutex> lk(_mutex);
_rsConfig = newConfig;
+ _thisMembersConfigIndex = myIndex;
- // TODO: Cancel heartbeats, start new ones
+ cancelHeartbeats();
+ _startHeartbeats();
// TODO(SERVER-14591): instead of this, use WriteConcernOptions and store in replcoord;
// in getLastError command, fetch the defaults via a getter in replcoord.
@@ -478,15 +493,98 @@ namespace repl {
return Status::OK();
}
- void ReplicationCoordinatorImpl::doMemberHeartbeat(ReplicationExecutor* executor,
- const Status& inStatus,
+ MONGO_FP_DECLARE(rsHeartbeatRequestNoopByMember); // failpoint: doMemberHeartbeat sends no real heartbeat to the host named in its "member" data field
+
+ namespace {
+ // TODO: decide where these live, see TopologyCoordinator::HeartbeatOptions
+ const int heartbeatFrequencyMillis = 2 * 1000; // 2 seconds; delay before a failpoint-suppressed heartbeat is rescheduled
+ const int heartbeatTimeoutDefaultMillis = 10 * 1000; // 10 seconds
+ const int heartbeatRetries = 2; // bound as the initial 'retriesLeft' of _handleHeartbeatResponse
+ } //namespace
+
+
+    /**
+     * Executor callback that performs one heartbeat round for the member at 'hap'.
+     *
+     * Returns immediately if the callback was canceled.  If the
+     * rsHeartbeatRequestNoopByMember failpoint names 'hap', no command is sent and this
+     * function is rescheduled heartbeatFrequencyMillis from now.  Otherwise a
+     * replSetHeartbeat command is built under _mutex and scheduled against 'hap', with
+     * _handleHeartbeatResponse bound as the response callback.  Each handle scheduled here
+     * is recorded through _trackHeartbeatHandle; on a scheduling error the failure is
+     * logged and heartbeating of 'hap' stops.
+     */
+    void ReplicationCoordinatorImpl::doMemberHeartbeat(ReplicationExecutor::CallbackData cbData,
                                                        const HostAndPort& hap) {
-        // TODO
+
+        if (cbData.status == ErrorCodes::CallbackCanceled) {
+            return;
+        }
+
+        // Are we blind, or do we have a failpoint setup to ignore this member?
+        bool dontHeartbeatMember = false; // TODO: replSetBlind should be here as the default
+
+        MONGO_FAIL_POINT_BLOCK(rsHeartbeatRequestNoopByMember, member) {
+            const StringData& stopMember = member.getData()["member"].valueStringData();
+            HostAndPort ignoreHAP;
+            Status status = ignoreHAP.initialize(stopMember);
+            // Ignore
+            if (status.isOK()) {
+                if (hap == ignoreHAP) {
+                    dontHeartbeatMember = true;
+                }
+            } else {
+                log() << "replset: Bad member for rsHeartbeatRequestNoopByMember failpoint "
+                      << member.getData() << ". 'member' failed to parse into HostAndPort -- "
+                      << status;
+            }
+        }
+
+        if (dontHeartbeatMember) {
+            // Don't issue real heartbeats, just call start again after the timeout.
+            ReplicationExecutor::CallbackFn restartCB = stdx::bind(
+                    &ReplicationCoordinatorImpl::doMemberHeartbeat,
+                    this,
+                    stdx::placeholders::_1,
+                    hap);
+            CBHStatus status = _replExecutor->scheduleWorkAt(
+                    Date_t(curTimeMillis64() + heartbeatFrequencyMillis),
+                    restartCB);
+            if (!status.isOK()) {
+                log() << "replset: aborting heartbeats for " << hap << " due to scheduling error"
+                      << " -- "<< status;
+                return;
+            }
+            _trackHeartbeatHandle(status.getValue());
+            return;
+        }
+
+        // Compose heartbeat command message
+        BSONObj hbCommandBSON;
+        {
+            // take lock to build request
+            boost::lock_guard<boost::mutex> lock(_mutex);
+            BSONObjBuilder cmdBuilder;
+            const MemberConfig me = _rsConfig.getMemberAt(_thisMembersConfigIndex);
+            cmdBuilder.append("replSetHeartbeat", _rsConfig.getReplSetName());
+            cmdBuilder.append("v", _rsConfig.getConfigVersion());
+            cmdBuilder.append("pv", 1);
+            cmdBuilder.append("checkEmpty", false);
+            cmdBuilder.append("from", me.getHostAndPort().toString());
+            cmdBuilder.append("fromId", me.getId());
+            // obj(), not done(): the command must outlive cmdBuilder, which is destroyed at
+            // the end of this scope.  done() returns an object that still points into the
+            // builder's buffer, so using it after the brace would read freed memory.
+            hbCommandBSON = cmdBuilder.obj();
+        }
+        const ReplicationExecutor::RemoteCommandRequest request(hap, "admin", hbCommandBSON);
+
+        ReplicationExecutor::RemoteCommandCallbackFn callback = stdx::bind(
+                &ReplicationCoordinatorImpl::_handleHeartbeatResponse,
+                this,
+                stdx::placeholders::_1,
+                hap,
+                curTimeMillis64(),
+                heartbeatRetries);
+
+
+        CBHStatus status = _replExecutor->scheduleRemoteCommand(request, callback);
+        if (!status.isOK()) {
+            // Same " -- " separator as the other scheduling-error messages, so the status
+            // text is not glued onto the word "error".
+            log() << "replset: aborting heartbeats for " << hap << " due to scheduling error"
+                  << " -- " << status;
+            return;
+        }
+        _trackHeartbeatHandle(status.getValue());
     }
void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
const ReplicationExecutor::RemoteCommandCallbackData& cbData,
- StatusWith<BSONObj>* outStatus,
const HostAndPort& hap,
Date_t firstCallDate,
int retriesLeft) {
@@ -525,5 +623,26 @@ namespace repl {
_heartbeatHandles.clear();
}
+    /**
+     * Schedules an initial doMemberHeartbeat callback on _replExecutor for each member of
+     * _rsConfig and records every successfully scheduled handle via _trackHeartbeatHandle.
+     * A member whose callback cannot be scheduled is logged and skipped; the remaining
+     * members are still processed.
+     *
+     * NOTE(review): the loop visits every member, which appears to include this node's own
+     * entry (_thisMembersConfigIndex) -- confirm whether self should be skipped.
+     */
+    void ReplicationCoordinatorImpl::_startHeartbeats() {
+        ReplicaSetConfig::MemberIterator it = _rsConfig.membersBegin();
+        // Must be membersEnd(): initialising 'end' from membersBegin() makes the range
+        // empty, so no heartbeat would ever be scheduled.
+        const ReplicaSetConfig::MemberIterator end = _rsConfig.membersEnd();
+
+        for (; it != end; ++it) {
+            HostAndPort host = it->getHostAndPort();
+            CBHStatus status = _replExecutor->scheduleWork(
+                    stdx::bind(
+                            &ReplicationCoordinatorImpl::doMemberHeartbeat,
+                            this,
+                            stdx::placeholders::_1,
+                            host));
+            if (!status.isOK()) {
+                log() << "replset: cannot start heartbeats for "
+                      << host << " due to scheduling error -- "<< status;
+                continue;
+            }
+            _trackHeartbeatHandle(status.getValue());
+        }
+    }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h
index dcf635edebc..3365126546a 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.h
+++ b/src/mongo/db/repl/repl_coordinator_impl.h
@@ -160,14 +160,13 @@ namespace repl {
void setCurrentMemberState(const MemberState& newState);
// Called by the TopologyCoordinator whenever the replica set configuration is updated
- void setCurrentReplicaSetConfig(const ReplicaSetConfig& newConfig);
+ void setCurrentReplicaSetConfig(const ReplicaSetConfig& newConfig, int myIndex);
/**
* Does a heartbeat for a member of the replica set.
* Should be started during (re)configuration or in the heartbeat callback only.
*/
- void doMemberHeartbeat(ReplicationExecutor* executor,
- const Status& inStatus,
+ void doMemberHeartbeat(ReplicationExecutor::CallbackData cbData,
const HostAndPort& hap);
/**
@@ -196,7 +195,6 @@ namespace repl {
* and on success.
*/
void _handleHeartbeatResponse(const ReplicationExecutor::RemoteCommandCallbackData& cbData,
- StatusWith<BSONObj>* outStatus,
const HostAndPort& hap,
Date_t firstCallDate,
int retriesLeft);
@@ -205,7 +203,12 @@ namespace repl {
void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle);
- // Handles to actively queued heartbeats.
+ /**
+ * Start a heartbeat for each member in the current config
+ */
+ void _startHeartbeats();
+
+ // Handles to actively queued heartbeats
typedef std::vector<ReplicationExecutor::CallbackHandle> HeartbeatHandles;
HeartbeatHandles _heartbeatHandles;
@@ -248,6 +251,9 @@ namespace repl {
// The current ReplicaSet configuration object, including the information about tag groups
// that is used to satisfy write concern requests with named gle modes.
ReplicaSetConfig _rsConfig;
+
+ // This member's index position in the current config.
+ int _thisMembersConfigIndex;
};
} // namespace repl
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 6ca7bb12fa9..cac580dafaa 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -89,7 +89,8 @@ namespace repl {
// Add function pointer to callback list; call function when config changes
// Applier needs to know when things like chainingAllowed or slaveDelay change.
// ReplCoord needs to know when things like the tag sets change.
- typedef stdx::function<void (const ReplicaSetConfig& config)> ConfigChangeCallbackFn;
+ typedef stdx::function<void (const ReplicaSetConfig& config, int myIndex)>
+ ConfigChangeCallbackFn;
virtual void registerConfigChangeCallback(const ConfigChangeCallbackFn& fn) = 0;
// ReplCoord needs to know the state to implement certain public functions
typedef stdx::function<void (const MemberState& newMemberState)> StateChangeCallbackFn;
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index fa7fa3ba2be..e4b20a40781 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -1051,7 +1051,7 @@ namespace repl {
for (std::vector<ConfigChangeCallbackFn>::const_iterator it =
_configChangeCallbacks.begin();
it != _configChangeCallbacks.end(); ++it) {
- (*it)(_currentConfig);
+ (*it)(_currentConfig, selfIndex);
}
}