summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authormatt dannenberg <matt.dannenberg@10gen.com>2015-07-13 09:27:56 -0400
committermatt dannenberg <matt.dannenberg@10gen.com>2015-07-16 11:17:36 -0400
commit785c4953e2330a5fc2366d20d3309c0ebd6a334a (patch)
tree70f9ee6a5484f9d82d9471e6911c42d53831de3f /src/mongo
parentddffe2823221656e10844e7681c5bd766b74c21d (diff)
downloadmongo-785c4953e2330a5fc2366d20d3309c0ebd6a334a.tar.gz
SERVER-19375 choose new sync source based on last fetched op rather than last applied op
also change chooseNewSyncSource to take a Timestamp rather than an OpTime
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/data_replicator.cpp4
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp11
-rw-r--r--src/mongo/db/repl/oplogreader.cpp5
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp9
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h2
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp6
-rw-r--r--src/mongo/db/repl/sync_source_selector.h6
-rw-r--r--src/mongo/db/repl/topology_coordinator.h2
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp5
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.h2
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_test.cpp60
13 files changed, 62 insertions, 55 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 55499077fa7..24031a04f79 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -975,7 +975,7 @@ void DataReplicator::_doNextActions_Rollback_inlock() {
void DataReplicator::_doNextActions_Steady_inlock() {
// Check sync source is still good.
if (_syncSource.empty()) {
- _syncSource = _opts.syncSourceSelector->chooseNewSyncSource();
+ _syncSource = _opts.syncSourceSelector->chooseNewSyncSource(_lastTimestampFetched);
}
if (_syncSource.empty()) {
// No sync source, reschedule check
@@ -1191,7 +1191,7 @@ void DataReplicator::_setState_inlock(const DataReplicatorState& newState) {
Status DataReplicator::_ensureGoodSyncSource_inlock() {
if (_syncSource.empty()) {
- _syncSource = _opts.syncSourceSelector->chooseNewSyncSource();
+ _syncSource = _opts.syncSourceSelector->chooseNewSyncSource(_lastTimestampFetched);
if (!_syncSource.empty()) {
return Status::OK();
}
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 7116930557e..e28b3b1fd1e 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/repl/base_cloner_test_fixture.h"
#include "mongo/db/repl/data_replicator.h"
#include "mongo/db/repl/member_state.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_executor_test_fixture.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/reporter.h"
@@ -65,7 +66,7 @@ class SyncSourceSelectorMock : public SyncSourceSelector {
public:
SyncSourceSelectorMock(const HostAndPort& syncSource) : _syncSource(syncSource) {}
void clearSyncSourceBlacklist() override {}
- HostAndPort chooseNewSyncSource() override {
+ HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
HostAndPort result = _syncSource;
_syncSource = HostAndPort();
return result;
@@ -103,8 +104,8 @@ public:
void clearSyncSourceBlacklist() override {
_syncSourceSelector->clearSyncSourceBlacklist();
}
- HostAndPort chooseNewSyncSource() override {
- return _syncSourceSelector->chooseNewSyncSource();
+ HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
+ return _syncSourceSelector->chooseNewSyncSource(ts);
}
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
_syncSourceSelector->blacklistSyncSource(host, until);
@@ -541,7 +542,7 @@ TEST_F(InitialSyncTest, FailsOnClone) {
class TestSyncSourceSelector2 : public SyncSourceSelector {
public:
void clearSyncSourceBlacklist() override {}
- HostAndPort chooseNewSyncSource() override {
+ HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
LockGuard lk(_mutex);
auto result = HostAndPort(str::stream() << "host-" << _nextSourceNum++, -1);
_condition.notify_all();
@@ -672,7 +673,7 @@ class ShutdownExecutorSyncSourceSelector : public SyncSourceSelector {
public:
ShutdownExecutorSyncSourceSelector(ReplicationExecutor* exec) : _exec(exec) {}
void clearSyncSourceBlacklist() override {}
- HostAndPort chooseNewSyncSource() override {
+ HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
_exec->shutdown();
return HostAndPort();
}
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index 950517d6192..e363767ec05 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -147,7 +147,7 @@ void OplogReader::connectToSyncSource(OperationContext* txn,
invariant(conn() == NULL);
while (true) {
- HostAndPort candidate = replCoord->chooseNewSyncSource();
+ HostAndPort candidate = replCoord->chooseNewSyncSource(lastOpTimeFetched.getTimestamp());
if (candidate.empty()) {
if (oldestOpTimeSeen == sentinel) {
@@ -186,7 +186,8 @@ void OplogReader::connectToSyncSource(OperationContext* txn,
OpTime remoteOldOpTime = extractOpTime(remoteOldestOp);
// remoteOldOpTime may come from a very old config, so we cannot compare their terms.
- if (lastOpTimeFetched.getTimestamp() < remoteOldOpTime.getTimestamp()) {
+ if (!lastOpTimeFetched.isNull() &&
+ lastOpTimeFetched.getTimestamp() < remoteOldOpTime.getTimestamp()) {
// We're too stale to use this sync source.
resetConnection();
replCoord->blacklistSyncSource(candidate, Date_t::now() + Minutes(10));
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 5927650d0d3..a95115099d6 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2411,19 +2411,22 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const {
}
void ReplicationCoordinatorImpl::_chooseNewSyncSource(
- const ReplicationExecutor::CallbackArgs& cbData, HostAndPort* newSyncSource) {
+ const ReplicationExecutor::CallbackArgs& cbData,
+ const Timestamp& lastTimestampFetched,
+ HostAndPort* newSyncSource) {
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
- *newSyncSource = _topCoord->chooseNewSyncSource(_replExecutor.now(), getMyLastOptime());
+ *newSyncSource = _topCoord->chooseNewSyncSource(_replExecutor.now(), lastTimestampFetched);
}
-HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource() {
+HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const Timestamp& lastTimestampFetched) {
HostAndPort newSyncSource;
CBHStatus cbh =
_replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_chooseNewSyncSource,
this,
stdx::placeholders::_1,
+ lastTimestampFetched,
&newSyncSource));
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
return newSyncSource; // empty
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 1a5d31ead79..859e18f54fb 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -234,7 +234,7 @@ public:
virtual bool isReplEnabled() const override;
- virtual HostAndPort chooseNewSyncSource() override;
+ virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched) override;
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override;
@@ -778,6 +778,7 @@ private:
* the most appropriate sync source.
*/
void _chooseNewSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
+ const Timestamp& lastTimestampFetched,
HostAndPort* newSyncSource);
/**
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 51cff896269..1fb8e4d321d 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -300,7 +300,7 @@ Status ReplicationCoordinatorMock::checkReplEnabledForCommand(BSONObjBuilder* re
return Status::OK();
}
-HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource() {
+HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource(const Timestamp& lastTimestampFetched) {
return HostAndPort();
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 65c4a22a81a..0feb5331a5b 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -172,7 +172,7 @@ public:
virtual Status checkReplEnabledForCommand(BSONObjBuilder* result);
- virtual HostAndPort chooseNewSyncSource();
+ virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched);
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 773eed2319f..d05644c135d 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -344,13 +344,11 @@ Status _initialSync() {
truncateAndResetOplog(&txn, replCoord, bgsync);
OplogReader r;
- Timestamp now(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0);
- OpTime nowOpTime(now, std::numeric_limits<long long>::max());
while (r.getHost().empty()) {
// We must prime the sync source selector so that it considers all candidates regardless
- // of oplog position, by passing in "now" with max term as the last op fetched time.
- r.connectToSyncSource(&txn, nowOpTime, replCoord);
+ // of oplog position, by passing in null OpTime as the last op fetched time.
+ r.connectToSyncSource(&txn, OpTime(), replCoord);
if (r.getHost().empty()) {
std::string msg =
"no valid sync sources found in current replset to do an initial sync";
diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h
index 846c06c24bf..71eccdb8a30 100644
--- a/src/mongo/db/repl/sync_source_selector.h
+++ b/src/mongo/db/repl/sync_source_selector.h
@@ -33,8 +33,10 @@
#include "mongo/util/time_support.h"
namespace mongo {
-namespace repl {
+class Timestamp;
+
+namespace repl {
/**
* Manage list of viable and blocked sync sources that we can replicate from.
@@ -54,7 +56,7 @@ public:
/**
* Chooses a viable sync source, or, if none available, returns empty HostAndPort.
*/
- virtual HostAndPort chooseNewSyncSource() = 0;
+ virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched) = 0;
/**
* Blacklists choosing 'host' as a sync source until time 'until'.
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index ecc143d33a5..f988cef6af6 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -132,7 +132,7 @@ public:
/**
* Chooses and sets a new sync source, based on our current knowledge of the world.
*/
- virtual HostAndPort chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied) = 0;
+ virtual HostAndPort chooseNewSyncSource(Date_t now, const Timestamp& lastTimestampApplied) = 0;
/**
* Suppresses selecting "host" as sync source until "until".
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index ab3e87f9f1f..26433cc4f16 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -147,7 +147,8 @@ HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const {
return _syncSource;
}
-HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied) {
+HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
+ const Timestamp& lastTimestampApplied) {
// If we are primary, then we aren't syncing from anyone (else).
if (_iAmPrimary()) {
return HostAndPort();
@@ -255,7 +256,7 @@ HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, const OpTim
}
// only consider candidates that are ahead of where we are
- if (it->getOpTime().getTimestamp() <= lastOpApplied.getTimestamp()) {
+ if (it->getOpTime().getTimestamp() <= lastTimestampApplied) {
continue;
}
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index e9ef52bf081..6c820888a02 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -138,7 +138,7 @@ public:
virtual long long getTerm() const;
virtual bool updateTerm(long long term);
virtual void setForceSyncSourceIndex(int index);
- virtual HostAndPort chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied);
+ virtual HostAndPort chooseNewSyncSource(Date_t now, const Timestamp& lastTimestampApplied);
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now);
virtual void clearSyncSourceBlacklist();
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
index 63690f49d26..80c57e9bc51 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
@@ -212,7 +212,7 @@ private:
TEST_F(TopoCoordTest, ChooseSyncSourceBasic) {
// if we do not have an index in the config, we should get an empty syncsource
- HostAndPort newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ HostAndPort newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_TRUE(newSyncSource.empty());
updateConfig(BSON("_id"
@@ -235,7 +235,7 @@ TEST_F(TopoCoordTest, ChooseSyncSourceBasic) {
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
// Fail due to insufficient number of pings
- newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource);
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
@@ -245,37 +245,37 @@ TEST_F(TopoCoordTest, ChooseSyncSourceBasic) {
heartbeatFromMember(HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime());
// Should choose h2, since it is furthest ahead
- newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// h3 becomes further ahead, so it should be chosen
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0));
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
// h3 becomes an invalid candidate for sync source; should choose h2 again
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_RECOVERING, OpTime(Timestamp(2, 0), 0));
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// h3 back in SECONDARY and ahead
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0));
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
// h3 goes down
receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime());
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// h3 back up and ahead
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0));
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
}
@@ -305,7 +305,7 @@ TEST_F(TopoCoordTest, ChooseSyncSourceCandidates) {
0);
setSelfMemberState(MemberState::RS_SECONDARY);
- OpTime lastOpTimeWeApplied = OpTime(Timestamp(100, 0), 0);
+ Timestamp lastOpTimeWeApplied = Timestamp(100, 0);
heartbeatFromMember(HostAndPort("h1"),
"rs0",
@@ -466,7 +466,7 @@ TEST_F(TopoCoordTest, ChooseSyncSourceChainingNotAllowed) {
Milliseconds(300));
// No primary situation: should choose no sync source.
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
// Add primary
@@ -480,7 +480,7 @@ TEST_F(TopoCoordTest, ChooseSyncSourceChainingNotAllowed) {
// h3 is primary and should be chosen as sync source, despite being further away than h2
// and the primary (h3) being behind our most recently applied optime
- getTopoCoord().chooseNewSyncSource(now()++, OpTime(Timestamp(10, 0), 0));
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp(10, 0));
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
}
@@ -519,7 +519,7 @@ TEST_F(TopoCoordTest, EmptySyncSourceOnPrimary) {
Milliseconds(300));
// No primary situation: should choose h2 sync source.
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// Become primary
@@ -566,18 +566,18 @@ TEST_F(TopoCoordTest, ForceSyncSource) {
Milliseconds(100));
// force should overrule other defaults
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
getTopoCoord().setForceSyncSourceIndex(1);
// force should cause shouldChangeSyncSource() to return true
// even if the currentSource is the force target
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), now()));
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), now()));
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// force should only work for one call to chooseNewSyncSource
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
}
@@ -615,17 +615,17 @@ TEST_F(TopoCoordTest, BlacklistSyncSource) {
OpTime(Timestamp(2, 0), 0),
Milliseconds(100));
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
Date_t expireTime = Date_t::fromMillisSinceEpoch(1000);
getTopoCoord().blacklistSyncSource(HostAndPort("h3"), expireTime);
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
// Should choose second best choice now that h3 is blacklisted.
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// After time has passed, should go back to original sync source
- getTopoCoord().chooseNewSyncSource(expireTime, OpTime());
+ getTopoCoord().chooseNewSyncSource(expireTime, Timestamp());
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
}
@@ -666,17 +666,17 @@ TEST_F(TopoCoordTest, BlacklistSyncSourceNoChaining) {
OpTime(Timestamp(2, 0), 0),
Milliseconds(100));
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
Date_t expireTime = Date_t::fromMillisSinceEpoch(1000);
getTopoCoord().blacklistSyncSource(HostAndPort("h2"), expireTime);
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
// Can't choose any sync source now.
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
// After time has passed, should go back to the primary
- getTopoCoord().chooseNewSyncSource(expireTime, OpTime());
+ getTopoCoord().chooseNewSyncSource(expireTime, Timestamp());
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
}
@@ -715,20 +715,20 @@ TEST_F(TopoCoordTest, OnlyUnauthorizedUpCausesRecovering) {
OpTime(Timestamp(2, 0), 0),
Milliseconds(100));
- ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().chooseNewSyncSource(now()++, OpTime()));
+ ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().chooseNewSyncSource(now()++, Timestamp()));
ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s);
// Good state setup done
// Mark nodes down, ensure that we have no source and are secondary
receiveDownHeartbeat(HostAndPort("h2"), "rs0", OpTime(), ErrorCodes::NetworkTimeout);
receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime(), ErrorCodes::NetworkTimeout);
- ASSERT_TRUE(getTopoCoord().chooseNewSyncSource(now()++, OpTime()).empty());
+ ASSERT_TRUE(getTopoCoord().chooseNewSyncSource(now()++, Timestamp()).empty());
ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s);
// Mark nodes down + unauth, ensure that we have no source and are secondary
receiveDownHeartbeat(HostAndPort("h2"), "rs0", OpTime(), ErrorCodes::NetworkTimeout);
receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime(), ErrorCodes::Unauthorized);
- ASSERT_TRUE(getTopoCoord().chooseNewSyncSource(now()++, OpTime()).empty());
+ ASSERT_TRUE(getTopoCoord().chooseNewSyncSource(now()++, Timestamp()).empty());
ASSERT_EQUALS(MemberState::RS_RECOVERING, getTopoCoord().getMemberState().s);
// Having an auth error but with another node up should bring us out of RECOVERING
@@ -872,7 +872,7 @@ TEST_F(TopoCoordTest, PrepareSyncFromResponse) {
ASSERT_OK(result);
ASSERT_EQUALS("requested member \"h5:27017\" is more than 10 seconds behind us",
response8.obj()["warning"].String());
- getTopoCoord().chooseNewSyncSource(now()++, ourOpTime);
+ getTopoCoord().chooseNewSyncSource(now()++, ourOpTime.getTimestamp());
ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress());
// Sync successfully from an up-to-date member
@@ -886,7 +886,7 @@ TEST_F(TopoCoordTest, PrepareSyncFromResponse) {
BSONObj response9Obj = response9.obj();
ASSERT_FALSE(response9Obj.hasField("warning"));
ASSERT_EQUALS(HostAndPort("h5").toString(), response9Obj["prevSyncTarget"].String());
- getTopoCoord().chooseNewSyncSource(now()++, ourOpTime);
+ getTopoCoord().chooseNewSyncSource(now()++, ourOpTime.getTimestamp());
ASSERT_EQUALS(HostAndPort("h6"), getTopoCoord().getSyncSourceAddress());
// node goes down between forceSync and chooseNewSyncSource
@@ -897,7 +897,7 @@ TEST_F(TopoCoordTest, PrepareSyncFromResponse) {
ASSERT_FALSE(response10Obj.hasField("warning"));
ASSERT_EQUALS(HostAndPort("h6").toString(), response10Obj["prevSyncTarget"].String());
receiveDownHeartbeat(HostAndPort("h6"), "rs0", OpTime());
- HostAndPort syncSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ HostAndPort syncSource = getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h6"), syncSource);
// Try to sync from a member that is unauth'd
@@ -917,7 +917,7 @@ TEST_F(TopoCoordTest, PrepareSyncFromResponse) {
getTopoCoord().prepareSyncFromResponse(
cbData(), HostAndPort("h6"), ourOpTime, &response12, &result);
ASSERT_OK(result);
- syncSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ syncSource = getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h6"), syncSource);
}
@@ -3636,7 +3636,7 @@ TEST_F(PrepareHeartbeatResponseV1Test, PrepareHeartbeatResponseWithSyncSource) {
HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0));
heartbeatFromMember(
HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0));
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
// set up args
ReplSetHeartbeatArgsV1 args;
@@ -3909,7 +3909,7 @@ TEST_F(PrepareHeartbeatResponseTest, PrepareHeartbeatResponseWithSyncSource) {
HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0));
heartbeatFromMember(
HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0));
- getTopoCoord().chooseNewSyncSource(now()++, OpTime());
+ getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
// set up args
ReplSetHeartbeatArgs args;