diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2021-10-28 16:44:30 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-28 18:27:43 +0000 |
commit | 5df159a364ec3a94d1e1ae01c70e3ca33cb10b4f (patch) | |
tree | d3cff6248090ecc9c1f73a6e95d8963628345381 /src/mongo | |
parent | 2d428795b0bf58f39f46ab75e9ec638ce8f83a17 (diff) | |
download | mongo-5df159a364ec3a94d1e1ae01c70e3ca33cb10b4f.tar.gz |
SERVER-59858 Add observability for tasks scheduled on reactor threads
Diffstat (limited to 'src/mongo')
25 files changed, 534 insertions, 10 deletions
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp index 0a8c1d5d9b8..3919437676a 100644 --- a/src/mongo/client/replica_set_monitor_manager.cpp +++ b/src/mongo/client/replica_set_monitor_manager.cpp @@ -196,14 +196,16 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor( "Starting Replica Set Monitor", "protocol"_attr = toString(gReplicaSetMonitorProtocol), "uri"_attr = uri.toString()); + invariant(_taskExecutor); if (gReplicaSetMonitorProtocol == ReplicaSetMonitorProtocol::kScanning) { - newMonitor = std::make_shared<ScanningReplicaSetMonitor>(uri, cleanupCallback); + newMonitor = + std::make_shared<ScanningReplicaSetMonitor>(uri, _taskExecutor, cleanupCallback); newMonitor->init(); } else { // Both ReplicaSetMonitorProtocol::kSdam and ReplicaSetMonitorProtocol::kStreamable use the // StreamableReplicaSetMonitor. newMonitor = StreamableReplicaSetMonitor::make( - uri, getExecutor(), _getConnectionManager(), cleanupCallback, _stats); + uri, _taskExecutor, _getConnectionManager(), cleanupCallback, _stats); } _monitors[setName] = newMonitor; _numMonitorsCreated++; @@ -352,7 +354,7 @@ void ReplicaSetMonitorManager::report(BSONObjBuilder* builder, bool forFTDC) { } std::shared_ptr<executor::TaskExecutor> ReplicaSetMonitorManager::getExecutor() { - invariant(_taskExecutor); + auto lk = stdx::lock_guard(_mutex); return _taskExecutor; } diff --git a/src/mongo/client/scanning_replica_set_monitor.cpp b/src/mongo/client/scanning_replica_set_monitor.cpp index 0902abd94e0..353bca3989d 100644 --- a/src/mongo/client/scanning_replica_set_monitor.cpp +++ b/src/mongo/client/scanning_replica_set_monitor.cpp @@ -175,11 +175,11 @@ ScanningReplicaSetMonitor::ScanningReplicaSetMonitor(const SetStatePtr& initialS : ReplicaSetMonitor(cleanupCallback), _state(initialState) {} ScanningReplicaSetMonitor::ScanningReplicaSetMonitor(const MongoURI& uri, + std::shared_ptr<TaskExecutor> executor, std::function<void()> cleanupCallback) : ScanningReplicaSetMonitor( - std::make_shared<SetState>(uri, - &ReplicaSetMonitorManager::get()->getNotifier(), - ReplicaSetMonitorManager::get()->getExecutor().get()), + std::make_shared<SetState>( + uri, &ReplicaSetMonitorManager::get()->getNotifier(), executor.get()), cleanupCallback) {} void ScanningReplicaSetMonitor::init() { diff --git a/src/mongo/client/scanning_replica_set_monitor.h b/src/mongo/client/scanning_replica_set_monitor.h index 85295e099f1..fca197a409a 100644 --- a/src/mongo/client/scanning_replica_set_monitor.h +++ b/src/mongo/client/scanning_replica_set_monitor.h @@ -65,7 +65,9 @@ public: static constexpr auto kExpeditedRefreshPeriod = Milliseconds(500); static constexpr auto kCheckTimeout = Seconds(5); - ScanningReplicaSetMonitor(const MongoURI& uri, std::function<void()> cleanupCallback); + ScanningReplicaSetMonitor(const MongoURI& uri, + std::shared_ptr<executor::TaskExecutor> executor, + std::function<void()> cleanupCallback); void init() override; diff --git a/src/mongo/db/ftdc/ftdc_mongos.cpp b/src/mongo/db/ftdc/ftdc_mongos.cpp index 210ed04ba96..0843e8d3dfc 100644 --- a/src/mongo/db/ftdc/ftdc_mongos.cpp +++ b/src/mongo/db/ftdc/ftdc_mongos.cpp @@ -37,6 +37,7 @@ #include "mongo/client/connpool.h" #include "mongo/client/dbclient_connection.h" #include "mongo/client/global_conn_pool.h" +#include "mongo/client/replica_set_monitor_manager.h" #include "mongo/db/ftdc/controller.h" #include "mongo/db/ftdc/ftdc_server.h" #include "mongo/db/repl/replication_coordinator.h" @@ -84,10 +85,30 @@ public: } }; +class NetworkInterfaceStatsCollector final : public FTDCCollectorInterface { +public: + void collect(OperationContext* opCtx, BSONObjBuilder& builder) override { + auto const grid = Grid::get(opCtx); + if (auto executorPool = grid->getExecutorPool()) { + executorPool->appendNetworkInterfaceStats(builder); + } + + if (auto executor = ReplicaSetMonitorManager::get()->getExecutor()) { + executor->appendNetworkInterfaceStats(builder); + } + } + + std::string name() const override { + return "networkInterfaceStats"; + } +}; + void registerMongoSCollectors(FTDCController* controller) { // PoolStats controller->addPeriodicCollector(std::make_unique<ConnPoolStatsCollector>()); + controller->addPeriodicCollector(std::make_unique<NetworkInterfaceStatsCollector>()); + // GetDefaultRWConcern controller->addOnRotateCollector(std::make_unique<FTDCSimpleInternalCommandCollector>( "getDefaultRWConcern", diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index 852f9236ebb..799f473a12d 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -33,6 +33,7 @@ #include <functional> #include <string> +#include "mongo/bson/bsonobjbuilder.h" #include "mongo/executor/task_executor.h" #include "mongo/transport/baton.h" #include "mongo/util/fail_point.h" @@ -77,6 +78,11 @@ public: virtual void appendConnectionStats(ConnectionPoolStats* stats) const = 0; /** + * Appends information about this instance of NetworkInterface. + */ + virtual void appendStats(BSONObjBuilder&) const = 0; + + /** * Starts up the network interface. * * It is valid to call all methods except shutdown() before this method completes. That is, diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index ecbba32e7ae..e096589d640 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -100,6 +100,7 @@ public: //////////////////////////////////////////////////////////////////////////////// void appendConnectionStats(ConnectionPoolStats* stats) const override {} + void appendStats(BSONObjBuilder&) const override {} std::string getDiagnosticString() override; diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index 41e744405ad..db070586b4b 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -192,6 +192,11 @@ void NetworkInterfaceTL::appendConnectionStats(ConnectionPoolStats* stats) const pool->appendConnectionStats(stats); } +void NetworkInterfaceTL::appendStats(BSONObjBuilder& bob) const { + BSONObjBuilder builder = bob.subobjStart(_instanceName); + _reactor->appendStats(builder); +} + NetworkInterface::Counters NetworkInterfaceTL::getCounters() const { invariant(_counters); return _counters->get(); diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 646ebfbb11f..158e4b0a679 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -64,6 +64,7 @@ public: std::string getDiagnosticString() override; void appendConnectionStats(ConnectionPoolStats* stats) const override; + void appendStats(BSONObjBuilder&) const override; std::string getHostName() override; Counters getCounters() const override; diff --git a/src/mongo/executor/scoped_task_executor.cpp b/src/mongo/executor/scoped_task_executor.cpp index 82062cf1454..e780f98a43d 100644 --- a/src/mongo/executor/scoped_task_executor.cpp +++ b/src/mongo/executor/scoped_task_executor.cpp @@ -190,6 +190,10 @@ public: MONGO_UNREACHABLE; } + void appendNetworkInterfaceStats(BSONObjBuilder&) const override { + MONGO_UNREACHABLE; + } + private: /** * Helper function to get a shared_ptr<ScopedTaskExecutor::Impl> to this object, akin to diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index 81855a7bade..ad8cab93436 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -426,6 +426,11 @@ public: */ virtual void dropConnections(const HostAndPort& hostAndPort) = 0; + /** + * Appends statistics for the underlying network interface. + */ + virtual void appendNetworkInterfaceStats(BSONObjBuilder&) const = 0; + protected: // Retrieves the Callback from a given CallbackHandle static CallbackState* getCallbackFromHandle(const CallbackHandle& cbHandle); diff --git a/src/mongo/executor/task_executor_pool.cpp b/src/mongo/executor/task_executor_pool.cpp index ec706a1f0c6..3b53f624154 100644 --- a/src/mongo/executor/task_executor_pool.cpp +++ b/src/mongo/executor/task_executor_pool.cpp @@ -101,5 +101,12 @@ void TaskExecutorPool::appendConnectionStats(ConnectionPoolStats* stats) const { } } +void TaskExecutorPool::appendNetworkInterfaceStats(BSONObjBuilder& bob) const { + _fixedExecutor->appendNetworkInterfaceStats(bob); + for (auto&& executor : _executors) { + executor->appendNetworkInterfaceStats(bob); + } +} + } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/task_executor_pool.h b/src/mongo/executor/task_executor_pool.h index c593f52da22..b5bbbd72352 100644 --- a/src/mongo/executor/task_executor_pool.h +++ b/src/mongo/executor/task_executor_pool.h @@ -110,6 +110,13 @@ public: */ void appendConnectionStats(ConnectionPoolStats* stats) const; + /** + * Appends statistics for all the executors, in particular their underlying network interfaces, + * in the pool. The information is collected in a non-blocking fashion and is just an + * approximate. + */ + void appendNetworkInterfaceStats(BSONObjBuilder&) const; + private: AtomicWord<unsigned> _counter; diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index b9a500deab7..2dfd75cddce 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -531,6 +531,10 @@ void ThreadPoolTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) c _net->appendConnectionStats(stats); } +void ThreadPoolTaskExecutor::appendNetworkInterfaceStats(BSONObjBuilder& bob) const { + _net->appendStats(bob); +} + StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallbackState_inlock( WorkQueue* queue, WorkQueue* wq) { if (_inShutdown_inlock()) { diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 1b02b812b85..9c95467118f 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -101,6 +101,8 @@ public: void dropConnections(const HostAndPort& hostAndPort) override; + void appendNetworkInterfaceStats(BSONObjBuilder&) const override; + /** * Returns true if there are any tasks in any of _poolInProgressQueue, _networkInProgressQueue, * or _sleepersQueue. diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp index 6fcb68704b0..721137f31c8 100644 --- a/src/mongo/s/sharding_task_executor.cpp +++ b/src/mongo/s/sharding_task_executor.cpp @@ -262,5 +262,9 @@ void ShardingTaskExecutor::dropConnections(const HostAndPort& hostAndPort) { _executor->dropConnections(hostAndPort); } +void ShardingTaskExecutor::appendNetworkInterfaceStats(BSONObjBuilder& bob) const { + _executor->appendNetworkInterfaceStats(bob); +} + } // namespace executor } // namespace mongo diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h index 803283b48b5..3db773c5f42 100644 --- a/src/mongo/s/sharding_task_executor.h +++ b/src/mongo/s/sharding_task_executor.h @@ -84,6 +84,7 @@ public: Interruptible* interruptible = Interruptible::notInterruptible()) override; void appendConnectionStats(ConnectionPoolStats* stats) const override; + void appendNetworkInterfaceStats(BSONObjBuilder&) const override; void dropConnections(const HostAndPort& hostAndPort) override; diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 61b30fed670..b166e728719 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -63,6 +63,7 @@ tlEnv.Library( '$BUILD_DIR/mongo/db/server_options_core', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/stats/counters', + '$BUILD_DIR/mongo/util/executor_stats', 'transport_layer_common', ], LIBDEPS_PRIVATE=[ diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index a3f4cc326a8..0398da05442 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -213,6 +213,8 @@ public: virtual std::unique_ptr<ReactorTimer> makeTimer() = 0; virtual Date_t now() = 0; + virtual void appendStats(BSONObjBuilder& bob) const = 0; + protected: Reactor() = default; }; diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 363d2ca35db..6047ffa7561 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -50,7 +50,9 @@ #include "mongo/transport/asio_utils.h" #include "mongo/transport/service_entry_point.h" #include "mongo/transport/transport_options_gen.h" +#include "mongo/util/clock_source.h" #include "mongo/util/errno_util.h" +#include "mongo/util/executor_stats.h" #include "mongo/util/hierarchical_acquisition.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/net/sockaddr.h" @@ -180,7 +182,7 @@ private: class TransportLayerASIO::ASIOReactor final : public Reactor { public: - ASIOReactor() : _ioContext() {} + ASIOReactor() : _clkSource(this), _stats(&_clkSource), _ioContext() {} void run() noexcept override { ThreadIdGuard threadIdGuard(this); @@ -216,11 +218,12 @@ public: } void schedule(Task task) override { - asio::post(_ioContext, [task = std::move(task)] { task(Status::OK()); }); + asio::post(_ioContext, [task = _stats.wrapTask(std::move(task))] { task(Status::OK()); }); } void dispatch(Task task) override { - asio::dispatch(_ioContext, [task = std::move(task)] { task(Status::OK()); }); + asio::dispatch(_ioContext, + [task = _stats.wrapTask(std::move(task))] { task(Status::OK()); }); } bool onReactorThread() const override { @@ -231,7 +234,29 @@ public: return _ioContext; } + void appendStats(BSONObjBuilder& bob) const override { + _stats.serialize(&bob); + } + private: + // Provides `ClockSource` API for the reactor's clock source. + class ReactorClockSource final : public ClockSource { + public: + explicit ReactorClockSource(ASIOReactor* reactor) : _reactor(reactor) {} + ~ReactorClockSource() = default; + + Milliseconds getPrecision() override { + MONGO_UNREACHABLE; + } + + Date_t now() override { + return _reactor->now(); + } + + private: + ASIOReactor* const _reactor; + }; + class ThreadIdGuard { public: ThreadIdGuard(TransportLayerASIO::ASIOReactor* reactor) { @@ -247,6 +272,10 @@ private: static thread_local ASIOReactor* _reactorForThread; + ReactorClockSource _clkSource; + + ExecutorStats _stats; + asio::io_context _ioContext; }; diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp index 4fdffbac8ae..9c992e3a352 100644 --- a/src/mongo/unittest/task_executor_proxy.cpp +++ b/src/mongo/unittest/task_executor_proxy.cpp @@ -142,5 +142,9 @@ void TaskExecutorProxy::dropConnections(const HostAndPort& hostAndPort) { _executor.load()->dropConnections(hostAndPort); } +void TaskExecutorProxy::appendNetworkInterfaceStats(BSONObjBuilder& bob) const { + _executor.load()->appendNetworkInterfaceStats(bob); +} + } // namespace unittest } // namespace mongo diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h index 2507a230458..991d8c68c18 100644 --- a/src/mongo/unittest/task_executor_proxy.h +++ b/src/mongo/unittest/task_executor_proxy.h @@ -88,6 +88,7 @@ public: Interruptible* interruptible = Interruptible::notInterruptible()) override; void appendConnectionStats(executor::ConnectionPoolStats* stats) const override; void dropConnections(const HostAndPort& hostAndPort) override; + void appendNetworkInterfaceStats(BSONObjBuilder&) const override; private: // Not owned by us. diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript index 3196846636f..97c6aa75eb4 100644 --- a/src/mongo/util/SConscript +++ b/src/mongo/util/SConscript @@ -493,6 +493,26 @@ env.Library( ] ) +env.Library( + target='executor_stats', + source=[ + 'executor_stats.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ], +) + +env.CppUnitTest( + target='executor_stats_test', + source=[ + 'executor_stats_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/util/clock_source_mock', + '$BUILD_DIR/mongo/util/executor_stats', + ], +) env.Benchmark( target='decimal_counter_bm', diff --git a/src/mongo/util/executor_stats.cpp b/src/mongo/util/executor_stats.cpp new file mode 100644 index 00000000000..3f24f4e84ad --- /dev/null +++ b/src/mongo/util/executor_stats.cpp @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include <fmt/format.h> +#include <string> + +#include "mongo/util/executor_stats.h" + +#include "mongo/base/status.h" + +namespace mongo { + +namespace { + +constexpr auto kMillisPerBucket = 50; + +void recordDuration(Counter64* buckets, Milliseconds duration) { + size_t index; + if (duration < Milliseconds(1)) { + index = 0; + } else if (duration >= Seconds(1)) { + index = ExecutorStats::kNumBuckets - 1; + } else { + // Each bucket covers a 50 milliseconds window (e.g., [1, 50), [50, 100) and so on). + // That's why the duration (in milliseconds) is divided by 50 to compute the index. + index = 1 + durationCount<Milliseconds>(duration) / kMillisPerBucket; + } + buckets[index].increment(); +} + +void serializeBuckets(const Counter64* buckets, BSONObjBuilder bob) { + auto makeTag = [](size_t i) -> std::string { + if (i == 0) + return "0-999us"; + if (i == ExecutorStats::kNumBuckets - 1) + return "1000ms+"; + + const auto lb = i > 1 ? (i - 1) * kMillisPerBucket : 1; + const auto ub = i * kMillisPerBucket - 1; + return fmt::format("{}-{}ms", lb, ub); + }; + + for (size_t i = 0; i < ExecutorStats::kNumBuckets; i++) { + bob.append(makeTag(i), buckets[i].get()); + } +} + +} // namespace + +ExecutorStats::Task ExecutorStats::wrapTask(ExecutorStats::Task&& task) { + _scheduled.increment(1); + return [this, task = std::move(task), scheduledAt = _clkSource->now()](Status status) { + const auto startedAt = _clkSource->now(); + recordDuration(_waitingBuckets, startedAt - scheduledAt); + + task(std::move(status)); + + recordDuration(_runningBuckets, _clkSource->now() - startedAt); + _executed.increment(1); + }; +} + +void ExecutorStats::serialize(BSONObjBuilder* bob) const { + bob->append("scheduled"_sd, _scheduled.get()); + bob->append("executed"_sd, _executed.get()); + serializeBuckets(_waitingBuckets, bob->subobjStart("waitTime"_sd)); + serializeBuckets(_runningBuckets, bob->subobjStart("runTime"_sd)); +} + +} // namespace mongo diff --git a/src/mongo/util/executor_stats.h b/src/mongo/util/executor_stats.h new file mode 100644 index 00000000000..6474c1bc5ce --- /dev/null +++ b/src/mongo/util/executor_stats.h @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/counter.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/util/clock_source.h" +#include "mongo/util/duration.h" +#include "mongo/util/out_of_line_executor.h" + +namespace mongo { + +/** + * A utility to collect stats for tasks scheduled on an executor (e.g., the reactor thread). + * + * This class expects a clock source to measure the waiting and execution time of tasks. The clock + * source must always outlive instances of this class. + * + * This class collects stats for any task that is wrapped using `wrapTask`. The wrapped task may + * never outlive the instance of `ExecutorStats`. + * + * All public interfaces for this class are thread-safe. + */ +class ExecutorStats { +public: + explicit ExecutorStats(ClockSource* clkSource) : _clkSource(clkSource) {} + + using Task = OutOfLineExecutor::Task; + Task wrapTask(Task&&); + + void serialize(BSONObjBuilder* bob) const; + + static constexpr auto kNumBuckets = 22; + +private: + Counter64 _scheduled; + Counter64 _executed; + + /** + * The following maintain histograms for tasks scheduled on the executor: + * `_waitingBuckets` represents the waiting time for the tasks pending execution. + * `_runningBuckets` keeps track of execution time of individual tasks after completion. + * Buckets 0 represents any recorded latency less than 1 milliseconds. + * Buckets 1 to 20 represent the 1-999 ms range. + * The last bucket represents any recorded latency that exceeds one second. + */ + Counter64 _waitingBuckets[kNumBuckets]; + Counter64 _runningBuckets[kNumBuckets]; + + ClockSource* const _clkSource; +}; + +} // namespace mongo diff --git a/src/mongo/util/executor_stats_test.cpp b/src/mongo/util/executor_stats_test.cpp new file mode 100644 index 00000000000..59285351d3f --- /dev/null +++ b/src/mongo/util/executor_stats_test.cpp @@ -0,0 +1,219 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include <fmt/format.h> +#include <functional> +#include <memory> +#include <string> +#include <tuple> +#include <vector> + +#include "mongo/bson/bsonobj.h" +#include "mongo/logv2/log.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source_mock.h" +#include "mongo/util/executor_stats.h" + +namespace mongo { + +class ExecutorStatsTest : public unittest::Test { +public: + void setUp() override { + _stats = std::make_unique<ExecutorStats>(&_clkSource); + } + + void tearDown() override { + _stats.reset(); + } + + auto getScheduled() const { + return getStats().getField("scheduled").numberLong(); + } + + auto getExecuted() const { + return getStats().getField("executed").numberLong(); + } + + auto getWaitTime() const { + return getStats().getObjectField("waitTime").getOwned(); + } + + auto getRunTime() const { + return getStats().getObjectField("runTime").getOwned(); + } + + auto wrapTask(ExecutorStats::Task&& task) { + return _stats->wrapTask(std::move(task)); + } + + void advanceTime(Milliseconds step) { + _clkSource.advance(step); + } + + BSONObj getStats() const { + BSONObjBuilder bob; + _stats->serialize(&bob); + return bob.obj(); + } + + void runTimingTests(std::string histogramTag, + std::function<size_t(Microseconds, Microseconds)> test) { + // Represents a test case, where `min` and `max` will be provided to `test`. The expected + // behavior for `ExecutorStats` is to update the histogram bucket corresponding to `tag`. + struct Bucket { + std::string tag; + Microseconds min; + Microseconds max; + }; + + const Bucket inputs[]{ + {"0-999us", Microseconds(0), Microseconds(1000)}, + {"1-49ms", Milliseconds(1), Milliseconds(50)}, + {"50-99ms", Milliseconds(50), Milliseconds(100)}, + {"500-549ms", Milliseconds(500), Milliseconds(550)}, + {"950-999ms", Milliseconds(950), Milliseconds(1000)}, + {"1000ms+", Seconds(1), Seconds(100)}, + }; + + for (const auto& bucket : inputs) { + // Runs the test and captures stats before and after. + // For each entry in `inputs`, the following calls into `test` and provides `min` and + // `max` delays. The test function returns the number of tasks it schedules and runs. + const auto beforeStats = getStats().getObjectField(histogramTag).getOwned(); + + const auto numTasks = test(bucket.min, bucket.max); + + const auto afterStats = getStats().getObjectField(histogramTag).getOwned(); + + // Verify stats by inspecting the before and after states. The expectation is for the + // bucket corresponding to `bucket.tag` to increase by `numTasks`, and for all other + // buckets to remain unchanged. + std::string errMsg = + fmt::format("Bad value for bucket '{}' in {}", bucket.tag, afterStats.toString()); + ASSERT_EQ(afterStats.getField(bucket.tag).numberLong(), numTasks) << errMsg; + + for (const auto& element : afterStats) { + const auto fieldName = element.fieldName(); + if (fieldName == bucket.tag) + continue; + std::string errMsg = fmt::format("Expected matching values for '{}' in {} and {}", + fieldName, + beforeStats.toString(), + afterStats.toString()); + ASSERT_EQ(beforeStats.getField(fieldName).numberLong(), element.numberLong()) + << errMsg; + } + } + + LOGV2(5985801, "Execution stats", "stats"_attr = getStats()); + } + +private: + ClockSourceMock _clkSource; + + std::unique_ptr<ExecutorStats> _stats; +}; + +TEST_F(ExecutorStatsTest, ScheduledAndExecutedMetrics) { + /** + * Wrap a total of three tasks and run them to ensure both `scheduled` and `executed` + * metrics are properly incremented. + */ + auto t1 = wrapTask([](Status) {}); + ASSERT_EQ(getScheduled(), 1); + ASSERT_EQ(getExecuted(), 0); + t1(Status::OK()); + ASSERT_EQ(getScheduled(), 1); + ASSERT_EQ(getExecuted(), 1); + + { + auto t2 = wrapTask([](Status) {}); + ASSERT_EQ(getScheduled(), 2); + auto t3 = wrapTask([](Status) {}); + ASSERT_EQ(getScheduled(), 3); + ASSERT_EQ(getExecuted(), 1); + + t3(Status::OK()); + ASSERT_EQ(getExecuted(), 2); + } + + ASSERT_EQ(getScheduled(), 3); + ASSERT_EQ(getExecuted(), 2); +} + + +TEST_F(ExecutorStatsTest, WaitTime) { + /** + * Schedules a total of 100 tasks and introduces artificial delays before running each task. + * The delays start at `min`, never exceed `max`, and are incrementally increased by `step`. + */ + auto test = [this](Microseconds min, Microseconds max) { + const auto numTasks = 100; + const auto step = (max - min) / numTasks; + ASSERT_GT(step, Microseconds{0}); + + size_t scheduledTasks = 0; + for (auto delay = min; delay < max; delay += step, scheduledTasks++) { + auto task = wrapTask([](Status) {}); + advanceTime(duration_cast<Milliseconds>(delay)); + task(Status::OK()); + } + + return scheduledTasks; + }; + + runTimingTests("waitTime", test); +} + +TEST_F(ExecutorStatsTest, RunTime) { + /** + * Schedules a few tasks and introduces artificial delays as running each task. These delays + * follow the same semantics as mentioned earlier in `ExecutorStatsTest::WaitTime`. + */ + auto test = [this](Microseconds min, Microseconds max) { + const auto numTasks = 20; + const auto step = (max - min) / numTasks; + ASSERT_GT(step, Microseconds{0}); + + size_t scheduledTasks = 0; + for (auto delay = min; delay < max; delay += step, scheduledTasks++) { + auto task = wrapTask( + [this, delay](Status) { advanceTime(duration_cast<Milliseconds>(delay)); }); + task(Status::OK()); + } + + return scheduledTasks; + }; + + runTimingTests("runTime", test); +} + +} // namespace mongo |