diff options
author | matt dannenberg <matt.dannenberg@10gen.com> | 2015-07-13 09:27:56 -0400 |
---|---|---|
committer | matt dannenberg <matt.dannenberg@10gen.com> | 2015-07-16 11:17:36 -0400 |
commit | 785c4953e2330a5fc2366d20d3309c0ebd6a334a (patch) | |
tree | 70f9ee6a5484f9d82d9471e6911c42d53831de3f /src/mongo | |
parent | ddffe2823221656e10844e7681c5bd766b74c21d (diff) | |
download | mongo-785c4953e2330a5fc2366d20d3309c0ebd6a334a.tar.gz |
SERVER-19375 choose new sync source based on last fetched op rather than last applied op
also change chooseNewSyncSource to take a Timestamp rather than an OpTime
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/oplogreader.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_initialsync.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_selector.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl_test.cpp | 60 |
13 files changed, 62 insertions, 55 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 55499077fa7..24031a04f79 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -975,7 +975,7 @@ void DataReplicator::_doNextActions_Rollback_inlock() { void DataReplicator::_doNextActions_Steady_inlock() { // Check sync source is still good. if (_syncSource.empty()) { - _syncSource = _opts.syncSourceSelector->chooseNewSyncSource(); + _syncSource = _opts.syncSourceSelector->chooseNewSyncSource(_lastTimestampFetched); } if (_syncSource.empty()) { // No sync source, reschedule check @@ -1191,7 +1191,7 @@ void DataReplicator::_setState_inlock(const DataReplicatorState& newState) { Status DataReplicator::_ensureGoodSyncSource_inlock() { if (_syncSource.empty()) { - _syncSource = _opts.syncSourceSelector->chooseNewSyncSource(); + _syncSource = _opts.syncSourceSelector->chooseNewSyncSource(_lastTimestampFetched); if (!_syncSource.empty()) { return Status::OK(); } diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 7116930557e..e28b3b1fd1e 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -37,6 +37,7 @@ #include "mongo/db/repl/base_cloner_test_fixture.h" #include "mongo/db/repl/data_replicator.h" #include "mongo/db/repl/member_state.h" +#include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_executor_test_fixture.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/reporter.h" @@ -65,7 +66,7 @@ class SyncSourceSelectorMock : public SyncSourceSelector { public: SyncSourceSelectorMock(const HostAndPort& syncSource) : _syncSource(syncSource) {} void clearSyncSourceBlacklist() override {} - HostAndPort chooseNewSyncSource() override { + HostAndPort chooseNewSyncSource(const Timestamp& ts) override { HostAndPort result = _syncSource; _syncSource = HostAndPort(); return result; @@ -103,8 +104,8 @@ public: void clearSyncSourceBlacklist() override { _syncSourceSelector->clearSyncSourceBlacklist(); } - HostAndPort chooseNewSyncSource() override { - return _syncSourceSelector->chooseNewSyncSource(); + HostAndPort chooseNewSyncSource(const Timestamp& ts) override { + return _syncSourceSelector->chooseNewSyncSource(ts); } void blacklistSyncSource(const HostAndPort& host, Date_t until) override { _syncSourceSelector->blacklistSyncSource(host, until); @@ -541,7 +542,7 @@ TEST_F(InitialSyncTest, FailsOnClone) { class TestSyncSourceSelector2 : public SyncSourceSelector { public: void clearSyncSourceBlacklist() override {} - HostAndPort chooseNewSyncSource() override { + HostAndPort chooseNewSyncSource(const Timestamp& ts) override { LockGuard lk(_mutex); auto result = HostAndPort(str::stream() << "host-" << _nextSourceNum++, -1); _condition.notify_all(); @@ -672,7 +673,7 @@ class ShutdownExecutorSyncSourceSelector : public SyncSourceSelector { public: ShutdownExecutorSyncSourceSelector(ReplicationExecutor* exec) : _exec(exec) {} void clearSyncSourceBlacklist() override {} - HostAndPort chooseNewSyncSource() override { + HostAndPort chooseNewSyncSource(const Timestamp& ts) override { _exec->shutdown(); return HostAndPort(); } diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index 950517d6192..e363767ec05 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -147,7 +147,7 @@ void OplogReader::connectToSyncSource(OperationContext* txn, invariant(conn() == NULL); while (true) { - HostAndPort candidate = replCoord->chooseNewSyncSource(); + HostAndPort candidate = replCoord->chooseNewSyncSource(lastOpTimeFetched.getTimestamp()); if (candidate.empty()) { if (oldestOpTimeSeen == sentinel) { @@ -186,7 +186,8 @@ void OplogReader::connectToSyncSource(OperationContext* txn, OpTime remoteOldOpTime = extractOpTime(remoteOldestOp); // remoteOldOpTime may come from a very old config, so we cannot compare their terms. - if (lastOpTimeFetched.getTimestamp() < remoteOldOpTime.getTimestamp()) { + if (!lastOpTimeFetched.isNull() && + lastOpTimeFetched.getTimestamp() < remoteOldOpTime.getTimestamp()) { // We're too stale to use this sync source. resetConnection(); replCoord->blacklistSyncSource(candidate, Date_t::now() + Minutes(10)); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 5927650d0d3..a95115099d6 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2411,19 +2411,22 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const { } void ReplicationCoordinatorImpl::_chooseNewSyncSource( - const ReplicationExecutor::CallbackArgs& cbData, HostAndPort* newSyncSource) { + const ReplicationExecutor::CallbackArgs& cbData, + const Timestamp& lastTimestampFetched, + HostAndPort* newSyncSource) { if (cbData.status == ErrorCodes::CallbackCanceled) { return; } - *newSyncSource = _topCoord->chooseNewSyncSource(_replExecutor.now(), getMyLastOptime()); + *newSyncSource = _topCoord->chooseNewSyncSource(_replExecutor.now(), lastTimestampFetched); } -HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource() { +HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const Timestamp& lastTimestampFetched) { HostAndPort newSyncSource; CBHStatus cbh = _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_chooseNewSyncSource, this, stdx::placeholders::_1, + lastTimestampFetched, &newSyncSource)); if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { return newSyncSource; // empty diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 1a5d31ead79..859e18f54fb 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -234,7 +234,7 @@ public: virtual bool isReplEnabled() const override; - virtual HostAndPort chooseNewSyncSource() override; + virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched) override; virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override; @@ -778,6 +778,7 @@ private: * the most appropriate sync source. */ void _chooseNewSyncSource(const ReplicationExecutor::CallbackArgs& cbData, + const Timestamp& lastTimestampFetched, HostAndPort* newSyncSource); /** diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 51cff896269..1fb8e4d321d 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -300,7 +300,7 @@ Status ReplicationCoordinatorMock::checkReplEnabledForCommand(BSONObjBuilder* re return Status::OK(); } -HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource() { +HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource(const Timestamp& lastTimestampFetched) { return HostAndPort(); } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 65c4a22a81a..0feb5331a5b 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -172,7 +172,7 @@ public: virtual Status checkReplEnabledForCommand(BSONObjBuilder* result); - virtual HostAndPort chooseNewSyncSource(); + virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched); virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index 773eed2319f..d05644c135d 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -344,13 +344,11 @@ Status _initialSync() { truncateAndResetOplog(&txn, replCoord, bgsync); OplogReader r; - Timestamp now(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0); - OpTime nowOpTime(now, std::numeric_limits<long long>::max()); while (r.getHost().empty()) { // We must prime the sync source selector so that it considers all candidates regardless - // of oplog position, by passing in "now" with max term as the last op fetched time. - r.connectToSyncSource(&txn, nowOpTime, replCoord); + // of oplog position, by passing in null OpTime as the last op fetched time. + r.connectToSyncSource(&txn, OpTime(), replCoord); if (r.getHost().empty()) { std::string msg = "no valid sync sources found in current replset to do an initial sync"; diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h index 846c06c24bf..71eccdb8a30 100644 --- a/src/mongo/db/repl/sync_source_selector.h +++ b/src/mongo/db/repl/sync_source_selector.h @@ -33,8 +33,10 @@ #include "mongo/util/time_support.h" namespace mongo { -namespace repl { +class Timestamp; + +namespace repl { /** * Manage list of viable and blocked sync sources that we can replicate from. @@ -54,7 +56,7 @@ public: /** * Chooses a viable sync source, or, if none available, returns empty HostAndPort. */ - virtual HostAndPort chooseNewSyncSource() = 0; + virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched) = 0; /** * Blacklists choosing 'host' as a sync source until time 'until'. diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index ecc143d33a5..f988cef6af6 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -132,7 +132,7 @@ public: /** * Chooses and sets a new sync source, based on our current knowledge of the world. */ - virtual HostAndPort chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied) = 0; + virtual HostAndPort chooseNewSyncSource(Date_t now, const Timestamp& lastTimestampApplied) = 0; /** * Suppresses selecting "host" as sync source until "until". diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index ab3e87f9f1f..26433cc4f16 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -147,7 +147,8 @@ HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const { return _syncSource; } -HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied) { +HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, + const Timestamp& lastTimestampApplied) { // If we are primary, then we aren't syncing from anyone (else). if (_iAmPrimary()) { return HostAndPort(); @@ -255,7 +256,7 @@ HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, const OpTim } // only consider candidates that are ahead of where we are - if (it->getOpTime().getTimestamp() <= lastOpApplied.getTimestamp()) { + if (it->getOpTime().getTimestamp() <= lastTimestampApplied) { continue; } diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index e9ef52bf081..6c820888a02 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -138,7 +138,7 @@ public: virtual long long getTerm() const; virtual bool updateTerm(long long term); virtual void setForceSyncSourceIndex(int index); - virtual HostAndPort chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied); + virtual HostAndPort chooseNewSyncSource(Date_t now, const Timestamp& lastTimestampApplied); virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now); virtual void clearSyncSourceBlacklist(); diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp index 63690f49d26..80c57e9bc51 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp @@ -212,7 +212,7 @@ private: TEST_F(TopoCoordTest, ChooseSyncSourceBasic) { // if we do not have an index in the config, we should get an empty syncsource - HostAndPort newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + HostAndPort newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_TRUE(newSyncSource.empty()); updateConfig(BSON("_id" @@ -235,7 +235,7 @@ TEST_F(TopoCoordTest, ChooseSyncSourceBasic) { ASSERT(getTopoCoord().getSyncSourceAddress().empty()); // Fail due to insufficient number of pings - newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); @@ -245,37 +245,37 @@ TEST_F(TopoCoordTest, ChooseSyncSourceBasic) { heartbeatFromMember(HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime()); // Should choose h2, since it is furthest ahead - newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // h3 becomes further ahead, so it should be chosen heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0)); - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); // h3 becomes an invalid candidate for sync source; should choose h2 again heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_RECOVERING, OpTime(Timestamp(2, 0), 0)); - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // h3 back in SECONDARY and ahead heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0)); - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); // h3 goes down receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime()); - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // h3 back up and ahead heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0)); - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); } @@ -305,7 +305,7 @@ TEST_F(TopoCoordTest, ChooseSyncSourceCandidates) { 0); setSelfMemberState(MemberState::RS_SECONDARY); - OpTime lastOpTimeWeApplied = OpTime(Timestamp(100, 0), 0); + Timestamp lastOpTimeWeApplied = Timestamp(100, 0); heartbeatFromMember(HostAndPort("h1"), "rs0", @@ -466,7 +466,7 @@ TEST_F(TopoCoordTest, ChooseSyncSourceChainingNotAllowed) { Milliseconds(300)); // No primary situation: should choose no sync source. - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); // Add primary @@ -480,7 +480,7 @@ TEST_F(TopoCoordTest, ChooseSyncSourceChainingNotAllowed) { // h3 is primary and should be chosen as sync source, despite being further away than h2 // and the primary (h3) being behind our most recently applied optime - getTopoCoord().chooseNewSyncSource(now()++, OpTime(Timestamp(10, 0), 0)); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp(10, 0)); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); } @@ -519,7 +519,7 @@ TEST_F(TopoCoordTest, EmptySyncSourceOnPrimary) { Milliseconds(300)); // No primary situation: should choose h2 sync source. - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // Become primary @@ -566,18 +566,18 @@ TEST_F(TopoCoordTest, ForceSyncSource) { Milliseconds(100)); // force should overrule other defaults - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); getTopoCoord().setForceSyncSourceIndex(1); // force should cause shouldChangeSyncSource() to return true // even if the currentSource is the force target ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), now())); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), now())); - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // force should only work for one call to chooseNewSyncSource - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); } @@ -615,17 +615,17 @@ TEST_F(TopoCoordTest, BlacklistSyncSource) { OpTime(Timestamp(2, 0), 0), Milliseconds(100)); - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); Date_t expireTime = Date_t::fromMillisSinceEpoch(1000); getTopoCoord().blacklistSyncSource(HostAndPort("h3"), expireTime); - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); // Should choose second best choice now that h3 is blacklisted. ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // After time has passed, should go back to original sync source - getTopoCoord().chooseNewSyncSource(expireTime, OpTime()); + getTopoCoord().chooseNewSyncSource(expireTime, Timestamp()); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); } @@ -666,17 +666,17 @@ TEST_F(TopoCoordTest, BlacklistSyncSourceNoChaining) { OpTime(Timestamp(2, 0), 0), Milliseconds(100)); - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); Date_t expireTime = Date_t::fromMillisSinceEpoch(1000); getTopoCoord().blacklistSyncSource(HostAndPort("h2"), expireTime); - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); // Can't choose any sync source now. ASSERT(getTopoCoord().getSyncSourceAddress().empty()); // After time has passed, should go back to the primary - getTopoCoord().chooseNewSyncSource(expireTime, OpTime()); + getTopoCoord().chooseNewSyncSource(expireTime, Timestamp()); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); } @@ -715,20 +715,20 @@ TEST_F(TopoCoordTest, OnlyUnauthorizedUpCausesRecovering) { OpTime(Timestamp(2, 0), 0), Milliseconds(100)); - ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().chooseNewSyncSource(now()++, OpTime())); + ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().chooseNewSyncSource(now()++, Timestamp())); ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s); // Good state setup done // Mark nodes down, ensure that we have no source and are secondary receiveDownHeartbeat(HostAndPort("h2"), "rs0", OpTime(), ErrorCodes::NetworkTimeout); receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime(), ErrorCodes::NetworkTimeout); - ASSERT_TRUE(getTopoCoord().chooseNewSyncSource(now()++, OpTime()).empty()); + ASSERT_TRUE(getTopoCoord().chooseNewSyncSource(now()++, Timestamp()).empty()); ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s); // Mark nodes down + unauth, ensure that we have no source and are secondary receiveDownHeartbeat(HostAndPort("h2"), "rs0", OpTime(), ErrorCodes::NetworkTimeout); receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime(), ErrorCodes::Unauthorized); - ASSERT_TRUE(getTopoCoord().chooseNewSyncSource(now()++, OpTime()).empty()); + ASSERT_TRUE(getTopoCoord().chooseNewSyncSource(now()++, Timestamp()).empty()); ASSERT_EQUALS(MemberState::RS_RECOVERING, getTopoCoord().getMemberState().s); // Having an auth error but with another node up should bring us out of RECOVERING @@ -872,7 +872,7 @@ TEST_F(TopoCoordTest, PrepareSyncFromResponse) { ASSERT_OK(result); ASSERT_EQUALS("requested member \"h5:27017\" is more than 10 seconds behind us", response8.obj()["warning"].String()); - getTopoCoord().chooseNewSyncSource(now()++, ourOpTime); + getTopoCoord().chooseNewSyncSource(now()++, ourOpTime.getTimestamp()); ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress()); // Sync successfully from an up-to-date member @@ -886,7 +886,7 @@ TEST_F(TopoCoordTest, PrepareSyncFromResponse) { BSONObj response9Obj = response9.obj(); ASSERT_FALSE(response9Obj.hasField("warning")); ASSERT_EQUALS(HostAndPort("h5").toString(), response9Obj["prevSyncTarget"].String()); - getTopoCoord().chooseNewSyncSource(now()++, ourOpTime); + getTopoCoord().chooseNewSyncSource(now()++, ourOpTime.getTimestamp()); ASSERT_EQUALS(HostAndPort("h6"), getTopoCoord().getSyncSourceAddress()); // node goes down between forceSync and chooseNewSyncSource @@ -897,7 +897,7 @@ TEST_F(TopoCoordTest, PrepareSyncFromResponse) { ASSERT_FALSE(response10Obj.hasField("warning")); ASSERT_EQUALS(HostAndPort("h6").toString(), response10Obj["prevSyncTarget"].String()); receiveDownHeartbeat(HostAndPort("h6"), "rs0", OpTime()); - HostAndPort syncSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + HostAndPort syncSource = getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h6"), syncSource); // Try to sync from a member that is unauth'd @@ -917,7 +917,7 @@ TEST_F(TopoCoordTest, PrepareSyncFromResponse) { getTopoCoord().prepareSyncFromResponse( cbData(), HostAndPort("h6"), ourOpTime, &response12, &result); ASSERT_OK(result); - syncSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + syncSource = getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h6"), syncSource); } @@ -3636,7 +3636,7 @@ TEST_F(PrepareHeartbeatResponseV1Test, PrepareHeartbeatResponseWithSyncSource) { HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0)); heartbeatFromMember( HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0)); - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); // set up args ReplSetHeartbeatArgsV1 args; @@ -3909,7 +3909,7 @@ TEST_F(PrepareHeartbeatResponseTest, PrepareHeartbeatResponseWithSyncSource) { HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0)); heartbeatFromMember( HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0)); - getTopoCoord().chooseNewSyncSource(now()++, OpTime()); + getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); // set up args ReplSetHeartbeatArgs args; |