diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2015-08-28 15:25:06 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2015-08-31 08:05:52 -0400 |
commit | ccbac2bd346c7bf422d1e13742842919932aafc6 (patch) | |
tree | 9dece88a55db37e3f8b8e396e48742ea3000f7de /src/mongo/db | |
parent | 18b5dbd7ebb80cfa0c07a0f480554293c1a0b3f7 (diff) | |
download | mongo-ccbac2bd346c7bf422d1e13742842919932aafc6.tar.gz |
SERVER-19956: arbiter uses last committed time during electionswq
Diffstat (limited to 'src/mongo/db')
3 files changed, 66 insertions, 21 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index f71b24c7b5d..557c3b0f919 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -365,7 +365,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( invariant(_rsConfigState == kConfigStartingUp); const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(localConfig, myIndex.getValue()); - _setMyLastOptime_inlock(&lk, lastOpTime, false); + _setMyLastOptimeAndReport_inlock(&lk, lastOpTime, false); _externalState->setGlobalTimestamp(lastOpTime.getTimestamp()); _updateTerm_incallback(term); LOG(1) << "Current term is now " << term; @@ -760,39 +760,30 @@ void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) { void ReplicationCoordinatorImpl::setMyLastOptimeForward(const OpTime& opTime) { stdx::unique_lock<stdx::mutex> lock(_mutex); if (opTime > _getMyLastOptime_inlock()) { - _setMyLastOptime_inlock(&lock, opTime, false); + _setMyLastOptimeAndReport_inlock(&lock, opTime, false); } } void ReplicationCoordinatorImpl::setMyLastOptime(const OpTime& opTime) { stdx::unique_lock<stdx::mutex> lock(_mutex); - _setMyLastOptime_inlock(&lock, opTime, false); + _setMyLastOptimeAndReport_inlock(&lock, opTime, false); } void ReplicationCoordinatorImpl::resetMyLastOptime() { stdx::unique_lock<stdx::mutex> lock(_mutex); // Reset to uninitialized OpTime - _setMyLastOptime_inlock(&lock, OpTime(), true); + _setMyLastOptimeAndReport_inlock(&lock, OpTime(), true); } -void ReplicationCoordinatorImpl::_setMyLastOptime_inlock(stdx::unique_lock<stdx::mutex>* lock, - const OpTime& opTime, - bool isRollbackAllowed) { +void ReplicationCoordinatorImpl::_setMyLastOptimeAndReport_inlock( + stdx::unique_lock<stdx::mutex>* lock, const OpTime& opTime, bool isRollbackAllowed) { invariant(lock->owns_lock()); - SlaveInfo* mySlaveInfo = &_slaveInfo[_getMyIndexInSlaveInfo_inlock()]; - invariant(isRollbackAllowed || mySlaveInfo->opTime <= opTime); - _updateSlaveInfoOptime_inlock(mySlaveInfo, opTime); + _setMyLastOptime_inlock(opTime, isRollbackAllowed); if (getReplicationMode() != modeReplSet) { return; } - for (auto& opTimeWaiter : _opTimeWaiterList) { - if (*(opTimeWaiter->opTime) <= opTime) { - opTimeWaiter->condVar->notify_all(); - } - } - if (_getMemberState_inlock().primary()) { return; } @@ -802,6 +793,19 @@ void ReplicationCoordinatorImpl::_setMyLastOptime_inlock(stdx::unique_lock<stdx: _externalState->forwardSlaveProgress(); // Must do this outside _mutex } +void ReplicationCoordinatorImpl::_setMyLastOptime_inlock(const OpTime& opTime, + bool isRollbackAllowed) { + SlaveInfo* mySlaveInfo = &_slaveInfo[_getMyIndexInSlaveInfo_inlock()]; + invariant(isRollbackAllowed || mySlaveInfo->opTime <= opTime); + _updateSlaveInfoOptime_inlock(mySlaveInfo, opTime); + + for (auto& opTimeWaiter : _opTimeWaiterList) { + if (*(opTimeWaiter->opTime) <= opTime) { + opTimeWaiter->condVar->notify_all(); + } + } +} + OpTime ReplicationCoordinatorImpl::getMyLastOptime() const { stdx::lock_guard<stdx::mutex> lock(_mutex); return _getMyLastOptime_inlock(); @@ -2604,7 +2608,7 @@ void ReplicationCoordinatorImpl::resetLastOpTimeFromOplog(OperationContext* txn) lastOpTime = lastOpTimeStatus.getValue(); } stdx::unique_lock<stdx::mutex> lk(_mutex); - _setMyLastOptime_inlock(&lk, lastOpTime, true); + _setMyLastOptimeAndReport_inlock(&lk, lastOpTime, true); _externalState->setGlobalTimestamp(lastOpTime.getTimestamp()); } @@ -2678,6 +2682,10 @@ void ReplicationCoordinatorImpl::_setLastCommittedOpTime_inlock(const OpTime& co return; } + if (_getMemberState_inlock().arbiter()) { + _setMyLastOptime_inlock(committedOpTime, false); + } + _lastCommittedOpTime = committedOpTime; auto maxSnapshotForOpTime = SnapshotInfo{committedOpTime, SnapshotName::max()}; @@ -2719,7 +2727,9 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes( return {ErrorCodes::BadValue, "not using election protocol v1"}; } - updateTerm(args.getTerm()); + auto termStatus = updateTerm(args.getTerm()); + if (!termStatus.isOK()) + return termStatus; Status result{ErrorCodes::InternalError, "didn't set status in processReplSetRequestVotes"}; CBHStatus cbh = _replExecutor.scheduleWork( diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 263cc513aa3..6fd652cb951 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -659,10 +659,13 @@ private: * * This function has the same rules for "opTime" as setMyLastOptime(), unless * "isRollbackAllowed" is true. + * + * This function will also report our position externally (like upstream) if necessary. */ - void _setMyLastOptime_inlock(stdx::unique_lock<stdx::mutex>* lock, - const OpTime& opTime, - bool isRollbackAllowed); + void _setMyLastOptimeAndReport_inlock(stdx::unique_lock<stdx::mutex>* lock, + const OpTime& opTime, + bool isRollbackAllowed); + void _setMyLastOptime_inlock(const OpTime& opTime, bool isRollbackAllowed); /** * Schedules a heartbeat to be sent to "target" at "when". "targetIndex" is the index diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 6ab99d225f1..1b118e1c53b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -30,6 +30,7 @@ #include "mongo/platform/basic.h" +#include "mongo/bson/json.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" @@ -40,6 +41,7 @@ #include "mongo/db/repl/replication_coordinator_test_fixture.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/executor/network_interface_mock.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -256,6 +258,36 @@ TEST_F(ReplCoordHBV1Test, OnlyUnauthorizedUpCausesRecovering) { assertMemberState(MemberState::RS_RECOVERING, "0"); } +TEST_F(ReplCoordHBV1Test, ArbiterRecordsCommittedOpTimeFromHeartbeat) { + // Tests that an arbiter will update its committed optime from the heartbeat metadata + assertStartSuccess(fromjson( + "{_id:'mySet', version:1, protocolVersion:1, members:[" + "{_id:1, host:'node1:12345', arbiterOnly:true}, " + "{_id:2, host:'node2:12345'}]}"), + HostAndPort("node1", 12345)); + ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_ARBITER)); + + // calls processReplSetMetadata with the "committed" optime and verifies that the arbiter sets + // its current optime to 'expected' + auto test = [this](OpTime committedOpTime, OpTime expected) { + // process heartbeat metadata directly + StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( + rpc::kReplSetMetadataFieldName << BSON( + "lastOpCommitted" << BSON("ts" << committedOpTime.getTimestamp() << "term" + << committedOpTime.getTerm()) << "lastOpVisible" + << BSON("ts" << committedOpTime.getTimestamp() << "term" + << committedOpTime.getTerm()) << "configVersion" << 1 + << "primaryIndex" << 1 << "term" << committedOpTime.getTerm()))); + getReplCoord()->processReplSetMetadata(metadata.getValue()); + + ASSERT_EQ(getReplCoord()->getMyLastOptime().getTimestamp(), expected.getTimestamp()); + }; + + OpTime committedOpTime{Timestamp{10, 10}, 10}; + test(committedOpTime, committedOpTime); + OpTime olderOpTime{Timestamp{2, 2}, 9}; + test(olderOpTime, committedOpTime); +} } // namespace } // namespace repl } // namespace mongo |