summary | refs | log | tree | commit | diff
path: root/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
diff options
context:
space:
mode:
author: Siyuan Zhou <siyuan.zhou@mongodb.com> 2016-03-08 19:40:54 -0500
committer: Siyuan Zhou <siyuan.zhou@mongodb.com> 2016-04-12 16:54:42 -0400
commit: ded60f5d4254f08d76ccdf0d3a694d473bd14100 (patch)
tree: 585cd96ce9dbfc9d23e6a66911e33f464e87edf1 /src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
parent: 51abe7d21b799a3758ce71d18ac7d6a088f71e2c (diff)
download: mongo-ded60f5d4254f08d76ccdf0d3a694d473bd14100.tar.gz
SERVER-22995 Protect TopoCoord with mutex rather than single thread executor.
Move scatter gather runner out of executor and protect the runner with its own mutex. Replace onComplete with callbacks scheduled on finish event.
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp')
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp | 45
1 file changed, 21 insertions, 24 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
index acc9f8cf2ec..b0ac53d9491 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
@@ -143,18 +143,19 @@ void ReplicationCoordinatorImpl::_startElectSelfV1() {
lk.unlock();
long long term = _topCoord->getTerm();
- StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _voteRequester->start(
- &_replExecutor,
- _rsConfig,
- _selfIndex,
- _topCoord->getTerm(),
- true, // dry run
- lastOpTime,
- stdx::bind(&ReplicationCoordinatorImpl::_onDryRunComplete, this, term));
+ StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh =
+ _voteRequester->start(&_replExecutor,
+ _rsConfig,
+ _selfIndex,
+ _topCoord->getTerm(),
+ true, // dry run
+ lastOpTime);
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(28685, nextPhaseEvh.getStatus());
+ _replExecutor.onEvent(nextPhaseEvh.getValue(),
+ stdx::bind(&ReplicationCoordinatorImpl::_onDryRunComplete, this, term));
lossGuard.dismiss();
}
@@ -163,6 +164,8 @@ void ReplicationCoordinatorImpl::_onDryRunComplete(long long originalTerm) {
invariant(!_electionWinnerDeclarer);
LoseElectionDryRunGuardV1 lossGuard(this);
+ LockGuard lk(_topoMutex);
+
if (_topCoord->getTerm() != originalTerm) {
log() << "not running for primary, we have been superceded already";
return;
@@ -222,15 +225,8 @@ void ReplicationCoordinatorImpl::_writeLastVoteForMyElection(
return;
}
- auto cbStatus = _replExecutor.scheduleWork(
- [this, lastVote](const ReplicationExecutor::CallbackArgs& cbData) {
- _replExecutor.signalEvent(_electionDryRunFinishedEvent);
- _startVoteRequester(lastVote.getTerm());
- });
- if (cbStatus.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
- fassert(28768, cbStatus.getStatus());
+ _startVoteRequester(lastVote.getTerm());
+ _replExecutor.signalEvent(_electionDryRunFinishedEvent);
lossGuard.dismiss();
}
@@ -240,22 +236,21 @@ void ReplicationCoordinatorImpl::_startVoteRequester(long long newTerm) {
invariant(!_electionWinnerDeclarer);
LoseElectionGuardV1 lossGuard(this);
+ LockGuard lk(_topoMutex);
+
const auto lastOpTime =
_isDurableStorageEngine() ? getMyLastDurableOpTime() : getMyLastAppliedOpTime();
_voteRequester.reset(new VoteRequester);
StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _voteRequester->start(
- &_replExecutor,
- _rsConfig,
- _selfIndex,
- _topCoord->getTerm(),
- false,
- lastOpTime,
- stdx::bind(&ReplicationCoordinatorImpl::_onVoteRequestComplete, this, newTerm));
+ &_replExecutor, _rsConfig, _selfIndex, _topCoord->getTerm(), false, lastOpTime);
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(28643, nextPhaseEvh.getStatus());
+ _replExecutor.onEvent(
+ nextPhaseEvh.getValue(),
+ stdx::bind(&ReplicationCoordinatorImpl::_onVoteRequestComplete, this, newTerm));
lossGuard.dismiss();
}
@@ -265,6 +260,8 @@ void ReplicationCoordinatorImpl::_onVoteRequestComplete(long long originalTerm)
invariant(!_electionWinnerDeclarer);
LoseElectionGuardV1 lossGuard(this);
+ LockGuard lk(_topoMutex);
+
if (_topCoord->getTerm() != originalTerm) {
log() << "not becoming primary, we have been superceded already";
return;