diff options
-rw-r--r-- | src/mongo/client/remote_command_targeter.h | 3 | ||||
-rw-r--r-- | src/mongo/client/remote_command_targeter_factory_mock.cpp | 5 | ||||
-rw-r--r-- | src/mongo/client/remote_command_targeter_mock.cpp | 24 | ||||
-rw-r--r-- | src/mongo/client/remote_command_targeter_mock.h | 7 | ||||
-rw-r--r-- | src/mongo/client/remote_command_targeter_rs.cpp | 5 | ||||
-rw-r--r-- | src/mongo/client/remote_command_targeter_rs.h | 3 | ||||
-rw-r--r-- | src/mongo/client/remote_command_targeter_standalone.cpp | 7 | ||||
-rw-r--r-- | src/mongo/client/remote_command_targeter_standalone.h | 4 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor.cpp | 80 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor.h | 6 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_internal.h | 4 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_read_preference_test.cpp | 71 |
12 files changed, 192 insertions, 27 deletions
diff --git a/src/mongo/client/remote_command_targeter.h b/src/mongo/client/remote_command_targeter.h index 9adc553c2f0..af86ede82e2 100644 --- a/src/mongo/client/remote_command_targeter.h +++ b/src/mongo/client/remote_command_targeter.h @@ -81,6 +81,9 @@ public: virtual SemiFuture<HostAndPort> findHostWithMaxWait(const ReadPreferenceSetting& readPref, Milliseconds maxWait) = 0; + virtual SemiFuture<std::vector<HostAndPort>> findHostsWithMaxWait( + const ReadPreferenceSetting& readPref, Milliseconds maxWait) = 0; + /** * Reports to the targeter that a 'status' indicating a not master error was received when * communicating with 'host', and so it should update its bookkeeping to avoid giving out the diff --git a/src/mongo/client/remote_command_targeter_factory_mock.cpp b/src/mongo/client/remote_command_targeter_factory_mock.cpp index b2aef422956..8c50ccd3dd8 100644 --- a/src/mongo/client/remote_command_targeter_factory_mock.cpp +++ b/src/mongo/client/remote_command_targeter_factory_mock.cpp @@ -58,6 +58,11 @@ public: return _mock->findHostWithMaxWait(readPref, maxWait); } + SemiFuture<std::vector<HostAndPort>> findHostsWithMaxWait(const ReadPreferenceSetting& readPref, + Milliseconds maxWait) override { + return _mock->findHostsWithMaxWait(readPref, maxWait); + } + void markHostNotMaster(const HostAndPort& host, const Status& status) override { _mock->markHostNotMaster(host, status); } diff --git a/src/mongo/client/remote_command_targeter_mock.cpp b/src/mongo/client/remote_command_targeter_mock.cpp index 4cec97a246d..164b1e593b3 100644 --- a/src/mongo/client/remote_command_targeter_mock.cpp +++ b/src/mongo/client/remote_command_targeter_mock.cpp @@ -55,11 +55,24 @@ ConnectionString RemoteCommandTargeterMock::connectionString() { StatusWith<HostAndPort> RemoteCommandTargeterMock::findHost(OperationContext* opCtx, const ReadPreferenceSetting& readPref) { - return _findHostReturnValue; + if (!_findHostReturnValue.isOK()) { + return _findHostReturnValue.getStatus(); + } + + return _findHostReturnValue.getValue()[0]; } SemiFuture<HostAndPort> RemoteCommandTargeterMock::findHostWithMaxWait( const ReadPreferenceSetting& readPref, Milliseconds maxTime) { + if (!_findHostReturnValue.isOK()) { + return _findHostReturnValue.getStatus(); + } + + return _findHostReturnValue.getValue()[0]; +} + +SemiFuture<std::vector<HostAndPort>> RemoteCommandTargeterMock::findHostsWithMaxWait( + const ReadPreferenceSetting& readPref, Milliseconds maxWait) { return _findHostReturnValue; } @@ -79,6 +92,15 @@ void RemoteCommandTargeterMock::setConnectionStringReturnValue(const ConnectionS } void RemoteCommandTargeterMock::setFindHostReturnValue(StatusWith<HostAndPort> returnValue) { + if (!returnValue.isOK()) { + _findHostReturnValue = returnValue.getStatus(); + } else { + _findHostReturnValue = std::vector{returnValue.getValue()}; + } +} + +void RemoteCommandTargeterMock::setFindHostsReturnValue( + StatusWith<std::vector<HostAndPort>> returnValue) { _findHostReturnValue = std::move(returnValue); } diff --git a/src/mongo/client/remote_command_targeter_mock.h b/src/mongo/client/remote_command_targeter_mock.h index 89c7245d62b..3ea39b80d22 100644 --- a/src/mongo/client/remote_command_targeter_mock.h +++ b/src/mongo/client/remote_command_targeter_mock.h @@ -58,6 +58,9 @@ public: SemiFuture<HostAndPort> findHostWithMaxWait(const ReadPreferenceSetting& readPref, Milliseconds maxWait) override; + SemiFuture<std::vector<HostAndPort>> findHostsWithMaxWait(const ReadPreferenceSetting& readPref, + Milliseconds maxWait) override; + StatusWith<HostAndPort> findHost(OperationContext* opCtx, const ReadPreferenceSetting& readPref) override; @@ -81,6 +84,8 @@ public: */ void setFindHostReturnValue(StatusWith<HostAndPort> returnValue); + void setFindHostsReturnValue(StatusWith<std::vector<HostAndPort>> returnValue); + /** * Returns the current set of hosts marked down and resets the mock's internal list of marked * down hosts. @@ -89,7 +94,7 @@ public: private: ConnectionString _connectionStringReturnValue; - StatusWith<HostAndPort> _findHostReturnValue; + StatusWith<std::vector<HostAndPort>> _findHostReturnValue; // Protects _hostsMarkedDown. mutable stdx::mutex _mutex; diff --git a/src/mongo/client/remote_command_targeter_rs.cpp b/src/mongo/client/remote_command_targeter_rs.cpp index f4f3b86e8b6..97eae89da0d 100644 --- a/src/mongo/client/remote_command_targeter_rs.cpp +++ b/src/mongo/client/remote_command_targeter_rs.cpp @@ -66,6 +66,11 @@ SemiFuture<HostAndPort> RemoteCommandTargeterRS::findHostWithMaxWait( return _rsMonitor->getHostOrRefresh(readPref, maxWait); } +SemiFuture<std::vector<HostAndPort>> RemoteCommandTargeterRS::findHostsWithMaxWait( + const ReadPreferenceSetting& readPref, Milliseconds maxWait) { + return _rsMonitor->getHostsOrRefresh(readPref, maxWait); +} + StatusWith<HostAndPort> RemoteCommandTargeterRS::findHost(OperationContext* opCtx, const ReadPreferenceSetting& readPref) { const auto interruptStatus = opCtx->checkForInterruptNoAssert(); diff --git a/src/mongo/client/remote_command_targeter_rs.h b/src/mongo/client/remote_command_targeter_rs.h index fae6225318b..f527e84b45e 100644 --- a/src/mongo/client/remote_command_targeter_rs.h +++ b/src/mongo/client/remote_command_targeter_rs.h @@ -56,6 +56,9 @@ public: StatusWith<HostAndPort> findHost(OperationContext* opCtx, const ReadPreferenceSetting& readPref) override; + SemiFuture<std::vector<HostAndPort>> findHostsWithMaxWait(const ReadPreferenceSetting& readPref, + Milliseconds maxWait) override; + SemiFuture<HostAndPort> findHostWithMaxWait(const ReadPreferenceSetting& readPref, Milliseconds maxWait) override; diff --git a/src/mongo/client/remote_command_targeter_standalone.cpp b/src/mongo/client/remote_command_targeter_standalone.cpp index aede6abb142..17ac0bb7120 100644 --- a/src/mongo/client/remote_command_targeter_standalone.cpp +++ b/src/mongo/client/remote_command_targeter_standalone.cpp @@ -45,7 +45,12 @@ ConnectionString RemoteCommandTargeterStandalone::connectionString() { SemiFuture<HostAndPort> RemoteCommandTargeterStandalone::findHostWithMaxWait( const ReadPreferenceSetting& readPref, Milliseconds maxWait) { - return _hostAndPort; + return {_hostAndPort}; +} + +SemiFuture<std::vector<HostAndPort>> RemoteCommandTargeterStandalone::findHostsWithMaxWait( + const ReadPreferenceSetting& readPref, Milliseconds maxWait) { + return {{_hostAndPort}}; } StatusWith<HostAndPort> RemoteCommandTargeterStandalone::findHost( diff --git a/src/mongo/client/remote_command_targeter_standalone.h b/src/mongo/client/remote_command_targeter_standalone.h index 22a07642274..8783f781d0b 100644 --- a/src/mongo/client/remote_command_targeter_standalone.h +++ b/src/mongo/client/remote_command_targeter_standalone.h @@ -50,6 +50,10 @@ public: SemiFuture<HostAndPort> findHostWithMaxWait(const ReadPreferenceSetting& readPref, Milliseconds maxWait) override; + + SemiFuture<std::vector<HostAndPort>> findHostsWithMaxWait(const ReadPreferenceSetting& readPref, + Milliseconds maxWait) override; + void markHostNotMaster(const HostAndPort& host, const Status& status) override; void markHostUnreachable(const HostAndPort& host, const Status& status) override; diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp index 25927212728..248446f5cb7 100644 --- a/src/mongo/client/replica_set_monitor.cpp +++ b/src/mongo/client/replica_set_monitor.cpp @@ -267,6 +267,22 @@ void ReplicaSetMonitor::_doScheduledRefresh(const CallbackHandle& currentHandle) SemiFuture<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria, Milliseconds maxWait) { + return _getHostsOrRefresh(criteria, maxWait) + .then([](const auto& hosts) { + invariant(hosts.size()); + return hosts[0]; + }) + .semi(); +} + +SemiFuture<std::vector<HostAndPort>> ReplicaSetMonitor::getHostsOrRefresh( + const ReadPreferenceSetting& criteria, Milliseconds maxWait) { + return _getHostsOrRefresh(criteria, maxWait).semi(); +} + +Future<std::vector<HostAndPort>> ReplicaSetMonitor::_getHostsOrRefresh( + const ReadPreferenceSetting& criteria, Milliseconds maxWait) { + // If we're in shutdown, don't bother if (globalInShutdownDeprecated()) { return Status(ErrorCodes::ShutdownInProgress, "Server is shutting down"_sd); @@ -279,7 +295,7 @@ SemiFuture<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreference // Fast path, for the failure-free case stdx::lock_guard<stdx::mutex> lk(_state->mutex); - HostAndPort out = _state->getMatchingHost(criteria); + auto out = _state->getMatchingHosts(criteria); if (!out.empty()) return {std::move(out)}; @@ -289,7 +305,7 @@ SemiFuture<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreference // TODO look into putting all PrimaryOnly waiters on a single SharedPromise. The tricky part is // dealing with maxWait. - auto pf = makePromiseFuture<HostAndPort>(); + auto pf = makePromiseFuture<decltype(out)>(); _state->waiters.emplace_back( SetState::Waiter{_state->now() + maxWait, criteria, std::move(pf.promise)}); @@ -306,9 +322,8 @@ SemiFuture<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreference _scheduleRefresh(_state->now() + kExpeditedRefreshPeriod, lk); } - return std::move(pf.future).semi(); + return std::move(pf.future); } - HostAndPort ReplicaSetMonitor::getMasterOrUassert() { return getHostOrRefresh(kPrimaryOnlyReadPreference).get(); } @@ -1036,25 +1051,35 @@ SetState::SetState(const MongoURI& uri, } HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) const { + auto hosts = getMatchingHosts(criteria); + + if (hosts.empty()) { + return HostAndPort(); + } + + return hosts[0]; +} + +std::vector<HostAndPort> SetState::getMatchingHosts(const ReadPreferenceSetting& criteria) const { switch (criteria.pref) { // "Prefered" read preferences are defined in terms of other preferences case ReadPreference::PrimaryPreferred: { - HostAndPort out = - getMatchingHost(ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags)); + std::vector<HostAndPort> out = + getMatchingHosts(ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags)); // NOTE: the spec says we should use the primary even if tags don't match if (!out.empty()) return out; - return getMatchingHost(ReadPreferenceSetting( + return getMatchingHosts(ReadPreferenceSetting( ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds)); } case ReadPreference::SecondaryPreferred: { - HostAndPort out = getMatchingHost(ReadPreferenceSetting( + std::vector<HostAndPort> out = getMatchingHosts(ReadPreferenceSetting( ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds)); if (!out.empty()) return out; // NOTE: the spec says we should use the primary even if tags don't match - return getMatchingHost( + return getMatchingHosts( ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags)); } @@ -1062,8 +1087,8 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con // NOTE: isMaster implies isUp Nodes::const_iterator it = std::find_if(nodes.begin(), nodes.end(), isMaster); if (it == nodes.end()) - return HostAndPort(); - return it->host; + return {}; + return {it->host}; } // The difference between these is handled by Node::matches @@ -1127,7 +1152,7 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con continue; } if (matchingNodes.size() == 1) { - return matchingNodes.front()->host; + return {matchingNodes.front()->host}; } // Only consider nodes that satisfy the minOpTime @@ -1146,7 +1171,7 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con } if (matchingNodes.size() == 1) { - return matchingNodes.front()->host; + return {matchingNodes.front()->host}; } } @@ -1163,17 +1188,26 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con } } - // of the remaining nodes, pick one at random (or use round-robin) - if (ReplicaSetMonitor::useDeterministicHostSelection) { - // only in tests - return matchingNodes[roundRobin++ % matchingNodes.size()]->host; - } else { - // normal case - return matchingNodes[rand.nextInt32(matchingNodes.size())]->host; - }; + std::vector<HostAndPort> hosts; + std::transform(matchingNodes.begin(), + matchingNodes.end(), + std::back_inserter(hosts), + [](const auto& node) { return node->host; }); + + // Note that the host list is only deterministic (or random) for the first node. + // The rest of the list is in matchingNodes order (latency) with one element swapped + // for the first element. + if (auto bestHostIdx = ReplicaSetMonitor::useDeterministicHostSelection + ? roundRobin++ % hosts.size() + : rand.nextInt32(hosts.size())) { + using std::swap; + swap(hosts[0], hosts[bestHostIdx]); + } + + return hosts; } - return HostAndPort(); + return {}; } default: @@ -1241,7 +1275,7 @@ void SetState::notify(bool finishedScan) { continue; } - auto match = getMatchingHost(it->criteria); + auto match = getMatchingHosts(it->criteria); if (!match.empty()) { // match; it->promise.emplaceValue(std::move(match)); diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h index b28364c6a25..92aa3a15add 100644 --- a/src/mongo/client/replica_set_monitor.h +++ b/src/mongo/client/replica_set_monitor.h @@ -90,6 +90,9 @@ public: SemiFuture<HostAndPort> getHostOrRefresh(const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout); + SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh( + const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout); + /** * Returns the host we think is the current master or uasserts. * @@ -258,6 +261,9 @@ public: void runScanForMockReplicaSet(); private: + Future<std::vector<HostAndPort>> _getHostsOrRefresh(const ReadPreferenceSetting& readPref, + Milliseconds maxWait); + /** * Schedules a refresh via the task executor. (Task is automatically canceled in the d-tor.) */ diff --git a/src/mongo/client/replica_set_monitor_internal.h b/src/mongo/client/replica_set_monitor_internal.h index 04479b6dab9..c4a90b47d79 100644 --- a/src/mongo/client/replica_set_monitor_internal.h +++ b/src/mongo/client/replica_set_monitor_internal.h @@ -142,7 +142,7 @@ public: struct Waiter { Date_t deadline; ReadPreferenceSetting criteria; - Promise<HostAndPort> promise; + Promise<std::vector<HostAndPort>> promise; }; SetState(const MongoURI& uri, ReplicaSetChangeNotifier*, executor::TaskExecutor*); @@ -154,6 +154,8 @@ public: * * Note: Uses only local data and does not go over the network. */ + std::vector<HostAndPort> getMatchingHosts(const ReadPreferenceSetting& criteria) const; + HostAndPort getMatchingHost(const ReadPreferenceSetting& criteria) const; /** diff --git a/src/mongo/client/replica_set_monitor_read_preference_test.cpp b/src/mongo/client/replica_set_monitor_read_preference_test.cpp index b380e79b926..3757c62dd61 100644 --- a/src/mongo/client/replica_set_monitor_read_preference_test.cpp +++ b/src/mongo/client/replica_set_monitor_read_preference_test.cpp @@ -66,6 +66,35 @@ public: return out; } + std::vector<HostAndPort> selectNodes(const SetState::Nodes& nodes, + ReadPreference pref, + const TagSet& tagSet, + int latencyThresholdMillis, + bool* isPrimarySelected) { + invariant(!nodes.empty()); + + auto connStr = ConnectionString::forReplicaSet(kSetName, {nodes.front().host}); + auto set = makeState(MongoURI(connStr)); + set->nodes = nodes; + set->latencyThresholdMicros = latencyThresholdMillis * 1000; + + ReadPreferenceSetting criteria(pref, tagSet); + auto out = set->getMatchingHosts(criteria); + if (isPrimarySelected && !out.empty()) { + for (auto& host : out) { + Node* node = set->findNode(host); + ASSERT(node); + + if (node->isMaster) { + *isPrimarySelected = node->isMaster; + break; + } + } + } + + return out; + } + auto getThreeMemberWithTags() { SetState::Nodes nodes; @@ -137,6 +166,19 @@ TEST_F(ReadPrefTest, PrimaryOnly) { ASSERT_EQUALS("b", host.host()); } +TEST_F(ReadPrefTest, PrimaryOnlyMulti) { + auto nodes = getThreeMemberWithTags(); + TagSet tags(getDefaultTagSet()); + + bool isPrimarySelected = false; + std::vector<HostAndPort> hosts = + selectNodes(nodes, mongo::ReadPreference::PrimaryOnly, tags, 3, &isPrimarySelected); + + ASSERT(isPrimarySelected); + ASSERT_EQUALS(hosts.size(), 1ull); + ASSERT_EQUALS("b", hosts[0].host()); +} + TEST_F(ReadPrefTest, PrimaryOnlyPriNotOk) { auto nodes = getThreeMemberWithTags(); TagSet tags(getDefaultTagSet()); @@ -163,6 +205,19 @@ TEST_F(ReadPrefTest, PrimaryMissing) { ASSERT(host.empty()); } +TEST_F(ReadPrefTest, PrimaryMissingMulti) { + auto nodes = getThreeMemberWithTags(); + TagSet tags(getDefaultTagSet()); + + nodes[1].isMaster = false; + + bool isPrimarySelected = false; + std::vector<HostAndPort> hosts = + selectNodes(nodes, mongo::ReadPreference::PrimaryOnly, tags, 3, &isPrimarySelected); + + ASSERT(hosts.empty()); +} + TEST_F(ReadPrefTest, PriPrefWithPriOk) { auto nodes = getThreeMemberWithTags(); TagSet tags(getDefaultTagSet()); @@ -203,6 +258,22 @@ TEST_F(ReadPrefTest, SecOnly) { ASSERT_EQUALS("a", host.host()); } +TEST_F(ReadPrefTest, SecOnlyMulti) { + auto nodes = getThreeMemberWithTags(); + TagSet tags(getDefaultTagSet()); + + bool isPrimarySelected = false; + std::vector<HostAndPort> hosts = + selectNodes(nodes, mongo::ReadPreference::SecondaryOnly, tags, 1, &isPrimarySelected); + + ASSERT(!isPrimarySelected); + std::sort(hosts.begin(), hosts.end()); + + ASSERT_EQUALS(hosts.size(), 2ull); + ASSERT_EQUALS("a", hosts[0].host()); + ASSERT_EQUALS("c", hosts[1].host()); +} + TEST_F(ReadPrefTest, SecOnlyOnlyPriOk) { auto nodes = getThreeMemberWithTags(); TagSet tags(getDefaultTagSet()); |