summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/client/remote_command_targeter.h3
-rw-r--r--src/mongo/client/remote_command_targeter_factory_mock.cpp5
-rw-r--r--src/mongo/client/remote_command_targeter_mock.cpp24
-rw-r--r--src/mongo/client/remote_command_targeter_mock.h7
-rw-r--r--src/mongo/client/remote_command_targeter_rs.cpp5
-rw-r--r--src/mongo/client/remote_command_targeter_rs.h3
-rw-r--r--src/mongo/client/remote_command_targeter_standalone.cpp7
-rw-r--r--src/mongo/client/remote_command_targeter_standalone.h4
-rw-r--r--src/mongo/client/replica_set_monitor.cpp80
-rw-r--r--src/mongo/client/replica_set_monitor.h6
-rw-r--r--src/mongo/client/replica_set_monitor_internal.h4
-rw-r--r--src/mongo/client/replica_set_monitor_read_preference_test.cpp71
12 files changed, 192 insertions, 27 deletions
diff --git a/src/mongo/client/remote_command_targeter.h b/src/mongo/client/remote_command_targeter.h
index 9adc553c2f0..af86ede82e2 100644
--- a/src/mongo/client/remote_command_targeter.h
+++ b/src/mongo/client/remote_command_targeter.h
@@ -81,6 +81,9 @@ public:
virtual SemiFuture<HostAndPort> findHostWithMaxWait(const ReadPreferenceSetting& readPref,
Milliseconds maxWait) = 0;
+ virtual SemiFuture<std::vector<HostAndPort>> findHostsWithMaxWait(
+ const ReadPreferenceSetting& readPref, Milliseconds maxWait) = 0;
+
/**
* Reports to the targeter that a 'status' indicating a not master error was received when
* communicating with 'host', and so it should update its bookkeeping to avoid giving out the
diff --git a/src/mongo/client/remote_command_targeter_factory_mock.cpp b/src/mongo/client/remote_command_targeter_factory_mock.cpp
index b2aef422956..8c50ccd3dd8 100644
--- a/src/mongo/client/remote_command_targeter_factory_mock.cpp
+++ b/src/mongo/client/remote_command_targeter_factory_mock.cpp
@@ -58,6 +58,11 @@ public:
return _mock->findHostWithMaxWait(readPref, maxWait);
}
+ SemiFuture<std::vector<HostAndPort>> findHostsWithMaxWait(const ReadPreferenceSetting& readPref,
+ Milliseconds maxWait) override {
+ return _mock->findHostsWithMaxWait(readPref, maxWait);
+ }
+
void markHostNotMaster(const HostAndPort& host, const Status& status) override {
_mock->markHostNotMaster(host, status);
}
diff --git a/src/mongo/client/remote_command_targeter_mock.cpp b/src/mongo/client/remote_command_targeter_mock.cpp
index 4cec97a246d..164b1e593b3 100644
--- a/src/mongo/client/remote_command_targeter_mock.cpp
+++ b/src/mongo/client/remote_command_targeter_mock.cpp
@@ -55,11 +55,24 @@ ConnectionString RemoteCommandTargeterMock::connectionString() {
StatusWith<HostAndPort> RemoteCommandTargeterMock::findHost(OperationContext* opCtx,
const ReadPreferenceSetting& readPref) {
- return _findHostReturnValue;
+ if (!_findHostReturnValue.isOK()) {
+ return _findHostReturnValue.getStatus();
+ }
+
+ return _findHostReturnValue.getValue()[0];
}
SemiFuture<HostAndPort> RemoteCommandTargeterMock::findHostWithMaxWait(
const ReadPreferenceSetting& readPref, Milliseconds maxTime) {
+ if (!_findHostReturnValue.isOK()) {
+ return _findHostReturnValue.getStatus();
+ }
+
+ return _findHostReturnValue.getValue()[0];
+}
+
+SemiFuture<std::vector<HostAndPort>> RemoteCommandTargeterMock::findHostsWithMaxWait(
+ const ReadPreferenceSetting& readPref, Milliseconds maxWait) {
return _findHostReturnValue;
}
@@ -79,6 +92,15 @@ void RemoteCommandTargeterMock::setConnectionStringReturnValue(const ConnectionS
}
void RemoteCommandTargeterMock::setFindHostReturnValue(StatusWith<HostAndPort> returnValue) {
+ if (!returnValue.isOK()) {
+ _findHostReturnValue = returnValue.getStatus();
+ } else {
+ _findHostReturnValue = std::vector{returnValue.getValue()};
+ }
+}
+
+void RemoteCommandTargeterMock::setFindHostsReturnValue(
+ StatusWith<std::vector<HostAndPort>> returnValue) {
_findHostReturnValue = std::move(returnValue);
}
diff --git a/src/mongo/client/remote_command_targeter_mock.h b/src/mongo/client/remote_command_targeter_mock.h
index 89c7245d62b..3ea39b80d22 100644
--- a/src/mongo/client/remote_command_targeter_mock.h
+++ b/src/mongo/client/remote_command_targeter_mock.h
@@ -58,6 +58,9 @@ public:
SemiFuture<HostAndPort> findHostWithMaxWait(const ReadPreferenceSetting& readPref,
Milliseconds maxWait) override;
+ SemiFuture<std::vector<HostAndPort>> findHostsWithMaxWait(const ReadPreferenceSetting& readPref,
+ Milliseconds maxWait) override;
+
StatusWith<HostAndPort> findHost(OperationContext* opCtx,
const ReadPreferenceSetting& readPref) override;
@@ -81,6 +84,8 @@ public:
*/
void setFindHostReturnValue(StatusWith<HostAndPort> returnValue);
+ void setFindHostsReturnValue(StatusWith<std::vector<HostAndPort>> returnValue);
+
/**
* Returns the current set of hosts marked down and resets the mock's internal list of marked
* down hosts.
@@ -89,7 +94,7 @@ public:
private:
ConnectionString _connectionStringReturnValue;
- StatusWith<HostAndPort> _findHostReturnValue;
+ StatusWith<std::vector<HostAndPort>> _findHostReturnValue;
// Protects _hostsMarkedDown.
mutable stdx::mutex _mutex;
diff --git a/src/mongo/client/remote_command_targeter_rs.cpp b/src/mongo/client/remote_command_targeter_rs.cpp
index f4f3b86e8b6..97eae89da0d 100644
--- a/src/mongo/client/remote_command_targeter_rs.cpp
+++ b/src/mongo/client/remote_command_targeter_rs.cpp
@@ -66,6 +66,11 @@ SemiFuture<HostAndPort> RemoteCommandTargeterRS::findHostWithMaxWait(
return _rsMonitor->getHostOrRefresh(readPref, maxWait);
}
+SemiFuture<std::vector<HostAndPort>> RemoteCommandTargeterRS::findHostsWithMaxWait(
+ const ReadPreferenceSetting& readPref, Milliseconds maxWait) {
+ return _rsMonitor->getHostsOrRefresh(readPref, maxWait);
+}
+
StatusWith<HostAndPort> RemoteCommandTargeterRS::findHost(OperationContext* opCtx,
const ReadPreferenceSetting& readPref) {
const auto interruptStatus = opCtx->checkForInterruptNoAssert();
diff --git a/src/mongo/client/remote_command_targeter_rs.h b/src/mongo/client/remote_command_targeter_rs.h
index fae6225318b..f527e84b45e 100644
--- a/src/mongo/client/remote_command_targeter_rs.h
+++ b/src/mongo/client/remote_command_targeter_rs.h
@@ -56,6 +56,9 @@ public:
StatusWith<HostAndPort> findHost(OperationContext* opCtx,
const ReadPreferenceSetting& readPref) override;
+ SemiFuture<std::vector<HostAndPort>> findHostsWithMaxWait(const ReadPreferenceSetting& readPref,
+ Milliseconds maxWait) override;
+
SemiFuture<HostAndPort> findHostWithMaxWait(const ReadPreferenceSetting& readPref,
Milliseconds maxWait) override;
diff --git a/src/mongo/client/remote_command_targeter_standalone.cpp b/src/mongo/client/remote_command_targeter_standalone.cpp
index aede6abb142..17ac0bb7120 100644
--- a/src/mongo/client/remote_command_targeter_standalone.cpp
+++ b/src/mongo/client/remote_command_targeter_standalone.cpp
@@ -45,7 +45,12 @@ ConnectionString RemoteCommandTargeterStandalone::connectionString() {
SemiFuture<HostAndPort> RemoteCommandTargeterStandalone::findHostWithMaxWait(
const ReadPreferenceSetting& readPref, Milliseconds maxWait) {
- return _hostAndPort;
+ return {_hostAndPort};
+}
+
+SemiFuture<std::vector<HostAndPort>> RemoteCommandTargeterStandalone::findHostsWithMaxWait(
+ const ReadPreferenceSetting& readPref, Milliseconds maxWait) {
+ return {{_hostAndPort}};
}
StatusWith<HostAndPort> RemoteCommandTargeterStandalone::findHost(
diff --git a/src/mongo/client/remote_command_targeter_standalone.h b/src/mongo/client/remote_command_targeter_standalone.h
index 22a07642274..8783f781d0b 100644
--- a/src/mongo/client/remote_command_targeter_standalone.h
+++ b/src/mongo/client/remote_command_targeter_standalone.h
@@ -50,6 +50,10 @@ public:
SemiFuture<HostAndPort> findHostWithMaxWait(const ReadPreferenceSetting& readPref,
Milliseconds maxWait) override;
+
+ SemiFuture<std::vector<HostAndPort>> findHostsWithMaxWait(const ReadPreferenceSetting& readPref,
+ Milliseconds maxWait) override;
+
void markHostNotMaster(const HostAndPort& host, const Status& status) override;
void markHostUnreachable(const HostAndPort& host, const Status& status) override;
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index 25927212728..248446f5cb7 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -267,6 +267,22 @@ void ReplicaSetMonitor::_doScheduledRefresh(const CallbackHandle& currentHandle)
SemiFuture<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria,
Milliseconds maxWait) {
+ return _getHostsOrRefresh(criteria, maxWait)
+ .then([](const auto& hosts) {
+ invariant(hosts.size());
+ return hosts[0];
+ })
+ .semi();
+}
+
+SemiFuture<std::vector<HostAndPort>> ReplicaSetMonitor::getHostsOrRefresh(
+ const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
+ return _getHostsOrRefresh(criteria, maxWait).semi();
+}
+
+Future<std::vector<HostAndPort>> ReplicaSetMonitor::_getHostsOrRefresh(
+ const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
+
// If we're in shutdown, don't bother
if (globalInShutdownDeprecated()) {
return Status(ErrorCodes::ShutdownInProgress, "Server is shutting down"_sd);
@@ -279,7 +295,7 @@ SemiFuture<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreference
// Fast path, for the failure-free case
stdx::lock_guard<stdx::mutex> lk(_state->mutex);
- HostAndPort out = _state->getMatchingHost(criteria);
+ auto out = _state->getMatchingHosts(criteria);
if (!out.empty())
return {std::move(out)};
@@ -289,7 +305,7 @@ SemiFuture<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreference
// TODO look into putting all PrimaryOnly waiters on a single SharedPromise. The tricky part is
// dealing with maxWait.
- auto pf = makePromiseFuture<HostAndPort>();
+ auto pf = makePromiseFuture<decltype(out)>();
_state->waiters.emplace_back(
SetState::Waiter{_state->now() + maxWait, criteria, std::move(pf.promise)});
@@ -306,9 +322,8 @@ SemiFuture<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreference
_scheduleRefresh(_state->now() + kExpeditedRefreshPeriod, lk);
}
- return std::move(pf.future).semi();
+ return std::move(pf.future);
}
-
HostAndPort ReplicaSetMonitor::getMasterOrUassert() {
return getHostOrRefresh(kPrimaryOnlyReadPreference).get();
}
@@ -1036,25 +1051,35 @@ SetState::SetState(const MongoURI& uri,
}
HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) const {
+ auto hosts = getMatchingHosts(criteria);
+
+ if (hosts.empty()) {
+ return HostAndPort();
+ }
+
+ return hosts[0];
+}
+
+std::vector<HostAndPort> SetState::getMatchingHosts(const ReadPreferenceSetting& criteria) const {
switch (criteria.pref) {
// "Prefered" read preferences are defined in terms of other preferences
case ReadPreference::PrimaryPreferred: {
- HostAndPort out =
- getMatchingHost(ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags));
+ std::vector<HostAndPort> out =
+ getMatchingHosts(ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags));
// NOTE: the spec says we should use the primary even if tags don't match
if (!out.empty())
return out;
- return getMatchingHost(ReadPreferenceSetting(
+ return getMatchingHosts(ReadPreferenceSetting(
ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds));
}
case ReadPreference::SecondaryPreferred: {
- HostAndPort out = getMatchingHost(ReadPreferenceSetting(
+ std::vector<HostAndPort> out = getMatchingHosts(ReadPreferenceSetting(
ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds));
if (!out.empty())
return out;
// NOTE: the spec says we should use the primary even if tags don't match
- return getMatchingHost(
+ return getMatchingHosts(
ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags));
}
@@ -1062,8 +1087,8 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con
// NOTE: isMaster implies isUp
Nodes::const_iterator it = std::find_if(nodes.begin(), nodes.end(), isMaster);
if (it == nodes.end())
- return HostAndPort();
- return it->host;
+ return {};
+ return {it->host};
}
// The difference between these is handled by Node::matches
@@ -1127,7 +1152,7 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con
continue;
}
if (matchingNodes.size() == 1) {
- return matchingNodes.front()->host;
+ return {matchingNodes.front()->host};
}
// Only consider nodes that satisfy the minOpTime
@@ -1146,7 +1171,7 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con
}
if (matchingNodes.size() == 1) {
- return matchingNodes.front()->host;
+ return {matchingNodes.front()->host};
}
}
@@ -1163,17 +1188,26 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con
}
}
- // of the remaining nodes, pick one at random (or use round-robin)
- if (ReplicaSetMonitor::useDeterministicHostSelection) {
- // only in tests
- return matchingNodes[roundRobin++ % matchingNodes.size()]->host;
- } else {
- // normal case
- return matchingNodes[rand.nextInt32(matchingNodes.size())]->host;
- };
+ std::vector<HostAndPort> hosts;
+ std::transform(matchingNodes.begin(),
+ matchingNodes.end(),
+ std::back_inserter(hosts),
+ [](const auto& node) { return node->host; });
+
+ // Note that the host list is only deterministic (or random) for the first node.
+ // The rest of the list is in matchingNodes order (latency) with one element swapped
+ // for the first element.
+ if (auto bestHostIdx = ReplicaSetMonitor::useDeterministicHostSelection
+ ? roundRobin++ % hosts.size()
+ : rand.nextInt32(hosts.size())) {
+ using std::swap;
+ swap(hosts[0], hosts[bestHostIdx]);
+ }
+
+ return hosts;
}
- return HostAndPort();
+ return {};
}
default:
@@ -1241,7 +1275,7 @@ void SetState::notify(bool finishedScan) {
continue;
}
- auto match = getMatchingHost(it->criteria);
+ auto match = getMatchingHosts(it->criteria);
if (!match.empty()) {
// match;
it->promise.emplaceValue(std::move(match));
diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h
index b28364c6a25..92aa3a15add 100644
--- a/src/mongo/client/replica_set_monitor.h
+++ b/src/mongo/client/replica_set_monitor.h
@@ -90,6 +90,9 @@ public:
SemiFuture<HostAndPort> getHostOrRefresh(const ReadPreferenceSetting& readPref,
Milliseconds maxWait = kDefaultFindHostTimeout);
+ SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh(
+ const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout);
+
/**
* Returns the host we think is the current master or uasserts.
*
@@ -258,6 +261,9 @@ public:
void runScanForMockReplicaSet();
private:
+ Future<std::vector<HostAndPort>> _getHostsOrRefresh(const ReadPreferenceSetting& readPref,
+ Milliseconds maxWait);
+
/**
* Schedules a refresh via the task executor. (Task is automatically canceled in the d-tor.)
*/
diff --git a/src/mongo/client/replica_set_monitor_internal.h b/src/mongo/client/replica_set_monitor_internal.h
index 04479b6dab9..c4a90b47d79 100644
--- a/src/mongo/client/replica_set_monitor_internal.h
+++ b/src/mongo/client/replica_set_monitor_internal.h
@@ -142,7 +142,7 @@ public:
struct Waiter {
Date_t deadline;
ReadPreferenceSetting criteria;
- Promise<HostAndPort> promise;
+ Promise<std::vector<HostAndPort>> promise;
};
SetState(const MongoURI& uri, ReplicaSetChangeNotifier*, executor::TaskExecutor*);
@@ -154,6 +154,8 @@ public:
*
* Note: Uses only local data and does not go over the network.
*/
+ std::vector<HostAndPort> getMatchingHosts(const ReadPreferenceSetting& criteria) const;
+
HostAndPort getMatchingHost(const ReadPreferenceSetting& criteria) const;
/**
diff --git a/src/mongo/client/replica_set_monitor_read_preference_test.cpp b/src/mongo/client/replica_set_monitor_read_preference_test.cpp
index b380e79b926..3757c62dd61 100644
--- a/src/mongo/client/replica_set_monitor_read_preference_test.cpp
+++ b/src/mongo/client/replica_set_monitor_read_preference_test.cpp
@@ -66,6 +66,35 @@ public:
return out;
}
+ std::vector<HostAndPort> selectNodes(const SetState::Nodes& nodes,
+ ReadPreference pref,
+ const TagSet& tagSet,
+ int latencyThresholdMillis,
+ bool* isPrimarySelected) {
+ invariant(!nodes.empty());
+
+ auto connStr = ConnectionString::forReplicaSet(kSetName, {nodes.front().host});
+ auto set = makeState(MongoURI(connStr));
+ set->nodes = nodes;
+ set->latencyThresholdMicros = latencyThresholdMillis * 1000;
+
+ ReadPreferenceSetting criteria(pref, tagSet);
+ auto out = set->getMatchingHosts(criteria);
+ if (isPrimarySelected && !out.empty()) {
+ for (auto& host : out) {
+ Node* node = set->findNode(host);
+ ASSERT(node);
+
+ if (node->isMaster) {
+ *isPrimarySelected = node->isMaster;
+ break;
+ }
+ }
+ }
+
+ return out;
+ }
+
auto getThreeMemberWithTags() {
SetState::Nodes nodes;
@@ -137,6 +166,19 @@ TEST_F(ReadPrefTest, PrimaryOnly) {
ASSERT_EQUALS("b", host.host());
}
+TEST_F(ReadPrefTest, PrimaryOnlyMulti) {
+ auto nodes = getThreeMemberWithTags();
+ TagSet tags(getDefaultTagSet());
+
+ bool isPrimarySelected = false;
+ std::vector<HostAndPort> hosts =
+ selectNodes(nodes, mongo::ReadPreference::PrimaryOnly, tags, 3, &isPrimarySelected);
+
+ ASSERT(isPrimarySelected);
+ ASSERT_EQUALS(hosts.size(), 1ull);
+ ASSERT_EQUALS("b", hosts[0].host());
+}
+
TEST_F(ReadPrefTest, PrimaryOnlyPriNotOk) {
auto nodes = getThreeMemberWithTags();
TagSet tags(getDefaultTagSet());
@@ -163,6 +205,19 @@ TEST_F(ReadPrefTest, PrimaryMissing) {
ASSERT(host.empty());
}
+TEST_F(ReadPrefTest, PrimaryMissingMulti) {
+ auto nodes = getThreeMemberWithTags();
+ TagSet tags(getDefaultTagSet());
+
+ nodes[1].isMaster = false;
+
+ bool isPrimarySelected = false;
+ std::vector<HostAndPort> hosts =
+ selectNodes(nodes, mongo::ReadPreference::PrimaryOnly, tags, 3, &isPrimarySelected);
+
+ ASSERT(hosts.empty());
+}
+
TEST_F(ReadPrefTest, PriPrefWithPriOk) {
auto nodes = getThreeMemberWithTags();
TagSet tags(getDefaultTagSet());
@@ -203,6 +258,22 @@ TEST_F(ReadPrefTest, SecOnly) {
ASSERT_EQUALS("a", host.host());
}
+TEST_F(ReadPrefTest, SecOnlyMulti) {
+ auto nodes = getThreeMemberWithTags();
+ TagSet tags(getDefaultTagSet());
+
+ bool isPrimarySelected = false;
+ std::vector<HostAndPort> hosts =
+ selectNodes(nodes, mongo::ReadPreference::SecondaryOnly, tags, 1, &isPrimarySelected);
+
+ ASSERT(!isPrimarySelected);
+ std::sort(hosts.begin(), hosts.end());
+
+ ASSERT_EQUALS(hosts.size(), 2ull);
+ ASSERT_EQUALS("a", hosts[0].host());
+ ASSERT_EQUALS("c", hosts[1].host());
+}
+
TEST_F(ReadPrefTest, SecOnlyOnlyPriOk) {
auto nodes = getThreeMemberWithTags();
TagSet tags(getDefaultTagSet());