diff options
20 files changed, 146 insertions, 134 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 5146e0ea1bc..144f14f5fbe 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -85,8 +85,7 @@ public: ReplicationCoordinatorExternalState* replicationCoordinatorExternalState, BackgroundSync* bgsync); bool shouldStopFetching(const HostAndPort& source, - const OpTime& sourceOpTime, - bool sourceHasSyncSource) override; + const rpc::ReplSetMetadata& metadata) override; private: BackgroundSync* _bgsync; @@ -99,15 +98,13 @@ DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackground : DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState), _bgsync(bgsync) {} -bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(const HostAndPort& source, - const OpTime& sourceOpTime, - bool sourceHasSyncSource) { +bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching( + const HostAndPort& source, const rpc::ReplSetMetadata& metadata) { if (_bgsync->shouldStopFetching()) { return true; } - return DataReplicatorExternalStateImpl::shouldStopFetching( - source, sourceOpTime, sourceHasSyncSource); + return DataReplicatorExternalStateImpl::shouldStopFetching(source, metadata); } /** diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h index d19f46f9711..848af9c720e 100644 --- a/src/mongo/db/repl/data_replicator_external_state.h +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -77,8 +77,7 @@ public: * metadata). 
*/ virtual bool shouldStopFetching(const HostAndPort& source, - const OpTime& sourceOpTime, - bool sourceHasSyncSource) = 0; + const rpc::ReplSetMetadata& metadata) = 0; private: /** diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 558f3faf700..87b5993b231 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -60,14 +60,12 @@ void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata } bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& source, - const OpTime& sourceOpTime, - bool sourceHasSyncSource) { + const rpc::ReplSetMetadata& metadata) { // Re-evaluate quality of sync target. - if (_replicationCoordinator->shouldChangeSyncSource( - source, sourceOpTime, sourceHasSyncSource)) { + if (_replicationCoordinator->shouldChangeSyncSource(source, metadata)) { LOG(1) << "Canceling oplog query because we have to choose a sync source. 
Current source: " - << source << ", OpTime " << sourceOpTime - << ", hasSyncSource:" << sourceHasSyncSource; + << source << ", OpTime " << metadata.getLastOpVisible() + << ", its sync source index:" << metadata.getSyncSourceIndex(); return true; } return false; diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h index 25a09e1d7db..e94dccdab32 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.h +++ b/src/mongo/db/repl/data_replicator_external_state_impl.h @@ -51,8 +51,7 @@ public: void processMetadata(const rpc::ReplSetMetadata& metadata) override; bool shouldStopFetching(const HostAndPort& source, - const OpTime& sourceOpTime, - bool sourceHasSyncSource) override; + const rpc::ReplSetMetadata& metadata) override; private: StatusWith<OpTime> _multiApply(OperationContext* txn, diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index 83a6e3fd83c..2ec80fad216 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -47,11 +47,10 @@ void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata } bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& source, - const OpTime& sourceOpTime, - bool sourceHasSyncSource) { + const rpc::ReplSetMetadata& metadata) { lastSyncSourceChecked = source; - syncSourceLastOpTime = sourceOpTime; - syncSourceHasSyncSource = sourceHasSyncSource; + syncSourceLastOpTime = metadata.getLastOpVisible(); + syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1; return shouldStopFetchingResult; } diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index 4705fb57bd3..6335552f4b8 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ 
b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -48,8 +48,7 @@ public: void processMetadata(const rpc::ReplSetMetadata& metadata) override; bool shouldStopFetching(const HostAndPort& source, - const OpTime& sourceOpTime, - bool sourceHasSyncSource) override; + const rpc::ReplSetMetadata& metadata) override; // Returned by getCurrentTermAndLastCommittedOpTime. long long currentTerm = OpTime::kUninitializedTerm; diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 294beba46ed..43e42f7cc5e 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -83,8 +83,7 @@ public: _blacklistedSource = host; } bool shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& sourcesOpTime, - bool syncSourceHasSyncSource) override { + const rpc::ReplSetMetadata& metadata) override { return false; } SyncSourceResolverResponse selectSyncSource(OperationContext* txn, @@ -126,10 +125,8 @@ public: _syncSourceSelector->blacklistSyncSource(host, until); } bool shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& sourcesOpTime, - bool syncSourceHasSyncSource) override { - return _syncSourceSelector->shouldChangeSyncSource( - currentSource, sourcesOpTime, syncSourceHasSyncSource); + const rpc::ReplSetMetadata& metadata) override { + return _syncSourceSelector->shouldChangeSyncSource(currentSource, metadata); } SyncSourceResolverResponse selectSyncSource(OperationContext* txn, const OpTime& lastOpTimeFetched) override { @@ -718,8 +715,7 @@ public: _blacklistedSource = host; } bool shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& sourcesOpTime, - bool syncSourceHasSyncSource) override { + const rpc::ReplSetMetadata& metadata) override { return false; } SyncSourceResolverResponse selectSyncSource(OperationContext* txn, @@ -850,8 +846,7 @@ public: } void blacklistSyncSource(const HostAndPort& host, Date_t until) override {} 
bool shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& sourcesOpTime, - bool syncSourceHasSyncSource) override { + const rpc::ReplSetMetadata& metadata) override { return false; } SyncSourceResolverResponse selectSyncSource(OperationContext* txn, diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index fcdc7819c72..f43e53817f7 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -50,10 +50,8 @@ namespace { * Calculates await data timeout based on the current replica set configuration. */ Milliseconds calculateAwaitDataTimeout(const ReplicaSetConfig& config) { - // Under protocol version 1, make the awaitData timeout (maxTimeMS) dependent - // on the election - // timeout. This enables the sync source to communicate liveness of the - // primary to secondaries. + // Under protocol version 1, make the awaitData timeout (maxTimeMS) dependent on the election + // timeout. This enables the sync source to communicate liveness of the primary to secondaries. // Under protocol version 0, use a default timeout of 2 seconds for awaitData. if (config.getProtocolVersion() == 1LL) { return config.getElectionTimeoutPeriod() / 2; @@ -111,19 +109,15 @@ StatusWith<BSONObj> makeMetadataObject(bool isV1ElectionProtocol) { /** * Checks the first batch of results from query. - * 'documents' are the first batch of results returned from tailing the remote - * oplog. - * 'lastFetched' optime and hash should be consistent with the predicate in the - * query. + * 'documents' are the first batch of results returned from tailing the remote oplog. + * 'lastFetched' optime and hash should be consistent with the predicate in the query. * Returns RemoteOplogStale if the oplog query has no results. 
- * Returns OplogStartMissing if we cannot find the optime of the last fetched - * operation in + * Returns OplogStartMissing if we cannot find the optime of the last fetched operation in * the remote oplog. */ Status checkRemoteOplogStart(const Fetcher::Documents& documents, OpTimeWithHash lastFetched) { if (documents.empty()) { - // The GTE query from upstream returns nothing, so we're ahead of the - // upstream. + // The GTE query from upstream returns nothing, so we're ahead of the upstream. return Status(ErrorCodes::RemoteOplogStale, str::stream() << "We are ahead of the sync source. Our last op time fetched: " << lastFetched.opTime.toString()); @@ -176,8 +170,7 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments( info.networkDocumentBytes += doc.objsize(); ++info.networkDocumentCount; - // If this is the first response (to the $gte query) then we already applied - // the first doc. + // If this is the first response (to the $gte query) then we already applied the first doc. if (first && info.networkDocumentCount == 1U) { continue; } @@ -208,8 +201,7 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments( info.toApplyDocumentCount = documents.size(); info.toApplyDocumentBytes = info.networkDocumentBytes; if (first) { - // The count is one less since the first document found was already applied - // ($gte $ts query) + // The count is one less since the first document found was already applied ($gte $ts query) // and we will not apply it again. --info.toApplyDocumentCount; auto alreadyAppliedDocument = documents.cbegin(); @@ -302,11 +294,9 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, } const auto& queryResponse = result.getValue(); - OpTime sourcesLastOpTime; - bool syncSourceHasSyncSource = false; + rpc::ReplSetMetadata metadata; - // Forward metadata (containing liveness information) to data replicator - // external state. 
+ // Forward metadata (containing liveness information) to data replicator external state. bool receivedMetadata = queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); if (receivedMetadata) { @@ -318,10 +308,8 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, _onShutdown(metadataResult.getStatus()); return; } - auto metadata = metadataResult.getValue(); + metadata = metadataResult.getValue(); _dataReplicatorExternalState->processMetadata(metadata); - sourcesLastOpTime = metadata.getLastOpVisible(); - syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1; } const auto& documents = queryResponse.documents; @@ -337,8 +325,7 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, auto opTimeWithHash = getLastOpTimeWithHashFetched(); - // Check start of remote oplog and, if necessary, stop fetcher to execute - // rollback. + // Check start of remote oplog and, if necessary, stop fetcher to execute rollback. if (queryResponse.first) { auto status = checkRemoteOplogStart(documents, opTimeWithHash); if (!status.isOK()) { @@ -347,8 +334,7 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, return; } - // If this is the first batch and no rollback is needed, skip the first - // document. + // If this is the first batch and no rollback is needed, skip the first document. 
firstDocToApply++; } @@ -373,14 +359,15 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, _lastFetched = opTimeWithHash; } - if (_dataReplicatorExternalState->shouldStopFetching( - _fetcher.getSource(), sourcesLastOpTime, syncSourceHasSyncSource)) { + if (_dataReplicatorExternalState->shouldStopFetching(_fetcher.getSource(), metadata)) { _onShutdown(Status(ErrorCodes::InvalidSyncSource, str::stream() << "sync source " << _fetcher.getSource().toString() << " (last optime: " - << sourcesLastOpTime.toString() - << "; has sync source: " - << syncSourceHasSyncSource + << metadata.getLastOpVisible().toString() + << "; sync source index: " + << metadata.getSyncSourceIndex() + << "; primary index: " + << metadata.getPrimaryIndex() << ") is no longer valid"), opTimeWithHash); return; @@ -407,5 +394,6 @@ void OplogFetcher::_onShutdown(Status status, OpTimeWithHash opTimeWithHash) { _onShutdownCallbackFn(status, opTimeWithHash); } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 36d10aae018..b6c89737b5f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2901,14 +2901,10 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn } bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource) { + const rpc::ReplSetMetadata& metadata) { LockGuard topoLock(_topoMutex); - return _topCoord->shouldChangeSyncSource(currentSource, - getMyLastAppliedOpTime(), - syncSourceLastOpTime, - syncSourceHasSyncSource, - _replExecutor.now()); + return _topCoord->shouldChangeSyncSource( + currentSource, getMyLastAppliedOpTime(), metadata, _replExecutor.now()); } SyncSourceResolverResponse ReplicationCoordinatorImpl::selectSyncSource( diff --git 
a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 93979a0dd9c..bfa93ac3459 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -275,8 +275,7 @@ public: virtual void resetLastOpTimesFromOplog(OperationContext* txn) override; virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource) override; + const rpc::ReplSetMetadata& metadata) override; virtual SyncSourceResolverResponse selectSyncSource(OperationContext* txn, const OpTime& lastOpTimeFetched) override; diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 706a5cd6747..75efdf63845 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -357,8 +357,7 @@ void ReplicationCoordinatorMock::resetLastOpTimesFromOplog(OperationContext* txn } bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource) { + const rpc::ReplSetMetadata& metadata) { invariant(false); } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 15e7c513727..3ace717f8c2 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -208,8 +208,7 @@ public: virtual void resetLastOpTimesFromOplog(OperationContext* txn); virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource); + const rpc::ReplSetMetadata& metadata); virtual OpTime getLastCommittedOpTime() const; diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h index eb1b19f3764..2b3c8b903e7 100644 --- 
a/src/mongo/db/repl/sync_source_selector.h +++ b/src/mongo/db/repl/sync_source_selector.h @@ -37,6 +37,10 @@ namespace mongo { class OperationContext; class Timestamp; +namespace rpc { +class ReplSetMetadata; +} + namespace repl { class OpTime; @@ -69,17 +73,16 @@ public: /** * Determines if a new sync source should be chosen, if a better candidate sync source is - * available. If the current sync source's last optime ("syncSourceLastOpTime" under + * available. If the current sync source's last optime (visibleOpTime of metadata under * protocolVersion 1, but pulled from the MemberHeartbeatData in protocolVersion 0) is more than * _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are * running in ProtocolVersion 1, our current sync source is not primary, has no sync source - * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true. + * and only has data up to "myLastOpTime", returns true. * * "now" is used to skip over currently blacklisted sync sources. 
 */ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource) = 0; + const rpc::ReplSetMetadata& metadata) = 0; /** * Returns a SyncSourceResolverResponse containing the syncSource or a new MinValid boundry as diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 43bcc71f909..1548cb774a9 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -165,8 +165,7 @@ public: */ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, const OpTime& myLastOpTime, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource, + const rpc::ReplSetMetadata& metadata, Date_t now) const = 0; /** diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index a4d7c3c4934..fb3c5aaec98 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -2300,10 +2300,11 @@ long long TopologyCoordinatorImpl::getTerm() { return _term; } +// TODO(siyuan): Merge _hbdata into _slaveInfo, so that we have a single view of the +// replset. Passing metadata is unnecessary. 
bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource, const OpTime& myLastOpTime, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource, + const rpc::ReplSetMetadata& metadata, Date_t now) const { // Methodology: // If there exists a viable sync source member other than currentSource, whose oplog has @@ -2317,14 +2318,21 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS return true; } + if (_rsConfig.getProtocolVersion() == 1 && + metadata.getConfigVersion() != _rsConfig.getConfigVersion()) { + return true; + } + const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource); + // PV0 doesn't use metadata, we have to consult _rsConfig. if (currentSourceIndex == -1) { return true; } + invariant(currentSourceIndex != _selfIndex); OpTime currentSourceOpTime = - std::max(syncSourceLastOpTime, _hbdata.at(currentSourceIndex).getAppliedOpTime()); + std::max(metadata.getLastOpVisible(), _hbdata.at(currentSourceIndex).getAppliedOpTime()); if (currentSourceOpTime.isNull()) { // Haven't received a heartbeat from the sync source yet, so can't tell if we should @@ -2332,9 +2340,10 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS return false; } - if (_rsConfig.getProtocolVersion() == 1 && !syncSourceHasSyncSource && - currentSourceOpTime <= myLastOpTime && - _hbdata.at(currentSourceIndex).getState() != MemberState::RS_PRIMARY) { + // Change sync source if they are not ahead of us, and don't have a sync source, + // unless they are primary. 
+ if (_rsConfig.getProtocolVersion() == 1 && metadata.getSyncSourceIndex() == -1 && + currentSourceOpTime <= myLastOpTime && metadata.getPrimaryIndex() != currentSourceIndex) { return true; } diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index 6ae0b62788d..d3eb233acb8 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -157,8 +157,7 @@ public: virtual void clearSyncSourceBlacklist(); virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, const OpTime& myLastOpTime, - const OpTime& syncSourceLastOpTime, - bool syncSourceHasSyncSource, + const rpc::ReplSetMetadata& metadata, Date_t now) const; virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now); virtual void setElectionSleepUntil(Date_t newTime); diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp index 0ec6363a09d..94fbfb96b58 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp @@ -40,6 +40,7 @@ #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/server_options.h" #include "mongo/logger/logger.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" @@ -51,6 +52,7 @@ ASSERT_EQUALS(mongo::repl::HeartbeatResponseAction::NoAction, (EXPRESSION)) using std::unique_ptr; +using mongo::rpc::ReplSetMetadata; namespace mongo { namespace repl { @@ -146,6 +148,12 @@ protected: _currentConfig = config; } + // Make the metadata coming from sync source. Only set visibleOpTime. 
+ ReplSetMetadata makeMetadata(OpTime opTime = OpTime()) { + return ReplSetMetadata( + _topo->getTerm(), OpTime(), opTime, _currentConfig.getConfigVersion(), OID(), -1, -1); + } + HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member, const std::string& setName, MemberState memberState, @@ -656,9 +664,9 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc // force should cause shouldChangeSyncSource() to return true // even if the currentSource is the force target ASSERT_TRUE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), OpTime(), false, now())); + getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), makeMetadata(), now())); ASSERT_TRUE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), OpTime(), false, now())); + getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), makeMetadata(), now())); getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); @@ -5217,9 +5225,10 @@ TEST_F(HeartbeatResponseTest, ReconfigNodeRemovedBetweenHeartbeatRequestAndRepso TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberNotInConfig) { // In this test, the TopologyCoordinator should tell us to change sync sources away from - // "host4" since "host4" is absent from the config - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host4"), OpTime(), OpTime(), false, now())); + // "host4" since "host4" is absent from the config of version 10. 
+ ReplSetMetadata metadata(0, OpTime(), OpTime(), 10, OID(), -1, -1); + ASSERT_TRUE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), metadata, now())); } TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberHasYetToHeartbeatUs) { @@ -5227,7 +5236,7 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberHasYetToHeartbeatU // "host2" since we do not yet have a heartbeat (and as a result do not yet have an optime) // for "host2" ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); } TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsFreshByHeartbeatButNotMetadata) { @@ -5257,8 +5266,9 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsFreshByHeartbea // set up complete, time for actual check startCapturingLogMessages(); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + auto metadata = makeMetadata(lastOpTimeApplied); + ASSERT_FALSE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), metadata, now())); stopCapturingLogMessages(); ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source")); } @@ -5290,8 +5300,9 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsStaleByHeartbea // set up complete, time for actual check startCapturingLogMessages(); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), fresherLastOpTimeApplied, false, now())); + auto metadata = makeMetadata(fresherLastOpTimeApplied); + ASSERT_FALSE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), metadata, now())); stopCapturingLogMessages(); ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source")); } @@ -5323,7 +5334,7 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenFresherMemberExists) { // set up complete, time 
for actual check startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } @@ -5357,18 +5368,18 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhileFresherMemberIsBlack // set up complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); // unblacklist with too early a time (node should remained blacklisted) getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90)); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); // unblacklist and it should succeed getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100)); startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } @@ -5402,7 +5413,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); } TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsNotReadable) { @@ -5432,7 +5443,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsNotRea // set up 
complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); } TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberDoesNotBuildIndexes) { @@ -5477,7 +5488,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberDoesNotB // set up complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); } TEST_F(HeartbeatResponseTest, @@ -5528,7 +5539,7 @@ TEST_F(HeartbeatResponseTest, // set up complete, time for actual check startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp index 684d39514a9..925e8a364ae 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp @@ -41,6 +41,7 @@ #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/server_options.h" #include "mongo/logger/logger.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" @@ -52,6 +53,7 @@ ASSERT_EQUALS(mongo::repl::HeartbeatResponseAction::NoAction, (EXPRESSION)) using std::unique_ptr; +using mongo::rpc::ReplSetMetadata; namespace mongo { namespace repl { @@ -148,6 +150,20 @@ protected: _currentConfig = config; } + // Make the metadata coming from sync source. 
+ // Only set visibleOpTime, primaryIndex and syncSourceIndex + ReplSetMetadata makeMetadata(OpTime opTime = OpTime(), + int primaryIndex = -1, + int syncSourceIndex = -1) { + return ReplSetMetadata(_topo->getTerm(), + OpTime(), + opTime, + _currentConfig.getConfigVersion(), + OID(), + primaryIndex, + syncSourceIndex); + } + HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member, const std::string& setName, MemberState memberState, @@ -649,9 +665,9 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc // force should cause shouldChangeSyncSource() to return true // even if the currentSource is the force target ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("h2"), OpTime(), oldOpTime, false, now())); + HostAndPort("h2"), OpTime(), makeMetadata(oldOpTime), now())); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("h3"), OpTime(), newOpTime, false, now())); + HostAndPort("h3"), OpTime(), makeMetadata(newOpTime), now())); getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); @@ -3078,7 +3094,7 @@ TEST_F(HeartbeatResponseTestV1, // set up complete, time for actual check startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } @@ -3120,7 +3136,7 @@ TEST_F(HeartbeatResponseTestV1, ASSERT_NO_ACTION(nextAction.getAction()); // Show we like host2 while it is primary. ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, false, now())); + HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied, 1), now())); // Show that we also like host2 while it has a sync source. 
nextAction = receiveUpHeartbeat(HostAndPort("host2"), @@ -3131,7 +3147,7 @@ TEST_F(HeartbeatResponseTestV1, lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, true, now())); + HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied, 2, 2), now())); // Show that we do not like it when it is not PRIMARY and lacks a sync source and lacks progress // beyond our own. @@ -3143,9 +3159,16 @@ TEST_F(HeartbeatResponseTestV1, lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); ASSERT(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, false, now())); + HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied), now())); + + // Sometimes the heartbeat is stale and the metadata says it's the primary. Trust the metadata. + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), + lastOpTimeApplied, + makeMetadata(lastOpTimeApplied, 1 /* host2 is primary */, -1 /* no sync source */), + now())); - // But if it has some progress beyond our own, we still like it. + // But if it is secondary and has some progress beyond our own, we still like it. 
OpTime newerThanLastOpTimeApplied = OpTime(Timestamp(500, 0), 0); nextAction = receiveUpHeartbeat(HostAndPort("host2"), "rs0", @@ -3155,7 +3178,7 @@ TEST_F(HeartbeatResponseTestV1, newerThanLastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), lastOpTimeApplied, newerThanLastOpTimeApplied, false, now())); + HostAndPort("host2"), lastOpTimeApplied, makeMetadata(newerThanLastOpTimeApplied), now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) { @@ -3187,7 +3210,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBlackListed) { @@ -3219,18 +3242,18 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBla // set up complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); // unblacklist with too early a time (node should remained blacklisted) getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90)); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); // unblacklist and it should succeed getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100)); startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - 
HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } @@ -3263,7 +3286,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsFreshByHeartbea // set up complete, time for actual check startCapturingLogMessages(); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); stopCapturingLogMessages(); ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source")); } @@ -3296,7 +3319,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsStaleByHeartbea // set up complete, time for actual check startCapturingLogMessages(); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), fresherLastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(fresherLastOpTimeApplied), now())); stopCapturingLogMessages(); ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source")); } @@ -3328,7 +3351,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenFresherMemberExists) { // set up complete, time for actual check startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } @@ -3337,14 +3360,15 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberHasYetToHeart // In this test, the TopologyCoordinator should not tell us to change sync sources away from // "host2" since we do not use the member's heartbeatdata in pv1. 
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), OpTime(), true, now())); + HostAndPort("host2"), OpTime(), makeMetadata(), now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberNotInConfig) { // In this test, the TopologyCoordinator should tell us to change sync sources away from - // "host4" since "host4" is absent from the config - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host4"), OpTime(), OpTime(), true, now())); + // "host4" since "host4" is absent from the config of version 10. + ReplSetMetadata metadata(0, OpTime(), OpTime(), 10, OID(), -1, -1); + ASSERT_TRUE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), metadata, now())); } // TODO(dannenberg) figure out what this is trying to test.. @@ -4274,7 +4298,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberDoesNo // set up complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, true, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsNotReadable) { @@ -4304,7 +4328,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsNotR // set up complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), lastOpTimeApplied, true, now())); + HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); } class HeartbeatResponseTestOneRetryV1 : public HeartbeatResponseTestV1 { diff --git a/src/mongo/db/repl/update_position_args.h b/src/mongo/db/repl/update_position_args.h index bb4a94d8ffb..0acac33dd6f 100644 --- a/src/mongo/db/repl/update_position_args.h +++ b/src/mongo/db/repl/update_position_args.h @@ -40,7 +40,7 @@ class Status; namespace repl { /** - * Arguments to the handshake command. 
+ * Arguments to the update position command. */ class UpdatePositionArgs { public: diff --git a/src/mongo/rpc/metadata/repl_set_metadata.h b/src/mongo/rpc/metadata/repl_set_metadata.h index 24022063a04..a73e3001015 100644 --- a/src/mongo/rpc/metadata/repl_set_metadata.h +++ b/src/mongo/rpc/metadata/repl_set_metadata.h @@ -134,8 +134,8 @@ public: } private: - repl::OpTime _lastOpCommitted = repl::OpTime(Timestamp(0, 0), repl::OpTime::kUninitializedTerm); - repl::OpTime _lastOpVisible = repl::OpTime(Timestamp(0, 0), repl::OpTime::kUninitializedTerm); + repl::OpTime _lastOpCommitted; + repl::OpTime _lastOpVisible; long long _currentTerm = -1; long long _configVersion = -1; OID _replicaSetId; |