From 5b57abab3e157d775157518780143fbcbbd7b96c Mon Sep 17 00:00:00 2001 From: Ben Caimano Date: Mon, 10 Aug 2020 18:48:23 +0000 Subject: SERVER-51499 Track service executor statistics more thoroughly --- etc/evergreen.yml | 2 + jstests/noPassthrough/max_conns_override.js | 65 +++++++++++++++------- src/mongo/db/commands/server_status_servers.cpp | 23 ++++++-- src/mongo/transport/service_entry_point_impl.cpp | 5 -- src/mongo/transport/service_executor_fixed.cpp | 22 ++++++-- src/mongo/transport/service_executor_fixed.h | 25 ++++++++- src/mongo/transport/service_executor_reserved.cpp | 37 +++++++++--- .../transport/service_executor_synchronous.cpp | 17 ++++-- src/mongo/transport/service_executor_test.cpp | 31 ----------- 9 files changed, 146 insertions(+), 81 deletions(-) diff --git a/etc/evergreen.yml b/etc/evergreen.yml index 997d9ef897d..898f8763e33 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -12626,6 +12626,8 @@ buildvariants: - name: .concurrency .common !.kill_terminate - name: .integration !.audit - name: .jscore .common + - name: noPassthrough_gen + - name: noPassthroughWithMongod_gen - name: .logical_session_cache .one_sec - name: .sharding .jscore !.wo_snapshot !.multi_stmt - name: .sharding .common !.csrs diff --git a/jstests/noPassthrough/max_conns_override.js b/jstests/noPassthrough/max_conns_override.js index 9da68bdb90b..20464bfdc3b 100644 --- a/jstests/noPassthrough/max_conns_override.js +++ b/jstests/noPassthrough/max_conns_override.js @@ -20,17 +20,34 @@ function verifyStats({exemptCount, normalCount}) { // Verify that we have updated serverStatus. assert.soon(() => { const serverStatus = getStats(); + const executors = serverStatus.network.serviceExecutors; - const readyAdminThreads = serverStatus.connections.adminConnections.readyThreads; + const currentCount = serverStatus.connections.current; + if (currentCount != totalCount) { + print(`Not yet at the expected count of connections: ${currentCount} != ${totalCount}`); + return false; + } + + const readyAdminThreads = + executors.reserved.threadsRunning - executors.reserved.clientsRunning; if (readyAdminThreads < configuredReadyAdminThreads) { - print(`Not enough admin threads yet: ${readyAdminThreads} vs ${ - configuredReadyAdminThreads}`); + print("Not enough admin threads yet: " + + `${readyAdminThreads} < ${configuredReadyAdminThreads}`); return false; } - const currentCount = serverStatus.connections.current; - if (currentCount != totalCount) { - print(`Not yet at the expected count of connections: ${currentCount} vs ${totalCount}`); + const threadedCount = serverStatus.connections.threaded; + const threadedExecutorCount = + executors.passthrough.clientsInTotal + executors.reserved.clientsInTotal; + if (threadedCount != threadedExecutorCount) { + print("Not enough running threaded clients yet: " + + `${threadedCount} != ${threadedExecutorCount}`); + return false; + } + + const totalExecutorCount = threadedExecutorCount + executors.fixed.clientsInTotal; + if (totalCount != totalExecutorCount) { + print(`Not enough running clients yet: ${totalCount} != ${totalExecutorCount}`); return false; } @@ -39,14 +56,19 @@ function verifyStats({exemptCount, normalCount}) { const serverStatus = getStats(); const connectionsStatus = serverStatus.connections; - const reservedExecutorStatus = connectionsStatus.adminConnections; - const executorStatus = serverStatus.network.serviceExecutorTaskStats; + const reservedExecutorStatus = serverStatus.network.serviceExecutors.reserved; + const fixedExecutorStatus = 
serverStatus.network.serviceExecutors.fixed; + const executorStatus = serverStatus.network.serviceExecutors.passthrough; // Log these serverStatus sections so we can debug this easily. const filteredSections = { connections: connectionsStatus, network: { - serviceExecutorTaskStats: executorStatus, + serviceExecutors: { + passthrough: executorStatus, + fixed: fixedExecutorStatus, + reserved: reservedExecutorStatus + } } }; print(`serverStatus: ${tojson(filteredSections)}`); @@ -61,18 +83,23 @@ function verifyStats({exemptCount, normalCount}) { // All connections on an exempt CIDR should be marked as limitExempt. assert.eq(connectionsStatus["limitExempt"], exemptCount); - // Without a borrowing executor, all connections are threaded. - assert.eq(connectionsStatus["threaded"], totalCount); + // The normal serviceExecutor should only be running at most maxConns number of threads. + assert.lte(executorStatus["threadsRunning"], configuredMaxConns); - if (totalCount > configuredMaxConns) { - // The normal serviceExecutor should only be running at most maxConns number of threads. - assert.eq(executorStatus["threadsRunning"], configuredMaxConns); - } else { - assert.eq(executorStatus["threadsRunning"], totalCount); - } + // Clients on the normal executor own their thread and cannot wait asynchronously. + assert.eq(executorStatus["clientsRunning"], executorStatus["clientsInTotal"]); + assert.eq(executorStatus["clientsRunning"], executorStatus["threadsRunning"]); + assert.eq(executorStatus["clientsWaitingForData"], 0); + + // Clients on the reserved executor run on a thread and cannot wait asynchronously. + assert.eq(reservedExecutorStatus["clientsRunning"], reservedExecutorStatus["clientsInTotal"]); + assert.lte(reservedExecutorStatus["clientsRunning"], reservedExecutorStatus["threadsRunning"]); + assert.eq(reservedExecutorStatus["clientsWaitingForData"], 0); - // We should have all excess connections on the reserved executor. - assert.gt(reservedExecutorStatus["threadsRunning"], totalCount - configuredMaxConns); + // Clients on the fixed executor borrow one thread and can wait asynchronously + assert.lte(fixedExecutorStatus["clientsRunning"], fixedExecutorStatus["clientsInTotal"]); + assert.lte(fixedExecutorStatus["clientsRunning"], fixedExecutorStatus["threadsRunning"]); + assert.lte(fixedExecutorStatus["clientsWaitingForData"], fixedExecutorStatus["clientsInTotal"]); } // Use the external ip to avoid our exempt CIDR. 
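For reference, the serverStatus section that the test above consumes has roughly the shape sketched below. The field names (network.serviceExecutors.{passthrough,fixed,reserved} with threadsRunning, clientsInTotal, clientsRunning, clientsWaitingForData) come straight from this patch; the shell code and values are illustrative only and are not part of the patch. A sub-document is present only when the corresponding executor is installed on the server (see the guarded appendStats() calls in server_status_servers.cpp below).

    // Illustrative mongo shell sketch (not part of the patch): dump the new executor sections.
    const executors = db.serverStatus().network.serviceExecutors;
    for (const name of ["passthrough", "fixed", "reserved"]) {
        if (!executors[name])
            continue;  // an executor that is not installed reports no section
        const s = executors[name];
        print(`${name}: threadsRunning=${s.threadsRunning} clientsInTotal=${s.clientsInTotal} ` +
              `clientsRunning=${s.clientsRunning} clientsWaitingForData=${s.clientsWaitingForData}`);
    }
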
diff --git a/src/mongo/db/commands/server_status_servers.cpp b/src/mongo/db/commands/server_status_servers.cpp
index 7ee1dfcba52..82ea2b95f12 100644
--- a/src/mongo/db/commands/server_status_servers.cpp
+++ b/src/mongo/db/commands/server_status_servers.cpp
@@ -33,6 +33,8 @@
 #include "mongo/db/commands/server_status.h"
 #include "mongo/transport/message_compressor_registry.h"
 #include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_executor_fixed.h"
+#include "mongo/transport/service_executor_reserved.h"
 #include "mongo/transport/service_executor_synchronous.h"
 #include "mongo/util/net/hostname_canonicalization.h"
 #include "mongo/util/net/socket_utils.h"
@@ -78,16 +80,27 @@ public:
         return true;
     }
 
-    // TODO: need to track connections in server stats (see SERVER-49109)
     BSONObj generateSection(OperationContext* opCtx,
                             const BSONElement& configElement) const override {
         BSONObjBuilder b;
         networkCounter.append(b);
         appendMessageCompressionStats(&b);
-        auto executor = transport::ServiceExecutorSynchronous::get(opCtx->getServiceContext());
-        if (executor) {
-            BSONObjBuilder section(b.subobjStart("serviceExecutorTaskStats"));
-            executor->appendStats(&section);
+
+        {
+            BSONObjBuilder section = b.subobjStart("serviceExecutors");
+
+            auto svcCtx = opCtx->getServiceContext();
+            if (auto executor = transport::ServiceExecutorSynchronous::get(svcCtx)) {
+                executor->appendStats(&section);
+            }
+
+            if (auto executor = transport::ServiceExecutorReserved::get(svcCtx)) {
+                executor->appendStats(&section);
+            }
+
+            if (auto executor = transport::ServiceExecutorFixed::get(svcCtx)) {
+                executor->appendStats(&section);
+            }
         }
 
         return b.obj();
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 400979f14d9..33f2ae0f213 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -333,11 +333,6 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
                 static_cast<long long>(IsMasterMetrics::get(_svcCtx)->getNumExhaustIsMaster()));
     bob->append("awaitingTopologyChanges",
                 static_cast<long long>(IsMasterMetrics::get(_svcCtx)->getNumAwaitingTopologyChanges()));
-
-    if (auto adminExec = transport::ServiceExecutorReserved::get(_svcCtx)) {
-        BSONObjBuilder section(bob->subobjStart("adminConnections"));
-        adminExec->appendStats(&section);
-    }
 }
 
 }  // namespace mongo
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index fb34101f213..b1301576e10 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -49,10 +49,13 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeServiceExecutorFixedLastExecutorThreadReturns)
 
 namespace transport {
 namespace {
-constexpr auto kThreadsRunning = "threadsRunning"_sd;
-constexpr auto kExecutorLabel = "executor"_sd;
 constexpr auto kExecutorName = "fixed"_sd;
 
+constexpr auto kThreadsRunning = "threadsRunning"_sd;
+constexpr auto kClientsInTotal = "clientsInTotal"_sd;
+constexpr auto kClientsRunning = "clientsRunning"_sd;
+constexpr auto kClientsWaiting = "clientsWaitingForData"_sd;
+
 const auto getServiceExecutorFixed =
     ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorFixed>>();
 
@@ -158,7 +161,7 @@ ServiceExecutorFixed::~ServiceExecutorFixed() {
 
     invariant(_threadsRunning() == 0);
     invariant(_tasksRunning() == 0);
-    invariant(_waiters.empty());
+    invariant(_tasksWaiting() == 0);
 }
 
 Status ServiceExecutorFixed::start() {
@@ -400,6 +403,8 @@ void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session,
         }
 
         it = _waiters.emplace(_waiters.end(), std::move(waiter));
+
+        _stats.waitersStarted.fetchAndAdd(1);
     }
 
     session->asyncWaitForData()
@@ -411,6 +416,8 @@ void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session,
                 auto lk = stdx::unique_lock(_mutex);
                 waiter = std::exchange(*it, {});
                 _waiters.erase(it);
+
+                _stats.waitersEnded.fetchAndAdd(1);
             }
 
             waiter.onCompletionCallback(std::move(status));
@@ -418,8 +425,13 @@ void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session,
 }
 
 void ServiceExecutorFixed::appendStats(BSONObjBuilder* bob) const {
-    *bob << kExecutorLabel << kExecutorName << kThreadsRunning
-         << static_cast<int>(_threadsRunning());
+    // The ServiceExecutorFixed schedules Clients temporarily onto its threads and waits
+    // asynchronously.
+    BSONObjBuilder subbob = bob->subobjStart(kExecutorName);
+    subbob.append(kThreadsRunning, static_cast<int>(_threadsRunning()));
+    subbob.append(kClientsInTotal, static_cast<int>(_tasksTotal()));
+    subbob.append(kClientsRunning, static_cast<int>(_tasksRunning()));
+    subbob.append(kClientsWaiting, static_cast<int>(_tasksWaiting()));
 }
 
 int ServiceExecutorFixed::getRecursionDepthForExecutorThread() const {
diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
index 3a96ec30f66..403339e61c6 100644
--- a/src/mongo/transport/service_executor_fixed.h
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -99,15 +99,31 @@ private:
     void _schedule(OutOfLineExecutor::Task task) noexcept;
 
     auto _threadsRunning() const {
-        return _stats.threadsStarted.load() - _stats.threadsEnded.loadRelaxed();
+        auto ended = _stats.threadsEnded.load();
+        auto started = _stats.threadsStarted.loadRelaxed();
+        return started - ended;
     }
 
     auto _tasksRunning() const {
-        return _stats.tasksStarted.load() - _stats.tasksEnded.loadRelaxed();
+        auto ended = _stats.tasksEnded.load();
+        auto started = _stats.tasksStarted.loadRelaxed();
+        return started - ended;
     }
 
     auto _tasksLeft() const {
-        return _stats.tasksScheduled.load() - _stats.tasksEnded.loadRelaxed();
+        auto ended = _stats.tasksEnded.load();
+        auto scheduled = _stats.tasksScheduled.loadRelaxed();
+        return scheduled - ended;
+    }
+
+    auto _tasksWaiting() const {
+        auto ended = _stats.waitersEnded.load();
+        auto started = _stats.waitersStarted.loadRelaxed();
+        return started - ended;
+    }
+
+    auto _tasksTotal() const {
+        return _tasksRunning() + _tasksWaiting();
     }
 
     struct Stats {
@@ -117,6 +133,9 @@ private:
         AtomicWord<size_t> tasksScheduled{0};
        AtomicWord<size_t> tasksStarted{0};
         AtomicWord<size_t> tasksEnded{0};
+
+        AtomicWord<size_t> waitersStarted{0};
+        AtomicWord<size_t> waitersEnded{0};
     };
 
     Stats _stats;
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
index b4d3709008a..c613c9c7ba2 100644
--- a/src/mongo/transport/service_executor_reserved.cpp
+++ b/src/mongo/transport/service_executor_reserved.cpp
@@ -45,11 +45,12 @@ namespace mongo {
 
 namespace transport {
 namespace {
-constexpr auto kThreadsRunning = "threadsRunning"_sd;
-constexpr auto kExecutorLabel = "executor"_sd;
 constexpr auto kExecutorName = "reserved"_sd;
-constexpr auto kReadyThreads = "readyThreads"_sd;
-constexpr auto kStartingThreads = "startingThreads"_sd;
+
+constexpr auto kThreadsRunning = "threadsRunning"_sd;
+constexpr auto kClientsInTotal = "clientsInTotal"_sd;
+constexpr auto kClientsRunning = "clientsRunning"_sd;
+constexpr auto kClientsWaiting = "clientsWaitingForData"_sd;
 
 const auto getServiceExecutorReserved =
     ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorReserved>>();
@@ -214,11 +215,29 @@ Status ServiceExecutorReserved::scheduleTask(Task task, ScheduleFlags flags) {
 }
 
 void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const {
-    stdx::lock_guard lk(_mutex);
-    *bob << kExecutorLabel << kExecutorName << kThreadsRunning
-         << static_cast<int>(_numRunningWorkerThreads.loadRelaxed()) << kReadyThreads
-         << static_cast<int>(_numReadyThreads) << kStartingThreads
-         << static_cast<int>(_numStartingThreads);
+    // The ServiceExecutorReserved loans a thread to one client for its lifetime and waits
+    // synchronously on that thread.
+    struct Statlet {
+        int threads;
+        int total;
+        int running;
+        int waiting;
+    };
+
+    auto statlet = [&] {
+        stdx::lock_guard lk(_mutex);
+        auto threads = static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
+        auto total = static_cast<int>(threads - _numReadyThreads - _numStartingThreads);
+        auto running = total;
+        auto waiting = 0;
+        return Statlet{threads, total, running, waiting};
+    }();
+
+    BSONObjBuilder subbob = bob->subobjStart(kExecutorName);
+    subbob.append(kThreadsRunning, statlet.threads);
+    subbob.append(kClientsInTotal, statlet.total);
+    subbob.append(kClientsRunning, statlet.running);
+    subbob.append(kClientsWaiting, statlet.waiting);
 }
 
 void ServiceExecutorReserved::runOnDataAvailable(const SessionHandle& session,
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index b5df5501d82..cb24e5b305e 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -43,10 +43,13 @@ namespace mongo {
 
 namespace transport {
 namespace {
-constexpr auto kThreadsRunning = "threadsRunning"_sd;
-constexpr auto kExecutorLabel = "executor"_sd;
 constexpr auto kExecutorName = "passthrough"_sd;
 
+constexpr auto kThreadsRunning = "threadsRunning"_sd;
+constexpr auto kClientsInTotal = "clientsInTotal"_sd;
+constexpr auto kClientsRunning = "clientsRunning"_sd;
+constexpr auto kClientsWaiting = "clientsWaitingForData"_sd;
+
 const auto getServiceExecutorSynchronous =
     ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>();
 
@@ -152,8 +155,14 @@ Status ServiceExecutorSynchronous::scheduleTask(Task task, ScheduleFlags flags)
 }
 
 void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const {
-    *bob << kExecutorLabel << kExecutorName << kThreadsRunning
-         << static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
+    // The ServiceExecutorSynchronous has one client per thread and waits synchronously on that thread.
+    auto threads = static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
+
+    BSONObjBuilder subbob = bob->subobjStart(kExecutorName);
+    subbob.append(kThreadsRunning, threads);
+    subbob.append(kClientsInTotal, threads);
+    subbob.append(kClientsRunning, threads);
+    subbob.append(kClientsWaiting, 0);
 }
 
 void ServiceExecutorSynchronous::runOnDataAvailable(const SessionHandle& session,
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index be85fb1fb43..46deb4d00fd 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -301,37 +301,6 @@ TEST_F(ServiceExecutorFixedFixture, ShutdownTimeLimit) {
     mayReturn->emplaceValue();
 }
 
-TEST_F(ServiceExecutorFixedFixture, Stats) {
-    ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor);
-    auto rendezvousBarrier = std::make_shared<unittest::Barrier>(kNumExecutorThreads + 1);
-    auto returnBarrier = std::make_shared<unittest::Barrier>(kNumExecutorThreads + 1);
-
-    auto task = [rendezvousBarrier, returnBarrier]() mutable {
-        rendezvousBarrier->countDownAndWait();
-        // Executor threads wait here for the main thread to test "executor->appendStats()".
-        returnBarrier->countDownAndWait();
-    };
-
-    for (auto i = 0; i < kNumExecutorThreads; i++) {
-        ASSERT_OK(executorHandle->scheduleTask(task, ServiceExecutor::kEmptyFlags));
-    }
-
-    // The main thread waits for the executor threads to bump up "threadsRunning" while picking up a
-    // task to execute. Once all executor threads are running (rendezvous) and the main thread is
-    // done testing the stats, the main thread will unblock them through "returnBarrier".
-    rendezvousBarrier->countDownAndWait();
-
-    BSONObjBuilder bob;
-    executorHandle->appendStats(&bob);
-    auto obj = bob.obj();
-    ASSERT(obj.hasField("threadsRunning"));
-    auto threadsRunning = obj.getIntField("threadsRunning");
-    ASSERT_EQ(threadsRunning, static_cast<int>(ServiceExecutorFixedFixture::kNumExecutorThreads));
-
-    returnBarrier->countDownAndWait();
-}
-
-
 TEST_F(ServiceExecutorFixedFixture, ScheduleSucceedsBeforeShutdown) {
     ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor);
-- 
cgit v1.2.1
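Taken together, the three executors report the same four fields with different invariants: the passthrough executor pins one thread to each client and never waits asynchronously, the reserved executor likewise loans a whole thread to each client, and only the fixed executor lets clients give up their thread while waiting for data, so for it clientsInTotal equals clientsRunning plus clientsWaitingForData (per _tasksTotal() in service_executor_fixed.h). A small shell sketch of those relationships, assuming a running mongod reachable as db (values will vary; this snippet is not part of the patch):

    // Consistency checks mirroring the executor semantics described in the patch.
    const ex = db.serverStatus().network.serviceExecutors;
    if (ex.passthrough) {
        // One thread per client, never parked waiting for data.
        assert.eq(ex.passthrough.clientsRunning, ex.passthrough.threadsRunning);
        assert.eq(ex.passthrough.clientsWaitingForData, 0);
    }
    if (ex.fixed) {
        // Borrowed threads: running plus waiting accounts for every client on this executor.
        assert.eq(ex.fixed.clientsInTotal, ex.fixed.clientsRunning + ex.fixed.clientsWaitingForData);
    }
    if (ex.reserved) {
        // Reserved threads wait synchronously, so nothing is ever reported as waiting for data.
        assert.eq(ex.reserved.clientsWaitingForData, 0);
        assert.lte(ex.reserved.clientsRunning, ex.reserved.threadsRunning);
    }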