summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authormatt dannenberg <matt.dannenberg@10gen.com>2015-05-26 10:34:41 -0400
committermatt dannenberg <matt.dannenberg@10gen.com>2015-06-02 05:18:02 -0400
commitc561f5178239654cfbf14d398ab0fea1f08f5002 (patch)
tree5e6e6815569b131d3aca2a5f9a2296b5396e6df0 /src/mongo/db
parent34eb51e5d8d05abb56055042da244c30e58dfe08 (diff)
downloadmongo-c561f5178239654cfbf14d398ab0fea1f08f5002.tar.gz
SERVER-18669 add updateTerm() in ReplicationCoordinator to detect need to stepdown when term changes while primary
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/replication_coordinator.h10
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp75
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h16
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp39
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h2
-rw-r--r--src/mongo/db/repl/topology_coordinator.h7
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp19
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.h1
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_test.cpp2
11 files changed, 163 insertions, 12 deletions
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 903e13fc758..9ce8a4e3a1f 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -621,10 +621,18 @@ namespace repl {
virtual void summarizeAsHtml(ReplSetHtmlSummary* output) = 0;
/**
- * Return the current term.
+ * Returns the current term.
*/
virtual long long getTerm() = 0;
+ /**
+ * Attempts to update the current term for the V1 election protocol. If the term changes and
+ * this node is primary, relinquishes primary.
+ * Returns true if the term was updated (that is, when "term" was higher than the previously
+ * recorded term) and false otherwise.
+ */
+ virtual bool updateTerm(long long term) = 0;
+
protected:
ReplicationCoordinator();
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 5be07179f69..79a47f52a7f 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2541,6 +2541,9 @@ namespace {
if (!isV1ElectionProtocol()) {
return {ErrorCodes::BadValue, "not using election protocol v1"};
}
+
+ updateTerm(args.getTerm());
+
Status result{ErrorCodes::InternalError, "didn't set status in processReplSetRequestVotes"};
CBHStatus cbh = _replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish,
@@ -2589,6 +2592,9 @@ namespace {
if (!isV1ElectionProtocol()) {
return {ErrorCodes::BadValue, "not using election protocol v1"};
}
+
+ updateTerm(args.getTerm());
+
Status result{ErrorCodes::InternalError,
"didn't set status in processReplSetDeclareElectionWinner"};
CBHStatus cbh = _replExecutor.scheduleWork(
@@ -2735,5 +2741,74 @@ namespace {
*term = _topCoord->getTerm();
}
+ bool ReplicationCoordinatorImpl::updateTerm(long long term) {
+ bool updated = false;
+ CBHStatus cbh = _replExecutor.scheduleWork(
+ stdx::bind(&ReplicationCoordinatorImpl::_updateTerm_helper,
+ this,
+ stdx::placeholders::_1,
+ term,
+ &updated,
+ nullptr));
+ if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return false;
+ }
+ fassert(28670, cbh.getStatus());
+ _replExecutor.wait(cbh.getValue());
+ return updated;
+ }
+
+ bool ReplicationCoordinatorImpl::updateTerm_forTest(long long term) {
+ bool updated = false;
+ Handle cbHandle;
+ CBHStatus cbh = _replExecutor.scheduleWork(
+ stdx::bind(&ReplicationCoordinatorImpl::_updateTerm_helper,
+ this,
+ stdx::placeholders::_1,
+ term,
+ &updated,
+ &cbHandle));
+ if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return false;
+ }
+ fassert(28673, cbh.getStatus());
+ _replExecutor.wait(cbh.getValue());
+ _replExecutor.wait(cbHandle);
+ return updated;
+ }
+
+ void ReplicationCoordinatorImpl::_updateTerm_helper(
+ const ReplicationExecutor::CallbackData& cbData,
+ long long term,
+ bool* updated,
+ Handle* cbHandle) {
+ if (cbData.status == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+
+ *updated = _updateTerm_incallback(term, cbHandle);
+ }
+
+ bool ReplicationCoordinatorImpl::_updateTerm_incallback(long long term, Handle* cbHandle) {
+ bool updated = _topCoord->updateTerm(term);
+
+ if (updated && getMemberState().primary()) {
+ log() << "stepping down from primary, because a new term has begun";
+ _topCoord->prepareForStepDown();
+ CBHStatus cbh = _replExecutor.scheduleWorkWithGlobalExclusiveLock(
+ stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish,
+ this,
+ stdx::placeholders::_1));
+ if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return true;
+ }
+ fassert(28672, cbh.getStatus());
+ if (cbHandle) {
+ *cbHandle = cbh.getValue();
+ }
+ }
+ return updated;
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 4c4539eabe8..c015fdbfb97 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -267,7 +267,9 @@ namespace repl {
/**
* Get current term from topology coordinator
*/
- long long getTerm() override;
+ virtual long long getTerm() override;
+
+ virtual bool updateTerm(long long term) override;
// ================== Test support API ===================
@@ -287,6 +289,8 @@ namespace repl {
*/
Status setLastOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime);
+ bool updateTerm_forTest(long long term);
+
private:
ReplicationCoordinatorImpl(const ReplSettings& settings,
ReplicationCoordinatorExternalState* externalState,
@@ -863,6 +867,16 @@ namespace repl {
void _getTerm_helper(const ReplicationExecutor::CallbackData& cbData, long long* term);
+ /**
+ * Callback that attempts to set the current term in topology coordinator and
+ * relinquishes primary if the term actually changes and we are primary.
+ */
+ void _updateTerm_helper(const ReplicationExecutor::CallbackData& cbData,
+ long long term,
+ bool* updated,
+ Handle* cbHandle);
+ bool _updateTerm_incallback(long long term, Handle* cbHandle);
+
//
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index d911d007126..7ee2cf55f01 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -161,6 +161,8 @@ namespace {
hbStatusResponse = StatusWith<ReplSetHeartbeatResponse>(responseStatus);
}
+ _updateTerm_incallback(hbStatusResponse.getValue().getTerm(), nullptr);
+
HeartbeatResponseAction action =
_topCoord->processHeartbeatResponse(
now,
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 66b2001755c..5ced40ce853 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -918,6 +918,45 @@ namespace {
}
};
+ TEST_F(ReplCoordTest, UpdateTerm) {
+ ReplCoordTest::setUp();
+ init("mySet/test1:1234,test2:1234,test3:1234");
+
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 1 <<
+ "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234") <<
+ BSON("_id" << 1 << "host" << "test2:1234") <<
+ BSON("_id" << 2 << "host" << "test3:1234")) <<
+ "protocolVersion" << 1),
+ HostAndPort("test1", 1234));
+ getReplCoord()->setMyLastOptime(OpTime(Timestamp (100, 1), 0));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+
+ simulateSuccessfulV1Election();
+
+ ASSERT_EQUALS(1, getReplCoord()->getTerm());
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ // lower term, no change
+ getReplCoord()->updateTerm(0);
+ ASSERT_EQUALS(1, getReplCoord()->getTerm());
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ // same term, no change
+ getReplCoord()->updateTerm(1);
+ ASSERT_EQUALS(1, getReplCoord()->getTerm());
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ // higher term, step down and change term
+ Handle cbHandle;
+ getReplCoord()->updateTerm_forTest(2);
+ ASSERT_EQUALS(2, getReplCoord()->getTerm());
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+
+ }
+
TEST_F(StepDownTest, StepDownNotPrimary) {
OperationContextReplMock txn;
OpTimeWithTermZero optime1(100, 1);
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 4648eb05c7d..9a4e069a747 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -342,5 +342,7 @@ namespace repl {
long long ReplicationCoordinatorMock::getTerm() { return OpTime::kDefaultTerm; }
+ bool ReplicationCoordinatorMock::updateTerm(long long term) { return false; }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 7ac09e27c2d..216a60fcafe 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -205,6 +205,8 @@ namespace repl {
virtual long long getTerm();
+ virtual bool updateTerm(long long term);
+
private:
const ReplSettings _settings;
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 8cc1385aaa5..ee33cd2339e 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -110,6 +110,13 @@ namespace repl {
*/
virtual long long getTerm() const = 0;
+ /**
+ * Sets the latest term this member is aware of to the higher of its current value and
+ * the value passed in as "term".
+ * Returns true if the local term value is changed.
+ */
+ virtual bool updateTerm(long long term) = 0;
+
////////////////////////////////////////////////////////////
//
// Basic state manipulation methods.
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 1da923df904..2dfb470bf5e 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -2169,6 +2169,14 @@ namespace {
return _maintenanceModeCalls;
}
+ bool TopologyCoordinatorImpl::updateTerm(long long term) {
+ if (term <= _term) {
+ return false;
+ }
+ _term = term;
+ return true;
+ }
+
long long TopologyCoordinatorImpl::getTerm() const {
return _term;
}
@@ -2245,10 +2253,6 @@ namespace {
const ReplSetRequestVotesArgs& args,
ReplSetRequestVotesResponse* response,
const OpTime& lastAppliedOpTime) {
- if (args.getTerm() > _term) {
- _term = args.getTerm();
- }
-
response->setOk(true);
response->setTerm(_term);
@@ -2290,16 +2294,11 @@ namespace {
else if (args.getTerm() < _term) {
return {ErrorCodes::BadValue, "term has already passed"};
}
- else if (args.getTerm() == _term &&
+ else if (args.getTerm() == _term && _currentPrimaryIndex > -1 &&
args.getWinnerId() != _rsConfig.getMemberAt(_currentPrimaryIndex).getId()) {
return {ErrorCodes::BadValue, "term already has a primary"};
}
- if (args.getTerm() > _term) {
- _term = args.getTerm();
- *responseTerm = _term;
- }
-
_currentPrimaryIndex = _rsConfig.findMemberIndexByConfigId(args.getWinnerId());
return Status::OK();
}
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index acd4fdee6f8..db096ec964b 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -128,6 +128,7 @@ namespace repl {
virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const;
virtual int getMaintenanceCount() const;
virtual long long getTerm() const;
+ virtual bool updateTerm(long long term);
virtual void setForceSyncSourceIndex(int index);
virtual HostAndPort chooseNewSyncSource(Date_t now,
const OpTime& lastOpApplied);
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
index 5433f495f22..0defa78cd36 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
@@ -4648,6 +4648,7 @@ namespace {
<< "term" << 2
<< "winnerId" << 30));
long long responseTerm;
+ ASSERT(getTopoCoord().updateTerm(winnerArgs.getTerm()));
ASSERT_OK(getTopoCoord().processReplSetDeclareElectionWinner(winnerArgs, &responseTerm));
ASSERT_EQUALS(2, responseTerm);
@@ -4701,6 +4702,7 @@ namespace {
<< "term" << 2
<< "winnerId" << 30));
long long responseTerm = -1;
+ ASSERT(getTopoCoord().updateTerm(winnerArgs.getTerm()));
ASSERT_OK(getTopoCoord().processReplSetDeclareElectionWinner(winnerArgs, &responseTerm));
ASSERT_EQUALS(2, responseTerm);