summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2020-04-21 13:16:02 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-04-21 17:54:41 +0000
commit5988fe51d7a18c022b51eda6f7243123b4e9a6ab (patch)
tree6e8fcaba0f9dc427bb586a7ec4100847a80db137
parentd4274059f2070d537d7c62167ab41b4b78f46efd (diff)
downloadmongo-5988fe51d7a18c022b51eda6f7243123b4e9a6ab.tar.gz
SERVER-38731 Implement ability to specify sync source read preference in initial sync
-rwxr-xr-xbuildscripts/errorcodes.py6
-rw-r--r--jstests/replsets/initial_sync_chooses_correct_sync_source.js208
-rw-r--r--src/mongo/client/SConscript1
-rw-r--r--src/mongo/client/read_preference.cpp69
-rw-r--r--src/mongo/client/read_preference.h34
-rw-r--r--src/mongo/client/read_preference.idl56
-rw-r--r--src/mongo/db/repl/SConscript2
-rw-r--r--src/mongo/db/repl/repl_server_parameters.idl16
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp30
-rw-r--r--src/mongo/db/repl/topology_coordinator.cpp249
-rw-r--r--src/mongo/db/repl/topology_coordinator.h38
-rw-r--r--src/mongo/db/repl/topology_coordinator_v1_test.cpp766
12 files changed, 1200 insertions, 275 deletions
diff --git a/buildscripts/errorcodes.py b/buildscripts/errorcodes.py
index d56fadb009a..faf264d0c55 100755
--- a/buildscripts/errorcodes.py
+++ b/buildscripts/errorcodes.py
@@ -28,6 +28,7 @@ ASSERT_NAMES = ["uassert", "massert", "fassert", "fassertFailed"]
MINIMUM_CODE = 10000
# This limit is intended to be increased by 1000 when we get close.
MAXIMUM_CODE = 51999
+FIRST_BACKPORTED_CODE = 3873100
# pylint: disable=invalid-name
codes = [] # type: ignore
@@ -136,7 +137,10 @@ def read_error_codes():
if not code in seen:
seen[code] = assert_loc
# on first occurrence of a specific excessively large code, add to skips, errors
- if int(code) > MAXIMUM_CODE:
+ if int(code) >= FIRST_BACKPORTED_CODE:
+ # Large codes are used in newer versions, so allow them in backports.
+ pass
+ elif int(code) > MAXIMUM_CODE:
skips.append(assert_loc)
errors.append(assert_loc)
elif int(code) > MAXIMUM_CODE - 20:
diff --git a/jstests/replsets/initial_sync_chooses_correct_sync_source.js b/jstests/replsets/initial_sync_chooses_correct_sync_source.js
new file mode 100644
index 00000000000..a6658ff37ba
--- /dev/null
+++ b/jstests/replsets/initial_sync_chooses_correct_sync_source.js
@@ -0,0 +1,208 @@
+/**
+ * Tests that initial sync chooses the correct sync source based on chaining and the
+ * initialSyncReadPreference.
+ *
+ * @tags: [requires_fcv_42]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+
+const delayMillis = 50; // Adds a delay long enough to make a node not the "nearest" sync source.
+const testName = "initial_sync_chooses_correct_sync_source";
+const rst = new ReplSetTest(
+ {name: testName, nodes: [{}, {rsConfig: {priority: 0}}], allowChaining: true, useBridge: true});
+const nodes = rst.startSet();
+rst.initiate();
+
+const primary = rst.getPrimary();
+const primaryDb = primary.getDB("test");
+const secondary = rst.getSecondary();
+
+// Skip validation while bringing the initial sync node up and down, because we don't wait for the
+// sync to complete.
+TestData.skipCollectionAndIndexValidation = true;
+
+// Add some data to be cloned.
+assert.commandWorked(primaryDb.test.insert([{a: 1}, {b: 2}, {c: 3}]));
+rst.awaitReplication();
+
+jsTestLog("Testing chaining enabled, default initialSyncSourceReadPreference, non-voting node");
+// Ensure we see the sync source progress messages.
+TestData.setParameters = TestData.setParameters || {};
+TestData.setParameters.logComponentVerbosity = TestData.setParameters.logComponentVerbosity || {};
+TestData.setParameters.logComponentVerbosity.replication =
+ TestData.setParameters.logComponentVerbosity.replication || {};
+TestData.setParameters.logComponentVerbosity.replication =
+ Object.merge(TestData.setParameters.logComponentVerbosity.replication, {verbosity: 2});
+const initialSyncNode = rst.add({
+ rsConfig: {priority: 0, votes: 0},
+ setParameter: {
+ 'failpoint.initialSyncHangBeforeCreatingOplog': tojson({mode: 'alwaysOn'}),
+ 'numInitialSyncAttempts': 1
+ }
+});
+primary.delayMessagesFrom(initialSyncNode, delayMillis);
+rst.reInitiate();
+
+assert.commandWorked(initialSyncNode.adminCommand({
+ waitForFailPoint: "initialSyncHangBeforeCreatingOplog",
+ timesEntered: 1,
+ maxTimeMS: kDefaultWaitForFailPointTimeout
+}));
+let res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1}));
+
+// With zero votes and a default initialSyncSourceReadPreference, the secondary should be the sync
+// source because this is equivalent to "nearest" and the primary is delayed.
+assert.eq(res.syncSourceHost, secondary.host);
+initialSyncNode.adminCommand(
+ {configureFailPoint: "initialSyncHangBeforeCreatingOplog", mode: "off"});
+
+/*-----------------------------------------------------------------------------------------------*/
+jsTestLog(
+ "Testing chaining enabled, 'primaryPreferred' initialSyncSourceReadPreference, non-voting node");
+rst.restart(initialSyncNode, {
+ startClean: true,
+ setParameter: {
+ 'failpoint.initialSyncHangBeforeCreatingOplog': tojson({mode: 'alwaysOn'}),
+ 'initialSyncSourceReadPreference': 'primaryPreferred',
+ 'numInitialSyncAttempts': 1
+ }
+});
+
+assert.commandWorked(initialSyncNode.adminCommand({
+ waitForFailPoint: "initialSyncHangBeforeCreatingOplog",
+ timesEntered: 1,
+ maxTimeMS: kDefaultWaitForFailPointTimeout
+}));
+res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1}));
+
+// With an initialSyncSourceReadPreference of 'primaryPreferred', the primary should be the sync
+// source even when it is delayed.
+assert.eq(res.syncSourceHost, primary.host);
+initialSyncNode.adminCommand(
+ {configureFailPoint: "initialSyncHangBeforeCreatingOplog", mode: "off"});
+
+/*-----------------------------------------------------------------------------------------------*/
+/* Switch to a configuration with a voting node. */
+/*-----------------------------------------------------------------------------------------------*/
+jsTestLog("Reconfiguring set so initial sync node is voting.");
+let config = rst.getReplSetConfigFromNode();
+config.members[2].votes = 1;
+config.version++;
+assert.commandWorked(primary.adminCommand({replSetReconfig: config}));
+
+jsTestLog("Testing chaining enabled, default initialSyncSourceReadPreference, voting node");
+// Ensure sync source selection is logged.
+rst.restart(initialSyncNode, {
+ startClean: true,
+ setParameter: {
+ 'failpoint.initialSyncHangBeforeCreatingOplog': tojson({mode: 'alwaysOn'}),
+ 'numInitialSyncAttempts': 1
+ }
+});
+
+assert.commandWorked(initialSyncNode.adminCommand({
+ waitForFailPoint: "initialSyncHangBeforeCreatingOplog",
+ timesEntered: 1,
+ maxTimeMS: kDefaultWaitForFailPointTimeout
+}));
+res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1}));
+
+// With a voting node and a default initialSyncSourceReadPreference, the undelayed secondary should
+// be the sync source (in 4.4+, the primary should be the sync source in this case)
+assert.eq(res.syncSourceHost, secondary.host);
+initialSyncNode.adminCommand(
+ {configureFailPoint: "initialSyncHangBeforeCreatingOplog", mode: "off"});
+
+/*-----------------------------------------------------------------------------------------------*/
+jsTestLog(
+ "Testing chaining enabled, 'primaryPreferred' initialSyncSourceReadPreference, voting node");
+rst.restart(initialSyncNode, {
+ startClean: true,
+ setParameter: {
+ 'failpoint.initialSyncHangBeforeCreatingOplog': tojson({mode: 'alwaysOn'}),
+ 'initialSyncSourceReadPreference': 'primaryPreferred',
+ 'numInitialSyncAttempts': 1
+ }
+});
+
+assert.commandWorked(initialSyncNode.adminCommand({
+ waitForFailPoint: "initialSyncHangBeforeCreatingOplog",
+ timesEntered: 1,
+ maxTimeMS: kDefaultWaitForFailPointTimeout
+}));
+res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1}));
+
+// The primary should be chosen when 'primaryPreferred' is used, even though it is delayed.
+assert.eq(res.syncSourceHost, primary.host);
+initialSyncNode.adminCommand(
+ {configureFailPoint: "initialSyncHangBeforeCreatingOplog", mode: "off"});
+
+/*-----------------------------------------------------------------------------------------------*/
+/* Switch back to a non-voting node, and disable chaining also. */
+/*-----------------------------------------------------------------------------------------------*/
+jsTestLog("Reconfiguring set so initial sync node is non-voting and chaining is disabled.");
+config.settings.chainingAllowed = false;
+config.members[2].votes = 0;
+config.version++;
+assert.commandWorked(primary.adminCommand({replSetReconfig: config}));
+
+/*-----------------------------------------------------------------------------------------------*/
+jsTestLog("Testing chaining disabled, default initialSyncSourceReadPreference, non-voting node");
+rst.restart(initialSyncNode, {
+ startClean: true,
+ setParameter: {
+ 'failpoint.initialSyncHangBeforeCreatingOplog': tojson({mode: 'alwaysOn'}),
+ 'numInitialSyncAttempts': 1
+ }
+});
+
+assert.commandWorked(initialSyncNode.adminCommand({
+ waitForFailPoint: "initialSyncHangBeforeCreatingOplog",
+ timesEntered: 1,
+ maxTimeMS: kDefaultWaitForFailPointTimeout
+}));
+res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1}));
+
+// With chaining disabled, the default should be to select the primary even though it is delayed.
+assert.eq(res.syncSourceHost, primary.host);
+initialSyncNode.adminCommand(
+ {configureFailPoint: "initialSyncHangBeforeCreatingOplog", mode: "off"});
+
+/*-----------------------------------------------------------------------------------------------*/
+jsTestLog("Testing chaining disabled, 'nearest' initialSyncSourceReadPreference, non-voting node");
+rst.restart(initialSyncNode, {
+ startClean: true,
+ setParameter: {
+ 'failpoint.initialSyncHangBeforeCreatingOplog': tojson({mode: 'alwaysOn'}),
+ 'initialSyncSourceReadPreference': 'nearest',
+ 'numInitialSyncAttempts': 1
+ }
+});
+
+assert.commandWorked(initialSyncNode.adminCommand({
+ waitForFailPoint: "initialSyncHangBeforeCreatingOplog",
+ timesEntered: 1,
+ maxTimeMS: kDefaultWaitForFailPointTimeout
+}));
+res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1}));
+
+// With chaining disabled, we choose the delayed secondary over the non-delayed primary when
+// readPreference is explicitly 'nearest'.
+assert.eq(res.syncSourceHost, secondary.host);
+initialSyncNode.adminCommand(
+ {configureFailPoint: "initialSyncHangBeforeCreatingOplog", mode: "off"});
+
+// Once we become secondary, the secondary read preference no longer matters and we choose the
+// primary because chaining is disallowed.
+assert.soon(function() {
+ let res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1}));
+ return res.syncSourceHost == primary.host;
+});
+
+primary.delayMessagesFrom(initialSyncNode, 0);
+TestData.skipCollectionAndIndexValidation = false;
+rst.stopSet();
+})();
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index 923ac00d491..a1e79accfec 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -48,6 +48,7 @@ env.Library(
],
source=[
'read_preference.cpp',
+ env.Idlc('read_preference.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/bson/util/bson_extract',
diff --git a/src/mongo/client/read_preference.cpp b/src/mongo/client/read_preference.cpp
index b71825ad395..b5dffdeb43f 100644
--- a/src/mongo/client/read_preference.cpp
+++ b/src/mongo/client/read_preference.cpp
@@ -49,49 +49,6 @@ const char kModeFieldName[] = "mode";
const char kTagsFieldName[] = "tags";
const char kMaxStalenessSecondsFieldName[] = "maxStalenessSeconds";
-const char kPrimaryOnly[] = "primary";
-const char kPrimaryPreferred[] = "primaryPreferred";
-const char kSecondaryOnly[] = "secondary";
-const char kSecondaryPreferred[] = "secondaryPreferred";
-const char kNearest[] = "nearest";
-
-StringData readPreferenceName(ReadPreference pref) {
- switch (pref) {
- case ReadPreference::PrimaryOnly:
- return StringData(kPrimaryOnly);
- case ReadPreference::PrimaryPreferred:
- return StringData(kPrimaryPreferred);
- case ReadPreference::SecondaryOnly:
- return StringData(kSecondaryOnly);
- case ReadPreference::SecondaryPreferred:
- return StringData(kSecondaryPreferred);
- case ReadPreference::Nearest:
- return StringData(kNearest);
- default:
- MONGO_UNREACHABLE;
- }
-}
-
-StatusWith<ReadPreference> parseReadPreferenceMode(StringData prefStr) {
- if (prefStr == kPrimaryOnly) {
- return ReadPreference::PrimaryOnly;
- } else if (prefStr == kPrimaryPreferred) {
- return ReadPreference::PrimaryPreferred;
- } else if (prefStr == kSecondaryOnly) {
- return ReadPreference::SecondaryOnly;
- } else if (prefStr == kSecondaryPreferred) {
- return ReadPreference::SecondaryPreferred;
- } else if (prefStr == kNearest) {
- return ReadPreference::Nearest;
- }
- return Status(ErrorCodes::FailedToParse,
- str::stream() << "Could not parse $readPreference mode '" << prefStr
- << "'. Only the modes '" << kPrimaryOnly << "', '"
- << kPrimaryPreferred << "', '" << kSecondaryOnly << "', '"
- << kSecondaryPreferred << "', and '" << kNearest
- << "' are supported.");
-}
-
// Slight kludge here: if we weren't passed a TagSet, we default to the empty
// TagSet if ReadPreference is Primary, or the default (wildcard) TagSet otherwise.
// This maintains compatibility with existing code, while preserving the ability to round
@@ -107,6 +64,14 @@ TagSet defaultTagSetForMode(ReadPreference mode) {
} // namespace
+Status validateReadPreferenceMode(const std::string& prefStr) {
+ try {
+ ReadPreference_parse(IDLParserErrorContext(kModeFieldName), prefStr);
+ } catch (DBException& e) {
+ return e.toStatus();
+ }
+ return Status::OK();
+}
/**
* Replica set refresh period on the task executor.
@@ -154,11 +119,19 @@ StatusWith<ReadPreferenceSetting> ReadPreferenceSetting::fromInnerBSON(const BSO
}
ReadPreference mode;
- auto swReadPrefMode = parseReadPreferenceMode(modeStr);
- if (!swReadPrefMode.isOK()) {
- return swReadPrefMode.getStatus();
+ try {
+ mode = ReadPreference_parse(IDLParserErrorContext(kModeFieldName), modeStr);
+ } catch (DBException& e) {
+ return e.toStatus().withContext(
+ str::stream() << "Could not parse $readPreference mode '" << modeStr
+ << "'. Only the modes '"
+ << ReadPreference_serializer(ReadPreference::PrimaryOnly) << "', '"
+ << ReadPreference_serializer(ReadPreference::PrimaryPreferred) << "', '"
+ << ReadPreference_serializer(ReadPreference::SecondaryOnly) << "', '"
+ << ReadPreference_serializer(ReadPreference::SecondaryPreferred)
+ << "', and '" << ReadPreference_serializer(ReadPreference::Nearest)
+ << "' are supported.");
}
- mode = std::move(swReadPrefMode.getValue());
TagSet tags;
BSONElement tagsElem;
@@ -243,7 +216,7 @@ StatusWith<ReadPreferenceSetting> ReadPreferenceSetting::fromContainingBSON(
}
void ReadPreferenceSetting::toInnerBSON(BSONObjBuilder* bob) const {
- bob->append(kModeFieldName, readPreferenceName(pref));
+ bob->append(kModeFieldName, ReadPreference_serializer(pref));
if (tags != defaultTagSetForMode(pref)) {
bob->append(kTagsFieldName, tags.getTagBSON());
}
diff --git a/src/mongo/client/read_preference.h b/src/mongo/client/read_preference.h
index f4807110b14..69803e41cf1 100644
--- a/src/mongo/client/read_preference.h
+++ b/src/mongo/client/read_preference.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/bson/simple_bsonobj_comparator.h"
+#include "mongo/client/read_preference_gen.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/optime.h"
@@ -39,35 +40,12 @@ namespace mongo {
template <typename T>
class StatusWith;
-enum class ReadPreference {
- /**
- * Read from primary only. All operations produce an error (throw an exception where
- * applicable) if primary is unavailable. Cannot be combined with tags.
- */
- PrimaryOnly = 0,
-
- /**
- * Read from primary if available, otherwise a secondary. Tags will only be applied in the
- * event that the primary is unavailable and a secondary is read from. In this event only
- * secondaries matching the tags provided would be read from.
- */
- PrimaryPreferred,
-
- /**
- * Read from secondary if available, otherwise error.
- */
- SecondaryOnly,
+using ReadPreference = ReadPreferenceEnum;
- /**
- * Read from a secondary if available, otherwise read from the primary.
- */
- SecondaryPreferred,
-
- /**
- * Read from any member.
- */
- Nearest,
-};
+/**
+ * Validate a ReadPreference string. This is intended for use as an IDL validator callback.
+ */
+Status validateReadPreferenceMode(const std::string& prefStr);
/**
* A simple object for representing the list of tags requested by a $readPreference.
diff --git a/src/mongo/client/read_preference.idl b/src/mongo/client/read_preference.idl
new file mode 100644
index 00000000000..d98376baef8
--- /dev/null
+++ b/src/mongo/client/read_preference.idl
@@ -0,0 +1,56 @@
+# Copyright (C) 2020-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+enums:
+ ReadPreference:
+ description: Enumeration representing Read Preference Modes
+ type: string
+ values:
+ # Read from primary only. All operations produce an error (throw an exception where
+ # applicable) if primary is unavailable. Cannot be combined with tags.
+ PrimaryOnly: "primary"
+ #
+ # Read from primary if available, otherwise a secondary. Tags will only be applied in the
+ # event that the primary is unavailable and a secondary is read from. In this event only
+ # secondaries matching the tags provided would be read from.
+ PrimaryPreferred: "primaryPreferred"
+ #
+ # Read from secondary if available, otherwise error.
+ SecondaryOnly: "secondary"
+ #
+ # Read from a secondary if available, otherwise read from the primary.
+ SecondaryPreferred: "secondaryPreferred"
+ #
+ # Read from any member.
+ Nearest: "nearest"
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index e069505779d..a9fd0695f1f 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -10,6 +10,7 @@ env.Library(
env.Idlc('repl_server_parameters.idl')[0]
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/client/read_preference',
'$BUILD_DIR/mongo/idl/server_parameter',
]
)
@@ -925,6 +926,7 @@ env.Library(
'initial_syncer',
'data_replicator_external_state_initial_sync',
'repl_coordinator_interface',
+ 'repl_server_parameters',
'repl_settings',
'replica_set_messages',
'replication_process',
diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl
index ba56ea21c4d..9e8b771b201 100644
--- a/src/mongo/db/repl/repl_server_parameters.idl
+++ b/src/mongo/db/repl/repl_server_parameters.idl
@@ -30,6 +30,8 @@
global:
cpp_namespace: "mongo::repl"
+ cpp_includes:
+ - "mongo/client/read_preference.h"
imports:
- "mongo/idl/basic_types.idl"
@@ -250,3 +252,17 @@ server_parameters:
lte:
expr: 100 * 1024 * 1024
+ # New parameters since this file was created, not taken from elsewhere.
+ initialSyncSourceReadPreference:
+ description: >-
+ Set this to specify how the sync source for initial sync is determined.
+ Valid options are: nearest, primary, primaryPreferred, secondary,
+ and secondaryPreferred.
+ set_at: startup
+ cpp_vartype: std::string
+ cpp_varname: initialSyncSourceReadPreference
+ # When the default is used, if chaining is disabled in the config then readPreference is
+ # 'primary'. Otherwise, if the node is a voting node, readPreference is 'primaryPreferred'
+ # and if it is not, readPreference is 'nearest'.
+ default: ""
+ validator: { callback: 'validateReadPreferenceMode' }
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 800ef68f611..dec850a385c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -42,6 +42,7 @@
#include "mongo/bson/json.h"
#include "mongo/bson/timestamp.h"
#include "mongo/client/fetcher.h"
+#include "mongo/client/read_preference.h"
#include "mongo/db/audit.h"
#include "mongo/db/catalog/commit_quorum_options.h"
#include "mongo/db/client.h"
@@ -65,6 +66,7 @@
#include "mongo/db/repl/last_vote.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/repl_set_config_checks.h"
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
@@ -3539,11 +3541,33 @@ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOp
HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress();
// Always allow chaining while in catchup and drain mode.
- auto chainingPreference = _getMemberState_inlock().primary()
+ auto memberState = _getMemberState_inlock();
+ auto chainingPreference = memberState.primary()
? TopologyCoordinator::ChainingPreference::kAllowChaining
: TopologyCoordinator::ChainingPreference::kUseConfiguration;
- HostAndPort newSyncSource =
- _topCoord->chooseNewSyncSource(_replExecutor->now(), lastOpTimeFetched, chainingPreference);
+ ReadPreference readPreference = ReadPreference::Nearest;
+ // Handle special case of initial sync source read preference.
+ // This sync source will be cleared when we go to secondary mode, because we will perform
+ // a postMemberState action of kOnFollowerModeStateChange which calls chooseNewSyncSource().
+ if (memberState.startup2() && _selfIndex != -1) {
+ if (!initialSyncSourceReadPreference.empty()) {
+ try {
+ readPreference =
+ ReadPreference_parse(IDLParserErrorContext("initialSyncSourceReadPreference"),
+ initialSyncSourceReadPreference);
+ } catch (const DBException& e) {
+ fassertFailedWithStatus(3873100, e.toStatus());
+ }
+ // If read preference is explictly set, it takes precedence over chaining: false.
+ chainingPreference = TopologyCoordinator::ChainingPreference::kAllowChaining;
+ }
+ }
+ HostAndPort newSyncSource = _topCoord->chooseNewSyncSource(
+ _replExecutor->now(), lastOpTimeFetched, chainingPreference, readPreference);
+ auto primary = _topCoord->getCurrentPrimaryMember();
+ // If read preference is SecondaryOnly, we should never choose the primary.
+ invariant(readPreference != ReadPreference::SecondaryOnly || !primary ||
+ primary->getHostAndPort() != newSyncSource);
// If we lost our sync source, schedule new heartbeats immediately to update our knowledge
// of other members's state, allowing us to make informed sync source decisions.
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 2fff2fed94e..98b88dad64c 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -193,110 +193,52 @@ HostAndPort TopologyCoordinator::getSyncSourceAddress() const {
HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now,
const OpTime& lastOpTimeFetched,
- ChainingPreference chainingPreference) {
- // If we are not a member of the current replica set configuration, no sync source is valid.
- if (_selfIndex == -1) {
- LOG(1) << "Cannot sync from any members because we are not in the replica set config";
- return HostAndPort();
- }
-
- MONGO_FAIL_POINT_BLOCK(forceSyncSourceCandidate, customArgs) {
- const auto& data = customArgs.getData();
- const auto hostAndPortElem = data["hostAndPort"];
- if (!hostAndPortElem) {
- severe() << "'forceSyncSoureCandidate' parameter set with invalid host and port: "
- << data;
- fassertFailed(50835);
- }
-
- const auto hostAndPort = HostAndPort(hostAndPortElem.checkAndGetStringData());
- const int syncSourceIndex = _rsConfig.findMemberIndexByHostAndPort(hostAndPort);
- if (syncSourceIndex < 0) {
- log() << "'forceSyncSourceCandidate' failed due to host and port not in "
- "replica set config: "
- << hostAndPort.toString();
- fassertFailed(50836);
- }
-
-
- if (_memberIsBlacklisted(_rsConfig.getMemberAt(syncSourceIndex), now)) {
- log() << "Cannot select a sync source because forced candidate is blacklisted: "
- << hostAndPort.toString();
- _syncSource = HostAndPort();
- return _syncSource;
- }
-
- _syncSource = _rsConfig.getMemberAt(syncSourceIndex).getHostAndPort();
- log() << "choosing sync source candidate due to 'forceSyncSourceCandidate' parameter: "
- << _syncSource;
- std::string msg(str::stream() << "syncing from: " << _syncSource.toString()
- << " by 'forceSyncSourceCandidate' parameter");
- setMyHeartbeatMessage(now, msg);
+ ChainingPreference chainingPreference,
+ ReadPreference readPreference) {
+ // Check to make sure we can choose a sync source, and choose a forced one if
+ // set.
+ auto maybeSyncSource = _chooseSyncSourceInitialStep(now);
+ if (maybeSyncSource) {
+ _syncSource = *maybeSyncSource;
return _syncSource;
}
- // if we have a target we've requested to sync from, use it
- if (_forceSyncSourceIndex != -1) {
- invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers());
- _syncSource = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort();
- _forceSyncSourceIndex = -1;
- log() << "choosing sync source candidate by request: " << _syncSource;
- std::string msg(str::stream()
- << "syncing from: " << _syncSource.toString() << " by request");
- setMyHeartbeatMessage(now, msg);
- return _syncSource;
- }
-
- // wait for 2N pings (not counting ourselves) before choosing a sync target
- int needMorePings = (_memberData.size() - 1) * 2 - _getTotalPings();
-
- if (needMorePings > 0) {
- static Occasionally sampler;
- if (sampler.tick()) {
- log() << "waiting for " << needMorePings << " pings from other members before syncing";
+ // If we are only allowed to sync from the primary, use it as the sync source if possible.
+ if (readPreference == ReadPreference::PrimaryOnly ||
+ (chainingPreference == ChainingPreference::kUseConfiguration &&
+ !_rsConfig.isChainingAllowed())) {
+ if (readPreference == ReadPreference::SecondaryOnly) {
+ severe() << "Sync source read preference 'secondaryOnly' with chaining disabled is not "
+ "valid.";
+ fassertFailed(3873102);
+ }
+ _syncSource = _choosePrimaryAsSyncSource(now, lastOpTimeFetched);
+ if (_syncSource.empty()) {
+ if (readPreference == ReadPreference::PrimaryOnly) {
+ LOG(1) << "Cannot select a sync source because the primary is not a valid sync "
+ "source and the sync source read preference is 'primary'.";
+ } else {
+ LOG(1) << "Cannot select a sync source because the primary is not a valid sync "
+ "source and chaining is disabled.";
+ }
}
- _syncSource = HostAndPort();
return _syncSource;
- }
-
- // If we are only allowed to sync from the primary, set that
- if (chainingPreference == ChainingPreference::kUseConfiguration &&
- !_rsConfig.isChainingAllowed()) {
- if (_currentPrimaryIndex == -1) {
- LOG(1) << "Cannot select a sync source because chaining is"
- " not allowed and primary is unknown/down";
- _syncSource = HostAndPort();
- return _syncSource;
- } else if (_memberIsBlacklisted(*_currentPrimaryMember(), now)) {
- LOG(1) << "Cannot select a sync source because chaining is not allowed and primary "
- "member is blacklisted: "
- << _currentPrimaryMember()->getHostAndPort();
- _syncSource = HostAndPort();
- return _syncSource;
- } else if (_currentPrimaryIndex == _selfIndex) {
- LOG(1)
- << "Cannot select a sync source because chaining is not allowed and we are primary";
- _syncSource = HostAndPort();
- return _syncSource;
- } else if (_memberData.at(_currentPrimaryIndex).getLastAppliedOpTime() <
- lastOpTimeFetched) {
- LOG(1) << "Cannot select a sync source because chaining is not allowed and the primary "
- "is behind me. Last oplog optime of primary {}: {}, my last fetched oplog "
- "optime: {}"_format(
- _currentPrimaryMember()->getHostAndPort(),
- _memberData.at(_currentPrimaryIndex).getLastAppliedOpTime().toBSON(),
- lastOpTimeFetched.toBSON());
- _syncSource = HostAndPort();
- return _syncSource;
- } else {
- _syncSource = _currentPrimaryMember()->getHostAndPort();
- log() << "chaining not allowed, choosing primary as sync source candidate: "
- << _syncSource;
- std::string msg(str::stream() << "syncing from primary: " << _syncSource.toString());
- setMyHeartbeatMessage(now, msg);
+ } else if (readPreference == ReadPreference::PrimaryPreferred) {
+ // If we prefer the primary, try it first.
+ _syncSource = _choosePrimaryAsSyncSource(now, lastOpTimeFetched);
+ if (!_syncSource.empty()) {
return _syncSource;
}
}
+ _syncSource = _chooseNearbySyncSource(now, lastOpTimeFetched, readPreference);
+ return _syncSource;
+}
+
+HostAndPort TopologyCoordinator::_chooseNearbySyncSource(Date_t now,
+ const OpTime& lastOpTimeFetched,
+ ReadPreference readPreference) {
+ // We should have handled PrimaryOnly before calling this.
+ invariant(readPreference != ReadPreference::PrimaryOnly);
// find the member with the lowest ping time that is ahead of me
@@ -353,6 +295,17 @@ HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now,
continue;
}
+ // Disallow the primary for first or all attempts depending on the readPreference.
+ if (readPreference == ReadPreference::SecondaryOnly ||
+ (readPreference == ReadPreference::SecondaryPreferred && attempts == 0)) {
+ if (it->getState().primary()) {
+ LOG(2) << "Cannot select sync source because it is a primary and we are "
+ "looking for a secondary: "
+ << itMemberConfig.getHostAndPort();
+ continue;
+ }
+ }
+
// On the first attempt, we skip candidates that do not match these criteria.
if (attempts == 0) {
// Candidate must be a voter if we are a voter.
@@ -441,6 +394,103 @@ HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now,
return _syncSource;
}
+boost::optional<HostAndPort> TopologyCoordinator::_chooseSyncSourceInitialStep(Date_t now) {
+ // If we are not a member of the current replica set configuration, no sync source is valid.
+ if (_selfIndex == -1) {
+ LOG(1) << "Cannot sync from any members because we are not in the replica set config";
+ return HostAndPort();
+ }
+
+ MONGO_FAIL_POINT_BLOCK(forceSyncSourceCandidate, customArgs) {
+ const auto& data = customArgs.getData();
+ const auto hostAndPortElem = data["hostAndPort"];
+ if (!hostAndPortElem) {
+ severe() << "'forceSyncSoureCandidate' parameter set with invalid host and port: "
+ << data;
+ fassertFailed(50835);
+ }
+
+ const auto hostAndPort = HostAndPort(hostAndPortElem.checkAndGetStringData());
+ const int syncSourceIndex = _rsConfig.findMemberIndexByHostAndPort(hostAndPort);
+ if (syncSourceIndex < 0) {
+ log() << "'forceSyncSourceCandidate' failed due to host and port not in "
+ "replica set config: "
+ << hostAndPort.toString();
+ fassertFailed(50836);
+ }
+
+
+ if (_memberIsBlacklisted(_rsConfig.getMemberAt(syncSourceIndex), now)) {
+ log() << "Cannot select a sync source because forced candidate is blacklisted: "
+ << hostAndPort.toString();
+ return HostAndPort();
+ }
+
+ auto syncSource = _rsConfig.getMemberAt(syncSourceIndex).getHostAndPort();
+ log() << "choosing sync source candidate due to 'forceSyncSourceCandidate' parameter: "
+ << syncSource;
+ std::string msg(str::stream() << "syncing from: " << syncSource.toString()
+ << " by 'forceSyncSourceCandidate' parameter");
+ setMyHeartbeatMessage(now, msg);
+ return syncSource;
+ }
+
+ // if we have a target we've requested to sync from, use it
+ if (_forceSyncSourceIndex != -1) {
+ invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers());
+ auto syncSource = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort();
+ _forceSyncSourceIndex = -1;
+ log() << "choosing sync source candidate by request: " << syncSource;
+ std::string msg(str::stream()
+ << "syncing from: " << syncSource.toString() << " by request");
+ setMyHeartbeatMessage(now, msg);
+ return syncSource;
+ }
+
+ // wait for 2N pings (not counting ourselves) before choosing a sync target
+ int needMorePings = (_memberData.size() - 1) * 2 - pingsInConfig;
+
+ if (needMorePings > 0) {
+ static Occasionally sampler;
+ if (sampler.tick()) {
+ log() << "waiting for " << needMorePings << " pings from other members before syncing";
+ }
+ return HostAndPort();
+ }
+ return boost::none;
+}
+
+HostAndPort TopologyCoordinator::_choosePrimaryAsSyncSource(Date_t now,
+ const OpTime& lastOpTimeFetched) {
+ if (_currentPrimaryIndex == -1) {
+ LOG(1) << "Cannot select the primary as sync source because"
+ "the primary is unknown/down";
+ return HostAndPort();
+ } else if (_memberIsBlacklisted(*getCurrentPrimaryMember(), now)) {
+ LOG(1) << "Cannot select the primary as sync source because the primary "
+ "member is blacklisted: "
+ << getCurrentPrimaryMember()->getHostAndPort();
+ return HostAndPort();
+ } else if (_currentPrimaryIndex == _selfIndex) {
+ LOG(1) << "Cannot select the primary as sync source because this node is primary.";
+ return HostAndPort();
+ } else if (_memberData.at(_currentPrimaryIndex).getLastAppliedOpTime() < lastOpTimeFetched) {
+ LOG(1) << "Cannot select the primary as sync source because the primary "
+ "is behind me. Last oplog optime of primary {}: {}, my last fetched oplog "
+ "optime: {}"_format(
+ getCurrentPrimaryMember()->getHostAndPort(),
+ _memberData.at(_currentPrimaryIndex).getLastAppliedOpTime().toBSON(),
+ lastOpTimeFetched.toBSON());
+ return HostAndPort();
+ } else {
+ auto syncSource = getCurrentPrimaryMember()->getHostAndPort();
+ log() << "Choosing primary as sync source candidate: " << syncSource;
+ std::string msg(str::stream() << "syncing from primary: " << syncSource.toString());
+ setMyHeartbeatMessage(now, msg);
+ return syncSource;
+ }
+}
+
bool TopologyCoordinator::_memberIsBlacklisted(const MemberConfig& memberConfig, Date_t now) const {
std::map<HostAndPort, Date_t>::const_iterator blacklisted =
_syncSourceBlacklist.find(memberConfig.getHostAndPort());
@@ -813,6 +863,7 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
} else {
ReplSetHeartbeatResponse hbr = std::move(hbResponse.getValue());
LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId();
+ pingsInConfig++;
advancedOpTime = hbData.setUpValues(now, std::move(hbr));
}
@@ -1383,7 +1434,7 @@ void TopologyCoordinator::setCurrentPrimary_forTest(int primaryIndex,
}
}
-const MemberConfig* TopologyCoordinator::_currentPrimaryMember() const {
+const MemberConfig* TopologyCoordinator::getCurrentPrimaryMember() const {
if (_currentPrimaryIndex == -1)
return NULL;
@@ -1741,7 +1792,7 @@ void TopologyCoordinator::fillIsMasterForReplSet(IsMasterResponse* const respons
response->setIsMaster(myState.primary() && !isSteppingDown());
response->setIsSecondary(myState.secondary());
- const MemberConfig* curPrimary = _currentPrimaryMember();
+ const MemberConfig* curPrimary = getCurrentPrimaryMember();
if (curPrimary) {
response->setPrimary(curPrimary->getHostAndPort(horizon));
}
@@ -1854,6 +1905,8 @@ void TopologyCoordinator::_updateHeartbeatDataForReconfig(const ReplSetConfig& n
_memberData.clear();
// We're not in the config, we can't sync any more.
_syncSource = HostAndPort();
+ // We shouldn't get a sync source until we've received pings for our new config.
+ pingsInConfig = 0;
MemberData newHeartbeatData;
for (auto&& oldMemberData : oldHeartbeats) {
if (oldMemberData.isSelf()) {
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index db0bf31428d..ba17d534016 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -32,6 +32,7 @@
#include <iosfwd>
#include <string>
+#include "mongo/client/read_preference.h"
#include "mongo/db/repl/is_master_response.h"
#include "mongo/db/repl/last_vote.h"
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
@@ -186,10 +187,15 @@ public:
/**
* Chooses and sets a new sync source, based on our current knowledge of the world.
+ * If chaining is disabled in the configuration and chainingPreference is kUseConfiguration,
+ * only the primary will be selected (regardless of read preference). Otherwise,
+ * the readPreference is respected. Chaining disabled with SecondaryOnly read preference is
+ * not allowed.
*/
HostAndPort chooseNewSyncSource(Date_t now,
const OpTime& lastOpTimeFetched,
- ChainingPreference chainingPreference);
+ ChainingPreference chainingPreference,
+ ReadPreference readPreference);
/**
* Suppresses selecting "host" as sync source until "until".
@@ -709,6 +715,11 @@ public:
bool checkIfCommitQuorumIsSatisfied(const CommitQuorumOptions& commitQuorum,
const std::vector<HostAndPort>& commitReadyMembers) const;
+ /**
+ * Returns nullptr if there is no primary, or the MemberConfig* for the current primary.
+ */
+ const MemberConfig* getCurrentPrimaryMember() const;
+
////////////////////////////////////////////////////////////
//
// Test support methods
@@ -791,6 +802,25 @@ private:
// Returns the number of heartbeat pings which have occurred.
int _getTotalPings();
+ // Does preliminary checks involved in choosing sync source
+ // * Do we have a valid configuration?
+ // * Do we have a forced sync source?
+ // * Have we gotten enough pings?
+ // Returns a HostAndPort if one is decided (may be empty), boost:none if we need to move to the
+ // next step.
+ boost::optional<HostAndPort> _chooseSyncSourceInitialStep(Date_t now);
+
+ // Returns the primary node if it is a valid sync source, otherwise returns an empty
+ // HostAndPort.
+ HostAndPort _choosePrimaryAsSyncSource(Date_t now, const OpTime& lastOpTimeFetched);
+
+ // Chooses a sync source among available nodes. ReadPreference may be any value but
+ // PrimaryOnly, but PrimaryPreferred is treated the same as "Nearest" (it is assumed
+ // the caller will handle PrimaryPreferred by trying _choosePrimaryAsSyncSource() first)
+ HostAndPort _chooseNearbySyncSource(Date_t now,
+ const OpTime& lastOpTimeFetched,
+ ReadPreference readPreference);
+
// Returns the current "ping" value for the given member by their address
Milliseconds _getPing(const HostAndPort& host);
@@ -842,9 +872,6 @@ private:
*/
MemberData* _findMemberDataByMemberId(const int memberId);
- // Returns NULL if there is no primary, or the MemberConfig* for the current primary
- const MemberConfig* _currentPrimaryMember() const;
-
/**
* Performs updating "_currentPrimaryIndex" for processHeartbeatResponse(), and determines if an
* election or stepdown should commence.
@@ -974,6 +1001,9 @@ private:
typedef std::map<HostAndPort, PingStats> PingMap;
// Ping stats for each member by HostAndPort;
PingMap _pings;
+ // For the purpose of deciding on a sync source, we count only pings for nodes which are in our
+ // current config.
+ int pingsInConfig = 0;
// V1 last vote info for elections
LastVote _lastVote{OpTime::kInitialTerm, -1};
diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index 3e7de8908b9..d17dc192d57 100644
--- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
@@ -43,6 +43,7 @@
#include "mongo/logger/logger.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/net/hostandport.h"
@@ -320,7 +321,10 @@ private:
TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) {
// if we do not have an index in the config, we should get an empty syncsource
HostAndPort newSyncSource = getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_TRUE(newSyncSource.empty());
updateConfig(BSON("_id"
@@ -345,7 +349,10 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) {
// Fail due to insufficient number of pings
newSyncSource = getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource);
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
@@ -356,42 +363,55 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) {
// Should choose h2, since it is furthest ahead
newSyncSource = getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// h3 becomes further ahead, so it should be chosen
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0));
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
// h3 becomes an invalid candidate for sync source; should choose h2 again
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_RECOVERING, OpTime(Timestamp(2, 0), 0));
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// h3 back in SECONDARY and ahead
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0));
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
// h3 goes down
receiveDownHeartbeat(HostAndPort("h3"), "rs0");
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// h3 back up and ahead
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0));
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
}
@@ -504,8 +524,10 @@ TEST_F(TopoCoordTest, NodeReturnsClosestValidSyncSourceAsSyncSource) {
Milliseconds(100));
// Should choose primary first; it's closest
- getTopoCoord().chooseNewSyncSource(
- now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ lastOpTimeWeApplied,
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("hprimary"), getTopoCoord().getSyncSourceAddress());
// Primary goes far far away
@@ -517,40 +539,52 @@ TEST_F(TopoCoordTest, NodeReturnsClosestValidSyncSourceAsSyncSource) {
// Should choose h4. (if an arbiter has an oplog, it's a valid sync source)
// h6 is not considered because it is outside the maxSyncLagSeconds window.
- getTopoCoord().chooseNewSyncSource(
- now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ lastOpTimeWeApplied,
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h4"), getTopoCoord().getSyncSourceAddress());
// h4 goes down; should choose h1
receiveDownHeartbeat(HostAndPort("h4"), "rs0");
- getTopoCoord().chooseNewSyncSource(
- now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ lastOpTimeWeApplied,
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h1"), getTopoCoord().getSyncSourceAddress());
// Primary and h1 go down; should choose h6
receiveDownHeartbeat(HostAndPort("h1"), "rs0");
receiveDownHeartbeat(HostAndPort("hprimary"), "rs0");
ASSERT_EQUALS(-1, getCurrentPrimaryIndex());
- getTopoCoord().chooseNewSyncSource(
- now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ lastOpTimeWeApplied,
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h6"), getTopoCoord().getSyncSourceAddress());
// h6 goes down; should choose h5
receiveDownHeartbeat(HostAndPort("h6"), "rs0");
- getTopoCoord().chooseNewSyncSource(
- now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ lastOpTimeWeApplied,
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress());
// h5 goes down; should choose h3
receiveDownHeartbeat(HostAndPort("h5"), "rs0");
- getTopoCoord().chooseNewSyncSource(
- now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ lastOpTimeWeApplied,
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
// h3 goes down; no sync source candidates remain
receiveDownHeartbeat(HostAndPort("h3"), "rs0");
- getTopoCoord().chooseNewSyncSource(
- now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ lastOpTimeWeApplied,
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
}
@@ -592,14 +626,18 @@ TEST_F(TopoCoordTest, NodeWontChooseSyncSourceFromOlderTerm) {
OpTime(Timestamp(300, 0), 2), // old term
Milliseconds(100));
- getTopoCoord().chooseNewSyncSource(
- now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ lastOpTimeWeApplied,
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h1"), getTopoCoord().getSyncSourceAddress());
// h1 goes down; no sync source candidates remain
receiveDownHeartbeat(HostAndPort("h1"), "rs0");
- getTopoCoord().chooseNewSyncSource(
- now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ lastOpTimeWeApplied,
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
}
@@ -641,10 +679,12 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) {
Milliseconds(300));
// No primary situation: should choose no sync source.
- ASSERT_EQUALS(
- HostAndPort(),
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ ASSERT_EQUALS(HostAndPort(),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest));
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
// Add primary
@@ -662,7 +702,8 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) {
getTopoCoord().chooseNewSyncSource(
now()++,
OpTime(Timestamp(10, 0), 0),
- TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest));
ASSERT_EQUALS(HostAndPort(), getTopoCoord().getSyncSourceAddress());
// Update the primary's position.
@@ -679,16 +720,45 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) {
getTopoCoord().chooseNewSyncSource(
now()++,
OpTime(Timestamp(10, 0), 0),
- TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest));
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
- // When we are in catch-up mode, the chainingAllowed setting is ignored. h2 should be chosen as
- // the sync source.
- ASSERT_EQUALS(HostAndPort("h2"),
+ // We should choose primary h3 regardless of read preference.
+ ASSERT_EQUALS(HostAndPort("h3"),
getTopoCoord().chooseNewSyncSource(
now()++,
OpTime(Timestamp(10, 0), 0),
- TopologyCoordinator::ChainingPreference::kAllowChaining));
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::PrimaryOnly));
+ ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
+
+ ASSERT_EQUALS(HostAndPort("h3"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::PrimaryPreferred));
+ ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
+
+ ASSERT_EQUALS(HostAndPort("h3"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::SecondaryPreferred));
+ ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
+
+ // We do not test SecondaryOnly here because that results in an fassert.
+
+ // When we are in catch-up mode, the chainingAllowed setting is ignored. h2 should be chosen as
+ // the sync source.
+ ASSERT_EQUALS(
+ HostAndPort("h2"),
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kAllowChaining,
+ ReadPreference::Nearest));
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// Become primary: should not choose self as sync source.
@@ -699,13 +769,61 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) {
Milliseconds(300));
makeSelfPrimary(Timestamp(3.0));
ASSERT_EQUALS(0, getCurrentPrimaryIndex());
- ASSERT_EQUALS(
- HostAndPort(),
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ ASSERT_EQUALS(HostAndPort(),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest));
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
}
+DEATH_TEST_F(TopoCoordTest, SecondaryOnlyAssertsWhenChainingNotAllowed, "3873102") {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version" << 1 << "settings" << BSON("chainingAllowed" << false)
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 10 << "host"
+ << "hself")
+ << BSON("_id" << 20 << "host"
+ << "h2")
+ << BSON("_id" << 30 << "host"
+ << "h3"))),
+ 0);
+
+ setSelfMemberState(MemberState::RS_SECONDARY);
+
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(100));
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(100));
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(0, 0), 0),
+ Milliseconds(300));
+
+ // Set h3 as primary.
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_PRIMARY,
+ OpTime(Timestamp(10, 0), 0),
+ Milliseconds(300));
+
+ // Attempting to choose a sync source with SecondaryOnly and chaining disabled
+ // results in an fassert.
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::SecondaryOnly);
+}
+
TEST_F(TopoCoordTest, ChooseOnlyVotersAsSyncSourceWhenNodeIsAVoter) {
updateConfig(fromjson("{_id:'rs0', version:1, members:["
"{_id:10, host:'hself'}, "
@@ -729,18 +847,27 @@ TEST_F(TopoCoordTest, ChooseOnlyVotersAsSyncSourceWhenNodeIsAVoter) {
// Should choose h3 as it is a voter
auto newSource = getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(h3, newSource);
// Can't choose h2 as it is not a voter
newSource = getTopoCoord().chooseNewSyncSource(
- now()++, ot10, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++,
+ ot10,
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort(), newSource);
// Should choose h3 as it is a voter, and ahead
heartbeatFromMember(h3, "rs0", MemberState::RS_SECONDARY, ot5, hbRTT300);
newSource = getTopoCoord().chooseNewSyncSource(
- now()++, ot1, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++,
+ ot1,
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(h3, newSource);
}
@@ -781,10 +908,12 @@ TEST_F(TopoCoordTest, ChooseSameSyncSourceEvenWhenPrimary) {
Milliseconds(300));
// No primary situation: should choose h2 sync source.
- ASSERT_EQUALS(
- HostAndPort("h2"),
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ ASSERT_EQUALS(HostAndPort("h2"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest));
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// Become primary
@@ -792,10 +921,12 @@ TEST_F(TopoCoordTest, ChooseSameSyncSourceEvenWhenPrimary) {
ASSERT_EQUALS(0, getCurrentPrimaryIndex());
// Choose same sync source even when primary.
- ASSERT_EQUALS(
- HostAndPort("h2"),
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ ASSERT_EQUALS(HostAndPort("h2"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest));
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
}
@@ -826,8 +957,10 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc
HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, newOpTime, Milliseconds(100));
// force should overrule other defaults
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
getTopoCoord().setForceSyncSourceIndex(1);
// force should cause shouldChangeSyncSource() to return true
@@ -840,13 +973,17 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc
HostAndPort("h2"), makeReplSetMetadata(oldOpTime), boost::none, now()));
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("h3"), makeReplSetMetadata(newOpTime), boost::none, now()));
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// force should only work for one call to chooseNewSyncSource
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
}
@@ -886,20 +1023,26 @@ TEST_F(TopoCoordTest, NodeDoesNotChooseBlacklistedSyncSourceUntilBlacklistingExp
OpTime(Timestamp(2, 0), 0),
Milliseconds(100));
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
Date_t expireTime = Date_t::fromMillisSinceEpoch(1000);
getTopoCoord().blacklistSyncSource(HostAndPort("h3"), expireTime);
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
// Should choose second best choice now that h3 is blacklisted.
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// After time has passed, should go back to original sync source
- getTopoCoord().chooseNewSyncSource(
- expireTime, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(expireTime,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
}
@@ -941,21 +1084,439 @@ TEST_F(TopoCoordTest, ChooseNoSyncSourceWhenPrimaryIsBlacklistedAndChainingIsDis
OpTime(Timestamp(2, 0), 0),
Milliseconds(100));
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
Date_t expireTime = Date_t::fromMillisSinceEpoch(1000);
getTopoCoord().blacklistSyncSource(HostAndPort("h2"), expireTime);
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
// Can't choose any sync source now.
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
// After time has passed, should go back to the primary
- getTopoCoord().chooseNewSyncSource(
- expireTime, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(expireTime,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
+ ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
+}
+
+TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryOnly) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version" << 1 << "settings" << BSON("chainingAllowed" << true)
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 10 << "host"
+ << "hself")
+ << BSON("_id" << 20 << "host"
+ << "h2")
+ << BSON("_id" << 30 << "host"
+ << "h3"))),
+ 0);
+
+ // We use readPreference nearest only when in initialSync.
+ setSelfMemberState(MemberState::RS_STARTUP2);
+
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(100));
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(100));
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(0, 0), 0),
+ Milliseconds(300));
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(0, 0), 0),
+ Milliseconds(300));
+
+ // No primary situation: should choose no sync source.
+ ASSERT_EQUALS(HostAndPort(),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::PrimaryOnly));
+ ASSERT(getTopoCoord().getSyncSourceAddress().empty());
+
+ // Add primary
+ ASSERT_EQUALS(-1, getCurrentPrimaryIndex());
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_PRIMARY,
+ OpTime(Timestamp(0, 0), 0),
+ Milliseconds(300));
+ ASSERT_EQUALS(2, getCurrentPrimaryIndex());
+
+ // h3 is primary, but its last applied isn't as up-to-date as ours, so it cannot be chosen
+ // as the sync source.
+ ASSERT_EQUALS(HostAndPort(),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::PrimaryOnly));
+ ASSERT_EQUALS(HostAndPort(), getTopoCoord().getSyncSourceAddress());
+
+ // Update the primary's position.
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_PRIMARY,
+ OpTime(Timestamp(10, 0), 0),
+ Milliseconds(300));
+
+ // h3 is primary and should be chosen as the sync source, despite being further away than h2
+ // and the primary (h3) being at our most recently applied optime.
+ ASSERT_EQUALS(HostAndPort("h3"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::PrimaryOnly));
+ ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
+
+ // Become primary: should not choose self as sync source.
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(0, 0), 0),
+ Milliseconds(300));
+ makeSelfPrimary(Timestamp(3.0));
+ ASSERT_EQUALS(0, getCurrentPrimaryIndex());
+ ASSERT_EQUALS(HostAndPort(),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::PrimaryOnly));
+ ASSERT(getTopoCoord().getSyncSourceAddress().empty());
+}
+
+TEST_F(TopoCoordTest, ChooseOnlySecondaryAsSyncSourceWhenReadPreferenceIsSecondaryOnly) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version" << 1 << "settings" << BSON("chainingAllowed" << true)
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 10 << "host"
+ << "hself")
+ << BSON("_id" << 20 << "host"
+ << "h2")
+ << BSON("_id" << 30 << "host"
+ << "h3"))),
+ 0);
+
+ // We use readPreference nearest only when in initialSync.
+ setSelfMemberState(MemberState::RS_STARTUP2);
+
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_RECOVERING,
+ OpTime(Timestamp(0, 0), 0),
+ Milliseconds(300));
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_RECOVERING,
+ OpTime(Timestamp(0, 0), 0),
+ Milliseconds(300));
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_PRIMARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(100));
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_PRIMARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(100));
+
+ // No secondary situation: should choose no sync source.
+ ASSERT_EQUALS(HostAndPort(),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::SecondaryOnly));
+ ASSERT(getTopoCoord().getSyncSourceAddress().empty());
+
+ // Add secondary
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(10, 0), 0),
+ Milliseconds(300));
+
+ // h2 is a secondary, but its last applied isn't more up-to-date than ours, so it cannot be
+ // chosen as the sync source.
+ ASSERT_EQUALS(HostAndPort(),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::SecondaryOnly));
+ ASSERT_EQUALS(HostAndPort(), getTopoCoord().getSyncSourceAddress());
+
+ // Update h2's position.
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(300));
+
+ // h2 is a secondary and should be chosen as the sync source despite being further away than the
+ // primary (h3).
+ ASSERT_EQUALS(HostAndPort("h2"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::SecondaryOnly));
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
+
+ // Become primary: should choose nearest valid secondary as sync source.
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(100));
+ makeSelfPrimary(Timestamp(3.0));
+ ASSERT_EQUALS(0, getCurrentPrimaryIndex());
+ ASSERT_EQUALS(HostAndPort("h3"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::SecondaryOnly));
+ ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
+}
+
+TEST_F(TopoCoordTest, PreferPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryPreferred) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version" << 1 << "settings" << BSON("chainingAllowed" << true)
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 10 << "host"
+ << "hself")
+ << BSON("_id" << 20 << "host"
+ << "h2")
+ << BSON("_id" << 30 << "host"
+ << "h3"))),
+ 0);
+
+ // We use readPreference nearest only when in initialSync.
+ setSelfMemberState(MemberState::RS_STARTUP2);
+
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(100));
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(100));
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(0, 0), 0),
+ Milliseconds(300));
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(0, 0), 0),
+ Milliseconds(300));
+
+ // No primary situation: should choose h2.
+ ASSERT_EQUALS(HostAndPort("h2"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::PrimaryPreferred));
+ ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
+
+ // Add primary
+ ASSERT_EQUALS(-1, getCurrentPrimaryIndex());
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_PRIMARY,
+ OpTime(Timestamp(0, 0), 0),
+ Milliseconds(300));
+ ASSERT_EQUALS(2, getCurrentPrimaryIndex());
+
+ // h3 is primary, but its last applied isn't as up-to-date as ours, so it cannot be chosen
+ // as the sync source.
+ ASSERT_EQUALS(HostAndPort("h2"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::PrimaryPreferred));
+ ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
+
+ // Update the primary's position.
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_PRIMARY,
+ OpTime(Timestamp(10, 0), 0),
+ Milliseconds(300));
+
+ // h3 is primary and should be chosen as the sync source, despite being further away than h2
+ // and the primary (h3) being at our most recently applied optime.
+ ASSERT_EQUALS(HostAndPort("h3"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::PrimaryPreferred));
+ ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
+
+ // Sanity check: the same test as above should return the secondary "h2" if primary is not
+ // preferred.
+ ASSERT_EQUALS(HostAndPort("h2"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest));
+ ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
+
+ // Become primary: should choose closest secondary (h2).
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(300));
+ makeSelfPrimary(Timestamp(3.0));
+ ASSERT_EQUALS(0, getCurrentPrimaryIndex());
+ ASSERT_EQUALS(HostAndPort("h2"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::PrimaryPreferred));
+ ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
+}
+
+TEST_F(TopoCoordTest, PreferSecondaryAsSyncSourceWhenReadPreferenceIsSecondaryPreferred) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version" << 1 << "settings" << BSON("chainingAllowed" << true)
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 10 << "host"
+ << "hself")
+ << BSON("_id" << 20 << "host"
+ << "h2")
+ << BSON("_id" << 30 << "host"
+ << "h3"))),
+ 0);
+
+ // We use readPreference nearest only when in initialSync.
+ setSelfMemberState(MemberState::RS_STARTUP2);
+
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_RECOVERING,
+ OpTime(Timestamp(0, 0), 0),
+ Milliseconds(300));
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_RECOVERING,
+ OpTime(Timestamp(0, 0), 0),
+ Milliseconds(300));
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_PRIMARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(100));
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_PRIMARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(100));
+
+ // No secondary situation: should choose primary as sync source.
+ ASSERT_EQUALS(HostAndPort("h3"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::SecondaryPreferred));
+ ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
+
+ // Add secondary.
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(10, 0), 0),
+ Milliseconds(300));
+
+ // h2 is a secondary, but its last applied isn't more up-to-date than ours, so it cannot be
+ // chosen as the sync source.
+ ASSERT_EQUALS(HostAndPort("h3"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::SecondaryPreferred));
+ ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
+
+ // Update h2's position.
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(300));
+
+ // h2 is a secondary and should be chosen as the sync source despite being further away than the
+ // primary (h3).
+ ASSERT_EQUALS(HostAndPort("h2"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::SecondaryPreferred));
+ ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
+
+ // Sanity check: If we weren't preferring the secondary, we'd choose the primary in this
+ // situation.
+ ASSERT_EQUALS(HostAndPort("h3"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest));
+ ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
+
+ // Become primary: should choose nearest valid secondary as sync source.
+ heartbeatFromMember(HostAndPort("h3"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(11, 0), 0),
+ Milliseconds(100));
+ makeSelfPrimary(Timestamp(3.0));
+ ASSERT_EQUALS(0, getCurrentPrimaryIndex());
+ ASSERT_EQUALS(HostAndPort("h3"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::SecondaryPreferred));
+ ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
}
TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) {
@@ -994,10 +1555,12 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) {
OpTime(Timestamp(2, 0), 0),
Milliseconds(100));
- ASSERT_EQUALS(
- HostAndPort("h3"),
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ ASSERT_EQUALS(HostAndPort("h3"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest));
ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s);
// Good state setup done
@@ -1007,7 +1570,8 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) {
ASSERT_TRUE(getTopoCoord()
.chooseNewSyncSource(now()++,
OpTime(),
- TopologyCoordinator::ChainingPreference::kUseConfiguration)
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest)
.empty());
ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s);
@@ -1017,7 +1581,8 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) {
ASSERT_TRUE(getTopoCoord()
.chooseNewSyncSource(now()++,
OpTime(),
- TopologyCoordinator::ChainingPreference::kUseConfiguration)
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest)
.empty());
ASSERT_EQUALS(MemberState::RS_RECOVERING, getTopoCoord().getMemberState().s);
@@ -1318,8 +1883,10 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAStaleNode) {
ASSERT_OK(result);
ASSERT_EQUALS("requested member \"h5:27017\" is more than 10 seconds behind us",
response.obj()["warning"].String());
- getTopoCoord().chooseNewSyncSource(
- now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ ourOpTime,
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress());
}
@@ -1361,8 +1928,10 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAValidNode) {
ASSERT_OK(result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
- getTopoCoord().chooseNewSyncSource(
- now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ ourOpTime,
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h6"), getTopoCoord().getSyncSourceAddress());
}
@@ -1406,7 +1975,10 @@ TEST_F(TopoCoordTest,
ASSERT_FALSE(responseObj.hasField("warning"));
receiveDownHeartbeat(HostAndPort("h6"), "rs0");
HostAndPort syncSource = getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h6"), syncSource);
}
@@ -1511,8 +2083,10 @@ TEST_F(TopoCoordTest,
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
ASSERT_FALSE(responseObj.hasField("prevSyncTarget"));
- getTopoCoord().chooseNewSyncSource(
- now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ ourOpTime,
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress());
heartbeatFromMember(
@@ -2075,8 +2649,10 @@ TEST_F(PrepareHeartbeatResponseV1Test,
HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0));
heartbeatFromMember(
HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0));
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
// set up args
ReplSetHeartbeatArgsV1 args;
@@ -4193,8 +4769,10 @@ TEST_F(TopoCoordTest,
receiveUpHeartbeat(
HostAndPort("host1"), "rs0", MemberState::RS_PRIMARY, election, oplogProgress);
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("host1"), getTopoCoord().getSyncSourceAddress());
{
@@ -4281,8 +4859,10 @@ TEST_F(TopoCoordTest, replSetGetStatusForThreeMemberedReplicaSet) {
HostAndPort("hprimary"));
// Since chainingAllowed is disabled, hself should choose hprimary.
- getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(now()++,
+ OpTime(),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration,
+ ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort("hprimary"), getTopoCoord().getSyncSourceAddress());
BSONObjBuilder statusBuilder;