diff options
author | Brett Nawrocki <brett.nawrocki@mongodb.com> | 2022-07-22 16:11:26 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-07-28 18:55:59 +0000 |
commit | b263fd5da80fe00213ccc8694f5215ab4da2159a (patch) | |
tree | 0cc5983bb8cdce0dc7ba5acfb48077296b97785a /src/mongo | |
parent | c1c13273cb24b8968849533bf6d25e9eaf2d6855 (diff) | |
download | mongo-b263fd5da80fe00213ccc8694f5215ab4da2159a.tar.gz |
SERVER-67112 Create ReshardingCumulativeMetrics subclass
Diffstat (limited to 'src/mongo')
17 files changed, 1345 insertions, 1153 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index f7cbf589684..f306e353230 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -94,6 +94,7 @@ env.Library( 'resharding/resharding_coordinator_observer.cpp', 'resharding/resharding_coordinator_service.cpp', 'resharding/resharding_cumulative_metrics_field_name_provider.cpp', + 'resharding/resharding_cumulative_metrics.cpp', 'resharding/resharding_data_copy_util.cpp', 'resharding/resharding_data_replication.cpp', 'resharding/resharding_donor_oplog_iterator.cpp', @@ -591,6 +592,7 @@ env.CppUnitTest( 'resharding/resharding_agg_test.cpp', 'resharding/resharding_collection_cloner_test.cpp', 'resharding/resharding_collection_test.cpp', + 'resharding/resharding_cumulative_metrics_test.cpp', 'resharding/resharding_data_replication_test.cpp', 'resharding/resharding_destined_recipient_test.cpp', 'resharding/resharding_donor_oplog_iterator_test.cpp', diff --git a/src/mongo/db/s/global_index_metrics_test.cpp b/src/mongo/db/s/global_index_metrics_test.cpp index 7d2dcb36f41..ebdae28fc08 100644 --- a/src/mongo/db/s/global_index_metrics_test.cpp +++ b/src/mongo/db/s/global_index_metrics_test.cpp @@ -54,7 +54,7 @@ public: role, clockSource->now(), clockSource, - &_cumulativeMetrics); + _cumulativeMetrics.get()); } }; diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp index d8740053c80..ed4e41a63f2 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp @@ -108,7 +108,7 @@ private: boost::optional<Callback> _runOnMockingNextResponse; - ShardingDataTransformCumulativeMetrics _cumulativeMetrics{"dummyForTest"}; + ReshardingCumulativeMetrics _cumulativeMetrics; std::shared_ptr<ReshardingMetrics> _metrics; }; diff --git a/src/mongo/db/s/resharding/resharding_cumulative_metrics.cpp b/src/mongo/db/s/resharding/resharding_cumulative_metrics.cpp new file mode 100644 index 00000000000..164b8c4e0d0 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_cumulative_metrics.cpp @@ -0,0 +1,304 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/s/resharding/resharding_cumulative_metrics.h" + +namespace mongo { + +namespace { +constexpr auto kResharding = "resharding"; +} + +ReshardingCumulativeMetrics::ReshardingCumulativeMetrics() + : ShardingDataTransformCumulativeMetrics( + kResharding, std::make_unique<ReshardingCumulativeMetricsFieldNameProvider>()), + _fieldNames( + static_cast<const ReshardingCumulativeMetricsFieldNameProvider*>(getFieldNames())), + _coordinatorStateList{AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}}, + _donorStateList{AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}}, + _recipientStateList{AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}} {} + +StringData ReshardingCumulativeMetrics::fieldNameFor( + CoordinatorStateEnum state, const ReshardingCumulativeMetricsFieldNameProvider* provider) { + switch (state) { + case CoordinatorStateEnum::kInitializing: + return provider->getForCountInstancesInCoordinatorState1Initializing(); + + case CoordinatorStateEnum::kPreparingToDonate: + return provider->getForCountInstancesInCoordinatorState2PreparingToDonate(); + + case CoordinatorStateEnum::kCloning: + return provider->getForCountInstancesInCoordinatorState3Cloning(); + + case CoordinatorStateEnum::kApplying: + return provider->getForCountInstancesInCoordinatorState4Applying(); + + case CoordinatorStateEnum::kBlockingWrites: + return provider->getForCountInstancesInCoordinatorState5BlockingWrites(); + + case CoordinatorStateEnum::kAborting: + return provider->getForCountInstancesInCoordinatorState6Aborting(); + + case CoordinatorStateEnum::kCommitting: + return provider->getForCountInstancesInCoordinatorState7Committing(); + + default: + uasserted(6438601, + str::stream() + << "no field name for coordinator state " << static_cast<int32_t>(state)); + break; + } + + MONGO_UNREACHABLE; +} + +StringData ReshardingCumulativeMetrics::fieldNameFor( + DonorStateEnum state, const ReshardingCumulativeMetricsFieldNameProvider* provider) { + switch (state) { + case DonorStateEnum::kPreparingToDonate: + return provider->getForCountInstancesInDonorState1PreparingToDonate(); + + case DonorStateEnum::kDonatingInitialData: + return provider->getForCountInstancesInDonorState2DonatingInitialData(); + + case DonorStateEnum::kDonatingOplogEntries: + return provider->getForCountInstancesInDonorState3DonatingOplogEntries(); + + case DonorStateEnum::kPreparingToBlockWrites: + return provider->getForCountInstancesInDonorState4PreparingToBlockWrites(); + + case DonorStateEnum::kError: + return provider->getForCountInstancesInDonorState5Error(); + + case DonorStateEnum::kBlockingWrites: + return provider->getForCountInstancesInDonorState6BlockingWrites(); + + case DonorStateEnum::kDone: + return provider->getForCountInstancesInDonorState7Done(); + + default: + uasserted(6438700, + str::stream() + << "no field name for donor state " << static_cast<int32_t>(state)); + break; + } + + MONGO_UNREACHABLE; +} + +StringData ReshardingCumulativeMetrics::fieldNameFor( + RecipientStateEnum state, const ReshardingCumulativeMetricsFieldNameProvider* provider) { + switch (state) { + case RecipientStateEnum::kAwaitingFetchTimestamp: + return provider->getForCountInstancesInRecipientState1AwaitingFetchTimestamp(); + + case RecipientStateEnum::kCreatingCollection: + return provider->getForCountInstancesInRecipientState2CreatingCollection(); + + case RecipientStateEnum::kCloning: + return provider->getForCountInstancesInRecipientState3Cloning(); + + case RecipientStateEnum::kApplying: + return provider->getForCountInstancesInRecipientState4Applying(); + + case RecipientStateEnum::kError: + return provider->getForCountInstancesInRecipientState5Error(); + + case RecipientStateEnum::kStrictConsistency: + return provider->getForCountInstancesInRecipientState6StrictConsistency(); + + case RecipientStateEnum::kDone: + return provider->getForCountInstancesInRecipientState7Done(); + + default: + uasserted(6438900, + str::stream() + << "no field name for recipient state " << static_cast<int32_t>(state)); + break; + } + + MONGO_UNREACHABLE; +} + + +ReshardingCumulativeMetrics::CoordinatorStateArray* ReshardingCumulativeMetrics::getStateArrayFor( + CoordinatorStateEnum state) { + return &_coordinatorStateList; +} + +const ReshardingCumulativeMetrics::CoordinatorStateArray* +ReshardingCumulativeMetrics::getStateArrayFor(CoordinatorStateEnum state) const { + return &_coordinatorStateList; +} + +ReshardingCumulativeMetrics::DonorStateArray* ReshardingCumulativeMetrics::getStateArrayFor( + DonorStateEnum state) { + return &_donorStateList; +} + +const ReshardingCumulativeMetrics::DonorStateArray* ReshardingCumulativeMetrics::getStateArrayFor( + DonorStateEnum state) const { + return &_donorStateList; +} + +ReshardingCumulativeMetrics::RecipientStateArray* ReshardingCumulativeMetrics::getStateArrayFor( + RecipientStateEnum state) { + return &_recipientStateList; +} + +const ReshardingCumulativeMetrics::RecipientStateArray* +ReshardingCumulativeMetrics::getStateArrayFor(RecipientStateEnum state) const { + return &_recipientStateList; +} + +void ReshardingCumulativeMetrics::reportActive(BSONObjBuilder* bob) const { + ShardingDataTransformCumulativeMetrics::reportActive(bob); + + bob->append(_fieldNames->getForOplogEntriesFetched(), _oplogEntriesFetched.load()); + bob->append(_fieldNames->getForOplogEntriesApplied(), _oplogEntriesApplied.load()); + bob->append(_fieldNames->getForInsertsApplied(), _insertsApplied.load()); + bob->append(_fieldNames->getForUpdatesApplied(), _updatesApplied.load()); + bob->append(_fieldNames->getForDeletesApplied(), _deletesApplied.load()); +} + +void ReshardingCumulativeMetrics::reportLatencies(BSONObjBuilder* bob) const { + ShardingDataTransformCumulativeMetrics::reportLatencies(bob); + + bob->append(_fieldNames->getForOplogFetchingTotalRemoteBatchRetrievalTimeMillis(), + _oplogFetchingTotalRemoteBatchesRetrievalTimeMillis.load()); + bob->append(_fieldNames->getForOplogFetchingTotalRemoteBatchesRetrieved(), + _oplogFetchingTotalRemoteBatchesRetrieved.load()); + bob->append(_fieldNames->getForOplogFetchingTotalLocalInsertTimeMillis(), + _oplogFetchingTotalLocalInsertTimeMillis.load()); + bob->append(_fieldNames->getForOplogFetchingTotalLocalInserts(), + _oplogFetchingTotalLocalInserts.load()); + bob->append(_fieldNames->getForOplogApplyingTotalLocalBatchRetrievalTimeMillis(), + _oplogApplyingTotalBatchesRetrievalTimeMillis.load()); + bob->append(_fieldNames->getForOplogApplyingTotalLocalBatchesRetrieved(), + _oplogApplyingTotalBatchesRetrieved.load()); + bob->append(_fieldNames->getForOplogApplyingTotalLocalBatchApplyTimeMillis(), + _oplogBatchAppliedMillis.load()); + bob->append(_fieldNames->getForOplogApplyingTotalLocalBatchesApplied(), + _oplogBatchApplied.load()); +} + +void ReshardingCumulativeMetrics::reportCurrentInSteps(BSONObjBuilder* bob) const { + ShardingDataTransformCumulativeMetrics::reportCurrentInSteps(bob); + + auto reportState = [this, bob](auto state) { + bob->append(fieldNameFor(state, _fieldNames), getStateCounter(state)->load()); + }; + + reportState(CoordinatorStateEnum::kInitializing); + reportState(CoordinatorStateEnum::kPreparingToDonate); + reportState(CoordinatorStateEnum::kCloning); + reportState(CoordinatorStateEnum::kApplying); + reportState(CoordinatorStateEnum::kBlockingWrites); + reportState(CoordinatorStateEnum::kAborting); + reportState(CoordinatorStateEnum::kCommitting); + + reportState(RecipientStateEnum::kAwaitingFetchTimestamp); + reportState(RecipientStateEnum::kCreatingCollection); + reportState(RecipientStateEnum::kCloning); + reportState(RecipientStateEnum::kApplying); + reportState(RecipientStateEnum::kError); + reportState(RecipientStateEnum::kStrictConsistency); + reportState(RecipientStateEnum::kDone); + + reportState(DonorStateEnum::kPreparingToDonate); + reportState(DonorStateEnum::kDonatingInitialData); + reportState(DonorStateEnum::kDonatingOplogEntries); + reportState(DonorStateEnum::kPreparingToBlockWrites); + reportState(DonorStateEnum::kError); + reportState(DonorStateEnum::kBlockingWrites); + reportState(DonorStateEnum::kDone); +} + +void ReshardingCumulativeMetrics::onInsertApplied() { + _insertsApplied.fetchAndAdd(1); +} + +void ReshardingCumulativeMetrics::onUpdateApplied() { + _updatesApplied.fetchAndAdd(1); +} + +void ReshardingCumulativeMetrics::onDeleteApplied() { + _deletesApplied.fetchAndAdd(1); +} + +void ReshardingCumulativeMetrics::onOplogEntriesFetched(int64_t numEntries, Milliseconds elapsed) { + _oplogEntriesFetched.fetchAndAdd(numEntries); + _oplogFetchingTotalRemoteBatchesRetrieved.fetchAndAdd(1); + _oplogFetchingTotalRemoteBatchesRetrievalTimeMillis.fetchAndAdd( + durationCount<Milliseconds>(elapsed)); +} + +void ReshardingCumulativeMetrics::onOplogEntriesApplied(int64_t numEntries) { + _oplogEntriesApplied.fetchAndAdd(numEntries); +} + +void ReshardingCumulativeMetrics::onOplogLocalBatchApplied(Milliseconds elapsed) { + _oplogBatchApplied.fetchAndAdd(1); + _oplogBatchAppliedMillis.fetchAndAdd(durationCount<Milliseconds>(elapsed)); +} + +void ReshardingCumulativeMetrics::onLocalInsertDuringOplogFetching( + const Milliseconds& elapsedTime) { + _oplogFetchingTotalLocalInserts.fetchAndAdd(1); + _oplogFetchingTotalLocalInsertTimeMillis.fetchAndAdd(durationCount<Milliseconds>(elapsedTime)); +} + +void ReshardingCumulativeMetrics::onBatchRetrievedDuringOplogApplying( + const Milliseconds& elapsedTime) { + _oplogApplyingTotalBatchesRetrieved.fetchAndAdd(1); + _oplogApplyingTotalBatchesRetrievalTimeMillis.fetchAndAdd( + durationCount<Milliseconds>(elapsedTime)); +} + + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_cumulative_metrics.h b/src/mongo/db/s/resharding/resharding_cumulative_metrics.h new file mode 100644 index 00000000000..af3e9e29206 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_cumulative_metrics.h @@ -0,0 +1,184 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/s/resharding/resharding_cumulative_metrics_field_name_provider.h" +#include "mongo/db/s/sharding_data_transform_cumulative_metrics.h" + +namespace mongo { + +class ReshardingCumulativeMetrics : public ShardingDataTransformCumulativeMetrics { +public: + enum class CoordinatorStateEnum : int32_t { + kUnused = -1, + kInitializing, + kPreparingToDonate, + kCloning, + kApplying, + kBlockingWrites, + kAborting, + kCommitting, + kDone, + kNumStates + }; + + enum class DonorStateEnum : int32_t { + kUnused = -1, + kPreparingToDonate, + kDonatingInitialData, + kDonatingOplogEntries, + kPreparingToBlockWrites, + kError, + kBlockingWrites, + kDone, + kNumStates + }; + + enum class RecipientStateEnum : int32_t { + kUnused = -1, + kAwaitingFetchTimestamp, + kCreatingCollection, + kCloning, + kApplying, + kError, + kStrictConsistency, + kDone, + kNumStates + }; + + ReshardingCumulativeMetrics(); + + static StringData fieldNameFor(CoordinatorStateEnum state, + const ReshardingCumulativeMetricsFieldNameProvider* provider); + static StringData fieldNameFor(DonorStateEnum state, + const ReshardingCumulativeMetricsFieldNameProvider* provider); + static StringData fieldNameFor(RecipientStateEnum state, + const ReshardingCumulativeMetricsFieldNameProvider* provider); + + /** + * The before can be boost::none to represent the initial state transition and + * after can be boost::none to represent cases where it is no longer active. + */ + template <typename T> + void onStateTransition(boost::optional<T> before, boost::optional<T> after); + + void onInsertApplied(); + void onUpdateApplied(); + void onDeleteApplied(); + + void onOplogEntriesFetched(int64_t numEntries, Milliseconds elapsed); + void onOplogEntriesApplied(int64_t numEntries); + void onLocalInsertDuringOplogFetching(const Milliseconds& elapsedTime); + void onBatchRetrievedDuringOplogApplying(const Milliseconds& elapsedTime); + void onOplogLocalBatchApplied(Milliseconds elapsed); + +private: + using CoordinatorStateArray = + std::array<AtomicWord<int64_t>, static_cast<size_t>(CoordinatorStateEnum::kNumStates)>; + using DonorStateArray = + std::array<AtomicWord<int64_t>, static_cast<size_t>(DonorStateEnum::kNumStates)>; + using RecipientStateArray = + std::array<AtomicWord<int64_t>, static_cast<size_t>(RecipientStateEnum::kNumStates)>; + + template <typename T> + const AtomicWord<int64_t>* getStateCounter(T state) const; + template <typename T> + AtomicWord<int64_t>* getMutableStateCounter(T state); + + CoordinatorStateArray* getStateArrayFor(CoordinatorStateEnum state); + const CoordinatorStateArray* getStateArrayFor(CoordinatorStateEnum state) const; + DonorStateArray* getStateArrayFor(DonorStateEnum state); + const DonorStateArray* getStateArrayFor(DonorStateEnum state) const; + RecipientStateArray* getStateArrayFor(RecipientStateEnum state); + const RecipientStateArray* getStateArrayFor(RecipientStateEnum state) const; + + virtual void reportActive(BSONObjBuilder* bob) const; + virtual void reportLatencies(BSONObjBuilder* bob) const; + virtual void reportCurrentInSteps(BSONObjBuilder* bob) const; + + const ReshardingCumulativeMetricsFieldNameProvider* _fieldNames; + + AtomicWord<int64_t> _insertsApplied{0}; + AtomicWord<int64_t> _updatesApplied{0}; + AtomicWord<int64_t> _deletesApplied{0}; + AtomicWord<int64_t> _oplogEntriesApplied{0}; + AtomicWord<int64_t> _oplogEntriesFetched{0}; + + AtomicWord<int64_t> _oplogFetchingTotalRemoteBatchesRetrieved{0}; + AtomicWord<int64_t> _oplogFetchingTotalRemoteBatchesRetrievalTimeMillis{0}; + AtomicWord<int64_t> _oplogFetchingTotalLocalInserts{0}; + AtomicWord<int64_t> _oplogFetchingTotalLocalInsertTimeMillis{0}; + AtomicWord<int64_t> _oplogApplyingTotalBatchesRetrieved{0}; + AtomicWord<int64_t> _oplogApplyingTotalBatchesRetrievalTimeMillis{0}; + AtomicWord<int64_t> _oplogBatchApplied{0}; + AtomicWord<int64_t> _oplogBatchAppliedMillis{0}; + + CoordinatorStateArray _coordinatorStateList; + DonorStateArray _donorStateList; + RecipientStateArray _recipientStateList; +}; + +template <typename T> +void ReshardingCumulativeMetrics::onStateTransition(boost::optional<T> before, + boost::optional<T> after) { + if (before) { + if (auto counter = getMutableStateCounter(*before)) { + counter->fetchAndSubtract(1); + } + } + + if (after) { + if (auto counter = getMutableStateCounter(*after)) { + counter->fetchAndAdd(1); + } + } +} + +template <typename T> +const AtomicWord<int64_t>* ReshardingCumulativeMetrics::getStateCounter(T state) const { + if (state == T::kUnused) { + return nullptr; + } + + invariant(static_cast<size_t>(state) < static_cast<size_t>(T::kNumStates)); + return &((*getStateArrayFor(state))[static_cast<size_t>(state)]); +} + +template <typename T> +AtomicWord<int64_t>* ReshardingCumulativeMetrics::getMutableStateCounter(T state) { + if (state == T::kUnused) { + return nullptr; + } + + invariant(static_cast<size_t>(state) < static_cast<size_t>(T::kNumStates)); + return &((*getStateArrayFor(state))[static_cast<size_t>(state)]); +} + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_cumulative_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_cumulative_metrics_test.cpp new file mode 100644 index 00000000000..c35b2f31c10 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_cumulative_metrics_test.cpp @@ -0,0 +1,574 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#include "mongo/db/s/resharding/resharding_cumulative_metrics.h" +#include "mongo/db/s/sharding_data_transform_metrics_test_fixture.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +namespace mongo { +namespace { + +constexpr auto kResharding = "resharding"; + +class ReshardingCumulativeMetricsTest : public ShardingDataTransformMetricsTestFixture { +protected: + void setUp() override { + ShardingDataTransformMetricsTestFixture::setUp(); + _reshardingCumulativeMetrics = + static_cast<ReshardingCumulativeMetrics*>(_cumulativeMetrics.get()); + _fieldNames = std::make_unique<ReshardingCumulativeMetricsFieldNameProvider>(); + } + + virtual std::unique_ptr<ShardingDataTransformCumulativeMetrics> initializeCumulativeMetrics() + override { + return std::make_unique<ReshardingCumulativeMetrics>(); + } + + using CoordinatorStateEnum = ReshardingCumulativeMetrics::CoordinatorStateEnum; + using DonorStateEnum = ReshardingCumulativeMetrics::DonorStateEnum; + using RecipientStateEnum = ReshardingCumulativeMetrics::RecipientStateEnum; + + template <typename T> + StringData fieldNameFor(T state) { + return ReshardingCumulativeMetrics::fieldNameFor(state, _fieldNames.get()); + } + + BSONObj getStateSubObj(const ReshardingCumulativeMetrics* metrics) { + BSONObjBuilder bob; + metrics->reportForServerStatus(&bob); + auto report = bob.done(); + return report.getObjectField(kResharding).getObjectField("currentInSteps").getOwned(); + } + + bool checkCoordinateStateField(const ReshardingCumulativeMetrics* metrics, + boost::optional<CoordinatorStateEnum> expectedState) { + auto serverStatusSubObj = getStateSubObj(metrics); + std::map<std::string, int> expectedStateFieldCount; + + auto addExpectedField = [&](CoordinatorStateEnum stateToPopulate) { + expectedStateFieldCount.emplace( + fieldNameFor(stateToPopulate), + ((expectedState && (stateToPopulate == expectedState)) ? 1 : 0)); + }; + + addExpectedField(CoordinatorStateEnum::kInitializing); + addExpectedField(CoordinatorStateEnum::kPreparingToDonate); + addExpectedField(CoordinatorStateEnum::kCloning); + addExpectedField(CoordinatorStateEnum::kApplying); + addExpectedField(CoordinatorStateEnum::kBlockingWrites); + addExpectedField(CoordinatorStateEnum::kAborting); + addExpectedField(CoordinatorStateEnum::kCommitting); + + for (const auto& fieldNameAndState : expectedStateFieldCount) { + const auto actualValue = serverStatusSubObj.getIntField(fieldNameAndState.first); + if (actualValue != fieldNameAndState.second) { + LOGV2_DEBUG(6438600, + 0, + "coordinator state field value does not match expected value", + "field"_attr = fieldNameAndState.first, + "serverStatus"_attr = serverStatusSubObj); + return false; + } + } + + return true; + } + + bool checkDonorStateField(const ReshardingCumulativeMetrics* metrics, + boost::optional<DonorStateEnum> expectedState) { + auto serverStatusSubObj = getStateSubObj(metrics); + std::map<std::string, int> expectedStateFieldCount; + + auto addExpectedField = [&](DonorStateEnum stateToPopulate) { + expectedStateFieldCount.emplace( + fieldNameFor(stateToPopulate), + ((expectedState && (stateToPopulate == expectedState)) ? 1 : 0)); + }; + + addExpectedField(DonorStateEnum::kPreparingToDonate); + addExpectedField(DonorStateEnum::kDonatingInitialData); + addExpectedField(DonorStateEnum::kDonatingOplogEntries); + addExpectedField(DonorStateEnum::kPreparingToBlockWrites); + addExpectedField(DonorStateEnum::kError); + addExpectedField(DonorStateEnum::kBlockingWrites); + addExpectedField(DonorStateEnum::kDone); + + for (const auto& fieldNameAndState : expectedStateFieldCount) { + const auto actualValue = serverStatusSubObj.getIntField(fieldNameAndState.first); + if (actualValue != fieldNameAndState.second) { + LOGV2_DEBUG(6438701, + 0, + "Donor state field value does not match expected value", + "field"_attr = fieldNameAndState.first, + "serverStatus"_attr = serverStatusSubObj); + return false; + } + } + + return true; + } + + bool checkRecipientStateField(const ReshardingCumulativeMetrics* metrics, + boost::optional<RecipientStateEnum> expectedState) { + auto serverStatusSubObj = getStateSubObj(metrics); + std::map<std::string, int> expectedStateFieldCount; + + auto addExpectedField = [&](RecipientStateEnum stateToPopulate) { + expectedStateFieldCount.emplace( + fieldNameFor(stateToPopulate), + ((expectedState && (stateToPopulate == expectedState)) ? 1 : 0)); + }; + + addExpectedField(RecipientStateEnum::kAwaitingFetchTimestamp); + addExpectedField(RecipientStateEnum::kCreatingCollection); + addExpectedField(RecipientStateEnum::kCloning); + addExpectedField(RecipientStateEnum::kApplying); + addExpectedField(RecipientStateEnum::kError); + addExpectedField(RecipientStateEnum::kStrictConsistency); + addExpectedField(RecipientStateEnum::kDone); + + for (const auto& fieldNameAndState : expectedStateFieldCount) { + const auto actualValue = serverStatusSubObj.getIntField(fieldNameAndState.first); + if (actualValue != fieldNameAndState.second) { + LOGV2_DEBUG(6438901, + 0, + "Recipient state field value does not match expected value", + "field"_attr = fieldNameAndState.first, + "serverStatus"_attr = serverStatusSubObj); + return false; + } + } + + return true; + } + + ReshardingCumulativeMetrics* _reshardingCumulativeMetrics; + std::unique_ptr<ReshardingCumulativeMetricsFieldNameProvider> _fieldNames; +}; + + +TEST_F(ReshardingCumulativeMetricsTest, ReportContainsInsertsDuringFetching) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); + + auto latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInserts"), 0); + ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInsertTimeMillis"), 0); + + _reshardingCumulativeMetrics->onLocalInsertDuringOplogFetching(Milliseconds(17)); + + latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInserts"), 1); + ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInsertTimeMillis"), 17); +} + + +TEST_F(ReshardingCumulativeMetricsTest, ReportContainsBatchRetrievedDuringApplying) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); + + auto latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesRetrieved"), 0); + ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchRetrievalTimeMillis"), 0); + + _reshardingCumulativeMetrics->onBatchRetrievedDuringOplogApplying(Milliseconds(39)); + + latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesRetrieved"), 1); + ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchRetrievalTimeMillis"), 39); +} + + +TEST_F(ReshardingCumulativeMetricsTest, ReportContainsBatchApplied) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); + + auto latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesApplied"), 0); + ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchApplyTimeMillis"), 0); + + _reshardingCumulativeMetrics->onOplogLocalBatchApplied(Milliseconds(333)); + + latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesApplied"), 1); + ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchApplyTimeMillis"), 333); +} + +TEST_F(ReshardingCumulativeMetricsTest, ReportContainsInsertsApplied) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); + + auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(activeSection.getIntField("insertsApplied"), 0); + + _reshardingCumulativeMetrics->onInsertApplied(); + + activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(activeSection.getIntField("insertsApplied"), 1); +} + +TEST_F(ReshardingCumulativeMetricsTest, ReportContainsUpdatesApplied) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); + + auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(activeSection.getIntField("updatesApplied"), 0); + + _reshardingCumulativeMetrics->onUpdateApplied(); + + activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(activeSection.getIntField("updatesApplied"), 1); +} + +TEST_F(ReshardingCumulativeMetricsTest, ReportContainsDeletesApplied) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); + + auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(activeSection.getIntField("deletesApplied"), 0); + + _reshardingCumulativeMetrics->onDeleteApplied(); + + activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(activeSection.getIntField("deletesApplied"), 1); +} + +TEST_F(ReshardingCumulativeMetricsTest, ReportContainsOplogEntriesFetched) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); + + auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(activeSection.getIntField("oplogEntriesFetched"), 0); + + auto latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchesRetrieved"), 0); + ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchRetrievalTimeMillis"), 0); + + _reshardingCumulativeMetrics->onOplogEntriesFetched(123, Milliseconds(43)); + + activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(activeSection.getIntField("oplogEntriesFetched"), 123); + + latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchesRetrieved"), 1); + ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchRetrievalTimeMillis"), 43); +} + +TEST_F(ReshardingCumulativeMetricsTest, ReportContainsOplogEntriesApplied) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); + + auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(activeSection.getIntField("oplogEntriesApplied"), 0); + + _reshardingCumulativeMetrics->onOplogEntriesApplied(99); + + activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); + ASSERT_EQ(activeSection.getIntField("oplogEntriesApplied"), 99); +} + +TEST_F(ReshardingCumulativeMetricsTest, + SimulatedNormalCoordinatorStateTransitionReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&coordinator); + + ASSERT(checkCoordinateStateField(_reshardingCumulativeMetrics, CoordinatorStateEnum::kUnused)); + + boost::optional<CoordinatorStateEnum> prevState; + boost::optional<CoordinatorStateEnum> nextState; + + auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) { + prevState = nextState; + nextState = newState; + _reshardingCumulativeMetrics->onStateTransition(prevState, nextState); + return checkCoordinateStateField(_reshardingCumulativeMetrics, nextState); + }; + + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kCloning)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kBlockingWrites)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kCommitting)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kDone)); + ASSERT(simulateTransitionTo(boost::none)); +} + +TEST_F(ReshardingCumulativeMetricsTest, + SimulatedAbortedCoordinatorStateTransitionReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&coordinator); + + ASSERT(checkCoordinateStateField(_reshardingCumulativeMetrics, CoordinatorStateEnum::kUnused)); + + boost::optional<CoordinatorStateEnum> prevState; + boost::optional<CoordinatorStateEnum> nextState; + + auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) { + prevState = nextState; + nextState = newState; + _reshardingCumulativeMetrics->onStateTransition(prevState, nextState); + return checkCoordinateStateField(_reshardingCumulativeMetrics, nextState); + }; + + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kAborting)); + ASSERT(simulateTransitionTo(boost::none)); +} + +TEST_F(ReshardingCumulativeMetricsTest, + SimulatedSteppedDownCoordinatorStateFromUnusedReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&coordinator); + + boost::optional<CoordinatorStateEnum> initState = CoordinatorStateEnum::kUnused; + ASSERT(checkCoordinateStateField(_reshardingCumulativeMetrics, initState)); + + _reshardingCumulativeMetrics->onStateTransition(initState, {boost::none}); + ASSERT(checkCoordinateStateField(_reshardingCumulativeMetrics, initState)); +} + +TEST_F(ReshardingCumulativeMetricsTest, + SimulatedSteppedDownCoordinatorStateTransitionReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&coordinator); + + ASSERT(checkCoordinateStateField(_reshardingCumulativeMetrics, CoordinatorStateEnum::kUnused)); + + boost::optional<CoordinatorStateEnum> prevState; + boost::optional<CoordinatorStateEnum> nextState; + + auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) { + prevState = nextState; + nextState = newState; + _reshardingCumulativeMetrics->onStateTransition(prevState, nextState); + return checkCoordinateStateField(_reshardingCumulativeMetrics, nextState); + }; + + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate)); + ASSERT(simulateTransitionTo(boost::none)); +} + +TEST_F(ReshardingCumulativeMetricsTest, SimulatedNormalDonorStateTransitionReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&donor); + + ASSERT(checkDonorStateField(_reshardingCumulativeMetrics, DonorStateEnum::kUnused)); + + boost::optional<DonorStateEnum> prevState; + boost::optional<DonorStateEnum> nextState; + + auto simulateTransitionTo = [&](boost::optional<DonorStateEnum> newState) { + prevState = nextState; + nextState = newState; + _reshardingCumulativeMetrics->onStateTransition(prevState, nextState); + return checkDonorStateField(_reshardingCumulativeMetrics, nextState); + }; + + ASSERT(simulateTransitionTo(DonorStateEnum::kUnused)); + ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToDonate)); + ASSERT(simulateTransitionTo(DonorStateEnum::kDonatingInitialData)); + ASSERT(simulateTransitionTo(DonorStateEnum::kDonatingOplogEntries)); + ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToBlockWrites)); + ASSERT(simulateTransitionTo(DonorStateEnum::kBlockingWrites)); + ASSERT(simulateTransitionTo(DonorStateEnum::kDone)); + ASSERT(simulateTransitionTo(boost::none)); +} + +TEST_F(ReshardingCumulativeMetricsTest, SimulatedAbortedDonorStateTransitionReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&donor); + + ASSERT(checkDonorStateField(_reshardingCumulativeMetrics, DonorStateEnum::kUnused)); + + boost::optional<DonorStateEnum> prevState; + boost::optional<DonorStateEnum> nextState; + + auto simulateTransitionTo = [&](boost::optional<DonorStateEnum> newState) { + prevState = nextState; + nextState = newState; + _reshardingCumulativeMetrics->onStateTransition(prevState, nextState); + return checkDonorStateField(_reshardingCumulativeMetrics, nextState); + }; + + ASSERT(simulateTransitionTo(DonorStateEnum::kUnused)); + ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToDonate)); + ASSERT(simulateTransitionTo(DonorStateEnum::kError)); + ASSERT(simulateTransitionTo(DonorStateEnum::kDone)); + ASSERT(simulateTransitionTo(boost::none)); +} + +TEST_F(ReshardingCumulativeMetricsTest, + SimulatedSteppedDownDonorStateFromUnusedReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&donor); + + boost::optional<DonorStateEnum> initState = DonorStateEnum::kUnused; + ASSERT(checkDonorStateField(_reshardingCumulativeMetrics, initState)); + + _reshardingCumulativeMetrics->onStateTransition(initState, {boost::none}); + ASSERT(checkDonorStateField(_reshardingCumulativeMetrics, initState)); +} + +TEST_F(ReshardingCumulativeMetricsTest, + SimulatedSteppedDownDonorStateTransitionReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&donor); + + ASSERT(checkDonorStateField(_reshardingCumulativeMetrics, DonorStateEnum::kUnused)); + + boost::optional<DonorStateEnum> prevState; + boost::optional<DonorStateEnum> nextState; + + auto simulateTransitionTo = [&](boost::optional<DonorStateEnum> newState) { + prevState = nextState; + nextState = newState; + _reshardingCumulativeMetrics->onStateTransition(prevState, nextState); + return checkDonorStateField(_reshardingCumulativeMetrics, nextState); + }; + + ASSERT(simulateTransitionTo(DonorStateEnum::kUnused)); + ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToDonate)); + ASSERT(simulateTransitionTo(DonorStateEnum::kDonatingInitialData)); + ASSERT(simulateTransitionTo(boost::none)); +} + +TEST_F(ReshardingCumulativeMetricsTest, + SimulatedNormalRecipientStateTransitionReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); + + ASSERT(checkRecipientStateField(_reshardingCumulativeMetrics, RecipientStateEnum::kUnused)); + + boost::optional<RecipientStateEnum> prevState; + boost::optional<RecipientStateEnum> nextState; + + auto simulateTransitionTo = [&](boost::optional<RecipientStateEnum> newState) { + prevState = nextState; + nextState = newState; + _reshardingCumulativeMetrics->onStateTransition(prevState, nextState); + return checkRecipientStateField(_reshardingCumulativeMetrics, nextState); + }; + + ASSERT(simulateTransitionTo(RecipientStateEnum::kUnused)); + ASSERT(simulateTransitionTo(RecipientStateEnum::kAwaitingFetchTimestamp)); + ASSERT(simulateTransitionTo(RecipientStateEnum::kCreatingCollection)); + ASSERT(simulateTransitionTo(RecipientStateEnum::kCloning)); + ASSERT(simulateTransitionTo(RecipientStateEnum::kApplying)); + ASSERT(simulateTransitionTo(RecipientStateEnum::kStrictConsistency)); + ASSERT(simulateTransitionTo(RecipientStateEnum::kDone)); + ASSERT(simulateTransitionTo(boost::none)); +} + +TEST_F(ReshardingCumulativeMetricsTest, + SimulatedAbortedRecipientStateTransitionReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); + + ASSERT(checkRecipientStateField(_reshardingCumulativeMetrics, RecipientStateEnum::kUnused)); + + boost::optional<RecipientStateEnum> prevState; + boost::optional<RecipientStateEnum> nextState; + + auto simulateTransitionTo = [&](boost::optional<RecipientStateEnum> newState) { + prevState = nextState; + nextState = newState; + _reshardingCumulativeMetrics->onStateTransition(prevState, nextState); + return checkRecipientStateField(_reshardingCumulativeMetrics, nextState); + }; + + ASSERT(simulateTransitionTo(RecipientStateEnum::kUnused)); + ASSERT(simulateTransitionTo(RecipientStateEnum::kAwaitingFetchTimestamp)); + ASSERT(simulateTransitionTo(RecipientStateEnum::kError)); + ASSERT(simulateTransitionTo(boost::none)); +} + +TEST_F(ReshardingCumulativeMetricsTest, + SimulatedSteppedDownRecipientStateFromUnusedReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); + + boost::optional<RecipientStateEnum> initState = RecipientStateEnum::kUnused; + ASSERT(checkRecipientStateField(_reshardingCumulativeMetrics, initState)); + + _reshardingCumulativeMetrics->onStateTransition(initState, {boost::none}); + ASSERT(checkRecipientStateField(_reshardingCumulativeMetrics, initState)); +} + +TEST_F(ReshardingCumulativeMetricsTest, + SimulatedSteppedDownRecipientStateTransitionReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient}; + auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); + + ASSERT(checkRecipientStateField(_reshardingCumulativeMetrics, RecipientStateEnum::kUnused)); + + boost::optional<RecipientStateEnum> prevState; + boost::optional<RecipientStateEnum> nextState; + + auto simulateTransitionTo = [&](boost::optional<RecipientStateEnum> newState) { + prevState = nextState; + nextState = newState; + _reshardingCumulativeMetrics->onStateTransition(prevState, nextState); + return checkRecipientStateField(_reshardingCumulativeMetrics, nextState); + }; + + ASSERT(simulateTransitionTo(RecipientStateEnum::kUnused)); + ASSERT(simulateTransitionTo(RecipientStateEnum::kAwaitingFetchTimestamp)); + ASSERT(simulateTransitionTo(RecipientStateEnum::kCreatingCollection)); + ASSERT(simulateTransitionTo(boost::none)); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp index 69d2e899d84..f0faa8ddd2c 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics.cpp @@ -120,6 +120,10 @@ std::string ReshardingMetrics::createOperationDescription() const noexcept { _instanceId.toString()); } +ReshardingCumulativeMetrics* ReshardingMetrics::getReshardingCumulativeMetrics() { + return dynamic_cast<ReshardingCumulativeMetrics*>(getCumulativeMetrics()); +} + Milliseconds ReshardingMetrics::getRecipientHighEstimateRemainingTimeMillis() const { auto estimate = resharding::estimateRemainingRecipientTime(_applyingStartTime.load() != kNoDate, getBytesWrittenCount(), @@ -222,9 +226,8 @@ void ReshardingMetrics::restoreCoordinatorSpecificFields( ReshardingMetrics::DonorState::DonorState(DonorStateEnum enumVal) : _enumVal(enumVal) {} -ShardingDataTransformCumulativeMetrics::DonorStateEnum ReshardingMetrics::DonorState::toMetrics() - const { - using MetricsEnum = ShardingDataTransformCumulativeMetrics::DonorStateEnum; +ReshardingCumulativeMetrics::DonorStateEnum ReshardingMetrics::DonorState::toMetrics() const { + using MetricsEnum = ReshardingCumulativeMetrics::DonorStateEnum; switch (_enumVal) { case DonorStateEnum::kUnused: @@ -264,9 +267,9 @@ DonorStateEnum ReshardingMetrics::DonorState::getState() const { ReshardingMetrics::RecipientState::RecipientState(RecipientStateEnum enumVal) : _enumVal(enumVal) {} -ShardingDataTransformCumulativeMetrics::RecipientStateEnum -ReshardingMetrics::RecipientState::toMetrics() const { - using MetricsEnum = ShardingDataTransformCumulativeMetrics::RecipientStateEnum; +ReshardingCumulativeMetrics::RecipientStateEnum ReshardingMetrics::RecipientState::toMetrics() + const { + using MetricsEnum = ReshardingCumulativeMetrics::RecipientStateEnum; switch (_enumVal) { case RecipientStateEnum::kUnused: @@ -308,35 +311,37 @@ RecipientStateEnum ReshardingMetrics::RecipientState::getState() const { ReshardingMetrics::CoordinatorState::CoordinatorState(CoordinatorStateEnum enumVal) : _enumVal(enumVal) {} -ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum -ReshardingMetrics::CoordinatorState::toMetrics() const { +ReshardingCumulativeMetrics::CoordinatorStateEnum ReshardingMetrics::CoordinatorState::toMetrics() + const { + using MetricsEnum = ReshardingCumulativeMetrics::CoordinatorStateEnum; + switch (_enumVal) { case CoordinatorStateEnum::kUnused: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kUnused; + return MetricsEnum::kUnused; case CoordinatorStateEnum::kInitializing: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kInitializing; + return MetricsEnum::kInitializing; case CoordinatorStateEnum::kPreparingToDonate: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kPreparingToDonate; + return MetricsEnum::kPreparingToDonate; case CoordinatorStateEnum::kCloning: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kCloning; + return MetricsEnum::kCloning; case CoordinatorStateEnum::kApplying: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kApplying; + return MetricsEnum::kApplying; case CoordinatorStateEnum::kBlockingWrites: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kBlockingWrites; + return MetricsEnum::kBlockingWrites; case CoordinatorStateEnum::kAborting: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kAborting; + return MetricsEnum::kAborting; case CoordinatorStateEnum::kCommitting: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kCommitting; + return MetricsEnum::kCommitting; case CoordinatorStateEnum::kDone: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kDone; + return MetricsEnum::kDone; default: invariant(false, str::stream() << "Unexpected resharding coordinator state: " @@ -351,22 +356,22 @@ CoordinatorStateEnum ReshardingMetrics::CoordinatorState::getState() const { void ReshardingMetrics::onDeleteApplied() { _deletesApplied.addAndFetch(1); - getCumulativeMetrics()->onDeleteApplied(); + getReshardingCumulativeMetrics()->onDeleteApplied(); } void ReshardingMetrics::onInsertApplied() { _insertsApplied.addAndFetch(1); - getCumulativeMetrics()->onInsertApplied(); + getReshardingCumulativeMetrics()->onInsertApplied(); } void ReshardingMetrics::onUpdateApplied() { _updatesApplied.addAndFetch(1); - getCumulativeMetrics()->onUpdateApplied(); + getReshardingCumulativeMetrics()->onUpdateApplied(); } void ReshardingMetrics::onOplogEntriesFetched(int64_t numEntries, Milliseconds elapsed) { _oplogEntriesFetched.addAndFetch(numEntries); - getCumulativeMetrics()->onOplogEntriesFetched(numEntries, elapsed); + getReshardingCumulativeMetrics()->onOplogEntriesFetched(numEntries, elapsed); } void ReshardingMetrics::restoreOplogEntriesFetched(int64_t numEntries) { @@ -375,13 +380,25 @@ void ReshardingMetrics::restoreOplogEntriesFetched(int64_t numEntries) { void ReshardingMetrics::onOplogEntriesApplied(int64_t numEntries) { _oplogEntriesApplied.addAndFetch(numEntries); - getCumulativeMetrics()->onOplogEntriesApplied(numEntries); + getReshardingCumulativeMetrics()->onOplogEntriesApplied(numEntries); } void ReshardingMetrics::restoreOplogEntriesApplied(int64_t numEntries) { _oplogEntriesApplied.store(numEntries); } +void ReshardingMetrics::onLocalInsertDuringOplogFetching(Milliseconds elapsed) { + getReshardingCumulativeMetrics()->onLocalInsertDuringOplogFetching(elapsed); +} + +void ReshardingMetrics::onBatchRetrievedDuringOplogApplying(Milliseconds elapsed) { + getReshardingCumulativeMetrics()->onBatchRetrievedDuringOplogApplying(elapsed); +} + +void ReshardingMetrics::onOplogLocalBatchApplied(Milliseconds elapsed) { + getReshardingCumulativeMetrics()->onOplogLocalBatchApplied(elapsed); +} + void ReshardingMetrics::onApplyingBegin() { _applyingStartTime.store(getClockSource()->now()); } diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h index 909f78e8c80..26a82a3ba89 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.h +++ b/src/mongo/db/s/resharding/resharding_metrics.h @@ -31,6 +31,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/s/resharding/resharding_cumulative_metrics.h" #include "mongo/db/s/resharding/resharding_metrics_field_name_provider.h" #include "mongo/db/s/resharding/resharding_metrics_helpers.h" #include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h" @@ -44,7 +45,7 @@ public: using State = stdx::variant<CoordinatorStateEnum, RecipientStateEnum, DonorStateEnum>; class DonorState { public: - using MetricsType = ShardingDataTransformCumulativeMetrics::DonorStateEnum; + using MetricsType = ReshardingCumulativeMetrics::DonorStateEnum; explicit DonorState(DonorStateEnum enumVal); MetricsType toMetrics() const; @@ -56,7 +57,7 @@ public: class RecipientState { public: - using MetricsType = ShardingDataTransformCumulativeMetrics::RecipientStateEnum; + using MetricsType = ReshardingCumulativeMetrics::RecipientStateEnum; explicit RecipientState(RecipientStateEnum enumVal); MetricsType toMetrics() const; @@ -68,7 +69,7 @@ public: class CoordinatorState { public: - using MetricsType = ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum; + using MetricsType = ReshardingCumulativeMetrics::CoordinatorStateEnum; explicit CoordinatorState(CoordinatorStateEnum enumVal); MetricsType toMetrics() const; @@ -123,22 +124,22 @@ public: template <typename T> void onStateTransition(T before, boost::none_t after) { - getCumulativeMetrics()->onStateTransition<typename T::MetricsType>(before.toMetrics(), - after); + getReshardingCumulativeMetrics()->onStateTransition<typename T::MetricsType>( + before.toMetrics(), after); } template <typename T> void onStateTransition(boost::none_t before, T after) { setState(after.getState()); - getCumulativeMetrics()->onStateTransition<typename T::MetricsType>(before, - after.toMetrics()); + getReshardingCumulativeMetrics()->onStateTransition<typename T::MetricsType>( + before, after.toMetrics()); } template <typename T> void onStateTransition(T before, T after) { setState(after.getState()); - getCumulativeMetrics()->onStateTransition<typename T::MetricsType>(before.toMetrics(), - after.toMetrics()); + getReshardingCumulativeMetrics()->onStateTransition<typename T::MetricsType>( + before.toMetrics(), after.toMetrics()); } void accumulateFrom(const ReshardingOplogApplierProgress& progressDoc); @@ -153,6 +154,9 @@ public: void restoreOplogEntriesApplied(int64_t numEntries); void onApplyingBegin(); void onApplyingEnd(); + void onLocalInsertDuringOplogFetching(Milliseconds elapsed); + void onBatchRetrievedDuringOplogApplying(Milliseconds elapsed); + void onOplogLocalBatchApplied(Milliseconds elapsed); Seconds getApplyingElapsedTimeSecs() const; Date_t getApplyingBegin() const; @@ -168,6 +172,7 @@ private: std::string createOperationDescription() const noexcept override; void restoreRecipientSpecificFields(const ReshardingRecipientDocument& document); void restoreCoordinatorSpecificFields(const ReshardingCoordinatorDocument& document); + ReshardingCumulativeMetrics* getReshardingCumulativeMetrics(); template <typename T> void setState(T state) { diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp index 1e20c778071..aa18742fb7c 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp @@ -49,6 +49,11 @@ const auto kShardKey = BSON("newKey" << 1); class ReshardingMetricsTest : public ShardingDataTransformMetricsTestFixture { public: + virtual std::unique_ptr<ShardingDataTransformCumulativeMetrics> initializeCumulativeMetrics() + override { + return std::make_unique<ReshardingCumulativeMetrics>(); + } + std::unique_ptr<ReshardingMetrics> createInstanceMetrics(ClockSource* clockSource, UUID instanceId = UUID::gen(), Role role = Role::kDonor) { @@ -58,7 +63,7 @@ public: role, clockSource->now(), clockSource, - &_cumulativeMetrics); + _cumulativeMetrics.get()); } const UUID& getSourceCollectionId() { @@ -69,7 +74,7 @@ public: template <typename T> BSONObj getReportFromStateDocument(T document) { auto metrics = - ReshardingMetrics::initializeFrom(document, getClockSource(), &_cumulativeMetrics); + ReshardingMetrics::initializeFrom(document, getClockSource(), _cumulativeMetrics.get()); return metrics->reportForCurrentOp(); } @@ -169,7 +174,7 @@ public: doc.setMetrics(metricsDoc); auto metrics = - ReshardingMetrics::initializeFrom(doc, getClockSource(), &_cumulativeMetrics); + ReshardingMetrics::initializeFrom(doc, getClockSource(), _cumulativeMetrics.get()); clock->advance(kInterval); auto report = metrics->reportForCurrentOp(); diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp index 7c04439713a..4e5e29943bf 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp @@ -42,6 +42,11 @@ namespace { class ReshardingOplogApplierMetricsTest : public ShardingDataTransformMetricsTestFixture { public: + virtual std::unique_ptr<ShardingDataTransformCumulativeMetrics> initializeCumulativeMetrics() + override { + return std::make_unique<ReshardingCumulativeMetrics>(); + } + std::unique_ptr<ReshardingMetrics> createInstanceMetrics() { return std::make_unique<ReshardingMetrics>(UUID::gen(), kTestCommand, @@ -49,7 +54,7 @@ public: ReshardingMetrics::Role::kRecipient, getClockSource()->now(), getClockSource(), - &_cumulativeMetrics); + _cumulativeMetrics.get()); } }; diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp index 5d7d4e052fe..70c5252a96b 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp @@ -28,6 +28,7 @@ */ #include "mongo/db/s/sharding_data_transform_cumulative_metrics.h" +#include "mongo/db/s/resharding/resharding_cumulative_metrics.h" #include <cstdint> @@ -37,74 +38,18 @@ namespace mongo { namespace { -constexpr int32_t kPlaceholderInt = 0; -constexpr int64_t kPlaceholderLong = 0; -} // namespace - -namespace { -constexpr auto kResharding = "resharding"; constexpr auto kGlobalIndex = "globalIndex"; constexpr auto kActive = "active"; -constexpr auto kOplogEntriesFetched = "oplogEntriesFetched"; -constexpr auto kOplogEntriesApplied = "oplogEntriesApplied"; -constexpr auto kInsertsApplied = "insertsApplied"; -constexpr auto kUpdatesApplied = "updatesApplied"; -constexpr auto kDeletesApplied = "deletesApplied"; constexpr auto kOldestActive = "oldestActive"; constexpr auto kLatencies = "latencies"; -constexpr auto kOplogFetchingTotalRemoteBatchRetrievalTimeMillis = - "oplogFetchingTotalRemoteBatchRetrievalTimeMillis"; -constexpr auto kOplogFetchingTotalRemoteBatchesRetrieved = - "oplogFetchingTotalRemoteBatchesRetrieved"; -constexpr auto kOplogFetchingTotalLocalInsertTimeMillis = "oplogFetchingTotalLocalInsertTimeMillis"; -constexpr auto kOplogFetchingTotalLocalInserts = "oplogFetchingTotalLocalInserts"; -constexpr auto kOplogApplyingTotalLocalBatchRetrievalTimeMillis = - "oplogApplyingTotalLocalBatchRetrievalTimeMillis"; -constexpr auto kOplogApplyingTotalLocalBatchesRetrieved = "oplogApplyingTotalLocalBatchesRetrieved"; -constexpr auto kOplogApplyingTotalLocalBatchApplyTimeMillis = - "oplogApplyingTotalLocalBatchApplyTimeMillis"; -constexpr auto kOplogApplyingTotalLocalBatchesApplied = "oplogApplyingTotalLocalBatchesApplied"; constexpr auto kCurrentInSteps = "currentInSteps"; -constexpr auto kCountInstancesInCoordinatorState1Initializing = - "countInstancesInCoordinatorState1Initializing"; -constexpr auto kCountInstancesInCoordinatorState2PreparingToDonate = - "countInstancesInCoordinatorState2PreparingToDonate"; -constexpr auto kCountInstancesInCoordinatorState3Cloning = - "countInstancesInCoordinatorState3Cloning"; -constexpr auto kCountInstancesInCoordinatorState4Applying = - "countInstancesInCoordinatorState4Applying"; -constexpr auto kCountInstancesInCoordinatorState5BlockingWrites = - "countInstancesInCoordinatorState5BlockingWrites"; -constexpr auto kCountInstancesInCoordinatorState6Aborting = - "countInstancesInCoordinatorState6Aborting"; -constexpr auto kCountInstancesInCoordinatorState7Committing = - "countInstancesInCoordinatorState7Committing"; -constexpr auto kCountInstancesInRecipientState1AwaitingFetchTimestamp = - "countInstancesInRecipientState1AwaitingFetchTimestamp"; -constexpr auto kCountInstancesInRecipientState2CreatingCollection = - "countInstancesInRecipientState2CreatingCollection"; -constexpr auto kCountInstancesInRecipientState3Cloning = "countInstancesInRecipientState3Cloning"; -constexpr auto kCountInstancesInRecipientState4Applying = "countInstancesInRecipientState4Applying"; -constexpr auto kCountInstancesInRecipientState5Error = "countInstancesInRecipientState5Error"; -constexpr auto kCountInstancesInRecipientState6StrictConsistency = - "countInstancesInRecipientState6StrictConsistency"; -constexpr auto kCountInstancesInRecipientState7Done = "countInstancesInRecipientState7Done"; -constexpr auto kCountInstancesInDonorState1PreparingToDonate = - "countInstancesInDonorState1PreparingToDonate"; -constexpr auto kCountInstancesInDonorState2DonatingInitialData = - "countInstancesInDonorState2DonatingInitialData"; -constexpr auto kCountInstancesInDonorState3DonatingOplogEntries = - "countInstancesInDonorState3DonatingOplogEntries"; -constexpr auto kCountInstancesInDonorState4PreparingToBlockWrites = - "countInstancesInDonorState4PreparingToBlockWrites"; -constexpr auto kCountInstancesInDonorState5Error = "countInstancesInDonorState5Error"; -constexpr auto kCountInstancesInDonorState6BlockingWrites = - "countInstancesInDonorState6BlockingWrites"; -constexpr auto kCountInstancesInDonorState7Done = "countInstancesInDonorState7Done"; struct Metrics { - Metrics() : _resharding(kResharding), _globalIndexes(kGlobalIndex) {} - ShardingDataTransformCumulativeMetrics _resharding; + Metrics() + : _globalIndexes( + kGlobalIndex, + std::make_unique<ShardingDataTransformCumulativeMetricsFieldNamePlaceholder>()) {} + ReshardingCumulativeMetrics _resharding; ShardingDataTransformCumulativeMetrics _globalIndexes; }; using MetricsPtr = std::unique_ptr<Metrics>; @@ -129,33 +74,11 @@ ShardingDataTransformCumulativeMetrics* ShardingDataTransformCumulativeMetrics:: } ShardingDataTransformCumulativeMetrics::ShardingDataTransformCumulativeMetrics( - const std::string& rootSectionName) + const std::string& rootSectionName, std::unique_ptr<NameProvider> fieldNameProvider) : _rootSectionName{rootSectionName}, - _fieldNames{std::make_unique<ShardingDataTransformCumulativeMetricsFieldNamePlaceholder>()}, + _fieldNames{std::move(fieldNameProvider)}, _instanceMetricsForAllRoles(ShardingDataTransformMetrics::kRoleCount), - _operationWasAttempted{false}, - _coordinatorStateList{AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}}, - _donorStateList{AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}}, - _recipientStateList{AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}, - AtomicWord<int64_t>{0}} {} + _operationWasAttempted{false} {} ShardingDataTransformCumulativeMetrics::UniqueScopedObserver ShardingDataTransformCumulativeMetrics::registerInstanceMetrics(const InstanceObserver* metrics) { @@ -208,94 +131,59 @@ void ShardingDataTransformCumulativeMetrics::reportForServerStatus(BSONObjBuilde root.append(_fieldNames->getForCountCanceled(), _countCancelled.load()); root.append(_fieldNames->getForLastOpEndingChunkImbalance(), _lastOpEndingChunkImbalance.load()); - - reportActive(&root); - reportOldestActive(&root); - reportLatencies(&root); - reportCurrentInSteps(&root); + { + BSONObjBuilder active(bob->subobjStart(kActive)); + reportActive(&active); + } + { + BSONObjBuilder oldest(bob->subobjStart(kOldestActive)); + reportOldestActive(&oldest); + } + { + BSONObjBuilder latencies(bob->subobjStart(kLatencies)); + reportLatencies(&latencies); + } + { + BSONObjBuilder steps(bob->subobjStart(kCurrentInSteps)); + reportCurrentInSteps(&steps); + } } void ShardingDataTransformCumulativeMetrics::reportActive(BSONObjBuilder* bob) const { - BSONObjBuilder s(bob->subobjStart(kActive)); - s.append(_fieldNames->getForDocumentsProcessed(), _documentsProcessed.load()); - s.append(_fieldNames->getForBytesWritten(), _bytesWritten.load()); - s.append(kOplogEntriesFetched, _oplogEntriesFetched.load()); - s.append(kOplogEntriesApplied, _oplogEntriesApplied.load()); - s.append(kInsertsApplied, _insertsApplied.load()); - s.append(kUpdatesApplied, _updatesApplied.load()); - s.append(kDeletesApplied, _deletesApplied.load()); - s.append(_fieldNames->getForCountWritesToStashCollections(), - _writesToStashedCollections.load()); - s.append(_fieldNames->getForCountWritesDuringCriticalSection(), - _writesDuringCriticalSection.load()); - s.append(_fieldNames->getForCountReadsDuringCriticalSection(), - _readsDuringCriticalSection.load()); + bob->append(_fieldNames->getForDocumentsProcessed(), _documentsProcessed.load()); + bob->append(_fieldNames->getForBytesWritten(), _bytesWritten.load()); + bob->append(_fieldNames->getForCountWritesToStashCollections(), + _writesToStashedCollections.load()); + bob->append(_fieldNames->getForCountWritesDuringCriticalSection(), + _writesDuringCriticalSection.load()); + bob->append(_fieldNames->getForCountReadsDuringCriticalSection(), + _readsDuringCriticalSection.load()); } void ShardingDataTransformCumulativeMetrics::reportOldestActive(BSONObjBuilder* bob) const { - BSONObjBuilder s(bob->subobjStart(kOldestActive)); - s.append(_fieldNames->getForCoordinatorAllShardsHighestRemainingOperationTimeEstimatedMillis(), - getOldestOperationHighEstimateRemainingTimeMillis(Role::kCoordinator)); - s.append(_fieldNames->getForCoordinatorAllShardsLowestRemainingOperationTimeEstimatedMillis(), - getOldestOperationLowEstimateRemainingTimeMillis(Role::kCoordinator)); - s.append(_fieldNames->getForRecipientRemainingOperationTimeEstimatedMillis(), - getOldestOperationHighEstimateRemainingTimeMillis(Role::kRecipient)); + bob->append( + _fieldNames->getForCoordinatorAllShardsHighestRemainingOperationTimeEstimatedMillis(), + getOldestOperationHighEstimateRemainingTimeMillis(Role::kCoordinator)); + bob->append( + _fieldNames->getForCoordinatorAllShardsLowestRemainingOperationTimeEstimatedMillis(), + getOldestOperationLowEstimateRemainingTimeMillis(Role::kCoordinator)); + bob->append(_fieldNames->getForRecipientRemainingOperationTimeEstimatedMillis(), + getOldestOperationHighEstimateRemainingTimeMillis(Role::kRecipient)); } void ShardingDataTransformCumulativeMetrics::reportLatencies(BSONObjBuilder* bob) const { - BSONObjBuilder s(bob->subobjStart(kLatencies)); - s.append(_fieldNames->getForCollectionCloningTotalRemoteBatchRetrievalTimeMillis(), - _totalBatchRetrievedDuringCloneMillis.load()); - s.append(_fieldNames->getForCollectionCloningTotalRemoteBatchesRetrieved(), - _totalBatchRetrievedDuringClone.load()); - s.append(_fieldNames->getForCollectionCloningTotalLocalInsertTimeMillis(), - _collectionCloningTotalLocalInsertTimeMillis.load()); - s.append(_fieldNames->getForCollectionCloningTotalLocalInserts(), - _collectionCloningTotalLocalBatchInserts.load()); - s.append(kOplogFetchingTotalRemoteBatchRetrievalTimeMillis, - _oplogFetchingTotalRemoteBatchesRetrievalTimeMillis.load()); - s.append(kOplogFetchingTotalRemoteBatchesRetrieved, - _oplogFetchingTotalRemoteBatchesRetrieved.load()); - s.append(kOplogFetchingTotalLocalInsertTimeMillis, - _oplogFetchingTotalLocalInsertTimeMillis.load()); - s.append(kOplogFetchingTotalLocalInserts, _oplogFetchingTotalLocalInserts.load()); - s.append(kOplogApplyingTotalLocalBatchRetrievalTimeMillis, - _oplogApplyingTotalBatchesRetrievalTimeMillis.load()); - s.append(kOplogApplyingTotalLocalBatchesRetrieved, _oplogApplyingTotalBatchesRetrieved.load()); - s.append(kOplogApplyingTotalLocalBatchApplyTimeMillis, _oplogBatchAppliedMillis.load()); - s.append(kOplogApplyingTotalLocalBatchesApplied, _oplogBatchApplied.load()); + bob->append(_fieldNames->getForCollectionCloningTotalRemoteBatchRetrievalTimeMillis(), + _totalBatchRetrievedDuringCloneMillis.load()); + bob->append(_fieldNames->getForCollectionCloningTotalRemoteBatchesRetrieved(), + _totalBatchRetrievedDuringClone.load()); + bob->append(_fieldNames->getForCollectionCloningTotalLocalInsertTimeMillis(), + _collectionCloningTotalLocalInsertTimeMillis.load()); + bob->append(_fieldNames->getForCollectionCloningTotalLocalInserts(), + _collectionCloningTotalLocalBatchInserts.load()); } void ShardingDataTransformCumulativeMetrics::reportCurrentInSteps(BSONObjBuilder* bob) const { - BSONObjBuilder s(bob->subobjStart(kCurrentInSteps)); - - auto reportState = [this, &s](auto state) { - s.append(fieldNameFor(state), getStateCounter(state)->load()); - }; - - reportState(CoordinatorStateEnum::kInitializing); - reportState(CoordinatorStateEnum::kPreparingToDonate); - reportState(CoordinatorStateEnum::kCloning); - reportState(CoordinatorStateEnum::kApplying); - reportState(CoordinatorStateEnum::kBlockingWrites); - reportState(CoordinatorStateEnum::kAborting); - reportState(CoordinatorStateEnum::kCommitting); - - reportState(RecipientStateEnum::kAwaitingFetchTimestamp); - reportState(RecipientStateEnum::kCreatingCollection); - reportState(RecipientStateEnum::kCloning); - reportState(RecipientStateEnum::kApplying); - reportState(RecipientStateEnum::kError); - reportState(RecipientStateEnum::kStrictConsistency); - reportState(RecipientStateEnum::kDone); - - reportState(DonorStateEnum::kPreparingToDonate); - reportState(DonorStateEnum::kDonatingInitialData); - reportState(DonorStateEnum::kDonatingOplogEntries); - reportState(DonorStateEnum::kPreparingToBlockWrites); - reportState(DonorStateEnum::kError); - reportState(DonorStateEnum::kBlockingWrites); - reportState(DonorStateEnum::kDone); + // Do nothing. } const ShardingDataTransformCumulativeMetrics::InstanceObserver* @@ -354,60 +242,6 @@ void ShardingDataTransformCumulativeMetrics::setLastOpEndingChunkImbalance(int64 _lastOpEndingChunkImbalance.store(imbalanceCount); } -ShardingDataTransformCumulativeMetrics::CoordinatorStateArray* -ShardingDataTransformCumulativeMetrics::getStateArrayFor(CoordinatorStateEnum state) { - return &_coordinatorStateList; -} - -const ShardingDataTransformCumulativeMetrics::CoordinatorStateArray* -ShardingDataTransformCumulativeMetrics::getStateArrayFor(CoordinatorStateEnum state) const { - return &_coordinatorStateList; -} - -ShardingDataTransformCumulativeMetrics::DonorStateArray* -ShardingDataTransformCumulativeMetrics::getStateArrayFor(DonorStateEnum state) { - return &_donorStateList; -} - -const ShardingDataTransformCumulativeMetrics::DonorStateArray* -ShardingDataTransformCumulativeMetrics::getStateArrayFor(DonorStateEnum state) const { - return &_donorStateList; -} - -const char* ShardingDataTransformCumulativeMetrics::fieldNameFor( - ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum state) { - switch (state) { - case CoordinatorStateEnum::kInitializing: - return kCountInstancesInCoordinatorState1Initializing; - - case CoordinatorStateEnum::kPreparingToDonate: - return kCountInstancesInCoordinatorState2PreparingToDonate; - - case CoordinatorStateEnum::kCloning: - return kCountInstancesInCoordinatorState3Cloning; - - case CoordinatorStateEnum::kApplying: - return kCountInstancesInCoordinatorState4Applying; - - case CoordinatorStateEnum::kBlockingWrites: - return kCountInstancesInCoordinatorState5BlockingWrites; - - case CoordinatorStateEnum::kAborting: - return kCountInstancesInCoordinatorState6Aborting; - - case CoordinatorStateEnum::kCommitting: - return kCountInstancesInCoordinatorState7Committing; - - default: - uasserted(6438601, - str::stream() - << "no field name for coordinator state " << static_cast<int32_t>(state)); - break; - } - - MONGO_UNREACHABLE; -} - void ShardingDataTransformCumulativeMetrics::onInsertsDuringCloning( int64_t count, int64_t bytes, const Milliseconds& elapsedTime) { _collectionCloningTotalLocalBatchInserts.fetchAndAdd(1); @@ -417,53 +251,6 @@ void ShardingDataTransformCumulativeMetrics::onInsertsDuringCloning( durationCount<Milliseconds>(elapsedTime)); } -void ShardingDataTransformCumulativeMetrics::onLocalInsertDuringOplogFetching( - const Milliseconds& elapsedTime) { - _oplogFetchingTotalLocalInserts.fetchAndAdd(1); - _oplogFetchingTotalLocalInsertTimeMillis.fetchAndAdd(durationCount<Milliseconds>(elapsedTime)); -} - -void ShardingDataTransformCumulativeMetrics::onBatchRetrievedDuringOplogApplying( - const Milliseconds& elapsedTime) { - _oplogApplyingTotalBatchesRetrieved.fetchAndAdd(1); - _oplogApplyingTotalBatchesRetrievalTimeMillis.fetchAndAdd( - durationCount<Milliseconds>(elapsedTime)); -} - -const char* ShardingDataTransformCumulativeMetrics::fieldNameFor( - ShardingDataTransformCumulativeMetrics::DonorStateEnum state) { - switch (state) { - case DonorStateEnum::kPreparingToDonate: - return kCountInstancesInDonorState1PreparingToDonate; - - case DonorStateEnum::kDonatingInitialData: - return kCountInstancesInDonorState2DonatingInitialData; - - case DonorStateEnum::kDonatingOplogEntries: - return kCountInstancesInDonorState3DonatingOplogEntries; - - case DonorStateEnum::kPreparingToBlockWrites: - return kCountInstancesInDonorState4PreparingToBlockWrites; - - case DonorStateEnum::kError: - return kCountInstancesInDonorState5Error; - - case DonorStateEnum::kBlockingWrites: - return kCountInstancesInDonorState6BlockingWrites; - - case DonorStateEnum::kDone: - return kCountInstancesInDonorState7Done; - - default: - uasserted(6438700, - str::stream() - << "no field name for donor state " << static_cast<int32_t>(state)); - break; - } - - MONGO_UNREACHABLE; -} - void ShardingDataTransformCumulativeMetrics::onReadDuringCriticalSection() { _readsDuringCriticalSection.fetchAndAdd(1); } @@ -472,88 +259,19 @@ void ShardingDataTransformCumulativeMetrics::onWriteDuringCriticalSection() { _writesDuringCriticalSection.fetchAndAdd(1); } -const char* ShardingDataTransformCumulativeMetrics::fieldNameFor(RecipientStateEnum state) { - switch (state) { - case RecipientStateEnum::kAwaitingFetchTimestamp: - return kCountInstancesInRecipientState1AwaitingFetchTimestamp; - - case RecipientStateEnum::kCreatingCollection: - return kCountInstancesInRecipientState2CreatingCollection; - - case RecipientStateEnum::kCloning: - return kCountInstancesInRecipientState3Cloning; - - case RecipientStateEnum::kApplying: - return kCountInstancesInRecipientState4Applying; - - case RecipientStateEnum::kError: - return kCountInstancesInRecipientState5Error; - - case RecipientStateEnum::kStrictConsistency: - return kCountInstancesInRecipientState6StrictConsistency; - - case RecipientStateEnum::kDone: - return kCountInstancesInRecipientState7Done; - - default: - uasserted(6438900, - str::stream() - << "no field name for recipient state " << static_cast<int32_t>(state)); - break; - } - - MONGO_UNREACHABLE; -} - void ShardingDataTransformCumulativeMetrics::onWriteToStashedCollections() { _writesToStashedCollections.fetchAndAdd(1); } -ShardingDataTransformCumulativeMetrics::RecipientStateArray* -ShardingDataTransformCumulativeMetrics::getStateArrayFor( - ShardingDataTransformCumulativeMetrics::RecipientStateEnum state) { - return &_recipientStateList; -} - -const ShardingDataTransformCumulativeMetrics::RecipientStateArray* -ShardingDataTransformCumulativeMetrics::getStateArrayFor( - ShardingDataTransformCumulativeMetrics::RecipientStateEnum state) const { - return &_recipientStateList; -} - -void ShardingDataTransformCumulativeMetrics::onInsertApplied() { - _insertsApplied.fetchAndAdd(1); -} - -void ShardingDataTransformCumulativeMetrics::onUpdateApplied() { - _updatesApplied.fetchAndAdd(1); -} - -void ShardingDataTransformCumulativeMetrics::onDeleteApplied() { - _deletesApplied.fetchAndAdd(1); -} - -void ShardingDataTransformCumulativeMetrics::onOplogEntriesFetched(int64_t numEntries, - Milliseconds elapsed) { - _oplogEntriesFetched.fetchAndAdd(numEntries); - _oplogFetchingTotalRemoteBatchesRetrieved.fetchAndAdd(1); - _oplogFetchingTotalRemoteBatchesRetrievalTimeMillis.fetchAndAdd( - durationCount<Milliseconds>(elapsed)); -} - -void ShardingDataTransformCumulativeMetrics::onOplogEntriesApplied(int64_t numEntries) { - _oplogEntriesApplied.fetchAndAdd(numEntries); -} - void ShardingDataTransformCumulativeMetrics::onCloningTotalRemoteBatchRetrieval( Milliseconds elapsed) { _totalBatchRetrievedDuringClone.fetchAndAdd(1); _totalBatchRetrievedDuringCloneMillis.fetchAndAdd(durationCount<Milliseconds>(elapsed)); } -void ShardingDataTransformCumulativeMetrics::onOplogLocalBatchApplied(Milliseconds elapsed) { - _oplogBatchApplied.fetchAndAdd(1); - _oplogBatchAppliedMillis.fetchAndAdd(durationCount<Milliseconds>(elapsed)); +const ShardingDataTransformCumulativeMetricsFieldNameProvider* +ShardingDataTransformCumulativeMetrics::getFieldNames() const { + return _fieldNames.get(); } ShardingDataTransformCumulativeMetrics::ScopedObserver::ScopedObserver( diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h index f09aebce244..814ef0790d4 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h @@ -43,43 +43,6 @@ namespace mongo { class ShardingDataTransformCumulativeMetrics { public: - enum class CoordinatorStateEnum : int32_t { - kUnused = -1, - kInitializing, - kPreparingToDonate, - kCloning, - kApplying, - kBlockingWrites, - kAborting, - kCommitting, - kDone, - kNumStates - }; - - enum class DonorStateEnum : int32_t { - kUnused = -1, - kPreparingToDonate, - kDonatingInitialData, - kDonatingOplogEntries, - kPreparingToBlockWrites, - kError, - kBlockingWrites, - kDone, - kNumStates - }; - - enum class RecipientStateEnum : int32_t { - kUnused = -1, - kAwaitingFetchTimestamp, - kCreatingCollection, - kCloning, - kApplying, - kError, - kStrictConsistency, - kDone, - kNumStates - }; - using NameProvider = ShardingDataTransformCumulativeMetricsFieldNameProvider; using Role = ShardingDataTransformMetrics::Role; using InstanceObserver = ShardingDataTransformMetricsObserverInterface; @@ -122,7 +85,10 @@ public: static ShardingDataTransformCumulativeMetrics* getForResharding(ServiceContext* context); static ShardingDataTransformCumulativeMetrics* getForGlobalIndexes(ServiceContext* context); - ShardingDataTransformCumulativeMetrics(const std::string& rootSectionName); + + ShardingDataTransformCumulativeMetrics(const std::string& rootSectionName, + std::unique_ptr<NameProvider> fieldNameProvider); + virtual ~ShardingDataTransformCumulativeMetrics() = default; [[nodiscard]] UniqueScopedObserver registerInstanceMetrics(const InstanceObserver* metrics); int64_t getOldestOperationHighEstimateRemainingTimeMillis(Role role) const; int64_t getOldestOperationLowEstimateRemainingTimeMillis(Role role) const; @@ -137,62 +103,26 @@ public: void setLastOpEndingChunkImbalance(int64_t imbalanceCount); - /** - * The before can be boost::none to represent the initial state transition and - * after can be boost::none to represent cases where it is no longer active. - */ - template <typename T> - void onStateTransition(boost::optional<T> before, boost::optional<T> after); - void onReadDuringCriticalSection(); void onWriteDuringCriticalSection(); void onWriteToStashedCollections(); - void onInsertApplied(); - void onUpdateApplied(); - void onDeleteApplied(); - void onOplogEntriesFetched(int64_t numEntries, Milliseconds elapsed); - void onOplogEntriesApplied(int64_t numEntries); void onCloningTotalRemoteBatchRetrieval(Milliseconds elapsed); - void onOplogLocalBatchApplied(Milliseconds elapsed); - - static const char* fieldNameFor(CoordinatorStateEnum state); - static const char* fieldNameFor(DonorStateEnum state); - static const char* fieldNameFor(RecipientStateEnum state); - void onInsertsDuringCloning(int64_t count, int64_t bytes, const Milliseconds& elapsedTime); - void onLocalInsertDuringOplogFetching(const Milliseconds& elapsedTime); - void onBatchRetrievedDuringOplogApplying(const Milliseconds& elapsedTime); -private: - using CoordinatorStateArray = - std::array<AtomicWord<int64_t>, static_cast<size_t>(CoordinatorStateEnum::kNumStates)>; - using DonorStateArray = - std::array<AtomicWord<int64_t>, static_cast<size_t>(DonorStateEnum::kNumStates)>; - using RecipientStateArray = - std::array<AtomicWord<int64_t>, static_cast<size_t>(RecipientStateEnum::kNumStates)>; +protected: + const ShardingDataTransformCumulativeMetricsFieldNameProvider* getFieldNames() const; - void reportActive(BSONObjBuilder* bob) const; - void reportOldestActive(BSONObjBuilder* bob) const; - void reportLatencies(BSONObjBuilder* bob) const; - void reportCurrentInSteps(BSONObjBuilder* bob) const; + virtual void reportActive(BSONObjBuilder* bob) const; + virtual void reportOldestActive(BSONObjBuilder* bob) const; + virtual void reportLatencies(BSONObjBuilder* bob) const; + virtual void reportCurrentInSteps(BSONObjBuilder* bob) const; +private: MetricsSet& getMetricsSetForRole(Role role); const MetricsSet& getMetricsSetForRole(Role role) const; const InstanceObserver* getOldestOperation(WithLock, Role role) const; - template <typename T> - const AtomicWord<int64_t>* getStateCounter(T state) const; - template <typename T> - AtomicWord<int64_t>* getMutableStateCounter(T state); - - CoordinatorStateArray* getStateArrayFor(CoordinatorStateEnum state); - const CoordinatorStateArray* getStateArrayFor(CoordinatorStateEnum state) const; - DonorStateArray* getStateArrayFor(DonorStateEnum state); - const DonorStateArray* getStateArrayFor(DonorStateEnum state) const; - RecipientStateArray* getStateArrayFor(RecipientStateEnum state); - const RecipientStateArray* getStateArrayFor(RecipientStateEnum state) const; - MetricsSet::iterator insertMetrics(const InstanceObserver* metrics, MetricsSet& set); void deregisterMetrics(const Role& role, const MetricsSet::iterator& metrics); @@ -207,16 +137,8 @@ private: AtomicWord<int64_t> _countFailed{0}; AtomicWord<int64_t> _countCancelled{0}; - AtomicWord<int64_t> _insertsApplied{0}; - AtomicWord<int64_t> _updatesApplied{0}; - AtomicWord<int64_t> _deletesApplied{0}; - AtomicWord<int64_t> _oplogEntriesApplied{0}; - AtomicWord<int64_t> _oplogEntriesFetched{0}; - AtomicWord<int64_t> _totalBatchRetrievedDuringClone{0}; AtomicWord<int64_t> _totalBatchRetrievedDuringCloneMillis{0}; - AtomicWord<int64_t> _oplogBatchApplied{0}; - AtomicWord<int64_t> _oplogBatchAppliedMillis{0}; AtomicWord<int64_t> _documentsProcessed{0}; AtomicWord<int64_t> _bytesWritten{0}; @@ -224,55 +146,9 @@ private: AtomicWord<int64_t> _readsDuringCriticalSection{0}; AtomicWord<int64_t> _writesDuringCriticalSection{0}; - CoordinatorStateArray _coordinatorStateList; - DonorStateArray _donorStateList; - RecipientStateArray _recipientStateList; - AtomicWord<int64_t> _collectionCloningTotalLocalBatchInserts{0}; AtomicWord<int64_t> _collectionCloningTotalLocalInsertTimeMillis{0}; - AtomicWord<int64_t> _oplogFetchingTotalRemoteBatchesRetrieved{0}; - AtomicWord<int64_t> _oplogFetchingTotalRemoteBatchesRetrievalTimeMillis{0}; - AtomicWord<int64_t> _oplogFetchingTotalLocalInserts{0}; - AtomicWord<int64_t> _oplogFetchingTotalLocalInsertTimeMillis{0}; - AtomicWord<int64_t> _oplogApplyingTotalBatchesRetrieved{0}; - AtomicWord<int64_t> _oplogApplyingTotalBatchesRetrievalTimeMillis{0}; AtomicWord<int64_t> _writesToStashedCollections{0}; }; -template <typename T> -void ShardingDataTransformCumulativeMetrics::onStateTransition(boost::optional<T> before, - boost::optional<T> after) { - if (before) { - if (auto counter = getMutableStateCounter(*before)) { - counter->fetchAndSubtract(1); - } - } - - if (after) { - if (auto counter = getMutableStateCounter(*after)) { - counter->fetchAndAdd(1); - } - } -} - -template <typename T> -const AtomicWord<int64_t>* ShardingDataTransformCumulativeMetrics::getStateCounter(T state) const { - if (state == T::kUnused) { - return nullptr; - } - - invariant(static_cast<size_t>(state) < static_cast<size_t>(T::kNumStates)); - return &((*getStateArrayFor(state))[static_cast<size_t>(state)]); -} - -template <typename T> -AtomicWord<int64_t>* ShardingDataTransformCumulativeMetrics::getMutableStateCounter(T state) { - if (state == T::kUnused) { - return nullptr; - } - - invariant(static_cast<size_t>(state) < static_cast<size_t>(T::kNumStates)); - return &((*getStateArrayFor(state))[static_cast<size_t>(state)]); -} - } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp index 1b07ca9655f..299bddb3b09 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp @@ -79,109 +79,109 @@ public: }; TEST_F(ShardingDataTransformCumulativeMetricsTest, AddAndRemoveMetrics) { - auto deregister = _cumulativeMetrics.registerInstanceMetrics(getOldestObserver()); - ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), 1); + auto deregister = _cumulativeMetrics->registerInstanceMetrics(getOldestObserver()); + ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 1); deregister.reset(); - ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), 0); + ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 0); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, MetricsReportsOldestWhenInsertedFirst) { - auto deregisterOldest = _cumulativeMetrics.registerInstanceMetrics(getOldestObserver()); - auto deregisterYoungest = _cumulativeMetrics.registerInstanceMetrics(getYoungestObserver()); - ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis( +TEST_F(ShardingDataTransformMetricsTestFixture, MetricsReportsOldestWhenInsertedFirst) { + auto deregisterOldest = _cumulativeMetrics->registerInstanceMetrics(getOldestObserver()); + auto deregisterYoungest = _cumulativeMetrics->registerInstanceMetrics(getYoungestObserver()); + ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis( ObserverMock::kDefaultRole), kOldestTimeLeft); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, MetricsReportsOldestWhenInsertedLast) { - auto deregisterYoungest = _cumulativeMetrics.registerInstanceMetrics(getYoungestObserver()); - auto deregisterOldest = _cumulativeMetrics.registerInstanceMetrics(getOldestObserver()); - ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis( +TEST_F(ShardingDataTransformMetricsTestFixture, MetricsReportsOldestWhenInsertedLast) { + auto deregisterYoungest = _cumulativeMetrics->registerInstanceMetrics(getYoungestObserver()); + auto deregisterOldest = _cumulativeMetrics->registerInstanceMetrics(getOldestObserver()); + ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis( ObserverMock::kDefaultRole), kOldestTimeLeft); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, NoServerStatusWhenNeverUsed) { +TEST_F(ShardingDataTransformMetricsTestFixture, NoServerStatusWhenNeverUsed) { BSONObjBuilder bob; - _cumulativeMetrics.reportForServerStatus(&bob); + _cumulativeMetrics->reportForServerStatus(&bob); auto report = bob.done(); ASSERT_BSONOBJ_EQ(report, BSONObj()); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, RemainingTimeReports0WhenEmpty) { - ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), 0); - ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis( +TEST_F(ShardingDataTransformMetricsTestFixture, RemainingTimeReports0WhenEmpty) { + ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 0); + ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis( ObserverMock::kDefaultRole), 0); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, UpdatesOldestWhenOldestIsRemoved) { - auto deregisterYoungest = _cumulativeMetrics.registerInstanceMetrics(getYoungestObserver()); - auto deregisterOldest = _cumulativeMetrics.registerInstanceMetrics(getOldestObserver()); - ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis( +TEST_F(ShardingDataTransformMetricsTestFixture, UpdatesOldestWhenOldestIsRemoved) { + auto deregisterYoungest = _cumulativeMetrics->registerInstanceMetrics(getYoungestObserver()); + auto deregisterOldest = _cumulativeMetrics->registerInstanceMetrics(getOldestObserver()); + ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis( ObserverMock::kDefaultRole), kOldestTimeLeft); deregisterOldest.reset(); - ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis( + ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis( ObserverMock::kDefaultRole), kYoungestTimeLeft); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, InsertsTwoWithSameStartTime) { - auto deregisterOldest = _cumulativeMetrics.registerInstanceMetrics(getOldestObserver()); +TEST_F(ShardingDataTransformMetricsTestFixture, InsertsTwoWithSameStartTime) { + auto deregisterOldest = _cumulativeMetrics->registerInstanceMetrics(getOldestObserver()); ObserverMock sameAsOldest{kOldestTime, kOldestTimeLeft}; - auto deregisterOldest2 = _cumulativeMetrics.registerInstanceMetrics(&sameAsOldest); - ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), 2); - ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis( + auto deregisterOldest2 = _cumulativeMetrics->registerInstanceMetrics(&sameAsOldest); + ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 2); + ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis( ObserverMock::kDefaultRole), kOldestTimeLeft); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, StillReportsOldestAfterRandomOperations) { +TEST_F(ShardingDataTransformMetricsTestFixture, StillReportsOldestAfterRandomOperations) { doRandomOperationsTest<ScopedObserverMock>(); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, +TEST_F(ShardingDataTransformMetricsTestFixture, StillReportsOldestAfterRandomOperationsMultithreaded) { doRandomOperationsMultithreadedTest<ScopedObserverMock>(); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportsOldestByRole) { +TEST_F(ShardingDataTransformMetricsTestFixture, ReportsOldestByRole) { using Role = ShardingDataTransformMetrics::Role; auto& metrics = _cumulativeMetrics; ObserverMock oldDonor{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kDonor}; ObserverMock youngDonor{Date_t::fromMillisSinceEpoch(200), 200, 200, Role::kDonor}; ObserverMock oldRecipient{Date_t::fromMillisSinceEpoch(300), 300, 300, Role::kRecipient}; ObserverMock youngRecipient{Date_t::fromMillisSinceEpoch(400), 400, 400, Role::kRecipient}; - auto removeOldD = metrics.registerInstanceMetrics(&oldDonor); - auto removeYoungD = metrics.registerInstanceMetrics(&youngDonor); - auto removeOldR = metrics.registerInstanceMetrics(&oldRecipient); - auto removeYoungR = metrics.registerInstanceMetrics(&youngRecipient); - - ASSERT_EQ(metrics.getObservedMetricsCount(), 4); - ASSERT_EQ(metrics.getObservedMetricsCount(Role::kDonor), 2); - ASSERT_EQ(metrics.getObservedMetricsCount(Role::kRecipient), 2); - ASSERT_EQ(metrics.getOldestOperationHighEstimateRemainingTimeMillis(Role::kDonor), 100); - ASSERT_EQ(metrics.getOldestOperationHighEstimateRemainingTimeMillis(Role::kRecipient), 300); + auto removeOldD = metrics->registerInstanceMetrics(&oldDonor); + auto removeYoungD = metrics->registerInstanceMetrics(&youngDonor); + auto removeOldR = metrics->registerInstanceMetrics(&oldRecipient); + auto removeYoungR = metrics->registerInstanceMetrics(&youngRecipient); + + ASSERT_EQ(metrics->getObservedMetricsCount(), 4); + ASSERT_EQ(metrics->getObservedMetricsCount(Role::kDonor), 2); + ASSERT_EQ(metrics->getObservedMetricsCount(Role::kRecipient), 2); + ASSERT_EQ(metrics->getOldestOperationHighEstimateRemainingTimeMillis(Role::kDonor), 100); + ASSERT_EQ(metrics->getOldestOperationHighEstimateRemainingTimeMillis(Role::kRecipient), 300); removeOldD.reset(); - ASSERT_EQ(metrics.getObservedMetricsCount(), 3); - ASSERT_EQ(metrics.getObservedMetricsCount(Role::kDonor), 1); - ASSERT_EQ(metrics.getOldestOperationHighEstimateRemainingTimeMillis(Role::kDonor), 200); + ASSERT_EQ(metrics->getObservedMetricsCount(), 3); + ASSERT_EQ(metrics->getObservedMetricsCount(Role::kDonor), 1); + ASSERT_EQ(metrics->getOldestOperationHighEstimateRemainingTimeMillis(Role::kDonor), 200); removeOldR.reset(); - ASSERT_EQ(metrics.getObservedMetricsCount(), 2); - ASSERT_EQ(metrics.getObservedMetricsCount(Role::kRecipient), 1); - ASSERT_EQ(metrics.getOldestOperationHighEstimateRemainingTimeMillis(Role::kRecipient), 400); + ASSERT_EQ(metrics->getObservedMetricsCount(), 2); + ASSERT_EQ(metrics->getObservedMetricsCount(Role::kRecipient), 1); + ASSERT_EQ(metrics->getOldestOperationHighEstimateRemainingTimeMillis(Role::kRecipient), 400); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsTimeEstimates) { +TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsTimeEstimates) { using Role = ShardingDataTransformMetrics::Role; ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; - auto recipientObserver = _cumulativeMetrics.registerInstanceMetrics(&recipient); - auto coordinatorObserver = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + auto recipientObserver = _cumulativeMetrics->registerInstanceMetrics(&recipient); + auto coordinatorObserver = _cumulativeMetrics->registerInstanceMetrics(&coordinator); BSONObjBuilder bob; - _cumulativeMetrics.reportForServerStatus(&bob); + _cumulativeMetrics->reportForServerStatus(&bob); auto report = bob.done(); auto section = report.getObjectField(kTestMetricsName).getObjectField("oldestActive"); ASSERT_EQ(section.getIntField("recipientRemainingOperationTimeEstimatedMillis"), 100); @@ -193,722 +193,211 @@ TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsTimeEstimates) 300); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsRunCount) { +TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsRunCount) { using Role = ShardingDataTransformMetrics::Role; ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + auto ignore = _cumulativeMetrics->registerInstanceMetrics(&coordinator); { BSONObjBuilder bob; - _cumulativeMetrics.reportForServerStatus(&bob); + _cumulativeMetrics->reportForServerStatus(&bob); auto report = bob.done(); ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countStarted"), 0); } - _cumulativeMetrics.onStarted(); + _cumulativeMetrics->onStarted(); { BSONObjBuilder bob; - _cumulativeMetrics.reportForServerStatus(&bob); + _cumulativeMetrics->reportForServerStatus(&bob); auto report = bob.done(); ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countStarted"), 1); } } -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsSucceededCount) { +TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsSucceededCount) { using Role = ShardingDataTransformMetrics::Role; ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + auto ignore = _cumulativeMetrics->registerInstanceMetrics(&coordinator); { BSONObjBuilder bob; - _cumulativeMetrics.reportForServerStatus(&bob); + _cumulativeMetrics->reportForServerStatus(&bob); auto report = bob.done(); ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countSucceeded"), 0); } - _cumulativeMetrics.onSuccess(); + _cumulativeMetrics->onSuccess(); { BSONObjBuilder bob; - _cumulativeMetrics.reportForServerStatus(&bob); + _cumulativeMetrics->reportForServerStatus(&bob); auto report = bob.done(); ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countSucceeded"), 1); } } -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsFailedCount) { +TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsFailedCount) { using Role = ShardingDataTransformMetrics::Role; ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + auto ignore = _cumulativeMetrics->registerInstanceMetrics(&coordinator); { BSONObjBuilder bob; - _cumulativeMetrics.reportForServerStatus(&bob); + _cumulativeMetrics->reportForServerStatus(&bob); auto report = bob.done(); ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countFailed"), 0); } - _cumulativeMetrics.onFailure(); + _cumulativeMetrics->onFailure(); { BSONObjBuilder bob; - _cumulativeMetrics.reportForServerStatus(&bob); + _cumulativeMetrics->reportForServerStatus(&bob); auto report = bob.done(); ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countFailed"), 1); } } -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsCanceledCount) { +TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsCanceledCount) { using Role = ShardingDataTransformMetrics::Role; ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + auto ignore = _cumulativeMetrics->registerInstanceMetrics(&coordinator); { BSONObjBuilder bob; - _cumulativeMetrics.reportForServerStatus(&bob); + _cumulativeMetrics->reportForServerStatus(&bob); auto report = bob.done(); ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countCanceled"), 0); } - _cumulativeMetrics.onCanceled(); + _cumulativeMetrics->onCanceled(); { BSONObjBuilder bob; - _cumulativeMetrics.reportForServerStatus(&bob); + _cumulativeMetrics->reportForServerStatus(&bob); auto report = bob.done(); ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countCanceled"), 1); } } -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsLastChunkImbalanceCount) { +TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsLastChunkImbalanceCount) { using Role = ShardingDataTransformMetrics::Role; ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + auto ignore = _cumulativeMetrics->registerInstanceMetrics(&coordinator); { BSONObjBuilder bob; - _cumulativeMetrics.reportForServerStatus(&bob); + _cumulativeMetrics->reportForServerStatus(&bob); auto report = bob.done(); ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("lastOpEndingChunkImbalance"), 0); } - _cumulativeMetrics.setLastOpEndingChunkImbalance(111); + _cumulativeMetrics->setLastOpEndingChunkImbalance(111); { BSONObjBuilder bob; - _cumulativeMetrics.reportForServerStatus(&bob); + _cumulativeMetrics->reportForServerStatus(&bob); auto report = bob.done(); ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("lastOpEndingChunkImbalance"), 111); } - _cumulativeMetrics.setLastOpEndingChunkImbalance(777); + _cumulativeMetrics->setLastOpEndingChunkImbalance(777); { BSONObjBuilder bob; - _cumulativeMetrics.reportForServerStatus(&bob); + _cumulativeMetrics->reportForServerStatus(&bob); auto report = bob.done(); ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("lastOpEndingChunkImbalance"), 777); } } -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsInsertsDuringCloning) { +TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsInsertsDuringCloning) { using Role = ShardingDataTransformMetrics::Role; ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); + auto ignore = _cumulativeMetrics->registerInstanceMetrics(&recipient); - auto latencySection = getLatencySection(_cumulativeMetrics); + auto latencySection = getLatencySection(kTestMetricsName, _cumulativeMetrics.get()); ASSERT_EQ(latencySection.getIntField("collectionCloningTotalLocalInserts"), 0); ASSERT_EQ(latencySection.getIntField("collectionCloningTotalLocalInsertTimeMillis"), 0); - auto activeSection = getActiveSection(_cumulativeMetrics); + auto activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); ASSERT_EQ(activeSection.getIntField("documentsCopied"), 0); ASSERT_EQ(activeSection.getIntField("bytesCopied"), 0); - _cumulativeMetrics.onInsertsDuringCloning(140, 20763, Milliseconds(15)); + _cumulativeMetrics->onInsertsDuringCloning(140, 20763, Milliseconds(15)); - latencySection = getLatencySection(_cumulativeMetrics); + latencySection = getLatencySection(kTestMetricsName, _cumulativeMetrics.get()); ASSERT_EQ(latencySection.getIntField("collectionCloningTotalLocalInserts"), 1); ASSERT_EQ(latencySection.getIntField("collectionCloningTotalLocalInsertTimeMillis"), 15); - activeSection = getActiveSection(_cumulativeMetrics); + activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); ASSERT_EQ(activeSection.getIntField("documentsCopied"), 140); ASSERT_EQ(activeSection.getIntField("bytesCopied"), 20763); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsInsertsDuringFetching) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); - - auto latencySection = getLatencySection(_cumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInserts"), 0); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInsertTimeMillis"), 0); - - _cumulativeMetrics.onLocalInsertDuringOplogFetching(Milliseconds(17)); - - latencySection = getLatencySection(_cumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInserts"), 1); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInsertTimeMillis"), 17); -} - -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsBatchRetrievedDuringApplying) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); - - auto latencySection = getLatencySection(_cumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesRetrieved"), 0); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchRetrievalTimeMillis"), 0); - - _cumulativeMetrics.onBatchRetrievedDuringOplogApplying(Milliseconds(39)); - - latencySection = getLatencySection(_cumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesRetrieved"), 1); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchRetrievalTimeMillis"), 39); -} - -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsReadDuringCriticalSection) { +TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsReadDuringCriticalSection) { using Role = ShardingDataTransformMetrics::Role; ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&donor); + auto ignore = _cumulativeMetrics->registerInstanceMetrics(&donor); - auto activeSection = getActiveSection(_cumulativeMetrics); + auto activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); ASSERT_EQ(activeSection.getIntField("countReadsDuringCriticalSection"), 0); - _cumulativeMetrics.onReadDuringCriticalSection(); + _cumulativeMetrics->onReadDuringCriticalSection(); - activeSection = getActiveSection(_cumulativeMetrics); + activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); ASSERT_EQ(activeSection.getIntField("countReadsDuringCriticalSection"), 1); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsWriteDuringCriticalSection) { +TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsWriteDuringCriticalSection) { using Role = ShardingDataTransformMetrics::Role; ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&donor); + auto ignore = _cumulativeMetrics->registerInstanceMetrics(&donor); - auto activeSection = getActiveSection(_cumulativeMetrics); + auto activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); ASSERT_EQ(activeSection.getIntField("countWritesDuringCriticalSection"), 0); - _cumulativeMetrics.onWriteDuringCriticalSection(); + _cumulativeMetrics->onWriteDuringCriticalSection(); - activeSection = getActiveSection(_cumulativeMetrics); + activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); ASSERT_EQ(activeSection.getIntField("countWritesDuringCriticalSection"), 1); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsWriteToStashedCollection) { +TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsWriteToStashedCollection) { using Role = ShardingDataTransformMetrics::Role; ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); + auto ignore = _cumulativeMetrics->registerInstanceMetrics(&recipient); - auto activeSection = getActiveSection(_cumulativeMetrics); + auto activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); ASSERT_EQ(activeSection.getIntField("countWritesToStashCollections"), 0); - _cumulativeMetrics.onWriteToStashedCollections(); + _cumulativeMetrics->onWriteToStashedCollections(); - activeSection = getActiveSection(_cumulativeMetrics); + activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); ASSERT_EQ(activeSection.getIntField("countWritesToStashCollections"), 1); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsBatchRetrievedDuringCloning) { +TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsBatchRetrievedDuringCloning) { using Role = ShardingDataTransformMetrics::Role; ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); + auto ignore = _cumulativeMetrics->registerInstanceMetrics(&recipient); - auto latencySection = getLatencySection(_cumulativeMetrics); + auto latencySection = getLatencySection(kTestMetricsName, _cumulativeMetrics.get()); ASSERT_EQ(latencySection.getIntField("collectionCloningTotalRemoteBatchesRetrieved"), 0); ASSERT_EQ(latencySection.getIntField("collectionCloningTotalRemoteBatchRetrievalTimeMillis"), 0); - _cumulativeMetrics.onCloningTotalRemoteBatchRetrieval(Milliseconds(19)); + _cumulativeMetrics->onCloningTotalRemoteBatchRetrieval(Milliseconds(19)); - latencySection = getLatencySection(_cumulativeMetrics); + latencySection = getLatencySection(kTestMetricsName, _cumulativeMetrics.get()); ASSERT_EQ(latencySection.getIntField("collectionCloningTotalRemoteBatchesRetrieved"), 1); ASSERT_EQ(latencySection.getIntField("collectionCloningTotalRemoteBatchRetrievalTimeMillis"), 19); } -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsBatchApplied) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); - - auto latencySection = getLatencySection(_cumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesApplied"), 0); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchApplyTimeMillis"), 0); - - _cumulativeMetrics.onOplogLocalBatchApplied(Milliseconds(333)); - - latencySection = getLatencySection(_cumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesApplied"), 1); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchApplyTimeMillis"), 333); -} - -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsInsertsApplied) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); - - auto activeSection = getActiveSection(_cumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("insertsApplied"), 0); - - _cumulativeMetrics.onInsertApplied(); - - activeSection = getActiveSection(_cumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("insertsApplied"), 1); -} - -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsUpdatesApplied) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); - - auto activeSection = getActiveSection(_cumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("updatesApplied"), 0); - - _cumulativeMetrics.onUpdateApplied(); - - activeSection = getActiveSection(_cumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("updatesApplied"), 1); -} - -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsDeletesApplied) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); - - auto activeSection = getActiveSection(_cumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("deletesApplied"), 0); - - _cumulativeMetrics.onDeleteApplied(); - - activeSection = getActiveSection(_cumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("deletesApplied"), 1); -} - -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsOplogEntriesFetched) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); - - auto activeSection = getActiveSection(_cumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("oplogEntriesFetched"), 0); - - auto latencySection = getLatencySection(_cumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchesRetrieved"), 0); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchRetrievalTimeMillis"), 0); - - _cumulativeMetrics.onOplogEntriesFetched(123, Milliseconds(43)); - - activeSection = getActiveSection(_cumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("oplogEntriesFetched"), 123); - - latencySection = getLatencySection(_cumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchesRetrieved"), 1); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchRetrievalTimeMillis"), 43); -} - -TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsOplogEntriesApplied) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); - - auto activeSection = getActiveSection(_cumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("oplogEntriesApplied"), 0); - - _cumulativeMetrics.onOplogEntriesApplied(99); - - activeSection = getActiveSection(_cumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("oplogEntriesApplied"), 99); -} - -class ShardingDataTransformCumulativeStateTest : public ShardingDataTransformCumulativeMetricsTest { -public: - using CoordinatorStateEnum = ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum; - using DonorStateEnum = ShardingDataTransformCumulativeMetrics::DonorStateEnum; - using RecipientStateEnum = ShardingDataTransformCumulativeMetrics::RecipientStateEnum; - - BSONObj getStateSubObj(const ShardingDataTransformCumulativeMetrics& metrics) { - BSONObjBuilder bob; - metrics.reportForServerStatus(&bob); - auto report = bob.done(); - return report.getObjectField(kTestMetricsName).getObjectField("currentInSteps").getOwned(); - } - - bool checkCoordinateStateField(const ShardingDataTransformCumulativeMetrics& metrics, - boost::optional<CoordinatorStateEnum> expectedState) { - auto serverStatusSubObj = getStateSubObj(metrics); - std::map<std::string, int> expectedStateFieldCount; - - auto addExpectedField = [&](CoordinatorStateEnum stateToPopulate) { - expectedStateFieldCount.emplace( - ShardingDataTransformCumulativeMetrics::fieldNameFor(stateToPopulate), - ((expectedState && (stateToPopulate == expectedState)) ? 1 : 0)); - }; - - addExpectedField(CoordinatorStateEnum::kInitializing); - addExpectedField(CoordinatorStateEnum::kPreparingToDonate); - addExpectedField(CoordinatorStateEnum::kCloning); - addExpectedField(CoordinatorStateEnum::kApplying); - addExpectedField(CoordinatorStateEnum::kBlockingWrites); - addExpectedField(CoordinatorStateEnum::kAborting); - addExpectedField(CoordinatorStateEnum::kCommitting); - - for (const auto& fieldNameAndState : expectedStateFieldCount) { - const auto actualValue = serverStatusSubObj.getIntField(fieldNameAndState.first); - if (actualValue != fieldNameAndState.second) { - LOGV2_DEBUG(6438600, - 0, - "coordinator state field value does not match expected value", - "field"_attr = fieldNameAndState.first, - "serverStatus"_attr = serverStatusSubObj); - return false; - } - } - - return true; - } - - bool checkDonorStateField(const ShardingDataTransformCumulativeMetrics& metrics, - boost::optional<DonorStateEnum> expectedState) { - auto serverStatusSubObj = getStateSubObj(metrics); - std::map<std::string, int> expectedStateFieldCount; - - auto addExpectedField = [&](DonorStateEnum stateToPopulate) { - expectedStateFieldCount.emplace( - ShardingDataTransformCumulativeMetrics::fieldNameFor(stateToPopulate), - ((expectedState && (stateToPopulate == expectedState)) ? 1 : 0)); - }; - - addExpectedField(DonorStateEnum::kPreparingToDonate); - addExpectedField(DonorStateEnum::kDonatingInitialData); - addExpectedField(DonorStateEnum::kDonatingOplogEntries); - addExpectedField(DonorStateEnum::kPreparingToBlockWrites); - addExpectedField(DonorStateEnum::kError); - addExpectedField(DonorStateEnum::kBlockingWrites); - addExpectedField(DonorStateEnum::kDone); - - for (const auto& fieldNameAndState : expectedStateFieldCount) { - const auto actualValue = serverStatusSubObj.getIntField(fieldNameAndState.first); - if (actualValue != fieldNameAndState.second) { - LOGV2_DEBUG(6438701, - 0, - "Donor state field value does not match expected value", - "field"_attr = fieldNameAndState.first, - "serverStatus"_attr = serverStatusSubObj); - return false; - } - } - - return true; - } - - bool checkRecipientStateField(const ShardingDataTransformCumulativeMetrics& metrics, - boost::optional<RecipientStateEnum> expectedState) { - auto serverStatusSubObj = getStateSubObj(metrics); - std::map<std::string, int> expectedStateFieldCount; - - auto addExpectedField = [&](RecipientStateEnum stateToPopulate) { - expectedStateFieldCount.emplace( - ShardingDataTransformCumulativeMetrics::fieldNameFor(stateToPopulate), - ((expectedState && (stateToPopulate == expectedState)) ? 1 : 0)); - }; - - addExpectedField(RecipientStateEnum::kAwaitingFetchTimestamp); - addExpectedField(RecipientStateEnum::kCreatingCollection); - addExpectedField(RecipientStateEnum::kCloning); - addExpectedField(RecipientStateEnum::kApplying); - addExpectedField(RecipientStateEnum::kError); - addExpectedField(RecipientStateEnum::kStrictConsistency); - addExpectedField(RecipientStateEnum::kDone); - - for (const auto& fieldNameAndState : expectedStateFieldCount) { - const auto actualValue = serverStatusSubObj.getIntField(fieldNameAndState.first); - if (actualValue != fieldNameAndState.second) { - LOGV2_DEBUG(6438901, - 0, - "Recipient state field value does not match expected value", - "field"_attr = fieldNameAndState.first, - "serverStatus"_attr = serverStatusSubObj); - return false; - } - } - - return true; - } -}; - -TEST_F(ShardingDataTransformCumulativeStateTest, - SimulatedNormalCoordinatorStateTransitionReportsStateCorrectly) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); - - ASSERT(checkCoordinateStateField(_cumulativeMetrics, CoordinatorStateEnum::kUnused)); - - boost::optional<CoordinatorStateEnum> prevState; - boost::optional<CoordinatorStateEnum> nextState; - - auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) { - prevState = nextState; - nextState = newState; - _cumulativeMetrics.onStateTransition(prevState, nextState); - return checkCoordinateStateField(_cumulativeMetrics, nextState); - }; - - ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused)); - ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing)); - ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate)); - ASSERT(simulateTransitionTo(CoordinatorStateEnum::kCloning)); - ASSERT(simulateTransitionTo(CoordinatorStateEnum::kBlockingWrites)); - ASSERT(simulateTransitionTo(CoordinatorStateEnum::kCommitting)); - ASSERT(simulateTransitionTo(CoordinatorStateEnum::kDone)); - ASSERT(simulateTransitionTo(boost::none)); -} - -TEST_F(ShardingDataTransformCumulativeStateTest, - SimulatedAbortedCoordinatorStateTransitionReportsStateCorrectly) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); - - ASSERT(checkCoordinateStateField(_cumulativeMetrics, CoordinatorStateEnum::kUnused)); - - boost::optional<CoordinatorStateEnum> prevState; - boost::optional<CoordinatorStateEnum> nextState; - - auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) { - prevState = nextState; - nextState = newState; - _cumulativeMetrics.onStateTransition(prevState, nextState); - return checkCoordinateStateField(_cumulativeMetrics, nextState); - }; - - ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused)); - ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing)); - ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate)); - ASSERT(simulateTransitionTo(CoordinatorStateEnum::kAborting)); - ASSERT(simulateTransitionTo(boost::none)); -} - -TEST_F(ShardingDataTransformCumulativeStateTest, - SimulatedSteppedDownCoordinatorStateFromUnusedReportsStateCorrectly) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); - - boost::optional<CoordinatorStateEnum> initState = CoordinatorStateEnum::kUnused; - ASSERT(checkCoordinateStateField(_cumulativeMetrics, initState)); - - _cumulativeMetrics.onStateTransition(initState, {boost::none}); - ASSERT(checkCoordinateStateField(_cumulativeMetrics, initState)); -} - -TEST_F(ShardingDataTransformCumulativeStateTest, - SimulatedSteppedDownCoordinatorStateTransitionReportsStateCorrectly) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); - - ASSERT(checkCoordinateStateField(_cumulativeMetrics, CoordinatorStateEnum::kUnused)); - - boost::optional<CoordinatorStateEnum> prevState; - boost::optional<CoordinatorStateEnum> nextState; - - auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) { - prevState = nextState; - nextState = newState; - _cumulativeMetrics.onStateTransition(prevState, nextState); - return checkCoordinateStateField(_cumulativeMetrics, nextState); - }; - - ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused)); - ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing)); - ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate)); - ASSERT(simulateTransitionTo(boost::none)); -} - -TEST_F(ShardingDataTransformCumulativeStateTest, - SimulatedNormalDonorStateTransitionReportsStateCorrectly) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&donor); - - ASSERT(checkDonorStateField(_cumulativeMetrics, DonorStateEnum::kUnused)); - - boost::optional<DonorStateEnum> prevState; - boost::optional<DonorStateEnum> nextState; - - auto simulateTransitionTo = [&](boost::optional<DonorStateEnum> newState) { - prevState = nextState; - nextState = newState; - _cumulativeMetrics.onStateTransition(prevState, nextState); - return checkDonorStateField(_cumulativeMetrics, nextState); - }; - - ASSERT(simulateTransitionTo(DonorStateEnum::kUnused)); - ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToDonate)); - ASSERT(simulateTransitionTo(DonorStateEnum::kDonatingInitialData)); - ASSERT(simulateTransitionTo(DonorStateEnum::kDonatingOplogEntries)); - ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToBlockWrites)); - ASSERT(simulateTransitionTo(DonorStateEnum::kBlockingWrites)); - ASSERT(simulateTransitionTo(DonorStateEnum::kDone)); - ASSERT(simulateTransitionTo(boost::none)); -} - -TEST_F(ShardingDataTransformCumulativeStateTest, - SimulatedAbortedDonorStateTransitionReportsStateCorrectly) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&donor); - - ASSERT(checkDonorStateField(_cumulativeMetrics, DonorStateEnum::kUnused)); - - boost::optional<DonorStateEnum> prevState; - boost::optional<DonorStateEnum> nextState; - - auto simulateTransitionTo = [&](boost::optional<DonorStateEnum> newState) { - prevState = nextState; - nextState = newState; - _cumulativeMetrics.onStateTransition(prevState, nextState); - return checkDonorStateField(_cumulativeMetrics, nextState); - }; - - ASSERT(simulateTransitionTo(DonorStateEnum::kUnused)); - ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToDonate)); - ASSERT(simulateTransitionTo(DonorStateEnum::kError)); - ASSERT(simulateTransitionTo(DonorStateEnum::kDone)); - ASSERT(simulateTransitionTo(boost::none)); -} - -TEST_F(ShardingDataTransformCumulativeStateTest, - SimulatedSteppedDownDonorStateFromUnusedReportsStateCorrectly) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&donor); - - boost::optional<DonorStateEnum> initState = DonorStateEnum::kUnused; - ASSERT(checkDonorStateField(_cumulativeMetrics, initState)); - - _cumulativeMetrics.onStateTransition(initState, {boost::none}); - ASSERT(checkDonorStateField(_cumulativeMetrics, initState)); -} - -TEST_F(ShardingDataTransformCumulativeStateTest, - SimulatedSteppedDownDonorStateTransitionReportsStateCorrectly) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&donor); - - ASSERT(checkDonorStateField(_cumulativeMetrics, DonorStateEnum::kUnused)); - - boost::optional<DonorStateEnum> prevState; - boost::optional<DonorStateEnum> nextState; - - auto simulateTransitionTo = [&](boost::optional<DonorStateEnum> newState) { - prevState = nextState; - nextState = newState; - _cumulativeMetrics.onStateTransition(prevState, nextState); - return checkDonorStateField(_cumulativeMetrics, nextState); - }; - - ASSERT(simulateTransitionTo(DonorStateEnum::kUnused)); - ASSERT(simulateTransitionTo(DonorStateEnum::kPreparingToDonate)); - ASSERT(simulateTransitionTo(DonorStateEnum::kDonatingInitialData)); - ASSERT(simulateTransitionTo(boost::none)); -} - -TEST_F(ShardingDataTransformCumulativeStateTest, - SimulatedNormalRecipientStateTransitionReportsStateCorrectly) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); - - ASSERT(checkRecipientStateField(_cumulativeMetrics, RecipientStateEnum::kUnused)); - - boost::optional<RecipientStateEnum> prevState; - boost::optional<RecipientStateEnum> nextState; - - auto simulateTransitionTo = [&](boost::optional<RecipientStateEnum> newState) { - prevState = nextState; - nextState = newState; - _cumulativeMetrics.onStateTransition(prevState, nextState); - return checkRecipientStateField(_cumulativeMetrics, nextState); - }; - - ASSERT(simulateTransitionTo(RecipientStateEnum::kUnused)); - ASSERT(simulateTransitionTo(RecipientStateEnum::kAwaitingFetchTimestamp)); - ASSERT(simulateTransitionTo(RecipientStateEnum::kCreatingCollection)); - ASSERT(simulateTransitionTo(RecipientStateEnum::kCloning)); - ASSERT(simulateTransitionTo(RecipientStateEnum::kApplying)); - ASSERT(simulateTransitionTo(RecipientStateEnum::kStrictConsistency)); - ASSERT(simulateTransitionTo(RecipientStateEnum::kDone)); - ASSERT(simulateTransitionTo(boost::none)); -} - -TEST_F(ShardingDataTransformCumulativeStateTest, - SimulatedAbortedRecipientStateTransitionReportsStateCorrectly) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); - - ASSERT(checkRecipientStateField(_cumulativeMetrics, RecipientStateEnum::kUnused)); - - boost::optional<RecipientStateEnum> prevState; - boost::optional<RecipientStateEnum> nextState; - - auto simulateTransitionTo = [&](boost::optional<RecipientStateEnum> newState) { - prevState = nextState; - nextState = newState; - _cumulativeMetrics.onStateTransition(prevState, nextState); - return checkRecipientStateField(_cumulativeMetrics, nextState); - }; - - ASSERT(simulateTransitionTo(RecipientStateEnum::kUnused)); - ASSERT(simulateTransitionTo(RecipientStateEnum::kAwaitingFetchTimestamp)); - ASSERT(simulateTransitionTo(RecipientStateEnum::kError)); - ASSERT(simulateTransitionTo(boost::none)); -} - -TEST_F(ShardingDataTransformCumulativeStateTest, - SimulatedSteppedDownRecipientStateFromUnusedReportsStateCorrectly) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); - - boost::optional<RecipientStateEnum> initState = RecipientStateEnum::kUnused; - ASSERT(checkRecipientStateField(_cumulativeMetrics, initState)); - - _cumulativeMetrics.onStateTransition(initState, {boost::none}); - ASSERT(checkRecipientStateField(_cumulativeMetrics, initState)); -} - -TEST_F(ShardingDataTransformCumulativeStateTest, - SimulatedSteppedDownRecipientStateTransitionReportsStateCorrectly) { - using Role = ShardingDataTransformMetrics::Role; - ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient}; - auto ignore = _cumulativeMetrics.registerInstanceMetrics(&recipient); - - ASSERT(checkRecipientStateField(_cumulativeMetrics, RecipientStateEnum::kUnused)); - - boost::optional<RecipientStateEnum> prevState; - boost::optional<RecipientStateEnum> nextState; - - auto simulateTransitionTo = [&](boost::optional<RecipientStateEnum> newState) { - prevState = nextState; - nextState = newState; - _cumulativeMetrics.onStateTransition(prevState, nextState); - return checkRecipientStateField(_cumulativeMetrics, nextState); - }; - - ASSERT(simulateTransitionTo(RecipientStateEnum::kUnused)); - ASSERT(simulateTransitionTo(RecipientStateEnum::kAwaitingFetchTimestamp)); - ASSERT(simulateTransitionTo(RecipientStateEnum::kCreatingCollection)); - ASSERT(simulateTransitionTo(boost::none)); -} - } // namespace } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp index 9d66e37c47e..96ab3d2a6fd 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp @@ -247,15 +247,6 @@ void ShardingDataTransformInstanceMetrics::setCoordinatorLowEstimateRemainingTim _coordinatorLowEstimateRemainingTimeMillis.store(milliseconds); } -void ShardingDataTransformInstanceMetrics::onLocalInsertDuringOplogFetching(Milliseconds elapsed) { - _cumulativeMetrics->onLocalInsertDuringOplogFetching(elapsed); -} - -void ShardingDataTransformInstanceMetrics::onBatchRetrievedDuringOplogApplying( - Milliseconds elapsed) { - _cumulativeMetrics->onBatchRetrievedDuringOplogApplying(elapsed); -} - void ShardingDataTransformInstanceMetrics::onWriteDuringCriticalSection() { _writesDuringCriticalSection.addAndFetch(1); _cumulativeMetrics->onWriteDuringCriticalSection(); @@ -301,10 +292,6 @@ void ShardingDataTransformInstanceMetrics::onCloningTotalRemoteBatchRetrieval( _cumulativeMetrics->onCloningTotalRemoteBatchRetrieval(elapsed); } -void ShardingDataTransformInstanceMetrics::onOplogLocalBatchApplied(Milliseconds elapsed) { - _cumulativeMetrics->onOplogLocalBatchApplied(elapsed); -} - ShardingDataTransformCumulativeMetrics* ShardingDataTransformInstanceMetrics::getCumulativeMetrics() { return _cumulativeMetrics; diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.h b/src/mongo/db/s/sharding_data_transform_instance_metrics.h index bba3a29a53e..2c26f4ab480 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics.h +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.h @@ -94,10 +94,7 @@ public: void setDocumentsToProcessCounts(int64_t documentCount, int64_t totalDocumentsSizeBytes); void setCoordinatorHighEstimateRemainingTimeMillis(Milliseconds milliseconds); void setCoordinatorLowEstimateRemainingTimeMillis(Milliseconds milliseconds); - void onLocalInsertDuringOplogFetching(Milliseconds elapsed); - void onBatchRetrievedDuringOplogApplying(Milliseconds elapsed); void onCloningTotalRemoteBatchRetrieval(Milliseconds elapsed); - void onOplogLocalBatchApplied(Milliseconds elapsed); void onWriteToStashedCollections(); void onReadDuringCriticalSection(); diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp index 9392fe02129..f5d4ba2df01 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp @@ -146,7 +146,7 @@ public: role, getClockSource()->now(), getClockSource(), - &_cumulativeMetrics); + _cumulativeMetrics.get()); } std::unique_ptr<ShardingDataTransformInstanceMetricsForTest> createInstanceMetrics( @@ -160,7 +160,7 @@ public: ShardingDataTransformInstanceMetrics::Role::kDonor, getClockSource()->now(), getClockSource(), - &_cumulativeMetrics, + _cumulativeMetrics.get(), std::move(fieldNameProvider), std::move(mock)); } @@ -169,9 +169,9 @@ public: TEST_F(ShardingDataTransformInstanceMetricsTest, RegisterAndDeregisterMetrics) { for (auto i = 0; i < 100; i++) { auto metrics = createInstanceMetrics(); - ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), 1); + ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 1); } - ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), 0); + ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 0); } TEST_F(ShardingDataTransformInstanceMetricsTest, RegisterAndDeregisterMetricsAtOnce) { @@ -179,10 +179,10 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, RegisterAndDeregisterMetricsAtO std::vector<std::unique_ptr<ShardingDataTransformInstanceMetricsForTest>> registered; for (auto i = 0; i < 100; i++) { registered.emplace_back(createInstanceMetrics()); - ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), registered.size()); + ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), registered.size()); } } - ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), 0); + ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 0); } TEST_F(ShardingDataTransformInstanceMetricsTest, RandomOperations) { @@ -306,7 +306,7 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, CurrentOpReportsRunningTime) { Role::kCoordinator, start, getClockSource(), - &_cumulativeMetrics); + _cumulativeMetrics.get()); auto report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("totalOperationTimeElapsedSecs"), kTimeElapsed); } diff --git a/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h b/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h index a548723e16a..a898e9fca73 100644 --- a/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h +++ b/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h @@ -103,8 +103,13 @@ public: LOGV2(6437400, "", "TestName"_attr = testName, "Role"_attr = role); auto uuid = UUID::gen(); const auto& clock = getClockSource(); - auto metrics = std::make_unique<T>( - uuid, kTestCommand, kTestNamespace, role, clock->now(), clock, &_cumulativeMetrics); + auto metrics = std::make_unique<T>(uuid, + kTestCommand, + kTestNamespace, + role, + clock->now(), + clock, + _cumulativeMetrics.get()); // Reports 0 before timed section entered. clock->advance(kIncrement); @@ -129,6 +134,26 @@ public: } protected: + void setUp() override { + _cumulativeMetrics = initializeCumulativeMetrics(); + } + + static BSONObj getLatencySection(StringData rootName, + const ShardingDataTransformCumulativeMetrics* metrics) { + BSONObjBuilder bob; + metrics->reportForServerStatus(&bob); + auto report = bob.done(); + return report.getObjectField(rootName).getObjectField("latencies").getOwned(); + } + + static BSONObj getActiveSection(StringData rootName, + const ShardingDataTransformCumulativeMetrics* metrics) { + BSONObjBuilder bob; + metrics->reportForServerStatus(&bob); + auto report = bob.done(); + return report.getObjectField(rootName).getObjectField("active").getOwned(); + } + constexpr static auto kTestMetricsName = "testMetrics"; constexpr static auto kYoungestTime = Date_t::fromMillisSinceEpoch(std::numeric_limits<int64_t>::max()); @@ -140,7 +165,11 @@ protected: const BSONObj kTestCommand = BSON("command" << "test"); - ShardingDataTransformMetricsTestFixture() : _cumulativeMetrics{kTestMetricsName} {} + virtual std::unique_ptr<ShardingDataTransformCumulativeMetrics> initializeCumulativeMetrics() { + return std::make_unique<ShardingDataTransformCumulativeMetrics>( + kTestMetricsName, + std::make_unique<ShardingDataTransformCumulativeMetricsFieldNamePlaceholder>()); + } const ObserverMock* getYoungestObserver() { static StaticImmortal<ObserverMock> youngest{kYoungestTime, kYoungestTimeLeft}; @@ -162,7 +191,7 @@ protected: SpecialIndexBehaviorMap registerAtIndex(int index, const ObserverMock* mock) { return SpecialIndexBehaviorMap{{index, [this, mock] { _observers.emplace_back( - _cumulativeMetrics.registerInstanceMetrics(mock)); + _cumulativeMetrics->registerInstanceMetrics(mock)); }}}; } @@ -186,7 +215,7 @@ protected: std::make_unique<ScopedObserverType>(Date_t::fromMillisSinceEpoch(startTime), timeLeft, getClockSource(), - &_cumulativeMetrics)); + _cumulativeMetrics.get())); }; auto performRemoval = [&] { auto i = rng.nextInt32(inserted.size()); @@ -219,7 +248,7 @@ protected: kRemovalOdds, rng.nextInt64(), registerAtIndex(rng.nextInt32(kIterations), getOldestObserver())); - ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis( + ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis( ObserverMock::kDefaultRole), kOldestTimeLeft); } @@ -256,20 +285,20 @@ protected: for (auto& pf : threadPFs) { pf.future.wait(); } - ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis( + ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis( ObserverMock::kDefaultRole), kOldestTimeLeft); size_t expectedCount = 1; // Special insert for kOldest is not counted in vector size. for (auto& v : threadStorage) { expectedCount += v.size(); } - ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), expectedCount); + ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), expectedCount); for (auto& t : threads) { t.join(); } } - ShardingDataTransformCumulativeMetrics _cumulativeMetrics; + std::unique_ptr<ShardingDataTransformCumulativeMetrics> _cumulativeMetrics; std::vector<ShardingDataTransformCumulativeMetrics::UniqueScopedObserver> _observers; }; |