diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2020-04-21 13:16:02 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-04-21 17:54:41 +0000 |
commit | 5988fe51d7a18c022b51eda6f7243123b4e9a6ab (patch) | |
tree | 6e8fcaba0f9dc427bb586a7ec4100847a80db137 | |
parent | d4274059f2070d537d7c62167ab41b4b78f46efd (diff) | |
download | mongo-5988fe51d7a18c022b51eda6f7243123b4e9a6ab.tar.gz |
SERVER-38731 Implement ability to specify sync source read preference in initial sync
-rwxr-xr-x | buildscripts/errorcodes.py | 6 | ||||
-rw-r--r-- | jstests/replsets/initial_sync_chooses_correct_sync_source.js | 208 | ||||
-rw-r--r-- | src/mongo/client/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/client/read_preference.cpp | 69 | ||||
-rw-r--r-- | src/mongo/client/read_preference.h | 34 | ||||
-rw-r--r-- | src/mongo/client/read_preference.idl | 56 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_server_parameters.idl | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.cpp | 249 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 38 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_v1_test.cpp | 766 |
12 files changed, 1200 insertions, 275 deletions
diff --git a/buildscripts/errorcodes.py b/buildscripts/errorcodes.py index d56fadb009a..faf264d0c55 100755 --- a/buildscripts/errorcodes.py +++ b/buildscripts/errorcodes.py @@ -28,6 +28,7 @@ ASSERT_NAMES = ["uassert", "massert", "fassert", "fassertFailed"] MINIMUM_CODE = 10000 # This limit is intended to be increased by 1000 when we get close. MAXIMUM_CODE = 51999 +FIRST_BACKPORTED_CODE = 3873100 # pylint: disable=invalid-name codes = [] # type: ignore @@ -136,7 +137,10 @@ def read_error_codes(): if not code in seen: seen[code] = assert_loc # on first occurrence of a specific excessively large code, add to skips, errors - if int(code) > MAXIMUM_CODE: + if int(code) >= FIRST_BACKPORTED_CODE: + # Large codes are used in newer versions, so allow them in backports. + pass + elif int(code) > MAXIMUM_CODE: skips.append(assert_loc) errors.append(assert_loc) elif int(code) > MAXIMUM_CODE - 20: diff --git a/jstests/replsets/initial_sync_chooses_correct_sync_source.js b/jstests/replsets/initial_sync_chooses_correct_sync_source.js new file mode 100644 index 00000000000..a6658ff37ba --- /dev/null +++ b/jstests/replsets/initial_sync_chooses_correct_sync_source.js @@ -0,0 +1,208 @@ +/** + * Tests that initial sync chooses the correct sync source based on chaining and the + * initialSyncSourceReadPreference. + * + * @tags: [requires_fcv_42] + */ +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); + +const delayMillis = 50; // Adds a delay long enough to make a node not the "nearest" sync source. 
+const testName = "initial_sync_chooses_correct_sync_source"; +const rst = new ReplSetTest( + {name: testName, nodes: [{}, {rsConfig: {priority: 0}}], allowChaining: true, useBridge: true}); +const nodes = rst.startSet(); +rst.initiate(); + +const primary = rst.getPrimary(); +const primaryDb = primary.getDB("test"); +const secondary = rst.getSecondary(); + +// Skip validation while bringing the initial sync node up and down, because we don't wait for the +// sync to complete. +TestData.skipCollectionAndIndexValidation = true; + +// Add some data to be cloned. +assert.commandWorked(primaryDb.test.insert([{a: 1}, {b: 2}, {c: 3}])); +rst.awaitReplication(); + +jsTestLog("Testing chaining enabled, default initialSyncSourceReadPreference, non-voting node"); +// Ensure we see the sync source progress messages. +TestData.setParameters = TestData.setParameters || {}; +TestData.setParameters.logComponentVerbosity = TestData.setParameters.logComponentVerbosity || {}; +TestData.setParameters.logComponentVerbosity.replication = + TestData.setParameters.logComponentVerbosity.replication || {}; +TestData.setParameters.logComponentVerbosity.replication = + Object.merge(TestData.setParameters.logComponentVerbosity.replication, {verbosity: 2}); +const initialSyncNode = rst.add({ + rsConfig: {priority: 0, votes: 0}, + setParameter: { + 'failpoint.initialSyncHangBeforeCreatingOplog': tojson({mode: 'alwaysOn'}), + 'numInitialSyncAttempts': 1 + } +}); +primary.delayMessagesFrom(initialSyncNode, delayMillis); +rst.reInitiate(); + +assert.commandWorked(initialSyncNode.adminCommand({ + waitForFailPoint: "initialSyncHangBeforeCreatingOplog", + timesEntered: 1, + maxTimeMS: kDefaultWaitForFailPointTimeout +})); +let res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1})); + +// With zero votes and a default initialSyncSourceReadPreference, the secondary should be the sync +// source because this is equivalent to "nearest" and the primary is delayed. 
+assert.eq(res.syncSourceHost, secondary.host); +initialSyncNode.adminCommand( + {configureFailPoint: "initialSyncHangBeforeCreatingOplog", mode: "off"}); + +/*-----------------------------------------------------------------------------------------------*/ +jsTestLog( + "Testing chaining enabled, 'primaryPreferred' initialSyncSourceReadPreference, non-voting node"); +rst.restart(initialSyncNode, { + startClean: true, + setParameter: { + 'failpoint.initialSyncHangBeforeCreatingOplog': tojson({mode: 'alwaysOn'}), + 'initialSyncSourceReadPreference': 'primaryPreferred', + 'numInitialSyncAttempts': 1 + } +}); + +assert.commandWorked(initialSyncNode.adminCommand({ + waitForFailPoint: "initialSyncHangBeforeCreatingOplog", + timesEntered: 1, + maxTimeMS: kDefaultWaitForFailPointTimeout +})); +res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1})); + +// With an initialSyncSourceReadPreference of 'primaryPreferred', the primary should be the sync +// source even when it is delayed. +assert.eq(res.syncSourceHost, primary.host); +initialSyncNode.adminCommand( + {configureFailPoint: "initialSyncHangBeforeCreatingOplog", mode: "off"}); + +/*-----------------------------------------------------------------------------------------------*/ +/* Switch to a configuration with a voting node. */ +/*-----------------------------------------------------------------------------------------------*/ +jsTestLog("Reconfiguring set so initial sync node is voting."); +let config = rst.getReplSetConfigFromNode(); +config.members[2].votes = 1; +config.version++; +assert.commandWorked(primary.adminCommand({replSetReconfig: config})); + +jsTestLog("Testing chaining enabled, default initialSyncSourceReadPreference, voting node"); +// Ensure sync source selection is logged. 
+rst.restart(initialSyncNode, { + startClean: true, + setParameter: { + 'failpoint.initialSyncHangBeforeCreatingOplog': tojson({mode: 'alwaysOn'}), + 'numInitialSyncAttempts': 1 + } +}); + +assert.commandWorked(initialSyncNode.adminCommand({ + waitForFailPoint: "initialSyncHangBeforeCreatingOplog", + timesEntered: 1, + maxTimeMS: kDefaultWaitForFailPointTimeout +})); +res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1})); + +// With a voting node and a default initialSyncSourceReadPreference, the undelayed secondary should +// be the sync source (in 4.4+, the primary should be the sync source in this case) +assert.eq(res.syncSourceHost, secondary.host); +initialSyncNode.adminCommand( + {configureFailPoint: "initialSyncHangBeforeCreatingOplog", mode: "off"}); + +/*-----------------------------------------------------------------------------------------------*/ +jsTestLog( + "Testing chaining enabled, 'primaryPreferred' initialSyncSourceReadPreference, voting node"); +rst.restart(initialSyncNode, { + startClean: true, + setParameter: { + 'failpoint.initialSyncHangBeforeCreatingOplog': tojson({mode: 'alwaysOn'}), + 'initialSyncSourceReadPreference': 'primaryPreferred', + 'numInitialSyncAttempts': 1 + } +}); + +assert.commandWorked(initialSyncNode.adminCommand({ + waitForFailPoint: "initialSyncHangBeforeCreatingOplog", + timesEntered: 1, + maxTimeMS: kDefaultWaitForFailPointTimeout +})); +res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1})); + +// The primary should be chosen when 'primaryPreferred' is used, even though it is delayed. +assert.eq(res.syncSourceHost, primary.host); +initialSyncNode.adminCommand( + {configureFailPoint: "initialSyncHangBeforeCreatingOplog", mode: "off"}); + +/*-----------------------------------------------------------------------------------------------*/ +/* Switch back to a non-voting node, and disable chaining also. 
*/ +/*-----------------------------------------------------------------------------------------------*/ +jsTestLog("Reconfiguring set so initial sync node is non-voting and chaining is disabled."); +config.settings.chainingAllowed = false; +config.members[2].votes = 0; +config.version++; +assert.commandWorked(primary.adminCommand({replSetReconfig: config})); + +/*-----------------------------------------------------------------------------------------------*/ +jsTestLog("Testing chaining disabled, default initialSyncSourceReadPreference, non-voting node"); +rst.restart(initialSyncNode, { + startClean: true, + setParameter: { + 'failpoint.initialSyncHangBeforeCreatingOplog': tojson({mode: 'alwaysOn'}), + 'numInitialSyncAttempts': 1 + } +}); + +assert.commandWorked(initialSyncNode.adminCommand({ + waitForFailPoint: "initialSyncHangBeforeCreatingOplog", + timesEntered: 1, + maxTimeMS: kDefaultWaitForFailPointTimeout +})); +res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1})); + +// With chaining disabled, the default should be to select the primary even though it is delayed. 
+assert.eq(res.syncSourceHost, primary.host); +initialSyncNode.adminCommand( + {configureFailPoint: "initialSyncHangBeforeCreatingOplog", mode: "off"}); + +/*-----------------------------------------------------------------------------------------------*/ +jsTestLog("Testing chaining disabled, 'nearest' initialSyncSourceReadPreference, non-voting node"); +rst.restart(initialSyncNode, { + startClean: true, + setParameter: { + 'failpoint.initialSyncHangBeforeCreatingOplog': tojson({mode: 'alwaysOn'}), + 'initialSyncSourceReadPreference': 'nearest', + 'numInitialSyncAttempts': 1 + } +}); + +assert.commandWorked(initialSyncNode.adminCommand({ + waitForFailPoint: "initialSyncHangBeforeCreatingOplog", + timesEntered: 1, + maxTimeMS: kDefaultWaitForFailPointTimeout +})); +res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1})); + +// With chaining disabled, we choose the non-delayed secondary over the delayed primary when +// readPreference is explicitly 'nearest'. +assert.eq(res.syncSourceHost, secondary.host); +initialSyncNode.adminCommand( + {configureFailPoint: "initialSyncHangBeforeCreatingOplog", mode: "off"}); + +// Once we become secondary, the secondary read preference no longer matters and we choose the +// primary because chaining is disallowed. 
+assert.soon(function() { + let res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1})); + return res.syncSourceHost == primary.host; +}); + +primary.delayMessagesFrom(initialSyncNode, 0); +TestData.skipCollectionAndIndexValidation = false; +rst.stopSet(); +})(); diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 923ac00d491..a1e79accfec 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -48,6 +48,7 @@ env.Library( ], source=[ 'read_preference.cpp', + env.Idlc('read_preference.idl')[0], ], LIBDEPS=[ '$BUILD_DIR/mongo/bson/util/bson_extract', diff --git a/src/mongo/client/read_preference.cpp b/src/mongo/client/read_preference.cpp index b71825ad395..b5dffdeb43f 100644 --- a/src/mongo/client/read_preference.cpp +++ b/src/mongo/client/read_preference.cpp @@ -49,49 +49,6 @@ const char kModeFieldName[] = "mode"; const char kTagsFieldName[] = "tags"; const char kMaxStalenessSecondsFieldName[] = "maxStalenessSeconds"; -const char kPrimaryOnly[] = "primary"; -const char kPrimaryPreferred[] = "primaryPreferred"; -const char kSecondaryOnly[] = "secondary"; -const char kSecondaryPreferred[] = "secondaryPreferred"; -const char kNearest[] = "nearest"; - -StringData readPreferenceName(ReadPreference pref) { - switch (pref) { - case ReadPreference::PrimaryOnly: - return StringData(kPrimaryOnly); - case ReadPreference::PrimaryPreferred: - return StringData(kPrimaryPreferred); - case ReadPreference::SecondaryOnly: - return StringData(kSecondaryOnly); - case ReadPreference::SecondaryPreferred: - return StringData(kSecondaryPreferred); - case ReadPreference::Nearest: - return StringData(kNearest); - default: - MONGO_UNREACHABLE; - } -} - -StatusWith<ReadPreference> parseReadPreferenceMode(StringData prefStr) { - if (prefStr == kPrimaryOnly) { - return ReadPreference::PrimaryOnly; - } else if (prefStr == kPrimaryPreferred) { - return ReadPreference::PrimaryPreferred; - } else if (prefStr == kSecondaryOnly) { - 
return ReadPreference::SecondaryOnly; - } else if (prefStr == kSecondaryPreferred) { - return ReadPreference::SecondaryPreferred; - } else if (prefStr == kNearest) { - return ReadPreference::Nearest; - } - return Status(ErrorCodes::FailedToParse, - str::stream() << "Could not parse $readPreference mode '" << prefStr - << "'. Only the modes '" << kPrimaryOnly << "', '" - << kPrimaryPreferred << "', '" << kSecondaryOnly << "', '" - << kSecondaryPreferred << "', and '" << kNearest - << "' are supported."); -} - // Slight kludge here: if we weren't passed a TagSet, we default to the empty // TagSet if ReadPreference is Primary, or the default (wildcard) TagSet otherwise. // This maintains compatibility with existing code, while preserving the ability to round @@ -107,6 +64,14 @@ TagSet defaultTagSetForMode(ReadPreference mode) { } // namespace +Status validateReadPreferenceMode(const std::string& prefStr) { + try { + ReadPreference_parse(IDLParserErrorContext(kModeFieldName), prefStr); + } catch (DBException& e) { + return e.toStatus(); + } + return Status::OK(); +} /** * Replica set refresh period on the task executor. @@ -154,11 +119,19 @@ StatusWith<ReadPreferenceSetting> ReadPreferenceSetting::fromInnerBSON(const BSO } ReadPreference mode; - auto swReadPrefMode = parseReadPreferenceMode(modeStr); - if (!swReadPrefMode.isOK()) { - return swReadPrefMode.getStatus(); + try { + mode = ReadPreference_parse(IDLParserErrorContext(kModeFieldName), modeStr); + } catch (DBException& e) { + return e.toStatus().withContext( + str::stream() << "Could not parse $readPreference mode '" << modeStr + << "'. 
Only the modes '" + << ReadPreference_serializer(ReadPreference::PrimaryOnly) << "', '" + << ReadPreference_serializer(ReadPreference::PrimaryPreferred) << "', '" + << ReadPreference_serializer(ReadPreference::SecondaryOnly) << "', '" + << ReadPreference_serializer(ReadPreference::SecondaryPreferred) + << "', and '" << ReadPreference_serializer(ReadPreference::Nearest) + << "' are supported."); } - mode = std::move(swReadPrefMode.getValue()); TagSet tags; BSONElement tagsElem; @@ -243,7 +216,7 @@ StatusWith<ReadPreferenceSetting> ReadPreferenceSetting::fromContainingBSON( } void ReadPreferenceSetting::toInnerBSON(BSONObjBuilder* bob) const { - bob->append(kModeFieldName, readPreferenceName(pref)); + bob->append(kModeFieldName, ReadPreference_serializer(pref)); if (tags != defaultTagSetForMode(pref)) { bob->append(kTagsFieldName, tags.getTagBSON()); } diff --git a/src/mongo/client/read_preference.h b/src/mongo/client/read_preference.h index f4807110b14..69803e41cf1 100644 --- a/src/mongo/client/read_preference.h +++ b/src/mongo/client/read_preference.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/bson/simple_bsonobj_comparator.h" +#include "mongo/client/read_preference_gen.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/optime.h" @@ -39,35 +40,12 @@ namespace mongo { template <typename T> class StatusWith; -enum class ReadPreference { - /** - * Read from primary only. All operations produce an error (throw an exception where - * applicable) if primary is unavailable. Cannot be combined with tags. - */ - PrimaryOnly = 0, - - /** - * Read from primary if available, otherwise a secondary. Tags will only be applied in the - * event that the primary is unavailable and a secondary is read from. In this event only - * secondaries matching the tags provided would be read from. - */ - PrimaryPreferred, - - /** - * Read from secondary if available, otherwise error. 
- */ - SecondaryOnly, +using ReadPreference = ReadPreferenceEnum; - /** - * Read from a secondary if available, otherwise read from the primary. - */ - SecondaryPreferred, - - /** - * Read from any member. - */ - Nearest, -}; +/** + * Validate a ReadPreference string. This is intended for use as an IDL validator callback. + */ +Status validateReadPreferenceMode(const std::string& prefStr); /** * A simple object for representing the list of tags requested by a $readPreference. diff --git a/src/mongo/client/read_preference.idl b/src/mongo/client/read_preference.idl new file mode 100644 index 00000000000..d98376baef8 --- /dev/null +++ b/src/mongo/client/read_preference.idl @@ -0,0 +1,56 @@ +# Copyright (C) 2020-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. 
If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +enums: + ReadPreference: + description: Enumeration representing Read Preference Modes + type: string + values: + # Read from primary only. All operations produce an error (throw an exception where + # applicable) if primary is unavailable. Cannot be combined with tags. + PrimaryOnly: "primary" + # + # Read from primary if available, otherwise a secondary. Tags will only be applied in the + # event that the primary is unavailable and a secondary is read from. In this event only + # secondaries matching the tags provided would be read from. + PrimaryPreferred: "primaryPreferred" + # + # Read from secondary if available, otherwise error. + SecondaryOnly: "secondary" + # + # Read from a secondary if available, otherwise read from the primary. + SecondaryPreferred: "secondaryPreferred" + # + # Read from any member. 
+ Nearest: "nearest" diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index e069505779d..a9fd0695f1f 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -10,6 +10,7 @@ env.Library( env.Idlc('repl_server_parameters.idl')[0] ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/client/read_preference', '$BUILD_DIR/mongo/idl/server_parameter', ] ) @@ -925,6 +926,7 @@ env.Library( 'initial_syncer', 'data_replicator_external_state_initial_sync', 'repl_coordinator_interface', + 'repl_server_parameters', 'repl_settings', 'replica_set_messages', 'replication_process', diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl index ba56ea21c4d..9e8b771b201 100644 --- a/src/mongo/db/repl/repl_server_parameters.idl +++ b/src/mongo/db/repl/repl_server_parameters.idl @@ -30,6 +30,8 @@ global: cpp_namespace: "mongo::repl" + cpp_includes: + - "mongo/client/read_preference.h" imports: - "mongo/idl/basic_types.idl" @@ -250,3 +252,17 @@ server_parameters: lte: expr: 100 * 1024 * 1024 + # New parameters since this file was created, not taken from elsewhere. + initialSyncSourceReadPreference: + description: >- + Set this to specify how the sync source for initial sync is determined. + Valid options are: nearest, primary, primaryPreferred, secondary, + and secondaryPreferred. + set_at: startup + cpp_vartype: std::string + cpp_varname: initialSyncSourceReadPreference + # When the default is used, if chaining is disabled in the config then readPreference is + # effectively 'primary'. Otherwise readPreference is 'nearest', regardless of whether the + # node is a voting node. 
+ default: "" + validator: { callback: 'validateReadPreferenceMode' } diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 800ef68f611..dec850a385c 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -42,6 +42,7 @@ #include "mongo/bson/json.h" #include "mongo/bson/timestamp.h" #include "mongo/client/fetcher.h" +#include "mongo/client/read_preference.h" #include "mongo/db/audit.h" #include "mongo/db/catalog/commit_quorum_options.h" #include "mongo/db/client.h" @@ -65,6 +66,7 @@ #include "mongo/db/repl/last_vote.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/repl_set_config_checks.h" #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" @@ -3539,11 +3541,33 @@ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOp HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress(); // Always allow chaining while in catchup and drain mode. - auto chainingPreference = _getMemberState_inlock().primary() + auto memberState = _getMemberState_inlock(); + auto chainingPreference = memberState.primary() ? TopologyCoordinator::ChainingPreference::kAllowChaining : TopologyCoordinator::ChainingPreference::kUseConfiguration; - HostAndPort newSyncSource = - _topCoord->chooseNewSyncSource(_replExecutor->now(), lastOpTimeFetched, chainingPreference); + ReadPreference readPreference = ReadPreference::Nearest; + // Handle special case of initial sync source read preference. + // This sync source will be cleared when we go to secondary mode, because we will perform + // a postMemberState action of kOnFollowerModeStateChange which calls chooseNewSyncSource(). 
+ if (memberState.startup2() && _selfIndex != -1) { + if (!initialSyncSourceReadPreference.empty()) { + try { + readPreference = + ReadPreference_parse(IDLParserErrorContext("initialSyncSourceReadPreference"), + initialSyncSourceReadPreference); + } catch (const DBException& e) { + fassertFailedWithStatus(3873100, e.toStatus()); + } + // If read preference is explicitly set, it takes precedence over chaining: false. + chainingPreference = TopologyCoordinator::ChainingPreference::kAllowChaining; + } + } + HostAndPort newSyncSource = _topCoord->chooseNewSyncSource( + _replExecutor->now(), lastOpTimeFetched, chainingPreference, readPreference); + auto primary = _topCoord->getCurrentPrimaryMember(); + // If read preference is SecondaryOnly, we should never choose the primary. + invariant(readPreference != ReadPreference::SecondaryOnly || !primary || + primary->getHostAndPort() != newSyncSource); // If we lost our sync source, schedule new heartbeats immediately to update our knowledge // of other members's state, allowing us to make informed sync source decisions. diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index 2fff2fed94e..98b88dad64c 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -193,110 +193,52 @@ HostAndPort TopologyCoordinator::getSyncSourceAddress() const { HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now, const OpTime& lastOpTimeFetched, - ChainingPreference chainingPreference) { - // If we are not a member of the current replica set configuration, no sync source is valid. 
- if (_selfIndex == -1) { - LOG(1) << "Cannot sync from any members because we are not in the replica set config"; - return HostAndPort(); - } - - MONGO_FAIL_POINT_BLOCK(forceSyncSourceCandidate, customArgs) { - const auto& data = customArgs.getData(); - const auto hostAndPortElem = data["hostAndPort"]; - if (!hostAndPortElem) { - severe() << "'forceSyncSoureCandidate' parameter set with invalid host and port: " - << data; - fassertFailed(50835); - } - - const auto hostAndPort = HostAndPort(hostAndPortElem.checkAndGetStringData()); - const int syncSourceIndex = _rsConfig.findMemberIndexByHostAndPort(hostAndPort); - if (syncSourceIndex < 0) { - log() << "'forceSyncSourceCandidate' failed due to host and port not in " - "replica set config: " - << hostAndPort.toString(); - fassertFailed(50836); - } - - - if (_memberIsBlacklisted(_rsConfig.getMemberAt(syncSourceIndex), now)) { - log() << "Cannot select a sync source because forced candidate is blacklisted: " - << hostAndPort.toString(); - _syncSource = HostAndPort(); - return _syncSource; - } - - _syncSource = _rsConfig.getMemberAt(syncSourceIndex).getHostAndPort(); - log() << "choosing sync source candidate due to 'forceSyncSourceCandidate' parameter: " - << _syncSource; - std::string msg(str::stream() << "syncing from: " << _syncSource.toString() - << " by 'forceSyncSourceCandidate' parameter"); - setMyHeartbeatMessage(now, msg); + ChainingPreference chainingPreference, + ReadPreference readPreference) { + // Check to make sure we can choose a sync source, and choose a forced one if + // set. 
+ auto maybeSyncSource = _chooseSyncSourceInitialStep(now); + if (maybeSyncSource) { + _syncSource = *maybeSyncSource; return _syncSource; } - // if we have a target we've requested to sync from, use it - if (_forceSyncSourceIndex != -1) { - invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers()); - _syncSource = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort(); - _forceSyncSourceIndex = -1; - log() << "choosing sync source candidate by request: " << _syncSource; - std::string msg(str::stream() - << "syncing from: " << _syncSource.toString() << " by request"); - setMyHeartbeatMessage(now, msg); - return _syncSource; - } - - // wait for 2N pings (not counting ourselves) before choosing a sync target - int needMorePings = (_memberData.size() - 1) * 2 - _getTotalPings(); - - if (needMorePings > 0) { - static Occasionally sampler; - if (sampler.tick()) { - log() << "waiting for " << needMorePings << " pings from other members before syncing"; + // If we are only allowed to sync from the primary, use it as the sync source if possible. 
+ if (readPreference == ReadPreference::PrimaryOnly || + (chainingPreference == ChainingPreference::kUseConfiguration && + !_rsConfig.isChainingAllowed())) { + if (readPreference == ReadPreference::SecondaryOnly) { + severe() << "Sync source read preference 'secondaryOnly' with chaining disabled is not " + "valid."; + fassertFailed(3873102); + } + _syncSource = _choosePrimaryAsSyncSource(now, lastOpTimeFetched); + if (_syncSource.empty()) { + if (readPreference == ReadPreference::PrimaryOnly) { + LOG(1) << "Cannot select a sync source because the primary is not a valid sync " + "source and the sync source read preference is 'primary'."; + } else { + LOG(1) << "Cannot select a sync source because the primary is not a valid sync " + "source and chaining is disabled."; + } } - _syncSource = HostAndPort(); return _syncSource; - } - - // If we are only allowed to sync from the primary, set that - if (chainingPreference == ChainingPreference::kUseConfiguration && - !_rsConfig.isChainingAllowed()) { - if (_currentPrimaryIndex == -1) { - LOG(1) << "Cannot select a sync source because chaining is" - " not allowed and primary is unknown/down"; - _syncSource = HostAndPort(); - return _syncSource; - } else if (_memberIsBlacklisted(*_currentPrimaryMember(), now)) { - LOG(1) << "Cannot select a sync source because chaining is not allowed and primary " - "member is blacklisted: " - << _currentPrimaryMember()->getHostAndPort(); - _syncSource = HostAndPort(); - return _syncSource; - } else if (_currentPrimaryIndex == _selfIndex) { - LOG(1) - << "Cannot select a sync source because chaining is not allowed and we are primary"; - _syncSource = HostAndPort(); - return _syncSource; - } else if (_memberData.at(_currentPrimaryIndex).getLastAppliedOpTime() < - lastOpTimeFetched) { - LOG(1) << "Cannot select a sync source because chaining is not allowed and the primary " - "is behind me. 
Last oplog optime of primary {}: {}, my last fetched oplog " - "optime: {}"_format( - _currentPrimaryMember()->getHostAndPort(), - _memberData.at(_currentPrimaryIndex).getLastAppliedOpTime().toBSON(), - lastOpTimeFetched.toBSON()); - _syncSource = HostAndPort(); - return _syncSource; - } else { - _syncSource = _currentPrimaryMember()->getHostAndPort(); - log() << "chaining not allowed, choosing primary as sync source candidate: " - << _syncSource; - std::string msg(str::stream() << "syncing from primary: " << _syncSource.toString()); - setMyHeartbeatMessage(now, msg); + } else if (readPreference == ReadPreference::PrimaryPreferred) { + // If we prefer the primary, try it first. + _syncSource = _choosePrimaryAsSyncSource(now, lastOpTimeFetched); + if (!_syncSource.empty()) { return _syncSource; } } + _syncSource = _chooseNearbySyncSource(now, lastOpTimeFetched, readPreference); + return _syncSource; +} + +HostAndPort TopologyCoordinator::_chooseNearbySyncSource(Date_t now, + const OpTime& lastOpTimeFetched, + ReadPreference readPreference) { + // We should have handled PrimaryOnly before calling this. + invariant(readPreference != ReadPreference::PrimaryOnly); // find the member with the lowest ping time that is ahead of me @@ -353,6 +295,17 @@ HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now, continue; } + // Disallow the primary for first or all attempts depending on the readPreference. + if (readPreference == ReadPreference::SecondaryOnly || + (readPreference == ReadPreference::SecondaryPreferred && attempts == 0)) { + if (it->getState().primary()) { + LOG(2) << "Cannot select sync source because it is a primary and we are " + "looking for a secondary: " + << itMemberConfig.getHostAndPort(); + continue; + } + } + // On the first attempt, we skip candidates that do not match these criteria. if (attempts == 0) { // Candidate must be a voter if we are a voter. 
@@ -441,6 +394,103 @@ HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now, return _syncSource; } +boost::optional<HostAndPort> TopologyCoordinator::_chooseSyncSourceInitialStep(Date_t now) { + // If we are not a member of the current replica set configuration, no sync source is valid. + if (_selfIndex == -1) { + LOG(1) << "Cannot sync from any members because we are not in the replica set config"; + return HostAndPort(); + } + + MONGO_FAIL_POINT_BLOCK(forceSyncSourceCandidate, customArgs) { + const auto& data = customArgs.getData(); + const auto hostAndPortElem = data["hostAndPort"]; + if (!hostAndPortElem) { + severe() << "'forceSyncSoureCandidate' parameter set with invalid host and port: " + << data; + fassertFailed(50835); + } + + const auto hostAndPort = HostAndPort(hostAndPortElem.checkAndGetStringData()); + const int syncSourceIndex = _rsConfig.findMemberIndexByHostAndPort(hostAndPort); + if (syncSourceIndex < 0) { + log() << "'forceSyncSourceCandidate' failed due to host and port not in " + "replica set config: " + << hostAndPort.toString(); + fassertFailed(50836); + } + + + if (_memberIsBlacklisted(_rsConfig.getMemberAt(syncSourceIndex), now)) { + log() << "Cannot select a sync source because forced candidate is blacklisted: " + << hostAndPort.toString(); + return HostAndPort(); + } + + auto syncSource = _rsConfig.getMemberAt(syncSourceIndex).getHostAndPort(); + log() << "choosing sync source candidate due to 'forceSyncSourceCandidate' parameter: " + << syncSource; + std::string msg(str::stream() << "syncing from: " << syncSource.toString() + << " by 'forceSyncSourceCandidate' parameter"); + setMyHeartbeatMessage(now, msg); + return syncSource; + } + + // if we have a target we've requested to sync from, use it + if (_forceSyncSourceIndex != -1) { + invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers()); + auto syncSource = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort(); + _forceSyncSourceIndex = -1; + log() << "choosing 
sync source candidate by request: " << syncSource; + std::string msg(str::stream() + << "syncing from: " << syncSource.toString() << " by request"); + setMyHeartbeatMessage(now, msg); + return syncSource; + } + + // wait for 2N pings (not counting ourselves) before choosing a sync target + int needMorePings = (_memberData.size() - 1) * 2 - pingsInConfig; + + if (needMorePings > 0) { + static Occasionally sampler; + if (sampler.tick()) { + log() << "waiting for " << needMorePings << " pings from other members before syncing"; + } + return HostAndPort(); + } + return boost::none; +} + +HostAndPort TopologyCoordinator::_choosePrimaryAsSyncSource(Date_t now, + const OpTime& lastOpTimeFetched) { + if (_currentPrimaryIndex == -1) { + LOG(1) << "Cannot select the primary as sync source because" + "the primary is unknown/down"; + return HostAndPort(); + } else if (_memberIsBlacklisted(*getCurrentPrimaryMember(), now)) { + LOG(1) << "Cannot select the primary as sync source because the primary " + "member is blacklisted: " + << getCurrentPrimaryMember()->getHostAndPort(); + return HostAndPort(); + } else if (_currentPrimaryIndex == _selfIndex) { + LOG(1) << "Cannot select the primary as sync source because this node is primary."; + return HostAndPort(); + } else if (_memberData.at(_currentPrimaryIndex).getLastAppliedOpTime() < lastOpTimeFetched) { + LOG(1) << "Cannot select the primary as sync source because the primary " + "is behind me. 
Last oplog optime of primary {}: {}, my last fetched oplog " + "optime: {}"_format( + getCurrentPrimaryMember()->getHostAndPort(), + _memberData.at(_currentPrimaryIndex).getLastAppliedOpTime().toBSON(), + lastOpTimeFetched.toBSON()); + return HostAndPort(); + } else { + auto syncSource = getCurrentPrimaryMember()->getHostAndPort(); + log() << "Choosing primary as sync source candidate: " << syncSource; + std::string msg(str::stream() << "syncing from primary: " << syncSource.toString()); + setMyHeartbeatMessage(now, msg); + return syncSource; + } +} + bool TopologyCoordinator::_memberIsBlacklisted(const MemberConfig& memberConfig, Date_t now) const { std::map<HostAndPort, Date_t>::const_iterator blacklisted = _syncSourceBlacklist.find(memberConfig.getHostAndPort()); @@ -813,6 +863,7 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( } else { ReplSetHeartbeatResponse hbr = std::move(hbResponse.getValue()); LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId(); + pingsInConfig++; advancedOpTime = hbData.setUpValues(now, std::move(hbr)); } @@ -1383,7 +1434,7 @@ void TopologyCoordinator::setCurrentPrimary_forTest(int primaryIndex, } } -const MemberConfig* TopologyCoordinator::_currentPrimaryMember() const { +const MemberConfig* TopologyCoordinator::getCurrentPrimaryMember() const { if (_currentPrimaryIndex == -1) return NULL; @@ -1741,7 +1792,7 @@ void TopologyCoordinator::fillIsMasterForReplSet(IsMasterResponse* const respons response->setIsMaster(myState.primary() && !isSteppingDown()); response->setIsSecondary(myState.secondary()); - const MemberConfig* curPrimary = _currentPrimaryMember(); + const MemberConfig* curPrimary = getCurrentPrimaryMember(); if (curPrimary) { response->setPrimary(curPrimary->getHostAndPort(horizon)); } @@ -1854,6 +1905,8 @@ void TopologyCoordinator::_updateHeartbeatDataForReconfig(const ReplSetConfig& n _memberData.clear(); // We're not in the config, we can't sync any more. 
_syncSource = HostAndPort(); + // We shouldn't get a sync source until we've received pings for our new config. + pingsInConfig = 0; MemberData newHeartbeatData; for (auto&& oldMemberData : oldHeartbeats) { if (oldMemberData.isSelf()) { diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index db0bf31428d..ba17d534016 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -32,6 +32,7 @@ #include <iosfwd> #include <string> +#include "mongo/client/read_preference.h" #include "mongo/db/repl/is_master_response.h" #include "mongo/db/repl/last_vote.h" #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" @@ -186,10 +187,15 @@ public: /** * Chooses and sets a new sync source, based on our current knowledge of the world. + * If chaining is disabled in the configuration and chainingPreference is kUseConfiguration, + * only the primary will be selected (regardless of read preference). Otherwise, + * the readPreference is respected. Chaining disabled with SecondaryOnly read preference is + * not allowed. */ HostAndPort chooseNewSyncSource(Date_t now, const OpTime& lastOpTimeFetched, - ChainingPreference chainingPreference); + ChainingPreference chainingPreference, + ReadPreference readPreference); /** * Suppresses selecting "host" as sync source until "until". @@ -709,6 +715,11 @@ public: bool checkIfCommitQuorumIsSatisfied(const CommitQuorumOptions& commitQuorum, const std::vector<HostAndPort>& commitReadyMembers) const; + /** + * Returns nullptr if there is no primary, or the MemberConfig* for the current primary. + */ + const MemberConfig* getCurrentPrimaryMember() const; + //////////////////////////////////////////////////////////// // // Test support methods @@ -791,6 +802,25 @@ private: // Returns the number of heartbeat pings which have occurred. int _getTotalPings(); + // Does preliminary checks involved in choosing sync source + // * Do we have a valid configuration? 
+ // * Do we have a forced sync source? + // * Have we gotten enough pings? + // Returns a HostAndPort if one is decided (may be empty), boost:none if we need to move to the + // next step. + boost::optional<HostAndPort> _chooseSyncSourceInitialStep(Date_t now); + + // Returns the primary node if it is a valid sync source, otherwise returns an empty + // HostAndPort. + HostAndPort _choosePrimaryAsSyncSource(Date_t now, const OpTime& lastOpTimeFetched); + + // Chooses a sync source among available nodes. ReadPreference may be any value but + // PrimaryOnly, but PrimaryPreferred is treated the same as "Nearest" (it is assumed + // the caller will handle PrimaryPreferred by trying _choosePrimaryAsSyncSource() first) + HostAndPort _chooseNearbySyncSource(Date_t now, + const OpTime& lastOpTimeFetched, + ReadPreference readPreference); + // Returns the current "ping" value for the given member by their address Milliseconds _getPing(const HostAndPort& host); @@ -842,9 +872,6 @@ private: */ MemberData* _findMemberDataByMemberId(const int memberId); - // Returns NULL if there is no primary, or the MemberConfig* for the current primary - const MemberConfig* _currentPrimaryMember() const; - /** * Performs updating "_currentPrimaryIndex" for processHeartbeatResponse(), and determines if an * election or stepdown should commence. @@ -974,6 +1001,9 @@ private: typedef std::map<HostAndPort, PingStats> PingMap; // Ping stats for each member by HostAndPort; PingMap _pings; + // For the purpose of deciding on a sync source, we count only pings for nodes which are in our + // current config. 
+ int pingsInConfig = 0; // V1 last vote info for elections LastVote _lastVote{OpTime::kInitialTerm, -1}; diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp index 3e7de8908b9..d17dc192d57 100644 --- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp @@ -43,6 +43,7 @@ #include "mongo/logger/logger.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/net/hostandport.h" @@ -320,7 +321,10 @@ private: TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) { // if we do not have an index in the config, we should get an empty syncsource HostAndPort newSyncSource = getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_TRUE(newSyncSource.empty()); updateConfig(BSON("_id" @@ -345,7 +349,10 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) { // Fail due to insufficient number of pings newSyncSource = getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); @@ -356,42 +363,55 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) { // Should choose h2, since it is furthest ahead newSyncSource = getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, + 
OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // h3 becomes further ahead, so it should be chosen heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0)); - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); // h3 becomes an invalid candidate for sync source; should choose h2 again heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_RECOVERING, OpTime(Timestamp(2, 0), 0)); - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // h3 back in SECONDARY and ahead heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0)); - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); // h3 goes down receiveDownHeartbeat(HostAndPort("h3"), "rs0"); - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + 
TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // h3 back up and ahead heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0)); - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); } @@ -504,8 +524,10 @@ TEST_F(TopoCoordTest, NodeReturnsClosestValidSyncSourceAsSyncSource) { Milliseconds(100)); // Should choose primary first; it's closest - getTopoCoord().chooseNewSyncSource( - now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + lastOpTimeWeApplied, + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("hprimary"), getTopoCoord().getSyncSourceAddress()); // Primary goes far far away @@ -517,40 +539,52 @@ TEST_F(TopoCoordTest, NodeReturnsClosestValidSyncSourceAsSyncSource) { // Should choose h4. (if an arbiter has an oplog, it's a valid sync source) // h6 is not considered because it is outside the maxSyncLagSeconds window. 
- getTopoCoord().chooseNewSyncSource( - now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + lastOpTimeWeApplied, + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h4"), getTopoCoord().getSyncSourceAddress()); // h4 goes down; should choose h1 receiveDownHeartbeat(HostAndPort("h4"), "rs0"); - getTopoCoord().chooseNewSyncSource( - now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + lastOpTimeWeApplied, + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h1"), getTopoCoord().getSyncSourceAddress()); // Primary and h1 go down; should choose h6 receiveDownHeartbeat(HostAndPort("h1"), "rs0"); receiveDownHeartbeat(HostAndPort("hprimary"), "rs0"); ASSERT_EQUALS(-1, getCurrentPrimaryIndex()); - getTopoCoord().chooseNewSyncSource( - now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + lastOpTimeWeApplied, + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h6"), getTopoCoord().getSyncSourceAddress()); // h6 goes down; should choose h5 receiveDownHeartbeat(HostAndPort("h6"), "rs0"); - getTopoCoord().chooseNewSyncSource( - now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + lastOpTimeWeApplied, + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress()); // h5 goes down; should choose h3 receiveDownHeartbeat(HostAndPort("h5"), "rs0"); - getTopoCoord().chooseNewSyncSource( - now()++, lastOpTimeWeApplied, 
TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + lastOpTimeWeApplied, + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); // h3 goes down; no sync source candidates remain receiveDownHeartbeat(HostAndPort("h3"), "rs0"); - getTopoCoord().chooseNewSyncSource( - now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + lastOpTimeWeApplied, + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); } @@ -592,14 +626,18 @@ TEST_F(TopoCoordTest, NodeWontChooseSyncSourceFromOlderTerm) { OpTime(Timestamp(300, 0), 2), // old term Milliseconds(100)); - getTopoCoord().chooseNewSyncSource( - now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + lastOpTimeWeApplied, + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h1"), getTopoCoord().getSyncSourceAddress()); // h1 goes down; no sync source candidates remain receiveDownHeartbeat(HostAndPort("h1"), "rs0"); - getTopoCoord().chooseNewSyncSource( - now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + lastOpTimeWeApplied, + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); } @@ -641,10 +679,12 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) { Milliseconds(300)); // No primary situation: should choose no sync source. 
- ASSERT_EQUALS( - HostAndPort(), - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + ASSERT_EQUALS(HostAndPort(), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest)); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); // Add primary @@ -662,7 +702,8 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) { getTopoCoord().chooseNewSyncSource( now()++, OpTime(Timestamp(10, 0), 0), - TopologyCoordinator::ChainingPreference::kUseConfiguration)); + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest)); ASSERT_EQUALS(HostAndPort(), getTopoCoord().getSyncSourceAddress()); // Update the primary's position. @@ -679,16 +720,45 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) { getTopoCoord().chooseNewSyncSource( now()++, OpTime(Timestamp(10, 0), 0), - TopologyCoordinator::ChainingPreference::kUseConfiguration)); + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest)); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); - // When we are in catch-up mode, the chainingAllowed setting is ignored. h2 should be chosen as - // the sync source. - ASSERT_EQUALS(HostAndPort("h2"), + // We should choose primary h3 regardless of read preference. 
+ ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().chooseNewSyncSource( now()++, OpTime(Timestamp(10, 0), 0), - TopologyCoordinator::ChainingPreference::kAllowChaining)); + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::PrimaryOnly)); + ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); + + ASSERT_EQUALS(HostAndPort("h3"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::PrimaryPreferred)); + ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); + + ASSERT_EQUALS(HostAndPort("h3"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::SecondaryPreferred)); + ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); + + // We do not test SecondaryOnly here because that results in an fassert. + + // When we are in catch-up mode, the chainingAllowed setting is ignored. h2 should be chosen as + // the sync source. + ASSERT_EQUALS( + HostAndPort("h2"), + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kAllowChaining, + ReadPreference::Nearest)); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // Become primary: should not choose self as sync source. 
@@ -699,13 +769,61 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) { Milliseconds(300)); makeSelfPrimary(Timestamp(3.0)); ASSERT_EQUALS(0, getCurrentPrimaryIndex()); - ASSERT_EQUALS( - HostAndPort(), - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + ASSERT_EQUALS(HostAndPort(), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest)); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); } +DEATH_TEST_F(TopoCoordTest, SecondaryOnlyAssertsWhenChainingNotAllowed, "3873102") { + updateConfig(BSON("_id" + << "rs0" + << "version" << 1 << "settings" << BSON("chainingAllowed" << false) + << "members" + << BSON_ARRAY(BSON("_id" << 10 << "host" + << "hself") + << BSON("_id" << 20 << "host" + << "h2") + << BSON("_id" << 30 << "host" + << "h3"))), + 0); + + setSelfMemberState(MemberState::RS_SECONDARY); + + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(100)); + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(100)); + heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(0, 0), 0), + Milliseconds(300)); + + // Set h3 as primary. + heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_PRIMARY, + OpTime(Timestamp(10, 0), 0), + Milliseconds(300)); + + // Attempting to choose a sync source with SecondaryOnly and chaining disabled + // results in an fassert. 
+ getTopoCoord().chooseNewSyncSource(now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::SecondaryOnly); +} + TEST_F(TopoCoordTest, ChooseOnlyVotersAsSyncSourceWhenNodeIsAVoter) { updateConfig(fromjson("{_id:'rs0', version:1, members:[" "{_id:10, host:'hself'}, " @@ -729,18 +847,27 @@ TEST_F(TopoCoordTest, ChooseOnlyVotersAsSyncSourceWhenNodeIsAVoter) { // Should choose h3 as it is a voter auto newSource = getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(h3, newSource); // Can't choose h2 as it is not a voter newSource = getTopoCoord().chooseNewSyncSource( - now()++, ot10, TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, + ot10, + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort(), newSource); // Should choose h3 as it is a voter, and ahead heartbeatFromMember(h3, "rs0", MemberState::RS_SECONDARY, ot5, hbRTT300); newSource = getTopoCoord().chooseNewSyncSource( - now()++, ot1, TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, + ot1, + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(h3, newSource); } @@ -781,10 +908,12 @@ TEST_F(TopoCoordTest, ChooseSameSyncSourceEvenWhenPrimary) { Milliseconds(300)); // No primary situation: should choose h2 sync source. 
- ASSERT_EQUALS( - HostAndPort("h2"), - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + ASSERT_EQUALS(HostAndPort("h2"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest)); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // Become primary @@ -792,10 +921,12 @@ TEST_F(TopoCoordTest, ChooseSameSyncSourceEvenWhenPrimary) { ASSERT_EQUALS(0, getCurrentPrimaryIndex()); // Choose same sync source even when primary. - ASSERT_EQUALS( - HostAndPort("h2"), - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + ASSERT_EQUALS(HostAndPort("h2"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest)); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); } @@ -826,8 +957,10 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, newOpTime, Milliseconds(100)); // force should overrule other defaults - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); getTopoCoord().setForceSyncSourceIndex(1); // force should cause shouldChangeSyncSource() to return true @@ -840,13 +973,17 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc HostAndPort("h2"), makeReplSetMetadata(oldOpTime), boost::none, now())); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( HostAndPort("h3"), makeReplSetMetadata(newOpTime), boost::none, 
now())); - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // force should only work for one call to chooseNewSyncSource - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); } @@ -886,20 +1023,26 @@ TEST_F(TopoCoordTest, NodeDoesNotChooseBlacklistedSyncSourceUntilBlacklistingExp OpTime(Timestamp(2, 0), 0), Milliseconds(100)); - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); Date_t expireTime = Date_t::fromMillisSinceEpoch(1000); getTopoCoord().blacklistSyncSource(HostAndPort("h3"), expireTime); - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); // Should choose second best choice now that h3 is blacklisted. 
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // After time has passed, should go back to original sync source - getTopoCoord().chooseNewSyncSource( - expireTime, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(expireTime, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); } @@ -941,21 +1084,439 @@ TEST_F(TopoCoordTest, ChooseNoSyncSourceWhenPrimaryIsBlacklistedAndChainingIsDis OpTime(Timestamp(2, 0), 0), Milliseconds(100)); - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); Date_t expireTime = Date_t::fromMillisSinceEpoch(1000); getTopoCoord().blacklistSyncSource(HostAndPort("h2"), expireTime); - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); // Can't choose any sync source now. 
ASSERT(getTopoCoord().getSyncSourceAddress().empty()); // After time has passed, should go back to the primary - getTopoCoord().chooseNewSyncSource( - expireTime, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(expireTime, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); + ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); +} + +TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryOnly) { + updateConfig(BSON("_id" + << "rs0" + << "version" << 1 << "settings" << BSON("chainingAllowed" << true) + << "members" + << BSON_ARRAY(BSON("_id" << 10 << "host" + << "hself") + << BSON("_id" << 20 << "host" + << "h2") + << BSON("_id" << 30 << "host" + << "h3"))), + 0); + + // We use readPreference nearest only when in initialSync. + setSelfMemberState(MemberState::RS_STARTUP2); + + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(100)); + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(100)); + heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(0, 0), 0), + Milliseconds(300)); + heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(0, 0), 0), + Milliseconds(300)); + + // No primary situation: should choose no sync source. 
+ ASSERT_EQUALS(HostAndPort(), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::PrimaryOnly)); + ASSERT(getTopoCoord().getSyncSourceAddress().empty()); + + // Add primary + ASSERT_EQUALS(-1, getCurrentPrimaryIndex()); + heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_PRIMARY, + OpTime(Timestamp(0, 0), 0), + Milliseconds(300)); + ASSERT_EQUALS(2, getCurrentPrimaryIndex()); + + // h3 is primary, but its last applied isn't as up-to-date as ours, so it cannot be chosen + // as the sync source. + ASSERT_EQUALS(HostAndPort(), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::PrimaryOnly)); + ASSERT_EQUALS(HostAndPort(), getTopoCoord().getSyncSourceAddress()); + + // Update the primary's position. + heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_PRIMARY, + OpTime(Timestamp(10, 0), 0), + Milliseconds(300)); + + // h3 is primary and should be chosen as the sync source, despite being further away than h2 + // and the primary (h3) being at our most recently applied optime. + ASSERT_EQUALS(HostAndPort("h3"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::PrimaryOnly)); + ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); + + // Become primary: should not choose self as sync source. 
+ heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(0, 0), 0), + Milliseconds(300)); + makeSelfPrimary(Timestamp(3.0)); + ASSERT_EQUALS(0, getCurrentPrimaryIndex()); + ASSERT_EQUALS(HostAndPort(), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::PrimaryOnly)); + ASSERT(getTopoCoord().getSyncSourceAddress().empty()); +} + +TEST_F(TopoCoordTest, ChooseOnlySecondaryAsSyncSourceWhenReadPreferenceIsSecondaryOnly) { + updateConfig(BSON("_id" + << "rs0" + << "version" << 1 << "settings" << BSON("chainingAllowed" << true) + << "members" + << BSON_ARRAY(BSON("_id" << 10 << "host" + << "hself") + << BSON("_id" << 20 << "host" + << "h2") + << BSON("_id" << 30 << "host" + << "h3"))), + 0); + + // We use readPreference nearest only when in initialSync. + setSelfMemberState(MemberState::RS_STARTUP2); + + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_RECOVERING, + OpTime(Timestamp(0, 0), 0), + Milliseconds(300)); + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_RECOVERING, + OpTime(Timestamp(0, 0), 0), + Milliseconds(300)); + heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_PRIMARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(100)); + heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_PRIMARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(100)); + + // No secondary situation: should choose no sync source. 
+ ASSERT_EQUALS(HostAndPort(), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::SecondaryOnly)); + ASSERT(getTopoCoord().getSyncSourceAddress().empty()); + + // Add secondary + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(10, 0), 0), + Milliseconds(300)); + + // h2 is a secondary, but its last applied isn't more up-to-date than ours, so it cannot be + // chosen as the sync source. + ASSERT_EQUALS(HostAndPort(), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::SecondaryOnly)); + ASSERT_EQUALS(HostAndPort(), getTopoCoord().getSyncSourceAddress()); + + // Update h2's position. + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(300)); + + // h2 is a secondary and should be chosen as the sync source despite being further away than the + // primary (h3). + ASSERT_EQUALS(HostAndPort("h2"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::SecondaryOnly)); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); + + // Become primary: should choose nearest valid secondary as sync source. 
+ heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(100)); + makeSelfPrimary(Timestamp(3.0)); + ASSERT_EQUALS(0, getCurrentPrimaryIndex()); + ASSERT_EQUALS(HostAndPort("h3"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::SecondaryOnly)); + ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); +} + +TEST_F(TopoCoordTest, PreferPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryPreferred) { + updateConfig(BSON("_id" + << "rs0" + << "version" << 1 << "settings" << BSON("chainingAllowed" << true) + << "members" + << BSON_ARRAY(BSON("_id" << 10 << "host" + << "hself") + << BSON("_id" << 20 << "host" + << "h2") + << BSON("_id" << 30 << "host" + << "h3"))), + 0); + + // We use readPreference nearest only when in initialSync. + setSelfMemberState(MemberState::RS_STARTUP2); + + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(100)); + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(100)); + heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(0, 0), 0), + Milliseconds(300)); + heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(0, 0), 0), + Milliseconds(300)); + + // No primary situation: should choose h2. 
+ ASSERT_EQUALS(HostAndPort("h2"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::PrimaryPreferred)); + ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); + + // Add primary + ASSERT_EQUALS(-1, getCurrentPrimaryIndex()); + heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_PRIMARY, + OpTime(Timestamp(0, 0), 0), + Milliseconds(300)); + ASSERT_EQUALS(2, getCurrentPrimaryIndex()); + + // h3 is primary, but its last applied isn't as up-to-date as ours, so it cannot be chosen + // as the sync source. + ASSERT_EQUALS(HostAndPort("h2"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::PrimaryPreferred)); + ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); + + // Update the primary's position. + heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_PRIMARY, + OpTime(Timestamp(10, 0), 0), + Milliseconds(300)); + + // h3 is primary and should be chosen as the sync source, despite being further away than h2 + // and the primary (h3) being at our most recently applied optime. + ASSERT_EQUALS(HostAndPort("h3"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::PrimaryPreferred)); + ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); + + // Sanity check: the same test as above should return the secondary "h2" if primary is not + // preferred. + ASSERT_EQUALS(HostAndPort("h2"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest)); + ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); + + // Become primary: should choose closest secondary (h2). 
+ heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(300)); + makeSelfPrimary(Timestamp(3.0)); + ASSERT_EQUALS(0, getCurrentPrimaryIndex()); + ASSERT_EQUALS(HostAndPort("h2"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::PrimaryPreferred)); + ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); +} + +TEST_F(TopoCoordTest, PreferSecondaryAsSyncSourceWhenReadPreferenceIsSecondaryPreferred) { + updateConfig(BSON("_id" + << "rs0" + << "version" << 1 << "settings" << BSON("chainingAllowed" << true) + << "members" + << BSON_ARRAY(BSON("_id" << 10 << "host" + << "hself") + << BSON("_id" << 20 << "host" + << "h2") + << BSON("_id" << 30 << "host" + << "h3"))), + 0); + + // We use readPreference nearest only when in initialSync. + setSelfMemberState(MemberState::RS_STARTUP2); + + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_RECOVERING, + OpTime(Timestamp(0, 0), 0), + Milliseconds(300)); + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_RECOVERING, + OpTime(Timestamp(0, 0), 0), + Milliseconds(300)); + heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_PRIMARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(100)); + heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_PRIMARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(100)); + + // No secondary situation: should choose primary as sync source. + ASSERT_EQUALS(HostAndPort("h3"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::SecondaryPreferred)); + ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); + + // Add secondary. 
+ heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(10, 0), 0), + Milliseconds(300)); + + // h2 is a secondary, but its last applied isn't more up-to-date than ours, so it cannot be + // chosen as the sync source. + ASSERT_EQUALS(HostAndPort("h3"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::SecondaryPreferred)); + ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); + + // Update h2's position. + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(300)); + + // h2 is a secondary and should be chosen as the sync source despite being further away than the + // primary (h3). + ASSERT_EQUALS(HostAndPort("h2"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::SecondaryPreferred)); + ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); + + // Sanity check: If we weren't preferring the secondary, we'd choose the primary in this + // situation. + ASSERT_EQUALS(HostAndPort("h3"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest)); + ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); + + // Become primary: should choose nearest valid secondary as sync source. 
+ heartbeatFromMember(HostAndPort("h3"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(11, 0), 0), + Milliseconds(100)); + makeSelfPrimary(Timestamp(3.0)); + ASSERT_EQUALS(0, getCurrentPrimaryIndex()); + ASSERT_EQUALS(HostAndPort("h3"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::SecondaryPreferred)); + ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); } TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) { @@ -994,10 +1555,12 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) { OpTime(Timestamp(2, 0), 0), Milliseconds(100)); - ASSERT_EQUALS( - HostAndPort("h3"), - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + ASSERT_EQUALS(HostAndPort("h3"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest)); ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s); // Good state setup done @@ -1007,7 +1570,8 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) { ASSERT_TRUE(getTopoCoord() .chooseNewSyncSource(now()++, OpTime(), - TopologyCoordinator::ChainingPreference::kUseConfiguration) + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest) .empty()); ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s); @@ -1017,7 +1581,8 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) { ASSERT_TRUE(getTopoCoord() .chooseNewSyncSource(now()++, OpTime(), - TopologyCoordinator::ChainingPreference::kUseConfiguration) + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest) .empty()); ASSERT_EQUALS(MemberState::RS_RECOVERING, getTopoCoord().getMemberState().s); @@ -1318,8 +1883,10 
@@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAStaleNode) { ASSERT_OK(result); ASSERT_EQUALS("requested member \"h5:27017\" is more than 10 seconds behind us", response.obj()["warning"].String()); - getTopoCoord().chooseNewSyncSource( - now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + ourOpTime, + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress()); } @@ -1361,8 +1928,10 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAValidNode) { ASSERT_OK(result); BSONObj responseObj = response.obj(); ASSERT_FALSE(responseObj.hasField("warning")); - getTopoCoord().chooseNewSyncSource( - now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + ourOpTime, + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h6"), getTopoCoord().getSyncSourceAddress()); } @@ -1406,7 +1975,10 @@ TEST_F(TopoCoordTest, ASSERT_FALSE(responseObj.hasField("warning")); receiveDownHeartbeat(HostAndPort("h6"), "rs0"); HostAndPort syncSource = getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h6"), syncSource); } @@ -1511,8 +2083,10 @@ TEST_F(TopoCoordTest, BSONObj responseObj = response.obj(); ASSERT_FALSE(responseObj.hasField("warning")); ASSERT_FALSE(responseObj.hasField("prevSyncTarget")); - getTopoCoord().chooseNewSyncSource( - now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + ourOpTime, + TopologyCoordinator::ChainingPreference::kUseConfiguration, + 
ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress()); heartbeatFromMember( @@ -2075,8 +2649,10 @@ TEST_F(PrepareHeartbeatResponseV1Test, HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0)); heartbeatFromMember( HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0)); - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); // set up args ReplSetHeartbeatArgsV1 args; @@ -4193,8 +4769,10 @@ TEST_F(TopoCoordTest, receiveUpHeartbeat( HostAndPort("host1"), "rs0", MemberState::RS_PRIMARY, election, oplogProgress); - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("host1"), getTopoCoord().getSyncSourceAddress()); { @@ -4281,8 +4859,10 @@ TEST_F(TopoCoordTest, replSetGetStatusForThreeMemberedReplicaSet) { HostAndPort("hprimary")); // Since chainingAllowed is disabled, hself should choose hprimary. - getTopoCoord().chooseNewSyncSource( - now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource(now()++, + OpTime(), + TopologyCoordinator::ChainingPreference::kUseConfiguration, + ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort("hprimary"), getTopoCoord().getSyncSourceAddress()); BSONObjBuilder statusBuilder; |