summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2016-06-01 14:01:21 -0400
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2016-06-01 15:56:27 -0400
commit504c299109fd72b3c9155d7bd4e5a41e800cd457 (patch)
tree0116f87f1be1b1365c3cdb65a12ed8fbddd4fefe
parent190c3c700b994ef5f92c13c9290c6700b63f4854 (diff)
downloadmongo-504c299109fd72b3c9155d7bd4e5a41e800cd457.tar.gz
SERVER-24222 Update current known primary from command metadata
This reverts commit ed3f25ced04931525db8e2f11f8bdef7bf49992a. Fixed replication legacy test suite.
-rw-r--r--src/mongo/db/repl/bgsync.cpp11
-rw-r--r--src/mongo/db/repl/data_replicator_external_state.h3
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.cpp10
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.h3
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.cpp7
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.h3
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp15
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp52
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp10
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp3
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h3
-rw-r--r--src/mongo/db/repl/sync_source_selector.h11
-rw-r--r--src/mongo/db/repl/topology_coordinator.h3
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp21
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.h3
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_test.cpp47
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp66
-rw-r--r--src/mongo/db/repl/update_position_args.h2
-rw-r--r--src/mongo/rpc/metadata/repl_set_metadata.h4
20 files changed, 146 insertions, 134 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 5146e0ea1bc..144f14f5fbe 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -85,8 +85,7 @@ public:
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
BackgroundSync* bgsync);
bool shouldStopFetching(const HostAndPort& source,
- const OpTime& sourceOpTime,
- bool sourceHasSyncSource) override;
+ const rpc::ReplSetMetadata& metadata) override;
private:
BackgroundSync* _bgsync;
@@ -99,15 +98,13 @@ DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackground
: DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState),
_bgsync(bgsync) {}
-bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(const HostAndPort& source,
- const OpTime& sourceOpTime,
- bool sourceHasSyncSource) {
+bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(
+ const HostAndPort& source, const rpc::ReplSetMetadata& metadata) {
if (_bgsync->shouldStopFetching()) {
return true;
}
- return DataReplicatorExternalStateImpl::shouldStopFetching(
- source, sourceOpTime, sourceHasSyncSource);
+ return DataReplicatorExternalStateImpl::shouldStopFetching(source, metadata);
}
/**
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
index d19f46f9711..848af9c720e 100644
--- a/src/mongo/db/repl/data_replicator_external_state.h
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -77,8 +77,7 @@ public:
* metadata).
*/
virtual bool shouldStopFetching(const HostAndPort& source,
- const OpTime& sourceOpTime,
- bool sourceHasSyncSource) = 0;
+ const rpc::ReplSetMetadata& metadata) = 0;
private:
/**
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 558f3faf700..87b5993b231 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -60,14 +60,12 @@ void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata
}
bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& source,
- const OpTime& sourceOpTime,
- bool sourceHasSyncSource) {
+ const rpc::ReplSetMetadata& metadata) {
// Re-evaluate quality of sync target.
- if (_replicationCoordinator->shouldChangeSyncSource(
- source, sourceOpTime, sourceHasSyncSource)) {
+ if (_replicationCoordinator->shouldChangeSyncSource(source, metadata)) {
LOG(1) << "Canceling oplog query because we have to choose a sync source. Current source: "
- << source << ", OpTime " << sourceOpTime
- << ", hasSyncSource:" << sourceHasSyncSource;
+ << source << ", OpTime " << metadata.getLastOpVisible()
+ << ", its sync source index:" << metadata.getSyncSourceIndex();
return true;
}
return false;
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h
index 25a09e1d7db..e94dccdab32 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.h
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.h
@@ -51,8 +51,7 @@ public:
void processMetadata(const rpc::ReplSetMetadata& metadata) override;
bool shouldStopFetching(const HostAndPort& source,
- const OpTime& sourceOpTime,
- bool sourceHasSyncSource) override;
+ const rpc::ReplSetMetadata& metadata) override;
private:
StatusWith<OpTime> _multiApply(OperationContext* txn,
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index 83a6e3fd83c..2ec80fad216 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -47,11 +47,10 @@ void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata
}
bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& source,
- const OpTime& sourceOpTime,
- bool sourceHasSyncSource) {
+ const rpc::ReplSetMetadata& metadata) {
lastSyncSourceChecked = source;
- syncSourceLastOpTime = sourceOpTime;
- syncSourceHasSyncSource = sourceHasSyncSource;
+ syncSourceLastOpTime = metadata.getLastOpVisible();
+ syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
return shouldStopFetchingResult;
}
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h
index 4705fb57bd3..6335552f4b8 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.h
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.h
@@ -48,8 +48,7 @@ public:
void processMetadata(const rpc::ReplSetMetadata& metadata) override;
bool shouldStopFetching(const HostAndPort& source,
- const OpTime& sourceOpTime,
- bool sourceHasSyncSource) override;
+ const rpc::ReplSetMetadata& metadata) override;
// Returned by getCurrentTermAndLastCommittedOpTime.
long long currentTerm = OpTime::kUninitializedTerm;
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 294beba46ed..43e42f7cc5e 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -83,8 +83,7 @@ public:
_blacklistedSource = host;
}
bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& sourcesOpTime,
- bool syncSourceHasSyncSource) override {
+ const rpc::ReplSetMetadata& metadata) override {
return false;
}
SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
@@ -126,10 +125,8 @@ public:
_syncSourceSelector->blacklistSyncSource(host, until);
}
bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& sourcesOpTime,
- bool syncSourceHasSyncSource) override {
- return _syncSourceSelector->shouldChangeSyncSource(
- currentSource, sourcesOpTime, syncSourceHasSyncSource);
+ const rpc::ReplSetMetadata& metadata) override {
+ return _syncSourceSelector->shouldChangeSyncSource(currentSource, metadata);
}
SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
const OpTime& lastOpTimeFetched) override {
@@ -718,8 +715,7 @@ public:
_blacklistedSource = host;
}
bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& sourcesOpTime,
- bool syncSourceHasSyncSource) override {
+ const rpc::ReplSetMetadata& metadata) override {
return false;
}
SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
@@ -850,8 +846,7 @@ public:
}
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {}
bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& sourcesOpTime,
- bool syncSourceHasSyncSource) override {
+ const rpc::ReplSetMetadata& metadata) override {
return false;
}
SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index fcdc7819c72..f43e53817f7 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -50,10 +50,8 @@ namespace {
* Calculates await data timeout based on the current replica set configuration.
*/
Milliseconds calculateAwaitDataTimeout(const ReplicaSetConfig& config) {
- // Under protocol version 1, make the awaitData timeout (maxTimeMS) dependent
- // on the election
- // timeout. This enables the sync source to communicate liveness of the
- // primary to secondaries.
+ // Under protocol version 1, make the awaitData timeout (maxTimeMS) dependent on the election
+ // timeout. This enables the sync source to communicate liveness of the primary to secondaries.
// Under protocol version 0, use a default timeout of 2 seconds for awaitData.
if (config.getProtocolVersion() == 1LL) {
return config.getElectionTimeoutPeriod() / 2;
@@ -111,19 +109,15 @@ StatusWith<BSONObj> makeMetadataObject(bool isV1ElectionProtocol) {
/**
* Checks the first batch of results from query.
- * 'documents' are the first batch of results returned from tailing the remote
- * oplog.
- * 'lastFetched' optime and hash should be consistent with the predicate in the
- * query.
+ * 'documents' are the first batch of results returned from tailing the remote oplog.
+ * 'lastFetched' optime and hash should be consistent with the predicate in the query.
* Returns RemoteOplogStale if the oplog query has no results.
- * Returns OplogStartMissing if we cannot find the optime of the last fetched
- * operation in
+ * Returns OplogStartMissing if we cannot find the optime of the last fetched operation in
* the remote oplog.
*/
Status checkRemoteOplogStart(const Fetcher::Documents& documents, OpTimeWithHash lastFetched) {
if (documents.empty()) {
- // The GTE query from upstream returns nothing, so we're ahead of the
- // upstream.
+ // The GTE query from upstream returns nothing, so we're ahead of the upstream.
return Status(ErrorCodes::RemoteOplogStale,
str::stream() << "We are ahead of the sync source. Our last op time fetched: "
<< lastFetched.opTime.toString());
@@ -176,8 +170,7 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
info.networkDocumentBytes += doc.objsize();
++info.networkDocumentCount;
- // If this is the first response (to the $gte query) then we already applied
- // the first doc.
+ // If this is the first response (to the $gte query) then we already applied the first doc.
if (first && info.networkDocumentCount == 1U) {
continue;
}
@@ -208,8 +201,7 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
info.toApplyDocumentCount = documents.size();
info.toApplyDocumentBytes = info.networkDocumentBytes;
if (first) {
- // The count is one less since the first document found was already applied
- // ($gte $ts query)
+ // The count is one less since the first document found was already applied ($gte $ts query)
// and we will not apply it again.
--info.toApplyDocumentCount;
auto alreadyAppliedDocument = documents.cbegin();
@@ -302,11 +294,9 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
}
const auto& queryResponse = result.getValue();
- OpTime sourcesLastOpTime;
- bool syncSourceHasSyncSource = false;
+ rpc::ReplSetMetadata metadata;
- // Forward metadata (containing liveness information) to data replicator
- // external state.
+ // Forward metadata (containing liveness information) to data replicator external state.
bool receivedMetadata =
queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
if (receivedMetadata) {
@@ -318,10 +308,8 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
_onShutdown(metadataResult.getStatus());
return;
}
- auto metadata = metadataResult.getValue();
+ metadata = metadataResult.getValue();
_dataReplicatorExternalState->processMetadata(metadata);
- sourcesLastOpTime = metadata.getLastOpVisible();
- syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
}
const auto& documents = queryResponse.documents;
@@ -337,8 +325,7 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
auto opTimeWithHash = getLastOpTimeWithHashFetched();
- // Check start of remote oplog and, if necessary, stop fetcher to execute
- // rollback.
+ // Check start of remote oplog and, if necessary, stop fetcher to execute rollback.
if (queryResponse.first) {
auto status = checkRemoteOplogStart(documents, opTimeWithHash);
if (!status.isOK()) {
@@ -347,8 +334,7 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
return;
}
- // If this is the first batch and no rollback is needed, skip the first
- // document.
+ // If this is the first batch and no rollback is needed, skip the first document.
firstDocToApply++;
}
@@ -373,14 +359,15 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
_lastFetched = opTimeWithHash;
}
- if (_dataReplicatorExternalState->shouldStopFetching(
- _fetcher.getSource(), sourcesLastOpTime, syncSourceHasSyncSource)) {
+ if (_dataReplicatorExternalState->shouldStopFetching(_fetcher.getSource(), metadata)) {
_onShutdown(Status(ErrorCodes::InvalidSyncSource,
str::stream() << "sync source " << _fetcher.getSource().toString()
<< " (last optime: "
- << sourcesLastOpTime.toString()
- << "; has sync source: "
- << syncSourceHasSyncSource
+ << metadata.getLastOpVisible().toString()
+ << "; sync source index: "
+ << metadata.getSyncSourceIndex()
+ << "; primary index: "
+ << metadata.getPrimaryIndex()
<< ") is no longer valid"),
opTimeWithHash);
return;
@@ -407,5 +394,6 @@ void OplogFetcher::_onShutdown(Status status, OpTimeWithHash opTimeWithHash) {
_onShutdownCallbackFn(status, opTimeWithHash);
}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 36d10aae018..b6c89737b5f 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2901,14 +2901,10 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn
}
bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource) {
+ const rpc::ReplSetMetadata& metadata) {
LockGuard topoLock(_topoMutex);
- return _topCoord->shouldChangeSyncSource(currentSource,
- getMyLastAppliedOpTime(),
- syncSourceLastOpTime,
- syncSourceHasSyncSource,
- _replExecutor.now());
+ return _topCoord->shouldChangeSyncSource(
+ currentSource, getMyLastAppliedOpTime(), metadata, _replExecutor.now());
}
SyncSourceResolverResponse ReplicationCoordinatorImpl::selectSyncSource(
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 93979a0dd9c..bfa93ac3459 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -275,8 +275,7 @@ public:
virtual void resetLastOpTimesFromOplog(OperationContext* txn) override;
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource) override;
+ const rpc::ReplSetMetadata& metadata) override;
virtual SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
const OpTime& lastOpTimeFetched) override;
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 706a5cd6747..75efdf63845 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -357,8 +357,7 @@ void ReplicationCoordinatorMock::resetLastOpTimesFromOplog(OperationContext* txn
}
bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource) {
+ const rpc::ReplSetMetadata& metadata) {
invariant(false);
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 15e7c513727..3ace717f8c2 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -208,8 +208,7 @@ public:
virtual void resetLastOpTimesFromOplog(OperationContext* txn);
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource);
+ const rpc::ReplSetMetadata& metadata);
virtual OpTime getLastCommittedOpTime() const;
diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h
index eb1b19f3764..2b3c8b903e7 100644
--- a/src/mongo/db/repl/sync_source_selector.h
+++ b/src/mongo/db/repl/sync_source_selector.h
@@ -37,6 +37,10 @@ namespace mongo {
class OperationContext;
class Timestamp;
+namespace rpc {
+class ReplSetMetadata;
+}
+
namespace repl {
class OpTime;
@@ -69,17 +73,16 @@ public:
/**
* Determines if a new sync source should be chosen, if a better candidate sync source is
- * available. If the current sync source's last optime ("syncSourceLastOpTime" under
+ * available. If the current sync source's last optime (visibleOpTime of metadata under
* protocolVersion 1, but pulled from the MemberHeartbeatData in protocolVersion 0) is more than
* _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are
* running in ProtocolVersion 1, our current sync source is not primary, has no sync source
- * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true.
+ * and only has data up to "myLastOpTime", returns true.
*
* "now" is used to skip over currently blacklisted sync sources.
*/
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource) = 0;
+ const rpc::ReplSetMetadata& metadata) = 0;
/**
* Returns a SyncSourceResolverResponse containing the syncSource or a new MinValid boundry as
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 43bcc71f909..1548cb774a9 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -165,8 +165,7 @@ public:
*/
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& myLastOpTime,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource,
+ const rpc::ReplSetMetadata& metadata,
Date_t now) const = 0;
/**
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index a4d7c3c4934..fb3c5aaec98 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -2300,10 +2300,11 @@ long long TopologyCoordinatorImpl::getTerm() {
return _term;
}
+// TODO(siyuan): Merge _hddata into _slaveInfo, so that we have a single view of the
+// replset. Passing metadata is unnecessary.
bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& myLastOpTime,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource,
+ const rpc::ReplSetMetadata& metadata,
Date_t now) const {
// Methodology:
// If there exists a viable sync source member other than currentSource, whose oplog has
@@ -2317,14 +2318,21 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
return true;
}
+ if (_rsConfig.getProtocolVersion() == 1 &&
+ metadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
+ return true;
+ }
+
const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource);
+ // PV0 doesn't use metadata, we have to consult _rsConfig.
if (currentSourceIndex == -1) {
return true;
}
+
invariant(currentSourceIndex != _selfIndex);
OpTime currentSourceOpTime =
- std::max(syncSourceLastOpTime, _hbdata.at(currentSourceIndex).getAppliedOpTime());
+ std::max(metadata.getLastOpVisible(), _hbdata.at(currentSourceIndex).getAppliedOpTime());
if (currentSourceOpTime.isNull()) {
// Haven't received a heartbeat from the sync source yet, so can't tell if we should
@@ -2332,9 +2340,10 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
return false;
}
- if (_rsConfig.getProtocolVersion() == 1 && !syncSourceHasSyncSource &&
- currentSourceOpTime <= myLastOpTime &&
- _hbdata.at(currentSourceIndex).getState() != MemberState::RS_PRIMARY) {
+ // Change sync source if they are not ahead of us, and don't have a sync source,
+ // unless they are primary.
+ if (_rsConfig.getProtocolVersion() == 1 && metadata.getSyncSourceIndex() == -1 &&
+ currentSourceOpTime <= myLastOpTime && metadata.getPrimaryIndex() != currentSourceIndex) {
return true;
}
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index 6ae0b62788d..d3eb233acb8 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -157,8 +157,7 @@ public:
virtual void clearSyncSourceBlacklist();
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& myLastOpTime,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource,
+ const rpc::ReplSetMetadata& metadata,
Date_t now) const;
virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now);
virtual void setElectionSleepUntil(Date_t newTime);
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
index 0ec6363a09d..94fbfb96b58 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/db/server_options.h"
#include "mongo/logger/logger.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@@ -51,6 +52,7 @@
ASSERT_EQUALS(mongo::repl::HeartbeatResponseAction::NoAction, (EXPRESSION))
using std::unique_ptr;
+using mongo::rpc::ReplSetMetadata;
namespace mongo {
namespace repl {
@@ -146,6 +148,12 @@ protected:
_currentConfig = config;
}
+ // Make the metadata coming from sync source. Only set visibleOpTime.
+ ReplSetMetadata makeMetadata(OpTime opTime = OpTime()) {
+ return ReplSetMetadata(
+ _topo->getTerm(), OpTime(), opTime, _currentConfig.getConfigVersion(), OID(), -1, -1);
+ }
+
HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member,
const std::string& setName,
MemberState memberState,
@@ -656,9 +664,9 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc
// force should cause shouldChangeSyncSource() to return true
// even if the currentSource is the force target
ASSERT_TRUE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), OpTime(), false, now()));
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), makeMetadata(), now()));
ASSERT_TRUE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), OpTime(), false, now()));
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), makeMetadata(), now()));
getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
@@ -5217,9 +5225,10 @@ TEST_F(HeartbeatResponseTest, ReconfigNodeRemovedBetweenHeartbeatRequestAndRepso
TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberNotInConfig) {
// In this test, the TopologyCoordinator should tell us to change sync sources away from
- // "host4" since "host4" is absent from the config
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host4"), OpTime(), OpTime(), false, now()));
+ // "host4" since "host4" is absent from the config of version 10.
+ ReplSetMetadata metadata(0, OpTime(), OpTime(), 10, OID(), -1, -1);
+ ASSERT_TRUE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), metadata, now()));
}
TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberHasYetToHeartbeatUs) {
@@ -5227,7 +5236,7 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberHasYetToHeartbeatU
// "host2" since we do not yet have a heartbeat (and as a result do not yet have an optime)
// for "host2"
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
}
TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsFreshByHeartbeatButNotMetadata) {
@@ -5257,8 +5266,9 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsFreshByHeartbea
// set up complete, time for actual check
startCapturingLogMessages();
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ auto metadata = makeMetadata(lastOpTimeApplied);
+ ASSERT_FALSE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), metadata, now()));
stopCapturingLogMessages();
ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source"));
}
@@ -5290,8 +5300,9 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsStaleByHeartbea
// set up complete, time for actual check
startCapturingLogMessages();
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), fresherLastOpTimeApplied, false, now()));
+ auto metadata = makeMetadata(fresherLastOpTimeApplied);
+ ASSERT_FALSE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), metadata, now()));
stopCapturingLogMessages();
ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source"));
}
@@ -5323,7 +5334,7 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenFresherMemberExists) {
// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
@@ -5357,18 +5368,18 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhileFresherMemberIsBlack
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
// unblacklist with too early a time (node should remained blacklisted)
getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90));
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
// unblacklist and it should succeed
getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100));
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
@@ -5402,7 +5413,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsDown)
nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
}
TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsNotReadable) {
@@ -5432,7 +5443,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsNotRea
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
}
TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberDoesNotBuildIndexes) {
@@ -5477,7 +5488,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberDoesNotB
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
}
TEST_F(HeartbeatResponseTest,
@@ -5528,7 +5539,7 @@ TEST_F(HeartbeatResponseTest,
// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
index 684d39514a9..925e8a364ae 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/db/server_options.h"
#include "mongo/logger/logger.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@@ -52,6 +53,7 @@
ASSERT_EQUALS(mongo::repl::HeartbeatResponseAction::NoAction, (EXPRESSION))
using std::unique_ptr;
+using mongo::rpc::ReplSetMetadata;
namespace mongo {
namespace repl {
@@ -148,6 +150,20 @@ protected:
_currentConfig = config;
}
+ // Make the metadata coming from sync source.
+ // Only set visibleOpTime, primaryIndex and syncSourceIndex
+ ReplSetMetadata makeMetadata(OpTime opTime = OpTime(),
+ int primaryIndex = -1,
+ int syncSourceIndex = -1) {
+ return ReplSetMetadata(_topo->getTerm(),
+ OpTime(),
+ opTime,
+ _currentConfig.getConfigVersion(),
+ OID(),
+ primaryIndex,
+ syncSourceIndex);
+ }
+
HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member,
const std::string& setName,
MemberState memberState,
@@ -649,9 +665,9 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc
// force should cause shouldChangeSyncSource() to return true
// even if the currentSource is the force target
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("h2"), OpTime(), oldOpTime, false, now()));
+ HostAndPort("h2"), OpTime(), makeMetadata(oldOpTime), now()));
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("h3"), OpTime(), newOpTime, false, now()));
+ HostAndPort("h3"), OpTime(), makeMetadata(newOpTime), now()));
getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
@@ -3078,7 +3094,7 @@ TEST_F(HeartbeatResponseTestV1,
// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
@@ -3120,7 +3136,7 @@ TEST_F(HeartbeatResponseTestV1,
ASSERT_NO_ACTION(nextAction.getAction());
// Show we like host2 while it is primary.
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied, 1), now()));
// Show that we also like host2 while it has a sync source.
nextAction = receiveUpHeartbeat(HostAndPort("host2"),
@@ -3131,7 +3147,7 @@ TEST_F(HeartbeatResponseTestV1,
lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, true, now()));
+ HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied, 2, 2), now()));
// Show that we do not like it when it is not PRIMARY and lacks a sync source and lacks progress
// beyond our own.
@@ -3143,9 +3159,16 @@ TEST_F(HeartbeatResponseTestV1,
lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
ASSERT(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied), now()));
+
+ // Sometimes the heartbeat is stale and the metadata says it's the primary. Trust the metadata.
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"),
+ lastOpTimeApplied,
+ makeMetadata(lastOpTimeApplied, 1 /* host2 is primary */, -1 /* no sync source */),
+ now()));
- // But if it has some progress beyond our own, we still like it.
+ // But if it is secondary and has some progress beyond our own, we still like it.
OpTime newerThanLastOpTimeApplied = OpTime(Timestamp(500, 0), 0);
nextAction = receiveUpHeartbeat(HostAndPort("host2"),
"rs0",
@@ -3155,7 +3178,7 @@ TEST_F(HeartbeatResponseTestV1,
newerThanLastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), lastOpTimeApplied, newerThanLastOpTimeApplied, false, now()));
+ HostAndPort("host2"), lastOpTimeApplied, makeMetadata(newerThanLastOpTimeApplied), now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) {
@@ -3187,7 +3210,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown
nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBlackListed) {
@@ -3219,18 +3242,18 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBla
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
// unblacklist with too early a time (node should remained blacklisted)
getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90));
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
// unblacklist and it should succeed
getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100));
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
@@ -3263,7 +3286,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsFreshByHeartbea
// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source"));
}
@@ -3296,7 +3319,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsStaleByHeartbea
// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), fresherLastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(fresherLastOpTimeApplied), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source"));
}
@@ -3328,7 +3351,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenFresherMemberExists) {
// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
@@ -3337,14 +3360,15 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberHasYetToHeart
// In this test, the TopologyCoordinator should not tell us to change sync sources away from
// "host2" since we do not use the member's heartbeatdata in pv1.
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), true, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberNotInConfig) {
// In this test, the TopologyCoordinator should tell us to change sync sources away from
- // "host4" since "host4" is absent from the config
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host4"), OpTime(), OpTime(), true, now()));
+ // "host4" since "host4" is absent from the config of version 10.
+ ReplSetMetadata metadata(0, OpTime(), OpTime(), 10, OID(), -1, -1);
+ ASSERT_TRUE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), metadata, now()));
}
// TODO(dannenberg) figure out what this is trying to test..
@@ -4274,7 +4298,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberDoesNo
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, true, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsNotReadable) {
@@ -4304,7 +4328,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsNotR
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, true, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
}
class HeartbeatResponseTestOneRetryV1 : public HeartbeatResponseTestV1 {
diff --git a/src/mongo/db/repl/update_position_args.h b/src/mongo/db/repl/update_position_args.h
index bb4a94d8ffb..0acac33dd6f 100644
--- a/src/mongo/db/repl/update_position_args.h
+++ b/src/mongo/db/repl/update_position_args.h
@@ -40,7 +40,7 @@ class Status;
namespace repl {
/**
- * Arguments to the handshake command.
+ * Arguments to the update position command.
*/
class UpdatePositionArgs {
public:
diff --git a/src/mongo/rpc/metadata/repl_set_metadata.h b/src/mongo/rpc/metadata/repl_set_metadata.h
index 24022063a04..a73e3001015 100644
--- a/src/mongo/rpc/metadata/repl_set_metadata.h
+++ b/src/mongo/rpc/metadata/repl_set_metadata.h
@@ -134,8 +134,8 @@ public:
}
private:
- repl::OpTime _lastOpCommitted = repl::OpTime(Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
- repl::OpTime _lastOpVisible = repl::OpTime(Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+ repl::OpTime _lastOpCommitted;
+ repl::OpTime _lastOpVisible;
long long _currentTerm = -1;
long long _configVersion = -1;
OID _replicaSetId;