From aa236ed4f3096c85118f00618eec834c82363527 Mon Sep 17 00:00:00 2001
From: Mathias Stearn
Date: Wed, 26 Dec 2018 14:47:00 -0500
Subject: SERVER-38295 ReplSetMonitor::getHostOrRefresh should not do anything
 if maxWait <= 0

---
 src/mongo/client/remote_command_targeter.h       | 13 -----
 src/mongo/client/remote_command_targeter_rs.cpp  | 29 ++++------
 src/mongo/client/replica_set_monitor.cpp         | 20 ++++---
 src/mongo/client/replica_set_monitor_internal.h  |  2 +
 src/mongo/db/s/move_chunk_command.cpp            |  4 +-
 src/mongo/db/transaction_coordinator_service.cpp |  2 +-
 src/mongo/db/transaction_coordinator_util.cpp    | 17 +++---
 src/mongo/dbtests/replica_set_monitor_test.cpp   | 71 ++++++++++++++++++++++--
 src/mongo/s/async_requests_sender.cpp            |  4 +-
 src/mongo/util/concurrency/thread_pool.h         |  5 ++
 10 files changed, 108 insertions(+), 59 deletions(-)

diff --git a/src/mongo/client/remote_command_targeter.h b/src/mongo/client/remote_command_targeter.h
index 07b035ab60e..370752836a9 100644
--- a/src/mongo/client/remote_command_targeter.h
+++ b/src/mongo/client/remote_command_targeter.h
@@ -82,19 +82,6 @@ public:
     virtual SharedSemiFuture<HostAndPort> findHostWithMaxWait(const ReadPreferenceSetting& readPref,
                                                               Milliseconds maxWait) = 0;
 
-    /**
-     * Finds a host matching the given read preference, giving up if a match is not found promptly.
-     *
-     * This method may still engage in blocking networking calls, but will attempt contact every
-     * member of the replica set at most one time.
-     *
-     * TODO(schwerin): Change this implementation to not perform any networking, once existing
-     * callers have been shown to be safe with this behavior or changed to call findHost.
-     */
-    StatusWith<HostAndPort> findHostNoWait(const ReadPreferenceSetting& readPref) {
-        return findHostWithMaxWait(readPref, Milliseconds::zero()).getNoThrow();
-    }
-
     /**
      * Reports to the targeter that a 'status' indicating a not master error was received when
      * communicating with 'host', and so it should update its bookkeeping to avoid giving out the
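
Note: findHostNoWait() is removed outright rather than deprecated. Any caller
that still wants the old fire-and-fail semantics can build them from the
remaining interface; a minimal sketch (hypothetical caller code, not part of
this patch):

    // Equivalent of the removed helper, expressed via findHostWithMaxWait().
    // With this patch, a zero maxWait fails fast instead of waiting for a
    // scan of the replica set to finish first.
    StatusWith<HostAndPort> swHost =
        targeter->findHostWithMaxWait(readPref, Milliseconds::zero()).getNoThrow();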
diff --git a/src/mongo/client/remote_command_targeter_rs.cpp b/src/mongo/client/remote_command_targeter_rs.cpp
index 917898d8c2b..15d5fdc930c 100644
--- a/src/mongo/client/remote_command_targeter_rs.cpp
+++ b/src/mongo/client/remote_command_targeter_rs.cpp
@@ -69,25 +69,18 @@ SharedSemiFuture<HostAndPort> RemoteCommandTargeterRS::findHostWithMaxWait(
 
 StatusWith<HostAndPort> RemoteCommandTargeterRS::findHost(OperationContext* opCtx,
                                                           const ReadPreferenceSetting& readPref) {
-    auto clock = opCtx->getServiceContext()->getFastClockSource();
-    auto startDate = clock->now();
-    while (true) {
-        const auto interruptStatus = opCtx->checkForInterruptNoAssert();
-        if (!interruptStatus.isOK()) {
-            return interruptStatus;
-        }
-        auto host = _rsMonitor->getHostOrRefresh(readPref, Milliseconds::zero()).getNoThrow();
-        if (host.getStatus() != ErrorCodes::FailedToSatisfyReadPreference) {
-            return host;
-        }
-        // Enforce a 20-second ceiling on the time spent looking for a host. This conforms with the
-        // behavior used throughout mongos prior to version 3.4, but is not fundamentally desirable.
-        // See comment in remote_command_targeter.h for details.
-        if (clock->now() - startDate > Seconds{20}) {
-            return host;
-        }
-        sleepFor(Milliseconds{500});
+    const auto interruptStatus = opCtx->checkForInterruptNoAssert();
+    if (!interruptStatus.isOK()) {
+        return interruptStatus;
     }
+
+    // Enforce a 20-second ceiling on the time spent looking for a host. This conforms with the
+    // behavior used throughout mongos prior to version 3.4, but is not fundamentally desirable.
+    // See comment in remote_command_targeter.h for details.
+    return _rsMonitor
+        ->getHostOrRefresh(readPref,
+                           std::min<Milliseconds>(opCtx->getRemainingMaxTimeMillis(), Seconds(20)))
+        .getNoThrow(opCtx);
 }
 
 void RemoteCommandTargeterRS::markHostNotMaster(const HostAndPort& host, const Status& status) {
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index 95c7875460c..d78adff5a50 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -288,8 +288,9 @@ SharedSemiFuture<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(
     if (!out.empty())
         return {std::move(out)};
 
-    // TODO early return if maxWait <= 0? Need to figure out if anyone is relying on the current
-    // behavior where this finishes the scan before doing so.
+    // Fail early without doing any more work if the timeout has already expired.
+    if (maxWait <= Milliseconds(0))
+        return _state->makeUnsatisfedReadPrefError(criteria);
 
     // TODO look into putting all PrimaryOnly waiters on a single SharedPromise. The tricky part is
     // dealing with maxWait.
@@ -1282,12 +1283,7 @@ void SetState::notify(bool finishedScan) {
             (it->deadline <= cachedNow || areRefreshRetriesDisabledForTest.load())) {
             // To preserve prior behavior, we only examine deadlines at the end of a scan.
             // This ensures that we only report failure after trying to contact all hosts.
-            it->promise.setError(Status(ErrorCodes::FailedToSatisfyReadPreference,
-                                        str::stream()
-                                            << "Could not find host matching read preference "
-                                            << it->criteria.toString()
-                                            << " for set "
-                                            << name));
+            it->promise.setError(makeUnsatisfedReadPrefError(it->criteria));
             waiters.erase(it++);
         } else {
             it++;
@@ -1295,6 +1291,14 @@ void SetState::notify(bool finishedScan) {
     }
 }
 
+Status SetState::makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const {
+    return Status(ErrorCodes::FailedToSatisfyReadPreference,
+                  str::stream() << "Could not find host matching read preference "
+                                << criteria.toString()
+                                << " for set "
+                                << name);
+}
+
 void SetState::checkInvariants() const {
     bool foundMaster = false;
     for (size_t i = 0; i < nodes.size(); i++) {
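
Note: with the early return in place, a non-positive maxWait means
getHostOrRefresh() either satisfies the request from the cached view or fails
immediately with FailedToSatisfyReadPreference, and no scan is scheduled. A
sketch of the observable contract (assumes a monitor whose cached view cannot
satisfy readPref):

    // The returned future is ready at once; no refresh work is queued.
    auto swHost = monitor->getHostOrRefresh(readPref, Milliseconds(0)).getNoThrow();
    invariant(swHost.getStatus() == ErrorCodes::FailedToSatisfyReadPreference);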
diff --git a/src/mongo/client/replica_set_monitor_internal.h b/src/mongo/client/replica_set_monitor_internal.h
index 378ad60639b..4b8b88b0823 100644
--- a/src/mongo/client/replica_set_monitor_internal.h
+++ b/src/mongo/client/replica_set_monitor_internal.h
@@ -199,6 +199,8 @@ public:
         return executor ? executor->now() : Date_t::now();
     }
 
+    Status makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const;
+
     /**
      * Before unlocking, do DEV checkInvariants();
      */
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 06aefe463fb..afd0b8c1065 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -198,8 +198,8 @@ private:
             auto recipientShard =
                 uassertStatusOK(shardRegistry->getShard(opCtx, moveChunkRequest.getToShardId()));
 
-            return recipientShard->getTargeter()->findHostNoWait(
-                ReadPreferenceSetting{ReadPreference::PrimaryOnly});
+            return recipientShard->getTargeter()->findHost(
+                opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
         }());
 
         std::string unusedErrMsg;
diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp
index 19868013f6e..be96e7a4527 100644
--- a/src/mongo/db/transaction_coordinator_service.cpp
+++ b/src/mongo/db/transaction_coordinator_service.cpp
@@ -63,7 +63,7 @@ ThreadPool::Options makeDefaultThreadPoolOptions() {
     ThreadPool::Options options;
     options.poolName = "TransactionCoordinatorService";
     options.minThreads = 0;
-    options.maxThreads = 16;
+    options.maxThreads = ThreadPool::Options::kUnlimited;
 
     // Ensure all threads have a client
     options.onCreateThread = [](const std::string& threadName) {
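
Note: the coordinator pool drops its cap of 16 threads because, with the hunk
below, coordinator tasks may now block for up to 20 seconds inside
targetHost(); a small fixed cap could stall unrelated coordinators behind
blocked threads. kUnlimited itself is introduced in the thread_pool.h hunk at
the end of this patch. A sketch of the option in isolation (hypothetical pool
name, mirroring makeDefaultThreadPoolOptions() above):

    ThreadPool::Options options;
    options.poolName = "ExamplePool";  // hypothetical name
    options.minThreads = 0;            // spin threads up only on demand
    options.maxThreads = ThreadPool::Options::kUnlimited;
    ThreadPool pool(options);
    pool.startup();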
diff --git a/src/mongo/db/transaction_coordinator_util.cpp b/src/mongo/db/transaction_coordinator_util.cpp
index 90649f37d4d..87e6719ef15 100644
--- a/src/mongo/db/transaction_coordinator_util.cpp
+++ b/src/mongo/db/transaction_coordinator_util.cpp
@@ -74,16 +74,13 @@ const WriteConcernOptions kInternalMajorityNoSnapshotWriteConcern(
  * Finds the host and port for a shard.
  */
 HostAndPort targetHost(const ShardId& shardId, const ReadPreferenceSetting& readPref) {
-    auto opCtx = cc().makeOperationContext();
-    auto shardRegistry = Grid::get(opCtx->getServiceContext())->shardRegistry();
-    auto swShard = shardRegistry->getShard(opCtx.get(), shardId);
-    uassertStatusOK(swShard);
-    auto shard = swShard.getValue();
-    auto swHostAndPort = shard->getTargeter()->findHostNoWait(readPref);
-    uassertStatusOKWithContext(swHostAndPort.getStatus(),
-                               str::stream() << "Could not find shard " << shardId);
-
-    return swHostAndPort.getValue();
+    const auto opCtxHolder = cc().makeOperationContext();
+    const auto opCtx = opCtxHolder.get();
+    auto shard = uassertStatusOK(
+        Grid::get(opCtx->getServiceContext())->shardRegistry()->getShard(opCtx, shardId));
+    // TODO SERVER-35678 return a SemiFuture<HostAndPort> rather than using a blocking call to
+    // get().
+    return shard->getTargeter()->findHostWithMaxWait(readPref, Seconds(20)).get(opCtx);
 }
 
 /**
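
Note: the 20-second wait here mirrors the ceiling enforced in
RemoteCommandTargeterRS::findHost(). The TODO above points at an eventual
non-blocking flow; a rough sketch of what that could look like (hypothetical
helper, names assumed, not part of this patch):

    // Hand the targeter's SharedSemiFuture straight back to the caller
    // instead of parking this thread in get().
    SharedSemiFuture<HostAndPort> targetHostAsync(Shard* shard,
                                                  const ReadPreferenceSetting& readPref) {
        return shard->getTargeter()->findHostWithMaxWait(readPref, Seconds(20));
    }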
diff --git a/src/mongo/dbtests/replica_set_monitor_test.cpp b/src/mongo/dbtests/replica_set_monitor_test.cpp
index 8ba72e860f1..693a7d7e9df 100644
--- a/src/mongo/dbtests/replica_set_monitor_test.cpp
+++ b/src/mongo/dbtests/replica_set_monitor_test.cpp
@@ -33,6 +33,7 @@
 #include <set>
 #include <vector>
 
+#include "mongo/base/init.h"
 #include "mongo/client/connpool.h"
 #include "mongo/client/dbclient_rs.h"
 #include "mongo/client/replica_set_monitor.h"
@@ -51,6 +52,11 @@ using std::string;
 using std::unique_ptr;
 using unittest::assertGet;
 
+MONGO_INITIALIZER(DisableReplicaSetMonitorRefreshRetries)(InitializerContext*) {
+    ReplicaSetMonitor::disableRefreshRetries_forTest();
+    return Status::OK();
+}
+
 // TODO: Port these existing tests here: replmonitor_bad_seed.js, repl_monitor_refresh.js
 
 /**
@@ -253,7 +259,7 @@ private:
 
 // Tests the case where the connection to secondary went bad and the replica set
 // monitor needs to perform a refresh of it's local view then retry the node selection
-// again after the refresh.
+// again after the refresh as long as the timeout is > 0.
 TEST_F(TwoNodeWithTags, SecDownRetryNoTag) {
     MockReplicaSet* replSet = getReplSet();
 
@@ -272,7 +278,7 @@ TEST_F(TwoNodeWithTags, SecDownRetryNoTag) {
     HostAndPort node = monitor
                            ->getHostOrRefresh(ReadPreferenceSetting(
                                                   mongo::ReadPreference::SecondaryOnly, TagSet()),
-                                              Milliseconds(0))
+                                              Milliseconds(1))
                            .get();
 
     ASSERT_FALSE(monitor->isPrimary(node));
@@ -282,7 +288,7 @@ TEST_F(TwoNodeWithTags, SecDownRetryNoTag) {
 
 // Tests the case where the connection to secondary went bad and the replica set
 // monitor needs to perform a refresh of it's local view then retry the node selection
-// with tags again after the refresh.
+// with tags again after the refresh as long as the timeout is > 0.
 TEST_F(TwoNodeWithTags, SecDownRetryWithTag) {
     MockReplicaSet* replSet = getReplSet();
 
@@ -303,7 +309,7 @@ TEST_F(TwoNodeWithTags, SecDownRetryWithTag) {
     HostAndPort node =
         monitor
             ->getHostOrRefresh(ReadPreferenceSetting(mongo::ReadPreference::SecondaryOnly, tags),
-                               Milliseconds(0))
+                               Milliseconds(1))
             .get();
 
     ASSERT_FALSE(monitor->isPrimary(node));
@@ -311,5 +317,62 @@ TEST_F(TwoNodeWithTags, SecDownRetryWithTag) {
     monitor.reset();
 }
 
+// Tests the case where the connection to secondary went bad and the replica set
+// monitor needs to perform a refresh of its local view, but the scan has an expired timeout.
+TEST_F(TwoNodeWithTags, SecDownRetryExpiredTimeout) {
+    MockReplicaSet* replSet = getReplSet();
+
+    set<HostAndPort> seedList;
+    seedList.insert(HostAndPort(replSet->getPrimary()));
+    auto monitor = ReplicaSetMonitor::createIfNeeded(replSet->getSetName(), seedList);
+
+    const string secHost(replSet->getSecondaries().front());
+    replSet->kill(secHost);
+
+    // Make sure the monitor sees the dead secondary.
+    monitor->runScanForMockReplicaSet();
+
+    replSet->restore(secHost);
+
+    // This will fail immediately, without doing any refreshing.
+    auto errorFut = monitor->getHostOrRefresh(
+        ReadPreferenceSetting(mongo::ReadPreference::SecondaryOnly, TagSet()), Milliseconds(0));
+    ASSERT(errorFut.isReady());
+    ASSERT_EQ(errorFut.getNoThrow().getStatus(), ErrorCodes::FailedToSatisfyReadPreference);
+
+    // Because it did not schedule an expedited scan, it will continue failing until someone waits.
+    errorFut = monitor->getHostOrRefresh(
+        ReadPreferenceSetting(mongo::ReadPreference::SecondaryOnly, TagSet()), Milliseconds(0));
+    ASSERT(errorFut.isReady());
+    ASSERT_EQ(errorFut.getNoThrow().getStatus(), ErrorCodes::FailedToSatisfyReadPreference);
+
+    // Negative timeouts are handled the same way.
+    errorFut = monitor->getHostOrRefresh(
+        ReadPreferenceSetting(mongo::ReadPreference::SecondaryOnly, TagSet()), Milliseconds(-1234));
+    ASSERT(errorFut.isReady());
+    ASSERT_EQ(errorFut.getNoThrow().getStatus(), ErrorCodes::FailedToSatisfyReadPreference);
+
+    // This will trigger a rescan. It is the only call in this test with a non-zero timeout.
+    HostAndPort node = monitor
+                           ->getHostOrRefresh(ReadPreferenceSetting(
+                                                  mongo::ReadPreference::SecondaryOnly, TagSet()),
+                                              Milliseconds(1))
+                           .get();
+
+    ASSERT_FALSE(monitor->isPrimary(node));
+    ASSERT_EQUALS(secHost, node.toString());
+
+    // And this will now succeed.
+    node = monitor
+               ->getHostOrRefresh(
+                   ReadPreferenceSetting(mongo::ReadPreference::SecondaryOnly, TagSet()),
+                   Milliseconds(0))
+               .get();
+    ASSERT_FALSE(monitor->isPrimary(node));
+    ASSERT_EQUALS(secHost, node.toString());
+
+    monitor.reset();
+}
+
 }  // namespace
 }  // namespace mongo
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index c776bdc2420..42b8efe1b82 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -299,9 +299,7 @@ Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
                       str::stream() << "Could not find shard " << shardId);
     }
 
-    auto findHostStatus =
-        shard->getTargeter()->findHostWithMaxWait(readPref, Seconds{20}).getNoThrow(ars->_opCtx);
-
+    auto findHostStatus = shard->getTargeter()->findHost(ars->_opCtx, readPref);
     if (findHostStatus.isOK())
         shardHostAndPort = std::move(findHostStatus.getValue());
 
diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h
index d6ae8a76c54..1d7e35fbf7e 100644
--- a/src/mongo/util/concurrency/thread_pool.h
+++ b/src/mongo/util/concurrency/thread_pool.h
@@ -59,6 +59,11 @@ public:
     /**
      * Structure used to configure an instance of ThreadPool.
      */
     struct Options {
+        // Set maxThreads to this if you don't want to limit the number of threads in the pool.
+        // Note: the value used here is high enough that it will never be reached, but low enough
+        // that it won't cause overflows if mixed with signed ints or math.
+        static constexpr size_t kUnlimited = 1'000'000'000;
+
         // Name of the thread pool. If this string is empty, the pool will be assigned a
         // name unique to the current process.
         std::string poolName;
-- 
cgit v1.2.1
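
P.S. On the choice of kUnlimited = 1'000'000'000: it fits comfortably inside a
signed 32-bit int (INT_MAX is 2'147'483'647), so arithmetic such as
"maxThreads - 1" or comparisons against signed thread counts cannot overflow,
while no realistic host ever reaches a billion threads. A compile-time check
along these lines would pin that down (sketch, not part of the patch):

    #include <cstdint>
    #include <limits>

    // Guards the "low enough to avoid signed overflow" claim in the comment.
    static_assert(ThreadPool::Options::kUnlimited <=
                      static_cast<size_t>(std::numeric_limits<int32_t>::max()),
                  "kUnlimited must stay representable as a signed 32-bit int");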