diff options
author | matt dannenberg <matt.dannenberg@10gen.com> | 2015-05-26 10:34:41 -0400 |
---|---|---|
committer | matt dannenberg <matt.dannenberg@10gen.com> | 2015-06-02 05:18:02 -0400 |
commit | c561f5178239654cfbf14d398ab0fea1f08f5002 (patch) | |
tree | 5e6e6815569b131d3aca2a5f9a2296b5396e6df0 /src/mongo/db | |
parent | 34eb51e5d8d05abb56055042da244c30e58dfe08 (diff) | |
download | mongo-c561f5178239654cfbf14d398ab0fea1f08f5002.tar.gz |
SERVER-18669 add updateTerm() in ReplicationCoordinator to detect need to stepdown when term changes while primary
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 75 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_test.cpp | 39 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl_test.cpp | 2 |
11 files changed, 163 insertions, 12 deletions
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 903e13fc758..9ce8a4e3a1f 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -621,10 +621,18 @@ namespace repl { virtual void summarizeAsHtml(ReplSetHtmlSummary* output) = 0; /** - * Return the current term. + * Returns the current term. */ virtual long long getTerm() = 0; + /** + * Attempts to update the current term for the V1 election protocol. If the term changes and + * this node is primary, relinquishes primary. + * Returns true if the term was updated (that is, when "term" was higher than the previously + * recorded term) and false otherwise. + */ + virtual bool updateTerm(long long term) = 0; + protected: ReplicationCoordinator(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 5be07179f69..79a47f52a7f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2541,6 +2541,9 @@ namespace { if (!isV1ElectionProtocol()) { return {ErrorCodes::BadValue, "not using election protocol v1"}; } + + updateTerm(args.getTerm()); + Status result{ErrorCodes::InternalError, "didn't set status in processReplSetRequestVotes"}; CBHStatus cbh = _replExecutor.scheduleWork( stdx::bind(&ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish, @@ -2589,6 +2592,9 @@ namespace { if (!isV1ElectionProtocol()) { return {ErrorCodes::BadValue, "not using election protocol v1"}; } + + updateTerm(args.getTerm()); + Status result{ErrorCodes::InternalError, "didn't set status in processReplSetDeclareElectionWinner"}; CBHStatus cbh = _replExecutor.scheduleWork( @@ -2735,5 +2741,74 @@ namespace { *term = _topCoord->getTerm(); } + bool ReplicationCoordinatorImpl::updateTerm(long long term) { + bool updated = false; + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_updateTerm_helper, + this, + stdx::placeholders::_1, + term, + &updated, + nullptr)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return false; + } + fassert(28670, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return updated; + } + + bool ReplicationCoordinatorImpl::updateTerm_forTest(long long term) { + bool updated = false; + Handle cbHandle; + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_updateTerm_helper, + this, + stdx::placeholders::_1, + term, + &updated, + &cbHandle)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return false; + } + fassert(28673, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + _replExecutor.wait(cbHandle); + return updated; + } + + void ReplicationCoordinatorImpl::_updateTerm_helper( + const ReplicationExecutor::CallbackData& cbData, + long long term, + bool* updated, + Handle* cbHandle) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + + *updated = _updateTerm_incallback(term, cbHandle); + } + + bool ReplicationCoordinatorImpl::_updateTerm_incallback(long long term, Handle* cbHandle) { + bool updated = _topCoord->updateTerm(term); + + if (updated && getMemberState().primary()) { + log() << "stepping down from primary, because a new term has begun"; + _topCoord->prepareForStepDown(); + CBHStatus cbh = _replExecutor.scheduleWorkWithGlobalExclusiveLock( + stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish, + this, + stdx::placeholders::_1)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return true; + } + fassert(28672, cbh.getStatus()); + if (cbHandle) { + *cbHandle = cbh.getValue(); + } + } + return updated; + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 4c4539eabe8..c015fdbfb97 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -267,7 +267,9 @@ namespace repl { /** * Get current term from topology coordinator */ - long long getTerm() override; + virtual long long getTerm() override; + + virtual bool updateTerm(long long term) override; // ================== Test support API =================== @@ -287,6 +289,8 @@ namespace repl { */ Status setLastOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime); + bool updateTerm_forTest(long long term); + private: ReplicationCoordinatorImpl(const ReplSettings& settings, ReplicationCoordinatorExternalState* externalState, @@ -863,6 +867,16 @@ namespace repl { void _getTerm_helper(const ReplicationExecutor::CallbackData& cbData, long long* term); + /** + * Callback that attempts to set the current term in topology coordinator and + * relinquishes primary if the term actually changes and we are primary. + */ + void _updateTerm_helper(const ReplicationExecutor::CallbackData& cbData, + long long term, + bool* updated, + Handle* cbHandle); + bool _updateTerm_incallback(long long term, Handle* cbHandle); + // // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index d911d007126..7ee2cf55f01 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -161,6 +161,8 @@ namespace { hbStatusResponse = StatusWith<ReplSetHeartbeatResponse>(responseStatus); } + _updateTerm_incallback(hbStatusResponse.getValue().getTerm(), nullptr); + HeartbeatResponseAction action = _topCoord->processHeartbeatResponse( now, diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 66b2001755c..5ced40ce853 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -918,6 +918,45 @@ namespace { } }; + TEST_F(ReplCoordTest, UpdateTerm) { + ReplCoordTest::setUp(); + init("mySet/test1:1234,test2:1234,test3:1234"); + + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 1 << + "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234") << + BSON("_id" << 1 << "host" << "test2:1234") << + BSON("_id" << 2 << "host" << "test3:1234")) << + "protocolVersion" << 1), + HostAndPort("test1", 1234)); + getReplCoord()->setMyLastOptime(OpTime(Timestamp (100, 1), 0)); + ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); + + simulateSuccessfulV1Election(); + + ASSERT_EQUALS(1, getReplCoord()->getTerm()); + ASSERT_TRUE(getReplCoord()->getMemberState().primary()); + + // lower term, no change + getReplCoord()->updateTerm(0); + ASSERT_EQUALS(1, getReplCoord()->getTerm()); + ASSERT_TRUE(getReplCoord()->getMemberState().primary()); + + // same term, no change + getReplCoord()->updateTerm(1); + ASSERT_EQUALS(1, getReplCoord()->getTerm()); + ASSERT_TRUE(getReplCoord()->getMemberState().primary()); + + // higher term, step down and change term + Handle cbHandle; + getReplCoord()->updateTerm_forTest(2); + ASSERT_EQUALS(2, getReplCoord()->getTerm()); + ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); + + } + TEST_F(StepDownTest, StepDownNotPrimary) { OperationContextReplMock txn; OpTimeWithTermZero optime1(100, 1); diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 4648eb05c7d..9a4e069a747 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -342,5 +342,7 @@ namespace repl { long long ReplicationCoordinatorMock::getTerm() { return OpTime::kDefaultTerm; } + bool ReplicationCoordinatorMock::updateTerm(long long term) { return false; } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 7ac09e27c2d..216a60fcafe 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -205,6 +205,8 @@ namespace repl { virtual long long getTerm(); + virtual bool updateTerm(long long term); + private: const ReplSettings _settings; diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 8cc1385aaa5..ee33cd2339e 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -110,6 +110,13 @@ namespace repl { */ virtual long long getTerm() const = 0; + /** + * Sets the latest term this member is aware of to the higher of its current value and + * the value passed in as "term". + * Returns true if the local term value is changed. + */ + virtual bool updateTerm(long long term) = 0; + //////////////////////////////////////////////////////////// // // Basic state manipulation methods. diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 1da923df904..2dfb470bf5e 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -2169,6 +2169,14 @@ namespace { return _maintenanceModeCalls; } + bool TopologyCoordinatorImpl::updateTerm(long long term) { + if (term <= _term) { + return false; + } + _term = term; + return true; + } + long long TopologyCoordinatorImpl::getTerm() const { return _term; } @@ -2245,10 +2253,6 @@ namespace { const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response, const OpTime& lastAppliedOpTime) { - if (args.getTerm() > _term) { - _term = args.getTerm(); - } - response->setOk(true); response->setTerm(_term); @@ -2290,16 +2294,11 @@ namespace { else if (args.getTerm() < _term) { return {ErrorCodes::BadValue, "term has already passed"}; } - else if (args.getTerm() == _term && + else if (args.getTerm() == _term && _currentPrimaryIndex > -1 && args.getWinnerId() != _rsConfig.getMemberAt(_currentPrimaryIndex).getId()) { return {ErrorCodes::BadValue, "term already has a primary"}; } - if (args.getTerm() > _term) { - _term = args.getTerm(); - *responseTerm = _term; - } - _currentPrimaryIndex = _rsConfig.findMemberIndexByConfigId(args.getWinnerId()); return Status::OK(); } diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index acd4fdee6f8..db096ec964b 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -128,6 +128,7 @@ namespace repl { virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const; virtual int getMaintenanceCount() const; virtual long long getTerm() const; + virtual bool updateTerm(long long term); virtual void setForceSyncSourceIndex(int index); virtual HostAndPort chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied); diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp index 5433f495f22..0defa78cd36 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp @@ -4648,6 +4648,7 @@ namespace { << "term" << 2 << "winnerId" << 30)); long long responseTerm; + ASSERT(getTopoCoord().updateTerm(winnerArgs.getTerm())); ASSERT_OK(getTopoCoord().processReplSetDeclareElectionWinner(winnerArgs, &responseTerm)); ASSERT_EQUALS(2, responseTerm); @@ -4701,6 +4702,7 @@ namespace { << "term" << 2 << "winnerId" << 30)); long long responseTerm = -1; + ASSERT(getTopoCoord().updateTerm(winnerArgs.getTerm())); ASSERT_OK(getTopoCoord().processReplSetDeclareElectionWinner(winnerArgs, &responseTerm)); ASSERT_EQUALS(2, responseTerm); |