summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorBrett Nawrocki <brett.nawrocki@mongodb.com>2022-07-22 16:11:26 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-07-28 18:55:59 +0000
commitb263fd5da80fe00213ccc8694f5215ab4da2159a (patch)
tree0cc5983bb8cdce0dc7ba5acfb48077296b97785a /src/mongo
parentc1c13273cb24b8968849533bf6d25e9eaf2d6855 (diff)
downloadmongo-b263fd5da80fe00213ccc8694f5215ab4da2159a.tar.gz
SERVER-67112 Create ReshardingCumulativeMetrics subclass
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/global_index_metrics_test.cpp2
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp2
-rw-r--r--src/mongo/db/s/resharding/resharding_cumulative_metrics.cpp304
-rw-r--r--src/mongo/db/s/resharding/resharding_cumulative_metrics.h184
-rw-r--r--src/mongo/db/s/resharding/resharding_cumulative_metrics_test.cpp574
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.cpp61
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.h23
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics_test.cpp11
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp7
-rw-r--r--src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp388
-rw-r--r--src/mongo/db/s/sharding_data_transform_cumulative_metrics.h146
-rw-r--r--src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp717
-rw-r--r--src/mongo/db/s/sharding_data_transform_instance_metrics.cpp13
-rw-r--r--src/mongo/db/s/sharding_data_transform_instance_metrics.h3
-rw-r--r--src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp14
-rw-r--r--src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h47
17 files changed, 1345 insertions, 1153 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index f7cbf589684..f306e353230 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -94,6 +94,7 @@ env.Library(
'resharding/resharding_coordinator_observer.cpp',
'resharding/resharding_coordinator_service.cpp',
'resharding/resharding_cumulative_metrics_field_name_provider.cpp',
+ 'resharding/resharding_cumulative_metrics.cpp',
'resharding/resharding_data_copy_util.cpp',
'resharding/resharding_data_replication.cpp',
'resharding/resharding_donor_oplog_iterator.cpp',
@@ -591,6 +592,7 @@ env.CppUnitTest(
'resharding/resharding_agg_test.cpp',
'resharding/resharding_collection_cloner_test.cpp',
'resharding/resharding_collection_test.cpp',
+ 'resharding/resharding_cumulative_metrics_test.cpp',
'resharding/resharding_data_replication_test.cpp',
'resharding/resharding_destined_recipient_test.cpp',
'resharding/resharding_donor_oplog_iterator_test.cpp',
diff --git a/src/mongo/db/s/global_index_metrics_test.cpp b/src/mongo/db/s/global_index_metrics_test.cpp
index 7d2dcb36f41..ebdae28fc08 100644
--- a/src/mongo/db/s/global_index_metrics_test.cpp
+++ b/src/mongo/db/s/global_index_metrics_test.cpp
@@ -54,7 +54,7 @@ public:
role,
clockSource->now(),
clockSource,
- &_cumulativeMetrics);
+ _cumulativeMetrics.get());
}
};
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
index d8740053c80..ed4e41a63f2 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
@@ -108,7 +108,7 @@ private:
boost::optional<Callback> _runOnMockingNextResponse;
- ShardingDataTransformCumulativeMetrics _cumulativeMetrics{"dummyForTest"};
+ ReshardingCumulativeMetrics _cumulativeMetrics;
std::shared_ptr<ReshardingMetrics> _metrics;
};
diff --git a/src/mongo/db/s/resharding/resharding_cumulative_metrics.cpp b/src/mongo/db/s/resharding/resharding_cumulative_metrics.cpp
new file mode 100644
index 00000000000..164b8c4e0d0
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_cumulative_metrics.cpp
@@ -0,0 +1,304 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/resharding/resharding_cumulative_metrics.h"
+
+namespace mongo {
+
+namespace {
+constexpr auto kResharding = "resharding";
+}
+
+// Constructs the resharding-specific cumulative metrics. Registers the
+// "resharding" serverStatus section name together with a resharding field-name
+// provider with the generic base class, caches a downcast pointer to that
+// provider, and zero-initializes every per-state instance counter (8
+// coordinator states, 7 donor states, 7 recipient states — matching the
+// kNumStates of each enum in the header).
+ReshardingCumulativeMetrics::ReshardingCumulativeMetrics()
+    : ShardingDataTransformCumulativeMetrics(
+          kResharding, std::make_unique<ReshardingCumulativeMetricsFieldNameProvider>()),
+      _fieldNames(
+          static_cast<const ReshardingCumulativeMetricsFieldNameProvider*>(getFieldNames())),
+      _coordinatorStateList{AtomicWord<int64_t>{0},
+                            AtomicWord<int64_t>{0},
+                            AtomicWord<int64_t>{0},
+                            AtomicWord<int64_t>{0},
+                            AtomicWord<int64_t>{0},
+                            AtomicWord<int64_t>{0},
+                            AtomicWord<int64_t>{0},
+                            AtomicWord<int64_t>{0}},
+      _donorStateList{AtomicWord<int64_t>{0},
+                      AtomicWord<int64_t>{0},
+                      AtomicWord<int64_t>{0},
+                      AtomicWord<int64_t>{0},
+                      AtomicWord<int64_t>{0},
+                      AtomicWord<int64_t>{0},
+                      AtomicWord<int64_t>{0}},
+      _recipientStateList{AtomicWord<int64_t>{0},
+                          AtomicWord<int64_t>{0},
+                          AtomicWord<int64_t>{0},
+                          AtomicWord<int64_t>{0},
+                          AtomicWord<int64_t>{0},
+                          AtomicWord<int64_t>{0},
+                          AtomicWord<int64_t>{0}} {}
+
+// Maps a coordinator state to the serverStatus field name holding its
+// "count of instances currently in this state" counter. uasserts (6438601)
+// for states with no counter field (kUnused, kDone, kNumStates).
+StringData ReshardingCumulativeMetrics::fieldNameFor(
+    CoordinatorStateEnum state, const ReshardingCumulativeMetricsFieldNameProvider* provider) {
+    switch (state) {
+        case CoordinatorStateEnum::kInitializing:
+            return provider->getForCountInstancesInCoordinatorState1Initializing();
+
+        case CoordinatorStateEnum::kPreparingToDonate:
+            return provider->getForCountInstancesInCoordinatorState2PreparingToDonate();
+
+        case CoordinatorStateEnum::kCloning:
+            return provider->getForCountInstancesInCoordinatorState3Cloning();
+
+        case CoordinatorStateEnum::kApplying:
+            return provider->getForCountInstancesInCoordinatorState4Applying();
+
+        case CoordinatorStateEnum::kBlockingWrites:
+            return provider->getForCountInstancesInCoordinatorState5BlockingWrites();
+
+        case CoordinatorStateEnum::kAborting:
+            return provider->getForCountInstancesInCoordinatorState6Aborting();
+
+        case CoordinatorStateEnum::kCommitting:
+            return provider->getForCountInstancesInCoordinatorState7Committing();
+
+        default:
+            uasserted(6438601,
+                      str::stream()
+                          << "no field name for coordinator state " << static_cast<int32_t>(state));
+            break;
+    }
+
+    MONGO_UNREACHABLE;
+}
+
+// Maps a donor state to its serverStatus counter field name; uasserts
+// (6438700) for states with no counter field (kUnused, kNumStates).
+StringData ReshardingCumulativeMetrics::fieldNameFor(
+    DonorStateEnum state, const ReshardingCumulativeMetricsFieldNameProvider* provider) {
+    switch (state) {
+        case DonorStateEnum::kPreparingToDonate:
+            return provider->getForCountInstancesInDonorState1PreparingToDonate();
+
+        case DonorStateEnum::kDonatingInitialData:
+            return provider->getForCountInstancesInDonorState2DonatingInitialData();
+
+        case DonorStateEnum::kDonatingOplogEntries:
+            return provider->getForCountInstancesInDonorState3DonatingOplogEntries();
+
+        case DonorStateEnum::kPreparingToBlockWrites:
+            return provider->getForCountInstancesInDonorState4PreparingToBlockWrites();
+
+        case DonorStateEnum::kError:
+            return provider->getForCountInstancesInDonorState5Error();
+
+        case DonorStateEnum::kBlockingWrites:
+            return provider->getForCountInstancesInDonorState6BlockingWrites();
+
+        case DonorStateEnum::kDone:
+            return provider->getForCountInstancesInDonorState7Done();
+
+        default:
+            uasserted(6438700,
+                      str::stream()
+                          << "no field name for donor state " << static_cast<int32_t>(state));
+            break;
+    }
+
+    MONGO_UNREACHABLE;
+}
+
+// Maps a recipient state to its serverStatus counter field name; uasserts
+// (6438900) for states with no counter field (kUnused, kNumStates).
+StringData ReshardingCumulativeMetrics::fieldNameFor(
+    RecipientStateEnum state, const ReshardingCumulativeMetricsFieldNameProvider* provider) {
+    switch (state) {
+        case RecipientStateEnum::kAwaitingFetchTimestamp:
+            return provider->getForCountInstancesInRecipientState1AwaitingFetchTimestamp();
+
+        case RecipientStateEnum::kCreatingCollection:
+            return provider->getForCountInstancesInRecipientState2CreatingCollection();
+
+        case RecipientStateEnum::kCloning:
+            return provider->getForCountInstancesInRecipientState3Cloning();
+
+        case RecipientStateEnum::kApplying:
+            return provider->getForCountInstancesInRecipientState4Applying();
+
+        case RecipientStateEnum::kError:
+            return provider->getForCountInstancesInRecipientState5Error();
+
+        case RecipientStateEnum::kStrictConsistency:
+            return provider->getForCountInstancesInRecipientState6StrictConsistency();
+
+        case RecipientStateEnum::kDone:
+            return provider->getForCountInstancesInRecipientState7Done();
+
+        default:
+            uasserted(6438900,
+                      str::stream()
+                          << "no field name for recipient state " << static_cast<int32_t>(state));
+            break;
+    }
+
+    MONGO_UNREACHABLE;
+}
+
+
+// Role-dispatch helpers: there is exactly one counter array per role, so each
+// overload ignores the `state` argument and returns its role's array. The
+// parameter exists only so the templated getStateCounter/getMutableStateCounter
+// helpers can select the right overload by state-enum type.
+ReshardingCumulativeMetrics::CoordinatorStateArray* ReshardingCumulativeMetrics::getStateArrayFor(
+    CoordinatorStateEnum state) {
+    return &_coordinatorStateList;
+}
+
+const ReshardingCumulativeMetrics::CoordinatorStateArray*
+ReshardingCumulativeMetrics::getStateArrayFor(CoordinatorStateEnum state) const {
+    return &_coordinatorStateList;
+}
+
+ReshardingCumulativeMetrics::DonorStateArray* ReshardingCumulativeMetrics::getStateArrayFor(
+    DonorStateEnum state) {
+    return &_donorStateList;
+}
+
+const ReshardingCumulativeMetrics::DonorStateArray* ReshardingCumulativeMetrics::getStateArrayFor(
+    DonorStateEnum state) const {
+    return &_donorStateList;
+}
+
+ReshardingCumulativeMetrics::RecipientStateArray* ReshardingCumulativeMetrics::getStateArrayFor(
+    RecipientStateEnum state) {
+    return &_recipientStateList;
+}
+
+const ReshardingCumulativeMetrics::RecipientStateArray*
+ReshardingCumulativeMetrics::getStateArrayFor(RecipientStateEnum state) const {
+    return &_recipientStateList;
+}
+
+// Appends the resharding-specific "active" counters (oplog entries
+// fetched/applied plus per-op-type applied counts) after the base class has
+// written the generic active metrics.
+void ReshardingCumulativeMetrics::reportActive(BSONObjBuilder* bob) const {
+    ShardingDataTransformCumulativeMetrics::reportActive(bob);
+
+    bob->append(_fieldNames->getForOplogEntriesFetched(), _oplogEntriesFetched.load());
+    bob->append(_fieldNames->getForOplogEntriesApplied(), _oplogEntriesApplied.load());
+    bob->append(_fieldNames->getForInsertsApplied(), _insertsApplied.load());
+    bob->append(_fieldNames->getForUpdatesApplied(), _updatesApplied.load());
+    bob->append(_fieldNames->getForDeletesApplied(), _deletesApplied.load());
+}
+
+// Appends the resharding oplog fetch/apply batch totals and timings to the
+// "latencies" serverStatus subsection, after the base-class latencies.
+void ReshardingCumulativeMetrics::reportLatencies(BSONObjBuilder* bob) const {
+    ShardingDataTransformCumulativeMetrics::reportLatencies(bob);
+
+    bob->append(_fieldNames->getForOplogFetchingTotalRemoteBatchRetrievalTimeMillis(),
+                _oplogFetchingTotalRemoteBatchesRetrievalTimeMillis.load());
+    bob->append(_fieldNames->getForOplogFetchingTotalRemoteBatchesRetrieved(),
+                _oplogFetchingTotalRemoteBatchesRetrieved.load());
+    bob->append(_fieldNames->getForOplogFetchingTotalLocalInsertTimeMillis(),
+                _oplogFetchingTotalLocalInsertTimeMillis.load());
+    bob->append(_fieldNames->getForOplogFetchingTotalLocalInserts(),
+                _oplogFetchingTotalLocalInserts.load());
+    bob->append(_fieldNames->getForOplogApplyingTotalLocalBatchRetrievalTimeMillis(),
+                _oplogApplyingTotalBatchesRetrievalTimeMillis.load());
+    bob->append(_fieldNames->getForOplogApplyingTotalLocalBatchesRetrieved(),
+                _oplogApplyingTotalBatchesRetrieved.load());
+    bob->append(_fieldNames->getForOplogApplyingTotalLocalBatchApplyTimeMillis(),
+                _oplogBatchAppliedMillis.load());
+    bob->append(_fieldNames->getForOplogApplyingTotalLocalBatchesApplied(),
+                _oplogBatchApplied.load());
+}
+
+// Appends the per-state instance counts for all three roles to the
+// "currentInSteps" subsection. CoordinatorStateEnum::kDone is intentionally
+// not reported — it has no counter field (see fieldNameFor, which uasserts
+// for it).
+void ReshardingCumulativeMetrics::reportCurrentInSteps(BSONObjBuilder* bob) const {
+    ShardingDataTransformCumulativeMetrics::reportCurrentInSteps(bob);
+
+    auto reportState = [this, bob](auto state) {
+        bob->append(fieldNameFor(state, _fieldNames), getStateCounter(state)->load());
+    };
+
+    reportState(CoordinatorStateEnum::kInitializing);
+    reportState(CoordinatorStateEnum::kPreparingToDonate);
+    reportState(CoordinatorStateEnum::kCloning);
+    reportState(CoordinatorStateEnum::kApplying);
+    reportState(CoordinatorStateEnum::kBlockingWrites);
+    reportState(CoordinatorStateEnum::kAborting);
+    reportState(CoordinatorStateEnum::kCommitting);
+
+    reportState(RecipientStateEnum::kAwaitingFetchTimestamp);
+    reportState(RecipientStateEnum::kCreatingCollection);
+    reportState(RecipientStateEnum::kCloning);
+    reportState(RecipientStateEnum::kApplying);
+    reportState(RecipientStateEnum::kError);
+    reportState(RecipientStateEnum::kStrictConsistency);
+    reportState(RecipientStateEnum::kDone);
+
+    reportState(DonorStateEnum::kPreparingToDonate);
+    reportState(DonorStateEnum::kDonatingInitialData);
+    reportState(DonorStateEnum::kDonatingOplogEntries);
+    reportState(DonorStateEnum::kPreparingToBlockWrites);
+    reportState(DonorStateEnum::kError);
+    reportState(DonorStateEnum::kBlockingWrites);
+    reportState(DonorStateEnum::kDone);
+}
+
+// Event hooks, called by instance metrics as resharding work happens. Each
+// performs lock-free atomic increments, so they are safe to call concurrently
+// from multiple threads.
+
+void ReshardingCumulativeMetrics::onInsertApplied() {
+    _insertsApplied.fetchAndAdd(1);
+}
+
+void ReshardingCumulativeMetrics::onUpdateApplied() {
+    _updatesApplied.fetchAndAdd(1);
+}
+
+void ReshardingCumulativeMetrics::onDeleteApplied() {
+    _deletesApplied.fetchAndAdd(1);
+}
+
+// Records one fetched remote oplog batch: its entry count and retrieval time.
+void ReshardingCumulativeMetrics::onOplogEntriesFetched(int64_t numEntries, Milliseconds elapsed) {
+    _oplogEntriesFetched.fetchAndAdd(numEntries);
+    _oplogFetchingTotalRemoteBatchesRetrieved.fetchAndAdd(1);
+    _oplogFetchingTotalRemoteBatchesRetrievalTimeMillis.fetchAndAdd(
+        durationCount<Milliseconds>(elapsed));
+}
+
+void ReshardingCumulativeMetrics::onOplogEntriesApplied(int64_t numEntries) {
+    _oplogEntriesApplied.fetchAndAdd(numEntries);
+}
+
+// Records one locally applied oplog batch and how long applying it took.
+void ReshardingCumulativeMetrics::onOplogLocalBatchApplied(Milliseconds elapsed) {
+    _oplogBatchApplied.fetchAndAdd(1);
+    _oplogBatchAppliedMillis.fetchAndAdd(durationCount<Milliseconds>(elapsed));
+}
+
+// Records one local insert performed while fetching the oplog.
+void ReshardingCumulativeMetrics::onLocalInsertDuringOplogFetching(
+    const Milliseconds& elapsedTime) {
+    _oplogFetchingTotalLocalInserts.fetchAndAdd(1);
+    _oplogFetchingTotalLocalInsertTimeMillis.fetchAndAdd(durationCount<Milliseconds>(elapsedTime));
+}
+
+// Records one local batch retrieved while applying the oplog.
+void ReshardingCumulativeMetrics::onBatchRetrievedDuringOplogApplying(
+    const Milliseconds& elapsedTime) {
+    _oplogApplyingTotalBatchesRetrieved.fetchAndAdd(1);
+    _oplogApplyingTotalBatchesRetrievalTimeMillis.fetchAndAdd(
+        durationCount<Milliseconds>(elapsedTime));
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_cumulative_metrics.h b/src/mongo/db/s/resharding/resharding_cumulative_metrics.h
new file mode 100644
index 00000000000..af3e9e29206
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_cumulative_metrics.h
@@ -0,0 +1,184 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/s/resharding/resharding_cumulative_metrics_field_name_provider.h"
+#include "mongo/db/s/sharding_data_transform_cumulative_metrics.h"
+
+namespace mongo {
+
+/**
+ * Process-lifetime (cumulative) metrics for resharding, layered on the generic
+ * ShardingDataTransformCumulativeMetrics. Adds resharding-only counters for
+ * oplog fetching/applying and per-state instance counts for the coordinator,
+ * donor, and recipient roles. All counters are AtomicWords, so mutators are
+ * safe to call concurrently.
+ */
+class ReshardingCumulativeMetrics : public ShardingDataTransformCumulativeMetrics {
+public:
+    // NOTE(review): these enums presumably mirror the IDL-generated resharding
+    // state enums and must stay in the same order/values — confirm against the
+    // resharding IDL definitions. kUnused (-1) marks "no counter" and
+    // kNumStates sizes the counter arrays below.
+    enum class CoordinatorStateEnum : int32_t {
+        kUnused = -1,
+        kInitializing,
+        kPreparingToDonate,
+        kCloning,
+        kApplying,
+        kBlockingWrites,
+        kAborting,
+        kCommitting,
+        kDone,
+        kNumStates
+    };
+
+    enum class DonorStateEnum : int32_t {
+        kUnused = -1,
+        kPreparingToDonate,
+        kDonatingInitialData,
+        kDonatingOplogEntries,
+        kPreparingToBlockWrites,
+        kError,
+        kBlockingWrites,
+        kDone,
+        kNumStates
+    };
+
+    enum class RecipientStateEnum : int32_t {
+        kUnused = -1,
+        kAwaitingFetchTimestamp,
+        kCreatingCollection,
+        kCloning,
+        kApplying,
+        kError,
+        kStrictConsistency,
+        kDone,
+        kNumStates
+    };
+
+    ReshardingCumulativeMetrics();
+
+    // Maps a role state to its serverStatus counter field name; uasserts for
+    // states with no counter field (kUnused, coordinator kDone, kNumStates).
+    static StringData fieldNameFor(CoordinatorStateEnum state,
+                                   const ReshardingCumulativeMetricsFieldNameProvider* provider);
+    static StringData fieldNameFor(DonorStateEnum state,
+                                   const ReshardingCumulativeMetricsFieldNameProvider* provider);
+    static StringData fieldNameFor(RecipientStateEnum state,
+                                   const ReshardingCumulativeMetricsFieldNameProvider* provider);
+
+    /**
+     * The before can be boost::none to represent the initial state transition and
+     * after can be boost::none to represent cases where it is no longer active.
+     */
+    template <typename T>
+    void onStateTransition(boost::optional<T> before, boost::optional<T> after);
+
+    // Per-operation apply counters.
+    void onInsertApplied();
+    void onUpdateApplied();
+    void onDeleteApplied();
+
+    // Oplog fetch/apply batch counters and timings.
+    void onOplogEntriesFetched(int64_t numEntries, Milliseconds elapsed);
+    void onOplogEntriesApplied(int64_t numEntries);
+    void onLocalInsertDuringOplogFetching(const Milliseconds& elapsedTime);
+    void onBatchRetrievedDuringOplogApplying(const Milliseconds& elapsedTime);
+    void onOplogLocalBatchApplied(Milliseconds elapsed);
+
+private:
+    // One atomic counter per state, sized by each enum's kNumStates.
+    using CoordinatorStateArray =
+        std::array<AtomicWord<int64_t>, static_cast<size_t>(CoordinatorStateEnum::kNumStates)>;
+    using DonorStateArray =
+        std::array<AtomicWord<int64_t>, static_cast<size_t>(DonorStateEnum::kNumStates)>;
+    using RecipientStateArray =
+        std::array<AtomicWord<int64_t>, static_cast<size_t>(RecipientStateEnum::kNumStates)>;
+
+    // Return the counter for `state`, or nullptr for kUnused.
+    template <typename T>
+    const AtomicWord<int64_t>* getStateCounter(T state) const;
+    template <typename T>
+    AtomicWord<int64_t>* getMutableStateCounter(T state);
+
+    // Overload set selected by state-enum type; each returns its role's array.
+    CoordinatorStateArray* getStateArrayFor(CoordinatorStateEnum state);
+    const CoordinatorStateArray* getStateArrayFor(CoordinatorStateEnum state) const;
+    DonorStateArray* getStateArrayFor(DonorStateEnum state);
+    const DonorStateArray* getStateArrayFor(DonorStateEnum state) const;
+    RecipientStateArray* getStateArrayFor(RecipientStateEnum state);
+    const RecipientStateArray* getStateArrayFor(RecipientStateEnum state) const;
+
+    // Hooks invoked when building the serverStatus report.
+    // NOTE(review): these appear to override base-class virtuals — consider
+    // marking them `override` so the compiler enforces the signatures.
+    virtual void reportActive(BSONObjBuilder* bob) const;
+    virtual void reportLatencies(BSONObjBuilder* bob) const;
+    virtual void reportCurrentInSteps(BSONObjBuilder* bob) const;
+
+    // Non-owning; points at the provider owned by the base class (see ctor).
+    const ReshardingCumulativeMetricsFieldNameProvider* _fieldNames;
+
+    AtomicWord<int64_t> _insertsApplied{0};
+    AtomicWord<int64_t> _updatesApplied{0};
+    AtomicWord<int64_t> _deletesApplied{0};
+    AtomicWord<int64_t> _oplogEntriesApplied{0};
+    AtomicWord<int64_t> _oplogEntriesFetched{0};
+
+    AtomicWord<int64_t> _oplogFetchingTotalRemoteBatchesRetrieved{0};
+    AtomicWord<int64_t> _oplogFetchingTotalRemoteBatchesRetrievalTimeMillis{0};
+    AtomicWord<int64_t> _oplogFetchingTotalLocalInserts{0};
+    AtomicWord<int64_t> _oplogFetchingTotalLocalInsertTimeMillis{0};
+    AtomicWord<int64_t> _oplogApplyingTotalBatchesRetrieved{0};
+    AtomicWord<int64_t> _oplogApplyingTotalBatchesRetrievalTimeMillis{0};
+    AtomicWord<int64_t> _oplogBatchApplied{0};
+    AtomicWord<int64_t> _oplogBatchAppliedMillis{0};
+
+    CoordinatorStateArray _coordinatorStateList;
+    DonorStateArray _donorStateList;
+    RecipientStateArray _recipientStateList;
+};
+
+// Moves one instance between state buckets: decrements the counter for
+// `before` (when provided) and increments the counter for `after` (when
+// provided). kUnused states are skipped because getMutableStateCounter
+// returns nullptr for them.
+template <typename T>
+void ReshardingCumulativeMetrics::onStateTransition(boost::optional<T> before,
+                                                    boost::optional<T> after) {
+    if (before) {
+        if (auto counter = getMutableStateCounter(*before)) {
+            counter->fetchAndSubtract(1);
+        }
+    }
+
+    if (after) {
+        if (auto counter = getMutableStateCounter(*after)) {
+            counter->fetchAndAdd(1);
+        }
+    }
+}
+
+// Returns the counter for `state` in the role's array (const and mutable
+// flavors), or nullptr for kUnused. Invariants that `state` is a real state
+// (strictly below kNumStates) before indexing.
+template <typename T>
+const AtomicWord<int64_t>* ReshardingCumulativeMetrics::getStateCounter(T state) const {
+    if (state == T::kUnused) {
+        return nullptr;
+    }
+
+    invariant(static_cast<size_t>(state) < static_cast<size_t>(T::kNumStates));
+    return &((*getStateArrayFor(state))[static_cast<size_t>(state)]);
+}
+
+template <typename T>
+AtomicWord<int64_t>* ReshardingCumulativeMetrics::getMutableStateCounter(T state) {
+    if (state == T::kUnused) {
+        return nullptr;
+    }
+
+    invariant(static_cast<size_t>(state) < static_cast<size_t>(T::kNumStates));
+    return &((*getStateArrayFor(state))[static_cast<size_t>(state)]);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_cumulative_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_cumulative_metrics_test.cpp
new file mode 100644
index 00000000000..c35b2f31c10
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_cumulative_metrics_test.cpp
@@ -0,0 +1,574 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#include "mongo/db/s/resharding/resharding_cumulative_metrics.h"
+#include "mongo/db/s/sharding_data_transform_metrics_test_fixture.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+namespace mongo {
+namespace {
+
+constexpr auto kResharding = "resharding";
+
+/**
+ * Fixture for ReshardingCumulativeMetrics unit tests. Reuses the generic
+ * ShardingDataTransformMetricsTestFixture but supplies a
+ * ReshardingCumulativeMetrics instance and helpers for asserting the
+ * "currentInSteps" serverStatus subsection.
+ */
+class ReshardingCumulativeMetricsTest : public ShardingDataTransformMetricsTestFixture {
+protected:
+    void setUp() override {
+        ShardingDataTransformMetricsTestFixture::setUp();
+        // Safe downcast: initializeCumulativeMetrics() below created the object.
+        _reshardingCumulativeMetrics =
+            static_cast<ReshardingCumulativeMetrics*>(_cumulativeMetrics.get());
+        _fieldNames = std::make_unique<ReshardingCumulativeMetricsFieldNameProvider>();
+    }
+
+    virtual std::unique_ptr<ShardingDataTransformCumulativeMetrics> initializeCumulativeMetrics()
+        override {
+        return std::make_unique<ReshardingCumulativeMetrics>();
+    }
+
+    using CoordinatorStateEnum = ReshardingCumulativeMetrics::CoordinatorStateEnum;
+    using DonorStateEnum = ReshardingCumulativeMetrics::DonorStateEnum;
+    using RecipientStateEnum = ReshardingCumulativeMetrics::RecipientStateEnum;
+
+    template <typename T>
+    StringData fieldNameFor(T state) {
+        return ReshardingCumulativeMetrics::fieldNameFor(state, _fieldNames.get());
+    }
+
+    // Extracts the resharding "currentInSteps" sub-document from a full
+    // serverStatus report of `metrics`.
+    BSONObj getStateSubObj(const ReshardingCumulativeMetrics* metrics) {
+        BSONObjBuilder bob;
+        metrics->reportForServerStatus(&bob);
+        auto report = bob.done();
+        return report.getObjectField(kResharding).getObjectField("currentInSteps").getOwned();
+    }
+
+    // Returns true iff exactly the `expectedState` coordinator counter is 1
+    // and every other coordinator state counter is 0 (all 0 for boost::none).
+    bool checkCoordinateStateField(const ReshardingCumulativeMetrics* metrics,
+                                   boost::optional<CoordinatorStateEnum> expectedState) {
+        auto serverStatusSubObj = getStateSubObj(metrics);
+        std::map<std::string, int> expectedStateFieldCount;
+
+        auto addExpectedField = [&](CoordinatorStateEnum stateToPopulate) {
+            expectedStateFieldCount.emplace(
+                fieldNameFor(stateToPopulate),
+                ((expectedState && (stateToPopulate == expectedState)) ? 1 : 0));
+        };
+
+        addExpectedField(CoordinatorStateEnum::kInitializing);
+        addExpectedField(CoordinatorStateEnum::kPreparingToDonate);
+        addExpectedField(CoordinatorStateEnum::kCloning);
+        addExpectedField(CoordinatorStateEnum::kApplying);
+        addExpectedField(CoordinatorStateEnum::kBlockingWrites);
+        addExpectedField(CoordinatorStateEnum::kAborting);
+        addExpectedField(CoordinatorStateEnum::kCommitting);
+
+        for (const auto& fieldNameAndState : expectedStateFieldCount) {
+            const auto actualValue = serverStatusSubObj.getIntField(fieldNameAndState.first);
+            if (actualValue != fieldNameAndState.second) {
+                LOGV2_DEBUG(6438600,
+                            0,
+                            "coordinator state field value does not match expected value",
+                            "field"_attr = fieldNameAndState.first,
+                            "serverStatus"_attr = serverStatusSubObj);
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    // Same check for the donor state counters.
+    bool checkDonorStateField(const ReshardingCumulativeMetrics* metrics,
+                              boost::optional<DonorStateEnum> expectedState) {
+        auto serverStatusSubObj = getStateSubObj(metrics);
+        std::map<std::string, int> expectedStateFieldCount;
+
+        auto addExpectedField = [&](DonorStateEnum stateToPopulate) {
+            expectedStateFieldCount.emplace(
+                fieldNameFor(stateToPopulate),
+                ((expectedState && (stateToPopulate == expectedState)) ? 1 : 0));
+        };
+
+        addExpectedField(DonorStateEnum::kPreparingToDonate);
+        addExpectedField(DonorStateEnum::kDonatingInitialData);
+        addExpectedField(DonorStateEnum::kDonatingOplogEntries);
+        addExpectedField(DonorStateEnum::kPreparingToBlockWrites);
+        addExpectedField(DonorStateEnum::kError);
+        addExpectedField(DonorStateEnum::kBlockingWrites);
+        addExpectedField(DonorStateEnum::kDone);
+
+        for (const auto& fieldNameAndState : expectedStateFieldCount) {
+            const auto actualValue = serverStatusSubObj.getIntField(fieldNameAndState.first);
+            if (actualValue != fieldNameAndState.second) {
+                LOGV2_DEBUG(6438701,
+                            0,
+                            "Donor state field value does not match expected value",
+                            "field"_attr = fieldNameAndState.first,
+                            "serverStatus"_attr = serverStatusSubObj);
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    // Same check for the recipient state counters.
+    bool checkRecipientStateField(const ReshardingCumulativeMetrics* metrics,
+                                  boost::optional<RecipientStateEnum> expectedState) {
+        auto serverStatusSubObj = getStateSubObj(metrics);
+        std::map<std::string, int> expectedStateFieldCount;
+
+        auto addExpectedField = [&](RecipientStateEnum stateToPopulate) {
+            expectedStateFieldCount.emplace(
+                fieldNameFor(stateToPopulate),
+                ((expectedState && (stateToPopulate == expectedState)) ? 1 : 0));
+        };
+
+        addExpectedField(RecipientStateEnum::kAwaitingFetchTimestamp);
+        addExpectedField(RecipientStateEnum::kCreatingCollection);
+        addExpectedField(RecipientStateEnum::kCloning);
+        addExpectedField(RecipientStateEnum::kApplying);
+        addExpectedField(RecipientStateEnum::kError);
+        addExpectedField(RecipientStateEnum::kStrictConsistency);
+        addExpectedField(RecipientStateEnum::kDone);
+
+        for (const auto& fieldNameAndState : expectedStateFieldCount) {
+            const auto actualValue = serverStatusSubObj.getIntField(fieldNameAndState.first);
+            if (actualValue != fieldNameAndState.second) {
+                LOGV2_DEBUG(6438901,
+                            0,
+                            "Recipient state field value does not match expected value",
+                            "field"_attr = fieldNameAndState.first,
+                            "serverStatus"_attr = serverStatusSubObj);
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    // Non-owning view of _cumulativeMetrics (assigned in setUp()).
+    // NOTE(review): consider a `= nullptr` in-class initializer for safety.
+    ReshardingCumulativeMetrics* _reshardingCumulativeMetrics;
+    std::unique_ptr<ReshardingCumulativeMetricsFieldNameProvider> _fieldNames;
+};
+
+
+// Verifies onLocalInsertDuringOplogFetching() bumps both the local-insert
+// count and its elapsed-time total in the latencies report.
+TEST_F(ReshardingCumulativeMetricsTest, ReportContainsInsertsDuringFetching) {
+    using Role = ShardingDataTransformMetrics::Role;
+    ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
+    auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient);
+
+    auto latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics);
+    ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInserts"), 0);
+    ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInsertTimeMillis"), 0);
+
+    _reshardingCumulativeMetrics->onLocalInsertDuringOplogFetching(Milliseconds(17));
+
+    latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics);
+    ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInserts"), 1);
+    ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInsertTimeMillis"), 17);
+}
+
+
+// Verifies onBatchRetrievedDuringOplogApplying() bumps the applying-side
+// batch-retrieval count and its elapsed-time total.
+TEST_F(ReshardingCumulativeMetricsTest, ReportContainsBatchRetrievedDuringApplying) {
+    using Role = ShardingDataTransformMetrics::Role;
+    ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
+    auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient);
+
+    auto latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics);
+    ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesRetrieved"), 0);
+    ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchRetrievalTimeMillis"), 0);
+
+    _reshardingCumulativeMetrics->onBatchRetrievedDuringOplogApplying(Milliseconds(39));
+
+    latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics);
+    ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesRetrieved"), 1);
+    ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchRetrievalTimeMillis"), 39);
+}
+
+
+// Verifies onOplogLocalBatchApplied() bumps the applied-batch count and its
+// apply-time total.
+TEST_F(ReshardingCumulativeMetricsTest, ReportContainsBatchApplied) {
+    using Role = ShardingDataTransformMetrics::Role;
+    ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
+    auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient);
+
+    auto latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics);
+    ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesApplied"), 0);
+    ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchApplyTimeMillis"), 0);
+
+    _reshardingCumulativeMetrics->onOplogLocalBatchApplied(Milliseconds(333));
+
+    latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics);
+    ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesApplied"), 1);
+    ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchApplyTimeMillis"), 333);
+}
+
+TEST_F(ReshardingCumulativeMetricsTest, ReportContainsInsertsApplied) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient);
+
+ auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics);
+ ASSERT_EQ(activeSection.getIntField("insertsApplied"), 0);
+
+ _reshardingCumulativeMetrics->onInsertApplied();
+
+ activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics);
+ ASSERT_EQ(activeSection.getIntField("insertsApplied"), 1);
+}
+
+TEST_F(ReshardingCumulativeMetricsTest, ReportContainsUpdatesApplied) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient);
+
+ auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics);
+ ASSERT_EQ(activeSection.getIntField("updatesApplied"), 0);
+
+ _reshardingCumulativeMetrics->onUpdateApplied();
+
+ activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics);
+ ASSERT_EQ(activeSection.getIntField("updatesApplied"), 1);
+}
+
+TEST_F(ReshardingCumulativeMetricsTest, ReportContainsDeletesApplied) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient);
+
+ auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics);
+ ASSERT_EQ(activeSection.getIntField("deletesApplied"), 0);
+
+ _reshardingCumulativeMetrics->onDeleteApplied();
+
+ activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics);
+ ASSERT_EQ(activeSection.getIntField("deletesApplied"), 1);
+}
+
+TEST_F(ReshardingCumulativeMetricsTest, ReportContainsOplogEntriesFetched) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient);
+
+ auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics);
+ ASSERT_EQ(activeSection.getIntField("oplogEntriesFetched"), 0);
+
+ auto latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics);
+ ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchesRetrieved"), 0);
+ ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchRetrievalTimeMillis"), 0);
+
+ _reshardingCumulativeMetrics->onOplogEntriesFetched(123, Milliseconds(43));
+
+ activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics);
+ ASSERT_EQ(activeSection.getIntField("oplogEntriesFetched"), 123);
+
+ latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics);
+ ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchesRetrieved"), 1);
+ ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchRetrievalTimeMillis"), 43);
+}
+
+TEST_F(ReshardingCumulativeMetricsTest, ReportContainsOplogEntriesApplied) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient);
+
+ auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics);
+ ASSERT_EQ(activeSection.getIntField("oplogEntriesApplied"), 0);
+
+ _reshardingCumulativeMetrics->onOplogEntriesApplied(99);
+
+ activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics);
+ ASSERT_EQ(activeSection.getIntField("oplogEntriesApplied"), 99);
+}
+
+TEST_F(ReshardingCumulativeMetricsTest,
+ SimulatedNormalCoordinatorStateTransitionReportsStateCorrectly) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&coordinator);
+
+ ASSERT(checkCoordinateStateField(_reshardingCumulativeMetrics, CoordinatorStateEnum::kUnused));
+
+ boost::optional<CoordinatorStateEnum> prevState;
+ boost::optional<CoordinatorStateEnum> nextState;
+
+ auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) {
+ prevState = nextState;
+ nextState = newState;
+ _reshardingCumulativeMetrics->onStateTransition(prevState, nextState);
+ return checkCoordinateStateField(_reshardingCumulativeMetrics, nextState);
+ };
+
+ ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused));
+ ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing));
+ ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate));
+ ASSERT(simulateTransitionTo(CoordinatorStateEnum::kCloning));
+ ASSERT(simulateTransitionTo(CoordinatorStateEnum::kBlockingWrites));
+ ASSERT(simulateTransitionTo(CoordinatorStateEnum::kCommitting));
+ ASSERT(simulateTransitionTo(CoordinatorStateEnum::kDone));
+ ASSERT(simulateTransitionTo(boost::none));
+}
+
+TEST_F(ReshardingCumulativeMetricsTest,
+ SimulatedAbortedCoordinatorStateTransitionReportsStateCorrectly) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&coordinator);
+
+ ASSERT(checkCoordinateStateField(_reshardingCumulativeMetrics, CoordinatorStateEnum::kUnused));
+
+ boost::optional<CoordinatorStateEnum> prevState;
+ boost::optional<CoordinatorStateEnum> nextState;
+
+ auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) {
+ prevState = nextState;
+ nextState = newState;
+ _reshardingCumulativeMetrics->onStateTransition(prevState, nextState);
+ return checkCoordinateStateField(_reshardingCumulativeMetrics, nextState);
+ };
+
+ ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused));
+ ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing));
+ ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate));
+ ASSERT(simulateTransitionTo(CoordinatorStateEnum::kAborting));
+ ASSERT(simulateTransitionTo(boost::none));
+}
+
+TEST_F(ReshardingCumulativeMetricsTest,
+ SimulatedSteppedDownCoordinatorStateFromUnusedReportsStateCorrectly) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&coordinator);
+
+ boost::optional<CoordinatorStateEnum> initState = CoordinatorStateEnum::kUnused;
+ ASSERT(checkCoordinateStateField(_reshardingCumulativeMetrics, initState));
+
+ _reshardingCumulativeMetrics->onStateTransition(initState, {boost::none});
+ ASSERT(checkCoordinateStateField(_reshardingCumulativeMetrics, initState));
+}
+
+TEST_F(ReshardingCumulativeMetricsTest,
+ SimulatedSteppedDownCoordinatorStateTransitionReportsStateCorrectly) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&coordinator);
+
+ ASSERT(checkCoordinateStateField(_reshardingCumulativeMetrics, CoordinatorStateEnum::kUnused));
+
+ boost::optional<CoordinatorStateEnum> prevState;
+ boost::optional<CoordinatorStateEnum> nextState;
+
+ auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) {
+ prevState = nextState;
+ nextState = newState;
+ _reshardingCumulativeMetrics->onStateTransition(prevState, nextState);
+ return checkCoordinateStateField(_reshardingCumulativeMetrics, nextState);
+ };
+
+ ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused));
+ ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing));
+ ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate));
+ ASSERT(simulateTransitionTo(boost::none));
+}
+
+TEST_F(ReshardingCumulativeMetricsTest, SimulatedNormalDonorStateTransitionReportsStateCorrectly) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&donor);
+
+ ASSERT(checkDonorStateField(_reshardingCumulativeMetrics, DonorStateEnum::kUnused));
+
+ boost::optional<DonorStateEnum> prevState;
+ boost::optional<DonorStateEnum> nextState;
+
+ auto simulateTransitionTo = [&](boost::optional<DonorStateEnum> newState) {
+ prevState = nextState;
+ nextState = newState;
+ _reshardingCumulativeMetrics->onStateTransition(prevState, nextState);
+ return checkDonorStateField(_reshardingCumulativeMetrics, nextState);
+ };
+
+ ASSERT(simulateTransitionTo(DonorStateEnum::kUnused));
+ ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToDonate));
+ ASSERT(simulateTransitionTo(DonorStateEnum::kDonatingInitialData));
+ ASSERT(simulateTransitionTo(DonorStateEnum::kDonatingOplogEntries));
+ ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToBlockWrites));
+ ASSERT(simulateTransitionTo(DonorStateEnum::kBlockingWrites));
+ ASSERT(simulateTransitionTo(DonorStateEnum::kDone));
+ ASSERT(simulateTransitionTo(boost::none));
+}
+
+TEST_F(ReshardingCumulativeMetricsTest, SimulatedAbortedDonorStateTransitionReportsStateCorrectly) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&donor);
+
+ ASSERT(checkDonorStateField(_reshardingCumulativeMetrics, DonorStateEnum::kUnused));
+
+ boost::optional<DonorStateEnum> prevState;
+ boost::optional<DonorStateEnum> nextState;
+
+ auto simulateTransitionTo = [&](boost::optional<DonorStateEnum> newState) {
+ prevState = nextState;
+ nextState = newState;
+ _reshardingCumulativeMetrics->onStateTransition(prevState, nextState);
+ return checkDonorStateField(_reshardingCumulativeMetrics, nextState);
+ };
+
+ ASSERT(simulateTransitionTo(DonorStateEnum::kUnused));
+ ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToDonate));
+ ASSERT(simulateTransitionTo(DonorStateEnum::kError));
+ ASSERT(simulateTransitionTo(DonorStateEnum::kDone));
+ ASSERT(simulateTransitionTo(boost::none));
+}
+
+TEST_F(ReshardingCumulativeMetricsTest,
+ SimulatedSteppedDownDonorStateFromUnusedReportsStateCorrectly) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&donor);
+
+ boost::optional<DonorStateEnum> initState = DonorStateEnum::kUnused;
+ ASSERT(checkDonorStateField(_reshardingCumulativeMetrics, initState));
+
+ _reshardingCumulativeMetrics->onStateTransition(initState, {boost::none});
+ ASSERT(checkDonorStateField(_reshardingCumulativeMetrics, initState));
+}
+
+TEST_F(ReshardingCumulativeMetricsTest,
+ SimulatedSteppedDownDonorStateTransitionReportsStateCorrectly) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&donor);
+
+ ASSERT(checkDonorStateField(_reshardingCumulativeMetrics, DonorStateEnum::kUnused));
+
+ boost::optional<DonorStateEnum> prevState;
+ boost::optional<DonorStateEnum> nextState;
+
+ auto simulateTransitionTo = [&](boost::optional<DonorStateEnum> newState) {
+ prevState = nextState;
+ nextState = newState;
+ _reshardingCumulativeMetrics->onStateTransition(prevState, nextState);
+ return checkDonorStateField(_reshardingCumulativeMetrics, nextState);
+ };
+
+ ASSERT(simulateTransitionTo(DonorStateEnum::kUnused));
+ ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToDonate));
+ ASSERT(simulateTransitionTo(DonorStateEnum::kDonatingInitialData));
+ ASSERT(simulateTransitionTo(boost::none));
+}
+
+TEST_F(ReshardingCumulativeMetricsTest,
+ SimulatedNormalRecipientStateTransitionReportsStateCorrectly) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient);
+
+ ASSERT(checkRecipientStateField(_reshardingCumulativeMetrics, RecipientStateEnum::kUnused));
+
+ boost::optional<RecipientStateEnum> prevState;
+ boost::optional<RecipientStateEnum> nextState;
+
+ auto simulateTransitionTo = [&](boost::optional<RecipientStateEnum> newState) {
+ prevState = nextState;
+ nextState = newState;
+ _reshardingCumulativeMetrics->onStateTransition(prevState, nextState);
+ return checkRecipientStateField(_reshardingCumulativeMetrics, nextState);
+ };
+
+ ASSERT(simulateTransitionTo(RecipientStateEnum::kUnused));
+ ASSERT(simulateTransitionTo(RecipientStateEnum::kAwaitingFetchTimestamp));
+ ASSERT(simulateTransitionTo(RecipientStateEnum::kCreatingCollection));
+ ASSERT(simulateTransitionTo(RecipientStateEnum::kCloning));
+ ASSERT(simulateTransitionTo(RecipientStateEnum::kApplying));
+ ASSERT(simulateTransitionTo(RecipientStateEnum::kStrictConsistency));
+ ASSERT(simulateTransitionTo(RecipientStateEnum::kDone));
+ ASSERT(simulateTransitionTo(boost::none));
+}
+
+TEST_F(ReshardingCumulativeMetricsTest,
+ SimulatedAbortedRecipientStateTransitionReportsStateCorrectly) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient);
+
+ ASSERT(checkRecipientStateField(_reshardingCumulativeMetrics, RecipientStateEnum::kUnused));
+
+ boost::optional<RecipientStateEnum> prevState;
+ boost::optional<RecipientStateEnum> nextState;
+
+ auto simulateTransitionTo = [&](boost::optional<RecipientStateEnum> newState) {
+ prevState = nextState;
+ nextState = newState;
+ _reshardingCumulativeMetrics->onStateTransition(prevState, nextState);
+ return checkRecipientStateField(_reshardingCumulativeMetrics, nextState);
+ };
+
+ ASSERT(simulateTransitionTo(RecipientStateEnum::kUnused));
+ ASSERT(simulateTransitionTo(RecipientStateEnum::kAwaitingFetchTimestamp));
+ ASSERT(simulateTransitionTo(RecipientStateEnum::kError));
+ ASSERT(simulateTransitionTo(boost::none));
+}
+
+TEST_F(ReshardingCumulativeMetricsTest,
+ SimulatedSteppedDownRecipientStateFromUnusedReportsStateCorrectly) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient);
+
+ boost::optional<RecipientStateEnum> initState = RecipientStateEnum::kUnused;
+ ASSERT(checkRecipientStateField(_reshardingCumulativeMetrics, initState));
+
+ _reshardingCumulativeMetrics->onStateTransition(initState, {boost::none});
+ ASSERT(checkRecipientStateField(_reshardingCumulativeMetrics, initState));
+}
+
+TEST_F(ReshardingCumulativeMetricsTest,
+ SimulatedSteppedDownRecipientStateTransitionReportsStateCorrectly) {
+ using Role = ShardingDataTransformMetrics::Role;
+ ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient};
+ auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient);
+
+ ASSERT(checkRecipientStateField(_reshardingCumulativeMetrics, RecipientStateEnum::kUnused));
+
+ boost::optional<RecipientStateEnum> prevState;
+ boost::optional<RecipientStateEnum> nextState;
+
+ auto simulateTransitionTo = [&](boost::optional<RecipientStateEnum> newState) {
+ prevState = nextState;
+ nextState = newState;
+ _reshardingCumulativeMetrics->onStateTransition(prevState, nextState);
+ return checkRecipientStateField(_reshardingCumulativeMetrics, nextState);
+ };
+
+ ASSERT(simulateTransitionTo(RecipientStateEnum::kUnused));
+ ASSERT(simulateTransitionTo(RecipientStateEnum::kAwaitingFetchTimestamp));
+ ASSERT(simulateTransitionTo(RecipientStateEnum::kCreatingCollection));
+ ASSERT(simulateTransitionTo(boost::none));
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index 69d2e899d84..f0faa8ddd2c 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -120,6 +120,10 @@ std::string ReshardingMetrics::createOperationDescription() const noexcept {
_instanceId.toString());
}
+ReshardingCumulativeMetrics* ReshardingMetrics::getReshardingCumulativeMetrics() {
+ return dynamic_cast<ReshardingCumulativeMetrics*>(getCumulativeMetrics());
+}
+
Milliseconds ReshardingMetrics::getRecipientHighEstimateRemainingTimeMillis() const {
auto estimate = resharding::estimateRemainingRecipientTime(_applyingStartTime.load() != kNoDate,
getBytesWrittenCount(),
@@ -222,9 +226,8 @@ void ReshardingMetrics::restoreCoordinatorSpecificFields(
ReshardingMetrics::DonorState::DonorState(DonorStateEnum enumVal) : _enumVal(enumVal) {}
-ShardingDataTransformCumulativeMetrics::DonorStateEnum ReshardingMetrics::DonorState::toMetrics()
- const {
- using MetricsEnum = ShardingDataTransformCumulativeMetrics::DonorStateEnum;
+ReshardingCumulativeMetrics::DonorStateEnum ReshardingMetrics::DonorState::toMetrics() const {
+ using MetricsEnum = ReshardingCumulativeMetrics::DonorStateEnum;
switch (_enumVal) {
case DonorStateEnum::kUnused:
@@ -264,9 +267,9 @@ DonorStateEnum ReshardingMetrics::DonorState::getState() const {
ReshardingMetrics::RecipientState::RecipientState(RecipientStateEnum enumVal) : _enumVal(enumVal) {}
-ShardingDataTransformCumulativeMetrics::RecipientStateEnum
-ReshardingMetrics::RecipientState::toMetrics() const {
- using MetricsEnum = ShardingDataTransformCumulativeMetrics::RecipientStateEnum;
+ReshardingCumulativeMetrics::RecipientStateEnum ReshardingMetrics::RecipientState::toMetrics()
+ const {
+ using MetricsEnum = ReshardingCumulativeMetrics::RecipientStateEnum;
switch (_enumVal) {
case RecipientStateEnum::kUnused:
@@ -308,35 +311,37 @@ RecipientStateEnum ReshardingMetrics::RecipientState::getState() const {
ReshardingMetrics::CoordinatorState::CoordinatorState(CoordinatorStateEnum enumVal)
: _enumVal(enumVal) {}
-ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum
-ReshardingMetrics::CoordinatorState::toMetrics() const {
+ReshardingCumulativeMetrics::CoordinatorStateEnum ReshardingMetrics::CoordinatorState::toMetrics()
+ const {
+ using MetricsEnum = ReshardingCumulativeMetrics::CoordinatorStateEnum;
+
switch (_enumVal) {
case CoordinatorStateEnum::kUnused:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kUnused;
+ return MetricsEnum::kUnused;
case CoordinatorStateEnum::kInitializing:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kInitializing;
+ return MetricsEnum::kInitializing;
case CoordinatorStateEnum::kPreparingToDonate:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kPreparingToDonate;
+ return MetricsEnum::kPreparingToDonate;
case CoordinatorStateEnum::kCloning:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kCloning;
+ return MetricsEnum::kCloning;
case CoordinatorStateEnum::kApplying:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kApplying;
+ return MetricsEnum::kApplying;
case CoordinatorStateEnum::kBlockingWrites:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kBlockingWrites;
+ return MetricsEnum::kBlockingWrites;
case CoordinatorStateEnum::kAborting:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kAborting;
+ return MetricsEnum::kAborting;
case CoordinatorStateEnum::kCommitting:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kCommitting;
+ return MetricsEnum::kCommitting;
case CoordinatorStateEnum::kDone:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kDone;
+ return MetricsEnum::kDone;
default:
invariant(false,
str::stream() << "Unexpected resharding coordinator state: "
@@ -351,22 +356,22 @@ CoordinatorStateEnum ReshardingMetrics::CoordinatorState::getState() const {
void ReshardingMetrics::onDeleteApplied() {
_deletesApplied.addAndFetch(1);
- getCumulativeMetrics()->onDeleteApplied();
+ getReshardingCumulativeMetrics()->onDeleteApplied();
}
void ReshardingMetrics::onInsertApplied() {
_insertsApplied.addAndFetch(1);
- getCumulativeMetrics()->onInsertApplied();
+ getReshardingCumulativeMetrics()->onInsertApplied();
}
void ReshardingMetrics::onUpdateApplied() {
_updatesApplied.addAndFetch(1);
- getCumulativeMetrics()->onUpdateApplied();
+ getReshardingCumulativeMetrics()->onUpdateApplied();
}
void ReshardingMetrics::onOplogEntriesFetched(int64_t numEntries, Milliseconds elapsed) {
_oplogEntriesFetched.addAndFetch(numEntries);
- getCumulativeMetrics()->onOplogEntriesFetched(numEntries, elapsed);
+ getReshardingCumulativeMetrics()->onOplogEntriesFetched(numEntries, elapsed);
}
void ReshardingMetrics::restoreOplogEntriesFetched(int64_t numEntries) {
@@ -375,13 +380,25 @@ void ReshardingMetrics::restoreOplogEntriesFetched(int64_t numEntries) {
void ReshardingMetrics::onOplogEntriesApplied(int64_t numEntries) {
_oplogEntriesApplied.addAndFetch(numEntries);
- getCumulativeMetrics()->onOplogEntriesApplied(numEntries);
+ getReshardingCumulativeMetrics()->onOplogEntriesApplied(numEntries);
}
void ReshardingMetrics::restoreOplogEntriesApplied(int64_t numEntries) {
_oplogEntriesApplied.store(numEntries);
}
+void ReshardingMetrics::onLocalInsertDuringOplogFetching(Milliseconds elapsed) {
+ getReshardingCumulativeMetrics()->onLocalInsertDuringOplogFetching(elapsed);
+}
+
+void ReshardingMetrics::onBatchRetrievedDuringOplogApplying(Milliseconds elapsed) {
+ getReshardingCumulativeMetrics()->onBatchRetrievedDuringOplogApplying(elapsed);
+}
+
+void ReshardingMetrics::onOplogLocalBatchApplied(Milliseconds elapsed) {
+ getReshardingCumulativeMetrics()->onOplogLocalBatchApplied(elapsed);
+}
+
void ReshardingMetrics::onApplyingBegin() {
_applyingStartTime.store(getClockSource()->now());
}
diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h
index 909f78e8c80..26a82a3ba89 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.h
+++ b/src/mongo/db/s/resharding/resharding_metrics.h
@@ -31,6 +31,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/s/resharding/resharding_cumulative_metrics.h"
#include "mongo/db/s/resharding/resharding_metrics_field_name_provider.h"
#include "mongo/db/s/resharding/resharding_metrics_helpers.h"
#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h"
@@ -44,7 +45,7 @@ public:
using State = stdx::variant<CoordinatorStateEnum, RecipientStateEnum, DonorStateEnum>;
class DonorState {
public:
- using MetricsType = ShardingDataTransformCumulativeMetrics::DonorStateEnum;
+ using MetricsType = ReshardingCumulativeMetrics::DonorStateEnum;
explicit DonorState(DonorStateEnum enumVal);
MetricsType toMetrics() const;
@@ -56,7 +57,7 @@ public:
class RecipientState {
public:
- using MetricsType = ShardingDataTransformCumulativeMetrics::RecipientStateEnum;
+ using MetricsType = ReshardingCumulativeMetrics::RecipientStateEnum;
explicit RecipientState(RecipientStateEnum enumVal);
MetricsType toMetrics() const;
@@ -68,7 +69,7 @@ public:
class CoordinatorState {
public:
- using MetricsType = ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum;
+ using MetricsType = ReshardingCumulativeMetrics::CoordinatorStateEnum;
explicit CoordinatorState(CoordinatorStateEnum enumVal);
MetricsType toMetrics() const;
@@ -123,22 +124,22 @@ public:
template <typename T>
void onStateTransition(T before, boost::none_t after) {
- getCumulativeMetrics()->onStateTransition<typename T::MetricsType>(before.toMetrics(),
- after);
+ getReshardingCumulativeMetrics()->onStateTransition<typename T::MetricsType>(
+ before.toMetrics(), after);
}
template <typename T>
void onStateTransition(boost::none_t before, T after) {
setState(after.getState());
- getCumulativeMetrics()->onStateTransition<typename T::MetricsType>(before,
- after.toMetrics());
+ getReshardingCumulativeMetrics()->onStateTransition<typename T::MetricsType>(
+ before, after.toMetrics());
}
template <typename T>
void onStateTransition(T before, T after) {
setState(after.getState());
- getCumulativeMetrics()->onStateTransition<typename T::MetricsType>(before.toMetrics(),
- after.toMetrics());
+ getReshardingCumulativeMetrics()->onStateTransition<typename T::MetricsType>(
+ before.toMetrics(), after.toMetrics());
}
void accumulateFrom(const ReshardingOplogApplierProgress& progressDoc);
@@ -153,6 +154,9 @@ public:
void restoreOplogEntriesApplied(int64_t numEntries);
void onApplyingBegin();
void onApplyingEnd();
+ void onLocalInsertDuringOplogFetching(Milliseconds elapsed);
+ void onBatchRetrievedDuringOplogApplying(Milliseconds elapsed);
+ void onOplogLocalBatchApplied(Milliseconds elapsed);
Seconds getApplyingElapsedTimeSecs() const;
Date_t getApplyingBegin() const;
@@ -168,6 +172,7 @@ private:
std::string createOperationDescription() const noexcept override;
void restoreRecipientSpecificFields(const ReshardingRecipientDocument& document);
void restoreCoordinatorSpecificFields(const ReshardingCoordinatorDocument& document);
+ ReshardingCumulativeMetrics* getReshardingCumulativeMetrics();
template <typename T>
void setState(T state) {
diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
index 1e20c778071..aa18742fb7c 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
@@ -49,6 +49,11 @@ const auto kShardKey = BSON("newKey" << 1);
class ReshardingMetricsTest : public ShardingDataTransformMetricsTestFixture {
public:
+ virtual std::unique_ptr<ShardingDataTransformCumulativeMetrics> initializeCumulativeMetrics()
+ override {
+ return std::make_unique<ReshardingCumulativeMetrics>();
+ }
+
std::unique_ptr<ReshardingMetrics> createInstanceMetrics(ClockSource* clockSource,
UUID instanceId = UUID::gen(),
Role role = Role::kDonor) {
@@ -58,7 +63,7 @@ public:
role,
clockSource->now(),
clockSource,
- &_cumulativeMetrics);
+ _cumulativeMetrics.get());
}
const UUID& getSourceCollectionId() {
@@ -69,7 +74,7 @@ public:
template <typename T>
BSONObj getReportFromStateDocument(T document) {
auto metrics =
- ReshardingMetrics::initializeFrom(document, getClockSource(), &_cumulativeMetrics);
+ ReshardingMetrics::initializeFrom(document, getClockSource(), _cumulativeMetrics.get());
return metrics->reportForCurrentOp();
}
@@ -169,7 +174,7 @@ public:
doc.setMetrics(metricsDoc);
auto metrics =
- ReshardingMetrics::initializeFrom(doc, getClockSource(), &_cumulativeMetrics);
+ ReshardingMetrics::initializeFrom(doc, getClockSource(), _cumulativeMetrics.get());
clock->advance(kInterval);
auto report = metrics->reportForCurrentOp();
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp
index 7c04439713a..4e5e29943bf 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp
@@ -42,6 +42,11 @@ namespace {
class ReshardingOplogApplierMetricsTest : public ShardingDataTransformMetricsTestFixture {
public:
+ virtual std::unique_ptr<ShardingDataTransformCumulativeMetrics> initializeCumulativeMetrics()
+ override {
+ return std::make_unique<ReshardingCumulativeMetrics>();
+ }
+
std::unique_ptr<ReshardingMetrics> createInstanceMetrics() {
return std::make_unique<ReshardingMetrics>(UUID::gen(),
kTestCommand,
@@ -49,7 +54,7 @@ public:
ReshardingMetrics::Role::kRecipient,
getClockSource()->now(),
getClockSource(),
- &_cumulativeMetrics);
+ _cumulativeMetrics.get());
}
};
diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp
index 5d7d4e052fe..70c5252a96b 100644
--- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp
+++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp
@@ -28,6 +28,7 @@
*/
#include "mongo/db/s/sharding_data_transform_cumulative_metrics.h"
+#include "mongo/db/s/resharding/resharding_cumulative_metrics.h"
#include <cstdint>
@@ -37,74 +38,18 @@
namespace mongo {
namespace {
-constexpr int32_t kPlaceholderInt = 0;
-constexpr int64_t kPlaceholderLong = 0;
-} // namespace
-
-namespace {
-constexpr auto kResharding = "resharding";
constexpr auto kGlobalIndex = "globalIndex";
constexpr auto kActive = "active";
-constexpr auto kOplogEntriesFetched = "oplogEntriesFetched";
-constexpr auto kOplogEntriesApplied = "oplogEntriesApplied";
-constexpr auto kInsertsApplied = "insertsApplied";
-constexpr auto kUpdatesApplied = "updatesApplied";
-constexpr auto kDeletesApplied = "deletesApplied";
constexpr auto kOldestActive = "oldestActive";
constexpr auto kLatencies = "latencies";
-constexpr auto kOplogFetchingTotalRemoteBatchRetrievalTimeMillis =
- "oplogFetchingTotalRemoteBatchRetrievalTimeMillis";
-constexpr auto kOplogFetchingTotalRemoteBatchesRetrieved =
- "oplogFetchingTotalRemoteBatchesRetrieved";
-constexpr auto kOplogFetchingTotalLocalInsertTimeMillis = "oplogFetchingTotalLocalInsertTimeMillis";
-constexpr auto kOplogFetchingTotalLocalInserts = "oplogFetchingTotalLocalInserts";
-constexpr auto kOplogApplyingTotalLocalBatchRetrievalTimeMillis =
- "oplogApplyingTotalLocalBatchRetrievalTimeMillis";
-constexpr auto kOplogApplyingTotalLocalBatchesRetrieved = "oplogApplyingTotalLocalBatchesRetrieved";
-constexpr auto kOplogApplyingTotalLocalBatchApplyTimeMillis =
- "oplogApplyingTotalLocalBatchApplyTimeMillis";
-constexpr auto kOplogApplyingTotalLocalBatchesApplied = "oplogApplyingTotalLocalBatchesApplied";
constexpr auto kCurrentInSteps = "currentInSteps";
-constexpr auto kCountInstancesInCoordinatorState1Initializing =
- "countInstancesInCoordinatorState1Initializing";
-constexpr auto kCountInstancesInCoordinatorState2PreparingToDonate =
- "countInstancesInCoordinatorState2PreparingToDonate";
-constexpr auto kCountInstancesInCoordinatorState3Cloning =
- "countInstancesInCoordinatorState3Cloning";
-constexpr auto kCountInstancesInCoordinatorState4Applying =
- "countInstancesInCoordinatorState4Applying";
-constexpr auto kCountInstancesInCoordinatorState5BlockingWrites =
- "countInstancesInCoordinatorState5BlockingWrites";
-constexpr auto kCountInstancesInCoordinatorState6Aborting =
- "countInstancesInCoordinatorState6Aborting";
-constexpr auto kCountInstancesInCoordinatorState7Committing =
- "countInstancesInCoordinatorState7Committing";
-constexpr auto kCountInstancesInRecipientState1AwaitingFetchTimestamp =
- "countInstancesInRecipientState1AwaitingFetchTimestamp";
-constexpr auto kCountInstancesInRecipientState2CreatingCollection =
- "countInstancesInRecipientState2CreatingCollection";
-constexpr auto kCountInstancesInRecipientState3Cloning = "countInstancesInRecipientState3Cloning";
-constexpr auto kCountInstancesInRecipientState4Applying = "countInstancesInRecipientState4Applying";
-constexpr auto kCountInstancesInRecipientState5Error = "countInstancesInRecipientState5Error";
-constexpr auto kCountInstancesInRecipientState6StrictConsistency =
- "countInstancesInRecipientState6StrictConsistency";
-constexpr auto kCountInstancesInRecipientState7Done = "countInstancesInRecipientState7Done";
-constexpr auto kCountInstancesInDonorState1PreparingToDonate =
- "countInstancesInDonorState1PreparingToDonate";
-constexpr auto kCountInstancesInDonorState2DonatingInitialData =
- "countInstancesInDonorState2DonatingInitialData";
-constexpr auto kCountInstancesInDonorState3DonatingOplogEntries =
- "countInstancesInDonorState3DonatingOplogEntries";
-constexpr auto kCountInstancesInDonorState4PreparingToBlockWrites =
- "countInstancesInDonorState4PreparingToBlockWrites";
-constexpr auto kCountInstancesInDonorState5Error = "countInstancesInDonorState5Error";
-constexpr auto kCountInstancesInDonorState6BlockingWrites =
- "countInstancesInDonorState6BlockingWrites";
-constexpr auto kCountInstancesInDonorState7Done = "countInstancesInDonorState7Done";
struct Metrics {
- Metrics() : _resharding(kResharding), _globalIndexes(kGlobalIndex) {}
- ShardingDataTransformCumulativeMetrics _resharding;
+ Metrics()
+ : _globalIndexes(
+ kGlobalIndex,
+ std::make_unique<ShardingDataTransformCumulativeMetricsFieldNamePlaceholder>()) {}
+ ReshardingCumulativeMetrics _resharding;
ShardingDataTransformCumulativeMetrics _globalIndexes;
};
using MetricsPtr = std::unique_ptr<Metrics>;
@@ -129,33 +74,11 @@ ShardingDataTransformCumulativeMetrics* ShardingDataTransformCumulativeMetrics::
}
ShardingDataTransformCumulativeMetrics::ShardingDataTransformCumulativeMetrics(
- const std::string& rootSectionName)
+ const std::string& rootSectionName, std::unique_ptr<NameProvider> fieldNameProvider)
: _rootSectionName{rootSectionName},
- _fieldNames{std::make_unique<ShardingDataTransformCumulativeMetricsFieldNamePlaceholder>()},
+ _fieldNames{std::move(fieldNameProvider)},
_instanceMetricsForAllRoles(ShardingDataTransformMetrics::kRoleCount),
- _operationWasAttempted{false},
- _coordinatorStateList{AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0}},
- _donorStateList{AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0}},
- _recipientStateList{AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0},
- AtomicWord<int64_t>{0}} {}
+ _operationWasAttempted{false} {}
ShardingDataTransformCumulativeMetrics::UniqueScopedObserver
ShardingDataTransformCumulativeMetrics::registerInstanceMetrics(const InstanceObserver* metrics) {
@@ -208,94 +131,59 @@ void ShardingDataTransformCumulativeMetrics::reportForServerStatus(BSONObjBuilde
root.append(_fieldNames->getForCountCanceled(), _countCancelled.load());
root.append(_fieldNames->getForLastOpEndingChunkImbalance(),
_lastOpEndingChunkImbalance.load());
-
- reportActive(&root);
- reportOldestActive(&root);
- reportLatencies(&root);
- reportCurrentInSteps(&root);
+ {
+ BSONObjBuilder active(bob->subobjStart(kActive));
+ reportActive(&active);
+ }
+ {
+ BSONObjBuilder oldest(bob->subobjStart(kOldestActive));
+ reportOldestActive(&oldest);
+ }
+ {
+ BSONObjBuilder latencies(bob->subobjStart(kLatencies));
+ reportLatencies(&latencies);
+ }
+ {
+ BSONObjBuilder steps(bob->subobjStart(kCurrentInSteps));
+ reportCurrentInSteps(&steps);
+ }
}
void ShardingDataTransformCumulativeMetrics::reportActive(BSONObjBuilder* bob) const {
- BSONObjBuilder s(bob->subobjStart(kActive));
- s.append(_fieldNames->getForDocumentsProcessed(), _documentsProcessed.load());
- s.append(_fieldNames->getForBytesWritten(), _bytesWritten.load());
- s.append(kOplogEntriesFetched, _oplogEntriesFetched.load());
- s.append(kOplogEntriesApplied, _oplogEntriesApplied.load());
- s.append(kInsertsApplied, _insertsApplied.load());
- s.append(kUpdatesApplied, _updatesApplied.load());
- s.append(kDeletesApplied, _deletesApplied.load());
- s.append(_fieldNames->getForCountWritesToStashCollections(),
- _writesToStashedCollections.load());
- s.append(_fieldNames->getForCountWritesDuringCriticalSection(),
- _writesDuringCriticalSection.load());
- s.append(_fieldNames->getForCountReadsDuringCriticalSection(),
- _readsDuringCriticalSection.load());
+ bob->append(_fieldNames->getForDocumentsProcessed(), _documentsProcessed.load());
+ bob->append(_fieldNames->getForBytesWritten(), _bytesWritten.load());
+ bob->append(_fieldNames->getForCountWritesToStashCollections(),
+ _writesToStashedCollections.load());
+ bob->append(_fieldNames->getForCountWritesDuringCriticalSection(),
+ _writesDuringCriticalSection.load());
+ bob->append(_fieldNames->getForCountReadsDuringCriticalSection(),
+ _readsDuringCriticalSection.load());
}
void ShardingDataTransformCumulativeMetrics::reportOldestActive(BSONObjBuilder* bob) const {
- BSONObjBuilder s(bob->subobjStart(kOldestActive));
- s.append(_fieldNames->getForCoordinatorAllShardsHighestRemainingOperationTimeEstimatedMillis(),
- getOldestOperationHighEstimateRemainingTimeMillis(Role::kCoordinator));
- s.append(_fieldNames->getForCoordinatorAllShardsLowestRemainingOperationTimeEstimatedMillis(),
- getOldestOperationLowEstimateRemainingTimeMillis(Role::kCoordinator));
- s.append(_fieldNames->getForRecipientRemainingOperationTimeEstimatedMillis(),
- getOldestOperationHighEstimateRemainingTimeMillis(Role::kRecipient));
+ bob->append(
+ _fieldNames->getForCoordinatorAllShardsHighestRemainingOperationTimeEstimatedMillis(),
+ getOldestOperationHighEstimateRemainingTimeMillis(Role::kCoordinator));
+ bob->append(
+ _fieldNames->getForCoordinatorAllShardsLowestRemainingOperationTimeEstimatedMillis(),
+ getOldestOperationLowEstimateRemainingTimeMillis(Role::kCoordinator));
+ bob->append(_fieldNames->getForRecipientRemainingOperationTimeEstimatedMillis(),
+ getOldestOperationHighEstimateRemainingTimeMillis(Role::kRecipient));
}
void ShardingDataTransformCumulativeMetrics::reportLatencies(BSONObjBuilder* bob) const {
- BSONObjBuilder s(bob->subobjStart(kLatencies));
- s.append(_fieldNames->getForCollectionCloningTotalRemoteBatchRetrievalTimeMillis(),
- _totalBatchRetrievedDuringCloneMillis.load());
- s.append(_fieldNames->getForCollectionCloningTotalRemoteBatchesRetrieved(),
- _totalBatchRetrievedDuringClone.load());
- s.append(_fieldNames->getForCollectionCloningTotalLocalInsertTimeMillis(),
- _collectionCloningTotalLocalInsertTimeMillis.load());
- s.append(_fieldNames->getForCollectionCloningTotalLocalInserts(),
- _collectionCloningTotalLocalBatchInserts.load());
- s.append(kOplogFetchingTotalRemoteBatchRetrievalTimeMillis,
- _oplogFetchingTotalRemoteBatchesRetrievalTimeMillis.load());
- s.append(kOplogFetchingTotalRemoteBatchesRetrieved,
- _oplogFetchingTotalRemoteBatchesRetrieved.load());
- s.append(kOplogFetchingTotalLocalInsertTimeMillis,
- _oplogFetchingTotalLocalInsertTimeMillis.load());
- s.append(kOplogFetchingTotalLocalInserts, _oplogFetchingTotalLocalInserts.load());
- s.append(kOplogApplyingTotalLocalBatchRetrievalTimeMillis,
- _oplogApplyingTotalBatchesRetrievalTimeMillis.load());
- s.append(kOplogApplyingTotalLocalBatchesRetrieved, _oplogApplyingTotalBatchesRetrieved.load());
- s.append(kOplogApplyingTotalLocalBatchApplyTimeMillis, _oplogBatchAppliedMillis.load());
- s.append(kOplogApplyingTotalLocalBatchesApplied, _oplogBatchApplied.load());
+ bob->append(_fieldNames->getForCollectionCloningTotalRemoteBatchRetrievalTimeMillis(),
+ _totalBatchRetrievedDuringCloneMillis.load());
+ bob->append(_fieldNames->getForCollectionCloningTotalRemoteBatchesRetrieved(),
+ _totalBatchRetrievedDuringClone.load());
+ bob->append(_fieldNames->getForCollectionCloningTotalLocalInsertTimeMillis(),
+ _collectionCloningTotalLocalInsertTimeMillis.load());
+ bob->append(_fieldNames->getForCollectionCloningTotalLocalInserts(),
+ _collectionCloningTotalLocalBatchInserts.load());
}
void ShardingDataTransformCumulativeMetrics::reportCurrentInSteps(BSONObjBuilder* bob) const {
- BSONObjBuilder s(bob->subobjStart(kCurrentInSteps));
-
- auto reportState = [this, &s](auto state) {
- s.append(fieldNameFor(state), getStateCounter(state)->load());
- };
-
- reportState(CoordinatorStateEnum::kInitializing);
- reportState(CoordinatorStateEnum::kPreparingToDonate);
- reportState(CoordinatorStateEnum::kCloning);
- reportState(CoordinatorStateEnum::kApplying);
- reportState(CoordinatorStateEnum::kBlockingWrites);
- reportState(CoordinatorStateEnum::kAborting);
- reportState(CoordinatorStateEnum::kCommitting);
-
- reportState(RecipientStateEnum::kAwaitingFetchTimestamp);
- reportState(RecipientStateEnum::kCreatingCollection);
- reportState(RecipientStateEnum::kCloning);
- reportState(RecipientStateEnum::kApplying);
- reportState(RecipientStateEnum::kError);
- reportState(RecipientStateEnum::kStrictConsistency);
- reportState(RecipientStateEnum::kDone);
-
- reportState(DonorStateEnum::kPreparingToDonate);
- reportState(DonorStateEnum::kDonatingInitialData);
- reportState(DonorStateEnum::kDonatingOplogEntries);
- reportState(DonorStateEnum::kPreparingToBlockWrites);
- reportState(DonorStateEnum::kError);
- reportState(DonorStateEnum::kBlockingWrites);
- reportState(DonorStateEnum::kDone);
+ // Do nothing.
}
const ShardingDataTransformCumulativeMetrics::InstanceObserver*
@@ -354,60 +242,6 @@ void ShardingDataTransformCumulativeMetrics::setLastOpEndingChunkImbalance(int64
_lastOpEndingChunkImbalance.store(imbalanceCount);
}
-ShardingDataTransformCumulativeMetrics::CoordinatorStateArray*
-ShardingDataTransformCumulativeMetrics::getStateArrayFor(CoordinatorStateEnum state) {
- return &_coordinatorStateList;
-}
-
-const ShardingDataTransformCumulativeMetrics::CoordinatorStateArray*
-ShardingDataTransformCumulativeMetrics::getStateArrayFor(CoordinatorStateEnum state) const {
- return &_coordinatorStateList;
-}
-
-ShardingDataTransformCumulativeMetrics::DonorStateArray*
-ShardingDataTransformCumulativeMetrics::getStateArrayFor(DonorStateEnum state) {
- return &_donorStateList;
-}
-
-const ShardingDataTransformCumulativeMetrics::DonorStateArray*
-ShardingDataTransformCumulativeMetrics::getStateArrayFor(DonorStateEnum state) const {
- return &_donorStateList;
-}
-
-const char* ShardingDataTransformCumulativeMetrics::fieldNameFor(
- ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum state) {
- switch (state) {
- case CoordinatorStateEnum::kInitializing:
- return kCountInstancesInCoordinatorState1Initializing;
-
- case CoordinatorStateEnum::kPreparingToDonate:
- return kCountInstancesInCoordinatorState2PreparingToDonate;
-
- case CoordinatorStateEnum::kCloning:
- return kCountInstancesInCoordinatorState3Cloning;
-
- case CoordinatorStateEnum::kApplying:
- return kCountInstancesInCoordinatorState4Applying;
-
- case CoordinatorStateEnum::kBlockingWrites:
- return kCountInstancesInCoordinatorState5BlockingWrites;
-
- case CoordinatorStateEnum::kAborting:
- return kCountInstancesInCoordinatorState6Aborting;
-
- case CoordinatorStateEnum::kCommitting:
- return kCountInstancesInCoordinatorState7Committing;
-
- default:
- uasserted(6438601,
- str::stream()
- << "no field name for coordinator state " << static_cast<int32_t>(state));
- break;
- }
-
- MONGO_UNREACHABLE;
-}
-
void ShardingDataTransformCumulativeMetrics::onInsertsDuringCloning(
int64_t count, int64_t bytes, const Milliseconds& elapsedTime) {
_collectionCloningTotalLocalBatchInserts.fetchAndAdd(1);
@@ -417,53 +251,6 @@ void ShardingDataTransformCumulativeMetrics::onInsertsDuringCloning(
durationCount<Milliseconds>(elapsedTime));
}
-void ShardingDataTransformCumulativeMetrics::onLocalInsertDuringOplogFetching(
- const Milliseconds& elapsedTime) {
- _oplogFetchingTotalLocalInserts.fetchAndAdd(1);
- _oplogFetchingTotalLocalInsertTimeMillis.fetchAndAdd(durationCount<Milliseconds>(elapsedTime));
-}
-
-void ShardingDataTransformCumulativeMetrics::onBatchRetrievedDuringOplogApplying(
- const Milliseconds& elapsedTime) {
- _oplogApplyingTotalBatchesRetrieved.fetchAndAdd(1);
- _oplogApplyingTotalBatchesRetrievalTimeMillis.fetchAndAdd(
- durationCount<Milliseconds>(elapsedTime));
-}
-
-const char* ShardingDataTransformCumulativeMetrics::fieldNameFor(
- ShardingDataTransformCumulativeMetrics::DonorStateEnum state) {
- switch (state) {
- case DonorStateEnum::kPreparingToDonate:
- return kCountInstancesInDonorState1PreparingToDonate;
-
- case DonorStateEnum::kDonatingInitialData:
- return kCountInstancesInDonorState2DonatingInitialData;
-
- case DonorStateEnum::kDonatingOplogEntries:
- return kCountInstancesInDonorState3DonatingOplogEntries;
-
- case DonorStateEnum::kPreparingToBlockWrites:
- return kCountInstancesInDonorState4PreparingToBlockWrites;
-
- case DonorStateEnum::kError:
- return kCountInstancesInDonorState5Error;
-
- case DonorStateEnum::kBlockingWrites:
- return kCountInstancesInDonorState6BlockingWrites;
-
- case DonorStateEnum::kDone:
- return kCountInstancesInDonorState7Done;
-
- default:
- uasserted(6438700,
- str::stream()
- << "no field name for donor state " << static_cast<int32_t>(state));
- break;
- }
-
- MONGO_UNREACHABLE;
-}
-
void ShardingDataTransformCumulativeMetrics::onReadDuringCriticalSection() {
_readsDuringCriticalSection.fetchAndAdd(1);
}
@@ -472,88 +259,19 @@ void ShardingDataTransformCumulativeMetrics::onWriteDuringCriticalSection() {
_writesDuringCriticalSection.fetchAndAdd(1);
}
-const char* ShardingDataTransformCumulativeMetrics::fieldNameFor(RecipientStateEnum state) {
- switch (state) {
- case RecipientStateEnum::kAwaitingFetchTimestamp:
- return kCountInstancesInRecipientState1AwaitingFetchTimestamp;
-
- case RecipientStateEnum::kCreatingCollection:
- return kCountInstancesInRecipientState2CreatingCollection;
-
- case RecipientStateEnum::kCloning:
- return kCountInstancesInRecipientState3Cloning;
-
- case RecipientStateEnum::kApplying:
- return kCountInstancesInRecipientState4Applying;
-
- case RecipientStateEnum::kError:
- return kCountInstancesInRecipientState5Error;
-
- case RecipientStateEnum::kStrictConsistency:
- return kCountInstancesInRecipientState6StrictConsistency;
-
- case RecipientStateEnum::kDone:
- return kCountInstancesInRecipientState7Done;
-
- default:
- uasserted(6438900,
- str::stream()
- << "no field name for recipient state " << static_cast<int32_t>(state));
- break;
- }
-
- MONGO_UNREACHABLE;
-}
-
void ShardingDataTransformCumulativeMetrics::onWriteToStashedCollections() {
_writesToStashedCollections.fetchAndAdd(1);
}
-ShardingDataTransformCumulativeMetrics::RecipientStateArray*
-ShardingDataTransformCumulativeMetrics::getStateArrayFor(
- ShardingDataTransformCumulativeMetrics::RecipientStateEnum state) {
- return &_recipientStateList;
-}
-
-const ShardingDataTransformCumulativeMetrics::RecipientStateArray*
-ShardingDataTransformCumulativeMetrics::getStateArrayFor(
- ShardingDataTransformCumulativeMetrics::RecipientStateEnum state) const {
- return &_recipientStateList;
-}
-
-void ShardingDataTransformCumulativeMetrics::onInsertApplied() {
- _insertsApplied.fetchAndAdd(1);
-}
-
-void ShardingDataTransformCumulativeMetrics::onUpdateApplied() {
- _updatesApplied.fetchAndAdd(1);
-}
-
-void ShardingDataTransformCumulativeMetrics::onDeleteApplied() {
- _deletesApplied.fetchAndAdd(1);
-}
-
-void ShardingDataTransformCumulativeMetrics::onOplogEntriesFetched(int64_t numEntries,
- Milliseconds elapsed) {
- _oplogEntriesFetched.fetchAndAdd(numEntries);
- _oplogFetchingTotalRemoteBatchesRetrieved.fetchAndAdd(1);
- _oplogFetchingTotalRemoteBatchesRetrievalTimeMillis.fetchAndAdd(
- durationCount<Milliseconds>(elapsed));
-}
-
-void ShardingDataTransformCumulativeMetrics::onOplogEntriesApplied(int64_t numEntries) {
- _oplogEntriesApplied.fetchAndAdd(numEntries);
-}
-
void ShardingDataTransformCumulativeMetrics::onCloningTotalRemoteBatchRetrieval(
Milliseconds elapsed) {
_totalBatchRetrievedDuringClone.fetchAndAdd(1);
_totalBatchRetrievedDuringCloneMillis.fetchAndAdd(durationCount<Milliseconds>(elapsed));
}
-void ShardingDataTransformCumulativeMetrics::onOplogLocalBatchApplied(Milliseconds elapsed) {
- _oplogBatchApplied.fetchAndAdd(1);
- _oplogBatchAppliedMillis.fetchAndAdd(durationCount<Milliseconds>(elapsed));
+const ShardingDataTransformCumulativeMetricsFieldNameProvider*
+ShardingDataTransformCumulativeMetrics::getFieldNames() const {
+ return _fieldNames.get();
}
ShardingDataTransformCumulativeMetrics::ScopedObserver::ScopedObserver(
diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h
index f09aebce244..814ef0790d4 100644
--- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h
+++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h
@@ -43,43 +43,6 @@ namespace mongo {
class ShardingDataTransformCumulativeMetrics {
public:
- enum class CoordinatorStateEnum : int32_t {
- kUnused = -1,
- kInitializing,
- kPreparingToDonate,
- kCloning,
- kApplying,
- kBlockingWrites,
- kAborting,
- kCommitting,
- kDone,
- kNumStates
- };
-
- enum class DonorStateEnum : int32_t {
- kUnused = -1,
- kPreparingToDonate,
- kDonatingInitialData,
- kDonatingOplogEntries,
- kPreparingToBlockWrites,
- kError,
- kBlockingWrites,
- kDone,
- kNumStates
- };
-
- enum class RecipientStateEnum : int32_t {
- kUnused = -1,
- kAwaitingFetchTimestamp,
- kCreatingCollection,
- kCloning,
- kApplying,
- kError,
- kStrictConsistency,
- kDone,
- kNumStates
- };
-
using NameProvider = ShardingDataTransformCumulativeMetricsFieldNameProvider;
using Role = ShardingDataTransformMetrics::Role;
using InstanceObserver = ShardingDataTransformMetricsObserverInterface;
@@ -122,7 +85,10 @@ public:
static ShardingDataTransformCumulativeMetrics* getForResharding(ServiceContext* context);
static ShardingDataTransformCumulativeMetrics* getForGlobalIndexes(ServiceContext* context);
- ShardingDataTransformCumulativeMetrics(const std::string& rootSectionName);
+
+ ShardingDataTransformCumulativeMetrics(const std::string& rootSectionName,
+ std::unique_ptr<NameProvider> fieldNameProvider);
+ virtual ~ShardingDataTransformCumulativeMetrics() = default;
[[nodiscard]] UniqueScopedObserver registerInstanceMetrics(const InstanceObserver* metrics);
int64_t getOldestOperationHighEstimateRemainingTimeMillis(Role role) const;
int64_t getOldestOperationLowEstimateRemainingTimeMillis(Role role) const;
@@ -137,62 +103,26 @@ public:
void setLastOpEndingChunkImbalance(int64_t imbalanceCount);
- /**
- * The before can be boost::none to represent the initial state transition and
- * after can be boost::none to represent cases where it is no longer active.
- */
- template <typename T>
- void onStateTransition(boost::optional<T> before, boost::optional<T> after);
-
void onReadDuringCriticalSection();
void onWriteDuringCriticalSection();
void onWriteToStashedCollections();
- void onInsertApplied();
- void onUpdateApplied();
- void onDeleteApplied();
- void onOplogEntriesFetched(int64_t numEntries, Milliseconds elapsed);
- void onOplogEntriesApplied(int64_t numEntries);
void onCloningTotalRemoteBatchRetrieval(Milliseconds elapsed);
- void onOplogLocalBatchApplied(Milliseconds elapsed);
-
- static const char* fieldNameFor(CoordinatorStateEnum state);
- static const char* fieldNameFor(DonorStateEnum state);
- static const char* fieldNameFor(RecipientStateEnum state);
-
void onInsertsDuringCloning(int64_t count, int64_t bytes, const Milliseconds& elapsedTime);
- void onLocalInsertDuringOplogFetching(const Milliseconds& elapsedTime);
- void onBatchRetrievedDuringOplogApplying(const Milliseconds& elapsedTime);
-private:
- using CoordinatorStateArray =
- std::array<AtomicWord<int64_t>, static_cast<size_t>(CoordinatorStateEnum::kNumStates)>;
- using DonorStateArray =
- std::array<AtomicWord<int64_t>, static_cast<size_t>(DonorStateEnum::kNumStates)>;
- using RecipientStateArray =
- std::array<AtomicWord<int64_t>, static_cast<size_t>(RecipientStateEnum::kNumStates)>;
+protected:
+ const ShardingDataTransformCumulativeMetricsFieldNameProvider* getFieldNames() const;
- void reportActive(BSONObjBuilder* bob) const;
- void reportOldestActive(BSONObjBuilder* bob) const;
- void reportLatencies(BSONObjBuilder* bob) const;
- void reportCurrentInSteps(BSONObjBuilder* bob) const;
+ virtual void reportActive(BSONObjBuilder* bob) const;
+ virtual void reportOldestActive(BSONObjBuilder* bob) const;
+ virtual void reportLatencies(BSONObjBuilder* bob) const;
+ virtual void reportCurrentInSteps(BSONObjBuilder* bob) const;
+private:
MetricsSet& getMetricsSetForRole(Role role);
const MetricsSet& getMetricsSetForRole(Role role) const;
const InstanceObserver* getOldestOperation(WithLock, Role role) const;
- template <typename T>
- const AtomicWord<int64_t>* getStateCounter(T state) const;
- template <typename T>
- AtomicWord<int64_t>* getMutableStateCounter(T state);
-
- CoordinatorStateArray* getStateArrayFor(CoordinatorStateEnum state);
- const CoordinatorStateArray* getStateArrayFor(CoordinatorStateEnum state) const;
- DonorStateArray* getStateArrayFor(DonorStateEnum state);
- const DonorStateArray* getStateArrayFor(DonorStateEnum state) const;
- RecipientStateArray* getStateArrayFor(RecipientStateEnum state);
- const RecipientStateArray* getStateArrayFor(RecipientStateEnum state) const;
-
MetricsSet::iterator insertMetrics(const InstanceObserver* metrics, MetricsSet& set);
void deregisterMetrics(const Role& role, const MetricsSet::iterator& metrics);
@@ -207,16 +137,8 @@ private:
AtomicWord<int64_t> _countFailed{0};
AtomicWord<int64_t> _countCancelled{0};
- AtomicWord<int64_t> _insertsApplied{0};
- AtomicWord<int64_t> _updatesApplied{0};
- AtomicWord<int64_t> _deletesApplied{0};
- AtomicWord<int64_t> _oplogEntriesApplied{0};
- AtomicWord<int64_t> _oplogEntriesFetched{0};
-
AtomicWord<int64_t> _totalBatchRetrievedDuringClone{0};
AtomicWord<int64_t> _totalBatchRetrievedDuringCloneMillis{0};
- AtomicWord<int64_t> _oplogBatchApplied{0};
- AtomicWord<int64_t> _oplogBatchAppliedMillis{0};
AtomicWord<int64_t> _documentsProcessed{0};
AtomicWord<int64_t> _bytesWritten{0};
@@ -224,55 +146,9 @@ private:
AtomicWord<int64_t> _readsDuringCriticalSection{0};
AtomicWord<int64_t> _writesDuringCriticalSection{0};
- CoordinatorStateArray _coordinatorStateList;
- DonorStateArray _donorStateList;
- RecipientStateArray _recipientStateList;
-
AtomicWord<int64_t> _collectionCloningTotalLocalBatchInserts{0};
AtomicWord<int64_t> _collectionCloningTotalLocalInsertTimeMillis{0};
- AtomicWord<int64_t> _oplogFetchingTotalRemoteBatchesRetrieved{0};
- AtomicWord<int64_t> _oplogFetchingTotalRemoteBatchesRetrievalTimeMillis{0};
- AtomicWord<int64_t> _oplogFetchingTotalLocalInserts{0};
- AtomicWord<int64_t> _oplogFetchingTotalLocalInsertTimeMillis{0};
- AtomicWord<int64_t> _oplogApplyingTotalBatchesRetrieved{0};
- AtomicWord<int64_t> _oplogApplyingTotalBatchesRetrievalTimeMillis{0};
AtomicWord<int64_t> _writesToStashedCollections{0};
};
-template <typename T>
-void ShardingDataTransformCumulativeMetrics::onStateTransition(boost::optional<T> before,
- boost::optional<T> after) {
- if (before) {
- if (auto counter = getMutableStateCounter(*before)) {
- counter->fetchAndSubtract(1);
- }
- }
-
- if (after) {
- if (auto counter = getMutableStateCounter(*after)) {
- counter->fetchAndAdd(1);
- }
- }
-}
-
-template <typename T>
-const AtomicWord<int64_t>* ShardingDataTransformCumulativeMetrics::getStateCounter(T state) const {
- if (state == T::kUnused) {
- return nullptr;
- }
-
- invariant(static_cast<size_t>(state) < static_cast<size_t>(T::kNumStates));
- return &((*getStateArrayFor(state))[static_cast<size_t>(state)]);
-}
-
-template <typename T>
-AtomicWord<int64_t>* ShardingDataTransformCumulativeMetrics::getMutableStateCounter(T state) {
- if (state == T::kUnused) {
- return nullptr;
- }
-
- invariant(static_cast<size_t>(state) < static_cast<size_t>(T::kNumStates));
- return &((*getStateArrayFor(state))[static_cast<size_t>(state)]);
-}
-
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp
index 1b07ca9655f..299bddb3b09 100644
--- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp
+++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp
@@ -79,109 +79,109 @@ public:
};
TEST_F(ShardingDataTransformCumulativeMetricsTest, AddAndRemoveMetrics) {
- auto deregister = _cumulativeMetrics.registerInstanceMetrics(getOldestObserver());
- ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), 1);
+ auto deregister = _cumulativeMetrics->registerInstanceMetrics(getOldestObserver());
+ ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 1);
deregister.reset();
- ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), 0);
+ ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 0);
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, MetricsReportsOldestWhenInsertedFirst) {
- auto deregisterOldest = _cumulativeMetrics.registerInstanceMetrics(getOldestObserver());
- auto deregisterYoungest = _cumulativeMetrics.registerInstanceMetrics(getYoungestObserver());
- ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis(
+TEST_F(ShardingDataTransformMetricsTestFixture, MetricsReportsOldestWhenInsertedFirst) {
+ auto deregisterOldest = _cumulativeMetrics->registerInstanceMetrics(getOldestObserver());
+ auto deregisterYoungest = _cumulativeMetrics->registerInstanceMetrics(getYoungestObserver());
+ ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis(
ObserverMock::kDefaultRole),
kOldestTimeLeft);
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, MetricsReportsOldestWhenInsertedLast) {
- auto deregisterYoungest = _cumulativeMetrics.registerInstanceMetrics(getYoungestObserver());
- auto deregisterOldest = _cumulativeMetrics.registerInstanceMetrics(getOldestObserver());
- ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis(
+TEST_F(ShardingDataTransformMetricsTestFixture, MetricsReportsOldestWhenInsertedLast) {
+ auto deregisterYoungest = _cumulativeMetrics->registerInstanceMetrics(getYoungestObserver());
+ auto deregisterOldest = _cumulativeMetrics->registerInstanceMetrics(getOldestObserver());
+ ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis(
ObserverMock::kDefaultRole),
kOldestTimeLeft);
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, NoServerStatusWhenNeverUsed) {
+TEST_F(ShardingDataTransformMetricsTestFixture, NoServerStatusWhenNeverUsed) {
BSONObjBuilder bob;
- _cumulativeMetrics.reportForServerStatus(&bob);
+ _cumulativeMetrics->reportForServerStatus(&bob);
auto report = bob.done();
ASSERT_BSONOBJ_EQ(report, BSONObj());
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, RemainingTimeReports0WhenEmpty) {
- ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), 0);
- ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis(
+TEST_F(ShardingDataTransformMetricsTestFixture, RemainingTimeReports0WhenEmpty) {
+ ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 0);
+ ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis(
ObserverMock::kDefaultRole),
0);
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, UpdatesOldestWhenOldestIsRemoved) {
- auto deregisterYoungest = _cumulativeMetrics.registerInstanceMetrics(getYoungestObserver());
- auto deregisterOldest = _cumulativeMetrics.registerInstanceMetrics(getOldestObserver());
- ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis(
+TEST_F(ShardingDataTransformMetricsTestFixture, UpdatesOldestWhenOldestIsRemoved) {
+ auto deregisterYoungest = _cumulativeMetrics->registerInstanceMetrics(getYoungestObserver());
+ auto deregisterOldest = _cumulativeMetrics->registerInstanceMetrics(getOldestObserver());
+ ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis(
ObserverMock::kDefaultRole),
kOldestTimeLeft);
deregisterOldest.reset();
- ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis(
+ ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis(
ObserverMock::kDefaultRole),
kYoungestTimeLeft);
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, InsertsTwoWithSameStartTime) {
- auto deregisterOldest = _cumulativeMetrics.registerInstanceMetrics(getOldestObserver());
+TEST_F(ShardingDataTransformMetricsTestFixture, InsertsTwoWithSameStartTime) {
+ auto deregisterOldest = _cumulativeMetrics->registerInstanceMetrics(getOldestObserver());
ObserverMock sameAsOldest{kOldestTime, kOldestTimeLeft};
- auto deregisterOldest2 = _cumulativeMetrics.registerInstanceMetrics(&sameAsOldest);
- ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), 2);
- ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis(
+ auto deregisterOldest2 = _cumulativeMetrics->registerInstanceMetrics(&sameAsOldest);
+ ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 2);
+ ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis(
ObserverMock::kDefaultRole),
kOldestTimeLeft);
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, StillReportsOldestAfterRandomOperations) {
+TEST_F(ShardingDataTransformMetricsTestFixture, StillReportsOldestAfterRandomOperations) {
doRandomOperationsTest<ScopedObserverMock>();
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest,
+TEST_F(ShardingDataTransformMetricsTestFixture,
StillReportsOldestAfterRandomOperationsMultithreaded) {
doRandomOperationsMultithreadedTest<ScopedObserverMock>();
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportsOldestByRole) {
+TEST_F(ShardingDataTransformMetricsTestFixture, ReportsOldestByRole) {
using Role = ShardingDataTransformMetrics::Role;
auto& metrics = _cumulativeMetrics;
ObserverMock oldDonor{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kDonor};
ObserverMock youngDonor{Date_t::fromMillisSinceEpoch(200), 200, 200, Role::kDonor};
ObserverMock oldRecipient{Date_t::fromMillisSinceEpoch(300), 300, 300, Role::kRecipient};
ObserverMock youngRecipient{Date_t::fromMillisSinceEpoch(400), 400, 400, Role::kRecipient};
- auto removeOldD = metrics.registerInstanceMetrics(&oldDonor);
- auto removeYoungD = metrics.registerInstanceMetrics(&youngDonor);
- auto removeOldR = metrics.registerInstanceMetrics(&oldRecipient);
- auto removeYoungR = metrics.registerInstanceMetrics(&youngRecipient);
-
- ASSERT_EQ(metrics.getObservedMetricsCount(), 4);
- ASSERT_EQ(metrics.getObservedMetricsCount(Role::kDonor), 2);
- ASSERT_EQ(metrics.getObservedMetricsCount(Role::kRecipient), 2);
- ASSERT_EQ(metrics.getOldestOperationHighEstimateRemainingTimeMillis(Role::kDonor), 100);
- ASSERT_EQ(metrics.getOldestOperationHighEstimateRemainingTimeMillis(Role::kRecipient), 300);
+ auto removeOldD = metrics->registerInstanceMetrics(&oldDonor);
+ auto removeYoungD = metrics->registerInstanceMetrics(&youngDonor);
+ auto removeOldR = metrics->registerInstanceMetrics(&oldRecipient);
+ auto removeYoungR = metrics->registerInstanceMetrics(&youngRecipient);
+
+ ASSERT_EQ(metrics->getObservedMetricsCount(), 4);
+ ASSERT_EQ(metrics->getObservedMetricsCount(Role::kDonor), 2);
+ ASSERT_EQ(metrics->getObservedMetricsCount(Role::kRecipient), 2);
+ ASSERT_EQ(metrics->getOldestOperationHighEstimateRemainingTimeMillis(Role::kDonor), 100);
+ ASSERT_EQ(metrics->getOldestOperationHighEstimateRemainingTimeMillis(Role::kRecipient), 300);
removeOldD.reset();
- ASSERT_EQ(metrics.getObservedMetricsCount(), 3);
- ASSERT_EQ(metrics.getObservedMetricsCount(Role::kDonor), 1);
- ASSERT_EQ(metrics.getOldestOperationHighEstimateRemainingTimeMillis(Role::kDonor), 200);
+ ASSERT_EQ(metrics->getObservedMetricsCount(), 3);
+ ASSERT_EQ(metrics->getObservedMetricsCount(Role::kDonor), 1);
+ ASSERT_EQ(metrics->getOldestOperationHighEstimateRemainingTimeMillis(Role::kDonor), 200);
removeOldR.reset();
- ASSERT_EQ(metrics.getObservedMetricsCount(), 2);
- ASSERT_EQ(metrics.getObservedMetricsCount(Role::kRecipient), 1);
- ASSERT_EQ(metrics.getOldestOperationHighEstimateRemainingTimeMillis(Role::kRecipient), 400);
+ ASSERT_EQ(metrics->getObservedMetricsCount(), 2);
+ ASSERT_EQ(metrics->getObservedMetricsCount(Role::kRecipient), 1);
+ ASSERT_EQ(metrics->getOldestOperationHighEstimateRemainingTimeMillis(Role::kRecipient), 400);
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsTimeEstimates) {
+TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsTimeEstimates) {
using Role = ShardingDataTransformMetrics::Role;
ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator};
- auto recipientObserver = _cumulativeMetrics.registerInstanceMetrics(&recipient);
- auto coordinatorObserver = _cumulativeMetrics.registerInstanceMetrics(&coordinator);
+ auto recipientObserver = _cumulativeMetrics->registerInstanceMetrics(&recipient);
+ auto coordinatorObserver = _cumulativeMetrics->registerInstanceMetrics(&coordinator);
BSONObjBuilder bob;
- _cumulativeMetrics.reportForServerStatus(&bob);
+ _cumulativeMetrics->reportForServerStatus(&bob);
auto report = bob.done();
auto section = report.getObjectField(kTestMetricsName).getObjectField("oldestActive");
ASSERT_EQ(section.getIntField("recipientRemainingOperationTimeEstimatedMillis"), 100);
@@ -193,722 +193,211 @@ TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsTimeEstimates)
300);
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsRunCount) {
+TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsRunCount) {
using Role = ShardingDataTransformMetrics::Role;
ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator);
+ auto ignore = _cumulativeMetrics->registerInstanceMetrics(&coordinator);
{
BSONObjBuilder bob;
- _cumulativeMetrics.reportForServerStatus(&bob);
+ _cumulativeMetrics->reportForServerStatus(&bob);
auto report = bob.done();
ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countStarted"), 0);
}
- _cumulativeMetrics.onStarted();
+ _cumulativeMetrics->onStarted();
{
BSONObjBuilder bob;
- _cumulativeMetrics.reportForServerStatus(&bob);
+ _cumulativeMetrics->reportForServerStatus(&bob);
auto report = bob.done();
ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countStarted"), 1);
}
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsSucceededCount) {
+TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsSucceededCount) {
using Role = ShardingDataTransformMetrics::Role;
ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator);
+ auto ignore = _cumulativeMetrics->registerInstanceMetrics(&coordinator);
{
BSONObjBuilder bob;
- _cumulativeMetrics.reportForServerStatus(&bob);
+ _cumulativeMetrics->reportForServerStatus(&bob);
auto report = bob.done();
ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countSucceeded"), 0);
}
- _cumulativeMetrics.onSuccess();
+ _cumulativeMetrics->onSuccess();
{
BSONObjBuilder bob;
- _cumulativeMetrics.reportForServerStatus(&bob);
+ _cumulativeMetrics->reportForServerStatus(&bob);
auto report = bob.done();
ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countSucceeded"), 1);
}
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsFailedCount) {
+TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsFailedCount) {
using Role = ShardingDataTransformMetrics::Role;
ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator);
+ auto ignore = _cumulativeMetrics->registerInstanceMetrics(&coordinator);
{
BSONObjBuilder bob;
- _cumulativeMetrics.reportForServerStatus(&bob);
+ _cumulativeMetrics->reportForServerStatus(&bob);
auto report = bob.done();
ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countFailed"), 0);
}
- _cumulativeMetrics.onFailure();
+ _cumulativeMetrics->onFailure();
{
BSONObjBuilder bob;
- _cumulativeMetrics.reportForServerStatus(&bob);
+ _cumulativeMetrics->reportForServerStatus(&bob);
auto report = bob.done();
ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countFailed"), 1);
}
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsCanceledCount) {
+TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsCanceledCount) {
using Role = ShardingDataTransformMetrics::Role;
ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator);
+ auto ignore = _cumulativeMetrics->registerInstanceMetrics(&coordinator);
{
BSONObjBuilder bob;
- _cumulativeMetrics.reportForServerStatus(&bob);
+ _cumulativeMetrics->reportForServerStatus(&bob);
auto report = bob.done();
ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countCanceled"), 0);
}
- _cumulativeMetrics.onCanceled();
+ _cumulativeMetrics->onCanceled();
{
BSONObjBuilder bob;
- _cumulativeMetrics.reportForServerStatus(&bob);
+ _cumulativeMetrics->reportForServerStatus(&bob);
auto report = bob.done();
ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countCanceled"), 1);
}
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsLastChunkImbalanceCount) {
+TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsLastChunkImbalanceCount) {
using Role = ShardingDataTransformMetrics::Role;
ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator);
+ auto ignore = _cumulativeMetrics->registerInstanceMetrics(&coordinator);
{
BSONObjBuilder bob;
- _cumulativeMetrics.reportForServerStatus(&bob);
+ _cumulativeMetrics->reportForServerStatus(&bob);
auto report = bob.done();
ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("lastOpEndingChunkImbalance"),
0);
}
- _cumulativeMetrics.setLastOpEndingChunkImbalance(111);
+ _cumulativeMetrics->setLastOpEndingChunkImbalance(111);
{
BSONObjBuilder bob;
- _cumulativeMetrics.reportForServerStatus(&bob);
+ _cumulativeMetrics->reportForServerStatus(&bob);
auto report = bob.done();
ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("lastOpEndingChunkImbalance"),
111);
}
- _cumulativeMetrics.setLastOpEndingChunkImbalance(777);
+ _cumulativeMetrics->setLastOpEndingChunkImbalance(777);
{
BSONObjBuilder bob;
- _cumulativeMetrics.reportForServerStatus(&bob);
+ _cumulativeMetrics->reportForServerStatus(&bob);
auto report = bob.done();
ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("lastOpEndingChunkImbalance"),
777);
}
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsInsertsDuringCloning) {
+TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsInsertsDuringCloning) {
using Role = ShardingDataTransformMetrics::Role;
ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
+ auto ignore = _cumulativeMetrics->registerInstanceMetrics(&recipient);
- auto latencySection = getLatencySection(_cumulativeMetrics);
+ auto latencySection = getLatencySection(kTestMetricsName, _cumulativeMetrics.get());
ASSERT_EQ(latencySection.getIntField("collectionCloningTotalLocalInserts"), 0);
ASSERT_EQ(latencySection.getIntField("collectionCloningTotalLocalInsertTimeMillis"), 0);
- auto activeSection = getActiveSection(_cumulativeMetrics);
+ auto activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get());
ASSERT_EQ(activeSection.getIntField("documentsCopied"), 0);
ASSERT_EQ(activeSection.getIntField("bytesCopied"), 0);
- _cumulativeMetrics.onInsertsDuringCloning(140, 20763, Milliseconds(15));
+ _cumulativeMetrics->onInsertsDuringCloning(140, 20763, Milliseconds(15));
- latencySection = getLatencySection(_cumulativeMetrics);
+ latencySection = getLatencySection(kTestMetricsName, _cumulativeMetrics.get());
ASSERT_EQ(latencySection.getIntField("collectionCloningTotalLocalInserts"), 1);
ASSERT_EQ(latencySection.getIntField("collectionCloningTotalLocalInsertTimeMillis"), 15);
- activeSection = getActiveSection(_cumulativeMetrics);
+ activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get());
ASSERT_EQ(activeSection.getIntField("documentsCopied"), 140);
ASSERT_EQ(activeSection.getIntField("bytesCopied"), 20763);
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsInsertsDuringFetching) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
-
- auto latencySection = getLatencySection(_cumulativeMetrics);
- ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInserts"), 0);
- ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInsertTimeMillis"), 0);
-
- _cumulativeMetrics.onLocalInsertDuringOplogFetching(Milliseconds(17));
-
- latencySection = getLatencySection(_cumulativeMetrics);
- ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInserts"), 1);
- ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInsertTimeMillis"), 17);
-}
-
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsBatchRetrievedDuringApplying) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
-
- auto latencySection = getLatencySection(_cumulativeMetrics);
- ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesRetrieved"), 0);
- ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchRetrievalTimeMillis"), 0);
-
- _cumulativeMetrics.onBatchRetrievedDuringOplogApplying(Milliseconds(39));
-
- latencySection = getLatencySection(_cumulativeMetrics);
- ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesRetrieved"), 1);
- ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchRetrievalTimeMillis"), 39);
-}
-
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsReadDuringCriticalSection) {
+TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsReadDuringCriticalSection) {
using Role = ShardingDataTransformMetrics::Role;
ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&donor);
+ auto ignore = _cumulativeMetrics->registerInstanceMetrics(&donor);
- auto activeSection = getActiveSection(_cumulativeMetrics);
+ auto activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get());
ASSERT_EQ(activeSection.getIntField("countReadsDuringCriticalSection"), 0);
- _cumulativeMetrics.onReadDuringCriticalSection();
+ _cumulativeMetrics->onReadDuringCriticalSection();
- activeSection = getActiveSection(_cumulativeMetrics);
+ activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get());
ASSERT_EQ(activeSection.getIntField("countReadsDuringCriticalSection"), 1);
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsWriteDuringCriticalSection) {
+TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsWriteDuringCriticalSection) {
using Role = ShardingDataTransformMetrics::Role;
ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&donor);
+ auto ignore = _cumulativeMetrics->registerInstanceMetrics(&donor);
- auto activeSection = getActiveSection(_cumulativeMetrics);
+ auto activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get());
ASSERT_EQ(activeSection.getIntField("countWritesDuringCriticalSection"), 0);
- _cumulativeMetrics.onWriteDuringCriticalSection();
+ _cumulativeMetrics->onWriteDuringCriticalSection();
- activeSection = getActiveSection(_cumulativeMetrics);
+ activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get());
ASSERT_EQ(activeSection.getIntField("countWritesDuringCriticalSection"), 1);
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsWriteToStashedCollection) {
+TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsWriteToStashedCollection) {
using Role = ShardingDataTransformMetrics::Role;
ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
+ auto ignore = _cumulativeMetrics->registerInstanceMetrics(&recipient);
- auto activeSection = getActiveSection(_cumulativeMetrics);
+ auto activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get());
ASSERT_EQ(activeSection.getIntField("countWritesToStashCollections"), 0);
- _cumulativeMetrics.onWriteToStashedCollections();
+ _cumulativeMetrics->onWriteToStashedCollections();
- activeSection = getActiveSection(_cumulativeMetrics);
+ activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get());
ASSERT_EQ(activeSection.getIntField("countWritesToStashCollections"), 1);
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsBatchRetrievedDuringCloning) {
+TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsBatchRetrievedDuringCloning) {
using Role = ShardingDataTransformMetrics::Role;
ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
+ auto ignore = _cumulativeMetrics->registerInstanceMetrics(&recipient);
- auto latencySection = getLatencySection(_cumulativeMetrics);
+ auto latencySection = getLatencySection(kTestMetricsName, _cumulativeMetrics.get());
ASSERT_EQ(latencySection.getIntField("collectionCloningTotalRemoteBatchesRetrieved"), 0);
ASSERT_EQ(latencySection.getIntField("collectionCloningTotalRemoteBatchRetrievalTimeMillis"),
0);
- _cumulativeMetrics.onCloningTotalRemoteBatchRetrieval(Milliseconds(19));
+ _cumulativeMetrics->onCloningTotalRemoteBatchRetrieval(Milliseconds(19));
- latencySection = getLatencySection(_cumulativeMetrics);
+ latencySection = getLatencySection(kTestMetricsName, _cumulativeMetrics.get());
ASSERT_EQ(latencySection.getIntField("collectionCloningTotalRemoteBatchesRetrieved"), 1);
ASSERT_EQ(latencySection.getIntField("collectionCloningTotalRemoteBatchRetrievalTimeMillis"),
19);
}
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsBatchApplied) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
-
- auto latencySection = getLatencySection(_cumulativeMetrics);
- ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesApplied"), 0);
- ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchApplyTimeMillis"), 0);
-
- _cumulativeMetrics.onOplogLocalBatchApplied(Milliseconds(333));
-
- latencySection = getLatencySection(_cumulativeMetrics);
- ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesApplied"), 1);
- ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchApplyTimeMillis"), 333);
-}
-
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsInsertsApplied) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
-
- auto activeSection = getActiveSection(_cumulativeMetrics);
- ASSERT_EQ(activeSection.getIntField("insertsApplied"), 0);
-
- _cumulativeMetrics.onInsertApplied();
-
- activeSection = getActiveSection(_cumulativeMetrics);
- ASSERT_EQ(activeSection.getIntField("insertsApplied"), 1);
-}
-
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsUpdatesApplied) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
-
- auto activeSection = getActiveSection(_cumulativeMetrics);
- ASSERT_EQ(activeSection.getIntField("updatesApplied"), 0);
-
- _cumulativeMetrics.onUpdateApplied();
-
- activeSection = getActiveSection(_cumulativeMetrics);
- ASSERT_EQ(activeSection.getIntField("updatesApplied"), 1);
-}
-
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsDeletesApplied) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
-
- auto activeSection = getActiveSection(_cumulativeMetrics);
- ASSERT_EQ(activeSection.getIntField("deletesApplied"), 0);
-
- _cumulativeMetrics.onDeleteApplied();
-
- activeSection = getActiveSection(_cumulativeMetrics);
- ASSERT_EQ(activeSection.getIntField("deletesApplied"), 1);
-}
-
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsOplogEntriesFetched) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
-
- auto activeSection = getActiveSection(_cumulativeMetrics);
- ASSERT_EQ(activeSection.getIntField("oplogEntriesFetched"), 0);
-
- auto latencySection = getLatencySection(_cumulativeMetrics);
- ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchesRetrieved"), 0);
- ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchRetrievalTimeMillis"), 0);
-
- _cumulativeMetrics.onOplogEntriesFetched(123, Milliseconds(43));
-
- activeSection = getActiveSection(_cumulativeMetrics);
- ASSERT_EQ(activeSection.getIntField("oplogEntriesFetched"), 123);
-
- latencySection = getLatencySection(_cumulativeMetrics);
- ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchesRetrieved"), 1);
- ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchRetrievalTimeMillis"), 43);
-}
-
-TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsOplogEntriesApplied) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
-
- auto activeSection = getActiveSection(_cumulativeMetrics);
- ASSERT_EQ(activeSection.getIntField("oplogEntriesApplied"), 0);
-
- _cumulativeMetrics.onOplogEntriesApplied(99);
-
- activeSection = getActiveSection(_cumulativeMetrics);
- ASSERT_EQ(activeSection.getIntField("oplogEntriesApplied"), 99);
-}
-
-class ShardingDataTransformCumulativeStateTest : public ShardingDataTransformCumulativeMetricsTest {
-public:
- using CoordinatorStateEnum = ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum;
- using DonorStateEnum = ShardingDataTransformCumulativeMetrics::DonorStateEnum;
- using RecipientStateEnum = ShardingDataTransformCumulativeMetrics::RecipientStateEnum;
-
- BSONObj getStateSubObj(const ShardingDataTransformCumulativeMetrics& metrics) {
- BSONObjBuilder bob;
- metrics.reportForServerStatus(&bob);
- auto report = bob.done();
- return report.getObjectField(kTestMetricsName).getObjectField("currentInSteps").getOwned();
- }
-
- bool checkCoordinateStateField(const ShardingDataTransformCumulativeMetrics& metrics,
- boost::optional<CoordinatorStateEnum> expectedState) {
- auto serverStatusSubObj = getStateSubObj(metrics);
- std::map<std::string, int> expectedStateFieldCount;
-
- auto addExpectedField = [&](CoordinatorStateEnum stateToPopulate) {
- expectedStateFieldCount.emplace(
- ShardingDataTransformCumulativeMetrics::fieldNameFor(stateToPopulate),
- ((expectedState && (stateToPopulate == expectedState)) ? 1 : 0));
- };
-
- addExpectedField(CoordinatorStateEnum::kInitializing);
- addExpectedField(CoordinatorStateEnum::kPreparingToDonate);
- addExpectedField(CoordinatorStateEnum::kCloning);
- addExpectedField(CoordinatorStateEnum::kApplying);
- addExpectedField(CoordinatorStateEnum::kBlockingWrites);
- addExpectedField(CoordinatorStateEnum::kAborting);
- addExpectedField(CoordinatorStateEnum::kCommitting);
-
- for (const auto& fieldNameAndState : expectedStateFieldCount) {
- const auto actualValue = serverStatusSubObj.getIntField(fieldNameAndState.first);
- if (actualValue != fieldNameAndState.second) {
- LOGV2_DEBUG(6438600,
- 0,
- "coordinator state field value does not match expected value",
- "field"_attr = fieldNameAndState.first,
- "serverStatus"_attr = serverStatusSubObj);
- return false;
- }
- }
-
- return true;
- }
-
- bool checkDonorStateField(const ShardingDataTransformCumulativeMetrics& metrics,
- boost::optional<DonorStateEnum> expectedState) {
- auto serverStatusSubObj = getStateSubObj(metrics);
- std::map<std::string, int> expectedStateFieldCount;
-
- auto addExpectedField = [&](DonorStateEnum stateToPopulate) {
- expectedStateFieldCount.emplace(
- ShardingDataTransformCumulativeMetrics::fieldNameFor(stateToPopulate),
- ((expectedState && (stateToPopulate == expectedState)) ? 1 : 0));
- };
-
- addExpectedField(DonorStateEnum::kPreparingToDonate);
- addExpectedField(DonorStateEnum::kDonatingInitialData);
- addExpectedField(DonorStateEnum::kDonatingOplogEntries);
- addExpectedField(DonorStateEnum::kPreparingToBlockWrites);
- addExpectedField(DonorStateEnum::kError);
- addExpectedField(DonorStateEnum::kBlockingWrites);
- addExpectedField(DonorStateEnum::kDone);
-
- for (const auto& fieldNameAndState : expectedStateFieldCount) {
- const auto actualValue = serverStatusSubObj.getIntField(fieldNameAndState.first);
- if (actualValue != fieldNameAndState.second) {
- LOGV2_DEBUG(6438701,
- 0,
- "Donor state field value does not match expected value",
- "field"_attr = fieldNameAndState.first,
- "serverStatus"_attr = serverStatusSubObj);
- return false;
- }
- }
-
- return true;
- }
-
- bool checkRecipientStateField(const ShardingDataTransformCumulativeMetrics& metrics,
- boost::optional<RecipientStateEnum> expectedState) {
- auto serverStatusSubObj = getStateSubObj(metrics);
- std::map<std::string, int> expectedStateFieldCount;
-
- auto addExpectedField = [&](RecipientStateEnum stateToPopulate) {
- expectedStateFieldCount.emplace(
- ShardingDataTransformCumulativeMetrics::fieldNameFor(stateToPopulate),
- ((expectedState && (stateToPopulate == expectedState)) ? 1 : 0));
- };
-
- addExpectedField(RecipientStateEnum::kAwaitingFetchTimestamp);
- addExpectedField(RecipientStateEnum::kCreatingCollection);
- addExpectedField(RecipientStateEnum::kCloning);
- addExpectedField(RecipientStateEnum::kApplying);
- addExpectedField(RecipientStateEnum::kError);
- addExpectedField(RecipientStateEnum::kStrictConsistency);
- addExpectedField(RecipientStateEnum::kDone);
-
- for (const auto& fieldNameAndState : expectedStateFieldCount) {
- const auto actualValue = serverStatusSubObj.getIntField(fieldNameAndState.first);
- if (actualValue != fieldNameAndState.second) {
- LOGV2_DEBUG(6438901,
- 0,
- "Recipient state field value does not match expected value",
- "field"_attr = fieldNameAndState.first,
- "serverStatus"_attr = serverStatusSubObj);
- return false;
- }
- }
-
- return true;
- }
-};
-
-TEST_F(ShardingDataTransformCumulativeStateTest,
- SimulatedNormalCoordinatorStateTransitionReportsStateCorrectly) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator);
-
- ASSERT(checkCoordinateStateField(_cumulativeMetrics, CoordinatorStateEnum::kUnused));
-
- boost::optional<CoordinatorStateEnum> prevState;
- boost::optional<CoordinatorStateEnum> nextState;
-
- auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) {
- prevState = nextState;
- nextState = newState;
- _cumulativeMetrics.onStateTransition(prevState, nextState);
- return checkCoordinateStateField(_cumulativeMetrics, nextState);
- };
-
- ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused));
- ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing));
- ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate));
- ASSERT(simulateTransitionTo(CoordinatorStateEnum::kCloning));
- ASSERT(simulateTransitionTo(CoordinatorStateEnum::kBlockingWrites));
- ASSERT(simulateTransitionTo(CoordinatorStateEnum::kCommitting));
- ASSERT(simulateTransitionTo(CoordinatorStateEnum::kDone));
- ASSERT(simulateTransitionTo(boost::none));
-}
-
-TEST_F(ShardingDataTransformCumulativeStateTest,
- SimulatedAbortedCoordinatorStateTransitionReportsStateCorrectly) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator);
-
- ASSERT(checkCoordinateStateField(_cumulativeMetrics, CoordinatorStateEnum::kUnused));
-
- boost::optional<CoordinatorStateEnum> prevState;
- boost::optional<CoordinatorStateEnum> nextState;
-
- auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) {
- prevState = nextState;
- nextState = newState;
- _cumulativeMetrics.onStateTransition(prevState, nextState);
- return checkCoordinateStateField(_cumulativeMetrics, nextState);
- };
-
- ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused));
- ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing));
- ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate));
- ASSERT(simulateTransitionTo(CoordinatorStateEnum::kAborting));
- ASSERT(simulateTransitionTo(boost::none));
-}
-
-TEST_F(ShardingDataTransformCumulativeStateTest,
- SimulatedSteppedDownCoordinatorStateFromUnusedReportsStateCorrectly) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator);
-
- boost::optional<CoordinatorStateEnum> initState = CoordinatorStateEnum::kUnused;
- ASSERT(checkCoordinateStateField(_cumulativeMetrics, initState));
-
- _cumulativeMetrics.onStateTransition(initState, {boost::none});
- ASSERT(checkCoordinateStateField(_cumulativeMetrics, initState));
-}
-
-TEST_F(ShardingDataTransformCumulativeStateTest,
- SimulatedSteppedDownCoordinatorStateTransitionReportsStateCorrectly) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator);
-
- ASSERT(checkCoordinateStateField(_cumulativeMetrics, CoordinatorStateEnum::kUnused));
-
- boost::optional<CoordinatorStateEnum> prevState;
- boost::optional<CoordinatorStateEnum> nextState;
-
- auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) {
- prevState = nextState;
- nextState = newState;
- _cumulativeMetrics.onStateTransition(prevState, nextState);
- return checkCoordinateStateField(_cumulativeMetrics, nextState);
- };
-
- ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused));
- ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing));
- ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate));
- ASSERT(simulateTransitionTo(boost::none));
-}
-
-TEST_F(ShardingDataTransformCumulativeStateTest,
- SimulatedNormalDonorStateTransitionReportsStateCorrectly) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&donor);
-
- ASSERT(checkDonorStateField(_cumulativeMetrics, DonorStateEnum::kUnused));
-
- boost::optional<DonorStateEnum> prevState;
- boost::optional<DonorStateEnum> nextState;
-
- auto simulateTransitionTo = [&](boost::optional<DonorStateEnum> newState) {
- prevState = nextState;
- nextState = newState;
- _cumulativeMetrics.onStateTransition(prevState, nextState);
- return checkDonorStateField(_cumulativeMetrics, nextState);
- };
-
- ASSERT(simulateTransitionTo(DonorStateEnum::kUnused));
- ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToDonate));
- ASSERT(simulateTransitionTo(DonorStateEnum::kDonatingInitialData));
- ASSERT(simulateTransitionTo(DonorStateEnum::kDonatingOplogEntries));
- ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToBlockWrites));
- ASSERT(simulateTransitionTo(DonorStateEnum::kBlockingWrites));
- ASSERT(simulateTransitionTo(DonorStateEnum::kDone));
- ASSERT(simulateTransitionTo(boost::none));
-}
-
-TEST_F(ShardingDataTransformCumulativeStateTest,
- SimulatedAbortedDonorStateTransitionReportsStateCorrectly) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&donor);
-
- ASSERT(checkDonorStateField(_cumulativeMetrics, DonorStateEnum::kUnused));
-
- boost::optional<DonorStateEnum> prevState;
- boost::optional<DonorStateEnum> nextState;
-
- auto simulateTransitionTo = [&](boost::optional<DonorStateEnum> newState) {
- prevState = nextState;
- nextState = newState;
- _cumulativeMetrics.onStateTransition(prevState, nextState);
- return checkDonorStateField(_cumulativeMetrics, nextState);
- };
-
- ASSERT(simulateTransitionTo(DonorStateEnum::kUnused));
- ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToDonate));
- ASSERT(simulateTransitionTo(DonorStateEnum::kError));
- ASSERT(simulateTransitionTo(DonorStateEnum::kDone));
- ASSERT(simulateTransitionTo(boost::none));
-}
-
-TEST_F(ShardingDataTransformCumulativeStateTest,
- SimulatedSteppedDownDonorStateFromUnusedReportsStateCorrectly) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&donor);
-
- boost::optional<DonorStateEnum> initState = DonorStateEnum::kUnused;
- ASSERT(checkDonorStateField(_cumulativeMetrics, initState));
-
- _cumulativeMetrics.onStateTransition(initState, {boost::none});
- ASSERT(checkDonorStateField(_cumulativeMetrics, initState));
-}
-
-TEST_F(ShardingDataTransformCumulativeStateTest,
- SimulatedSteppedDownDonorStateTransitionReportsStateCorrectly) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&donor);
-
- ASSERT(checkDonorStateField(_cumulativeMetrics, DonorStateEnum::kUnused));
-
- boost::optional<DonorStateEnum> prevState;
- boost::optional<DonorStateEnum> nextState;
-
- auto simulateTransitionTo = [&](boost::optional<DonorStateEnum> newState) {
- prevState = nextState;
- nextState = newState;
- _cumulativeMetrics.onStateTransition(prevState, nextState);
- return checkDonorStateField(_cumulativeMetrics, nextState);
- };
-
- ASSERT(simulateTransitionTo(DonorStateEnum::kUnused));
- ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToDonate));
- ASSERT(simulateTransitionTo(DonorStateEnum::kDonatingInitialData));
- ASSERT(simulateTransitionTo(boost::none));
-}
-
-TEST_F(ShardingDataTransformCumulativeStateTest,
- SimulatedNormalRecipientStateTransitionReportsStateCorrectly) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
-
- ASSERT(checkRecipientStateField(_cumulativeMetrics, RecipientStateEnum::kUnused));
-
- boost::optional<RecipientStateEnum> prevState;
- boost::optional<RecipientStateEnum> nextState;
-
- auto simulateTransitionTo = [&](boost::optional<RecipientStateEnum> newState) {
- prevState = nextState;
- nextState = newState;
- _cumulativeMetrics.onStateTransition(prevState, nextState);
- return checkRecipientStateField(_cumulativeMetrics, nextState);
- };
-
- ASSERT(simulateTransitionTo(RecipientStateEnum::kUnused));
- ASSERT(simulateTransitionTo(RecipientStateEnum::kAwaitingFetchTimestamp));
- ASSERT(simulateTransitionTo(RecipientStateEnum::kCreatingCollection));
- ASSERT(simulateTransitionTo(RecipientStateEnum::kCloning));
- ASSERT(simulateTransitionTo(RecipientStateEnum::kApplying));
- ASSERT(simulateTransitionTo(RecipientStateEnum::kStrictConsistency));
- ASSERT(simulateTransitionTo(RecipientStateEnum::kDone));
- ASSERT(simulateTransitionTo(boost::none));
-}
-
-TEST_F(ShardingDataTransformCumulativeStateTest,
- SimulatedAbortedRecipientStateTransitionReportsStateCorrectly) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
-
- ASSERT(checkRecipientStateField(_cumulativeMetrics, RecipientStateEnum::kUnused));
-
- boost::optional<RecipientStateEnum> prevState;
- boost::optional<RecipientStateEnum> nextState;
-
- auto simulateTransitionTo = [&](boost::optional<RecipientStateEnum> newState) {
- prevState = nextState;
- nextState = newState;
- _cumulativeMetrics.onStateTransition(prevState, nextState);
- return checkRecipientStateField(_cumulativeMetrics, nextState);
- };
-
- ASSERT(simulateTransitionTo(RecipientStateEnum::kUnused));
- ASSERT(simulateTransitionTo(RecipientStateEnum::kAwaitingFetchTimestamp));
- ASSERT(simulateTransitionTo(RecipientStateEnum::kError));
- ASSERT(simulateTransitionTo(boost::none));
-}
-
-TEST_F(ShardingDataTransformCumulativeStateTest,
- SimulatedSteppedDownRecipientStateFromUnusedReportsStateCorrectly) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
-
- boost::optional<RecipientStateEnum> initState = RecipientStateEnum::kUnused;
- ASSERT(checkRecipientStateField(_cumulativeMetrics, initState));
-
- _cumulativeMetrics.onStateTransition(initState, {boost::none});
- ASSERT(checkRecipientStateField(_cumulativeMetrics, initState));
-}
-
-TEST_F(ShardingDataTransformCumulativeStateTest,
- SimulatedSteppedDownRecipientStateTransitionReportsStateCorrectly) {
- using Role = ShardingDataTransformMetrics::Role;
- ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient};
- auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient);
-
- ASSERT(checkRecipientStateField(_cumulativeMetrics, RecipientStateEnum::kUnused));
-
- boost::optional<RecipientStateEnum> prevState;
- boost::optional<RecipientStateEnum> nextState;
-
- auto simulateTransitionTo = [&](boost::optional<RecipientStateEnum> newState) {
- prevState = nextState;
- nextState = newState;
- _cumulativeMetrics.onStateTransition(prevState, nextState);
- return checkRecipientStateField(_cumulativeMetrics, nextState);
- };
-
- ASSERT(simulateTransitionTo(RecipientStateEnum::kUnused));
- ASSERT(simulateTransitionTo(RecipientStateEnum::kAwaitingFetchTimestamp));
- ASSERT(simulateTransitionTo(RecipientStateEnum::kCreatingCollection));
- ASSERT(simulateTransitionTo(boost::none));
-}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
index 9d66e37c47e..96ab3d2a6fd 100644
--- a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
+++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
@@ -247,15 +247,6 @@ void ShardingDataTransformInstanceMetrics::setCoordinatorLowEstimateRemainingTim
_coordinatorLowEstimateRemainingTimeMillis.store(milliseconds);
}
-void ShardingDataTransformInstanceMetrics::onLocalInsertDuringOplogFetching(Milliseconds elapsed) {
- _cumulativeMetrics->onLocalInsertDuringOplogFetching(elapsed);
-}
-
-void ShardingDataTransformInstanceMetrics::onBatchRetrievedDuringOplogApplying(
- Milliseconds elapsed) {
- _cumulativeMetrics->onBatchRetrievedDuringOplogApplying(elapsed);
-}
-
void ShardingDataTransformInstanceMetrics::onWriteDuringCriticalSection() {
_writesDuringCriticalSection.addAndFetch(1);
_cumulativeMetrics->onWriteDuringCriticalSection();
@@ -301,10 +292,6 @@ void ShardingDataTransformInstanceMetrics::onCloningTotalRemoteBatchRetrieval(
_cumulativeMetrics->onCloningTotalRemoteBatchRetrieval(elapsed);
}
-void ShardingDataTransformInstanceMetrics::onOplogLocalBatchApplied(Milliseconds elapsed) {
- _cumulativeMetrics->onOplogLocalBatchApplied(elapsed);
-}
-
ShardingDataTransformCumulativeMetrics*
ShardingDataTransformInstanceMetrics::getCumulativeMetrics() {
return _cumulativeMetrics;
diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.h b/src/mongo/db/s/sharding_data_transform_instance_metrics.h
index bba3a29a53e..2c26f4ab480 100644
--- a/src/mongo/db/s/sharding_data_transform_instance_metrics.h
+++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.h
@@ -94,10 +94,7 @@ public:
void setDocumentsToProcessCounts(int64_t documentCount, int64_t totalDocumentsSizeBytes);
void setCoordinatorHighEstimateRemainingTimeMillis(Milliseconds milliseconds);
void setCoordinatorLowEstimateRemainingTimeMillis(Milliseconds milliseconds);
- void onLocalInsertDuringOplogFetching(Milliseconds elapsed);
- void onBatchRetrievedDuringOplogApplying(Milliseconds elapsed);
void onCloningTotalRemoteBatchRetrieval(Milliseconds elapsed);
- void onOplogLocalBatchApplied(Milliseconds elapsed);
void onWriteToStashedCollections();
void onReadDuringCriticalSection();
diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp
index 9392fe02129..f5d4ba2df01 100644
--- a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp
+++ b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp
@@ -146,7 +146,7 @@ public:
role,
getClockSource()->now(),
getClockSource(),
- &_cumulativeMetrics);
+ _cumulativeMetrics.get());
}
std::unique_ptr<ShardingDataTransformInstanceMetricsForTest> createInstanceMetrics(
@@ -160,7 +160,7 @@ public:
ShardingDataTransformInstanceMetrics::Role::kDonor,
getClockSource()->now(),
getClockSource(),
- &_cumulativeMetrics,
+ _cumulativeMetrics.get(),
std::move(fieldNameProvider),
std::move(mock));
}
@@ -169,9 +169,9 @@ public:
TEST_F(ShardingDataTransformInstanceMetricsTest, RegisterAndDeregisterMetrics) {
for (auto i = 0; i < 100; i++) {
auto metrics = createInstanceMetrics();
- ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), 1);
+ ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 1);
}
- ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), 0);
+ ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 0);
}
TEST_F(ShardingDataTransformInstanceMetricsTest, RegisterAndDeregisterMetricsAtOnce) {
@@ -179,10 +179,10 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, RegisterAndDeregisterMetricsAtO
std::vector<std::unique_ptr<ShardingDataTransformInstanceMetricsForTest>> registered;
for (auto i = 0; i < 100; i++) {
registered.emplace_back(createInstanceMetrics());
- ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), registered.size());
+ ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), registered.size());
}
}
- ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), 0);
+ ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 0);
}
TEST_F(ShardingDataTransformInstanceMetricsTest, RandomOperations) {
@@ -306,7 +306,7 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, CurrentOpReportsRunningTime) {
Role::kCoordinator,
start,
getClockSource(),
- &_cumulativeMetrics);
+ _cumulativeMetrics.get());
auto report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("totalOperationTimeElapsedSecs"), kTimeElapsed);
}
diff --git a/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h b/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h
index a548723e16a..a898e9fca73 100644
--- a/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h
+++ b/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h
@@ -103,8 +103,13 @@ public:
LOGV2(6437400, "", "TestName"_attr = testName, "Role"_attr = role);
auto uuid = UUID::gen();
const auto& clock = getClockSource();
- auto metrics = std::make_unique<T>(
- uuid, kTestCommand, kTestNamespace, role, clock->now(), clock, &_cumulativeMetrics);
+ auto metrics = std::make_unique<T>(uuid,
+ kTestCommand,
+ kTestNamespace,
+ role,
+ clock->now(),
+ clock,
+ _cumulativeMetrics.get());
// Reports 0 before timed section entered.
clock->advance(kIncrement);
@@ -129,6 +134,26 @@ public:
}
protected:
+ void setUp() override {
+ _cumulativeMetrics = initializeCumulativeMetrics();
+ }
+
+ static BSONObj getLatencySection(StringData rootName,
+ const ShardingDataTransformCumulativeMetrics* metrics) {
+ BSONObjBuilder bob;
+ metrics->reportForServerStatus(&bob);
+ auto report = bob.done();
+ return report.getObjectField(rootName).getObjectField("latencies").getOwned();
+ }
+
+ static BSONObj getActiveSection(StringData rootName,
+ const ShardingDataTransformCumulativeMetrics* metrics) {
+ BSONObjBuilder bob;
+ metrics->reportForServerStatus(&bob);
+ auto report = bob.done();
+ return report.getObjectField(rootName).getObjectField("active").getOwned();
+ }
+
constexpr static auto kTestMetricsName = "testMetrics";
constexpr static auto kYoungestTime =
Date_t::fromMillisSinceEpoch(std::numeric_limits<int64_t>::max());
@@ -140,7 +165,11 @@ protected:
const BSONObj kTestCommand = BSON("command"
<< "test");
- ShardingDataTransformMetricsTestFixture() : _cumulativeMetrics{kTestMetricsName} {}
+ virtual std::unique_ptr<ShardingDataTransformCumulativeMetrics> initializeCumulativeMetrics() {
+ return std::make_unique<ShardingDataTransformCumulativeMetrics>(
+ kTestMetricsName,
+ std::make_unique<ShardingDataTransformCumulativeMetricsFieldNamePlaceholder>());
+ }
const ObserverMock* getYoungestObserver() {
static StaticImmortal<ObserverMock> youngest{kYoungestTime, kYoungestTimeLeft};
@@ -162,7 +191,7 @@ protected:
SpecialIndexBehaviorMap registerAtIndex(int index, const ObserverMock* mock) {
return SpecialIndexBehaviorMap{{index, [this, mock] {
_observers.emplace_back(
- _cumulativeMetrics.registerInstanceMetrics(mock));
+ _cumulativeMetrics->registerInstanceMetrics(mock));
}}};
}
@@ -186,7 +215,7 @@ protected:
std::make_unique<ScopedObserverType>(Date_t::fromMillisSinceEpoch(startTime),
timeLeft,
getClockSource(),
- &_cumulativeMetrics));
+ _cumulativeMetrics.get()));
};
auto performRemoval = [&] {
auto i = rng.nextInt32(inserted.size());
@@ -219,7 +248,7 @@ protected:
kRemovalOdds,
rng.nextInt64(),
registerAtIndex(rng.nextInt32(kIterations), getOldestObserver()));
- ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis(
+ ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis(
ObserverMock::kDefaultRole),
kOldestTimeLeft);
}
@@ -256,20 +285,20 @@ protected:
for (auto& pf : threadPFs) {
pf.future.wait();
}
- ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis(
+ ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis(
ObserverMock::kDefaultRole),
kOldestTimeLeft);
size_t expectedCount = 1; // Special insert for kOldest is not counted in vector size.
for (auto& v : threadStorage) {
expectedCount += v.size();
}
- ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), expectedCount);
+ ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), expectedCount);
for (auto& t : threads) {
t.join();
}
}
- ShardingDataTransformCumulativeMetrics _cumulativeMetrics;
+ std::unique_ptr<ShardingDataTransformCumulativeMetrics> _cumulativeMetrics;
std::vector<ShardingDataTransformCumulativeMetrics::UniqueScopedObserver> _observers;
};