summary | refs | log | tree | commit | diff
diff options:
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2018-12-26 14:47:00 -0500
committerMathias Stearn <mathias@10gen.com>2018-12-27 19:24:54 -0500
commitaa236ed4f3096c85118f00618eec834c82363527 (patch)
tree9acf754dd14364af2a8f076ebbab955ab04662c2
parent5286b698336889c2bcfb08e8ab7c16334f447511 (diff)
downloadmongo-aa236ed4f3096c85118f00618eec834c82363527.tar.gz
SERVER-38295 ReplSetMonitor::getHostOrRefresh should not do anything if maxWait <= 0
-rw-r--r--src/mongo/client/remote_command_targeter.h13
-rw-r--r--src/mongo/client/remote_command_targeter_rs.cpp29
-rw-r--r--src/mongo/client/replica_set_monitor.cpp20
-rw-r--r--src/mongo/client/replica_set_monitor_internal.h2
-rw-r--r--src/mongo/db/s/move_chunk_command.cpp4
-rw-r--r--src/mongo/db/transaction_coordinator_service.cpp2
-rw-r--r--src/mongo/db/transaction_coordinator_util.cpp17
-rw-r--r--src/mongo/dbtests/replica_set_monitor_test.cpp71
-rw-r--r--src/mongo/s/async_requests_sender.cpp4
-rw-r--r--src/mongo/util/concurrency/thread_pool.h5
10 files changed, 108 insertions(+), 59 deletions(-)
diff --git a/src/mongo/client/remote_command_targeter.h b/src/mongo/client/remote_command_targeter.h
index 07b035ab60e..370752836a9 100644
--- a/src/mongo/client/remote_command_targeter.h
+++ b/src/mongo/client/remote_command_targeter.h
@@ -83,19 +83,6 @@ public:
Milliseconds maxWait) = 0;
/**
- * Finds a host matching the given read preference, giving up if a match is not found promptly.
- *
- * This method may still engage in blocking networking calls, but will attempt contact every
- * member of the replica set at most one time.
- *
- * TODO(schwerin): Change this implementation to not perform any networking, once existing
- * callers have been shown to be safe with this behavior or changed to call findHost.
- */
- StatusWith<HostAndPort> findHostNoWait(const ReadPreferenceSetting& readPref) {
- return findHostWithMaxWait(readPref, Milliseconds::zero()).getNoThrow();
- }
-
- /**
* Reports to the targeter that a 'status' indicating a not master error was received when
* communicating with 'host', and so it should update its bookkeeping to avoid giving out the
* host again on a subsequent request for the primary.
diff --git a/src/mongo/client/remote_command_targeter_rs.cpp b/src/mongo/client/remote_command_targeter_rs.cpp
index 917898d8c2b..15d5fdc930c 100644
--- a/src/mongo/client/remote_command_targeter_rs.cpp
+++ b/src/mongo/client/remote_command_targeter_rs.cpp
@@ -69,25 +69,18 @@ SharedSemiFuture<HostAndPort> RemoteCommandTargeterRS::findHostWithMaxWait(
StatusWith<HostAndPort> RemoteCommandTargeterRS::findHost(OperationContext* opCtx,
const ReadPreferenceSetting& readPref) {
- auto clock = opCtx->getServiceContext()->getFastClockSource();
- auto startDate = clock->now();
- while (true) {
- const auto interruptStatus = opCtx->checkForInterruptNoAssert();
- if (!interruptStatus.isOK()) {
- return interruptStatus;
- }
- auto host = _rsMonitor->getHostOrRefresh(readPref, Milliseconds::zero()).getNoThrow();
- if (host.getStatus() != ErrorCodes::FailedToSatisfyReadPreference) {
- return host;
- }
- // Enforce a 20-second ceiling on the time spent looking for a host. This conforms with the
- // behavior used throughout mongos prior to version 3.4, but is not fundamentally desirable.
- // See comment in remote_command_targeter.h for details.
- if (clock->now() - startDate > Seconds{20}) {
- return host;
- }
- sleepFor(Milliseconds{500});
+ const auto interruptStatus = opCtx->checkForInterruptNoAssert();
+ if (!interruptStatus.isOK()) {
+ return interruptStatus;
}
+
+ // Enforce a 20-second ceiling on the time spent looking for a host. This conforms with the
+ // behavior used throughout mongos prior to version 3.4, but is not fundamentally desirable.
+ // See comment in remote_command_targeter.h for details.
+ return _rsMonitor
+ ->getHostOrRefresh(readPref,
+ std::min<Milliseconds>(opCtx->getRemainingMaxTimeMillis(), Seconds(20)))
+ .getNoThrow(opCtx);
}
void RemoteCommandTargeterRS::markHostNotMaster(const HostAndPort& host, const Status& status) {
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index 95c7875460c..d78adff5a50 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -288,8 +288,9 @@ SharedSemiFuture<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(
if (!out.empty())
return {std::move(out)};
- // TODO early return if maxWait <= 0? Need to figure out if anyone is relying on the current
- // behavior where this finishes the scan before doing so.
+ // Fail early without doing any more work if the timeout has already expired.
+ if (maxWait <= Milliseconds(0))
+ return _state->makeUnsatisfedReadPrefError(criteria);
// TODO look into putting all PrimaryOnly waiters on a single SharedPromise. The tricky part is
// dealing with maxWait.
@@ -1282,12 +1283,7 @@ void SetState::notify(bool finishedScan) {
(it->deadline <= cachedNow || areRefreshRetriesDisabledForTest.load())) {
// To preserve prior behavior, we only examine deadlines at the end of a scan.
// This ensures that we only report failure after trying to contact all hosts.
- it->promise.setError(Status(ErrorCodes::FailedToSatisfyReadPreference,
- str::stream()
- << "Could not find host matching read preference "
- << it->criteria.toString()
- << " for set "
- << name));
+ it->promise.setError(makeUnsatisfedReadPrefError(it->criteria));
waiters.erase(it++);
} else {
it++;
@@ -1295,6 +1291,14 @@ void SetState::notify(bool finishedScan) {
}
}
+Status SetState::makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const {
+ return Status(ErrorCodes::FailedToSatisfyReadPreference,
+ str::stream() << "Could not find host matching read preference "
+ << criteria.toString()
+ << " for set "
+ << name);
+}
+
void SetState::checkInvariants() const {
bool foundMaster = false;
for (size_t i = 0; i < nodes.size(); i++) {
diff --git a/src/mongo/client/replica_set_monitor_internal.h b/src/mongo/client/replica_set_monitor_internal.h
index 378ad60639b..4b8b88b0823 100644
--- a/src/mongo/client/replica_set_monitor_internal.h
+++ b/src/mongo/client/replica_set_monitor_internal.h
@@ -199,6 +199,8 @@ public:
return executor ? executor->now() : Date_t::now();
}
+ Status makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const;
+
/**
* Before unlocking, do DEV checkInvariants();
*/
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 06aefe463fb..afd0b8c1065 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -198,8 +198,8 @@ private:
auto recipientShard =
uassertStatusOK(shardRegistry->getShard(opCtx, moveChunkRequest.getToShardId()));
- return recipientShard->getTargeter()->findHostNoWait(
- ReadPreferenceSetting{ReadPreference::PrimaryOnly});
+ return recipientShard->getTargeter()->findHost(
+ opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
}());
std::string unusedErrMsg;
diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp
index 19868013f6e..be96e7a4527 100644
--- a/src/mongo/db/transaction_coordinator_service.cpp
+++ b/src/mongo/db/transaction_coordinator_service.cpp
@@ -63,7 +63,7 @@ ThreadPool::Options makeDefaultThreadPoolOptions() {
ThreadPool::Options options;
options.poolName = "TransactionCoordinatorService";
options.minThreads = 0;
- options.maxThreads = 16;
+ options.maxThreads = ThreadPool::Options::kUnlimited;
// Ensure all threads have a client
options.onCreateThread = [](const std::string& threadName) {
diff --git a/src/mongo/db/transaction_coordinator_util.cpp b/src/mongo/db/transaction_coordinator_util.cpp
index 90649f37d4d..87e6719ef15 100644
--- a/src/mongo/db/transaction_coordinator_util.cpp
+++ b/src/mongo/db/transaction_coordinator_util.cpp
@@ -74,16 +74,13 @@ const WriteConcernOptions kInternalMajorityNoSnapshotWriteConcern(
* Finds the host and port for a shard.
*/
HostAndPort targetHost(const ShardId& shardId, const ReadPreferenceSetting& readPref) {
- auto opCtx = cc().makeOperationContext();
- auto shardRegistry = Grid::get(opCtx->getServiceContext())->shardRegistry();
- auto swShard = shardRegistry->getShard(opCtx.get(), shardId);
- uassertStatusOK(swShard);
- auto shard = swShard.getValue();
- auto swHostAndPort = shard->getTargeter()->findHostNoWait(readPref);
- uassertStatusOKWithContext(swHostAndPort.getStatus(),
- str::stream() << "Could not find shard " << shardId);
-
- return swHostAndPort.getValue();
+ const auto opCtxHolder = cc().makeOperationContext();
+ const auto opCtx = opCtxHolder.get();
+ auto shard = uassertStatusOK(
+ Grid::get(opCtx->getServiceContext())->shardRegistry()->getShard(opCtx, shardId));
+ // TODO SERVER-35678 return a SemiFuture<HostAndPort> rather than using a blocking call to
+ // get().
+ return shard->getTargeter()->findHostWithMaxWait(readPref, Seconds(20)).get(opCtx);
}
/**
diff --git a/src/mongo/dbtests/replica_set_monitor_test.cpp b/src/mongo/dbtests/replica_set_monitor_test.cpp
index 8ba72e860f1..693a7d7e9df 100644
--- a/src/mongo/dbtests/replica_set_monitor_test.cpp
+++ b/src/mongo/dbtests/replica_set_monitor_test.cpp
@@ -33,6 +33,7 @@
#include <set>
#include <vector>
+#include "mongo/base/init.h"
#include "mongo/client/connpool.h"
#include "mongo/client/dbclient_rs.h"
#include "mongo/client/replica_set_monitor.h"
@@ -51,6 +52,11 @@ using std::string;
using std::unique_ptr;
using unittest::assertGet;
+MONGO_INITIALIZER(DisableReplicaSetMonitorRefreshRetries)(InitializerContext*) {
+ ReplicaSetMonitor::disableRefreshRetries_forTest();
+ return Status::OK();
+}
+
// TODO: Port these existing tests here: replmonitor_bad_seed.js, repl_monitor_refresh.js
/**
@@ -253,7 +259,7 @@ private:
// Tests the case where the connection to secondary went bad and the replica set
// monitor needs to perform a refresh of it's local view then retry the node selection
-// again after the refresh.
+// again after the refresh as long as the timeout is > 0.
TEST_F(TwoNodeWithTags, SecDownRetryNoTag) {
MockReplicaSet* replSet = getReplSet();
@@ -272,7 +278,7 @@ TEST_F(TwoNodeWithTags, SecDownRetryNoTag) {
HostAndPort node = monitor
->getHostOrRefresh(ReadPreferenceSetting(
mongo::ReadPreference::SecondaryOnly, TagSet()),
- Milliseconds(0))
+ Milliseconds(1))
.get();
ASSERT_FALSE(monitor->isPrimary(node));
@@ -282,7 +288,7 @@ TEST_F(TwoNodeWithTags, SecDownRetryNoTag) {
// Tests the case where the connection to secondary went bad and the replica set
// monitor needs to perform a refresh of it's local view then retry the node selection
-// with tags again after the refresh.
+// with tags again after the refresh as long as the timeout is > 0.
TEST_F(TwoNodeWithTags, SecDownRetryWithTag) {
MockReplicaSet* replSet = getReplSet();
@@ -303,7 +309,7 @@ TEST_F(TwoNodeWithTags, SecDownRetryWithTag) {
HostAndPort node =
monitor
->getHostOrRefresh(ReadPreferenceSetting(mongo::ReadPreference::SecondaryOnly, tags),
- Milliseconds(0))
+ Milliseconds(1))
.get();
ASSERT_FALSE(monitor->isPrimary(node));
@@ -311,5 +317,62 @@ TEST_F(TwoNodeWithTags, SecDownRetryWithTag) {
monitor.reset();
}
+// Tests the case where the connection to secondary went bad and the replica set
+// monitor needs to perform a refresh of it's local view, but the scan has an expired timeout.
+TEST_F(TwoNodeWithTags, SecDownRetryExpiredTimeout) {
+ MockReplicaSet* replSet = getReplSet();
+
+ set<HostAndPort> seedList;
+ seedList.insert(HostAndPort(replSet->getPrimary()));
+ auto monitor = ReplicaSetMonitor::createIfNeeded(replSet->getSetName(), seedList);
+
+ const string secHost(replSet->getSecondaries().front());
+ replSet->kill(secHost);
+
+ // Make sure monitor sees the dead secondary
+ monitor->runScanForMockReplicaSet();
+
+ replSet->restore(secHost);
+
+ // This will fail, immediately without doing any refreshing.
+ auto errorFut = monitor->getHostOrRefresh(
+ ReadPreferenceSetting(mongo::ReadPreference::SecondaryOnly, TagSet()), Milliseconds(0));
+ ASSERT(errorFut.isReady());
+ ASSERT_EQ(errorFut.getNoThrow().getStatus(), ErrorCodes::FailedToSatisfyReadPreference);
+
+ // Because it did not schedule an expedited scan, it will continue failing until someone waits.
+ errorFut = monitor->getHostOrRefresh(
+ ReadPreferenceSetting(mongo::ReadPreference::SecondaryOnly, TagSet()), Milliseconds(0));
+ ASSERT(errorFut.isReady());
+ ASSERT_EQ(errorFut.getNoThrow().getStatus(), ErrorCodes::FailedToSatisfyReadPreference);
+
+ // Negative timeouts are handled the same way
+ errorFut = monitor->getHostOrRefresh(
+ ReadPreferenceSetting(mongo::ReadPreference::SecondaryOnly, TagSet()), Milliseconds(-1234));
+ ASSERT(errorFut.isReady());
+ ASSERT_EQ(errorFut.getNoThrow().getStatus(), ErrorCodes::FailedToSatisfyReadPreference);
+
+ // This will trigger a rescan. It is the only call in this test with a non-zero timeout.
+ HostAndPort node = monitor
+ ->getHostOrRefresh(ReadPreferenceSetting(
+ mongo::ReadPreference::SecondaryOnly, TagSet()),
+ Milliseconds(1))
+ .get();
+
+ ASSERT_FALSE(monitor->isPrimary(node));
+ ASSERT_EQUALS(secHost, node.toString());
+
+ // And this will now succeed.
+ node = monitor
+ ->getHostOrRefresh(
+ ReadPreferenceSetting(mongo::ReadPreference::SecondaryOnly, TagSet()),
+ Milliseconds(0))
+ .get();
+ ASSERT_FALSE(monitor->isPrimary(node));
+ ASSERT_EQUALS(secHost, node.toString());
+
+ monitor.reset();
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index c776bdc2420..42b8efe1b82 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -299,9 +299,7 @@ Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
str::stream() << "Could not find shard " << shardId);
}
- auto findHostStatus =
- shard->getTargeter()->findHostWithMaxWait(readPref, Seconds{20}).getNoThrow(ars->_opCtx);
-
+ auto findHostStatus = shard->getTargeter()->findHost(ars->_opCtx, readPref);
if (findHostStatus.isOK())
shardHostAndPort = std::move(findHostStatus.getValue());
diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h
index d6ae8a76c54..1d7e35fbf7e 100644
--- a/src/mongo/util/concurrency/thread_pool.h
+++ b/src/mongo/util/concurrency/thread_pool.h
@@ -59,6 +59,11 @@ public:
* Structure used to configure an instance of ThreadPool.
*/
struct Options {
+ // Set maxThreads to this if you don't want to limit the number of threads in the pool.
+ // Note: the value used here is high enough that it will never be reached, but low enough
+ // that it won't cause overflows if mixed with signed ints or math.
+ static constexpr size_t kUnlimited = 1'000'000'000;
+
// Name of the thread pool. If this string is empty, the pool will be assigned a
// name unique to the current process.
std::string poolName;