summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2021-10-28 16:44:30 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-28 18:27:43 +0000
commit5df159a364ec3a94d1e1ae01c70e3ca33cb10b4f (patch)
treed3cff6248090ecc9c1f73a6e95d8963628345381 /src/mongo
parent2d428795b0bf58f39f46ab75e9ec638ce8f83a17 (diff)
downloadmongo-5df159a364ec3a94d1e1ae01c70e3ca33cb10b4f.tar.gz
SERVER-59858 Add observability for tasks scheduled on reactor threads
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/client/replica_set_monitor_manager.cpp8
-rw-r--r--src/mongo/client/scanning_replica_set_monitor.cpp6
-rw-r--r--src/mongo/client/scanning_replica_set_monitor.h4
-rw-r--r--src/mongo/db/ftdc/ftdc_mongos.cpp21
-rw-r--r--src/mongo/executor/network_interface.h6
-rw-r--r--src/mongo/executor/network_interface_mock.h1
-rw-r--r--src/mongo/executor/network_interface_tl.cpp5
-rw-r--r--src/mongo/executor/network_interface_tl.h1
-rw-r--r--src/mongo/executor/scoped_task_executor.cpp4
-rw-r--r--src/mongo/executor/task_executor.h5
-rw-r--r--src/mongo/executor/task_executor_pool.cpp7
-rw-r--r--src/mongo/executor/task_executor_pool.h7
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp4
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h2
-rw-r--r--src/mongo/s/sharding_task_executor.cpp4
-rw-r--r--src/mongo/s/sharding_task_executor.h1
-rw-r--r--src/mongo/transport/SConscript1
-rw-r--r--src/mongo/transport/transport_layer.h2
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp35
-rw-r--r--src/mongo/unittest/task_executor_proxy.cpp4
-rw-r--r--src/mongo/unittest/task_executor_proxy.h1
-rw-r--r--src/mongo/util/SConscript20
-rw-r--r--src/mongo/util/executor_stats.cpp96
-rw-r--r--src/mongo/util/executor_stats.h80
-rw-r--r--src/mongo/util/executor_stats_test.cpp219
25 files changed, 534 insertions, 10 deletions
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp
index 0a8c1d5d9b8..3919437676a 100644
--- a/src/mongo/client/replica_set_monitor_manager.cpp
+++ b/src/mongo/client/replica_set_monitor_manager.cpp
@@ -196,14 +196,16 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(
"Starting Replica Set Monitor",
"protocol"_attr = toString(gReplicaSetMonitorProtocol),
"uri"_attr = uri.toString());
+ invariant(_taskExecutor);
if (gReplicaSetMonitorProtocol == ReplicaSetMonitorProtocol::kScanning) {
- newMonitor = std::make_shared<ScanningReplicaSetMonitor>(uri, cleanupCallback);
+ newMonitor =
+ std::make_shared<ScanningReplicaSetMonitor>(uri, _taskExecutor, cleanupCallback);
newMonitor->init();
} else {
// Both ReplicaSetMonitorProtocol::kSdam and ReplicaSetMonitorProtocol::kStreamable use the
// StreamableReplicaSetMonitor.
newMonitor = StreamableReplicaSetMonitor::make(
- uri, getExecutor(), _getConnectionManager(), cleanupCallback, _stats);
+ uri, _taskExecutor, _getConnectionManager(), cleanupCallback, _stats);
}
_monitors[setName] = newMonitor;
_numMonitorsCreated++;
@@ -352,7 +354,7 @@ void ReplicaSetMonitorManager::report(BSONObjBuilder* builder, bool forFTDC) {
}
std::shared_ptr<executor::TaskExecutor> ReplicaSetMonitorManager::getExecutor() {
- invariant(_taskExecutor);
+ auto lk = stdx::lock_guard(_mutex);
return _taskExecutor;
}
diff --git a/src/mongo/client/scanning_replica_set_monitor.cpp b/src/mongo/client/scanning_replica_set_monitor.cpp
index 0902abd94e0..353bca3989d 100644
--- a/src/mongo/client/scanning_replica_set_monitor.cpp
+++ b/src/mongo/client/scanning_replica_set_monitor.cpp
@@ -175,11 +175,11 @@ ScanningReplicaSetMonitor::ScanningReplicaSetMonitor(const SetStatePtr& initialS
: ReplicaSetMonitor(cleanupCallback), _state(initialState) {}
ScanningReplicaSetMonitor::ScanningReplicaSetMonitor(const MongoURI& uri,
+ std::shared_ptr<TaskExecutor> executor,
std::function<void()> cleanupCallback)
: ScanningReplicaSetMonitor(
- std::make_shared<SetState>(uri,
- &ReplicaSetMonitorManager::get()->getNotifier(),
- ReplicaSetMonitorManager::get()->getExecutor().get()),
+ std::make_shared<SetState>(
+ uri, &ReplicaSetMonitorManager::get()->getNotifier(), executor.get()),
cleanupCallback) {}
void ScanningReplicaSetMonitor::init() {
diff --git a/src/mongo/client/scanning_replica_set_monitor.h b/src/mongo/client/scanning_replica_set_monitor.h
index 85295e099f1..fca197a409a 100644
--- a/src/mongo/client/scanning_replica_set_monitor.h
+++ b/src/mongo/client/scanning_replica_set_monitor.h
@@ -65,7 +65,9 @@ public:
static constexpr auto kExpeditedRefreshPeriod = Milliseconds(500);
static constexpr auto kCheckTimeout = Seconds(5);
- ScanningReplicaSetMonitor(const MongoURI& uri, std::function<void()> cleanupCallback);
+ ScanningReplicaSetMonitor(const MongoURI& uri,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ std::function<void()> cleanupCallback);
void init() override;
diff --git a/src/mongo/db/ftdc/ftdc_mongos.cpp b/src/mongo/db/ftdc/ftdc_mongos.cpp
index 210ed04ba96..0843e8d3dfc 100644
--- a/src/mongo/db/ftdc/ftdc_mongos.cpp
+++ b/src/mongo/db/ftdc/ftdc_mongos.cpp
@@ -37,6 +37,7 @@
#include "mongo/client/connpool.h"
#include "mongo/client/dbclient_connection.h"
#include "mongo/client/global_conn_pool.h"
+#include "mongo/client/replica_set_monitor_manager.h"
#include "mongo/db/ftdc/controller.h"
#include "mongo/db/ftdc/ftdc_server.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -84,10 +85,30 @@ public:
}
};
+class NetworkInterfaceStatsCollector final : public FTDCCollectorInterface {
+public:
+ void collect(OperationContext* opCtx, BSONObjBuilder& builder) override {
+ auto const grid = Grid::get(opCtx);
+ if (auto executorPool = grid->getExecutorPool()) {
+ executorPool->appendNetworkInterfaceStats(builder);
+ }
+
+ if (auto executor = ReplicaSetMonitorManager::get()->getExecutor()) {
+ executor->appendNetworkInterfaceStats(builder);
+ }
+ }
+
+ std::string name() const override {
+ return "networkInterfaceStats";
+ }
+};
+
void registerMongoSCollectors(FTDCController* controller) {
// PoolStats
controller->addPeriodicCollector(std::make_unique<ConnPoolStatsCollector>());
+ controller->addPeriodicCollector(std::make_unique<NetworkInterfaceStatsCollector>());
+
// GetDefaultRWConcern
controller->addOnRotateCollector(std::make_unique<FTDCSimpleInternalCommandCollector>(
"getDefaultRWConcern",
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index 852f9236ebb..799f473a12d 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -33,6 +33,7 @@
#include <functional>
#include <string>
+#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/executor/task_executor.h"
#include "mongo/transport/baton.h"
#include "mongo/util/fail_point.h"
@@ -77,6 +78,11 @@ public:
virtual void appendConnectionStats(ConnectionPoolStats* stats) const = 0;
/**
+ * Appends information about this instance of NetworkInterface.
+ */
+ virtual void appendStats(BSONObjBuilder&) const = 0;
+
+ /**
* Starts up the network interface.
*
* It is valid to call all methods except shutdown() before this method completes. That is,
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index ecbba32e7ae..e096589d640 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -100,6 +100,7 @@ public:
////////////////////////////////////////////////////////////////////////////////
void appendConnectionStats(ConnectionPoolStats* stats) const override {}
+ void appendStats(BSONObjBuilder&) const override {}
std::string getDiagnosticString() override;
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 41e744405ad..db070586b4b 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -192,6 +192,11 @@ void NetworkInterfaceTL::appendConnectionStats(ConnectionPoolStats* stats) const
pool->appendConnectionStats(stats);
}
+void NetworkInterfaceTL::appendStats(BSONObjBuilder& bob) const {
+ BSONObjBuilder builder = bob.subobjStart(_instanceName);
+ _reactor->appendStats(builder);
+}
+
NetworkInterface::Counters NetworkInterfaceTL::getCounters() const {
invariant(_counters);
return _counters->get();
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 646ebfbb11f..158e4b0a679 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -64,6 +64,7 @@ public:
std::string getDiagnosticString() override;
void appendConnectionStats(ConnectionPoolStats* stats) const override;
+ void appendStats(BSONObjBuilder&) const override;
std::string getHostName() override;
Counters getCounters() const override;
diff --git a/src/mongo/executor/scoped_task_executor.cpp b/src/mongo/executor/scoped_task_executor.cpp
index 82062cf1454..e780f98a43d 100644
--- a/src/mongo/executor/scoped_task_executor.cpp
+++ b/src/mongo/executor/scoped_task_executor.cpp
@@ -190,6 +190,10 @@ public:
MONGO_UNREACHABLE;
}
+ void appendNetworkInterfaceStats(BSONObjBuilder&) const override {
+ MONGO_UNREACHABLE;
+ }
+
private:
/**
* Helper function to get a shared_ptr<ScopedTaskExecutor::Impl> to this object, akin to
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 81855a7bade..ad8cab93436 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -426,6 +426,11 @@ public:
*/
virtual void dropConnections(const HostAndPort& hostAndPort) = 0;
+ /**
+ * Appends statistics for the underlying network interface.
+ */
+ virtual void appendNetworkInterfaceStats(BSONObjBuilder&) const = 0;
+
protected:
// Retrieves the Callback from a given CallbackHandle
static CallbackState* getCallbackFromHandle(const CallbackHandle& cbHandle);
diff --git a/src/mongo/executor/task_executor_pool.cpp b/src/mongo/executor/task_executor_pool.cpp
index ec706a1f0c6..3b53f624154 100644
--- a/src/mongo/executor/task_executor_pool.cpp
+++ b/src/mongo/executor/task_executor_pool.cpp
@@ -101,5 +101,12 @@ void TaskExecutorPool::appendConnectionStats(ConnectionPoolStats* stats) const {
}
}
+void TaskExecutorPool::appendNetworkInterfaceStats(BSONObjBuilder& bob) const {
+ _fixedExecutor->appendNetworkInterfaceStats(bob);
+ for (auto&& executor : _executors) {
+ executor->appendNetworkInterfaceStats(bob);
+ }
+}
+
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/task_executor_pool.h b/src/mongo/executor/task_executor_pool.h
index c593f52da22..b5bbbd72352 100644
--- a/src/mongo/executor/task_executor_pool.h
+++ b/src/mongo/executor/task_executor_pool.h
@@ -110,6 +110,13 @@ public:
*/
void appendConnectionStats(ConnectionPoolStats* stats) const;
+ /**
+ * Appends statistics for all the executors, in particular their underlying network interfaces,
+ * in the pool. The information is collected in a non-blocking fashion and is just an
+ * approximate.
+ */
+ void appendNetworkInterfaceStats(BSONObjBuilder&) const;
+
private:
AtomicWord<unsigned> _counter;
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index b9a500deab7..2dfd75cddce 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -531,6 +531,10 @@ void ThreadPoolTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) c
_net->appendConnectionStats(stats);
}
+void ThreadPoolTaskExecutor::appendNetworkInterfaceStats(BSONObjBuilder& bob) const {
+ _net->appendStats(bob);
+}
+
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallbackState_inlock(
WorkQueue* queue, WorkQueue* wq) {
if (_inShutdown_inlock()) {
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 1b02b812b85..9c95467118f 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -101,6 +101,8 @@ public:
void dropConnections(const HostAndPort& hostAndPort) override;
+ void appendNetworkInterfaceStats(BSONObjBuilder&) const override;
+
/**
* Returns true if there are any tasks in any of _poolInProgressQueue, _networkInProgressQueue,
* or _sleepersQueue.
diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp
index 6fcb68704b0..721137f31c8 100644
--- a/src/mongo/s/sharding_task_executor.cpp
+++ b/src/mongo/s/sharding_task_executor.cpp
@@ -262,5 +262,9 @@ void ShardingTaskExecutor::dropConnections(const HostAndPort& hostAndPort) {
_executor->dropConnections(hostAndPort);
}
+void ShardingTaskExecutor::appendNetworkInterfaceStats(BSONObjBuilder& bob) const {
+ _executor->appendNetworkInterfaceStats(bob);
+}
+
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h
index 803283b48b5..3db773c5f42 100644
--- a/src/mongo/s/sharding_task_executor.h
+++ b/src/mongo/s/sharding_task_executor.h
@@ -84,6 +84,7 @@ public:
Interruptible* interruptible = Interruptible::notInterruptible()) override;
void appendConnectionStats(ConnectionPoolStats* stats) const override;
+ void appendNetworkInterfaceStats(BSONObjBuilder&) const override;
void dropConnections(const HostAndPort& hostAndPort) override;
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 61b30fed670..b166e728719 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -63,6 +63,7 @@ tlEnv.Library(
'$BUILD_DIR/mongo/db/server_options_core',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/stats/counters',
+ '$BUILD_DIR/mongo/util/executor_stats',
'transport_layer_common',
],
LIBDEPS_PRIVATE=[
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index a3f4cc326a8..0398da05442 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -213,6 +213,8 @@ public:
virtual std::unique_ptr<ReactorTimer> makeTimer() = 0;
virtual Date_t now() = 0;
+ virtual void appendStats(BSONObjBuilder& bob) const = 0;
+
protected:
Reactor() = default;
};
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 363d2ca35db..6047ffa7561 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -50,7 +50,9 @@
#include "mongo/transport/asio_utils.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/transport_options_gen.h"
+#include "mongo/util/clock_source.h"
#include "mongo/util/errno_util.h"
+#include "mongo/util/executor_stats.h"
#include "mongo/util/hierarchical_acquisition.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/net/sockaddr.h"
@@ -180,7 +182,7 @@ private:
class TransportLayerASIO::ASIOReactor final : public Reactor {
public:
- ASIOReactor() : _ioContext() {}
+ ASIOReactor() : _clkSource(this), _stats(&_clkSource), _ioContext() {}
void run() noexcept override {
ThreadIdGuard threadIdGuard(this);
@@ -216,11 +218,12 @@ public:
}
void schedule(Task task) override {
- asio::post(_ioContext, [task = std::move(task)] { task(Status::OK()); });
+ asio::post(_ioContext, [task = _stats.wrapTask(std::move(task))] { task(Status::OK()); });
}
void dispatch(Task task) override {
- asio::dispatch(_ioContext, [task = std::move(task)] { task(Status::OK()); });
+ asio::dispatch(_ioContext,
+ [task = _stats.wrapTask(std::move(task))] { task(Status::OK()); });
}
bool onReactorThread() const override {
@@ -231,7 +234,29 @@ public:
return _ioContext;
}
+ void appendStats(BSONObjBuilder& bob) const override {
+ _stats.serialize(&bob);
+ }
+
private:
+ // Provides `ClockSource` API for the reactor's clock source.
+ class ReactorClockSource final : public ClockSource {
+ public:
+ explicit ReactorClockSource(ASIOReactor* reactor) : _reactor(reactor) {}
+ ~ReactorClockSource() = default;
+
+ Milliseconds getPrecision() override {
+ MONGO_UNREACHABLE;
+ }
+
+ Date_t now() override {
+ return _reactor->now();
+ }
+
+ private:
+ ASIOReactor* const _reactor;
+ };
+
class ThreadIdGuard {
public:
ThreadIdGuard(TransportLayerASIO::ASIOReactor* reactor) {
@@ -247,6 +272,10 @@ private:
static thread_local ASIOReactor* _reactorForThread;
+ ReactorClockSource _clkSource;
+
+ ExecutorStats _stats;
+
asio::io_context _ioContext;
};
diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp
index 4fdffbac8ae..9c992e3a352 100644
--- a/src/mongo/unittest/task_executor_proxy.cpp
+++ b/src/mongo/unittest/task_executor_proxy.cpp
@@ -142,5 +142,9 @@ void TaskExecutorProxy::dropConnections(const HostAndPort& hostAndPort) {
_executor.load()->dropConnections(hostAndPort);
}
+void TaskExecutorProxy::appendNetworkInterfaceStats(BSONObjBuilder& bob) const {
+ _executor.load()->appendNetworkInterfaceStats(bob);
+}
+
} // namespace unittest
} // namespace mongo
diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h
index 2507a230458..991d8c68c18 100644
--- a/src/mongo/unittest/task_executor_proxy.h
+++ b/src/mongo/unittest/task_executor_proxy.h
@@ -88,6 +88,7 @@ public:
Interruptible* interruptible = Interruptible::notInterruptible()) override;
void appendConnectionStats(executor::ConnectionPoolStats* stats) const override;
void dropConnections(const HostAndPort& hostAndPort) override;
+ void appendNetworkInterfaceStats(BSONObjBuilder&) const override;
private:
// Not owned by us.
diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript
index 3196846636f..97c6aa75eb4 100644
--- a/src/mongo/util/SConscript
+++ b/src/mongo/util/SConscript
@@ -493,6 +493,26 @@ env.Library(
]
)
+env.Library(
+ target='executor_stats',
+ source=[
+ 'executor_stats.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ ],
+)
+
+env.CppUnitTest(
+ target='executor_stats_test',
+ source=[
+ 'executor_stats_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/util/clock_source_mock',
+ '$BUILD_DIR/mongo/util/executor_stats',
+ ],
+)
env.Benchmark(
target='decimal_counter_bm',
diff --git a/src/mongo/util/executor_stats.cpp b/src/mongo/util/executor_stats.cpp
new file mode 100644
index 00000000000..3f24f4e84ad
--- /dev/null
+++ b/src/mongo/util/executor_stats.cpp
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <fmt/format.h>
+#include <string>
+
+#include "mongo/util/executor_stats.h"
+
+#include "mongo/base/status.h"
+
+namespace mongo {
+
+namespace {
+
+constexpr auto kMillisPerBucket = 50;
+
+void recordDuration(Counter64* buckets, Milliseconds duration) {
+ size_t index;
+ if (duration < Milliseconds(1)) {
+ index = 0;
+ } else if (duration >= Seconds(1)) {
+ index = ExecutorStats::kNumBuckets - 1;
+ } else {
+ // Each bucket covers a 50 milliseconds window (e.g., [1, 50), [50, 100) and so on).
+ // That's why the duration (in milliseconds) is divided by 50 to compute the index.
+ index = 1 + durationCount<Milliseconds>(duration) / kMillisPerBucket;
+ }
+ buckets[index].increment();
+}
+
+void serializeBuckets(const Counter64* buckets, BSONObjBuilder bob) {
+ auto makeTag = [](size_t i) -> std::string {
+ if (i == 0)
+ return "0-999us";
+ if (i == ExecutorStats::kNumBuckets - 1)
+ return "1000ms+";
+
+ const auto lb = i > 1 ? (i - 1) * kMillisPerBucket : 1;
+ const auto ub = i * kMillisPerBucket - 1;
+ return fmt::format("{}-{}ms", lb, ub);
+ };
+
+ for (size_t i = 0; i < ExecutorStats::kNumBuckets; i++) {
+ bob.append(makeTag(i), buckets[i].get());
+ }
+}
+
+} // namespace
+
+ExecutorStats::Task ExecutorStats::wrapTask(ExecutorStats::Task&& task) {
+ _scheduled.increment(1);
+ return [this, task = std::move(task), scheduledAt = _clkSource->now()](Status status) {
+ const auto startedAt = _clkSource->now();
+ recordDuration(_waitingBuckets, startedAt - scheduledAt);
+
+ task(std::move(status));
+
+ recordDuration(_runningBuckets, _clkSource->now() - startedAt);
+ _executed.increment(1);
+ };
+}
+
+void ExecutorStats::serialize(BSONObjBuilder* bob) const {
+ bob->append("scheduled"_sd, _scheduled.get());
+ bob->append("executed"_sd, _executed.get());
+ serializeBuckets(_waitingBuckets, bob->subobjStart("waitTime"_sd));
+ serializeBuckets(_runningBuckets, bob->subobjStart("runTime"_sd));
+}
+
+} // namespace mongo
diff --git a/src/mongo/util/executor_stats.h b/src/mongo/util/executor_stats.h
new file mode 100644
index 00000000000..6474c1bc5ce
--- /dev/null
+++ b/src/mongo/util/executor_stats.h
@@ -0,0 +1,80 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/counter.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/util/clock_source.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/out_of_line_executor.h"
+
+namespace mongo {
+
+/**
+ * A utility to collect stats for tasks scheduled on an executor (e.g., the reactor thread).
+ *
+ * This class expects a clock source to measure the waiting and execution time of tasks. The clock
+ * source must always outlive instances of this class.
+ *
+ * This class collects stats for any task that is wrapped using `wrapTask`. The wrapped task may
+ * never outlive the instance of `ExecutorStats`.
+ *
+ * All public interfaces for this class are thread-safe.
+ */
+class ExecutorStats {
+public:
+ explicit ExecutorStats(ClockSource* clkSource) : _clkSource(clkSource) {}
+
+ using Task = OutOfLineExecutor::Task;
+ Task wrapTask(Task&&);
+
+ void serialize(BSONObjBuilder* bob) const;
+
+ static constexpr auto kNumBuckets = 22;
+
+private:
+ Counter64 _scheduled;
+ Counter64 _executed;
+
+ /**
+ * The following maintain histograms for tasks scheduled on the executor:
+ * `_waitingBuckets` represents the waiting time for the tasks pending execution.
+ * `_runningBuckets` keeps track of execution time of individual tasks after completion.
+ * Buckets 0 represents any recorded latency less than 1 milliseconds.
+ * Buckets 1 to 20 represent the 1-999 ms range.
+ * The last bucket represents any recorded latency that exceeds one second.
+ */
+ Counter64 _waitingBuckets[kNumBuckets];
+ Counter64 _runningBuckets[kNumBuckets];
+
+ ClockSource* const _clkSource;
+};
+
+} // namespace mongo
diff --git a/src/mongo/util/executor_stats_test.cpp b/src/mongo/util/executor_stats_test.cpp
new file mode 100644
index 00000000000..59285351d3f
--- /dev/null
+++ b/src/mongo/util/executor_stats_test.cpp
@@ -0,0 +1,219 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include <fmt/format.h>
+#include <functional>
+#include <memory>
+#include <string>
+#include <tuple>
+#include <vector>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/logv2/log.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source_mock.h"
+#include "mongo/util/executor_stats.h"
+
+namespace mongo {
+
+class ExecutorStatsTest : public unittest::Test {
+public:
+ void setUp() override {
+ _stats = std::make_unique<ExecutorStats>(&_clkSource);
+ }
+
+ void tearDown() override {
+ _stats.reset();
+ }
+
+ auto getScheduled() const {
+ return getStats().getField("scheduled").numberLong();
+ }
+
+ auto getExecuted() const {
+ return getStats().getField("executed").numberLong();
+ }
+
+ auto getWaitTime() const {
+ return getStats().getObjectField("waitTime").getOwned();
+ }
+
+ auto getRunTime() const {
+ return getStats().getObjectField("runTime").getOwned();
+ }
+
+ auto wrapTask(ExecutorStats::Task&& task) {
+ return _stats->wrapTask(std::move(task));
+ }
+
+ void advanceTime(Milliseconds step) {
+ _clkSource.advance(step);
+ }
+
+ BSONObj getStats() const {
+ BSONObjBuilder bob;
+ _stats->serialize(&bob);
+ return bob.obj();
+ }
+
+ void runTimingTests(std::string histogramTag,
+ std::function<size_t(Microseconds, Microseconds)> test) {
+ // Represents a test case, where `min` and `max` will be provided to `test`. The expected
+ // behavior for `ExecutorStats` is to update the histogram bucket corresponding to `tag`.
+ struct Bucket {
+ std::string tag;
+ Microseconds min;
+ Microseconds max;
+ };
+
+ const Bucket inputs[]{
+ {"0-999us", Microseconds(0), Microseconds(1000)},
+ {"1-49ms", Milliseconds(1), Milliseconds(50)},
+ {"50-99ms", Milliseconds(50), Milliseconds(100)},
+ {"500-549ms", Milliseconds(500), Milliseconds(550)},
+ {"950-999ms", Milliseconds(950), Milliseconds(1000)},
+ {"1000ms+", Seconds(1), Seconds(100)},
+ };
+
+ for (const auto& bucket : inputs) {
+ // Runs the test and captures stats before and after.
+ // For each entry in `inputs`, the following calls into `test` and provides `min` and
+ // `max` delays. The test function returns the number of tasks it schedules and runs.
+ const auto beforeStats = getStats().getObjectField(histogramTag).getOwned();
+
+ const auto numTasks = test(bucket.min, bucket.max);
+
+ const auto afterStats = getStats().getObjectField(histogramTag).getOwned();
+
+ // Verify stats by inspecting the before and after states. The expectation is for the
+ // bucket corresponding to `bucket.tag` to increase by `numTasks`, and for all other
+ // buckets to remain unchanged.
+ std::string errMsg =
+ fmt::format("Bad value for bucket '{}' in {}", bucket.tag, afterStats.toString());
+ ASSERT_EQ(afterStats.getField(bucket.tag).numberLong(), numTasks) << errMsg;
+
+ for (const auto& element : afterStats) {
+ const auto fieldName = element.fieldName();
+ if (fieldName == bucket.tag)
+ continue;
+ std::string errMsg = fmt::format("Expected matching values for '{}' in {} and {}",
+ fieldName,
+ beforeStats.toString(),
+ afterStats.toString());
+ ASSERT_EQ(beforeStats.getField(fieldName).numberLong(), element.numberLong())
+ << errMsg;
+ }
+ }
+
+ LOGV2(5985801, "Execution stats", "stats"_attr = getStats());
+ }
+
+private:
+ ClockSourceMock _clkSource;
+
+ std::unique_ptr<ExecutorStats> _stats;
+};
+
+TEST_F(ExecutorStatsTest, ScheduledAndExecutedMetrics) {
+ /**
+ * Wrap a total of three tasks and run them to ensure both `scheduled` and `executed`
+ * metrics are properly incremented.
+ */
+ auto t1 = wrapTask([](Status) {});
+ ASSERT_EQ(getScheduled(), 1);
+ ASSERT_EQ(getExecuted(), 0);
+ t1(Status::OK());
+ ASSERT_EQ(getScheduled(), 1);
+ ASSERT_EQ(getExecuted(), 1);
+
+ {
+ auto t2 = wrapTask([](Status) {});
+ ASSERT_EQ(getScheduled(), 2);
+ auto t3 = wrapTask([](Status) {});
+ ASSERT_EQ(getScheduled(), 3);
+ ASSERT_EQ(getExecuted(), 1);
+
+ t3(Status::OK());
+ ASSERT_EQ(getExecuted(), 2);
+ }
+
+ ASSERT_EQ(getScheduled(), 3);
+ ASSERT_EQ(getExecuted(), 2);
+}
+
+
+TEST_F(ExecutorStatsTest, WaitTime) {
+ /**
+ * Schedules a total of 100 tasks and introduces artificial delays before running each task.
+ * The delays start at `min`, never exceed `max`, and are incrementally increased by `step`.
+ */
+ auto test = [this](Microseconds min, Microseconds max) {
+ const auto numTasks = 100;
+ const auto step = (max - min) / numTasks;
+ ASSERT_GT(step, Microseconds{0});
+
+ size_t scheduledTasks = 0;
+ for (auto delay = min; delay < max; delay += step, scheduledTasks++) {
+ auto task = wrapTask([](Status) {});
+ advanceTime(duration_cast<Milliseconds>(delay));
+ task(Status::OK());
+ }
+
+ return scheduledTasks;
+ };
+
+ runTimingTests("waitTime", test);
+}
+
+TEST_F(ExecutorStatsTest, RunTime) {
+ /**
+ * Schedules a few tasks and introduces artificial delays as running each task. These delays
+ * follow the same semantics as mentioned earlier in `ExecutorStatsTest::WaitTime`.
+ */
+ auto test = [this](Microseconds min, Microseconds max) {
+ const auto numTasks = 20;
+ const auto step = (max - min) / numTasks;
+ ASSERT_GT(step, Microseconds{0});
+
+ size_t scheduledTasks = 0;
+ for (auto delay = min; delay < max; delay += step, scheduledTasks++) {
+ auto task = wrapTask(
+ [this, delay](Status) { advanceTime(duration_cast<Milliseconds>(delay)); });
+ task(Status::OK());
+ }
+
+ return scheduledTasks;
+ };
+
+ runTimingTests("runTime", test);
+}
+
+} // namespace mongo