diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2016-05-20 22:08:27 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2016-05-27 17:37:55 -0400 |
commit | 3dd8ba46bf2ce350b5e80b1b2b016a10007beb7b (patch) | |
tree | c0f5896b4727eb7bdc06a5a8fbb9a49d33f1fc1a /src/mongo | |
parent | 66a3866209039ab46274dfe27cf3d985e65d453c (diff) | |
download | mongo-3dd8ba46bf2ce350b5e80b1b2b016a10007beb7b.tar.gz |
SERVER-24222 Update current known primary from command metadata
Diffstat (limited to 'src/mongo')
20 files changed, 126 insertions, 115 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 71c8d28dbcd..c5a5b08a0e7 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -85,8 +85,7 @@ public: ReplicationCoordinatorExternalState* replicationCoordinatorExternalState, BackgroundSync* bgsync); bool shouldStopFetching(const HostAndPort& source, - const OpTime& sourceOpTime, - bool sourceHasSyncSource) override; + const rpc::ReplSetMetadata& metadata) override; private: BackgroundSync* _bgsync; @@ -99,15 +98,13 @@ DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackground : DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState), _bgsync(bgsync) {} -bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(const HostAndPort& source, - const OpTime& sourceOpTime, - bool sourceHasSyncSource) { +bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching( + const HostAndPort& source, const rpc::ReplSetMetadata& metadata) { if (_bgsync->shouldStopFetching()) { return true; } - return DataReplicatorExternalStateImpl::shouldStopFetching( - source, sourceOpTime, sourceHasSyncSource); + return DataReplicatorExternalStateImpl::shouldStopFetching(source, metadata); } /** diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h index d19f46f9711..848af9c720e 100644 --- a/src/mongo/db/repl/data_replicator_external_state.h +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -77,8 +77,7 @@ public: * metadata). */ virtual bool shouldStopFetching(const HostAndPort& source, - const OpTime& sourceOpTime, - bool sourceHasSyncSource) = 0; + const rpc::ReplSetMetadata& metadata) = 0; private: /** diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 558f3faf700..87b5993b231 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -60,14 +60,12 @@ void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata } bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& source, - const OpTime& sourceOpTime, - bool sourceHasSyncSource) { + const rpc::ReplSetMetadata& metadata) { // Re-evaluate quality of sync target. - if (_replicationCoordinator->shouldChangeSyncSource( - source, sourceOpTime, sourceHasSyncSource)) { + if (_replicationCoordinator->shouldChangeSyncSource(source, metadata)) { LOG(1) << "Canceling oplog query because we have to choose a sync source. Current source: " - << source << ", OpTime " << sourceOpTime - << ", hasSyncSource:" << sourceHasSyncSource; + << source << ", OpTime " << metadata.getLastOpVisible() + << ", its sync source index:" << metadata.getSyncSourceIndex(); return true; } return false; diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h index 25a09e1d7db..e94dccdab32 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.h +++ b/src/mongo/db/repl/data_replicator_external_state_impl.h @@ -51,8 +51,7 @@ public: void processMetadata(const rpc::ReplSetMetadata& metadata) override; bool shouldStopFetching(const HostAndPort& source, - const OpTime& sourceOpTime, - bool sourceHasSyncSource) override; + const rpc::ReplSetMetadata& metadata) override; private: StatusWith<OpTime> _multiApply(OperationContext* txn, diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index 83a6e3fd83c..2ec80fad216 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -47,11 +47,10 @@ void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata } bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& source, - const OpTime& sourceOpTime, - bool sourceHasSyncSource) { + const rpc::ReplSetMetadata& metadata) { lastSyncSourceChecked = source; - syncSourceLastOpTime = sourceOpTime; - syncSourceHasSyncSource = sourceHasSyncSource; + syncSourceLastOpTime = metadata.getLastOpVisible(); + syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1; return shouldStopFetchingResult; } diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index 4705fb57bd3..6335552f4b8 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -48,8 +48,7 @@ public: void processMetadata(const rpc::ReplSetMetadata& metadata) override; bool shouldStopFetching(const HostAndPort& source, - const OpTime& sourceOpTime, - bool sourceHasSyncSource) override; + const rpc::ReplSetMetadata& metadata) override; // Returned by getCurrentTermAndLastCommittedOpTime. long long currentTerm = OpTime::kUninitializedTerm; diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 647b623e689..300100d4726 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -83,8 +83,7 @@ public: _blacklistedSource = host; } bool shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& sourcesOpTime, - bool syncSourceHasSyncSource) override { + const rpc::ReplSetMetadata& metadata) override { return false; } SyncSourceResolverResponse selectSyncSource(OperationContext* txn, @@ -125,10 +124,8 @@ public: _syncSourceSelector->blacklistSyncSource(host, until); } bool shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& sourcesOpTime, - bool syncSourceHasSyncSource) override { - return _syncSourceSelector->shouldChangeSyncSource( - currentSource, sourcesOpTime, syncSourceHasSyncSource); + const rpc::ReplSetMetadata& metadata) override { + return _syncSourceSelector->shouldChangeSyncSource(currentSource, metadata); } SyncSourceResolverResponse selectSyncSource(OperationContext* txn, const OpTime& lastOpTimeFetched) override { @@ -697,8 +694,7 @@ public: _blacklistedSource = host; } bool shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& sourcesOpTime, - bool syncSourceHasSyncSource) override { + const rpc::ReplSetMetadata& metadata) override { return false; } SyncSourceResolverResponse selectSyncSource(OperationContext* txn, @@ -829,8 +825,7 @@ public: } void blacklistSyncSource(const HostAndPort& host, Date_t until) override {} bool shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& sourcesOpTime, - bool syncSourceHasSyncSource) override { + const rpc::ReplSetMetadata& metadata) override { return false; } SyncSourceResolverResponse selectSyncSource(OperationContext* txn, diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 1655a717210..ec02d16ce1a 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -281,8 +281,7 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, } const auto& queryResponse = result.getValue(); - OpTime sourcesLastOpTime; - bool syncSourceHasSyncSource = false; + rpc::ReplSetMetadata metadata; // Forward metadata (containing liveness information) to data replicator external state. bool receivedMetadata = @@ -296,10 +295,8 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, _onShutdown(metadataResult.getStatus()); return; } - auto metadata = metadataResult.getValue(); + metadata = metadataResult.getValue(); _dataReplicatorExternalState->processMetadata(metadata); - sourcesLastOpTime = metadata.getLastOpVisible(); - syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1; } const auto& documents = queryResponse.documents; @@ -349,13 +346,14 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, _lastFetched = opTimeWithHash; } - if (_dataReplicatorExternalState->shouldStopFetching( - _fetcher.getSource(), sourcesLastOpTime, syncSourceHasSyncSource)) { + if (_dataReplicatorExternalState->shouldStopFetching(_fetcher.getSource(), metadata)) { _onShutdown(Status(ErrorCodes::InvalidSyncSource, - str::stream() << "sync source " << _fetcher.getSource().toString() - << " (last optime: " << sourcesLastOpTime.toString() - << "; has sync source: " << syncSourceHasSyncSource - << ") is no longer valid"), + str::stream() + << "sync source " << _fetcher.getSource().toString() + << " (last optime: " << metadata.getLastOpVisible().toString() + << "; sync source index: " << metadata.getSyncSourceIndex() + << "; primary index: " << metadata.getPrimaryIndex() + << ") is no longer valid"), opTimeWithHash); return; } diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 172f10b8a08..c33326031b7 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2893,14 +2893,10 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn } bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource) { + const rpc::ReplSetMetadata& metadata) { LockGuard topoLock(_topoMutex); - return _topCoord->shouldChangeSyncSource(currentSource, - getMyLastAppliedOpTime(), - syncSourceLastOpTime, - syncSourceHasSyncSource, - _replExecutor.now()); + return _topCoord->shouldChangeSyncSource( + currentSource, getMyLastAppliedOpTime(), metadata, _replExecutor.now()); } SyncSourceResolverResponse ReplicationCoordinatorImpl::selectSyncSource( diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index d7ed62f89d7..53f0ffde4c3 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -275,8 +275,7 @@ public: virtual void resetLastOpTimesFromOplog(OperationContext* txn) override; virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource) override; + const rpc::ReplSetMetadata& metadata) override; virtual SyncSourceResolverResponse selectSyncSource(OperationContext* txn, const OpTime& lastOpTimeFetched) override; diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 5384bf45d95..fc0f9866f55 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -357,8 +357,7 @@ void ReplicationCoordinatorMock::resetLastOpTimesFromOplog(OperationContext* txn } bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource) { + const rpc::ReplSetMetadata& metadata) { invariant(false); } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 5fdf319306d..2b56a93c219 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -208,8 +208,7 @@ public: virtual void resetLastOpTimesFromOplog(OperationContext* txn); virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource); + const rpc::ReplSetMetadata& metadata); virtual OpTime getLastCommittedOpTime() const; diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h index 812944420a9..c3621c3e95d 100644 --- a/src/mongo/db/repl/sync_source_selector.h +++ b/src/mongo/db/repl/sync_source_selector.h @@ -68,17 +68,16 @@ public: /** * Determines if a new sync source should be chosen, if a better candidate sync source is - * available. If the current sync source's last optime ("syncSourceLastOpTime" under + * available. If the current sync source's last optime (visibleOpTime of metadata under * protocolVersion 1, but pulled from the MemberHeartbeatData in protocolVersion 0) is more than * _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are * running in ProtocolVersion 1, our current sync source is not primary, has no sync source - * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true. + * and only has data up to "myLastOpTime", returns true. * * "now" is used to skip over currently blacklisted sync sources. */ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource) = 0; + const rpc::ReplSetMetadata& metadata) = 0; /** * Returns a SyncSourceResolverResponse containing the syncSource or a new MinValid boundry as diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 0fd9974def5..27242c393d8 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -165,8 +165,7 @@ public: */ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, const OpTime& myLastOpTime, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource, + const rpc::ReplSetMetadata& metadata, Date_t now) const = 0; /** diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 45f8e6280ad..1cfaee288d4 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -2290,10 +2290,11 @@ long long TopologyCoordinatorImpl::getTerm() { return _term; } +// TODO(siyuan): Merge _hddata into _slaveInfo, so that we have a single view of the +// replset. Passing metadata is unnecessary. bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource, const OpTime& myLastOpTime, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource, + const rpc::ReplSetMetadata& metadata, Date_t now) const { // Methodology: // If there exists a viable sync source member other than currentSource, whose oplog has @@ -2307,14 +2308,16 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS return true; } - const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource); - if (currentSourceIndex == -1) { + if (metadata.getConfigVersion() != _rsConfig.getConfigVersion()) { return true; } + + const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource); + invariant(currentSourceIndex != -1); invariant(currentSourceIndex != _selfIndex); OpTime currentSourceOpTime = - std::max(syncSourceLastOpTime, _hbdata.at(currentSourceIndex).getAppliedOpTime()); + std::max(metadata.getLastOpVisible(), _hbdata.at(currentSourceIndex).getAppliedOpTime()); if (currentSourceOpTime.isNull()) { // Haven't received a heartbeat from the sync source yet, so can't tell if we should @@ -2322,9 +2325,10 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS return false; } - if (_rsConfig.getProtocolVersion() == 1 && !syncSourceHasSyncSource && - currentSourceOpTime <= myLastOpTime && - _hbdata.at(currentSourceIndex).getState() != MemberState::RS_PRIMARY) { + // Change sync source if they are not ahead of us, and don't have a sync source, + // unless they are primary. + if (_rsConfig.getProtocolVersion() == 1 && metadata.getSyncSourceIndex() == -1 && + currentSourceOpTime <= myLastOpTime && metadata.getPrimaryIndex() != currentSourceIndex) { return true; } diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index 74d7368d552..0cd56272565 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -156,8 +156,7 @@ public: virtual void clearSyncSourceBlacklist(); virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, const OpTime& myLastOpTime, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource, + const rpc::ReplSetMetadata& metadata, Date_t now) const; virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now); virtual void setElectionSleepUntil(Date_t newTime); diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp index b0e41018c32..6d1e9f08a97 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp @@ -51,6 +51,7 @@ ASSERT_EQUALS(mongo::repl::HeartbeatResponseAction::NoAction, (EXPRESSION)) using std::unique_ptr; +using mongo::rpc::ReplSetMetadata; namespace mongo { namespace repl { @@ -146,6 +147,12 @@ protected: _currentConfig = config; } + // Make the metadata coming from sync source. Only set visibleOpTime. + ReplSetMetadata makeMetadata(OpTime opTime = OpTime()) { + return ReplSetMetadata( + _topo->getTerm(), OpTime(), opTime, _currentConfig.getConfigVersion(), OID(), -1, -1); + } + HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member, const std::string& setName, MemberState memberState, @@ -629,9 +636,9 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc // force should cause shouldChangeSyncSource() to return true // even if the currentSource is the force target ASSERT_TRUE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), OpTime(), false, now())); + getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), makeMetadata(), now())); ASSERT_TRUE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), OpTime(), false, now())); + getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), makeMetadata(), now())); getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); @@ -4909,9 +4916,10 @@ TEST_F(HeartbeatResponseTest, ReconfigNodeRemovedBetweenHeartbeatRequestAndRepso TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberNotInConfig) { // In this test, the TopologyCoordinator should tell us to change sync sources away from - // "host4" since "host4" is absent from the config - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host4"), OpTime(), OpTime(), false, now())); + // "host4" since "host4" is absent from the config of version 10. + ReplSetMetadata metadata(0, OpTime(), OpTime(), 10, OID(), -1, -1); + ASSERT_TRUE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), metadata, now())); } TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberHasYetToHeartbeatUs) { @@ -4919,7 +4927,7 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberHasYetToHeartbeatU // "host2" since we do not yet have a heartbeat (and as a result do not yet have an optime) // for "host2" ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); } TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsFreshByHeartbeatButNotMetadata) { @@ -4949,8 +4957,9 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsFreshByHeartbea // set up complete, time for actual check startCapturingLogMessages(); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + auto metadata = makeMetadata(lastOpTimeApplied); + ASSERT_FALSE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), metadata, now())); stopCapturingLogMessages(); ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source")); } @@ -4982,8 +4991,9 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsStaleByHeartbea // set up complete, time for actual check startCapturingLogMessages(); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), fresherLastOpTimeApplied, false, now())); + auto metadata = makeMetadata(fresherLastOpTimeApplied); + ASSERT_FALSE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), metadata, now())); stopCapturingLogMessages(); ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source")); } @@ -5015,7 +5025,7 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenFresherMemberExists) { // set up complete, time for actual check startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } @@ -5049,18 +5059,18 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhileFresherMemberIsBlack // set up complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); // unblacklist with too early a time (node should remained blacklisted) getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90)); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); // unblacklist and it should succeed getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100)); startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } @@ -5094,7 +5104,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); } TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsNotReadable) { @@ -5124,7 +5134,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsNotRea // set up complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); } TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberDoesNotBuildIndexes) { @@ -5164,7 +5174,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberDoesNotB // set up complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); } TEST_F(HeartbeatResponseTest, @@ -5207,7 +5217,7 @@ TEST_F(HeartbeatResponseTest, // set up complete, time for actual check startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp index 79d6f7e6727..826905a860a 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp @@ -52,6 +52,7 @@ ASSERT_EQUALS(mongo::repl::HeartbeatResponseAction::NoAction, (EXPRESSION)) using std::unique_ptr; +using mongo::rpc::ReplSetMetadata; namespace mongo { namespace repl { @@ -148,6 +149,20 @@ protected: _currentConfig = config; } + // Make the metadata coming from sync source. + // Only set visibleOpTime, primaryIndex and syncSourceIndex + ReplSetMetadata makeMetadata(OpTime opTime = OpTime(), + int primaryIndex = -1, + int syncSourceIndex = -1) { + return ReplSetMetadata(_topo->getTerm(), + OpTime(), + opTime, + _currentConfig.getConfigVersion(), + OID(), + primaryIndex, + syncSourceIndex); + } + HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member, const std::string& setName, MemberState memberState, @@ -622,9 +637,9 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc // force should cause shouldChangeSyncSource() to return true // even if the currentSource is the force target ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("h2"), OpTime(), oldOpTime, false, now())); + HostAndPort("h2"), OpTime(), makeMetadata(oldOpTime), now())); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("h3"), OpTime(), newOpTime, false, now())); + HostAndPort("h3"), OpTime(), makeMetadata(newOpTime), now())); getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); @@ -2696,7 +2711,7 @@ TEST_F(HeartbeatResponseTestV1, // set up complete, time for actual check startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } @@ -2738,7 +2753,7 @@ TEST_F(HeartbeatResponseTestV1, ASSERT_NO_ACTION(nextAction.getAction()); // Show we like host2 while it is primary. ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, false, now())); + HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied, 1), now())); // Show that we also like host2 while it has a sync source. nextAction = receiveUpHeartbeat(HostAndPort("host2"), @@ -2749,7 +2764,7 @@ TEST_F(HeartbeatResponseTestV1, lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, true, now())); + HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied, 2, 2), now())); // Show that we do not like it when it is not PRIMARY and lacks a sync source and lacks progress // beyond our own. @@ -2761,9 +2776,16 @@ TEST_F(HeartbeatResponseTestV1, lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); ASSERT(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, false, now())); + HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied), now())); + + // Sometimes the heartbeat is stale and the metadata says it's the primary. Trust the metadata. + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), + lastOpTimeApplied, + makeMetadata(lastOpTimeApplied, 1 /* host2 is primary */, -1 /* no sync source */), + now())); - // But if it has some progress beyond our own, we still like it. + // But if it is secondary and has some progress beyond our own, we still like it. OpTime newerThanLastOpTimeApplied = OpTime(Timestamp(500, 0), 0); nextAction = receiveUpHeartbeat(HostAndPort("host2"), "rs0", @@ -2773,7 +2795,7 @@ TEST_F(HeartbeatResponseTestV1, newerThanLastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), lastOpTimeApplied, newerThanLastOpTimeApplied, false, now())); + HostAndPort("host2"), lastOpTimeApplied, makeMetadata(newerThanLastOpTimeApplied), now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) { @@ -2805,7 +2827,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBlackListed) { @@ -2837,18 +2859,18 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBla // set up complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); // unblacklist with too early a time (node should remained blacklisted) getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90)); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); // unblacklist and it should succeed getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100)); startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } @@ -2881,7 +2903,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsFreshByHeartbea // set up complete, time for actual check startCapturingLogMessages(); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); stopCapturingLogMessages(); ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source")); } @@ -2914,7 +2936,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsStaleByHeartbea // set up complete, time for actual check startCapturingLogMessages(); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), fresherLastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(fresherLastOpTimeApplied), now())); stopCapturingLogMessages(); ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source")); } @@ -2946,7 +2968,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenFresherMemberExists) { // set up complete, time for actual check startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } @@ -2955,14 +2977,15 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberHasYetToHeart // In this test, the TopologyCoordinator should not tell us to change sync sources away from // "host2" since we do not use the member's heartbeatdata in pv1. ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), true, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberNotInConfig) { // In this test, the TopologyCoordinator should tell us to change sync sources away from - // "host4" since "host4" is absent from the config - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host4"), OpTime(), OpTime(), true, now())); + // "host4" since "host4" is absent from the config of version 10. + ReplSetMetadata metadata(0, OpTime(), OpTime(), 10, OID(), -1, -1); + ASSERT_TRUE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), metadata, now())); } // TODO(dannenberg) figure out what this is trying to test.. @@ -3815,7 +3838,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberDoesNo // set up complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, true, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsNotReadable) { @@ -3845,7 +3868,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsNotR // set up complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, true, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); } class HeartbeatResponseTestOneRetryV1 : public HeartbeatResponseTestV1 { diff --git a/src/mongo/db/repl/update_position_args.h b/src/mongo/db/repl/update_position_args.h index bb4a94d8ffb..0acac33dd6f 100644 --- a/src/mongo/db/repl/update_position_args.h +++ b/src/mongo/db/repl/update_position_args.h @@ -40,7 +40,7 @@ class Status; namespace repl { /** - * Arguments to the handshake command. + * Arguments to the update position command. */ class UpdatePositionArgs { public: diff --git a/src/mongo/rpc/metadata/repl_set_metadata.h b/src/mongo/rpc/metadata/repl_set_metadata.h index 24022063a04..a73e3001015 100644 --- a/src/mongo/rpc/metadata/repl_set_metadata.h +++ b/src/mongo/rpc/metadata/repl_set_metadata.h @@ -134,8 +134,8 @@ public: } private: - repl::OpTime _lastOpCommitted = repl::OpTime(Timestamp(0, 0), repl::OpTime::kUninitializedTerm); - repl::OpTime _lastOpVisible = repl::OpTime(Timestamp(0, 0), repl::OpTime::kUninitializedTerm); + repl::OpTime _lastOpCommitted; + repl::OpTime _lastOpVisible; long long _currentTerm = -1; long long _configVersion = -1; OID _replicaSetId; |