summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2016-03-08 19:40:54 -0500
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2016-04-12 16:54:42 -0400
commitded60f5d4254f08d76ccdf0d3a694d473bd14100 (patch)
tree585cd96ce9dbfc9d23e6a66911e33f464e87edf1 /src/mongo/db
parent51abe7d21b799a3758ce71d18ac7d6a088f71e2c (diff)
downloadmongo-ded60f5d4254f08d76ccdf0d3a694d473bd14100.tar.gz
SERVER-22995 Protect TopoCoord with mutex rather than single thread executor.
Move scatter gather runner out of executor and protect the runner with its own mutex. Replace onComplete with callbacks scheduled on finish event.
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/check_quorum_for_config_change.cpp4
-rw-r--r--src/mongo/db/repl/elect_cmd_runner.cpp11
-rw-r--r--src/mongo/db/repl/elect_cmd_runner.h17
-rw-r--r--src/mongo/db/repl/election_winner_declarer.cpp8
-rw-r--r--src/mongo/db/repl/election_winner_declarer.h9
-rw-r--r--src/mongo/db/repl/freshness_checker.cpp11
-rw-r--r--src/mongo/db/repl/freshness_checker.h21
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp877
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h207
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect.cpp49
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp8
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp45
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp8
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp86
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp14
-rw-r--r--src/mongo/db/repl/scatter_gather_runner.cpp123
-rw-r--r--src/mongo/db/repl/scatter_gather_runner.h97
-rw-r--r--src/mongo/db/repl/scatter_gather_test.cpp69
-rw-r--r--src/mongo/db/repl/topology_coordinator.h6
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp16
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.h6
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_test.cpp86
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp86
-rw-r--r--src/mongo/db/repl/vote_requester.cpp22
-rw-r--r--src/mongo/db/repl/vote_requester.h23
25 files changed, 670 insertions, 1239 deletions
diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp
index ae1a1f9c7fa..583b3df37d8 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change.cpp
+++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp
@@ -282,8 +282,8 @@ Status checkQuorumGeneral(ReplicationExecutor* executor,
const ReplicaSetConfig& rsConfig,
const int myIndex) {
QuorumChecker checker(&rsConfig, myIndex);
- ScatterGatherRunner runner(&checker);
- Status status = runner.run(executor);
+ ScatterGatherRunner runner(&checker, executor);
+ Status status = runner.run();
if (!status.isOK()) {
return status;
}
diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp
index 15bf827b1da..dc2bb2e240d 100644
--- a/src/mongo/db/repl/elect_cmd_runner.cpp
+++ b/src/mongo/db/repl/elect_cmd_runner.cpp
@@ -131,16 +131,15 @@ StatusWith<ReplicationExecutor::EventHandle> ElectCmdRunner::start(
ReplicationExecutor* executor,
const ReplicaSetConfig& currentConfig,
int selfIndex,
- const std::vector<HostAndPort>& targets,
- const stdx::function<void()>& onCompletion) {
+ const std::vector<HostAndPort>& targets) {
_algorithm.reset(new Algorithm(currentConfig, selfIndex, targets, OID::gen()));
- _runner.reset(new ScatterGatherRunner(_algorithm.get()));
- return _runner->start(executor, onCompletion);
+ _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
+ return _runner->start();
}
-void ElectCmdRunner::cancel(ReplicationExecutor* executor) {
+void ElectCmdRunner::cancel() {
_isCanceled = true;
- _runner->cancel(executor);
+ _runner->cancel();
}
int ElectCmdRunner::getReceivedVotes() const {
diff --git a/src/mongo/db/repl/elect_cmd_runner.h b/src/mongo/db/repl/elect_cmd_runner.h
index ad12703b68e..32539b2cd26 100644
--- a/src/mongo/db/repl/elect_cmd_runner.h
+++ b/src/mongo/db/repl/elect_cmd_runner.h
@@ -90,20 +90,15 @@ public:
*
* Returned handle can be used to schedule a callback when the process is complete.
*/
- StatusWith<ReplicationExecutor::EventHandle> start(
- ReplicationExecutor* executor,
- const ReplicaSetConfig& currentConfig,
- int selfIndex,
- const std::vector<HostAndPort>& targets,
- const stdx::function<void()>& onCompletion = stdx::function<void()>());
+ StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor,
+ const ReplicaSetConfig& currentConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& targets);
/**
- * Informs the ElectCmdRunner to cancel further processing. The "executor"
- * argument must point to the same executor passed to "start()".
- *
- * Like start(), this method must run in the executor context.
+ * Informs the ElectCmdRunner to cancel further processing.
*/
- void cancel(ReplicationExecutor* executor);
+ void cancel();
/**
* Returns the number of received votes. Only valid to call after
diff --git a/src/mongo/db/repl/election_winner_declarer.cpp b/src/mongo/db/repl/election_winner_declarer.cpp
index 9cb43a4c54b..6f2ed66e8a6 100644
--- a/src/mongo/db/repl/election_winner_declarer.cpp
+++ b/src/mongo/db/repl/election_winner_declarer.cpp
@@ -105,13 +105,13 @@ StatusWith<ReplicationExecutor::EventHandle> ElectionWinnerDeclarer::start(
const std::vector<HostAndPort>& targets,
const stdx::function<void()>& onCompletion) {
_algorithm.reset(new Algorithm(setName, winnerId, term, targets));
- _runner.reset(new ScatterGatherRunner(_algorithm.get()));
- return _runner->start(executor, onCompletion);
+ _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
+ return _runner->start();
}
-void ElectionWinnerDeclarer::cancel(ReplicationExecutor* executor) {
+void ElectionWinnerDeclarer::cancel() {
_isCanceled = true;
- _runner->cancel(executor);
+ _runner->cancel();
}
Status ElectionWinnerDeclarer::getStatus() const {
diff --git a/src/mongo/db/repl/election_winner_declarer.h b/src/mongo/db/repl/election_winner_declarer.h
index c73f2ca3419..346fc11d252 100644
--- a/src/mongo/db/repl/election_winner_declarer.h
+++ b/src/mongo/db/repl/election_winner_declarer.h
@@ -90,8 +90,6 @@ public:
* in currentConfig, with the intention of alerting them of a new primary.
*
* evh can be used to schedule a callback when the process is complete.
- * This function must be run in the executor, as it must be synchronous with the command
- * callbacks that it schedules.
* If this function returns Status::OK(), evh is then guaranteed to be signaled.
**/
StatusWith<ReplicationExecutor::EventHandle> start(
@@ -103,12 +101,9 @@ public:
const stdx::function<void()>& onCompletion = stdx::function<void()>());
/**
- * Informs the ElectionWinnerDeclarer to cancel further processing. The "executor"
- * argument must point to the same executor passed to "start()".
- *
- * Like start(), this method must run in the executor context.
+ * Informs the ElectionWinnerDeclarer to cancel further processing.
*/
- void cancel(ReplicationExecutor* executor);
+ void cancel();
/**
* Returns a Status from the ElectionWinnerDeclarer::algorithm which indicates what
diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp
index 71b7b2e419b..f5bd3963e63 100644
--- a/src/mongo/db/repl/freshness_checker.cpp
+++ b/src/mongo/db/repl/freshness_checker.cpp
@@ -208,17 +208,16 @@ StatusWith<ReplicationExecutor::EventHandle> FreshnessChecker::start(
const Timestamp& lastOpTimeApplied,
const ReplicaSetConfig& currentConfig,
int selfIndex,
- const std::vector<HostAndPort>& targets,
- const stdx::function<void()>& onCompletion) {
+ const std::vector<HostAndPort>& targets) {
_originalConfigVersion = currentConfig.getConfigVersion();
_algorithm.reset(new Algorithm(lastOpTimeApplied, currentConfig, selfIndex, targets));
- _runner.reset(new ScatterGatherRunner(_algorithm.get()));
- return _runner->start(executor, onCompletion);
+ _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
+ return _runner->start();
}
-void FreshnessChecker::cancel(ReplicationExecutor* executor) {
+void FreshnessChecker::cancel() {
_isCanceled = true;
- _runner->cancel(executor);
+ _runner->cancel();
}
} // namespace repl
diff --git a/src/mongo/db/repl/freshness_checker.h b/src/mongo/db/repl/freshness_checker.h
index 43dd77eee69..6da248a203e 100644
--- a/src/mongo/db/repl/freshness_checker.h
+++ b/src/mongo/db/repl/freshness_checker.h
@@ -116,25 +116,18 @@ public:
* in currentConfig, with the intention of determining whether the current node
* is freshest.
* evh can be used to schedule a callback when the process is complete.
- * This function must be run in the executor, as it must be synchronous with the command
- * callbacks that it schedules.
* If this function returns Status::OK(), evh is then guaranteed to be signaled.
**/
- StatusWith<ReplicationExecutor::EventHandle> start(
- ReplicationExecutor* executor,
- const Timestamp& lastOpTimeApplied,
- const ReplicaSetConfig& currentConfig,
- int selfIndex,
- const std::vector<HostAndPort>& targets,
- const stdx::function<void()>& onCompletion = stdx::function<void()>());
+ StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor,
+ const Timestamp& lastOpTimeApplied,
+ const ReplicaSetConfig& currentConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& targets);
/**
- * Informs the freshness checker to cancel further processing. The "executor"
- * argument must point to the same executor passed to "start()".
- *
- * Like start(), this method must run in the executor context.
+ * Informs the freshness checker to cancel further processing.
*/
- void cancel(ReplicationExecutor* executor);
+ void cancel();
/**
* Returns true if cancel() was called on this instance.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 068bb49ef51..a387b98e863 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -80,6 +80,8 @@
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
+#include "mongo/util/stacktrace.h"
+
namespace mongo {
namespace repl {
@@ -90,6 +92,9 @@ using EventHandle = executor::TaskExecutor::EventHandle;
namespace {
+Status shutDownInProgressStatus(ErrorCodes::ShutdownInProgress,
+ "replication system is shutting down");
+
void lockAndCall(stdx::unique_lock<stdx::mutex>* lk, const stdx::function<void()>& fn) {
if (!lk->owns_lock()) {
lk->lock();
@@ -352,18 +357,13 @@ void ReplicationCoordinatorImpl::appendConnectionStats(executor::ConnectionPoolS
_replExecutor.appendConnectionStats(stats);
}
-void ReplicationCoordinatorImpl::_updateLastVote(const LastVote& lastVote) {
- _topCoord->loadLastVote(lastVote);
-}
-
bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) {
StatusWith<LastVote> lastVote = _externalState->loadLocalLastVoteDocument(txn);
if (!lastVote.isOK()) {
log() << "Did not find local voted for document at startup; " << lastVote.getStatus();
} else {
- LastVote vote = lastVote.getValue();
- _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_updateLastVote, this, vote));
+ LockGuard topoLock(_topoMutex);
+ _topCoord->loadLastVote(lastVote.getValue());
}
StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(txn);
@@ -413,6 +413,8 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
return;
}
+ LockGuard topoLock(_topoMutex);
+
StatusWith<int> myIndex =
validateConfigForStartUp(_externalState.get(), _rsConfig, localConfig);
if (!myIndex.isOK()) {
@@ -467,7 +469,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
stdx::unique_lock<stdx::mutex> lock(_mutex);
invariant(_rsConfigState == kConfigStartingUp);
const PostMemberStateUpdateAction action =
- _setCurrentRSConfig_inlock(cbData, localConfig, myIndex.getValue());
+ _setCurrentRSConfig_inlock(localConfig, myIndex.getValue());
_setMyLastAppliedOpTime_inlock(lastOpTime, false);
_setMyLastDurableOpTime_inlock(lastOpTime, false);
_reportUpstream_inlock(std::move(lock));
@@ -628,8 +630,8 @@ Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const {
}
void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() {
- auto work = [this](const CallbackArgs&) { _topCoord->clearSyncSourceBlacklist(); };
- _scheduleWorkAndWaitForCompletion(work);
+ LockGuard topoLock(_topoMutex);
+ _topCoord->clearSyncSourceBlacklist();
}
ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::setFollowerMode_nonBlocking(
@@ -640,12 +642,7 @@ ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::setFollowerMode_non
return ReplicationExecutor::EventHandle();
}
fassert(18812, finishedSettingFollowerMode.getStatus());
- _scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish,
- this,
- stdx::placeholders::_1,
- newState,
- finishedSettingFollowerMode.getValue(),
- success));
+ _setFollowerModeFinish(newState, finishedSettingFollowerMode.getValue(), success);
return finishedSettingFollowerMode.getValue();
}
@@ -658,13 +655,11 @@ bool ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) {
}
void ReplicationCoordinatorImpl::_setFollowerModeFinish(
- const ReplicationExecutor::CallbackArgs& cbData,
const MemberState& newState,
const ReplicationExecutor::EventHandle& finishedSettingFollowerMode,
bool* success) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
+ LockGuard topoLock(_topoMutex);
+
if (newState == _topCoord->getMemberState()) {
*success = true;
_replExecutor.signalEvent(finishedSettingFollowerMode);
@@ -683,21 +678,21 @@ void ReplicationCoordinatorImpl::_setFollowerModeFinish(
// finish setting the follower mode.
if (isV1ElectionProtocol()) {
invariant(_voteRequester);
- _voteRequester->cancel(&_replExecutor);
+ _voteRequester->cancel();
} else {
invariant(_freshnessChecker);
- _freshnessChecker->cancel(&_replExecutor);
+ _freshnessChecker->cancel();
if (_electCmdRunner) {
- _electCmdRunner->cancel(&_replExecutor);
+ _electCmdRunner->cancel();
}
}
- _replExecutor.onEvent(_electionFinishedEvent,
- stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish,
+ _replExecutor.onEvent(
+ _electionFinishedEvent,
+ _wrapAsCallbackFn(stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish,
this,
- stdx::placeholders::_1,
newState,
finishedSettingFollowerMode,
- success));
+ success)));
return;
}
@@ -945,8 +940,8 @@ Status ReplicationCoordinatorImpl::setLastOptimeForSlave(const OID& rid, const T
}
void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) {
- _scheduleWorkAndWaitForCompletion(stdx::bind(
- &TopologyCoordinator::setMyHeartbeatMessage, _topCoord.get(), _replExecutor.now(), msg));
+ LockGuard topoLock(_topoMutex);
+ _topCoord->setMyHeartbeatMessage(_replExecutor.now(), msg);
}
void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opTime) {
@@ -1313,47 +1308,57 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg
}
void ReplicationCoordinatorImpl::interrupt(unsigned opId) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- // Wake ops waiting for a new committed snapshot.
- _currentCommittedSnapshotCond.notify_all();
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ // Wake ops waiting for a new committed snapshot.
+ _currentCommittedSnapshotCond.notify_all();
- for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
- it != _replicationWaiterList.end();
- ++it) {
- WaiterInfo* info = *it;
- if (info->opID == opId) {
- info->condVar->notify_all();
- return;
+ for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
+ it != _replicationWaiterList.end();
+ ++it) {
+ WaiterInfo* info = *it;
+ if (info->opID == opId) {
+ info->condVar->notify_all();
+ return;
+ }
}
- }
- for (auto& opTimeWaiter : _opTimeWaiterList) {
- if (opTimeWaiter->opID == opId) {
- opTimeWaiter->condVar->notify_all();
- return;
+ for (auto& opTimeWaiter : _opTimeWaiterList) {
+ if (opTimeWaiter->opID == opId) {
+ opTimeWaiter->condVar->notify_all();
+ return;
+ }
}
}
- _scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaiters, this));
+ {
+ LockGuard topoLock(_topoMutex);
+ _signalStepDownWaiters();
+ }
}
void ReplicationCoordinatorImpl::interruptAll() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- // Wake ops waiting for a new committed snapshot.
- _currentCommittedSnapshotCond.notify_all();
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ // Wake ops waiting for a new committed snapshot.
+ _currentCommittedSnapshotCond.notify_all();
- for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
- it != _replicationWaiterList.end();
- ++it) {
- WaiterInfo* info = *it;
- info->condVar->notify_all();
- }
+ for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
+ it != _replicationWaiterList.end();
+ ++it) {
+ WaiterInfo* info = *it;
+ info->condVar->notify_all();
+ }
- for (auto& opTimeWaiter : _opTimeWaiterList) {
- opTimeWaiter->condVar->notify_all();
+ for (auto& opTimeWaiter : _opTimeWaiterList) {
+ opTimeWaiter->condVar->notify_all();
+ }
}
- _scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaiters, this));
+ {
+ LockGuard topoLock(_topoMutex);
+ _signalStepDownWaiters();
+ }
}
bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock(
@@ -1611,24 +1616,20 @@ ReplicationCoordinatorImpl::stepDown_nonBlocking(OperationContext* txn,
return StepDownNonBlockingResult();
}
fassert(26000, finishedEvent.getStatus());
- CBHStatus cbh =
- _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_stepDownContinue,
- this,
- stdx::placeholders::_1,
- finishedEvent.getValue(),
- txn,
- waitUntil,
- stepDownUntil,
- force,
- true, // restartHeartbeats
- result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- *result = cbh.getStatus();
- return StepDownNonBlockingResult();
- }
- fassert(18809, cbh.getStatus());
- _scheduleWorkAt(waitUntil,
- stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaiters, this));
+ _stepDownContinue(finishedEvent.getValue(),
+ txn,
+ waitUntil,
+ stepDownUntil,
+ force,
+ true, // restartHeartbeats
+ result);
+
+ auto signalStepDownWaitersInLock = [this](const CallbackArgs&) {
+ LockGuard topoLock(_topoMutex);
+ _signalStepDownWaiters();
+ };
+
+ _scheduleWorkAt(waitUntil, signalStepDownWaitersInLock);
return std::make_pair(std::move(globalReadLock), finishedEvent.getValue());
}
@@ -1655,7 +1656,6 @@ void ReplicationCoordinatorImpl::_signalStepDownWaiters() {
}
void ReplicationCoordinatorImpl::_stepDownContinue(
- const ReplicationExecutor::CallbackArgs& cbData,
const ReplicationExecutor::EventHandle finishedEvent,
OperationContext* txn,
const Date_t waitUntil,
@@ -1663,18 +1663,10 @@ void ReplicationCoordinatorImpl::_stepDownContinue(
bool force,
bool restartHeartbeats,
Status* result) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- // Cancelation only occurs on shutdown, which will also handle signaling the event.
- *result = Status(ErrorCodes::ShutdownInProgress, "Shutting down replication");
- return;
- }
+ LockGuard topoLock(_topoMutex);
ScopeGuard allFinishedGuard =
MakeGuard(stdx::bind(&ReplicationExecutor::signalEvent, &_replExecutor, finishedEvent));
- if (!cbData.status.isOK()) {
- *result = cbData.status;
- return;
- }
Status interruptedStatus = txn->checkForInterruptNoAssert();
if (!interruptedStatus.isOK()) {
@@ -1698,10 +1690,10 @@ void ReplicationCoordinatorImpl::_stepDownContinue(
bool forceNow = now >= waitUntil ? force : false;
if (_topCoord->stepDown(stepDownUntil, forceNow, getMyLastAppliedOpTime())) {
// Schedule work to (potentially) step back up once the stepdown period has ended.
- _replExecutor.scheduleWorkAt(stepDownUntil,
- stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing,
- this,
- stdx::placeholders::_1));
+ _scheduleWorkAt(stepDownUntil,
+ stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing,
+ this,
+ stdx::placeholders::_1));
stdx::unique_lock<stdx::mutex> lk(_mutex);
const PostMemberStateUpdateAction action =
@@ -1732,7 +1724,6 @@ void ReplicationCoordinatorImpl::_stepDownContinue(
CBHStatus cbh = _replExecutor.onEvent(_stepDownWaiters.back(),
stdx::bind(&ReplicationCoordinatorImpl::_stepDownContinue,
this,
- stdx::placeholders::_1,
finishedEvent,
txn,
waitUntil,
@@ -1754,7 +1745,7 @@ void ReplicationCoordinatorImpl::_stepDownContinue(
return;
}
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _restartHeartbeats_inlock(cbData);
+ _restartHeartbeats_inlock();
}
void ReplicationCoordinatorImpl::_handleTimePassing(
@@ -1763,6 +1754,7 @@ void ReplicationCoordinatorImpl::_handleTimePassing(
return;
}
+ LockGuard topoLock(_topoMutex);
if (_topCoord->becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(_replExecutor.now())) {
_performPostMemberStateUpdateAction(kActionWinElection);
}
@@ -1951,38 +1943,30 @@ StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionComm
}
Status ReplicationCoordinatorImpl::processReplSetGetStatus(BSONObjBuilder* response) {
+ LockGuard topoLock(_topoMutex);
+
Status result(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse");
- _scheduleWorkAndWaitForCompletion(
- stdx::bind(&TopologyCoordinator::prepareStatusResponse,
- _topCoord.get(),
- stdx::placeholders::_1,
- TopologyCoordinator::ReplSetStatusArgs{
- _replExecutor.now(),
- static_cast<unsigned>(time(0) - serverGlobalParams.started),
- getMyLastAppliedOpTime(),
- getMyLastDurableOpTime(),
- getLastCommittedOpTime(),
- getCurrentCommittedSnapshotOpTime()},
- response,
- &result));
+ _topCoord->prepareStatusResponse(
+ TopologyCoordinator::ReplSetStatusArgs{
+ _replExecutor.now(),
+ static_cast<unsigned>(time(0) - serverGlobalParams.started),
+ getMyLastAppliedOpTime(),
+ getMyLastDurableOpTime(),
+ getLastCommittedOpTime(),
+ getCurrentCommittedSnapshotOpTime()},
+ response,
+ &result);
return result;
}
void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) {
invariant(getSettings().usingReplSets());
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_fillIsMasterForReplSet_finish,
- this,
- stdx::placeholders::_1,
- response));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- response->markAsShutdownInProgress();
- return;
+ {
+ LockGuard topoLock(_topoMutex);
+ _topCoord->fillIsMasterForReplSet(response);
}
- fassert(28602, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
if (isWaitingForApplierToDrain()) {
// Report that we are secondary to ismaster callers until drain completes.
response->setIsMaster(false);
@@ -1990,15 +1974,6 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* respon
}
}
-void ReplicationCoordinatorImpl::_fillIsMasterForReplSet_finish(
- const ReplicationExecutor::CallbackArgs& cbData, IsMasterResponse* response) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- response->markAsShutdownInProgress();
- return;
- }
- _topCoord->fillIsMasterForReplSet(response);
-}
-
void ReplicationCoordinatorImpl::appendSlaveInfoData(BSONObjBuilder* result) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress"));
@@ -2039,10 +2014,13 @@ void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result)
void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {
EventHandle evh;
- _scheduleWorkAndWaitForCompletion([this, &evh, &replMetadata](const CallbackArgs& args) {
+
+ {
+ LockGuard topoLock(_topoMutex);
evh = _processReplSetMetadata_incallback(replMetadata);
- });
- if (evh.isValid()) {
+ }
+
+ if (evh) {
_replExecutor.waitForEvent(evh);
}
}
@@ -2062,26 +2040,8 @@ EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_incallback(
}
bool ReplicationCoordinatorImpl::getMaintenanceMode() {
- bool maintenanceMode(false);
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_getMaintenanceMode_helper,
- this,
- stdx::placeholders::_1,
- &maintenanceMode));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return false;
- }
- fassert(18811, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
- return maintenanceMode;
-}
-
-void ReplicationCoordinatorImpl::_getMaintenanceMode_helper(
- const ReplicationExecutor::CallbackArgs& cbData, bool* maintenanceMode) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
- *maintenanceMode = _topCoord->getMaintenanceCount() > 0;
+ LockGuard topoLock(_topoMutex);
+ return _topCoord->getMaintenanceCount() > 0;
}
Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) {
@@ -2090,37 +2050,15 @@ Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) {
"can only set maintenance mode on replica set members");
}
- Status result(ErrorCodes::InternalError, "didn't set status in _setMaintenanceMode_helper");
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_setMaintenanceMode_helper,
- this,
- stdx::placeholders::_1,
- activate,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return cbh.getStatus();
- }
- fassert(18698, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
- return result;
-}
-
-void ReplicationCoordinatorImpl::_setMaintenanceMode_helper(
- const ReplicationExecutor::CallbackArgs& cbData, bool activate, Status* result) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
- return;
- }
-
+ Status result(ErrorCodes::InternalError, "didn't set status");
+ LockGuard topoLock(_topoMutex);
if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) {
- *result = Status(ErrorCodes::NotSecondary, "currently running for election");
- return;
+ return Status(ErrorCodes::NotSecondary, "currently running for election");
}
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_getMemberState_inlock().primary()) {
- *result = Status(ErrorCodes::NotSecondary, "primaries can't modify maintenance mode");
- return;
+ return Status(ErrorCodes::NotSecondary, "primaries can't modify maintenance mode");
}
int curMaintenanceCalls = _topCoord->getMaintenanceCount();
@@ -2137,63 +2075,28 @@ void ReplicationCoordinatorImpl::_setMaintenanceMode_helper(
<< " other maintenance mode tasks ongoing)" << rsLog;
} else {
warning() << "Attempted to leave maintenance mode but it is not currently active";
- *result = Status(ErrorCodes::OperationFailed, "already out of maintenance mode");
- return;
+ return Status(ErrorCodes::OperationFailed, "already out of maintenance mode");
}
const PostMemberStateUpdateAction action = _updateMemberStateFromTopologyCoordinator_inlock();
- *result = Status::OK();
lk.unlock();
_performPostMemberStateUpdateAction(action);
+ return Status::OK();
}
Status ReplicationCoordinatorImpl::processReplSetSyncFrom(const HostAndPort& target,
BSONObjBuilder* resultObj) {
Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse");
- CBHStatus cbh =
- _replExecutor.scheduleWork(stdx::bind(&TopologyCoordinator::prepareSyncFromResponse,
- _topCoord.get(),
- stdx::placeholders::_1,
- target,
- _getMyLastAppliedOpTime_inlock(),
- resultObj,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
- }
- fassert(18649, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
+ LockGuard topoLock(_topoMutex);
+ LockGuard lk(_mutex);
+ auto opTime = _getMyLastAppliedOpTime_inlock();
+ _topCoord->prepareSyncFromResponse(target, opTime, resultObj, &result);
return result;
}
Status ReplicationCoordinatorImpl::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) {
- Status result(ErrorCodes::InternalError, "didn't set status in prepareFreezeResponse");
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_processReplSetFreeze_finish,
- this,
- stdx::placeholders::_1,
- secs,
- resultObj,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return cbh.getStatus();
- }
- fassert(18641, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
- return result;
-}
-
-void ReplicationCoordinatorImpl::_processReplSetFreeze_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- int secs,
- BSONObjBuilder* response,
- Status* result) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
- return;
- }
-
- _topCoord->prepareFreezeResponse(_replExecutor.now(), secs, response);
+ LockGuard topoLock(_topoMutex);
+ _topCoord->prepareFreezeResponse(_replExecutor.now(), secs, resultObj);
if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) {
// If we just unfroze and ended our stepdown period and we are a one node replica set,
@@ -2201,7 +2104,8 @@ void ReplicationCoordinatorImpl::_processReplSetFreeze_finish(
// need to elect ourself.
_performPostMemberStateUpdateAction(kActionWinElection);
}
- *result = Status::OK();
+ return Status::OK();
+ ;
}
Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs& args,
@@ -2214,44 +2118,17 @@ Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs&
}
}
- Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse");
- CBHStatus cbh =
- _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_processHeartbeatFinish,
- this,
- stdx::placeholders::_1,
- args,
- response,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
- }
- fassert(18508, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
- return result;
-}
-
-void ReplicationCoordinatorImpl::_processHeartbeatFinish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetHeartbeatArgs& args,
- ReplSetHeartbeatResponse* response,
- Status* outStatus) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *outStatus = Status(ErrorCodes::ShutdownInProgress, "Replication shutdown in progress");
- return;
- }
- fassert(18910, cbData.status);
-
auto senderHost(args.getSenderHost());
+ LockGuard topoLock(_topoMutex);
const Date_t now = _replExecutor.now();
- *outStatus = _topCoord->prepareHeartbeatResponse(now,
- args,
- _settings.ourSetName(),
- getMyLastAppliedOpTime(),
- getMyLastDurableOpTime(),
- response);
- if ((outStatus->isOK() || *outStatus == ErrorCodes::InvalidReplicaSetConfig) &&
- _selfIndex < 0) {
+ Status result = _topCoord->prepareHeartbeatResponse(now,
+ args,
+ _settings.ourSetName(),
+ getMyLastAppliedOpTime(),
+ getMyLastDurableOpTime(),
+ response);
+ if ((result.isOK() || result == ErrorCodes::InvalidReplicaSetConfig) && _selfIndex < 0) {
// If this node does not belong to the configuration it knows about, send heartbeats
// back to any node that sends us a heartbeat, in case one of those remote nodes has
// a configuration that contains us. Chances are excellent that it will, since that
@@ -2259,7 +2136,7 @@ void ReplicationCoordinatorImpl::_processHeartbeatFinish(
if (!senderHost.empty() && _seedList.insert(senderHost).second) {
_scheduleHeartbeatToTarget(senderHost, -1, now);
}
- } else if (outStatus->isOK() && response->getConfigVersion() < args.getConfigVersion()) {
+ } else if (result.isOK() && response->getConfigVersion() < args.getConfigVersion()) {
// Schedule a heartbeat to the sender to fetch the new config.
// We cannot cancel the enqueued heartbeat, but either this one or the enqueued heartbeat
// will trigger reconfig, which cancels and reschedules all heartbeats.
@@ -2269,6 +2146,7 @@ void ReplicationCoordinatorImpl::_processHeartbeatFinish(
_scheduleHeartbeatToTarget(senderHost, senderIndex, now);
}
}
+ return result;
}
Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn,
@@ -2377,14 +2255,20 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn,
// If it's a force reconfig, the primary node may not be electable after the configuration
// change. In case we are that primary node, finish the reconfig under the global lock,
// so that the step down occurs safely.
- CBHStatus cbh = args.force ? _replExecutor.scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn)
- : _replExecutor.scheduleWork(reconfigFinishFn);
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return status;
+ CBHStatus cbhStatus(ErrorCodes::InternalError, "reconfigFinishFn hasn't been scheduled");
+ if (args.force) {
+ cbhStatus = _replExecutor.scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn);
+ } else {
+ cbhStatus = _replExecutor.scheduleWork(reconfigFinishFn);
+ }
+ if (cbhStatus.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return cbhStatus.getStatus();
}
- fassert(18824, cbh.getStatus());
+
+ fassert(18824, cbhStatus.getStatus());
+
configStateGuard.Dismiss();
- _replExecutor.wait(cbh.getValue());
+ _replExecutor.wait(cbhStatus.getValue());
return Status::OK();
}
@@ -2392,12 +2276,37 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(
const ReplicationExecutor::CallbackArgs& cbData,
const ReplicaSetConfig& newConfig,
int myIndex) {
+ LockGuard topoLock(_topoMutex);
+
stdx::unique_lock<stdx::mutex> lk(_mutex);
invariant(_rsConfigState == kConfigReconfiguring);
invariant(_rsConfig.isInitialized());
+
+ // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig.
+ if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) {
+ if (isV1ElectionProtocol()) {
+ invariant(_voteRequester);
+ _voteRequester->cancel();
+ } else {
+ invariant(_freshnessChecker);
+ _freshnessChecker->cancel();
+ if (_electCmdRunner) {
+ _electCmdRunner->cancel();
+ }
+ }
+ // Wait for the election to complete and the node's Role to be set to follower.
+ _replExecutor.onEvent(_electionFinishedEvent,
+ stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig,
+ this,
+ stdx::placeholders::_1,
+ newConfig,
+ myIndex));
+ return;
+ }
+
+
const ReplicaSetConfig oldConfig = _rsConfig;
- const PostMemberStateUpdateAction action =
- _setCurrentRSConfig_inlock(cbData, newConfig, myIndex);
+ const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndex);
// On a reconfig we drop all snapshots so we don't mistakenely read from the wrong one.
// For example, if we change the meaning of the "committed" snapshot from applied -> durable.
@@ -2497,18 +2406,11 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn,
// Since the JournalListener has not yet been set up, we must manually set our
// durableOpTime.
setMyLastDurableOpTime(getMyLastAppliedOpTime());
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetInitiate,
- this,
- stdx::placeholders::_1,
- newConfig,
- myIndex.getValue()));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return status;
+
+ {
+ LockGuard topoLock(_topoMutex);
+ _finishReplSetInitiate(newConfig, myIndex.getValue());
}
- configStateGuard.Dismiss();
- fassert(18654, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
// A configuration passed to replSetInitiate() with the current node as an arbiter
// will fail validation with a "replSet initiate got ... while validating" reason.
@@ -2517,19 +2419,17 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn,
_startDataReplication();
}
+ configStateGuard.Dismiss();
return Status::OK();
}
-void ReplicationCoordinatorImpl::_finishReplSetInitiate(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicaSetConfig& newConfig,
- int myIndex) {
+void ReplicationCoordinatorImpl::_finishReplSetInitiate(const ReplicaSetConfig& newConfig,
+ int myIndex) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
invariant(_rsConfigState == kConfigInitiating);
invariant(!_rsConfig.isInitialized());
const ReplicaSetConfig oldConfig = _rsConfig;
- const PostMemberStateUpdateAction action =
- _setCurrentRSConfig_inlock(cbData, newConfig, myIndex);
+ const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndex);
lk.unlock();
_resetElectionInfoOnProtocolVersionUpgrade(oldConfig, newConfig);
_performPostMemberStateUpdateAction(action);
@@ -2701,94 +2601,31 @@ void ReplicationCoordinatorImpl::incrementRollbackID() {
Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& args,
BSONObjBuilder* resultObj) {
+ LockGuard topoLock(_topoMutex);
Status result(ErrorCodes::InternalError, "didn't set status in prepareFreshResponse");
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_processReplSetFresh_finish,
- this,
- stdx::placeholders::_1,
- args,
- resultObj,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
- }
- fassert(18652, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
- return result;
-}
-
-void ReplicationCoordinatorImpl::_processReplSetFresh_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetFreshArgs& args,
- BSONObjBuilder* response,
- Status* result) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
- return;
- }
-
_topCoord->prepareFreshResponse(
- args, _replExecutor.now(), getMyLastAppliedOpTime(), response, result);
+ args, _replExecutor.now(), getMyLastAppliedOpTime(), resultObj, &result);
+ return result;
}
Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& args,
BSONObjBuilder* responseObj) {
+ LockGuard topoLock(_topoMutex);
Status result = Status(ErrorCodes::InternalError, "status not set by callback");
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_processReplSetElect_finish,
- this,
- stdx::placeholders::_1,
- args,
- responseObj,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
- }
- fassert(18657, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
- return result;
-}
-
-void ReplicationCoordinatorImpl::_processReplSetElect_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetElectArgs& args,
- BSONObjBuilder* response,
- Status* result) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
- return;
- }
-
_topCoord->prepareElectResponse(
- args, _replExecutor.now(), getMyLastAppliedOpTime(), response, result);
+ args, _replExecutor.now(), getMyLastAppliedOpTime(), responseObj, &result);
+ return result;
}
ReplicationCoordinatorImpl::PostMemberStateUpdateAction
-ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicaSetConfig& newConfig,
- int myIndex) {
+ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplicaSetConfig& newConfig,
+ int myIndex) {
invariant(_settings.usingReplSets());
_cancelHeartbeats_inlock();
_setConfigState_inlock(kConfigSteady);
// Must get this before changing our config.
OpTime myOptime = _getMyLastAppliedOpTime_inlock();
- // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig.
- if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) {
- if (isV1ElectionProtocol()) {
- invariant(_voteRequester);
- _voteRequester->cancel(&_replExecutor);
- } else {
- invariant(_freshnessChecker);
- _freshnessChecker->cancel(&_replExecutor);
- if (_electCmdRunner) {
- _electCmdRunner->cancel(&_replExecutor);
- }
- }
- // Wait for the election to complete and the node's Role to be set to follower.
- _replExecutor.waitForEvent(_electionFinishedEvent);
- }
_topCoord->updateConfig(newConfig, myIndex, _replExecutor.now(), myOptime);
_cachedTerm = _topCoord->getTerm();
const ReplicaSetConfig oldConfig = _rsConfig;
@@ -2811,7 +2648,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(
if (_selfIndex >= 0) {
// Don't send heartbeats if we're not in the config, if we get re-added one of the
// nodes in the set will contact us.
- _startHeartbeats_inlock(cbData);
+ _startHeartbeats_inlock();
}
_updateLastCommittedOpTime_inlock();
@@ -3028,81 +2865,41 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const {
return getReplicationMode() != modeNone;
}
-void ReplicationCoordinatorImpl::_chooseNewSyncSource(
- const ReplicationExecutor::CallbackArgs& cbData,
- const Timestamp& lastTimestampFetched,
- HostAndPort* newSyncSource) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
+HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const Timestamp& lastTimestampFetched) {
+ LockGuard topoLock(_topoMutex);
HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress();
-
- *newSyncSource = _topCoord->chooseNewSyncSource(_replExecutor.now(), lastTimestampFetched);
+ HostAndPort newSyncSource =
+ _topCoord->chooseNewSyncSource(_replExecutor.now(), lastTimestampFetched);
stdx::lock_guard<stdx::mutex> lock(_mutex);
// If we lost our sync source, schedule new heartbeats immediately to update our knowledge
// of other members's state, allowing us to make informed sync source decisions.
- if (newSyncSource->empty() && !oldSyncSource.empty() && _selfIndex >= 0 &&
+ if (newSyncSource.empty() && !oldSyncSource.empty() && _selfIndex >= 0 &&
!_getMemberState_inlock().primary()) {
- _restartHeartbeats_inlock(cbData);
+ _restartHeartbeats_inlock();
}
-}
-HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const Timestamp& lastTimestampFetched) {
- HostAndPort newSyncSource;
- CBHStatus cbh =
- _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_chooseNewSyncSource,
- this,
- stdx::placeholders::_1,
- lastTimestampFetched,
- &newSyncSource));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return newSyncSource; // empty
- }
- fassert(18740, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
return newSyncSource;
}
-void ReplicationCoordinatorImpl::_blacklistSyncSource(
- const ReplicationExecutor::CallbackArgs& cbData, const HostAndPort& host, Date_t until) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
- _topCoord->blacklistSyncSource(host, until);
-
- CBHStatus cbh =
- _replExecutor.scheduleWorkAt(until,
- stdx::bind(&ReplicationCoordinatorImpl::_unblacklistSyncSource,
- this,
- stdx::placeholders::_1,
- host));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
- fassert(28610, cbh.getStatus());
-}
-
void ReplicationCoordinatorImpl::_unblacklistSyncSource(
const ReplicationExecutor::CallbackArgs& cbData, const HostAndPort& host) {
if (cbData.status == ErrorCodes::CallbackCanceled)
return;
+
+ LockGuard topoLock(_topoMutex);
_topCoord->unblacklistSyncSource(host, _replExecutor.now());
}
void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) {
- CBHStatus cbh =
- _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_blacklistSyncSource,
- this,
- stdx::placeholders::_1,
- host,
- until));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
- fassert(18741, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
+ LockGuard topoLock(_topoMutex);
+ _topCoord->blacklistSyncSource(host, until);
+ _scheduleWorkAt(until,
+ stdx::bind(&ReplicationCoordinatorImpl::_unblacklistSyncSource,
+ this,
+ stdx::placeholders::_1,
+ host));
}
void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn) {
@@ -3124,41 +2921,15 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn
_externalState->setGlobalTimestamp(lastOpTime.getTimestamp());
}
-void ReplicationCoordinatorImpl::_shouldChangeSyncSource(
- const ReplicationExecutor::CallbackArgs& cbData,
- const HostAndPort& currentSource,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource,
- bool* shouldChange) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
-
- *shouldChange = _topCoord->shouldChangeSyncSource(currentSource,
- getMyLastAppliedOpTime(),
- syncSourceLastOpTime,
- syncSourceHasSyncSource,
- _replExecutor.now());
-}
-
bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& syncSourceLastOpTime,
bool syncSourceHasSyncSource) {
- bool shouldChange(false);
- CBHStatus cbh =
- _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_shouldChangeSyncSource,
- this,
- stdx::placeholders::_1,
- currentSource,
- syncSourceLastOpTime,
- syncSourceHasSyncSource,
- &shouldChange));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return false;
- }
- fassert(18906, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
- return shouldChange;
+ LockGuard topoLock(_topoMutex);
+ return _topCoord->shouldChangeSyncSource(currentSource,
+ getMyLastAppliedOpTime(),
+ syncSourceLastOpTime,
+ syncSourceHasSyncSource,
+ _replExecutor.now());
}
SyncSourceResolverResponse ReplicationCoordinatorImpl::selectSyncSource(
@@ -3351,18 +3122,12 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
if (!termStatus.isOK() && termStatus.code() != ErrorCodes::StaleTerm)
return termStatus;
- Status result{ErrorCodes::InternalError, "didn't set status in processReplSetRequestVotes"};
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish,
- this,
- stdx::placeholders::_1,
- args,
- response,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return cbh.getStatus();
+ {
+ LockGuard topoLock(_topoMutex);
+ LockGuard lk(_mutex);
+ _topCoord->processReplSetRequestVotes(args, response, _getMyLastAppliedOpTime_inlock());
}
- _replExecutor.wait(cbh.getValue());
+
if (response->getVoteGranted()) {
LastVote lastVote;
lastVote.setTerm(args.getTerm());
@@ -3374,22 +3139,7 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
return status;
}
}
- return result;
-}
-
-void ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response,
- Status* result) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
- return;
- }
-
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _topCoord->processReplSetRequestVotes(args, response, _getMyLastAppliedOpTime_inlock());
- *result = Status::OK();
+ return Status::OK();
}
Status ReplicationCoordinatorImpl::processReplSetDeclareElectionWinner(
@@ -3400,33 +3150,9 @@ Status ReplicationCoordinatorImpl::processReplSetDeclareElectionWinner(
// TODO(sz) Remove processReplSetDeclareElectionWinner rathen than passing nullptr.
updateTerm(nullptr, args.getTerm());
-
- Status result{ErrorCodes::InternalError,
- "didn't set status in processReplSetDeclareElectionWinner"};
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish,
- this,
- stdx::placeholders::_1,
- args,
- responseTerm,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return cbh.getStatus();
- }
- _replExecutor.wait(cbh.getValue());
- return result;
-}
-
-void ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetDeclareElectionWinnerArgs& args,
- long long* responseTerm,
- Status* result) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
- return;
- }
- *result = _topCoord->processReplSetDeclareElectionWinner(args, responseTerm);
+ LockGuard topoLock(_topoMutex);
+ return _topCoord->processReplSetDeclareElectionWinner(args, responseTerm);
+ ;
}
void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestInterface& request,
@@ -3434,34 +3160,15 @@ void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestI
BSONObjBuilder* builder) {
if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) {
rpc::ReplSetMetadata metadata;
+ LockGuard topoLock(_topoMutex);
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish,
- this,
- stdx::placeholders::_1,
- lastOpTimeFromClient,
- &metadata));
-
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
-
- fassert(28709, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
-
+ OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime();
+ OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime);
+ _topCoord->prepareReplResponseMetadata(&metadata, lastVisibleOpTime, _lastCommittedOpTime);
metadata.writeToMetadata(builder);
}
}
-void ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const OpTime& lastOpTimeFromClient,
- rpc::ReplSetMetadata* metadata) {
- OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime();
- OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime);
- _topCoord->prepareReplResponseMetadata(metadata, lastVisibleOpTime, _lastCommittedOpTime);
-}
-
bool ReplicationCoordinatorImpl::isV1ElectionProtocol() const {
return _protVersion.load() == 1;
}
@@ -3485,44 +3192,18 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs
}
Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse");
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_processHeartbeatFinishV1,
- this,
- stdx::placeholders::_1,
- args,
- response,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return {ErrorCodes::ShutdownInProgress, "replication shutdown in progress"};
- }
- fassert(28645, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
-
- return result;
-}
-
-void ReplicationCoordinatorImpl::_processHeartbeatFinishV1(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetHeartbeatArgsV1& args,
- ReplSetHeartbeatResponse* response,
- Status* outStatus) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *outStatus = {ErrorCodes::ShutdownInProgress, "Replication shutdown in progress"};
- return;
- }
- fassert(28655, cbData.status);
+ LockGuard topoLock(_topoMutex);
auto senderHost(args.getSenderHost());
const Date_t now = _replExecutor.now();
- *outStatus = _topCoord->prepareHeartbeatResponseV1(now,
- args,
- _settings.ourSetName(),
- getMyLastAppliedOpTime(),
- getMyLastDurableOpTime(),
- response);
-
- if ((outStatus->isOK() || *outStatus == ErrorCodes::InvalidReplicaSetConfig) &&
- _selfIndex < 0) {
+ result = _topCoord->prepareHeartbeatResponseV1(now,
+ args,
+ _settings.ourSetName(),
+ getMyLastAppliedOpTime(),
+ getMyLastDurableOpTime(),
+ response);
+
+ if ((result.isOK() || result == ErrorCodes::InvalidReplicaSetConfig) && _selfIndex < 0) {
// If this node does not belong to the configuration it knows about, send heartbeats
// back to any node that sends us a heartbeat, in case one of those remote nodes has
// a configuration that contains us. Chances are excellent that it will, since that
@@ -3530,7 +3211,7 @@ void ReplicationCoordinatorImpl::_processHeartbeatFinishV1(
if (!senderHost.empty() && _seedList.insert(senderHost).second) {
_scheduleHeartbeatToTarget(senderHost, -1, now);
}
- } else if (outStatus->isOK() && response->getConfigVersion() < args.getConfigVersion()) {
+ } else if (result.isOK() && response->getConfigVersion() < args.getConfigVersion()) {
// Schedule a heartbeat to the sender to fetch the new config.
// We cannot cancel the enqueued heartbeat, but either this one or the enqueued heartbeat
// will trigger reconfig, which cancels and reschedules all heartbeats.
@@ -3538,36 +3219,21 @@ void ReplicationCoordinatorImpl::_processHeartbeatFinishV1(
int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost);
_scheduleHeartbeatToTarget(senderHost, senderIndex, now);
}
- } else if (outStatus->isOK()) {
+ } else if (result.isOK()) {
// Update liveness for sending node.
stdx::lock_guard<stdx::mutex> lk(_mutex);
auto slaveInfo = _findSlaveInfoByMemberID_inlock(args.getSenderId());
if (!slaveInfo) {
- return;
+ return result;
}
slaveInfo->lastUpdate = _replExecutor.now();
slaveInfo->down = false;
}
+ return result;
}
void ReplicationCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) {
- CBHStatus cbh =
- _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_summarizeAsHtml_finish,
- this,
- stdx::placeholders::_1,
- output));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
- fassert(28638, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
-}
-
-void ReplicationCoordinatorImpl::_summarizeAsHtml_finish(const CallbackArgs& cbData,
- ReplSetHtmlSummary* output) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
+ LockGuard topoLock(_topoMutex);
// TODO(dannenberg) consider putting both optimes into the htmlsummary.
output->setSelfOptime(getMyLastAppliedOpTime());
@@ -3582,30 +3248,18 @@ long long ReplicationCoordinatorImpl::getTerm() {
return _cachedTerm;
}
-void ReplicationCoordinatorImpl::_getTerm_helper(const ReplicationExecutor::CallbackArgs& cbData,
- long long* term) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
- *term = _topCoord->getTerm();
-}
-
EventHandle ReplicationCoordinatorImpl::updateTerm_forTest(
long long term, TopologyCoordinator::UpdateTermResult* updateResult) {
- auto finishEvhStatus = _replExecutor.makeEvent();
- invariantOK(finishEvhStatus.getStatus());
- EventHandle finishEvh = finishEvhStatus.getValue();
- auto signalFinishEvent =
- [this, finishEvh](const CallbackArgs&) { this->_replExecutor.signalEvent(finishEvh); };
- auto work = [this, term, updateResult, signalFinishEvent](const CallbackArgs& args) {
- auto evh = _updateTerm_incallback(term, updateResult);
- if (evh.isValid()) {
- _replExecutor.onEvent(evh, signalFinishEvent);
- } else {
- signalFinishEvent(args);
- }
- };
- _scheduleWork(work);
+ LockGuard topoLock(_topoMutex);
+
+ EventHandle finishEvh;
+ finishEvh = _updateTerm_incallback(term, updateResult);
+ if (!finishEvh) {
+ auto finishEvhStatus = _replExecutor.makeEvent();
+ invariantOK(finishEvhStatus.getStatus());
+ finishEvh = finishEvhStatus.getValue();
+ _replExecutor.signalEvent(finishEvh);
+ }
return finishEvh;
}
@@ -3624,10 +3278,12 @@ Status ReplicationCoordinatorImpl::updateTerm(OperationContext* txn, long long t
dassert(!txn->lockState()->isLocked());
TopologyCoordinator::UpdateTermResult updateTermResult;
EventHandle finishEvh;
- auto work = [this, term, &updateTermResult, &finishEvh](const CallbackArgs&) {
+
+ {
+ LockGuard topoLock(_topoMutex);
finishEvh = _updateTerm_incallback(term, &updateTermResult);
- };
- _scheduleWorkAndWaitForCompletion(work);
+ }
+
// Wait for potential stepdown to finish.
if (finishEvh.isValid()) {
_replExecutor.waitForEvent(finishEvh);
@@ -3829,10 +3485,9 @@ void ReplicationCoordinatorImpl::_scheduleWorkAtAndWaitForCompletion(Date_t when
}
}
-// static
CallbackHandle ReplicationCoordinatorImpl::_wrapAndScheduleWork(ScheduleFn scheduleFn,
const CallbackFn& work) {
- auto workWrapped = [work](const CallbackArgs& args) {
+ auto workWrapped = [this, work](const CallbackArgs& args) {
if (args.status == ErrorCodes::CallbackCanceled) {
return;
}
@@ -3856,23 +3511,12 @@ EventHandle ReplicationCoordinatorImpl::_makeEvent() {
}
void ReplicationCoordinatorImpl::_scheduleElectionWinNotification() {
- auto electionWinNotificationCallback = [this](const CallbackArgs& cbData) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
-
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (!_getMemberState_inlock().primary()) {
- return;
- }
-
- _restartHeartbeats_inlock(cbData);
- };
-
- auto cbStatus = _replExecutor.scheduleWork(electionWinNotificationCallback);
- if (!cbStatus.getStatus().isOK()) {
- warning() << "Error in scheduling notification of election win: " << cbStatus.getStatus();
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (!_getMemberState_inlock().primary()) {
+ return;
}
+
+ _restartHeartbeats_inlock();
}
WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptionsSyncMode(
@@ -3889,5 +3533,16 @@ WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptions
return writeConcern;
}
+CallbackFn ReplicationCoordinatorImpl::_wrapAsCallbackFn(const stdx::function<void()>& work) {
+ return [work](const CallbackArgs& cbData) {
+ if (cbData.status == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+
+ work();
+ };
+}
+
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 461d3ababad..bd2e06b8a7c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -587,10 +587,8 @@ private:
* Returns an action to be performed after unlocking _mutex, via
* _performPostMemberStateUpdateAction.
*/
- PostMemberStateUpdateAction _setCurrentRSConfig_inlock(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicaSetConfig& newConfig,
- int myIndex);
+ PostMemberStateUpdateAction _setCurrentRSConfig_inlock(const ReplicaSetConfig& newConfig,
+ int myIndex);
/**
* Updates the last committed OpTime to be "committedOpTime" if it is more recent than the
@@ -605,74 +603,6 @@ private:
void _wakeReadyWaiters_inlock();
/**
- * Helper method for setting/unsetting maintenance mode. Scheduled by setMaintenanceMode()
- * to run in a global write lock in the replication executor thread.
- */
- void _setMaintenanceMode_helper(const ReplicationExecutor::CallbackArgs& cbData,
- bool activate,
- Status* result);
-
- /**
- * Helper method for retrieving maintenance mode. Scheduled by getMaintenanceMode() to run
- * in the replication executor thread.
- */
- void _getMaintenanceMode_helper(const ReplicationExecutor::CallbackArgs& cbData,
- bool* maintenanceMode);
-
- /**
- * Bottom half of fillIsMasterForReplSet.
- */
- void _fillIsMasterForReplSet_finish(const ReplicationExecutor::CallbackArgs& cbData,
- IsMasterResponse* result);
-
- /**
- * Bottom half of processReplSetFresh.
- */
- void _processReplSetFresh_finish(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetFreshArgs& args,
- BSONObjBuilder* response,
- Status* result);
-
- /**
- * Bottom half of processReplSetElect.
- */
- void _processReplSetElect_finish(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetElectArgs& args,
- BSONObjBuilder* response,
- Status* result);
-
- /**
- * Bottom half of processReplSetFreeze.
- */
- void _processReplSetFreeze_finish(const ReplicationExecutor::CallbackArgs& cbData,
- int secs,
- BSONObjBuilder* response,
- Status* result);
-
- /**
- * Bottom half of processReplSetDeclareElectionWinner.
- */
- void _processReplSetDeclareElectionWinner_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetDeclareElectionWinnerArgs& args,
- long long* responseTerm,
- Status* result);
-
- /**
- * Bottom half of processReplSetRequestVotes.
- */
- void _processReplSetRequestVotes_finish(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response,
- Status* result);
-
- /**
- * Bottom half of prepareReplResponseMetadata.
- */
- void _prepareReplResponseMetadata_finish(const ReplicationExecutor::CallbackArgs& cbData,
- const OpTime& lastOpTimeFromClient,
- rpc::ReplSetMetadata* metadata);
- /**
* Scheduled to cause the ReplicationCoordinator to reconsider any state that might
* need to change as a result of time passing - for instance becoming PRIMARY when a single
* node replica set member's stepDown period ends.
@@ -722,7 +652,7 @@ private:
/**
* Triggers all callbacks that are blocked waiting for new heartbeat data
* to decide whether or not to finish a step down.
- * Should only be called from executor callbacks.
+ * Should only be called with _topoMutex held.
*/
void _signalStepDownWaiters();
@@ -731,8 +661,7 @@ private:
* it is running within a global shared lock, and thus that no writes are going on at the
* same time.
*/
- void _stepDownContinue(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicationExecutor::EventHandle finishedEvent,
+ void _stepDownContinue(const ReplicationExecutor::EventHandle finishedEvent,
OperationContext* txn,
Date_t waitUntil,
Date_t stepdownUntil,
@@ -755,8 +684,7 @@ private:
* supply an event, "finishedSettingFollowerMode", and wait for that event to
* be signaled. Do not observe "*success" until after the event is signaled.
*/
- void _setFollowerModeFinish(const ReplicationExecutor::CallbackArgs& cbData,
- const MemberState& newState,
+ void _setFollowerModeFinish(const MemberState& newState,
const ReplicationExecutor::EventHandle& finishedSettingFollowerMode,
bool* success);
@@ -822,21 +750,21 @@ private:
const OpTime& appliedOpTime);
/**
- * Starts a heartbeat for each member in the current config. Called within the executor
- * context.
+ * Starts a heartbeat for each member in the current config. Called while holding _topoMutex
+ * and replCoord _mutex.
*/
- void _startHeartbeats_inlock(const ReplicationExecutor::CallbackArgs& cbData);
+ void _startHeartbeats_inlock();
/**
- * Cancels all heartbeats. Called within executor context.
+ * Cancels all heartbeats. Called while holding _topoMutex and replCoord _mutex.
*/
void _cancelHeartbeats_inlock();
/**
* Cancels all heartbeats, then starts a heartbeat for each member in the current config.
- * Called within the executor context.
+ * Called while holding _topoMutex and replCoord _mutex.
*/
- void _restartHeartbeats_inlock(const ReplicationExecutor::CallbackArgs& cbData);
+ void _restartHeartbeats_inlock();
/**
* Asynchronously sends a heartbeat to "target". "targetIndex" is the index
@@ -853,15 +781,6 @@ private:
MemberState _getMemberState_inlock() const;
/**
- * Callback that gives the TopologyCoordinator an initial LastVote document from
- * local storage.
- *
- * Called only during replication startup. All other updates come from the
- * TopologyCoordinator itself.
- */
- void _updateLastVote(const LastVote& lastVote);
-
- /**
* Starts loading the replication configuration from local storage, and if it is valid,
* schedules a callback (of _finishLoadLocalConfig) to set it as the current replica set
* config (sets _rsConfig and _thisMembersConfigIndex).
@@ -892,16 +811,14 @@ private:
void _stopDataReplication();
/**
- * Callback that finishes the work of processReplSetInitiate() inside the replication
- * executor context, in the event of a successful quorum check.
+ * Finishes the work of processReplSetInitiate() while holding _topoMutex, in the event of
+ * a successful quorum check.
*/
- void _finishReplSetInitiate(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicaSetConfig& newConfig,
- int myIndex);
+ void _finishReplSetInitiate(const ReplicaSetConfig& newConfig, int myIndex);
/**
- * Callback that finishes the work of processReplSetReconfig inside the replication
- * executor context, in the event of a successful quorum check.
+ * Finishes the work of processReplSetReconfig while holding _topoMutex, in the event of
+ * a successful quorum check.
*/
void _finishReplSetReconfig(const ReplicationExecutor::CallbackArgs& cbData,
const ReplicaSetConfig& newConfig,
@@ -931,7 +848,7 @@ private:
* Begins an attempt to elect this node.
* Called after an incoming heartbeat changes this node's view of the set such that it
* believes it can be elected PRIMARY.
- * For proper concurrency, must be called via a ReplicationExecutor callback.
+ * For proper concurrency, must be called while holding _topoMutex.
*
* For old style elections the election path is:
* _startElectSelf()
@@ -994,27 +911,6 @@ private:
void _recoverFromElectionTie(const ReplicationExecutor::CallbackArgs& cbData);
/**
- * Chooses a new sync source. Must be scheduled as a callback.
- *
- * Calls into the Topology Coordinator, which uses its current view of the set to choose
- * the most appropriate sync source.
- */
- void _chooseNewSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
- const Timestamp& lastTimestampFetched,
- HostAndPort* newSyncSource);
-
- /**
- * Adds 'host' to the sync source blacklist until 'until'. A blacklisted source cannot
- * be chosen as a sync source. Schedules a callback to unblacklist the sync source to be
- * run at 'until'.
- *
- * Must be scheduled as a callback.
- */
- void _blacklistSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
- const HostAndPort& host,
- Date_t until);
-
- /**
* Removes 'host' from the sync source blacklist. If 'host' isn't found, it's simply
* ignored and no error is thrown.
*
@@ -1024,17 +920,6 @@ private:
const HostAndPort& host);
/**
- * Determines if a new sync source should be considered.
- *
- * Must be scheduled as a callback.
- */
- void _shouldChangeSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
- const HostAndPort& currentSource,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource,
- bool* shouldChange);
-
- /**
* Schedules a request that the given host step down; logs any errors.
*/
void _requestRemotePrimaryStepdown(const HostAndPort& target);
@@ -1083,34 +968,11 @@ private:
const StatusWith<ReplSetHeartbeatResponse>& responseStatus);
/**
- * Bottom half of processHeartbeat(), which runs in the replication executor.
- */
- void _processHeartbeatFinish(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetHeartbeatArgs& args,
- ReplSetHeartbeatResponse* response,
- Status* outStatus);
-
- /**
- * Bottom half of processHeartbeatV1(), which runs in the replication executor.
- */
- void _processHeartbeatFinishV1(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetHeartbeatArgsV1& args,
- ReplSetHeartbeatResponse* response,
- Status* outStatus);
- /**
* Scan the SlaveInfoVector and determine the highest OplogEntry present on a majority of
* servers; set _lastCommittedOpTime to this new entry, if greater than the current entry.
*/
void _updateLastCommittedOpTime_inlock();
- void _summarizeAsHtml_finish(const ReplicationExecutor::CallbackArgs& cbData,
- ReplSetHtmlSummary* output);
-
- /**
- * Callback that gets the current term from topology coordinator.
- */
- void _getTerm_helper(const ReplicationExecutor::CallbackArgs& cbData, long long* term);
-
/**
* This is used to set a floor of "newOpTime" on the OpTimes we will consider committed.
* This prevents entries from before our election from counting as committed in our view,
@@ -1147,16 +1009,10 @@ private:
void _dropAllSnapshots_inlock();
/**
- * Callback which schedules "_handleLivenessTimeout" to be run whenever the liveness timeout
- * for the node who was least recently reported to be alive occurs.
- */
- void _scheduleNextLivenessUpdate(const ReplicationExecutor::CallbackArgs& cbData);
-
- /**
* Bottom half of _scheduleNextLivenessUpdate.
- * Must be called from within a callback.
+ * Must be called with _topoMutex held.
*/
- void _scheduleNextLivenessUpdate_inlock(const ReplicationExecutor::CallbackArgs& cbData);
+ void _scheduleNextLivenessUpdate_inlock();
/**
* Callback which marks downed nodes as down, triggers a stepdown if a majority of nodes are no
@@ -1227,7 +1083,7 @@ private:
* Used by _scheduleWork() and _scheduleWorkAt() only.
* Do not call this function directly.
*/
- static CallbackHandle _wrapAndScheduleWork(ScheduleFn scheduleFn, const CallbackFn& work);
+ CallbackHandle _wrapAndScheduleWork(ScheduleFn scheduleFn, const CallbackFn& work);
/**
* Creates an event.
@@ -1241,6 +1097,12 @@ private:
*/
void _scheduleElectionWinNotification();
+ /**
+ * Wrap a function into executor callback.
+ * If the callback is cancelled, the given function won't run.
+ */
+ executor::TaskExecutor::CallbackFn _wrapAsCallbackFn(const stdx::function<void()>& work);
+
//
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
@@ -1250,17 +1112,24 @@ private:
// (PS) Pointer is read-only in concurrent operation, item pointed to is self-synchronizing;
// Access in any context.
// (M) Reads and writes guarded by _mutex
- // (X) Reads and writes must be performed in a callback in _replExecutor
- // (MX) Must hold _mutex and be in a callback in _replExecutor to write; must either hold
- // _mutex or be in a callback in _replExecutor to read.
+ // (X) Reads and writes guarded by _topoMutex
+ // (MX) Must hold _mutex and _topoMutex to write; must either hold _mutex or _topoMutex
+ // to read.
// (GX) Readable under a global intent lock. Must either hold global lock in exclusive
- // mode (MODE_X) or both hold global lock in shared mode (MODE_S) and be in executor
- // context to write.
+ // mode (MODE_X) or both hold global lock in shared mode (MODE_S) and hold _topoMutex
+ // to write.
// (I) Independently synchronized, see member variable comment.
+ // When both _mutex and _topoMutex are needed, the caller must follow the strict locking order
+ // to avoid deadlock: _topoMutex must be held before locking _mutex.
+ // In other words, _topoMutex can never be locked while holding _mutex.
+
// Protects member data of this ReplicationCoordinator.
mutable stdx::mutex _mutex; // (S)
+ // Protects member data of the TopologyCoordinator.
+ mutable stdx::mutex _topoMutex; // (S)
+
// Handles to actively queued heartbeats.
HeartbeatHandles _heartbeatHandles; // (X)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
index a2cd0eb386e..a70c8963af8 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
@@ -137,17 +137,18 @@ void ReplicationCoordinatorImpl::_startElectSelf() {
// _mutex again.
lk.unlock();
- StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _freshnessChecker->start(
- &_replExecutor,
- lastOpTimeApplied.getTimestamp(),
- _rsConfig,
- _selfIndex,
- _topCoord->getMaybeUpHostAndPorts(),
- stdx::bind(&ReplicationCoordinatorImpl::_onFreshnessCheckComplete, this));
+ StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh =
+ _freshnessChecker->start(&_replExecutor,
+ lastOpTimeApplied.getTimestamp(),
+ _rsConfig,
+ _selfIndex,
+ _topCoord->getMaybeUpHostAndPorts());
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(18681, nextPhaseEvh.getStatus());
+ _replExecutor.onEvent(nextPhaseEvh.getValue(),
+ stdx::bind(&ReplicationCoordinatorImpl::_onFreshnessCheckComplete, this));
lossGuard.dismiss();
}
@@ -159,6 +160,7 @@ void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() {
&_freshnessChecker,
&_electCmdRunner,
&_electionFinishedEvent);
+ LockGuard lk(_topoMutex);
if (_freshnessChecker->isCanceled()) {
LOG(2) << "Election canceled during freshness check phase";
@@ -180,11 +182,10 @@ void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() {
log() << "possible election tie; sleeping " << ms << " until "
<< dateToISOStringLocal(nextCandidateTime);
_topCoord->setElectionSleepUntil(nextCandidateTime);
- _replExecutor.scheduleWorkAt(
- nextCandidateTime,
- stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
- this,
- stdx::placeholders::_1));
+ _scheduleWorkAt(nextCandidateTime,
+ stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
+ this,
+ stdx::placeholders::_1));
_sleptLastElection = true;
return;
}
@@ -210,15 +211,14 @@ void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() {
_electCmdRunner.reset(new ElectCmdRunner);
StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _electCmdRunner->start(
- &_replExecutor,
- _rsConfig,
- _selfIndex,
- _topCoord->getMaybeUpHostAndPorts(),
- stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete, this));
+ &_replExecutor, _rsConfig, _selfIndex, _topCoord->getMaybeUpHostAndPorts());
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(18685, nextPhaseEvh.getStatus());
+
+ _replExecutor.onEvent(nextPhaseEvh.getValue(),
+ stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete, this));
lossGuard.dismiss();
}
@@ -228,6 +228,7 @@ void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() {
&_freshnessChecker,
&_electCmdRunner,
&_electionFinishedEvent);
+ LockGuard lk(_topoMutex);
invariant(_freshnessChecker);
invariant(_electCmdRunner);
@@ -248,11 +249,10 @@ void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() {
const Date_t nextCandidateTime = now + ms;
log() << "waiting until " << nextCandidateTime << " before standing for election again";
_topCoord->setElectionSleepUntil(nextCandidateTime);
- _replExecutor.scheduleWorkAt(
- nextCandidateTime,
- stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
- this,
- stdx::placeholders::_1));
+ _scheduleWorkAt(nextCandidateTime,
+ stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
+ this,
+ stdx::placeholders::_1));
return;
}
@@ -272,9 +272,8 @@ void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() {
void ReplicationCoordinatorImpl::_recoverFromElectionTie(
const ReplicationExecutor::CallbackArgs& cbData) {
- if (!cbData.status.isOK()) {
- return;
- }
+ LockGuard topoLock(_topoMutex);
+
auto now = _replExecutor.now();
auto lastOpApplied = getMyLastAppliedOpTime();
const auto status = _topCoord->checkShouldStandForElection(now, lastOpApplied);
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
index ca98c3cb471..84d00015fbd 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
@@ -547,6 +547,10 @@ TEST_F(ReplCoordElectTest, NodeCancelsElectionUponReceivingANewConfigDuringFresh
BSONObjBuilder result;
ASSERT_OK(getReplCoord()->processReplSetReconfig(&txn, config, &result));
+ // Wait until election cancels.
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole());
}
@@ -583,6 +587,10 @@ TEST_F(ReplCoordElectTest, NodeCancelsElectionUponReceivingANewConfigDuringElect
BSONObjBuilder result;
ASSERT_OK(getReplCoord()->processReplSetReconfig(&txn, config, &result));
+ // Wait until election cancels.
+ getNet()->enterNetwork();
+ getNet()->runReadyNetworkOperations();
+ getNet()->exitNetwork();
ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole());
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
index acc9f8cf2ec..b0ac53d9491 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
@@ -143,18 +143,19 @@ void ReplicationCoordinatorImpl::_startElectSelfV1() {
lk.unlock();
long long term = _topCoord->getTerm();
- StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _voteRequester->start(
- &_replExecutor,
- _rsConfig,
- _selfIndex,
- _topCoord->getTerm(),
- true, // dry run
- lastOpTime,
- stdx::bind(&ReplicationCoordinatorImpl::_onDryRunComplete, this, term));
+ StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh =
+ _voteRequester->start(&_replExecutor,
+ _rsConfig,
+ _selfIndex,
+ _topCoord->getTerm(),
+ true, // dry run
+ lastOpTime);
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(28685, nextPhaseEvh.getStatus());
+ _replExecutor.onEvent(nextPhaseEvh.getValue(),
+ stdx::bind(&ReplicationCoordinatorImpl::_onDryRunComplete, this, term));
lossGuard.dismiss();
}
@@ -163,6 +164,8 @@ void ReplicationCoordinatorImpl::_onDryRunComplete(long long originalTerm) {
invariant(!_electionWinnerDeclarer);
LoseElectionDryRunGuardV1 lossGuard(this);
+ LockGuard lk(_topoMutex);
+
if (_topCoord->getTerm() != originalTerm) {
log() << "not running for primary, we have been superceded already";
return;
@@ -222,15 +225,8 @@ void ReplicationCoordinatorImpl::_writeLastVoteForMyElection(
return;
}
- auto cbStatus = _replExecutor.scheduleWork(
- [this, lastVote](const ReplicationExecutor::CallbackArgs& cbData) {
- _replExecutor.signalEvent(_electionDryRunFinishedEvent);
- _startVoteRequester(lastVote.getTerm());
- });
- if (cbStatus.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
- fassert(28768, cbStatus.getStatus());
+ _startVoteRequester(lastVote.getTerm());
+ _replExecutor.signalEvent(_electionDryRunFinishedEvent);
lossGuard.dismiss();
}
@@ -240,22 +236,21 @@ void ReplicationCoordinatorImpl::_startVoteRequester(long long newTerm) {
invariant(!_electionWinnerDeclarer);
LoseElectionGuardV1 lossGuard(this);
+ LockGuard lk(_topoMutex);
+
const auto lastOpTime =
_isDurableStorageEngine() ? getMyLastDurableOpTime() : getMyLastAppliedOpTime();
_voteRequester.reset(new VoteRequester);
StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _voteRequester->start(
- &_replExecutor,
- _rsConfig,
- _selfIndex,
- _topCoord->getTerm(),
- false,
- lastOpTime,
- stdx::bind(&ReplicationCoordinatorImpl::_onVoteRequestComplete, this, newTerm));
+ &_replExecutor, _rsConfig, _selfIndex, _topCoord->getTerm(), false, lastOpTime);
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(28643, nextPhaseEvh.getStatus());
+ _replExecutor.onEvent(
+ nextPhaseEvh.getValue(),
+ stdx::bind(&ReplicationCoordinatorImpl::_onVoteRequestComplete, this, newTerm));
lossGuard.dismiss();
}
@@ -265,6 +260,8 @@ void ReplicationCoordinatorImpl::_onVoteRequestComplete(long long originalTerm)
invariant(!_electionWinnerDeclarer);
LoseElectionGuardV1 lossGuard(this);
+ LockGuard lk(_topoMutex);
+
if (_topCoord->getTerm() != originalTerm) {
log() << "not becoming primary, we have been superceded already";
return;
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index 881e3ef69bc..4679d17004c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -846,6 +846,10 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringDryRun) {
BSONObjBuilder result;
ASSERT_OK(getReplCoord()->processReplSetReconfig(&txn, config, &result));
+ // Wait until election cancels.
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole());
}
@@ -882,6 +886,10 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase)
BSONObjBuilder result;
ASSERT_OK(getReplCoord()->processReplSetReconfig(&txn, config, &result));
+ // Wait until election cancels.
+ getNet()->enterNetwork();
+ getNet()->runReadyNetworkOperations();
+ getNet()->exitNetwork();
ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole());
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index df03ebe37b7..2be0f71b5d8 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/topology_coordinator.h"
+#include "mongo/db/repl/vote_requester.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/functional.h"
@@ -67,6 +68,8 @@ using executor::RemoteCommandRequest;
void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData,
const HostAndPort& target,
int targetIndex) {
+ LockGuard topoLock(_topoMutex);
+
_untrackHeartbeatHandle(cbData.myHandle);
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
@@ -113,6 +116,8 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget(const HostAndPort& t
void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) {
+ LockGuard topoLock(_topoMutex);
+
// remove handle from queued heartbeats
_untrackHeartbeatHandle(cbData.myHandle);
@@ -320,6 +325,9 @@ void ReplicationCoordinatorImpl::_stepDownFinish(
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
+
+ LockGuard topoLock(_topoMutex);
+
invariant(cbData.txn);
// TODO Add invariant that we've got global shared or global exclusive lock, when supported
// by lock manager.
@@ -360,9 +368,9 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(const ReplicaSetConf
invariant(!_rsConfig.isInitialized() ||
_rsConfig.getConfigVersion() < newConfig.getConfigVersion());
if (_freshnessChecker) {
- _freshnessChecker->cancel(&_replExecutor);
+ _freshnessChecker->cancel();
if (_electCmdRunner) {
- _electCmdRunner->cancel(&_replExecutor);
+ _electCmdRunner->cancel();
}
_replExecutor.onEvent(
_electionFinishedEvent,
@@ -383,6 +391,8 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigAfterElectionCanceled(
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
+
+ LockGuard topoLock(_topoMutex);
fassert(18911, cbData.status);
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_inShutdown) {
@@ -452,7 +462,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
}
}
- const stdx::function<void(const ReplicationExecutor::CallbackArgs&)> reconfigFinishFn(
+ const CallbackFn reconfigFinishFn(
stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish,
this,
stdx::placeholders::_1,
@@ -481,6 +491,8 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
return;
}
+ LockGuard topoLock(_topoMutex);
+
stdx::unique_lock<stdx::mutex> lk(_mutex);
invariant(_rsConfigState == kConfigHBReconfiguring);
invariant(!_rsConfig.isInitialized() ||
@@ -501,6 +513,28 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
return;
}
+ // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig.
+ if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) {
+ if (isV1ElectionProtocol()) {
+ invariant(_voteRequester);
+ _voteRequester->cancel();
+ } else {
+ invariant(_freshnessChecker);
+ _freshnessChecker->cancel();
+ if (_electCmdRunner) {
+ _electCmdRunner->cancel();
+ }
+ }
+ // Wait for the election to complete and the node's Role to be set to follower.
+ _replExecutor.onEvent(_electionFinishedEvent,
+ stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish,
+ this,
+ stdx::placeholders::_1,
+ newConfig,
+ myIndex));
+ return;
+ }
+
if (!myIndex.isOK()) {
switch (myIndex.getStatus().code()) {
case ErrorCodes::NodeNotFound:
@@ -524,8 +558,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
// If we do not have an index, we should pass -1 as our index to avoid falsely adding ourself to
// the data structures inside of the TopologyCoordinator.
const int myIndexValue = myIndex.getStatus().isOK() ? myIndex.getValue() : -1;
- const PostMemberStateUpdateAction action =
- _setCurrentRSConfig_inlock(cbData, newConfig, myIndexValue);
+ const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndexValue);
lk.unlock();
_resetElectionInfoOnProtocolVersionUpgrade(oldConfig, newConfig);
_performPostMemberStateUpdateAction(action);
@@ -558,14 +591,12 @@ void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() {
}
}
-void ReplicationCoordinatorImpl::_restartHeartbeats_inlock(
- const ReplicationExecutor::CallbackArgs& cbData) {
+void ReplicationCoordinatorImpl::_restartHeartbeats_inlock() {
_cancelHeartbeats_inlock();
- _startHeartbeats_inlock(cbData);
+ _startHeartbeats_inlock();
}
-void ReplicationCoordinatorImpl::_startHeartbeats_inlock(
- const ReplicationExecutor::CallbackArgs& cbData) {
+void ReplicationCoordinatorImpl::_startHeartbeats_inlock() {
const Date_t now = _replExecutor.now();
_seedList.clear();
for (int i = 0; i < _rsConfig.getNumMembers(); ++i) {
@@ -579,12 +610,13 @@ void ReplicationCoordinatorImpl::_startHeartbeats_inlock(
slaveInfo.lastUpdate = _replExecutor.now();
slaveInfo.down = false;
}
- _scheduleNextLivenessUpdate_inlock(cbData);
+ _scheduleNextLivenessUpdate_inlock();
}
}
void ReplicationCoordinatorImpl::_handleLivenessTimeout(
const ReplicationExecutor::CallbackArgs& cbData) {
+ LockGuard topoLock(_topoMutex);
stdx::lock_guard<stdx::mutex> lk(_mutex);
// Only reset the callback handle if it matches, otherwise more will be coming through
if (cbData.myHandle == _handleLivenessTimeoutCbh) {
@@ -628,20 +660,10 @@ void ReplicationCoordinatorImpl::_handleLivenessTimeout(
}
}
}
- _scheduleNextLivenessUpdate_inlock(cbData);
-}
-
-void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate(
- const ReplicationExecutor::CallbackArgs& cbData) {
- if (cbData.status == ErrorCodes::CallbackCanceled)
- return;
-
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- _scheduleNextLivenessUpdate_inlock(cbData);
+ _scheduleNextLivenessUpdate_inlock();
}
-void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock(
- const ReplicationExecutor::CallbackArgs& cbData) {
+void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock() {
if (!isV1ElectionProtocol()) {
return;
}
@@ -678,15 +700,14 @@ void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock(
auto nextTimeout = earliestDate + _rsConfig.getElectionTimeoutPeriod();
if (nextTimeout > _replExecutor.now()) {
LOG(3) << "scheduling next check at " << nextTimeout;
- auto cbh = _replExecutor.scheduleWorkAt(
- nextTimeout,
- stdx::bind(
- &ReplicationCoordinatorImpl::_handleLivenessTimeout, this, stdx::placeholders::_1));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ auto cbh = _scheduleWorkAt(nextTimeout,
+ stdx::bind(&ReplicationCoordinatorImpl::_handleLivenessTimeout,
+ this,
+ stdx::placeholders::_1));
+ if (!cbh) {
return;
}
- fassert(22002, cbh.getStatus());
- _handleLivenessTimeoutCbh = cbh.getValue();
+ _handleLivenessTimeoutCbh = cbh;
_earliestMemberId = earliestMemberId;
}
}
@@ -698,8 +719,7 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleLivenessUpdate_inlock(int u
if (_handleLivenessTimeoutCbh.isValid()) {
_replExecutor.cancel(_handleLivenessTimeoutCbh);
}
- _replExecutor.scheduleWork(stdx::bind(
- &ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate, this, stdx::placeholders::_1));
+ _scheduleNextLivenessUpdate_inlock();
}
void ReplicationCoordinatorImpl::_cancelPriorityTakeover_inlock() {
@@ -748,6 +768,8 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock() {
}
void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1(bool isPriorityTakeOver) {
+ LockGuard topoLock(_topoMutex);
+
if (!isV1ElectionProtocol()) {
return;
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index de8a175d01c..960987a5b46 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -1692,6 +1692,7 @@ TEST_F(StepDownTest,
// Make a secondary actually catch up
enterNetwork();
+ getNet()->runUntil(getNet()->now() + Milliseconds(1000));
ASSERT(getNet()->hasReadyRequests());
NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
RemoteCommandRequest request = noi->getRequest();
@@ -1750,6 +1751,7 @@ TEST_F(StepDownTest,
// Secondary has not caught up on first round of heartbeats.
enterNetwork();
+ getNet()->runUntil(getNet()->now() + Milliseconds(1000));
ASSERT(getNet()->hasReadyRequests());
NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
RemoteCommandRequest request = noi->getRequest();
@@ -4218,12 +4220,12 @@ TEST_F(ReplCoordTest, WaitForMemberState) {
replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0));
ASSERT_TRUE(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
- // Successful dry run election increases term.
- ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm());
-
// Single node cluster - this node should start election on setFollowerMode() completion.
replCoord->waitForElectionFinish_forTest();
+ // Successful dry run election increases term.
+ ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm());
+
auto timeout = Milliseconds(1);
ASSERT_OK(replCoord->waitForMemberState(MemberState::RS_PRIMARY, timeout));
ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit,
@@ -4253,12 +4255,12 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) {
replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0));
ASSERT_TRUE(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
- // Successful dry run election increases term.
- ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm());
-
// Single node cluster - this node should start election on setFollowerMode() completion.
replCoord->waitForElectionFinish_forTest();
+ // Successful dry run election increases term.
+ ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm());
+
auto timeout = Milliseconds(1);
ASSERT_OK(replCoord->waitForMemberState(MemberState::RS_PRIMARY, timeout));
diff --git a/src/mongo/db/repl/scatter_gather_runner.cpp b/src/mongo/db/repl/scatter_gather_runner.cpp
index 16c778399d0..042bb0f1c10 100644
--- a/src/mongo/db/repl/scatter_gather_runner.cpp
+++ b/src/mongo/db/repl/scatter_gather_runner.cpp
@@ -42,59 +42,63 @@ namespace mongo {
namespace repl {
using executor::RemoteCommandRequest;
-
-ScatterGatherRunner::ScatterGatherRunner(ScatterGatherAlgorithm* algorithm)
- : _algorithm(algorithm), _started(false) {}
-
-ScatterGatherRunner::~ScatterGatherRunner() {}
-
-static void startTrampoline(const ReplicationExecutor::CallbackArgs& cbData,
- ScatterGatherRunner* runner,
- StatusWith<ReplicationExecutor::EventHandle>* result) {
- // TODO: remove static cast once ScatterGatherRunner is designed to work with a generic
- // TaskExecutor.
- ReplicationExecutor* executor = static_cast<ReplicationExecutor*>(cbData.executor);
- *result = runner->start(executor);
-}
-
-Status ScatterGatherRunner::run(ReplicationExecutor* executor) {
- StatusWith<ReplicationExecutor::EventHandle> finishEvh(ErrorCodes::InternalError, "Not set");
- StatusWith<ReplicationExecutor::CallbackHandle> startCBH = executor->scheduleWork(
- stdx::bind(startTrampoline, stdx::placeholders::_1, this, &finishEvh));
- if (!startCBH.isOK()) {
- return startCBH.getStatus();
- }
- executor->wait(startCBH.getValue());
+using LockGuard = stdx::lock_guard<stdx::mutex>;
+using CallbackHandle = ReplicationExecutor::CallbackHandle;
+using EventHandle = ReplicationExecutor::EventHandle;
+using RemoteCommandCallbackArgs = ReplicationExecutor::RemoteCommandCallbackArgs;
+using RemoteCommandCallbackFn = ReplicationExecutor::RemoteCommandCallbackFn;
+
+ScatterGatherRunner::ScatterGatherRunner(ScatterGatherAlgorithm* algorithm,
+ ReplicationExecutor* executor)
+ : _executor(executor), _impl(std::make_shared<RunnerImpl>(algorithm, executor)) {}
+
+Status ScatterGatherRunner::run() {
+ auto finishEvh = start();
if (!finishEvh.isOK()) {
return finishEvh.getStatus();
}
- executor->waitForEvent(finishEvh.getValue());
+ _executor->waitForEvent(finishEvh.getValue());
return Status::OK();
}
-StatusWith<ReplicationExecutor::EventHandle> ScatterGatherRunner::start(
- ReplicationExecutor* executor, const stdx::function<void()>& onCompletion) {
+StatusWith<EventHandle> ScatterGatherRunner::start() {
+ // Callback has a shared pointer to the RunnerImpl, so it's always safe to
+ // access the RunnerImpl.
+ std::shared_ptr<RunnerImpl>& impl = _impl;
+ auto cb = [impl](const RemoteCommandCallbackArgs& cbData) { impl->processResponse(cbData); };
+ return _impl->start(cb);
+}
+
+void ScatterGatherRunner::cancel() {
+ _impl->cancel();
+}
+
+/**
+ * Scatter gather runner implementation.
+ */
+ScatterGatherRunner::RunnerImpl::RunnerImpl(ScatterGatherAlgorithm* algorithm,
+ ReplicationExecutor* executor)
+ : _executor(executor), _algorithm(algorithm) {}
+
+StatusWith<EventHandle> ScatterGatherRunner::RunnerImpl::start(
+ const RemoteCommandCallbackFn processResponseCB) {
+ LockGuard lk(_mutex);
+
invariant(!_started);
_started = true;
- _actualResponses = 0;
- _onCompletion = onCompletion;
- StatusWith<ReplicationExecutor::EventHandle> evh = executor->makeEvent();
+ StatusWith<EventHandle> evh = _executor->makeEvent();
if (!evh.isOK()) {
return evh;
}
_sufficientResponsesReceived = evh.getValue();
- ScopeGuard earlyReturnGuard =
- MakeGuard(&ScatterGatherRunner::_signalSufficientResponsesReceived, this, executor);
-
- const ReplicationExecutor::RemoteCommandCallbackFn cb =
- stdx::bind(&ScatterGatherRunner::_processResponse, stdx::placeholders::_1, this);
+ ScopeGuard earlyReturnGuard = MakeGuard(&RunnerImpl::_signalSufficientResponsesReceived, this);
std::vector<RemoteCommandRequest> requests = _algorithm->getRequests();
for (size_t i = 0; i < requests.size(); ++i) {
- const StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- executor->scheduleRemoteCommand(requests[i], cb);
+ const StatusWith<CallbackHandle> cbh =
+ _executor->scheduleRemoteCommand(requests[i], processResponseCB);
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return StatusWith<ReplicationExecutor::EventHandle>(cbh.getStatus());
+ return StatusWith<EventHandle>(cbh.getStatus());
}
fassert(18743, cbh.getStatus());
_callbacks.push_back(cbh.getValue());
@@ -102,50 +106,47 @@ StatusWith<ReplicationExecutor::EventHandle> ScatterGatherRunner::start(
if (_callbacks.empty() || _algorithm->hasReceivedSufficientResponses()) {
invariant(_algorithm->hasReceivedSufficientResponses());
- _signalSufficientResponsesReceived(executor);
+ _signalSufficientResponsesReceived();
}
earlyReturnGuard.Dismiss();
return evh;
}
-void ScatterGatherRunner::cancel(ReplicationExecutor* executor) {
+void ScatterGatherRunner::RunnerImpl::cancel() {
+ LockGuard lk(_mutex);
+
invariant(_started);
- _signalSufficientResponsesReceived(executor);
+ _signalSufficientResponsesReceived();
}
-void ScatterGatherRunner::_processResponse(
- const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, ScatterGatherRunner* runner) {
- // It is possible that the ScatterGatherRunner has already gone out of scope, if the
- // response indicates the callback was canceled. In that case, do not access any members
- // of "runner" and return immediately.
+void ScatterGatherRunner::RunnerImpl::processResponse(
+ const ReplicationExecutor::RemoteCommandCallbackArgs& cbData) {
if (cbData.response.getStatus() == ErrorCodes::CallbackCanceled) {
return;
}
+ LockGuard lk(_mutex);
+ if (!_sufficientResponsesReceived.isValid()) {
+ // We've received sufficient responses and it's not safe to access the algorithm any more.
+ return;
+ }
- ++runner->_actualResponses;
- runner->_algorithm->processResponse(cbData.request, cbData.response);
- if (runner->_algorithm->hasReceivedSufficientResponses()) {
- // TODO: remove static cast once ScatterGatherRunner is designed to work with a generic
- // TaskExecutor.
- ReplicationExecutor* executor = static_cast<ReplicationExecutor*>(cbData.executor);
- runner->_signalSufficientResponsesReceived(executor);
+ ++_actualResponses;
+ _algorithm->processResponse(cbData.request, cbData.response);
+ if (_algorithm->hasReceivedSufficientResponses()) {
+ _signalSufficientResponsesReceived();
} else {
- invariant(runner->_actualResponses < runner->_callbacks.size());
+ invariant(_actualResponses < _callbacks.size());
}
}
-void ScatterGatherRunner::_signalSufficientResponsesReceived(ReplicationExecutor* executor) {
+void ScatterGatherRunner::RunnerImpl::_signalSufficientResponsesReceived() {
if (_sufficientResponsesReceived.isValid()) {
std::for_each(_callbacks.begin(),
_callbacks.end(),
- stdx::bind(&ReplicationExecutor::cancel, executor, stdx::placeholders::_1));
- const ReplicationExecutor::EventHandle h = _sufficientResponsesReceived;
- _sufficientResponsesReceived = ReplicationExecutor::EventHandle();
- if (_onCompletion) {
- _onCompletion();
- }
- executor->signalEvent(h);
+ stdx::bind(&ReplicationExecutor::cancel, _executor, stdx::placeholders::_1));
+ _executor->signalEvent(_sufficientResponsesReceived);
+ _sufficientResponsesReceived = EventHandle();
}
}
diff --git a/src/mongo/db/repl/scatter_gather_runner.h b/src/mongo/db/repl/scatter_gather_runner.h
index ad7b8d93aa5..e4e6e40f404 100644
--- a/src/mongo/db/repl/scatter_gather_runner.h
+++ b/src/mongo/db/repl/scatter_gather_runner.h
@@ -33,6 +33,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/stdx/functional.h"
+#include "mongo/stdx/mutex.h"
namespace mongo {
@@ -44,7 +45,7 @@ namespace repl {
class ScatterGatherAlgorithm;
/**
- * Implementation of a scatter-gather behavior using a ReplicationExecutor.
+ * Interface of a scatter-gather behavior.
*/
class ScatterGatherRunner {
MONGO_DISALLOW_COPYING(ScatterGatherRunner);
@@ -53,14 +54,12 @@ public:
/**
* Constructs a new runner whose underlying algorithm is "algorithm".
*
- * "algorithm" must remain in scope until the runner's destructor completes.
+ * "algorithm" and "executor" must remain in scope until the runner's destructor completes.
*/
- explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm);
-
- ~ScatterGatherRunner();
+ explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm, ReplicationExecutor* executor);
/**
- * Runs the scatter-gather process using "executor", and blocks until it completes.
+ * Runs the scatter-gather process and blocks until it completes.
*
* Must _not_ be run from inside the executor context.
*
@@ -70,55 +69,71 @@ public:
* is scheduled but before it completes, this method will return Status::OK(),
* just as it does when it runs successfully to completion.
*/
- Status run(ReplicationExecutor* executor);
+ Status run();
/**
- * Starts executing the scatter-gather process using "executor".
- *
* On success, returns an event handle that will be signaled when the runner has
* finished executing the scatter-gather process. After that event has been
* signaled, it is safe for the caller to examine any state on "algorithm".
*
- * This method must be called inside the executor context.
- *
- * onCompletion is an optional callback that will be executed in executor context
- * immediately prior to signaling the event handle returned here. It must never
- * throw exceptions. It may examine the state of the algorithm object.
- *
- * NOTE: If the executor starts to shut down before onCompletion executes, onCompletion may
- * never execute, even though the returned event will eventually be signaled.
+ * The returned event will eventually be signaled.
*/
- StatusWith<ReplicationExecutor::EventHandle> start(
- ReplicationExecutor* executor,
- const stdx::function<void()>& onCompletion = stdx::function<void()>());
+ StatusWith<ReplicationExecutor::EventHandle> start();
/**
- * Informs the runner to cancel further processing. The "executor" argument
- * must point to the same executor passed to "start()".
- *
- * Like start, this method must be called from within the executor context.
+ * Informs the runner to cancel further processing.
*/
- void cancel(ReplicationExecutor* executor);
+ void cancel();
private:
/**
- * Callback invoked once for every response from the network.
- */
- static void _processResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData,
- ScatterGatherRunner* runner);
-
- /**
- * Method that performs all actions required when _algorithm indicates a sufficient
- * number of respones have been received.
+ * Implementation of a scatter-gather behavior using a ReplicationExecutor.
*/
- void _signalSufficientResponsesReceived(ReplicationExecutor* executor);
-
- ScatterGatherAlgorithm* _algorithm;
- stdx::function<void()> _onCompletion;
- ReplicationExecutor::EventHandle _sufficientResponsesReceived;
- std::vector<ReplicationExecutor::CallbackHandle> _callbacks;
- size_t _actualResponses;
- bool _started;
+ class RunnerImpl {
+ public:
+ explicit RunnerImpl(ScatterGatherAlgorithm* algorithm, ReplicationExecutor* executor);
+
+ /**
+ * On success, returns an event handle that will be signaled when the runner has
+ * finished executing the scatter-gather process. After that event has been
+ * signaled, it is safe for the caller to examine any state on "algorithm".
+ *
+ * The returned event will eventually be signaled.
+ */
+ StatusWith<ReplicationExecutor::EventHandle> start(
+ const ReplicationExecutor::RemoteCommandCallbackFn cb);
+
+ /**
+ * Informs the runner to cancel further processing.
+ */
+ void cancel();
+
+ /**
+ * Callback invoked once for every response from the network.
+ */
+ void processResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData);
+
+ private:
+ /**
+ * Method that performs all actions required when _algorithm indicates a sufficient
+ * number of responses have been received.
+ */
+ void _signalSufficientResponsesReceived();
+
+ ReplicationExecutor* _executor; // Not owned here.
+ ScatterGatherAlgorithm* _algorithm; // Not owned here.
+ ReplicationExecutor::EventHandle _sufficientResponsesReceived;
+ std::vector<ReplicationExecutor::CallbackHandle> _callbacks;
+ size_t _actualResponses = 0;
+ bool _started = false;
+ stdx::mutex _mutex;
+ };
+
+ ReplicationExecutor* _executor; // Not owned here.
+
+ // This pointer of RunnerImpl will be shared with remote command callbacks to make sure
+ // callbacks can access the members safely.
+ std::shared_ptr<RunnerImpl> _impl;
};
} // namespace repl
diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp
index 6e4c11a65e0..98c5032bc6c 100644
--- a/src/mongo/db/repl/scatter_gather_test.cpp
+++ b/src/mongo/db/repl/scatter_gather_test.cpp
@@ -45,6 +45,8 @@ using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
+static const int kTotalRequests = 3;
+
/**
* Algorithm for testing the ScatterGatherRunner, which will finish running when finish() is
* called, or upon receiving responses from two nodes. Creates a three requests algorithm
@@ -57,7 +59,7 @@ public:
virtual std::vector<RemoteCommandRequest> getRequests() const {
std::vector<RemoteCommandRequest> requests;
- for (int i = 0; i < 3; i++) {
+ for (int i = 0; i < kTotalRequests; i++) {
requests.push_back(RemoteCommandRequest(
HostAndPort("hostname", i), "admin", BSONObj(), Milliseconds(30 * 1000)));
}
@@ -150,7 +152,7 @@ public:
private:
void _run(ReplicationExecutor* executor) {
- _result = _sgr->run(_executor);
+ _result = _sgr->run();
}
ScatterGatherRunner* _sgr;
@@ -161,10 +163,17 @@ private:
// Simple onCompletion function which will toggle a bool, so that we can check the logs to
// ensure the onCompletion function ran when expected.
-void onCompletionTestFunction(bool* ran) {
- *ran = true;
+executor::TaskExecutor::CallbackFn getOnCompletionTestFunction(bool* ran) {
+ auto cb = [ran](const ReplicationExecutor::CallbackArgs& cbData) {
+ if (!cbData.status.isOK()) {
+ return;
+ }
+ *ran = true;
+ };
+ return cb;
}
+
// Confirm that running via start() will finish and run the onComplete function once sufficient
// responses have been received.
// Confirm that deleting both the ScatterGatherTestAlgorithm and ScatterGatherRunner while
@@ -172,10 +181,10 @@ void onCompletionTestFunction(bool* ran) {
// completed.
TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) {
ScatterGatherTestAlgorithm* sga = new ScatterGatherTestAlgorithm();
- ScatterGatherRunner* sgr = new ScatterGatherRunner(sga);
+ ScatterGatherRunner* sgr = new ScatterGatherRunner(sga, getExecutor());
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status =
- sgr->start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion));
+ StatusWith<ReplicationExecutor::EventHandle> status = sgr->start();
+ getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
ASSERT_OK(status.getStatus());
ASSERT_FALSE(ranCompletion);
@@ -217,10 +226,10 @@ TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) {
// to return ErrorCodes::ShutdownInProgress.
TEST_F(ScatterGatherTest, ShutdownExecutorBeforeRun) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga);
+ ScatterGatherRunner sgr(&sga, getExecutor());
getExecutor()->shutdown();
sga.finish();
- Status status = sgr.run(getExecutor());
+ Status status = sgr.run();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status);
}
@@ -228,14 +237,17 @@ TEST_F(ScatterGatherTest, ShutdownExecutorBeforeRun) {
// finishes will cause run() to return Status::OK().
TEST_F(ScatterGatherTest, ShutdownExecutorAfterRun) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga);
+ ScatterGatherRunner sgr(&sga, getExecutor());
ScatterGatherRunnerRunner sgrr(&sgr, getExecutor());
sgrr.run();
// need to wait for the scatter-gather to be scheduled in the executor
NetworkInterfaceMock* net = getNet();
net->enterNetwork();
- NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
- net->blackHole(noi);
+ // Black hole all requests before shutdown, so that scheduleRemoteCommand will succeed.
+ for (int i = 0; i < kTotalRequests; i++) {
+ NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
+ net->blackHole(noi);
+ }
net->exitNetwork();
getExecutor()->shutdown();
Status status = sgrr.getResult();
@@ -246,11 +258,10 @@ TEST_F(ScatterGatherTest, ShutdownExecutorAfterRun) {
// to return ErrorCodes::ShutdownInProgress and should not run onCompletion().
TEST_F(ScatterGatherTest, ShutdownExecutorBeforeStart) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga);
+ ScatterGatherRunner sgr(&sga, getExecutor());
getExecutor()->shutdown();
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status =
- sgr.start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion));
+ StatusWith<ReplicationExecutor::EventHandle> status = sgr.start();
sga.finish();
ASSERT_FALSE(ranCompletion);
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.getStatus());
@@ -260,10 +271,10 @@ TEST_F(ScatterGatherTest, ShutdownExecutorBeforeStart) {
// to return Status::OK and should not run onCompletion().
TEST_F(ScatterGatherTest, ShutdownExecutorAfterStart) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga);
+ ScatterGatherRunner sgr(&sga, getExecutor());
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status =
- sgr.start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion));
+ StatusWith<ReplicationExecutor::EventHandle> status = sgr.start();
+ getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
getExecutor()->shutdown();
sga.finish();
ASSERT_FALSE(ranCompletion);
@@ -273,10 +284,10 @@ TEST_F(ScatterGatherTest, ShutdownExecutorAfterStart) {
// Confirm that responses are not processed once sufficient responses have been received.
TEST_F(ScatterGatherTest, DoNotProcessMoreThanSufficientResponses) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga);
+ ScatterGatherRunner sgr(&sga, getExecutor());
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status =
- sgr.start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion));
+ StatusWith<ReplicationExecutor::EventHandle> status = sgr.start();
+ getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
ASSERT_OK(status.getStatus());
ASSERT_FALSE(ranCompletion);
@@ -318,17 +329,17 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru
ScatterGatherTestAlgorithm sga;
// set hasReceivedSufficientResponses to return true before the run starts
sga.finish();
- ScatterGatherRunner sgr(&sga);
+ ScatterGatherRunner sgr(&sga, getExecutor());
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status =
- sgr.start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion));
+ StatusWith<ReplicationExecutor::EventHandle> status = sgr.start();
+ getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
ASSERT_OK(status.getStatus());
- ASSERT_TRUE(ranCompletion);
-
+ // Wait until callback finishes.
NetworkInterfaceMock* net = getNet();
net->enterNetwork();
- ASSERT_FALSE(net->hasReadyRequests());
+ net->runReadyNetworkOperations();
net->exitNetwork();
+ ASSERT_TRUE(ranCompletion);
}
#if 0
@@ -340,7 +351,7 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru
ScatterGatherRunner sgr(&sga);
bool ranCompletion = false;
StatusWith<ReplicationExecutor::EventHandle> status = sgr.start(getExecutor(),
- stdx::bind(&onCompletionTestFunction, &ranCompletion));
+ getOnCompletionTestFunction(&ranCompletion));
ASSERT_OK(status.getStatus());
ASSERT_FALSE(ranCompletion);
@@ -379,7 +390,7 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru
// Confirm that running via run() will finish once sufficient responses have been received.
TEST_F(ScatterGatherTest, SuccessfulScatterGatherViaRun) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga);
+ ScatterGatherRunner sgr(&sga, getExecutor());
ScatterGatherRunnerRunner sgrr(&sgr, getExecutor());
sgrr.run();
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index e6217295b95..1809d906c20 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -208,8 +208,7 @@ public:
////////////////////////////////////////////////////////////
// produces a reply to a replSetSyncFrom command
- virtual void prepareSyncFromResponse(const ReplicationExecutor::CallbackArgs& data,
- const HostAndPort& target,
+ virtual void prepareSyncFromResponse(const HostAndPort& target,
const OpTime& lastOpApplied,
BSONObjBuilder* response,
Status* result) = 0;
@@ -254,8 +253,7 @@ public:
};
// produce a reply to a status request
- virtual void prepareStatusResponse(const ReplicationExecutor::CallbackArgs& data,
- const ReplSetStatusArgs& rsStatusArgs,
+ virtual void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
BSONObjBuilder* response,
Status* result) = 0;
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index c03a883010d..da60dd90e1e 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -350,16 +350,10 @@ void TopologyCoordinatorImpl::clearSyncSourceBlacklist() {
_syncSourceBlacklist.clear();
}
-void TopologyCoordinatorImpl::prepareSyncFromResponse(const ReplicationExecutor::CallbackArgs& data,
- const HostAndPort& target,
+void TopologyCoordinatorImpl::prepareSyncFromResponse(const HostAndPort& target,
const OpTime& lastOpApplied,
BSONObjBuilder* response,
Status* result) {
- if (data.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
- return;
- }
-
response->append("syncFromRequested", target.toString());
if (_selfIndex == -1) {
@@ -1507,15 +1501,9 @@ const MemberConfig* TopologyCoordinatorImpl::_currentPrimaryMember() const {
return &(_rsConfig.getMemberAt(_currentPrimaryIndex));
}
-void TopologyCoordinatorImpl::prepareStatusResponse(const ReplicationExecutor::CallbackArgs& data,
- const ReplSetStatusArgs& rsStatusArgs,
+void TopologyCoordinatorImpl::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
BSONObjBuilder* response,
Status* result) {
- if (data.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
- return;
- }
-
// output for each member
vector<BSONObj> membersOut;
const MemberState myState = getMemberState();
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index f86c0e8d928..597a1cc6bc3 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -164,8 +164,7 @@ public:
virtual void setElectionSleepUntil(Date_t newTime);
virtual void setFollowerMode(MemberState::MS newMode);
virtual void adjustMaintenanceCountBy(int inc);
- virtual void prepareSyncFromResponse(const ReplicationExecutor::CallbackArgs& data,
- const HostAndPort& target,
+ virtual void prepareSyncFromResponse(const HostAndPort& target,
const OpTime& lastOpApplied,
BSONObjBuilder* response,
Status* result);
@@ -191,8 +190,7 @@ public:
const OpTime& lastOpApplied,
const OpTime& lastOpDurable,
ReplSetHeartbeatResponse* response);
- virtual void prepareStatusResponse(const ReplicationExecutor::CallbackArgs& data,
- const ReplSetStatusArgs& rsStatusArgs,
+ virtual void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
BSONObjBuilder* response,
Status* result);
virtual void fillIsMasterForReplSet(IsMasterResponse* response);
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
index 27711e3fcf7..24c49837c2c 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
@@ -829,8 +829,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunPriorToHavingAConf
BSONObjBuilder response;
// if we do not have an index in the config, we should get ErrorCodes::NotSecondary
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h1"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NotSecondary, result);
ASSERT_EQUALS("Removed and uninitialized nodes do not sync", result.reason());
}
@@ -854,8 +853,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunAgainstArbiter) {
<< "h1"))),
0);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h1"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NotSecondary, result);
ASSERT_EQUALS("arbiters don't sync", result.reason());
}
@@ -891,8 +889,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunAgainstPrimary) {
makeSelfPrimary();
ASSERT_EQUALS(0, getCurrentPrimaryIndex());
getTopoCoord()._setCurrentPrimaryForTest(0);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h3"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h3"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NotSecondary, result);
ASSERT_EQUALS("primaries don't sync", result.reason());
ASSERT_EQUALS("h3:27017", response.obj()["syncFromRequested"].String());
@@ -926,7 +923,7 @@ TEST_F(TopoCoordTest, NodeReturnsNodeNotFoundWhenSyncFromRequestsANodeNotInConfi
setSelfMemberState(MemberState::RS_SECONDARY);
getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("fakemember"), ourOpTime, &response, &result);
+ HostAndPort("fakemember"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NodeNotFound, result);
ASSERT_EQUALS("Could not find member \"fakemember:27017\" in replica set", result.reason());
}
@@ -959,8 +956,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsSelf) {
setSelfMemberState(MemberState::RS_SECONDARY);
// Try to sync from self
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("hself"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("hself"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("I cannot sync from myself", result.reason());
}
@@ -994,8 +990,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsArbiter) {
// Try to sync from an arbiter
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h1"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("Cannot sync from \"h1:27017\" because it is an arbiter", result.reason());
}
@@ -1028,8 +1023,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsAnIndexNonbui
setSelfMemberState(MemberState::RS_SECONDARY);
// Try to sync from a node that doesn't build indexes
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h2"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h2"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("Cannot sync from \"h2:27017\" because it does not build indexes",
result.reason());
@@ -1065,8 +1059,7 @@ TEST_F(TopoCoordTest, NodeReturnsHostUnreachableWhenSyncFromRequestsADownNode) {
// Try to sync from a member that is down
receiveDownHeartbeat(HostAndPort("h4"), "rs0", OpTime());
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h4"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h4"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::HostUnreachable, result);
ASSERT_EQUALS("I cannot reach the requested member: h4:27017", result.reason());
}
@@ -1102,8 +1095,7 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAStaleNode) {
heartbeatFromMember(
HostAndPort("h5"), "rs0", MemberState::RS_SECONDARY, staleOpTime, Milliseconds(100));
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h5"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result);
ASSERT_OK(result);
ASSERT_EQUALS("requested member \"h5:27017\" is more than 10 seconds behind us",
response.obj()["warning"].String());
@@ -1142,8 +1134,7 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAValidNode) {
heartbeatFromMember(
HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h6"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response, &result);
ASSERT_OK(result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
@@ -1183,8 +1174,7 @@ TEST_F(TopoCoordTest,
HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
// node goes down between forceSync and chooseNewSyncSource
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h6"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response, &result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
receiveDownHeartbeat(HostAndPort("h6"), "rs0", OpTime());
@@ -1222,8 +1212,7 @@ TEST_F(TopoCoordTest, NodeReturnsUnauthorizedWhenSyncFromRequestsANodeWeAreNotAu
// Try to sync from a member that is unauth'd
receiveDownHeartbeat(HostAndPort("h5"), "rs0", OpTime(), ErrorCodes::Unauthorized);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h5"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result);
ASSERT_NOT_OK(result);
ASSERT_EQUALS(ErrorCodes::Unauthorized, result.code());
ASSERT_EQUALS("not authorized to communicate with h5:27017", result.reason());
@@ -1245,8 +1234,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenAskedToSyncFromANonVoterAsAVo
"]}"),
0);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h2"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h2"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("Cannot sync from \"h2:27017\" because it is not a voter", result.reason());
}
@@ -1284,8 +1272,7 @@ TEST_F(TopoCoordTest,
heartbeatFromMember(
HostAndPort("h5"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h5"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result);
ASSERT_OK(result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
@@ -1297,8 +1284,7 @@ TEST_F(TopoCoordTest,
HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
// Sync successfully from another up-to-date member.
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h6"), ourOpTime, &response2, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response2, &result);
BSONObj response2Obj = response2.obj();
ASSERT_FALSE(response2Obj.hasField("warning"));
ASSERT_EQUALS(HostAndPort("h5").toString(), response2Obj["prevSyncTarget"].String());
@@ -1372,7 +1358,6 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) {
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{
curTime,
static_cast<unsigned>(durationCount<Seconds>(uptimeSecs)),
@@ -1488,7 +1473,6 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidReplicaSetConfigInResponseToGetStatusWhe
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{
curTime,
static_cast<unsigned>(durationCount<Seconds>(uptimeSecs)),
@@ -2187,7 +2171,6 @@ public:
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{_firstRequestDate + Milliseconds(4000),
10,
OpTime(Timestamp(100, 0), 0),
@@ -2251,7 +2234,6 @@ public:
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Seconds(4),
10,
OpTime(Timestamp(100, 0), 0),
@@ -2568,7 +2550,6 @@ TEST_F(HeartbeatResponseTestTwoRetries, NodeDoesNotRetryHeartbeatsAfterFailingTw
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Milliseconds(4900),
10,
OpTime(Timestamp(100, 0), 0),
@@ -2810,7 +2791,6 @@ TEST_F(HeartbeatResponseTestTwoRetries,
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Milliseconds(7000),
600,
OpTime(Timestamp(100, 0), 0),
@@ -4238,42 +4218,6 @@ TEST_F(TopoCoordTest,
ASSERT(TopologyCoordinator::Role::candidate == getTopoCoord().getRole());
}
-class ShutdownInProgressTest : public TopoCoordTest {
-public:
- ShutdownInProgressTest()
- : ourCbData(NULL,
- ReplicationExecutor::CallbackHandle(),
- Status(ErrorCodes::CallbackCanceled, "")) {}
-
- virtual ReplicationExecutor::CallbackArgs cbData() {
- return ourCbData;
- }
-
-private:
- ReplicationExecutor::CallbackArgs ourCbData;
-};
-
-TEST_F(ShutdownInProgressTest, NodeReturnsShutdownInProgressWhenSyncFromCallbackCanceled) {
- Status result = Status::OK();
- BSONObjBuilder response;
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("host2:27017"), OpTime(), &response, &result);
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result);
- ASSERT_TRUE(response.obj().isEmpty());
-}
-
-TEST_F(ShutdownInProgressTest, NodeReturnsShutDownInProgressWhenGetReplSetStatusCallbackCanceled) {
- Status result = Status::OK();
- BSONObjBuilder response;
- getTopoCoord().prepareStatusResponse(
- cbData(),
- TopologyCoordinator::ReplSetStatusArgs{Date_t(), 0, OpTime(), OpTime(), OpTime(), OpTime()},
- &response,
- &result);
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result);
- ASSERT_TRUE(response.obj().isEmpty());
-}
-
class PrepareHeartbeatResponseTest : public TopoCoordTest {
public:
virtual void setUp() {
diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
index 70cbf8b97aa..6adf5542212 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
@@ -822,8 +822,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunPriorToHavingAConf
BSONObjBuilder response;
// if we do not have an index in the config, we should get ErrorCodes::NotSecondary
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h1"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NotSecondary, result);
ASSERT_EQUALS("Removed and uninitialized nodes do not sync", result.reason());
}
@@ -847,8 +846,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunAgainstArbiter) {
<< "h1"))),
0);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h1"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NotSecondary, result);
ASSERT_EQUALS("arbiters don't sync", result.reason());
}
@@ -884,8 +882,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunAgainstPrimary) {
makeSelfPrimary();
ASSERT_EQUALS(0, getCurrentPrimaryIndex());
getTopoCoord()._setCurrentPrimaryForTest(0);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h3"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h3"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NotSecondary, result);
ASSERT_EQUALS("primaries don't sync", result.reason());
ASSERT_EQUALS("h3:27017", response.obj()["syncFromRequested"].String());
@@ -919,7 +916,7 @@ TEST_F(TopoCoordTest, NodeReturnsNodeNotFoundWhenSyncFromRequestsANodeNotInConfi
setSelfMemberState(MemberState::RS_SECONDARY);
getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("fakemember"), ourOpTime, &response, &result);
+ HostAndPort("fakemember"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NodeNotFound, result);
ASSERT_EQUALS("Could not find member \"fakemember:27017\" in replica set", result.reason());
}
@@ -952,8 +949,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsSelf) {
setSelfMemberState(MemberState::RS_SECONDARY);
// Try to sync from self
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("hself"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("hself"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("I cannot sync from myself", result.reason());
}
@@ -987,8 +983,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsArbiter) {
// Try to sync from an arbiter
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h1"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("Cannot sync from \"h1:27017\" because it is an arbiter", result.reason());
}
@@ -1021,8 +1016,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsAnIndexNonbui
setSelfMemberState(MemberState::RS_SECONDARY);
// Try to sync from a node that doesn't build indexes
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h2"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h2"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("Cannot sync from \"h2:27017\" because it does not build indexes",
result.reason());
@@ -1058,8 +1052,7 @@ TEST_F(TopoCoordTest, NodeReturnsHostUnreachableWhenSyncFromRequestsADownNode) {
// Try to sync from a member that is down
receiveDownHeartbeat(HostAndPort("h4"), "rs0", OpTime());
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h4"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h4"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::HostUnreachable, result);
ASSERT_EQUALS("I cannot reach the requested member: h4:27017", result.reason());
}
@@ -1095,8 +1088,7 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAStaleNode) {
heartbeatFromMember(
HostAndPort("h5"), "rs0", MemberState::RS_SECONDARY, staleOpTime, Milliseconds(100));
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h5"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result);
ASSERT_OK(result);
ASSERT_EQUALS("requested member \"h5:27017\" is more than 10 seconds behind us",
response.obj()["warning"].String());
@@ -1135,8 +1127,7 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAValidNode) {
heartbeatFromMember(
HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h6"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response, &result);
ASSERT_OK(result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
@@ -1176,8 +1167,7 @@ TEST_F(TopoCoordTest,
HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
// node goes down between forceSync and chooseNewSyncSource
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h6"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response, &result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
receiveDownHeartbeat(HostAndPort("h6"), "rs0", OpTime());
@@ -1215,8 +1205,7 @@ TEST_F(TopoCoordTest, NodeReturnsUnauthorizedWhenSyncFromRequestsANodeWeAreNotAu
// Try to sync from a member that is unauth'd
receiveDownHeartbeat(HostAndPort("h5"), "rs0", OpTime(), ErrorCodes::Unauthorized);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h5"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result);
ASSERT_NOT_OK(result);
ASSERT_EQUALS(ErrorCodes::Unauthorized, result.code());
ASSERT_EQUALS("not authorized to communicate with h5:27017", result.reason());
@@ -1238,8 +1227,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenAskedToSyncFromANonVoterAsAVo
"]}"),
0);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h2"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h2"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("Cannot sync from \"h2:27017\" because it is not a voter", result.reason());
}
@@ -1277,8 +1265,7 @@ TEST_F(TopoCoordTest,
heartbeatFromMember(
HostAndPort("h5"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h5"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result);
ASSERT_OK(result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
@@ -1290,8 +1277,7 @@ TEST_F(TopoCoordTest,
HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
// Sync successfully from another up-to-date member.
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h6"), ourOpTime, &response2, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response2, &result);
BSONObj response2Obj = response2.obj();
ASSERT_FALSE(response2Obj.hasField("warning"));
ASSERT_EQUALS(HostAndPort("h5").toString(), response2Obj["prevSyncTarget"].String());
@@ -1365,7 +1351,6 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) {
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{
curTime,
static_cast<unsigned>(durationCount<Seconds>(uptimeSecs)),
@@ -1481,7 +1466,6 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidReplicaSetConfigInResponseToGetStatusWhe
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{
curTime,
static_cast<unsigned>(durationCount<Seconds>(uptimeSecs)),
@@ -1518,42 +1502,6 @@ TEST_F(TopoCoordTest, HeartbeatFrequencyShouldBeHalfElectionTimeoutWhenArbiter)
ASSERT_EQUALS(expected, action.getNextHeartbeatStartDate());
}
-class ShutdownInProgressTest : public TopoCoordTest {
-public:
- ShutdownInProgressTest()
- : ourCbData(NULL,
- ReplicationExecutor::CallbackHandle(),
- Status(ErrorCodes::CallbackCanceled, "")) {}
-
- virtual ReplicationExecutor::CallbackArgs cbData() {
- return ourCbData;
- }
-
-private:
- ReplicationExecutor::CallbackArgs ourCbData;
-};
-
-TEST_F(ShutdownInProgressTest, NodeReturnsShutdownInProgressWhenSyncFromCallbackCanceled) {
- Status result = Status::OK();
- BSONObjBuilder response;
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("host2:27017"), OpTime(), &response, &result);
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result);
- ASSERT_TRUE(response.obj().isEmpty());
-}
-
-TEST_F(ShutdownInProgressTest, NodeReturnsShutDownInProgressWhenGetReplSetStatusCallbackCanceled) {
- Status result = Status::OK();
- BSONObjBuilder response;
- getTopoCoord().prepareStatusResponse(
- cbData(),
- TopologyCoordinator::ReplSetStatusArgs{Date_t(), 0, OpTime(), OpTime(), OpTime(), OpTime()},
- &response,
- &result);
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result);
- ASSERT_TRUE(response.obj().isEmpty());
-}
-
class PrepareHeartbeatResponseV1Test : public TopoCoordTest {
public:
virtual void setUp() {
@@ -4051,7 +3999,6 @@ public:
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{_firstRequestDate + Milliseconds(4000),
10,
OpTime(Timestamp(100, 0), 0),
@@ -4135,7 +4082,6 @@ public:
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Seconds(4),
10,
OpTime(Timestamp(100, 0), 0),
@@ -4184,7 +4130,6 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, NodeDoesNotRetryHeartbeatsAfterFailing
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Milliseconds(4900),
10,
OpTime(Timestamp(100, 0), 0),
@@ -4245,7 +4190,6 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, HeartbeatThreeNonconsecutiveFailures)
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Milliseconds(7000),
600,
OpTime(Timestamp(100, 0), 0),
diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp
index eafa68715e2..836617e5493 100644
--- a/src/mongo/db/repl/vote_requester.cpp
+++ b/src/mongo/db/repl/vote_requester.cpp
@@ -141,22 +141,20 @@ unordered_set<HostAndPort> VoteRequester::Algorithm::getResponders() const {
VoteRequester::VoteRequester() : _isCanceled(false) {}
VoteRequester::~VoteRequester() {}
-StatusWith<ReplicationExecutor::EventHandle> VoteRequester::start(
- ReplicationExecutor* executor,
- const ReplicaSetConfig& rsConfig,
- long long candidateIndex,
- long long term,
- bool dryRun,
- OpTime lastDurableOpTime,
- const stdx::function<void()>& onCompletion) {
+StatusWith<ReplicationExecutor::EventHandle> VoteRequester::start(ReplicationExecutor* executor,
+ const ReplicaSetConfig& rsConfig,
+ long long candidateIndex,
+ long long term,
+ bool dryRun,
+ OpTime lastDurableOpTime) {
_algorithm.reset(new Algorithm(rsConfig, candidateIndex, term, dryRun, lastDurableOpTime));
- _runner.reset(new ScatterGatherRunner(_algorithm.get()));
- return _runner->start(executor, onCompletion);
+ _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
+ return _runner->start();
}
-void VoteRequester::cancel(ReplicationExecutor* executor) {
+void VoteRequester::cancel() {
_isCanceled = true;
- _runner->cancel(executor);
+ _runner->cancel();
}
VoteRequester::Result VoteRequester::getResult() const {
diff --git a/src/mongo/db/repl/vote_requester.h b/src/mongo/db/repl/vote_requester.h
index 433facf345c..cb0be7fc888 100644
--- a/src/mongo/db/repl/vote_requester.h
+++ b/src/mongo/db/repl/vote_requester.h
@@ -105,26 +105,19 @@ public:
* in currentConfig, in attempt to receive sufficient votes to win the election.
*
* evh can be used to schedule a callback when the process is complete.
- * This function must be run in the executor, as it must be synchronous with the command
- * callbacks that it schedules.
* If this function returns Status::OK(), evh is then guaranteed to be signaled.
**/
- StatusWith<ReplicationExecutor::EventHandle> start(
- ReplicationExecutor* executor,
- const ReplicaSetConfig& rsConfig,
- long long candidateIndex,
- long long term,
- bool dryRun,
- OpTime lastDurableOpTime,
- const stdx::function<void()>& onCompletion = stdx::function<void()>());
+ StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor,
+ const ReplicaSetConfig& rsConfig,
+ long long candidateIndex,
+ long long term,
+ bool dryRun,
+ OpTime lastDurableOpTime);
/**
- * Informs the VoteRequester to cancel further processing. The "executor"
- * argument must point to the same executor passed to "start()".
- *
- * Like start(), this method must run in the executor context.
+ * Informs the VoteRequester to cancel further processing.
*/
- void cancel(ReplicationExecutor* executor);
+ void cancel();
Result getResult() const;
unordered_set<HostAndPort> getResponders() const;