Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/repl/check_quorum_for_config_change.cpp              |    4
-rw-r--r--  src/mongo/db/repl/elect_cmd_runner.cpp                            |   11
-rw-r--r--  src/mongo/db/repl/elect_cmd_runner.h                              |   17
-rw-r--r--  src/mongo/db/repl/election_winner_declarer.cpp                    |    8
-rw-r--r--  src/mongo/db/repl/election_winner_declarer.h                      |    9
-rw-r--r--  src/mongo/db/repl/freshness_checker.cpp                           |   11
-rw-r--r--  src/mongo/db/repl/freshness_checker.h                             |   21
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp                |  877
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h                  |  207
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_elect.cpp          |   49
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp     |    8
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp       |   45
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp  |    8
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp      |   86
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp           |   14
-rw-r--r--  src/mongo/db/repl/scatter_gather_runner.cpp                       |  123
-rw-r--r--  src/mongo/db/repl/scatter_gather_runner.h                         |   97
-rw-r--r--  src/mongo/db/repl/scatter_gather_test.cpp                         |   69
-rw-r--r--  src/mongo/db/repl/topology_coordinator.h                          |    6
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl.cpp                   |   16
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl.h                     |    6
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl_test.cpp              |   86
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp           |   86
-rw-r--r--  src/mongo/db/repl/vote_requester.cpp                              |   22
-rw-r--r--  src/mongo/db/repl/vote_requester.h                                |   23
25 files changed, 670 insertions(+), 1239 deletions(-)
diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp
index ae1a1f9c7fa..583b3df37d8 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change.cpp
+++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp
@@ -282,8 +282,8 @@ Status checkQuorumGeneral(ReplicationExecutor* executor,
const ReplicaSetConfig& rsConfig,
const int myIndex) {
QuorumChecker checker(&rsConfig, myIndex);
- ScatterGatherRunner runner(&checker);
- Status status = runner.run(executor);
+ ScatterGatherRunner runner(&checker, executor);
+ Status status = runner.run();
if (!status.isOK()) {
return status;
}
diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp
index 15bf827b1da..dc2bb2e240d 100644
--- a/src/mongo/db/repl/elect_cmd_runner.cpp
+++ b/src/mongo/db/repl/elect_cmd_runner.cpp
@@ -131,16 +131,15 @@ StatusWith<ReplicationExecutor::EventHandle> ElectCmdRunner::start(
ReplicationExecutor* executor,
const ReplicaSetConfig& currentConfig,
int selfIndex,
- const std::vector<HostAndPort>& targets,
- const stdx::function<void()>& onCompletion) {
+ const std::vector<HostAndPort>& targets) {
_algorithm.reset(new Algorithm(currentConfig, selfIndex, targets, OID::gen()));
- _runner.reset(new ScatterGatherRunner(_algorithm.get()));
- return _runner->start(executor, onCompletion);
+ _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
+ return _runner->start();
}
-void ElectCmdRunner::cancel(ReplicationExecutor* executor) {
+void ElectCmdRunner::cancel() {
_isCanceled = true;
- _runner->cancel(executor);
+ _runner->cancel();
}
int ElectCmdRunner::getReceivedVotes() const {
diff --git a/src/mongo/db/repl/elect_cmd_runner.h b/src/mongo/db/repl/elect_cmd_runner.h
index ad12703b68e..32539b2cd26 100644
--- a/src/mongo/db/repl/elect_cmd_runner.h
+++ b/src/mongo/db/repl/elect_cmd_runner.h
@@ -90,20 +90,15 @@ public:
*
* Returned handle can be used to schedule a callback when the process is complete.
*/
- StatusWith<ReplicationExecutor::EventHandle> start(
- ReplicationExecutor* executor,
- const ReplicaSetConfig& currentConfig,
- int selfIndex,
- const std::vector<HostAndPort>& targets,
- const stdx::function<void()>& onCompletion = stdx::function<void()>());
+ StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor,
+ const ReplicaSetConfig& currentConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& targets);
/**
- * Informs the ElectCmdRunner to cancel further processing. The "executor"
- * argument must point to the same executor passed to "start()".
- *
- * Like start(), this method must run in the executor context.
+ * Informs the ElectCmdRunner to cancel further processing.
*/
- void cancel(ReplicationExecutor* executor);
+ void cancel();
/**
* Returns the number of received votes. Only valid to call after
diff --git a/src/mongo/db/repl/election_winner_declarer.cpp b/src/mongo/db/repl/election_winner_declarer.cpp
index 9cb43a4c54b..6f2ed66e8a6 100644
--- a/src/mongo/db/repl/election_winner_declarer.cpp
+++ b/src/mongo/db/repl/election_winner_declarer.cpp
@@ -105,13 +105,13 @@ StatusWith<ReplicationExecutor::EventHandle> ElectionWinnerDeclarer::start(
const std::vector<HostAndPort>& targets,
const stdx::function<void()>& onCompletion) {
_algorithm.reset(new Algorithm(setName, winnerId, term, targets));
- _runner.reset(new ScatterGatherRunner(_algorithm.get()));
- return _runner->start(executor, onCompletion);
+ _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
+ return _runner->start();
}
-void ElectionWinnerDeclarer::cancel(ReplicationExecutor* executor) {
+void ElectionWinnerDeclarer::cancel() {
_isCanceled = true;
- _runner->cancel(executor);
+ _runner->cancel();
}
Status ElectionWinnerDeclarer::getStatus() const {
diff --git a/src/mongo/db/repl/election_winner_declarer.h b/src/mongo/db/repl/election_winner_declarer.h
index c73f2ca3419..346fc11d252 100644
--- a/src/mongo/db/repl/election_winner_declarer.h
+++ b/src/mongo/db/repl/election_winner_declarer.h
@@ -90,8 +90,6 @@ public:
* in currentConfig, with the intention of alerting them of a new primary.
*
* evh can be used to schedule a callback when the process is complete.
- * This function must be run in the executor, as it must be synchronous with the command
- * callbacks that it schedules.
* If this function returns Status::OK(), evh is then guaranteed to be signaled.
**/
StatusWith<ReplicationExecutor::EventHandle> start(
@@ -103,12 +101,9 @@ public:
const stdx::function<void()>& onCompletion = stdx::function<void()>());
/**
- * Informs the ElectionWinnerDeclarer to cancel further processing. The "executor"
- * argument must point to the same executor passed to "start()".
- *
- * Like start(), this method must run in the executor context.
+ * Informs the ElectionWinnerDeclarer to cancel further processing.
*/
- void cancel(ReplicationExecutor* executor);
+ void cancel();
/**
* Returns a Status from the ElectionWinnerDeclarer::algorithm which indicates what
diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp
index 71b7b2e419b..f5bd3963e63 100644
--- a/src/mongo/db/repl/freshness_checker.cpp
+++ b/src/mongo/db/repl/freshness_checker.cpp
@@ -208,17 +208,16 @@ StatusWith<ReplicationExecutor::EventHandle> FreshnessChecker::start(
const Timestamp& lastOpTimeApplied,
const ReplicaSetConfig& currentConfig,
int selfIndex,
- const std::vector<HostAndPort>& targets,
- const stdx::function<void()>& onCompletion) {
+ const std::vector<HostAndPort>& targets) {
_originalConfigVersion = currentConfig.getConfigVersion();
_algorithm.reset(new Algorithm(lastOpTimeApplied, currentConfig, selfIndex, targets));
- _runner.reset(new ScatterGatherRunner(_algorithm.get()));
- return _runner->start(executor, onCompletion);
+ _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
+ return _runner->start();
}
-void FreshnessChecker::cancel(ReplicationExecutor* executor) {
+void FreshnessChecker::cancel() {
_isCanceled = true;
- _runner->cancel(executor);
+ _runner->cancel();
}
} // namespace repl
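As the updated header comments in elect_cmd_runner.h, election_winner_declarer.h, and freshness_checker.h note, cancel() no longer takes the executor and no longer has to run in the executor context. The call sites shrink accordingly; a condensed before/after drawn from the replication_coordinator_impl.cpp hunks in this commit:

    // Before: every cancellation had to thread the executor through.
    _freshnessChecker->cancel(&_replExecutor);
    // After: the checker's runner already holds the executor.
    _freshnessChecker->cancel();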
diff --git a/src/mongo/db/repl/freshness_checker.h b/src/mongo/db/repl/freshness_checker.h
index 43dd77eee69..6da248a203e 100644
--- a/src/mongo/db/repl/freshness_checker.h
+++ b/src/mongo/db/repl/freshness_checker.h
@@ -116,25 +116,18 @@ public:
* in currentConfig, with the intention of determining whether the current node
* is freshest.
* evh can be used to schedule a callback when the process is complete.
- * This function must be run in the executor, as it must be synchronous with the command
- * callbacks that it schedules.
* If this function returns Status::OK(), evh is then guaranteed to be signaled.
**/
- StatusWith<ReplicationExecutor::EventHandle> start(
- ReplicationExecutor* executor,
- const Timestamp& lastOpTimeApplied,
- const ReplicaSetConfig& currentConfig,
- int selfIndex,
- const std::vector<HostAndPort>& targets,
- const stdx::function<void()>& onCompletion = stdx::function<void()>());
+ StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor,
+ const Timestamp& lastOpTimeApplied,
+ const ReplicaSetConfig& currentConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& targets);
/**
- * Informs the freshness checker to cancel further processing. The "executor"
- * argument must point to the same executor passed to "start()".
- *
- * Like start(), this method must run in the executor context.
+ * Informs the freshness checker to cancel further processing.
*/
- void cancel(ReplicationExecutor* executor);
+ void cancel();
/**
* Returns true if cancel() was called on this instance.
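With the onCompletion callback removed from start(), callers observe completion through the returned event handle instead. A sketch, assuming the signature shown above and a ReplicationExecutor* in scope:

    FreshnessChecker freshnessChecker;
    auto evh = freshnessChecker.start(
        executor, lastOpTimeApplied, currentConfig, selfIndex, targets);
    if (evh.isOK()) {
        // Blocks until the scatter-gather pass signals the event.
        executor->waitForEvent(evh.getValue());
    }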
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 068bb49ef51..a387b98e863 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -80,6 +80,8 @@
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
+#include "mongo/util/stacktrace.h"
+
namespace mongo {
namespace repl {
@@ -90,6 +92,9 @@ using EventHandle = executor::TaskExecutor::EventHandle;
namespace {
+Status shutDownInProgressStatus(ErrorCodes::ShutdownInProgress,
+ "replication system is shutting down");
+
void lockAndCall(stdx::unique_lock<stdx::mutex>* lk, const stdx::function<void()>& fn) {
if (!lk->owns_lock()) {
lk->lock();
@@ -352,18 +357,13 @@ void ReplicationCoordinatorImpl::appendConnectionStats(executor::ConnectionPoolS
_replExecutor.appendConnectionStats(stats);
}
-void ReplicationCoordinatorImpl::_updateLastVote(const LastVote& lastVote) {
- _topCoord->loadLastVote(lastVote);
-}
-
bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) {
StatusWith<LastVote> lastVote = _externalState->loadLocalLastVoteDocument(txn);
if (!lastVote.isOK()) {
log() << "Did not find local voted for document at startup; " << lastVote.getStatus();
} else {
- LastVote vote = lastVote.getValue();
- _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_updateLastVote, this, vote));
+ LockGuard topoLock(_topoMutex);
+ _topCoord->loadLastVote(lastVote.getValue());
}
StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(txn);
@@ -413,6 +413,8 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
return;
}
+ LockGuard topoLock(_topoMutex);
+
StatusWith<int> myIndex =
validateConfigForStartUp(_externalState.get(), _rsConfig, localConfig);
if (!myIndex.isOK()) {
@@ -467,7 +469,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
stdx::unique_lock<stdx::mutex> lock(_mutex);
invariant(_rsConfigState == kConfigStartingUp);
const PostMemberStateUpdateAction action =
- _setCurrentRSConfig_inlock(cbData, localConfig, myIndex.getValue());
+ _setCurrentRSConfig_inlock(localConfig, myIndex.getValue());
_setMyLastAppliedOpTime_inlock(lastOpTime, false);
_setMyLastDurableOpTime_inlock(lastOpTime, false);
_reportUpstream_inlock(std::move(lock));
@@ -628,8 +630,8 @@ Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const {
}
void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() {
- auto work = [this](const CallbackArgs&) { _topCoord->clearSyncSourceBlacklist(); };
- _scheduleWorkAndWaitForCompletion(work);
+ LockGuard topoLock(_topoMutex);
+ _topCoord->clearSyncSourceBlacklist();
}
ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::setFollowerMode_nonBlocking(
@@ -640,12 +642,7 @@ ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::setFollowerMode_non
return ReplicationExecutor::EventHandle();
}
fassert(18812, finishedSettingFollowerMode.getStatus());
- _scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish,
- this,
- stdx::placeholders::_1,
- newState,
- finishedSettingFollowerMode.getValue(),
- success));
+ _setFollowerModeFinish(newState, finishedSettingFollowerMode.getValue(), success);
return finishedSettingFollowerMode.getValue();
}
@@ -658,13 +655,11 @@ bool ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) {
}
void ReplicationCoordinatorImpl::_setFollowerModeFinish(
- const ReplicationExecutor::CallbackArgs& cbData,
const MemberState& newState,
const ReplicationExecutor::EventHandle& finishedSettingFollowerMode,
bool* success) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
+ LockGuard topoLock(_topoMutex);
+
if (newState == _topCoord->getMemberState()) {
*success = true;
_replExecutor.signalEvent(finishedSettingFollowerMode);
@@ -683,21 +678,21 @@ void ReplicationCoordinatorImpl::_setFollowerModeFinish(
// finish setting the follower mode.
if (isV1ElectionProtocol()) {
invariant(_voteRequester);
- _voteRequester->cancel(&_replExecutor);
+ _voteRequester->cancel();
} else {
invariant(_freshnessChecker);
- _freshnessChecker->cancel(&_replExecutor);
+ _freshnessChecker->cancel();
if (_electCmdRunner) {
- _electCmdRunner->cancel(&_replExecutor);
+ _electCmdRunner->cancel();
}
}
- _replExecutor.onEvent(_electionFinishedEvent,
- stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish,
+ _replExecutor.onEvent(
+ _electionFinishedEvent,
+ _wrapAsCallbackFn(stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish,
this,
- stdx::placeholders::_1,
newState,
finishedSettingFollowerMode,
- success));
+ success)));
return;
}
@@ -945,8 +940,8 @@ Status ReplicationCoordinatorImpl::setLastOptimeForSlave(const OID& rid, const T
}
void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) {
- _scheduleWorkAndWaitForCompletion(stdx::bind(
- &TopologyCoordinator::setMyHeartbeatMessage, _topCoord.get(), _replExecutor.now(), msg));
+ LockGuard topoLock(_topoMutex);
+ _topCoord->setMyHeartbeatMessage(_replExecutor.now(), msg);
}
void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opTime) {
@@ -1313,47 +1308,57 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg
}
void ReplicationCoordinatorImpl::interrupt(unsigned opId) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- // Wake ops waiting for a new committed snapshot.
- _currentCommittedSnapshotCond.notify_all();
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ // Wake ops waiting for a new committed snapshot.
+ _currentCommittedSnapshotCond.notify_all();
- for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
- it != _replicationWaiterList.end();
- ++it) {
- WaiterInfo* info = *it;
- if (info->opID == opId) {
- info->condVar->notify_all();
- return;
+ for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
+ it != _replicationWaiterList.end();
+ ++it) {
+ WaiterInfo* info = *it;
+ if (info->opID == opId) {
+ info->condVar->notify_all();
+ return;
+ }
}
- }
- for (auto& opTimeWaiter : _opTimeWaiterList) {
- if (opTimeWaiter->opID == opId) {
- opTimeWaiter->condVar->notify_all();
- return;
+ for (auto& opTimeWaiter : _opTimeWaiterList) {
+ if (opTimeWaiter->opID == opId) {
+ opTimeWaiter->condVar->notify_all();
+ return;
+ }
}
}
- _scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaiters, this));
+ {
+ LockGuard topoLock(_topoMutex);
+ _signalStepDownWaiters();
+ }
}
void ReplicationCoordinatorImpl::interruptAll() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- // Wake ops waiting for a new committed snapshot.
- _currentCommittedSnapshotCond.notify_all();
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ // Wake ops waiting for a new committed snapshot.
+ _currentCommittedSnapshotCond.notify_all();
- for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
- it != _replicationWaiterList.end();
- ++it) {
- WaiterInfo* info = *it;
- info->condVar->notify_all();
- }
+ for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
+ it != _replicationWaiterList.end();
+ ++it) {
+ WaiterInfo* info = *it;
+ info->condVar->notify_all();
+ }
- for (auto& opTimeWaiter : _opTimeWaiterList) {
- opTimeWaiter->condVar->notify_all();
+ for (auto& opTimeWaiter : _opTimeWaiterList) {
+ opTimeWaiter->condVar->notify_all();
+ }
}
- _scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaiters, this));
+ {
+ LockGuard topoLock(_topoMutex);
+ _signalStepDownWaiters();
+ }
}
bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock(
@@ -1611,24 +1616,20 @@ ReplicationCoordinatorImpl::stepDown_nonBlocking(OperationContext* txn,
return StepDownNonBlockingResult();
}
fassert(26000, finishedEvent.getStatus());
- CBHStatus cbh =
- _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_stepDownContinue,
- this,
- stdx::placeholders::_1,
- finishedEvent.getValue(),
- txn,
- waitUntil,
- stepDownUntil,
- force,
- true, // restartHeartbeats
- result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- *result = cbh.getStatus();
- return StepDownNonBlockingResult();
- }
- fassert(18809, cbh.getStatus());
- _scheduleWorkAt(waitUntil,
- stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaiters, this));
+ _stepDownContinue(finishedEvent.getValue(),
+ txn,
+ waitUntil,
+ stepDownUntil,
+ force,
+ true, // restartHeartbeats
+ result);
+
+ auto signalStepDownWaitersInLock = [this](const CallbackArgs&) {
+ LockGuard topoLock(_topoMutex);
+ _signalStepDownWaiters();
+ };
+
+ _scheduleWorkAt(waitUntil, signalStepDownWaitersInLock);
return std::make_pair(std::move(globalReadLock), finishedEvent.getValue());
}
@@ -1655,7 +1656,6 @@ void ReplicationCoordinatorImpl::_signalStepDownWaiters() {
}
void ReplicationCoordinatorImpl::_stepDownContinue(
- const ReplicationExecutor::CallbackArgs& cbData,
const ReplicationExecutor::EventHandle finishedEvent,
OperationContext* txn,
const Date_t waitUntil,
@@ -1663,18 +1663,10 @@ void ReplicationCoordinatorImpl::_stepDownContinue(
bool force,
bool restartHeartbeats,
Status* result) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- // Cancelation only occurs on shutdown, which will also handle signaling the event.
- *result = Status(ErrorCodes::ShutdownInProgress, "Shutting down replication");
- return;
- }
+ LockGuard topoLock(_topoMutex);
ScopeGuard allFinishedGuard =
MakeGuard(stdx::bind(&ReplicationExecutor::signalEvent, &_replExecutor, finishedEvent));
- if (!cbData.status.isOK()) {
- *result = cbData.status;
- return;
- }
Status interruptedStatus = txn->checkForInterruptNoAssert();
if (!interruptedStatus.isOK()) {
@@ -1698,10 +1690,10 @@ void ReplicationCoordinatorImpl::_stepDownContinue(
bool forceNow = now >= waitUntil ? force : false;
if (_topCoord->stepDown(stepDownUntil, forceNow, getMyLastAppliedOpTime())) {
// Schedule work to (potentially) step back up once the stepdown period has ended.
- _replExecutor.scheduleWorkAt(stepDownUntil,
- stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing,
- this,
- stdx::placeholders::_1));
+ _scheduleWorkAt(stepDownUntil,
+ stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing,
+ this,
+ stdx::placeholders::_1));
stdx::unique_lock<stdx::mutex> lk(_mutex);
const PostMemberStateUpdateAction action =
@@ -1732,7 +1724,6 @@ void ReplicationCoordinatorImpl::_stepDownContinue(
CBHStatus cbh = _replExecutor.onEvent(_stepDownWaiters.back(),
stdx::bind(&ReplicationCoordinatorImpl::_stepDownContinue,
this,
- stdx::placeholders::_1,
finishedEvent,
txn,
waitUntil,
@@ -1754,7 +1745,7 @@ void ReplicationCoordinatorImpl::_stepDownContinue(
return;
}
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _restartHeartbeats_inlock(cbData);
+ _restartHeartbeats_inlock();
}
void ReplicationCoordinatorImpl::_handleTimePassing(
@@ -1763,6 +1754,7 @@ void ReplicationCoordinatorImpl::_handleTimePassing(
return;
}
+ LockGuard topoLock(_topoMutex);
if (_topCoord->becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(_replExecutor.now())) {
_performPostMemberStateUpdateAction(kActionWinElection);
}
@@ -1951,38 +1943,30 @@ StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionComm
}
Status ReplicationCoordinatorImpl::processReplSetGetStatus(BSONObjBuilder* response) {
+ LockGuard topoLock(_topoMutex);
+
Status result(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse");
- _scheduleWorkAndWaitForCompletion(
- stdx::bind(&TopologyCoordinator::prepareStatusResponse,
- _topCoord.get(),
- stdx::placeholders::_1,
- TopologyCoordinator::ReplSetStatusArgs{
- _replExecutor.now(),
- static_cast<unsigned>(time(0) - serverGlobalParams.started),
- getMyLastAppliedOpTime(),
- getMyLastDurableOpTime(),
- getLastCommittedOpTime(),
- getCurrentCommittedSnapshotOpTime()},
- response,
- &result));
+ _topCoord->prepareStatusResponse(
+ TopologyCoordinator::ReplSetStatusArgs{
+ _replExecutor.now(),
+ static_cast<unsigned>(time(0) - serverGlobalParams.started),
+ getMyLastAppliedOpTime(),
+ getMyLastDurableOpTime(),
+ getLastCommittedOpTime(),
+ getCurrentCommittedSnapshotOpTime()},
+ response,
+ &result);
return result;
}
void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) {
invariant(getSettings().usingReplSets());
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_fillIsMasterForReplSet_finish,
- this,
- stdx::placeholders::_1,
- response));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- response->markAsShutdownInProgress();
- return;
+ {
+ LockGuard topoLock(_topoMutex);
+ _topCoord->fillIsMasterForReplSet(response);
}
- fassert(28602, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
if (isWaitingForApplierToDrain()) {
// Report that we are secondary to ismaster callers until drain completes.
response->setIsMaster(false);
@@ -1990,15 +1974,6 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* respon
}
}
-void ReplicationCoordinatorImpl::_fillIsMasterForReplSet_finish(
- const ReplicationExecutor::CallbackArgs& cbData, IsMasterResponse* response) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- response->markAsShutdownInProgress();
- return;
- }
- _topCoord->fillIsMasterForReplSet(response);
-}
-
void ReplicationCoordinatorImpl::appendSlaveInfoData(BSONObjBuilder* result) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress"));
@@ -2039,10 +2014,13 @@ void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result)
void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {
EventHandle evh;
- _scheduleWorkAndWaitForCompletion([this, &evh, &replMetadata](const CallbackArgs& args) {
+
+ {
+ LockGuard topoLock(_topoMutex);
evh = _processReplSetMetadata_incallback(replMetadata);
- });
- if (evh.isValid()) {
+ }
+
+ if (evh) {
_replExecutor.waitForEvent(evh);
}
}
@@ -2062,26 +2040,8 @@ EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_incallback(
}
bool ReplicationCoordinatorImpl::getMaintenanceMode() {
- bool maintenanceMode(false);
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_getMaintenanceMode_helper,
- this,
- stdx::placeholders::_1,
- &maintenanceMode));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return false;
- }
- fassert(18811, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
- return maintenanceMode;
-}
-
-void ReplicationCoordinatorImpl::_getMaintenanceMode_helper(
- const ReplicationExecutor::CallbackArgs& cbData, bool* maintenanceMode) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
- *maintenanceMode = _topCoord->getMaintenanceCount() > 0;
+ LockGuard topoLock(_topoMutex);
+ return _topCoord->getMaintenanceCount() > 0;
}
Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) {
@@ -2090,37 +2050,15 @@ Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) {
"can only set maintenance mode on replica set members");
}
- Status result(ErrorCodes::InternalError, "didn't set status in _setMaintenanceMode_helper");
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_setMaintenanceMode_helper,
- this,
- stdx::placeholders::_1,
- activate,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return cbh.getStatus();
- }
- fassert(18698, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
- return result;
-}
-
-void ReplicationCoordinatorImpl::_setMaintenanceMode_helper(
- const ReplicationExecutor::CallbackArgs& cbData, bool activate, Status* result) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
- return;
- }
-
+ Status result(ErrorCodes::InternalError, "didn't set status");
+ LockGuard topoLock(_topoMutex);
if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) {
- *result = Status(ErrorCodes::NotSecondary, "currently running for election");
- return;
+ return Status(ErrorCodes::NotSecondary, "currently running for election");
}
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_getMemberState_inlock().primary()) {
- *result = Status(ErrorCodes::NotSecondary, "primaries can't modify maintenance mode");
- return;
+ return Status(ErrorCodes::NotSecondary, "primaries can't modify maintenance mode");
}
int curMaintenanceCalls = _topCoord->getMaintenanceCount();
@@ -2137,63 +2075,28 @@ void ReplicationCoordinatorImpl::_setMaintenanceMode_helper(
<< " other maintenance mode tasks ongoing)" << rsLog;
} else {
warning() << "Attempted to leave maintenance mode but it is not currently active";
- *result = Status(ErrorCodes::OperationFailed, "already out of maintenance mode");
- return;
+ return Status(ErrorCodes::OperationFailed, "already out of maintenance mode");
}
const PostMemberStateUpdateAction action = _updateMemberStateFromTopologyCoordinator_inlock();
- *result = Status::OK();
lk.unlock();
_performPostMemberStateUpdateAction(action);
+ return Status::OK();
}
Status ReplicationCoordinatorImpl::processReplSetSyncFrom(const HostAndPort& target,
BSONObjBuilder* resultObj) {
Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse");
- CBHStatus cbh =
- _replExecutor.scheduleWork(stdx::bind(&TopologyCoordinator::prepareSyncFromResponse,
- _topCoord.get(),
- stdx::placeholders::_1,
- target,
- _getMyLastAppliedOpTime_inlock(),
- resultObj,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
- }
- fassert(18649, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
+ LockGuard topoLock(_topoMutex);
+ LockGuard lk(_mutex);
+ auto opTime = _getMyLastAppliedOpTime_inlock();
+ _topCoord->prepareSyncFromResponse(target, opTime, resultObj, &result);
return result;
}
Status ReplicationCoordinatorImpl::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) {
- Status result(ErrorCodes::InternalError, "didn't set status in prepareFreezeResponse");
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_processReplSetFreeze_finish,
- this,
- stdx::placeholders::_1,
- secs,
- resultObj,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return cbh.getStatus();
- }
- fassert(18641, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
- return result;
-}
-
-void ReplicationCoordinatorImpl::_processReplSetFreeze_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- int secs,
- BSONObjBuilder* response,
- Status* result) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
- return;
- }
-
- _topCoord->prepareFreezeResponse(_replExecutor.now(), secs, response);
+ LockGuard topoLock(_topoMutex);
+ _topCoord->prepareFreezeResponse(_replExecutor.now(), secs, resultObj);
if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) {
// If we just unfroze and ended our stepdown period and we are a one node replica set,
@@ -2201,7 +2104,8 @@ void ReplicationCoordinatorImpl::_processReplSetFreeze_finish(
// need to elect ourself.
_performPostMemberStateUpdateAction(kActionWinElection);
}
- *result = Status::OK();
+ return Status::OK();
}
Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs& args,
@@ -2214,44 +2118,17 @@ Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs&
}
}
- Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse");
- CBHStatus cbh =
- _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_processHeartbeatFinish,
- this,
- stdx::placeholders::_1,
- args,
- response,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
- }
- fassert(18508, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
- return result;
-}
-
-void ReplicationCoordinatorImpl::_processHeartbeatFinish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetHeartbeatArgs& args,
- ReplSetHeartbeatResponse* response,
- Status* outStatus) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *outStatus = Status(ErrorCodes::ShutdownInProgress, "Replication shutdown in progress");
- return;
- }
- fassert(18910, cbData.status);
-
auto senderHost(args.getSenderHost());
+ LockGuard topoLock(_topoMutex);
const Date_t now = _replExecutor.now();
- *outStatus = _topCoord->prepareHeartbeatResponse(now,
- args,
- _settings.ourSetName(),
- getMyLastAppliedOpTime(),
- getMyLastDurableOpTime(),
- response);
- if ((outStatus->isOK() || *outStatus == ErrorCodes::InvalidReplicaSetConfig) &&
- _selfIndex < 0) {
+ Status result = _topCoord->prepareHeartbeatResponse(now,
+ args,
+ _settings.ourSetName(),
+ getMyLastAppliedOpTime(),
+ getMyLastDurableOpTime(),
+ response);
+ if ((result.isOK() || result == ErrorCodes::InvalidReplicaSetConfig) && _selfIndex < 0) {
// If this node does not belong to the configuration it knows about, send heartbeats
// back to any node that sends us a heartbeat, in case one of those remote nodes has
// a configuration that contains us. Chances are excellent that it will, since that
@@ -2259,7 +2136,7 @@ void ReplicationCoordinatorImpl::_processHeartbeatFinish(
if (!senderHost.empty() && _seedList.insert(senderHost).second) {
_scheduleHeartbeatToTarget(senderHost, -1, now);
}
- } else if (outStatus->isOK() && response->getConfigVersion() < args.getConfigVersion()) {
+ } else if (result.isOK() && response->getConfigVersion() < args.getConfigVersion()) {
// Schedule a heartbeat to the sender to fetch the new config.
// We cannot cancel the enqueued heartbeat, but either this one or the enqueued heartbeat
// will trigger reconfig, which cancels and reschedules all heartbeats.
@@ -2269,6 +2146,7 @@ void ReplicationCoordinatorImpl::_processHeartbeatFinish(
_scheduleHeartbeatToTarget(senderHost, senderIndex, now);
}
}
+ return result;
}
Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn,
@@ -2377,14 +2255,20 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn,
// If it's a force reconfig, the primary node may not be electable after the configuration
// change. In case we are that primary node, finish the reconfig under the global lock,
// so that the step down occurs safely.
- CBHStatus cbh = args.force ? _replExecutor.scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn)
- : _replExecutor.scheduleWork(reconfigFinishFn);
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return status;
+ CBHStatus cbhStatus(ErrorCodes::InternalError, "reconfigFinishFn hasn't been scheduled");
+ if (args.force) {
+ cbhStatus = _replExecutor.scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn);
+ } else {
+ cbhStatus = _replExecutor.scheduleWork(reconfigFinishFn);
+ }
+ if (cbhStatus.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return cbhStatus.getStatus();
}
- fassert(18824, cbh.getStatus());
+
+ fassert(18824, cbhStatus.getStatus());
+
configStateGuard.Dismiss();
- _replExecutor.wait(cbh.getValue());
+ _replExecutor.wait(cbhStatus.getValue());
return Status::OK();
}
@@ -2392,12 +2276,37 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(
const ReplicationExecutor::CallbackArgs& cbData,
const ReplicaSetConfig& newConfig,
int myIndex) {
+ LockGuard topoLock(_topoMutex);
+
stdx::unique_lock<stdx::mutex> lk(_mutex);
invariant(_rsConfigState == kConfigReconfiguring);
invariant(_rsConfig.isInitialized());
+
+ // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig.
+ if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) {
+ if (isV1ElectionProtocol()) {
+ invariant(_voteRequester);
+ _voteRequester->cancel();
+ } else {
+ invariant(_freshnessChecker);
+ _freshnessChecker->cancel();
+ if (_electCmdRunner) {
+ _electCmdRunner->cancel();
+ }
+ }
+ // Wait for the election to complete and the node's Role to be set to follower.
+ _replExecutor.onEvent(_electionFinishedEvent,
+ stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig,
+ this,
+ stdx::placeholders::_1,
+ newConfig,
+ myIndex));
+ return;
+ }
+
+
const ReplicaSetConfig oldConfig = _rsConfig;
- const PostMemberStateUpdateAction action =
- _setCurrentRSConfig_inlock(cbData, newConfig, myIndex);
+ const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndex);
// On a reconfig we drop all snapshots so we don't mistakenly read from the wrong one.
// For example, if we change the meaning of the "committed" snapshot from applied -> durable.
@@ -2497,18 +2406,11 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn,
// Since the JournalListener has not yet been set up, we must manually set our
// durableOpTime.
setMyLastDurableOpTime(getMyLastAppliedOpTime());
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetInitiate,
- this,
- stdx::placeholders::_1,
- newConfig,
- myIndex.getValue()));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return status;
+
+ {
+ LockGuard topoLock(_topoMutex);
+ _finishReplSetInitiate(newConfig, myIndex.getValue());
}
- configStateGuard.Dismiss();
- fassert(18654, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
// A configuration passed to replSetInitiate() with the current node as an arbiter
// will fail validation with a "replSet initiate got ... while validating" reason.
@@ -2517,19 +2419,17 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn,
_startDataReplication();
}
+ configStateGuard.Dismiss();
return Status::OK();
}
-void ReplicationCoordinatorImpl::_finishReplSetInitiate(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicaSetConfig& newConfig,
- int myIndex) {
+void ReplicationCoordinatorImpl::_finishReplSetInitiate(const ReplicaSetConfig& newConfig,
+ int myIndex) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
invariant(_rsConfigState == kConfigInitiating);
invariant(!_rsConfig.isInitialized());
const ReplicaSetConfig oldConfig = _rsConfig;
- const PostMemberStateUpdateAction action =
- _setCurrentRSConfig_inlock(cbData, newConfig, myIndex);
+ const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndex);
lk.unlock();
_resetElectionInfoOnProtocolVersionUpgrade(oldConfig, newConfig);
_performPostMemberStateUpdateAction(action);
@@ -2701,94 +2601,31 @@ void ReplicationCoordinatorImpl::incrementRollbackID() {
Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& args,
BSONObjBuilder* resultObj) {
+ LockGuard topoLock(_topoMutex);
Status result(ErrorCodes::InternalError, "didn't set status in prepareFreshResponse");
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_processReplSetFresh_finish,
- this,
- stdx::placeholders::_1,
- args,
- resultObj,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
- }
- fassert(18652, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
- return result;
-}
-
-void ReplicationCoordinatorImpl::_processReplSetFresh_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetFreshArgs& args,
- BSONObjBuilder* response,
- Status* result) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
- return;
- }
-
_topCoord->prepareFreshResponse(
- args, _replExecutor.now(), getMyLastAppliedOpTime(), response, result);
+ args, _replExecutor.now(), getMyLastAppliedOpTime(), resultObj, &result);
+ return result;
}
Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& args,
BSONObjBuilder* responseObj) {
+ LockGuard topoLock(_topoMutex);
Status result = Status(ErrorCodes::InternalError, "status not set by callback");
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_processReplSetElect_finish,
- this,
- stdx::placeholders::_1,
- args,
- responseObj,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
- }
- fassert(18657, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
- return result;
-}
-
-void ReplicationCoordinatorImpl::_processReplSetElect_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetElectArgs& args,
- BSONObjBuilder* response,
- Status* result) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
- return;
- }
-
_topCoord->prepareElectResponse(
- args, _replExecutor.now(), getMyLastAppliedOpTime(), response, result);
+ args, _replExecutor.now(), getMyLastAppliedOpTime(), responseObj, &result);
+ return result;
}
ReplicationCoordinatorImpl::PostMemberStateUpdateAction
-ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicaSetConfig& newConfig,
- int myIndex) {
+ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplicaSetConfig& newConfig,
+ int myIndex) {
invariant(_settings.usingReplSets());
_cancelHeartbeats_inlock();
_setConfigState_inlock(kConfigSteady);
// Must get this before changing our config.
OpTime myOptime = _getMyLastAppliedOpTime_inlock();
- // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig.
- if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) {
- if (isV1ElectionProtocol()) {
- invariant(_voteRequester);
- _voteRequester->cancel(&_replExecutor);
- } else {
- invariant(_freshnessChecker);
- _freshnessChecker->cancel(&_replExecutor);
- if (_electCmdRunner) {
- _electCmdRunner->cancel(&_replExecutor);
- }
- }
- // Wait for the election to complete and the node's Role to be set to follower.
- _replExecutor.waitForEvent(_electionFinishedEvent);
- }
_topCoord->updateConfig(newConfig, myIndex, _replExecutor.now(), myOptime);
_cachedTerm = _topCoord->getTerm();
const ReplicaSetConfig oldConfig = _rsConfig;
@@ -2811,7 +2648,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(
if (_selfIndex >= 0) {
// Don't send heartbeats if we're not in the config, if we get re-added one of the
// nodes in the set will contact us.
- _startHeartbeats_inlock(cbData);
+ _startHeartbeats_inlock();
}
_updateLastCommittedOpTime_inlock();
@@ -3028,81 +2865,41 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const {
return getReplicationMode() != modeNone;
}
-void ReplicationCoordinatorImpl::_chooseNewSyncSource(
- const ReplicationExecutor::CallbackArgs& cbData,
- const Timestamp& lastTimestampFetched,
- HostAndPort* newSyncSource) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
+HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const Timestamp& lastTimestampFetched) {
+ LockGuard topoLock(_topoMutex);
HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress();
-
- *newSyncSource = _topCoord->chooseNewSyncSource(_replExecutor.now(), lastTimestampFetched);
+ HostAndPort newSyncSource =
+ _topCoord->chooseNewSyncSource(_replExecutor.now(), lastTimestampFetched);
stdx::lock_guard<stdx::mutex> lock(_mutex);
// If we lost our sync source, schedule new heartbeats immediately to update our knowledge
// of other members' state, allowing us to make informed sync source decisions.
- if (newSyncSource->empty() && !oldSyncSource.empty() && _selfIndex >= 0 &&
+ if (newSyncSource.empty() && !oldSyncSource.empty() && _selfIndex >= 0 &&
!_getMemberState_inlock().primary()) {
- _restartHeartbeats_inlock(cbData);
+ _restartHeartbeats_inlock();
}
-}
-HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const Timestamp& lastTimestampFetched) {
- HostAndPort newSyncSource;
- CBHStatus cbh =
- _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_chooseNewSyncSource,
- this,
- stdx::placeholders::_1,
- lastTimestampFetched,
- &newSyncSource));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return newSyncSource; // empty
- }
- fassert(18740, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
return newSyncSource;
}
-void ReplicationCoordinatorImpl::_blacklistSyncSource(
- const ReplicationExecutor::CallbackArgs& cbData, const HostAndPort& host, Date_t until) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
- _topCoord->blacklistSyncSource(host, until);
-
- CBHStatus cbh =
- _replExecutor.scheduleWorkAt(until,
- stdx::bind(&ReplicationCoordinatorImpl::_unblacklistSyncSource,
- this,
- stdx::placeholders::_1,
- host));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
- fassert(28610, cbh.getStatus());
-}
-
void ReplicationCoordinatorImpl::_unblacklistSyncSource(
const ReplicationExecutor::CallbackArgs& cbData, const HostAndPort& host) {
if (cbData.status == ErrorCodes::CallbackCanceled)
return;
+
+ LockGuard topoLock(_topoMutex);
_topCoord->unblacklistSyncSource(host, _replExecutor.now());
}
void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) {
- CBHStatus cbh =
- _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_blacklistSyncSource,
- this,
- stdx::placeholders::_1,
- host,
- until));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
- fassert(18741, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
+ LockGuard topoLock(_topoMutex);
+ _topCoord->blacklistSyncSource(host, until);
+ _scheduleWorkAt(until,
+ stdx::bind(&ReplicationCoordinatorImpl::_unblacklistSyncSource,
+ this,
+ stdx::placeholders::_1,
+ host));
}
void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn) {
@@ -3124,41 +2921,15 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn
_externalState->setGlobalTimestamp(lastOpTime.getTimestamp());
}
-void ReplicationCoordinatorImpl::_shouldChangeSyncSource(
- const ReplicationExecutor::CallbackArgs& cbData,
- const HostAndPort& currentSource,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource,
- bool* shouldChange) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
-
- *shouldChange = _topCoord->shouldChangeSyncSource(currentSource,
- getMyLastAppliedOpTime(),
- syncSourceLastOpTime,
- syncSourceHasSyncSource,
- _replExecutor.now());
-}
-
bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& syncSourceLastOpTime,
bool syncSourceHasSyncSource) {
- bool shouldChange(false);
- CBHStatus cbh =
- _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_shouldChangeSyncSource,
- this,
- stdx::placeholders::_1,
- currentSource,
- syncSourceLastOpTime,
- syncSourceHasSyncSource,
- &shouldChange));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return false;
- }
- fassert(18906, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
- return shouldChange;
+ LockGuard topoLock(_topoMutex);
+ return _topCoord->shouldChangeSyncSource(currentSource,
+ getMyLastAppliedOpTime(),
+ syncSourceLastOpTime,
+ syncSourceHasSyncSource,
+ _replExecutor.now());
}
SyncSourceResolverResponse ReplicationCoordinatorImpl::selectSyncSource(
@@ -3351,18 +3122,12 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
if (!termStatus.isOK() && termStatus.code() != ErrorCodes::StaleTerm)
return termStatus;
- Status result{ErrorCodes::InternalError, "didn't set status in processReplSetRequestVotes"};
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish,
- this,
- stdx::placeholders::_1,
- args,
- response,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return cbh.getStatus();
+ {
+ LockGuard topoLock(_topoMutex);
+ LockGuard lk(_mutex);
+ _topCoord->processReplSetRequestVotes(args, response, _getMyLastAppliedOpTime_inlock());
}
- _replExecutor.wait(cbh.getValue());
+
if (response->getVoteGranted()) {
LastVote lastVote;
lastVote.setTerm(args.getTerm());
@@ -3374,22 +3139,7 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
return status;
}
}
- return result;
-}
-
-void ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response,
- Status* result) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
- return;
- }
-
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _topCoord->processReplSetRequestVotes(args, response, _getMyLastAppliedOpTime_inlock());
- *result = Status::OK();
+ return Status::OK();
}
Status ReplicationCoordinatorImpl::processReplSetDeclareElectionWinner(
@@ -3400,33 +3150,9 @@ Status ReplicationCoordinatorImpl::processReplSetDeclareElectionWinner(
// TODO(sz) Remove processReplSetDeclareElectionWinner rather than passing nullptr.
updateTerm(nullptr, args.getTerm());
-
- Status result{ErrorCodes::InternalError,
- "didn't set status in processReplSetDeclareElectionWinner"};
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish,
- this,
- stdx::placeholders::_1,
- args,
- responseTerm,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return cbh.getStatus();
- }
- _replExecutor.wait(cbh.getValue());
- return result;
-}
-
-void ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetDeclareElectionWinnerArgs& args,
- long long* responseTerm,
- Status* result) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
- return;
- }
- *result = _topCoord->processReplSetDeclareElectionWinner(args, responseTerm);
+ LockGuard topoLock(_topoMutex);
+ return _topCoord->processReplSetDeclareElectionWinner(args, responseTerm);
}
void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestInterface& request,
@@ -3434,34 +3160,15 @@ void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestI
BSONObjBuilder* builder) {
if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) {
rpc::ReplSetMetadata metadata;
+ LockGuard topoLock(_topoMutex);
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish,
- this,
- stdx::placeholders::_1,
- lastOpTimeFromClient,
- &metadata));
-
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
-
- fassert(28709, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
-
+ OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime();
+ OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime);
+ _topCoord->prepareReplResponseMetadata(&metadata, lastVisibleOpTime, _lastCommittedOpTime);
metadata.writeToMetadata(builder);
}
}
-void ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const OpTime& lastOpTimeFromClient,
- rpc::ReplSetMetadata* metadata) {
- OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime();
- OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime);
- _topCoord->prepareReplResponseMetadata(metadata, lastVisibleOpTime, _lastCommittedOpTime);
-}
-
bool ReplicationCoordinatorImpl::isV1ElectionProtocol() const {
return _protVersion.load() == 1;
}
@@ -3485,44 +3192,18 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs
}
Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse");
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_processHeartbeatFinishV1,
- this,
- stdx::placeholders::_1,
- args,
- response,
- &result));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return {ErrorCodes::ShutdownInProgress, "replication shutdown in progress"};
- }
- fassert(28645, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
-
- return result;
-}
-
-void ReplicationCoordinatorImpl::_processHeartbeatFinishV1(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetHeartbeatArgsV1& args,
- ReplSetHeartbeatResponse* response,
- Status* outStatus) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- *outStatus = {ErrorCodes::ShutdownInProgress, "Replication shutdown in progress"};
- return;
- }
- fassert(28655, cbData.status);
+ LockGuard topoLock(_topoMutex);
auto senderHost(args.getSenderHost());
const Date_t now = _replExecutor.now();
- *outStatus = _topCoord->prepareHeartbeatResponseV1(now,
- args,
- _settings.ourSetName(),
- getMyLastAppliedOpTime(),
- getMyLastDurableOpTime(),
- response);
-
- if ((outStatus->isOK() || *outStatus == ErrorCodes::InvalidReplicaSetConfig) &&
- _selfIndex < 0) {
+ result = _topCoord->prepareHeartbeatResponseV1(now,
+ args,
+ _settings.ourSetName(),
+ getMyLastAppliedOpTime(),
+ getMyLastDurableOpTime(),
+ response);
+
+ if ((result.isOK() || result == ErrorCodes::InvalidReplicaSetConfig) && _selfIndex < 0) {
// If this node does not belong to the configuration it knows about, send heartbeats
// back to any node that sends us a heartbeat, in case one of those remote nodes has
// a configuration that contains us. Chances are excellent that it will, since that
@@ -3530,7 +3211,7 @@ void ReplicationCoordinatorImpl::_processHeartbeatFinishV1(
if (!senderHost.empty() && _seedList.insert(senderHost).second) {
_scheduleHeartbeatToTarget(senderHost, -1, now);
}
- } else if (outStatus->isOK() && response->getConfigVersion() < args.getConfigVersion()) {
+ } else if (result.isOK() && response->getConfigVersion() < args.getConfigVersion()) {
// Schedule a heartbeat to the sender to fetch the new config.
// We cannot cancel the enqueued heartbeat, but either this one or the enqueued heartbeat
// will trigger reconfig, which cancels and reschedules all heartbeats.
@@ -3538,36 +3219,21 @@ void ReplicationCoordinatorImpl::_processHeartbeatFinishV1(
int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost);
_scheduleHeartbeatToTarget(senderHost, senderIndex, now);
}
- } else if (outStatus->isOK()) {
+ } else if (result.isOK()) {
// Update liveness for sending node.
stdx::lock_guard<stdx::mutex> lk(_mutex);
auto slaveInfo = _findSlaveInfoByMemberID_inlock(args.getSenderId());
if (!slaveInfo) {
- return;
+ return result;
}
slaveInfo->lastUpdate = _replExecutor.now();
slaveInfo->down = false;
}
+ return result;
}
void ReplicationCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) {
- CBHStatus cbh =
- _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_summarizeAsHtml_finish,
- this,
- stdx::placeholders::_1,
- output));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
- fassert(28638, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
-}
-
-void ReplicationCoordinatorImpl::_summarizeAsHtml_finish(const CallbackArgs& cbData,
- ReplSetHtmlSummary* output) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
+ LockGuard topoLock(_topoMutex);
// TODO(dannenberg) consider putting both optimes into the htmlsummary.
output->setSelfOptime(getMyLastAppliedOpTime());
@@ -3582,30 +3248,18 @@ long long ReplicationCoordinatorImpl::getTerm() {
return _cachedTerm;
}
-void ReplicationCoordinatorImpl::_getTerm_helper(const ReplicationExecutor::CallbackArgs& cbData,
- long long* term) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
- *term = _topCoord->getTerm();
-}
-
EventHandle ReplicationCoordinatorImpl::updateTerm_forTest(
long long term, TopologyCoordinator::UpdateTermResult* updateResult) {
- auto finishEvhStatus = _replExecutor.makeEvent();
- invariantOK(finishEvhStatus.getStatus());
- EventHandle finishEvh = finishEvhStatus.getValue();
- auto signalFinishEvent =
- [this, finishEvh](const CallbackArgs&) { this->_replExecutor.signalEvent(finishEvh); };
- auto work = [this, term, updateResult, signalFinishEvent](const CallbackArgs& args) {
- auto evh = _updateTerm_incallback(term, updateResult);
- if (evh.isValid()) {
- _replExecutor.onEvent(evh, signalFinishEvent);
- } else {
- signalFinishEvent(args);
- }
- };
- _scheduleWork(work);
+ LockGuard topoLock(_topoMutex);
+
+ EventHandle finishEvh;
+ finishEvh = _updateTerm_incallback(term, updateResult);
+ if (!finishEvh) {
+ auto finishEvhStatus = _replExecutor.makeEvent();
+ invariantOK(finishEvhStatus.getStatus());
+ finishEvh = finishEvhStatus.getValue();
+ _replExecutor.signalEvent(finishEvh);
+ }
return finishEvh;
}
@@ -3624,10 +3278,12 @@ Status ReplicationCoordinatorImpl::updateTerm(OperationContext* txn, long long t
dassert(!txn->lockState()->isLocked());
TopologyCoordinator::UpdateTermResult updateTermResult;
EventHandle finishEvh;
- auto work = [this, term, &updateTermResult, &finishEvh](const CallbackArgs&) {
+
+ {
+ LockGuard topoLock(_topoMutex);
finishEvh = _updateTerm_incallback(term, &updateTermResult);
- };
- _scheduleWorkAndWaitForCompletion(work);
+ }
+
// Wait for potential stepdown to finish.
if (finishEvh.isValid()) {
_replExecutor.waitForEvent(finishEvh);
@@ -3829,10 +3485,9 @@ void ReplicationCoordinatorImpl::_scheduleWorkAtAndWaitForCompletion(Date_t when
}
}
-// static
CallbackHandle ReplicationCoordinatorImpl::_wrapAndScheduleWork(ScheduleFn scheduleFn,
const CallbackFn& work) {
- auto workWrapped = [work](const CallbackArgs& args) {
+ auto workWrapped = [this, work](const CallbackArgs& args) {
if (args.status == ErrorCodes::CallbackCanceled) {
return;
}
@@ -3856,23 +3511,12 @@ EventHandle ReplicationCoordinatorImpl::_makeEvent() {
}
void ReplicationCoordinatorImpl::_scheduleElectionWinNotification() {
- auto electionWinNotificationCallback = [this](const CallbackArgs& cbData) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
-
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (!_getMemberState_inlock().primary()) {
- return;
- }
-
- _restartHeartbeats_inlock(cbData);
- };
-
- auto cbStatus = _replExecutor.scheduleWork(electionWinNotificationCallback);
- if (!cbStatus.getStatus().isOK()) {
- warning() << "Error in scheduling notification of election win: " << cbStatus.getStatus();
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (!_getMemberState_inlock().primary()) {
+ return;
}
+
+ _restartHeartbeats_inlock();
}
WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptionsSyncMode(
@@ -3889,5 +3533,16 @@ WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptions
return writeConcern;
}
+CallbackFn ReplicationCoordinatorImpl::_wrapAsCallbackFn(const stdx::function<void()>& work) {
+ return [work](const CallbackArgs& cbData) {
+ if (cbData.status == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+
+ work();
+ };
+}
+
+
} // namespace repl
} // namespace mongo
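Most of this file's delta applies a single transformation: public methods that used to schedule a *_finish/_helper callback on the executor and block on it now take _topoMutex and call the TopologyCoordinator inline. A condensed sketch of the pattern (_doWork and _doWork_finish are hypothetical stand-ins for the removed helpers):

    // Before: bounce onto the executor thread, then block on the callback.
    Status result(ErrorCodes::InternalError, "didn't set status");
    CBHStatus cbh = _replExecutor.scheduleWork(stdx::bind(
        &ReplicationCoordinatorImpl::_doWork_finish, this, stdx::placeholders::_1, &result));
    if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
        return cbh.getStatus();
    }
    _replExecutor.wait(cbh.getValue());
    return result;

    // After: serialize on _topoMutex and run on the caller's thread.
    LockGuard topoLock(_topoMutex);
    return _topCoord->doWork(_replExecutor.now());

Work that still must run as an executor callback (such as the onEvent rebinding in _setFollowerModeFinish) is adapted via the new _wrapAsCallbackFn helper above, which skips canceled callbacks and otherwise invokes the wrapped stdx::function<void()>.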
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 461d3ababad..bd2e06b8a7c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -587,10 +587,8 @@ private:
* Returns an action to be performed after unlocking _mutex, via
* _performPostMemberStateUpdateAction.
*/
- PostMemberStateUpdateAction _setCurrentRSConfig_inlock(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicaSetConfig& newConfig,
- int myIndex);
+ PostMemberStateUpdateAction _setCurrentRSConfig_inlock(const ReplicaSetConfig& newConfig,
+ int myIndex);
/**
* Updates the last committed OpTime to be "committedOpTime" if it is more recent than the
@@ -605,74 +603,6 @@ private:
void _wakeReadyWaiters_inlock();
/**
- * Helper method for setting/unsetting maintenance mode. Scheduled by setMaintenanceMode()
- * to run in a global write lock in the replication executor thread.
- */
- void _setMaintenanceMode_helper(const ReplicationExecutor::CallbackArgs& cbData,
- bool activate,
- Status* result);
-
- /**
- * Helper method for retrieving maintenance mode. Scheduled by getMaintenanceMode() to run
- * in the replication executor thread.
- */
- void _getMaintenanceMode_helper(const ReplicationExecutor::CallbackArgs& cbData,
- bool* maintenanceMode);
-
- /**
- * Bottom half of fillIsMasterForReplSet.
- */
- void _fillIsMasterForReplSet_finish(const ReplicationExecutor::CallbackArgs& cbData,
- IsMasterResponse* result);
-
- /**
- * Bottom half of processReplSetFresh.
- */
- void _processReplSetFresh_finish(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetFreshArgs& args,
- BSONObjBuilder* response,
- Status* result);
-
- /**
- * Bottom half of processReplSetElect.
- */
- void _processReplSetElect_finish(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetElectArgs& args,
- BSONObjBuilder* response,
- Status* result);
-
- /**
- * Bottom half of processReplSetFreeze.
- */
- void _processReplSetFreeze_finish(const ReplicationExecutor::CallbackArgs& cbData,
- int secs,
- BSONObjBuilder* response,
- Status* result);
-
- /**
- * Bottom half of processReplSetDeclareElectionWinner.
- */
- void _processReplSetDeclareElectionWinner_finish(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetDeclareElectionWinnerArgs& args,
- long long* responseTerm,
- Status* result);
-
- /**
- * Bottom half of processReplSetRequestVotes.
- */
- void _processReplSetRequestVotes_finish(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response,
- Status* result);
-
- /**
- * Bottom half of prepareReplResponseMetadata.
- */
- void _prepareReplResponseMetadata_finish(const ReplicationExecutor::CallbackArgs& cbData,
- const OpTime& lastOpTimeFromClient,
- rpc::ReplSetMetadata* metadata);
- /**
* Scheduled to cause the ReplicationCoordinator to reconsider any state that might
* need to change as a result of time passing - for instance becoming PRIMARY when a single
* node replica set member's stepDown period ends.
@@ -722,7 +652,7 @@ private:
/**
* Triggers all callbacks that are blocked waiting for new heartbeat data
* to decide whether or not to finish a step down.
- * Should only be called from executor callbacks.
+ * Should only be called with _topoMutex held.
*/
void _signalStepDownWaiters();
@@ -731,8 +661,7 @@ private:
* it is running within a global shared lock, and thus that no writes are going on at the
* same time.
*/
- void _stepDownContinue(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicationExecutor::EventHandle finishedEvent,
+ void _stepDownContinue(const ReplicationExecutor::EventHandle finishedEvent,
OperationContext* txn,
Date_t waitUntil,
Date_t stepdownUntil,
@@ -755,8 +684,7 @@ private:
* supply an event, "finishedSettingFollowerMode", and wait for that event to
* be signaled. Do not observe "*success" until after the event is signaled.
*/
- void _setFollowerModeFinish(const ReplicationExecutor::CallbackArgs& cbData,
- const MemberState& newState,
+ void _setFollowerModeFinish(const MemberState& newState,
const ReplicationExecutor::EventHandle& finishedSettingFollowerMode,
bool* success);
@@ -822,21 +750,21 @@ private:
const OpTime& appliedOpTime);
/**
- * Starts a heartbeat for each member in the current config. Called within the executor
- * context.
+ * Starts a heartbeat for each member in the current config. Called while holding _topoMutex
+ * and replCoord _mutex.
*/
- void _startHeartbeats_inlock(const ReplicationExecutor::CallbackArgs& cbData);
+ void _startHeartbeats_inlock();
/**
- * Cancels all heartbeats. Called within executor context.
+ * Cancels all heartbeats. Called while holding _topoMutex and replCoord _mutex.
*/
void _cancelHeartbeats_inlock();
/**
* Cancels all heartbeats, then starts a heartbeat for each member in the current config.
- * Called within the executor context.
+ * Called while holding _topoMutex and replCoord _mutex.
*/
- void _restartHeartbeats_inlock(const ReplicationExecutor::CallbackArgs& cbData);
+ void _restartHeartbeats_inlock();
/**
* Asynchronously sends a heartbeat to "target". "targetIndex" is the index
@@ -853,15 +781,6 @@ private:
MemberState _getMemberState_inlock() const;
/**
- * Callback that gives the TopologyCoordinator an initial LastVote document from
- * local storage.
- *
- * Called only during replication startup. All other updates come from the
- * TopologyCoordinator itself.
- */
- void _updateLastVote(const LastVote& lastVote);
-
- /**
* Starts loading the replication configuration from local storage, and if it is valid,
* schedules a callback (of _finishLoadLocalConfig) to set it as the current replica set
* config (sets _rsConfig and _thisMembersConfigIndex).
@@ -892,16 +811,14 @@ private:
void _stopDataReplication();
/**
- * Callback that finishes the work of processReplSetInitiate() inside the replication
- * executor context, in the event of a successful quorum check.
+ * Finishes the work of processReplSetInitiate() while holding _topoMutex, in the event of
+ * a successful quorum check.
*/
- void _finishReplSetInitiate(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicaSetConfig& newConfig,
- int myIndex);
+ void _finishReplSetInitiate(const ReplicaSetConfig& newConfig, int myIndex);
/**
- * Callback that finishes the work of processReplSetReconfig inside the replication
- * executor context, in the event of a successful quorum check.
+ * Finishes the work of processReplSetReconfig while holding _topoMutex, in the event of
+ * a successful quorum check.
*/
void _finishReplSetReconfig(const ReplicationExecutor::CallbackArgs& cbData,
const ReplicaSetConfig& newConfig,
@@ -931,7 +848,7 @@ private:
* Begins an attempt to elect this node.
* Called after an incoming heartbeat changes this node's view of the set such that it
* believes it can be elected PRIMARY.
- * For proper concurrency, must be called via a ReplicationExecutor callback.
+ * For proper concurrency, must be called while holding _topoMutex.
*
* For old style elections the election path is:
* _startElectSelf()
@@ -994,27 +911,6 @@ private:
void _recoverFromElectionTie(const ReplicationExecutor::CallbackArgs& cbData);
/**
- * Chooses a new sync source. Must be scheduled as a callback.
- *
- * Calls into the Topology Coordinator, which uses its current view of the set to choose
- * the most appropriate sync source.
- */
- void _chooseNewSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
- const Timestamp& lastTimestampFetched,
- HostAndPort* newSyncSource);
-
- /**
- * Adds 'host' to the sync source blacklist until 'until'. A blacklisted source cannot
- * be chosen as a sync source. Schedules a callback to unblacklist the sync source to be
- * run at 'until'.
- *
- * Must be scheduled as a callback.
- */
- void _blacklistSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
- const HostAndPort& host,
- Date_t until);
-
- /**
* Removes 'host' from the sync source blacklist. If 'host' isn't found, it's simply
* ignored and no error is thrown.
*
@@ -1024,17 +920,6 @@ private:
const HostAndPort& host);
/**
- * Determines if a new sync source should be considered.
- *
- * Must be scheduled as a callback.
- */
- void _shouldChangeSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
- const HostAndPort& currentSource,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource,
- bool* shouldChange);
-
- /**
* Schedules a request that the given host step down; logs any errors.
*/
void _requestRemotePrimaryStepdown(const HostAndPort& target);
@@ -1083,34 +968,11 @@ private:
const StatusWith<ReplSetHeartbeatResponse>& responseStatus);
/**
- * Bottom half of processHeartbeat(), which runs in the replication executor.
- */
- void _processHeartbeatFinish(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetHeartbeatArgs& args,
- ReplSetHeartbeatResponse* response,
- Status* outStatus);
-
- /**
- * Bottom half of processHeartbeatV1(), which runs in the replication executor.
- */
- void _processHeartbeatFinishV1(const ReplicationExecutor::CallbackArgs& cbData,
- const ReplSetHeartbeatArgsV1& args,
- ReplSetHeartbeatResponse* response,
- Status* outStatus);
- /**
* Scan the SlaveInfoVector and determine the highest OplogEntry present on a majority of
* servers; set _lastCommittedOpTime to this new entry, if greater than the current entry.
*/
void _updateLastCommittedOpTime_inlock();
- void _summarizeAsHtml_finish(const ReplicationExecutor::CallbackArgs& cbData,
- ReplSetHtmlSummary* output);
-
- /**
- * Callback that gets the current term from topology coordinator.
- */
- void _getTerm_helper(const ReplicationExecutor::CallbackArgs& cbData, long long* term);
-
/**
* This is used to set a floor of "newOpTime" on the OpTimes we will consider committed.
* This prevents entries from before our election from counting as committed in our view,
@@ -1147,16 +1009,10 @@ private:
void _dropAllSnapshots_inlock();
/**
- * Callback which schedules "_handleLivenessTimeout" to be run whenever the liveness timeout
- * for the node who was least recently reported to be alive occurs.
- */
- void _scheduleNextLivenessUpdate(const ReplicationExecutor::CallbackArgs& cbData);
-
- /**
* Bottom half of _scheduleNextLivenessUpdate.
- * Must be called from within a callback.
+ * Must be called with _topoMutex held.
*/
- void _scheduleNextLivenessUpdate_inlock(const ReplicationExecutor::CallbackArgs& cbData);
+ void _scheduleNextLivenessUpdate_inlock();
/**
* Callback which marks downed nodes as down, triggers a stepdown if a majority of nodes are no
@@ -1227,7 +1083,7 @@ private:
* Used by _scheduleWork() and _scheduleWorkAt() only.
* Do not call this function directly.
*/
- static CallbackHandle _wrapAndScheduleWork(ScheduleFn scheduleFn, const CallbackFn& work);
+ CallbackHandle _wrapAndScheduleWork(ScheduleFn scheduleFn, const CallbackFn& work);
/**
* Creates an event.
@@ -1241,6 +1097,12 @@ private:
*/
void _scheduleElectionWinNotification();
+    /**
+     * Wraps a function into an executor callback.
+     * If the callback is canceled, the given function won't run.
+     */
+    executor::TaskExecutor::CallbackFn _wrapAsCallbackFn(const stdx::function<void()>& work);
+
//
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
@@ -1250,17 +1112,24 @@ private:
// (PS) Pointer is read-only in concurrent operation, item pointed to is self-synchronizing;
// Access in any context.
// (M) Reads and writes guarded by _mutex
- // (X) Reads and writes must be performed in a callback in _replExecutor
- // (MX) Must hold _mutex and be in a callback in _replExecutor to write; must either hold
- // _mutex or be in a callback in _replExecutor to read.
+ // (X) Reads and writes guarded by _topoMutex
+ // (MX) Must hold _mutex and _topoMutex to write; must either hold _mutex or _topoMutex
+ // to read.
// (GX) Readable under a global intent lock. Must either hold global lock in exclusive
- // mode (MODE_X) or both hold global lock in shared mode (MODE_S) and be in executor
- // context to write.
+ // mode (MODE_X) or both hold global lock in shared mode (MODE_S) and hold _topoMutex
+ // to write.
// (I) Independently synchronized, see member variable comment.
+ // When both _mutex and _topoMutex are needed, the caller must follow the strict locking order
+ // to avoid deadlock: _topoMutex must be held before locking _mutex.
+ // In other words, _topoMutex can never be locked while holding _mutex.
+
// Protects member data of this ReplicationCoordinator.
mutable stdx::mutex _mutex; // (S)
+ // Protects member data of the TopologyCoordinator.
+ mutable stdx::mutex _topoMutex; // (S)
+
// Handles to actively queued heartbeats.
HeartbeatHandles _heartbeatHandles; // (X)
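
Given the ordering rule above, any path that needs both mutexes must take _topoMutex first. A minimal standalone sketch of that discipline (std::mutex standing in for the stdx wrappers; the class is hypothetical, the member names mirror the ones above):

    #include <mutex>

    class CoordSketch {
    public:
        void updateTopologyAndState() {
            // Correct order: topology lock first, then the coordinator mutex.
            std::lock_guard<std::mutex> topoLock(_topoMutex);
            std::lock_guard<std::mutex> lk(_mutex);
            // ... read or write (MX)-tagged members here ...
        }
        // Inverting the order (locking _topoMutex while _mutex is held) can
        // deadlock against a thread that follows the rule above.
    private:
        std::mutex _topoMutex;
        std::mutex _mutex;
    };
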
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
index a2cd0eb386e..a70c8963af8 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
@@ -137,17 +137,18 @@ void ReplicationCoordinatorImpl::_startElectSelf() {
// _mutex again.
lk.unlock();
- StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _freshnessChecker->start(
- &_replExecutor,
- lastOpTimeApplied.getTimestamp(),
- _rsConfig,
- _selfIndex,
- _topCoord->getMaybeUpHostAndPorts(),
- stdx::bind(&ReplicationCoordinatorImpl::_onFreshnessCheckComplete, this));
+ StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh =
+ _freshnessChecker->start(&_replExecutor,
+ lastOpTimeApplied.getTimestamp(),
+ _rsConfig,
+ _selfIndex,
+ _topCoord->getMaybeUpHostAndPorts());
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(18681, nextPhaseEvh.getStatus());
+ _replExecutor.onEvent(nextPhaseEvh.getValue(),
+ stdx::bind(&ReplicationCoordinatorImpl::_onFreshnessCheckComplete, this));
lossGuard.dismiss();
}
@@ -159,6 +160,7 @@ void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() {
&_freshnessChecker,
&_electCmdRunner,
&_electionFinishedEvent);
+ LockGuard lk(_topoMutex);
if (_freshnessChecker->isCanceled()) {
LOG(2) << "Election canceled during freshness check phase";
@@ -180,11 +182,10 @@ void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() {
log() << "possible election tie; sleeping " << ms << " until "
<< dateToISOStringLocal(nextCandidateTime);
_topCoord->setElectionSleepUntil(nextCandidateTime);
- _replExecutor.scheduleWorkAt(
- nextCandidateTime,
- stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
- this,
- stdx::placeholders::_1));
+ _scheduleWorkAt(nextCandidateTime,
+ stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
+ this,
+ stdx::placeholders::_1));
_sleptLastElection = true;
return;
}
@@ -210,15 +211,14 @@ void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() {
_electCmdRunner.reset(new ElectCmdRunner);
StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _electCmdRunner->start(
- &_replExecutor,
- _rsConfig,
- _selfIndex,
- _topCoord->getMaybeUpHostAndPorts(),
- stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete, this));
+ &_replExecutor, _rsConfig, _selfIndex, _topCoord->getMaybeUpHostAndPorts());
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(18685, nextPhaseEvh.getStatus());
+
+ _replExecutor.onEvent(nextPhaseEvh.getValue(),
+ stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete, this));
lossGuard.dismiss();
}
@@ -228,6 +228,7 @@ void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() {
&_freshnessChecker,
&_electCmdRunner,
&_electionFinishedEvent);
+ LockGuard lk(_topoMutex);
invariant(_freshnessChecker);
invariant(_electCmdRunner);
@@ -248,11 +249,10 @@ void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() {
const Date_t nextCandidateTime = now + ms;
log() << "waiting until " << nextCandidateTime << " before standing for election again";
_topCoord->setElectionSleepUntil(nextCandidateTime);
- _replExecutor.scheduleWorkAt(
- nextCandidateTime,
- stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
- this,
- stdx::placeholders::_1));
+ _scheduleWorkAt(nextCandidateTime,
+ stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
+ this,
+ stdx::placeholders::_1));
return;
}
@@ -272,9 +272,8 @@ void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() {
void ReplicationCoordinatorImpl::_recoverFromElectionTie(
const ReplicationExecutor::CallbackArgs& cbData) {
- if (!cbData.status.isOK()) {
- return;
- }
+ LockGuard topoLock(_topoMutex);
+
auto now = _replExecutor.now();
auto lastOpApplied = getMyLastAppliedOpTime();
const auto status = _topCoord->checkShouldStandForElection(now, lastOpApplied);
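
Each election phase in this file now follows the same shape: start() no longer takes an onCompletion callback, and the continuation is instead chained onto the returned event with onEvent(). A generic sketch; checker, _onPhaseComplete, and the fassert code are placeholders, not actual members:

    // "checker" is any phase runner (FreshnessChecker, ElectCmdRunner, etc.)
    // whose start() now returns an event rather than taking a callback.
    StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh =
        checker->start(&_replExecutor /* , phase arguments elided */);
    if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
        return;  // shutting down; the loss guard records the failed election
    }
    fassert(99999, nextPhaseEvh.getStatus());  // placeholder fassert code
    _replExecutor.onEvent(nextPhaseEvh.getValue(),
                          stdx::bind(&ReplicationCoordinatorImpl::_onPhaseComplete, this));
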
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
index ca98c3cb471..84d00015fbd 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
@@ -547,6 +547,10 @@ TEST_F(ReplCoordElectTest, NodeCancelsElectionUponReceivingANewConfigDuringFresh
BSONObjBuilder result;
ASSERT_OK(getReplCoord()->processReplSetReconfig(&txn, config, &result));
+    // Wait until the election is canceled.
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole());
}
@@ -583,6 +587,10 @@ TEST_F(ReplCoordElectTest, NodeCancelsElectionUponReceivingANewConfigDuringElect
BSONObjBuilder result;
ASSERT_OK(getReplCoord()->processReplSetReconfig(&txn, config, &result));
+    // Wait until the election is canceled.
+ getNet()->enterNetwork();
+ getNet()->runReadyNetworkOperations();
+ getNet()->exitNetwork();
ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole());
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
index acc9f8cf2ec..b0ac53d9491 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
@@ -143,18 +143,19 @@ void ReplicationCoordinatorImpl::_startElectSelfV1() {
lk.unlock();
long long term = _topCoord->getTerm();
- StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _voteRequester->start(
- &_replExecutor,
- _rsConfig,
- _selfIndex,
- _topCoord->getTerm(),
- true, // dry run
- lastOpTime,
- stdx::bind(&ReplicationCoordinatorImpl::_onDryRunComplete, this, term));
+ StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh =
+ _voteRequester->start(&_replExecutor,
+ _rsConfig,
+ _selfIndex,
+ _topCoord->getTerm(),
+ true, // dry run
+ lastOpTime);
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(28685, nextPhaseEvh.getStatus());
+ _replExecutor.onEvent(nextPhaseEvh.getValue(),
+ stdx::bind(&ReplicationCoordinatorImpl::_onDryRunComplete, this, term));
lossGuard.dismiss();
}
@@ -163,6 +164,8 @@ void ReplicationCoordinatorImpl::_onDryRunComplete(long long originalTerm) {
invariant(!_electionWinnerDeclarer);
LoseElectionDryRunGuardV1 lossGuard(this);
+ LockGuard lk(_topoMutex);
+
if (_topCoord->getTerm() != originalTerm) {
        log() << "not running for primary, we have been superseded already";
return;
@@ -222,15 +225,8 @@ void ReplicationCoordinatorImpl::_writeLastVoteForMyElection(
return;
}
- auto cbStatus = _replExecutor.scheduleWork(
- [this, lastVote](const ReplicationExecutor::CallbackArgs& cbData) {
- _replExecutor.signalEvent(_electionDryRunFinishedEvent);
- _startVoteRequester(lastVote.getTerm());
- });
- if (cbStatus.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
- fassert(28768, cbStatus.getStatus());
+ _startVoteRequester(lastVote.getTerm());
+ _replExecutor.signalEvent(_electionDryRunFinishedEvent);
lossGuard.dismiss();
}
@@ -240,22 +236,21 @@ void ReplicationCoordinatorImpl::_startVoteRequester(long long newTerm) {
invariant(!_electionWinnerDeclarer);
LoseElectionGuardV1 lossGuard(this);
+ LockGuard lk(_topoMutex);
+
const auto lastOpTime =
_isDurableStorageEngine() ? getMyLastDurableOpTime() : getMyLastAppliedOpTime();
_voteRequester.reset(new VoteRequester);
StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _voteRequester->start(
- &_replExecutor,
- _rsConfig,
- _selfIndex,
- _topCoord->getTerm(),
- false,
- lastOpTime,
- stdx::bind(&ReplicationCoordinatorImpl::_onVoteRequestComplete, this, newTerm));
+ &_replExecutor, _rsConfig, _selfIndex, _topCoord->getTerm(), false, lastOpTime);
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(28643, nextPhaseEvh.getStatus());
+ _replExecutor.onEvent(
+ nextPhaseEvh.getValue(),
+ stdx::bind(&ReplicationCoordinatorImpl::_onVoteRequestComplete, this, newTerm));
lossGuard.dismiss();
}
@@ -265,6 +260,8 @@ void ReplicationCoordinatorImpl::_onVoteRequestComplete(long long originalTerm)
invariant(!_electionWinnerDeclarer);
LoseElectionGuardV1 lossGuard(this);
+ LockGuard lk(_topoMutex);
+
if (_topCoord->getTerm() != originalTerm) {
        log() << "not becoming primary, we have been superseded already";
return;
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index 881e3ef69bc..4679d17004c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -846,6 +846,10 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringDryRun) {
BSONObjBuilder result;
ASSERT_OK(getReplCoord()->processReplSetReconfig(&txn, config, &result));
+    // Wait until the election is canceled.
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole());
}
@@ -882,6 +886,10 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase)
BSONObjBuilder result;
ASSERT_OK(getReplCoord()->processReplSetReconfig(&txn, config, &result));
+    // Wait until the election is canceled.
+ getNet()->enterNetwork();
+ getNet()->runReadyNetworkOperations();
+ getNet()->exitNetwork();
ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole());
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index df03ebe37b7..2be0f71b5d8 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/topology_coordinator.h"
+#include "mongo/db/repl/vote_requester.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/functional.h"
@@ -67,6 +68,8 @@ using executor::RemoteCommandRequest;
void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData,
const HostAndPort& target,
int targetIndex) {
+ LockGuard topoLock(_topoMutex);
+
_untrackHeartbeatHandle(cbData.myHandle);
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
@@ -113,6 +116,8 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget(const HostAndPort& t
void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) {
+ LockGuard topoLock(_topoMutex);
+
// remove handle from queued heartbeats
_untrackHeartbeatHandle(cbData.myHandle);
@@ -320,6 +325,9 @@ void ReplicationCoordinatorImpl::_stepDownFinish(
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
+
+ LockGuard topoLock(_topoMutex);
+
invariant(cbData.txn);
// TODO Add invariant that we've got global shared or global exclusive lock, when supported
// by lock manager.
@@ -360,9 +368,9 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(const ReplicaSetConf
invariant(!_rsConfig.isInitialized() ||
_rsConfig.getConfigVersion() < newConfig.getConfigVersion());
if (_freshnessChecker) {
- _freshnessChecker->cancel(&_replExecutor);
+ _freshnessChecker->cancel();
if (_electCmdRunner) {
- _electCmdRunner->cancel(&_replExecutor);
+ _electCmdRunner->cancel();
}
_replExecutor.onEvent(
_electionFinishedEvent,
@@ -383,6 +391,8 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigAfterElectionCanceled(
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
+
+ LockGuard topoLock(_topoMutex);
fassert(18911, cbData.status);
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_inShutdown) {
@@ -452,7 +462,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
}
}
- const stdx::function<void(const ReplicationExecutor::CallbackArgs&)> reconfigFinishFn(
+ const CallbackFn reconfigFinishFn(
stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish,
this,
stdx::placeholders::_1,
@@ -481,6 +491,8 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
return;
}
+ LockGuard topoLock(_topoMutex);
+
stdx::unique_lock<stdx::mutex> lk(_mutex);
invariant(_rsConfigState == kConfigHBReconfiguring);
invariant(!_rsConfig.isInitialized() ||
@@ -501,6 +513,28 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
return;
}
+ // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig.
+ if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) {
+ if (isV1ElectionProtocol()) {
+ invariant(_voteRequester);
+ _voteRequester->cancel();
+ } else {
+ invariant(_freshnessChecker);
+ _freshnessChecker->cancel();
+ if (_electCmdRunner) {
+ _electCmdRunner->cancel();
+ }
+ }
+ // Wait for the election to complete and the node's Role to be set to follower.
+ _replExecutor.onEvent(_electionFinishedEvent,
+ stdx::bind(&ReplicationCoordinatorImpl::_heartbeatReconfigFinish,
+ this,
+ stdx::placeholders::_1,
+ newConfig,
+ myIndex));
+ return;
+ }
+
if (!myIndex.isOK()) {
switch (myIndex.getStatus().code()) {
case ErrorCodes::NodeNotFound:
@@ -524,8 +558,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
// If we do not have an index, we should pass -1 as our index to avoid falsely adding ourself to
// the data structures inside of the TopologyCoordinator.
const int myIndexValue = myIndex.getStatus().isOK() ? myIndex.getValue() : -1;
- const PostMemberStateUpdateAction action =
- _setCurrentRSConfig_inlock(cbData, newConfig, myIndexValue);
+ const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndexValue);
lk.unlock();
_resetElectionInfoOnProtocolVersionUpgrade(oldConfig, newConfig);
_performPostMemberStateUpdateAction(action);
@@ -558,14 +591,12 @@ void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() {
}
}
-void ReplicationCoordinatorImpl::_restartHeartbeats_inlock(
- const ReplicationExecutor::CallbackArgs& cbData) {
+void ReplicationCoordinatorImpl::_restartHeartbeats_inlock() {
_cancelHeartbeats_inlock();
- _startHeartbeats_inlock(cbData);
+ _startHeartbeats_inlock();
}
-void ReplicationCoordinatorImpl::_startHeartbeats_inlock(
- const ReplicationExecutor::CallbackArgs& cbData) {
+void ReplicationCoordinatorImpl::_startHeartbeats_inlock() {
const Date_t now = _replExecutor.now();
_seedList.clear();
for (int i = 0; i < _rsConfig.getNumMembers(); ++i) {
@@ -579,12 +610,13 @@ void ReplicationCoordinatorImpl::_startHeartbeats_inlock(
slaveInfo.lastUpdate = _replExecutor.now();
slaveInfo.down = false;
}
- _scheduleNextLivenessUpdate_inlock(cbData);
+ _scheduleNextLivenessUpdate_inlock();
}
}
void ReplicationCoordinatorImpl::_handleLivenessTimeout(
const ReplicationExecutor::CallbackArgs& cbData) {
+ LockGuard topoLock(_topoMutex);
stdx::lock_guard<stdx::mutex> lk(_mutex);
// Only reset the callback handle if it matches, otherwise more will be coming through
if (cbData.myHandle == _handleLivenessTimeoutCbh) {
@@ -628,20 +660,10 @@ void ReplicationCoordinatorImpl::_handleLivenessTimeout(
}
}
}
- _scheduleNextLivenessUpdate_inlock(cbData);
-}
-
-void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate(
- const ReplicationExecutor::CallbackArgs& cbData) {
- if (cbData.status == ErrorCodes::CallbackCanceled)
- return;
-
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- _scheduleNextLivenessUpdate_inlock(cbData);
+ _scheduleNextLivenessUpdate_inlock();
}
-void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock(
- const ReplicationExecutor::CallbackArgs& cbData) {
+void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock() {
if (!isV1ElectionProtocol()) {
return;
}
@@ -678,15 +700,14 @@ void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock(
auto nextTimeout = earliestDate + _rsConfig.getElectionTimeoutPeriod();
if (nextTimeout > _replExecutor.now()) {
LOG(3) << "scheduling next check at " << nextTimeout;
- auto cbh = _replExecutor.scheduleWorkAt(
- nextTimeout,
- stdx::bind(
- &ReplicationCoordinatorImpl::_handleLivenessTimeout, this, stdx::placeholders::_1));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ auto cbh = _scheduleWorkAt(nextTimeout,
+ stdx::bind(&ReplicationCoordinatorImpl::_handleLivenessTimeout,
+ this,
+ stdx::placeholders::_1));
+ if (!cbh) {
return;
}
- fassert(22002, cbh.getStatus());
- _handleLivenessTimeoutCbh = cbh.getValue();
+ _handleLivenessTimeoutCbh = cbh;
_earliestMemberId = earliestMemberId;
}
}
@@ -698,8 +719,7 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleLivenessUpdate_inlock(int u
if (_handleLivenessTimeoutCbh.isValid()) {
_replExecutor.cancel(_handleLivenessTimeoutCbh);
}
- _replExecutor.scheduleWork(stdx::bind(
- &ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate, this, stdx::placeholders::_1));
+ _scheduleNextLivenessUpdate_inlock();
}
void ReplicationCoordinatorImpl::_cancelPriorityTakeover_inlock() {
@@ -748,6 +768,8 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock() {
}
void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1(bool isPriorityTakeOver) {
+ LockGuard topoLock(_topoMutex);
+
if (!isV1ElectionProtocol()) {
return;
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index de8a175d01c..960987a5b46 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -1692,6 +1692,7 @@ TEST_F(StepDownTest,
// Make a secondary actually catch up
enterNetwork();
+ getNet()->runUntil(getNet()->now() + Milliseconds(1000));
ASSERT(getNet()->hasReadyRequests());
NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
RemoteCommandRequest request = noi->getRequest();
@@ -1750,6 +1751,7 @@ TEST_F(StepDownTest,
// Secondary has not caught up on first round of heartbeats.
enterNetwork();
+ getNet()->runUntil(getNet()->now() + Milliseconds(1000));
ASSERT(getNet()->hasReadyRequests());
NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
RemoteCommandRequest request = noi->getRequest();
@@ -4218,12 +4220,12 @@ TEST_F(ReplCoordTest, WaitForMemberState) {
replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0));
ASSERT_TRUE(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
- // Successful dry run election increases term.
- ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm());
-
// Single node cluster - this node should start election on setFollowerMode() completion.
replCoord->waitForElectionFinish_forTest();
+ // Successful dry run election increases term.
+ ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm());
+
auto timeout = Milliseconds(1);
ASSERT_OK(replCoord->waitForMemberState(MemberState::RS_PRIMARY, timeout));
ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit,
@@ -4253,12 +4255,12 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) {
replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0));
ASSERT_TRUE(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
- // Successful dry run election increases term.
- ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm());
-
// Single node cluster - this node should start election on setFollowerMode() completion.
replCoord->waitForElectionFinish_forTest();
+ // Successful dry run election increases term.
+ ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm());
+
auto timeout = Milliseconds(1);
ASSERT_OK(replCoord->waitForMemberState(MemberState::RS_PRIMARY, timeout));
diff --git a/src/mongo/db/repl/scatter_gather_runner.cpp b/src/mongo/db/repl/scatter_gather_runner.cpp
index 16c778399d0..042bb0f1c10 100644
--- a/src/mongo/db/repl/scatter_gather_runner.cpp
+++ b/src/mongo/db/repl/scatter_gather_runner.cpp
@@ -42,59 +42,63 @@ namespace mongo {
namespace repl {
using executor::RemoteCommandRequest;
-
-ScatterGatherRunner::ScatterGatherRunner(ScatterGatherAlgorithm* algorithm)
- : _algorithm(algorithm), _started(false) {}
-
-ScatterGatherRunner::~ScatterGatherRunner() {}
-
-static void startTrampoline(const ReplicationExecutor::CallbackArgs& cbData,
- ScatterGatherRunner* runner,
- StatusWith<ReplicationExecutor::EventHandle>* result) {
- // TODO: remove static cast once ScatterGatherRunner is designed to work with a generic
- // TaskExecutor.
- ReplicationExecutor* executor = static_cast<ReplicationExecutor*>(cbData.executor);
- *result = runner->start(executor);
-}
-
-Status ScatterGatherRunner::run(ReplicationExecutor* executor) {
- StatusWith<ReplicationExecutor::EventHandle> finishEvh(ErrorCodes::InternalError, "Not set");
- StatusWith<ReplicationExecutor::CallbackHandle> startCBH = executor->scheduleWork(
- stdx::bind(startTrampoline, stdx::placeholders::_1, this, &finishEvh));
- if (!startCBH.isOK()) {
- return startCBH.getStatus();
- }
- executor->wait(startCBH.getValue());
+using LockGuard = stdx::lock_guard<stdx::mutex>;
+using CallbackHandle = ReplicationExecutor::CallbackHandle;
+using EventHandle = ReplicationExecutor::EventHandle;
+using RemoteCommandCallbackArgs = ReplicationExecutor::RemoteCommandCallbackArgs;
+using RemoteCommandCallbackFn = ReplicationExecutor::RemoteCommandCallbackFn;
+
+ScatterGatherRunner::ScatterGatherRunner(ScatterGatherAlgorithm* algorithm,
+ ReplicationExecutor* executor)
+ : _executor(executor), _impl(std::make_shared<RunnerImpl>(algorithm, executor)) {}
+
+Status ScatterGatherRunner::run() {
+ auto finishEvh = start();
if (!finishEvh.isOK()) {
return finishEvh.getStatus();
}
- executor->waitForEvent(finishEvh.getValue());
+ _executor->waitForEvent(finishEvh.getValue());
return Status::OK();
}
-StatusWith<ReplicationExecutor::EventHandle> ScatterGatherRunner::start(
- ReplicationExecutor* executor, const stdx::function<void()>& onCompletion) {
+StatusWith<EventHandle> ScatterGatherRunner::start() {
+    // The callback holds a shared pointer to the RunnerImpl, so it is always
+    // safe for it to access the RunnerImpl, even after this runner is destroyed.
+ std::shared_ptr<RunnerImpl>& impl = _impl;
+ auto cb = [impl](const RemoteCommandCallbackArgs& cbData) { impl->processResponse(cbData); };
+ return _impl->start(cb);
+}
+
+void ScatterGatherRunner::cancel() {
+ _impl->cancel();
+}
+
+/**
+ * Scatter-gather runner implementation.
+ */
+ScatterGatherRunner::RunnerImpl::RunnerImpl(ScatterGatherAlgorithm* algorithm,
+ ReplicationExecutor* executor)
+ : _executor(executor), _algorithm(algorithm) {}
+
+StatusWith<EventHandle> ScatterGatherRunner::RunnerImpl::start(
+ const RemoteCommandCallbackFn processResponseCB) {
+ LockGuard lk(_mutex);
+
invariant(!_started);
_started = true;
- _actualResponses = 0;
- _onCompletion = onCompletion;
- StatusWith<ReplicationExecutor::EventHandle> evh = executor->makeEvent();
+ StatusWith<EventHandle> evh = _executor->makeEvent();
if (!evh.isOK()) {
return evh;
}
_sufficientResponsesReceived = evh.getValue();
- ScopeGuard earlyReturnGuard =
- MakeGuard(&ScatterGatherRunner::_signalSufficientResponsesReceived, this, executor);
-
- const ReplicationExecutor::RemoteCommandCallbackFn cb =
- stdx::bind(&ScatterGatherRunner::_processResponse, stdx::placeholders::_1, this);
+ ScopeGuard earlyReturnGuard = MakeGuard(&RunnerImpl::_signalSufficientResponsesReceived, this);
std::vector<RemoteCommandRequest> requests = _algorithm->getRequests();
for (size_t i = 0; i < requests.size(); ++i) {
- const StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- executor->scheduleRemoteCommand(requests[i], cb);
+ const StatusWith<CallbackHandle> cbh =
+ _executor->scheduleRemoteCommand(requests[i], processResponseCB);
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return StatusWith<ReplicationExecutor::EventHandle>(cbh.getStatus());
+ return StatusWith<EventHandle>(cbh.getStatus());
}
fassert(18743, cbh.getStatus());
_callbacks.push_back(cbh.getValue());
@@ -102,50 +106,47 @@ StatusWith<ReplicationExecutor::EventHandle> ScatterGatherRunner::start(
if (_callbacks.empty() || _algorithm->hasReceivedSufficientResponses()) {
invariant(_algorithm->hasReceivedSufficientResponses());
- _signalSufficientResponsesReceived(executor);
+ _signalSufficientResponsesReceived();
}
earlyReturnGuard.Dismiss();
return evh;
}
-void ScatterGatherRunner::cancel(ReplicationExecutor* executor) {
+void ScatterGatherRunner::RunnerImpl::cancel() {
+ LockGuard lk(_mutex);
+
invariant(_started);
- _signalSufficientResponsesReceived(executor);
+ _signalSufficientResponsesReceived();
}
-void ScatterGatherRunner::_processResponse(
- const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, ScatterGatherRunner* runner) {
- // It is possible that the ScatterGatherRunner has already gone out of scope, if the
- // response indicates the callback was canceled. In that case, do not access any members
- // of "runner" and return immediately.
+void ScatterGatherRunner::RunnerImpl::processResponse(
+ const ReplicationExecutor::RemoteCommandCallbackArgs& cbData) {
if (cbData.response.getStatus() == ErrorCodes::CallbackCanceled) {
return;
}
+ LockGuard lk(_mutex);
+ if (!_sufficientResponsesReceived.isValid()) {
+ // We've received sufficient responses and it's not safe to access the algorithm any more.
+ return;
+ }
- ++runner->_actualResponses;
- runner->_algorithm->processResponse(cbData.request, cbData.response);
- if (runner->_algorithm->hasReceivedSufficientResponses()) {
- // TODO: remove static cast once ScatterGatherRunner is designed to work with a generic
- // TaskExecutor.
- ReplicationExecutor* executor = static_cast<ReplicationExecutor*>(cbData.executor);
- runner->_signalSufficientResponsesReceived(executor);
+ ++_actualResponses;
+ _algorithm->processResponse(cbData.request, cbData.response);
+ if (_algorithm->hasReceivedSufficientResponses()) {
+ _signalSufficientResponsesReceived();
} else {
- invariant(runner->_actualResponses < runner->_callbacks.size());
+ invariant(_actualResponses < _callbacks.size());
}
}
-void ScatterGatherRunner::_signalSufficientResponsesReceived(ReplicationExecutor* executor) {
+void ScatterGatherRunner::RunnerImpl::_signalSufficientResponsesReceived() {
if (_sufficientResponsesReceived.isValid()) {
std::for_each(_callbacks.begin(),
_callbacks.end(),
- stdx::bind(&ReplicationExecutor::cancel, executor, stdx::placeholders::_1));
- const ReplicationExecutor::EventHandle h = _sufficientResponsesReceived;
- _sufficientResponsesReceived = ReplicationExecutor::EventHandle();
- if (_onCompletion) {
- _onCompletion();
- }
- executor->signalEvent(h);
+ stdx::bind(&ReplicationExecutor::cancel, _executor, stdx::placeholders::_1));
+ _executor->signalEvent(_sufficientResponsesReceived);
+ _sufficientResponsesReceived = EventHandle();
}
}
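
The lifetime fix in this file is the important part: the runner's mutable state moves into a RunnerImpl owned through a shared_ptr, and the remote-command callback captures that shared_ptr by value, so a late response can never touch freed memory even if the ScatterGatherRunner itself has been destroyed. A compilable sketch of the idiom in isolation (all names hypothetical):

    #include <functional>
    #include <memory>

    struct Impl {
        void processResponse(int response) { /* guarded by an internal mutex */ }
    };

    std::function<void(int)> makeCallback(const std::shared_ptr<Impl>& impl) {
        // Capturing the shared_ptr by value keeps *impl alive until the last
        // outstanding callback is destroyed, even if the owner goes away first.
        return [impl](int response) { impl->processResponse(response); };
    }
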
diff --git a/src/mongo/db/repl/scatter_gather_runner.h b/src/mongo/db/repl/scatter_gather_runner.h
index ad7b8d93aa5..e4e6e40f404 100644
--- a/src/mongo/db/repl/scatter_gather_runner.h
+++ b/src/mongo/db/repl/scatter_gather_runner.h
@@ -33,6 +33,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/stdx/functional.h"
+#include "mongo/stdx/mutex.h"
namespace mongo {
@@ -44,7 +45,7 @@ namespace repl {
class ScatterGatherAlgorithm;
/**
- * Implementation of a scatter-gather behavior using a ReplicationExecutor.
+ * Interface for running a scatter-gather process.
*/
class ScatterGatherRunner {
MONGO_DISALLOW_COPYING(ScatterGatherRunner);
@@ -53,14 +54,12 @@ public:
/**
* Constructs a new runner whose underlying algorithm is "algorithm".
*
- * "algorithm" must remain in scope until the runner's destructor completes.
+ * "algorithm" and "executor" must remain in scope until the runner's destructor completes.
*/
- explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm);
-
- ~ScatterGatherRunner();
+ explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm, ReplicationExecutor* executor);
/**
- * Runs the scatter-gather process using "executor", and blocks until it completes.
+ * Runs the scatter-gather process and blocks until it completes.
*
* Must _not_ be run from inside the executor context.
*
@@ -70,55 +69,71 @@ public:
* is scheduled but before it completes, this method will return Status::OK(),
* just as it does when it runs successfully to completion.
*/
- Status run(ReplicationExecutor* executor);
+ Status run();
/**
- * Starts executing the scatter-gather process using "executor".
- *
* On success, returns an event handle that will be signaled when the runner has
* finished executing the scatter-gather process. After that event has been
* signaled, it is safe for the caller to examine any state on "algorithm".
*
- * This method must be called inside the executor context.
- *
- * onCompletion is an optional callback that will be executed in executor context
- * immediately prior to signaling the event handle returned here. It must never
- * throw exceptions. It may examine the state of the algorithm object.
- *
- * NOTE: If the executor starts to shut down before onCompletion executes, onCompletion may
- * never execute, even though the returned event will eventually be signaled.
+ * The returned event will eventually be signaled.
*/
- StatusWith<ReplicationExecutor::EventHandle> start(
- ReplicationExecutor* executor,
- const stdx::function<void()>& onCompletion = stdx::function<void()>());
+ StatusWith<ReplicationExecutor::EventHandle> start();
/**
- * Informs the runner to cancel further processing. The "executor" argument
- * must point to the same executor passed to "start()".
- *
- * Like start, this method must be called from within the executor context.
+ * Informs the runner to cancel further processing.
*/
- void cancel(ReplicationExecutor* executor);
+ void cancel();
private:
/**
- * Callback invoked once for every response from the network.
- */
- static void _processResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData,
- ScatterGatherRunner* runner);
-
- /**
- * Method that performs all actions required when _algorithm indicates a sufficient
- * number of respones have been received.
+ * Implementation of a scatter-gather behavior using a ReplicationExecutor.
*/
- void _signalSufficientResponsesReceived(ReplicationExecutor* executor);
-
- ScatterGatherAlgorithm* _algorithm;
- stdx::function<void()> _onCompletion;
- ReplicationExecutor::EventHandle _sufficientResponsesReceived;
- std::vector<ReplicationExecutor::CallbackHandle> _callbacks;
- size_t _actualResponses;
- bool _started;
+ class RunnerImpl {
+ public:
+ explicit RunnerImpl(ScatterGatherAlgorithm* algorithm, ReplicationExecutor* executor);
+
+ /**
+ * On success, returns an event handle that will be signaled when the runner has
+ * finished executing the scatter-gather process. After that event has been
+ * signaled, it is safe for the caller to examine any state on "algorithm".
+ *
+ * The returned event will eventually be signaled.
+ */
+ StatusWith<ReplicationExecutor::EventHandle> start(
+ const ReplicationExecutor::RemoteCommandCallbackFn cb);
+
+ /**
+ * Informs the runner to cancel further processing.
+ */
+ void cancel();
+
+ /**
+ * Callback invoked once for every response from the network.
+ */
+ void processResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData);
+
+ private:
+ /**
+ * Method that performs all actions required when _algorithm indicates a sufficient
+ * number of responses have been received.
+ */
+ void _signalSufficientResponsesReceived();
+
+ ReplicationExecutor* _executor; // Not owned here.
+ ScatterGatherAlgorithm* _algorithm; // Not owned here.
+ ReplicationExecutor::EventHandle _sufficientResponsesReceived;
+ std::vector<ReplicationExecutor::CallbackHandle> _callbacks;
+ size_t _actualResponses = 0;
+ bool _started = false;
+ stdx::mutex _mutex;
+ };
+
+ ReplicationExecutor* _executor; // Not owned here.
+
+    // This RunnerImpl pointer is shared with the remote command callbacks so that
+    // the callbacks can always access the runner's members safely.
+ std::shared_ptr<RunnerImpl> _impl;
};
} // namespace repl
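
With the executor bound at construction, a caller of the new interface picks one of two forms (start() asserts it runs only once per runner). A hedged usage sketch, assuming "algorithm" implements ScatterGatherAlgorithm and "executor" outlives the runner per the contract above:

    ScatterGatherRunner runner(&algorithm, executor);

    // Blocking form: returns once sufficient responses have been processed.
    Status s = runner.run();

    // Non-blocking form (on a separate runner): chain work onto the event.
    auto evh = runner.start();
    if (evh.isOK()) {
        executor->onEvent(evh.getValue(), [](const ReplicationExecutor::CallbackArgs& cbData) {
            // Safe to examine the algorithm's state once this runs.
        });
    }
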
diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp
index 6e4c11a65e0..98c5032bc6c 100644
--- a/src/mongo/db/repl/scatter_gather_test.cpp
+++ b/src/mongo/db/repl/scatter_gather_test.cpp
@@ -45,6 +45,8 @@ using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
+static const int kTotalRequests = 3;
+
/**
* Algorithm for testing the ScatterGatherRunner, which will finish running when finish() is
* called, or upon receiving responses from two nodes. Creates a three requests algorithm
@@ -57,7 +59,7 @@ public:
virtual std::vector<RemoteCommandRequest> getRequests() const {
std::vector<RemoteCommandRequest> requests;
- for (int i = 0; i < 3; i++) {
+ for (int i = 0; i < kTotalRequests; i++) {
requests.push_back(RemoteCommandRequest(
HostAndPort("hostname", i), "admin", BSONObj(), Milliseconds(30 * 1000)));
}
@@ -150,7 +152,7 @@ public:
private:
void _run(ReplicationExecutor* executor) {
- _result = _sgr->run(_executor);
+ _result = _sgr->run();
}
ScatterGatherRunner* _sgr;
@@ -161,10 +163,17 @@ private:
// Simple onCompletion function which will toggle a bool, so that we can check the logs to
// ensure the onCompletion function ran when expected.
-void onCompletionTestFunction(bool* ran) {
- *ran = true;
+executor::TaskExecutor::CallbackFn getOnCompletionTestFunction(bool* ran) {
+ auto cb = [ran](const ReplicationExecutor::CallbackArgs& cbData) {
+ if (!cbData.status.isOK()) {
+ return;
+ }
+ *ran = true;
+ };
+ return cb;
}
+
// Confirm that running via start() will finish and run the onComplete function once sufficient
// responses have been received.
// Confirm that deleting both the ScatterGatherTestAlgorithm and ScatterGatherRunner while
@@ -172,10 +181,10 @@ void onCompletionTestFunction(bool* ran) {
// completed.
TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) {
ScatterGatherTestAlgorithm* sga = new ScatterGatherTestAlgorithm();
- ScatterGatherRunner* sgr = new ScatterGatherRunner(sga);
+ ScatterGatherRunner* sgr = new ScatterGatherRunner(sga, getExecutor());
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status =
- sgr->start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion));
+ StatusWith<ReplicationExecutor::EventHandle> status = sgr->start();
+ getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
ASSERT_OK(status.getStatus());
ASSERT_FALSE(ranCompletion);
@@ -217,10 +226,10 @@ TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) {
// to return ErrorCodes::ShutdownInProgress.
TEST_F(ScatterGatherTest, ShutdownExecutorBeforeRun) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga);
+ ScatterGatherRunner sgr(&sga, getExecutor());
getExecutor()->shutdown();
sga.finish();
- Status status = sgr.run(getExecutor());
+ Status status = sgr.run();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status);
}
@@ -228,14 +237,17 @@ TEST_F(ScatterGatherTest, ShutdownExecutorBeforeRun) {
// finishes will cause run() to return Status::OK().
TEST_F(ScatterGatherTest, ShutdownExecutorAfterRun) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga);
+ ScatterGatherRunner sgr(&sga, getExecutor());
ScatterGatherRunnerRunner sgrr(&sgr, getExecutor());
sgrr.run();
// need to wait for the scatter-gather to be scheduled in the executor
NetworkInterfaceMock* net = getNet();
net->enterNetwork();
- NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
- net->blackHole(noi);
+ // Black hole all requests before shutdown, so that scheduleRemoteCommand will succeed.
+ for (int i = 0; i < kTotalRequests; i++) {
+ NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
+ net->blackHole(noi);
+ }
net->exitNetwork();
getExecutor()->shutdown();
Status status = sgrr.getResult();
@@ -246,11 +258,10 @@ TEST_F(ScatterGatherTest, ShutdownExecutorAfterRun) {
// to return ErrorCodes::ShutdownInProgress and should not run onCompletion().
TEST_F(ScatterGatherTest, ShutdownExecutorBeforeStart) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga);
+ ScatterGatherRunner sgr(&sga, getExecutor());
getExecutor()->shutdown();
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status =
- sgr.start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion));
+ StatusWith<ReplicationExecutor::EventHandle> status = sgr.start();
sga.finish();
ASSERT_FALSE(ranCompletion);
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.getStatus());
@@ -260,10 +271,10 @@ TEST_F(ScatterGatherTest, ShutdownExecutorBeforeStart) {
// to return Status::OK and should not run onCompletion().
TEST_F(ScatterGatherTest, ShutdownExecutorAfterStart) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga);
+ ScatterGatherRunner sgr(&sga, getExecutor());
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status =
- sgr.start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion));
+ StatusWith<ReplicationExecutor::EventHandle> status = sgr.start();
+ getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
getExecutor()->shutdown();
sga.finish();
ASSERT_FALSE(ranCompletion);
@@ -273,10 +284,10 @@ TEST_F(ScatterGatherTest, ShutdownExecutorAfterStart) {
// Confirm that responses are not processed once sufficient responses have been received.
TEST_F(ScatterGatherTest, DoNotProcessMoreThanSufficientResponses) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga);
+ ScatterGatherRunner sgr(&sga, getExecutor());
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status =
- sgr.start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion));
+ StatusWith<ReplicationExecutor::EventHandle> status = sgr.start();
+ getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
ASSERT_OK(status.getStatus());
ASSERT_FALSE(ranCompletion);
@@ -318,17 +329,17 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru
ScatterGatherTestAlgorithm sga;
// set hasReceivedSufficientResponses to return true before the run starts
sga.finish();
- ScatterGatherRunner sgr(&sga);
+ ScatterGatherRunner sgr(&sga, getExecutor());
bool ranCompletion = false;
- StatusWith<ReplicationExecutor::EventHandle> status =
- sgr.start(getExecutor(), stdx::bind(&onCompletionTestFunction, &ranCompletion));
+ StatusWith<ReplicationExecutor::EventHandle> status = sgr.start();
+ getExecutor()->onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion));
ASSERT_OK(status.getStatus());
- ASSERT_TRUE(ranCompletion);
-
+    // Wait until the callback finishes.
NetworkInterfaceMock* net = getNet();
net->enterNetwork();
- ASSERT_FALSE(net->hasReadyRequests());
+ net->runReadyNetworkOperations();
net->exitNetwork();
+ ASSERT_TRUE(ranCompletion);
}
#if 0
@@ -340,7 +351,7 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru
ScatterGatherRunner sgr(&sga);
bool ranCompletion = false;
StatusWith<ReplicationExecutor::EventHandle> status = sgr.start(getExecutor(),
- stdx::bind(&onCompletionTestFunction, &ranCompletion));
+ getOnCompletionTestFunction(&ranCompletion));
ASSERT_OK(status.getStatus());
ASSERT_FALSE(ranCompletion);
@@ -379,7 +390,7 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru
// Confirm that running via run() will finish once sufficient responses have been received.
TEST_F(ScatterGatherTest, SuccessfulScatterGatherViaRun) {
ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga);
+ ScatterGatherRunner sgr(&sga, getExecutor());
ScatterGatherRunnerRunner sgrr(&sgr, getExecutor());
sgrr.run();
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index e6217295b95..1809d906c20 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -208,8 +208,7 @@ public:
////////////////////////////////////////////////////////////
// produces a reply to a replSetSyncFrom command
- virtual void prepareSyncFromResponse(const ReplicationExecutor::CallbackArgs& data,
- const HostAndPort& target,
+ virtual void prepareSyncFromResponse(const HostAndPort& target,
const OpTime& lastOpApplied,
BSONObjBuilder* response,
Status* result) = 0;
@@ -254,8 +253,7 @@ public:
};
// produce a reply to a status request
- virtual void prepareStatusResponse(const ReplicationExecutor::CallbackArgs& data,
- const ReplSetStatusArgs& rsStatusArgs,
+ virtual void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
BSONObjBuilder* response,
Status* result) = 0;
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index c03a883010d..da60dd90e1e 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -350,16 +350,10 @@ void TopologyCoordinatorImpl::clearSyncSourceBlacklist() {
_syncSourceBlacklist.clear();
}
-void TopologyCoordinatorImpl::prepareSyncFromResponse(const ReplicationExecutor::CallbackArgs& data,
- const HostAndPort& target,
+void TopologyCoordinatorImpl::prepareSyncFromResponse(const HostAndPort& target,
const OpTime& lastOpApplied,
BSONObjBuilder* response,
Status* result) {
- if (data.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
- return;
- }
-
response->append("syncFromRequested", target.toString());
if (_selfIndex == -1) {
@@ -1507,15 +1501,9 @@ const MemberConfig* TopologyCoordinatorImpl::_currentPrimaryMember() const {
return &(_rsConfig.getMemberAt(_currentPrimaryIndex));
}
-void TopologyCoordinatorImpl::prepareStatusResponse(const ReplicationExecutor::CallbackArgs& data,
- const ReplSetStatusArgs& rsStatusArgs,
+void TopologyCoordinatorImpl::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
BSONObjBuilder* response,
Status* result) {
- if (data.status == ErrorCodes::CallbackCanceled) {
- *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
- return;
- }
-
// output for each member
vector<BSONObj> membersOut;
const MemberState myState = getMemberState();
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index f86c0e8d928..597a1cc6bc3 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -164,8 +164,7 @@ public:
virtual void setElectionSleepUntil(Date_t newTime);
virtual void setFollowerMode(MemberState::MS newMode);
virtual void adjustMaintenanceCountBy(int inc);
- virtual void prepareSyncFromResponse(const ReplicationExecutor::CallbackArgs& data,
- const HostAndPort& target,
+ virtual void prepareSyncFromResponse(const HostAndPort& target,
const OpTime& lastOpApplied,
BSONObjBuilder* response,
Status* result);
@@ -191,8 +190,7 @@ public:
const OpTime& lastOpApplied,
const OpTime& lastOpDurable,
ReplSetHeartbeatResponse* response);
- virtual void prepareStatusResponse(const ReplicationExecutor::CallbackArgs& data,
- const ReplSetStatusArgs& rsStatusArgs,
+ virtual void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
BSONObjBuilder* response,
Status* result);
virtual void fillIsMasterForReplSet(IsMasterResponse* response);
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
index 27711e3fcf7..24c49837c2c 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
@@ -829,8 +829,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunPriorToHavingAConf
BSONObjBuilder response;
// if we do not have an index in the config, we should get ErrorCodes::NotSecondary
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h1"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NotSecondary, result);
ASSERT_EQUALS("Removed and uninitialized nodes do not sync", result.reason());
}
@@ -854,8 +853,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunAgainstArbiter) {
<< "h1"))),
0);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h1"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NotSecondary, result);
ASSERT_EQUALS("arbiters don't sync", result.reason());
}
@@ -891,8 +889,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunAgainstPrimary) {
makeSelfPrimary();
ASSERT_EQUALS(0, getCurrentPrimaryIndex());
getTopoCoord()._setCurrentPrimaryForTest(0);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h3"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h3"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NotSecondary, result);
ASSERT_EQUALS("primaries don't sync", result.reason());
ASSERT_EQUALS("h3:27017", response.obj()["syncFromRequested"].String());
@@ -926,7 +923,7 @@ TEST_F(TopoCoordTest, NodeReturnsNodeNotFoundWhenSyncFromRequestsANodeNotInConfi
setSelfMemberState(MemberState::RS_SECONDARY);
getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("fakemember"), ourOpTime, &response, &result);
+ HostAndPort("fakemember"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NodeNotFound, result);
ASSERT_EQUALS("Could not find member \"fakemember:27017\" in replica set", result.reason());
}
@@ -959,8 +956,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsSelf) {
setSelfMemberState(MemberState::RS_SECONDARY);
// Try to sync from self
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("hself"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("hself"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("I cannot sync from myself", result.reason());
}
@@ -994,8 +990,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsArbiter) {
// Try to sync from an arbiter
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h1"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("Cannot sync from \"h1:27017\" because it is an arbiter", result.reason());
}
@@ -1028,8 +1023,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsAnIndexNonbui
setSelfMemberState(MemberState::RS_SECONDARY);
// Try to sync from a node that doesn't build indexes
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h2"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h2"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("Cannot sync from \"h2:27017\" because it does not build indexes",
result.reason());
@@ -1065,8 +1059,7 @@ TEST_F(TopoCoordTest, NodeReturnsHostUnreachableWhenSyncFromRequestsADownNode) {
// Try to sync from a member that is down
receiveDownHeartbeat(HostAndPort("h4"), "rs0", OpTime());
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h4"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h4"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::HostUnreachable, result);
ASSERT_EQUALS("I cannot reach the requested member: h4:27017", result.reason());
}
@@ -1102,8 +1095,7 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAStaleNode) {
heartbeatFromMember(
HostAndPort("h5"), "rs0", MemberState::RS_SECONDARY, staleOpTime, Milliseconds(100));
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h5"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result);
ASSERT_OK(result);
ASSERT_EQUALS("requested member \"h5:27017\" is more than 10 seconds behind us",
response.obj()["warning"].String());
@@ -1142,8 +1134,7 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAValidNode) {
heartbeatFromMember(
HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h6"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response, &result);
ASSERT_OK(result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
@@ -1183,8 +1174,7 @@ TEST_F(TopoCoordTest,
HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
// node goes down between forceSync and chooseNewSyncSource
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h6"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response, &result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
receiveDownHeartbeat(HostAndPort("h6"), "rs0", OpTime());
@@ -1222,8 +1212,7 @@ TEST_F(TopoCoordTest, NodeReturnsUnauthorizedWhenSyncFromRequestsANodeWeAreNotAu
// Try to sync from a member that is unauth'd
receiveDownHeartbeat(HostAndPort("h5"), "rs0", OpTime(), ErrorCodes::Unauthorized);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h5"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result);
ASSERT_NOT_OK(result);
ASSERT_EQUALS(ErrorCodes::Unauthorized, result.code());
ASSERT_EQUALS("not authorized to communicate with h5:27017", result.reason());
@@ -1245,8 +1234,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenAskedToSyncFromANonVoterAsAVo
"]}"),
0);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h2"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h2"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("Cannot sync from \"h2:27017\" because it is not a voter", result.reason());
}
@@ -1284,8 +1272,7 @@ TEST_F(TopoCoordTest,
heartbeatFromMember(
HostAndPort("h5"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h5"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result);
ASSERT_OK(result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
@@ -1297,8 +1284,7 @@ TEST_F(TopoCoordTest,
HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
// Sync successfully from another up-to-date member.
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h6"), ourOpTime, &response2, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response2, &result);
BSONObj response2Obj = response2.obj();
ASSERT_FALSE(response2Obj.hasField("warning"));
ASSERT_EQUALS(HostAndPort("h5").toString(), response2Obj["prevSyncTarget"].String());
@@ -1372,7 +1358,6 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) {
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{
curTime,
static_cast<unsigned>(durationCount<Seconds>(uptimeSecs)),
@@ -1488,7 +1473,6 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidReplicaSetConfigInResponseToGetStatusWhe
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{
curTime,
static_cast<unsigned>(durationCount<Seconds>(uptimeSecs)),
@@ -2187,7 +2171,6 @@ public:
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{_firstRequestDate + Milliseconds(4000),
10,
OpTime(Timestamp(100, 0), 0),
@@ -2251,7 +2234,6 @@ public:
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Seconds(4),
10,
OpTime(Timestamp(100, 0), 0),
@@ -2568,7 +2550,6 @@ TEST_F(HeartbeatResponseTestTwoRetries, NodeDoesNotRetryHeartbeatsAfterFailingTw
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Milliseconds(4900),
10,
OpTime(Timestamp(100, 0), 0),
@@ -2810,7 +2791,6 @@ TEST_F(HeartbeatResponseTestTwoRetries,
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Milliseconds(7000),
600,
OpTime(Timestamp(100, 0), 0),
@@ -4238,42 +4218,6 @@ TEST_F(TopoCoordTest,
ASSERT(TopologyCoordinator::Role::candidate == getTopoCoord().getRole());
}
-class ShutdownInProgressTest : public TopoCoordTest {
-public:
- ShutdownInProgressTest()
- : ourCbData(NULL,
- ReplicationExecutor::CallbackHandle(),
- Status(ErrorCodes::CallbackCanceled, "")) {}
-
- virtual ReplicationExecutor::CallbackArgs cbData() {
- return ourCbData;
- }
-
-private:
- ReplicationExecutor::CallbackArgs ourCbData;
-};
-
-TEST_F(ShutdownInProgressTest, NodeReturnsShutdownInProgressWhenSyncFromCallbackCanceled) {
- Status result = Status::OK();
- BSONObjBuilder response;
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("host2:27017"), OpTime(), &response, &result);
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result);
- ASSERT_TRUE(response.obj().isEmpty());
-}
-
-TEST_F(ShutdownInProgressTest, NodeReturnsShutDownInProgressWhenGetReplSetStatusCallbackCanceled) {
- Status result = Status::OK();
- BSONObjBuilder response;
- getTopoCoord().prepareStatusResponse(
- cbData(),
- TopologyCoordinator::ReplSetStatusArgs{Date_t(), 0, OpTime(), OpTime(), OpTime(), OpTime()},
- &response,
- &result);
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result);
- ASSERT_TRUE(response.obj().isEmpty());
-}
-
class PrepareHeartbeatResponseTest : public TopoCoordTest {
public:
virtual void setUp() {
diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
index 70cbf8b97aa..6adf5542212 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
@@ -822,8 +822,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunPriorToHavingAConf
BSONObjBuilder response;
// if we do not have an index in the config, we should get ErrorCodes::NotSecondary
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h1"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NotSecondary, result);
ASSERT_EQUALS("Removed and uninitialized nodes do not sync", result.reason());
}
@@ -847,8 +846,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunAgainstArbiter) {
<< "h1"))),
0);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h1"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NotSecondary, result);
ASSERT_EQUALS("arbiters don't sync", result.reason());
}
@@ -884,8 +882,7 @@ TEST_F(TopoCoordTest, NodeReturnsNotSecondaryWhenSyncFromIsRunAgainstPrimary) {
makeSelfPrimary();
ASSERT_EQUALS(0, getCurrentPrimaryIndex());
getTopoCoord()._setCurrentPrimaryForTest(0);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h3"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h3"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NotSecondary, result);
ASSERT_EQUALS("primaries don't sync", result.reason());
ASSERT_EQUALS("h3:27017", response.obj()["syncFromRequested"].String());
@@ -919,7 +916,7 @@ TEST_F(TopoCoordTest, NodeReturnsNodeNotFoundWhenSyncFromRequestsANodeNotInConfi
setSelfMemberState(MemberState::RS_SECONDARY);
getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("fakemember"), ourOpTime, &response, &result);
+ HostAndPort("fakemember"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::NodeNotFound, result);
ASSERT_EQUALS("Could not find member \"fakemember:27017\" in replica set", result.reason());
}
@@ -952,8 +949,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsSelf) {
setSelfMemberState(MemberState::RS_SECONDARY);
// Try to sync from self
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("hself"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("hself"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("I cannot sync from myself", result.reason());
}
@@ -987,8 +983,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsArbiter) {
// Try to sync from an arbiter
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h1"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h1"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("Cannot sync from \"h1:27017\" because it is an arbiter", result.reason());
}
@@ -1021,8 +1016,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenSyncFromRequestsAnIndexNonbui
setSelfMemberState(MemberState::RS_SECONDARY);
// Try to sync from a node that doesn't build indexes
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h2"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h2"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("Cannot sync from \"h2:27017\" because it does not build indexes",
result.reason());
@@ -1058,8 +1052,7 @@ TEST_F(TopoCoordTest, NodeReturnsHostUnreachableWhenSyncFromRequestsADownNode) {
// Try to sync from a member that is down
receiveDownHeartbeat(HostAndPort("h4"), "rs0", OpTime());
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h4"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h4"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::HostUnreachable, result);
ASSERT_EQUALS("I cannot reach the requested member: h4:27017", result.reason());
}
@@ -1095,8 +1088,7 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAStaleNode) {
heartbeatFromMember(
HostAndPort("h5"), "rs0", MemberState::RS_SECONDARY, staleOpTime, Milliseconds(100));
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h5"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result);
ASSERT_OK(result);
ASSERT_EQUALS("requested member \"h5:27017\" is more than 10 seconds behind us",
response.obj()["warning"].String());
@@ -1135,8 +1127,7 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAValidNode) {
heartbeatFromMember(
HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h6"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response, &result);
ASSERT_OK(result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
@@ -1176,8 +1167,7 @@ TEST_F(TopoCoordTest,
HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
// node goes down between forceSync and chooseNewSyncSource
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h6"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response, &result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
receiveDownHeartbeat(HostAndPort("h6"), "rs0", OpTime());
@@ -1215,8 +1205,7 @@ TEST_F(TopoCoordTest, NodeReturnsUnauthorizedWhenSyncFromRequestsANodeWeAreNotAu
// Try to sync from a member that is unauth'd
receiveDownHeartbeat(HostAndPort("h5"), "rs0", OpTime(), ErrorCodes::Unauthorized);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h5"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result);
ASSERT_NOT_OK(result);
ASSERT_EQUALS(ErrorCodes::Unauthorized, result.code());
ASSERT_EQUALS("not authorized to communicate with h5:27017", result.reason());
@@ -1238,8 +1227,7 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidOptionsWhenAskedToSyncFromANonVoterAsAVo
"]}"),
0);
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h2"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h2"), ourOpTime, &response, &result);
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
ASSERT_EQUALS("Cannot sync from \"h2:27017\" because it is not a voter", result.reason());
}
@@ -1277,8 +1265,7 @@ TEST_F(TopoCoordTest,
heartbeatFromMember(
HostAndPort("h5"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h5"), ourOpTime, &response, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h5"), ourOpTime, &response, &result);
ASSERT_OK(result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
@@ -1290,8 +1277,7 @@ TEST_F(TopoCoordTest,
HostAndPort("h6"), "rs0", MemberState::RS_SECONDARY, ourOpTime, Milliseconds(100));
// Sync successfully from another up-to-date member.
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("h6"), ourOpTime, &response2, &result);
+ getTopoCoord().prepareSyncFromResponse(HostAndPort("h6"), ourOpTime, &response2, &result);
BSONObj response2Obj = response2.obj();
ASSERT_FALSE(response2Obj.hasField("warning"));
ASSERT_EQUALS(HostAndPort("h5").toString(), response2Obj["prevSyncTarget"].String());
@@ -1365,7 +1351,6 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) {
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{
curTime,
static_cast<unsigned>(durationCount<Seconds>(uptimeSecs)),
@@ -1481,7 +1466,6 @@ TEST_F(TopoCoordTest, NodeReturnsInvalidReplicaSetConfigInResponseToGetStatusWhe
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{
curTime,
static_cast<unsigned>(durationCount<Seconds>(uptimeSecs)),
@@ -1518,42 +1502,6 @@ TEST_F(TopoCoordTest, HeartbeatFrequencyShouldBeHalfElectionTimeoutWhenArbiter)
ASSERT_EQUALS(expected, action.getNextHeartbeatStartDate());
}
-class ShutdownInProgressTest : public TopoCoordTest {
-public:
- ShutdownInProgressTest()
- : ourCbData(NULL,
- ReplicationExecutor::CallbackHandle(),
- Status(ErrorCodes::CallbackCanceled, "")) {}
-
- virtual ReplicationExecutor::CallbackArgs cbData() {
- return ourCbData;
- }
-
-private:
- ReplicationExecutor::CallbackArgs ourCbData;
-};
-
-TEST_F(ShutdownInProgressTest, NodeReturnsShutdownInProgressWhenSyncFromCallbackCanceled) {
- Status result = Status::OK();
- BSONObjBuilder response;
- getTopoCoord().prepareSyncFromResponse(
- cbData(), HostAndPort("host2:27017"), OpTime(), &response, &result);
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result);
- ASSERT_TRUE(response.obj().isEmpty());
-}
-
-TEST_F(ShutdownInProgressTest, NodeReturnsShutDownInProgressWhenGetReplSetStatusCallbackCanceled) {
- Status result = Status::OK();
- BSONObjBuilder response;
- getTopoCoord().prepareStatusResponse(
- cbData(),
- TopologyCoordinator::ReplSetStatusArgs{Date_t(), 0, OpTime(), OpTime(), OpTime(), OpTime()},
- &response,
- &result);
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result);
- ASSERT_TRUE(response.obj().isEmpty());
-}
-
class PrepareHeartbeatResponseV1Test : public TopoCoordTest {
public:
virtual void setUp() {
@@ -4051,7 +3999,6 @@ public:
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{_firstRequestDate + Milliseconds(4000),
10,
OpTime(Timestamp(100, 0), 0),
@@ -4135,7 +4082,6 @@ public:
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Seconds(4),
10,
OpTime(Timestamp(100, 0), 0),
@@ -4184,7 +4130,6 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, NodeDoesNotRetryHeartbeatsAfterFailing
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Milliseconds(4900),
10,
OpTime(Timestamp(100, 0), 0),
@@ -4245,7 +4190,6 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, HeartbeatThreeNonconsecutiveFailures)
BSONObjBuilder statusBuilder;
Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
getTopoCoord().prepareStatusResponse(
- cbData(),
TopologyCoordinator::ReplSetStatusArgs{firstRequestDate() + Milliseconds(7000),
600,
OpTime(Timestamp(100, 0), 0),
diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp
index eafa68715e2..836617e5493 100644
--- a/src/mongo/db/repl/vote_requester.cpp
+++ b/src/mongo/db/repl/vote_requester.cpp
@@ -141,22 +141,20 @@ unordered_set<HostAndPort> VoteRequester::Algorithm::getResponders() const {
VoteRequester::VoteRequester() : _isCanceled(false) {}
VoteRequester::~VoteRequester() {}
-StatusWith<ReplicationExecutor::EventHandle> VoteRequester::start(
- ReplicationExecutor* executor,
- const ReplicaSetConfig& rsConfig,
- long long candidateIndex,
- long long term,
- bool dryRun,
- OpTime lastDurableOpTime,
- const stdx::function<void()>& onCompletion) {
+StatusWith<ReplicationExecutor::EventHandle> VoteRequester::start(ReplicationExecutor* executor,
+ const ReplicaSetConfig& rsConfig,
+ long long candidateIndex,
+ long long term,
+ bool dryRun,
+ OpTime lastDurableOpTime) {
_algorithm.reset(new Algorithm(rsConfig, candidateIndex, term, dryRun, lastDurableOpTime));
- _runner.reset(new ScatterGatherRunner(_algorithm.get()));
- return _runner->start(executor, onCompletion);
+ _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
+ return _runner->start();
}
-void VoteRequester::cancel(ReplicationExecutor* executor) {
+void VoteRequester::cancel() {
_isCanceled = true;
- _runner->cancel(executor);
+ _runner->cancel();
}
VoteRequester::Result VoteRequester::getResult() const {
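
Editor's note: as elsewhere in this commit, ScatterGatherRunner now binds its executor at construction, so start() and cancel() take no arguments. A minimal usage sketch under that assumption (algorithm and executor are placeholders for whatever the caller already holds):

    ScatterGatherRunner runner(algorithm, executor);  // executor bound up front
    StatusWith<ReplicationExecutor::EventHandle> evh = runner.start();
    if (!evh.isOK()) {
        return evh.getStatus();  // scheduling failed; nothing was started
    }
    // ... later, to abandon the scatter-gather early:
    runner.cancel();
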
diff --git a/src/mongo/db/repl/vote_requester.h b/src/mongo/db/repl/vote_requester.h
index 433facf345c..cb0be7fc888 100644
--- a/src/mongo/db/repl/vote_requester.h
+++ b/src/mongo/db/repl/vote_requester.h
@@ -105,26 +105,19 @@ public:
     * in currentConfig, in an attempt to receive sufficient votes to win the election.
*
* evh can be used to schedule a callback when the process is complete.
- * This function must be run in the executor, as it must be synchronous with the command
- * callbacks that it schedules.
* If this function returns Status::OK(), evh is then guaranteed to be signaled.
**/
- StatusWith<ReplicationExecutor::EventHandle> start(
- ReplicationExecutor* executor,
- const ReplicaSetConfig& rsConfig,
- long long candidateIndex,
- long long term,
- bool dryRun,
- OpTime lastDurableOpTime,
- const stdx::function<void()>& onCompletion = stdx::function<void()>());
+ StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor,
+ const ReplicaSetConfig& rsConfig,
+ long long candidateIndex,
+ long long term,
+ bool dryRun,
+ OpTime lastDurableOpTime);
/**
- * Informs the VoteRequester to cancel further processing. The "executor"
- * argument must point to the same executor passed to "start()".
- *
- * Like start(), this method must run in the executor context.
+ * Informs the VoteRequester to cancel further processing.
*/
- void cancel(ReplicationExecutor* executor);
+ void cancel();
Result getResult() const;
unordered_set<HostAndPort> getResponders() const;
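
Editor's note: with the onCompletion hook gone, completion is observed solely through the returned event handle. A hedged caller sketch (argument values and the onVoteRequestComplete callback are illustrative, not from this diff):

    VoteRequester requester;
    StatusWith<ReplicationExecutor::EventHandle> evh = requester.start(
        executor, rsConfig, candidateIndex, term, false /* dryRun */, lastDurableOpTime);
    if (evh.isOK()) {
        // e.g. schedule follow-up work for when the event is signaled:
        // executor->onEvent(evh.getValue(), onVoteRequestComplete);
    }
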