diff options
12 files changed, 431 insertions, 132 deletions
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp index 54381a5f6c0..3b1954de232 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp @@ -270,8 +270,7 @@ bool ReshardingCollectionCloner::doOneBatch(OperationContext* opCtx, Pipeline& p auto batch = resharding::data_copy::fillBatchForInsert( pipeline, resharding::gReshardingCollectionClonerBatchSizeInBytes.load()); - _metrics->onCloningTotalRemoteBatchRetrieval( - duration_cast<Milliseconds>(latencyTimer.elapsed())); + _metrics->onCloningRemoteBatchRetrieval(duration_cast<Milliseconds>(latencyTimer.elapsed())); if (batch.empty()) { return false; diff --git a/src/mongo/db/s/resharding/resharding_cumulative_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_cumulative_metrics_test.cpp index c35b2f31c10..f47a96022ec 100644 --- a/src/mongo/db/s/resharding/resharding_cumulative_metrics_test.cpp +++ b/src/mongo/db/s/resharding/resharding_cumulative_metrics_test.cpp @@ -52,6 +52,10 @@ protected: return std::make_unique<ReshardingCumulativeMetrics>(); } + virtual StringData getRootSectionName() override { + return kResharding; + } + using CoordinatorStateEnum = ReshardingCumulativeMetrics::CoordinatorStateEnum; using DonorStateEnum = ReshardingCumulativeMetrics::DonorStateEnum; using RecipientStateEnum = ReshardingCumulativeMetrics::RecipientStateEnum; @@ -180,15 +184,15 @@ TEST_F(ReshardingCumulativeMetricsTest, ReportContainsInsertsDuringFetching) { ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); - auto latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInserts"), 0); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInsertTimeMillis"), 0); + auto latencies = getCumulativeMetricsReportForSection(kLatencies); + ASSERT_EQ(latencies.getIntField("oplogFetchingTotalLocalInserts"), 0); + ASSERT_EQ(latencies.getIntField("oplogFetchingTotalLocalInsertTimeMillis"), 0); _reshardingCumulativeMetrics->onLocalInsertDuringOplogFetching(Milliseconds(17)); - latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInserts"), 1); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalLocalInsertTimeMillis"), 17); + latencies = getCumulativeMetricsReportForSection(kLatencies); + ASSERT_EQ(latencies.getIntField("oplogFetchingTotalLocalInserts"), 1); + ASSERT_EQ(latencies.getIntField("oplogFetchingTotalLocalInsertTimeMillis"), 17); } @@ -197,15 +201,15 @@ TEST_F(ReshardingCumulativeMetricsTest, ReportContainsBatchRetrievedDuringApplyi ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); - auto latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesRetrieved"), 0); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchRetrievalTimeMillis"), 0); + auto latencies = getCumulativeMetricsReportForSection(kLatencies); + ASSERT_EQ(latencies.getIntField("oplogApplyingTotalLocalBatchesRetrieved"), 0); + ASSERT_EQ(latencies.getIntField("oplogApplyingTotalLocalBatchRetrievalTimeMillis"), 0); _reshardingCumulativeMetrics->onBatchRetrievedDuringOplogApplying(Milliseconds(39)); - latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesRetrieved"), 1); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchRetrievalTimeMillis"), 39); + latencies = getCumulativeMetricsReportForSection(kLatencies); + ASSERT_EQ(latencies.getIntField("oplogApplyingTotalLocalBatchesRetrieved"), 1); + ASSERT_EQ(latencies.getIntField("oplogApplyingTotalLocalBatchRetrievalTimeMillis"), 39); } @@ -214,15 +218,15 @@ TEST_F(ReshardingCumulativeMetricsTest, ReportContainsBatchApplied) { ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); - auto latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesApplied"), 0); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchApplyTimeMillis"), 0); + auto latencies = getCumulativeMetricsReportForSection(kLatencies); + ASSERT_EQ(latencies.getIntField("oplogApplyingTotalLocalBatchesApplied"), 0); + ASSERT_EQ(latencies.getIntField("oplogApplyingTotalLocalBatchApplyTimeMillis"), 0); _reshardingCumulativeMetrics->onOplogLocalBatchApplied(Milliseconds(333)); - latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchesApplied"), 1); - ASSERT_EQ(latencySection.getIntField("oplogApplyingTotalLocalBatchApplyTimeMillis"), 333); + latencies = getCumulativeMetricsReportForSection(kLatencies); + ASSERT_EQ(latencies.getIntField("oplogApplyingTotalLocalBatchesApplied"), 1); + ASSERT_EQ(latencies.getIntField("oplogApplyingTotalLocalBatchApplyTimeMillis"), 333); } TEST_F(ReshardingCumulativeMetricsTest, ReportContainsInsertsApplied) { @@ -230,13 +234,13 @@ TEST_F(ReshardingCumulativeMetricsTest, ReportContainsInsertsApplied) { ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); - auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("insertsApplied"), 0); + auto active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("insertsApplied"), 0); _reshardingCumulativeMetrics->onInsertApplied(); - activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("insertsApplied"), 1); + active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("insertsApplied"), 1); } TEST_F(ReshardingCumulativeMetricsTest, ReportContainsUpdatesApplied) { @@ -244,13 +248,13 @@ TEST_F(ReshardingCumulativeMetricsTest, ReportContainsUpdatesApplied) { ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); - auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("updatesApplied"), 0); + auto active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("updatesApplied"), 0); _reshardingCumulativeMetrics->onUpdateApplied(); - activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("updatesApplied"), 1); + active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("updatesApplied"), 1); } TEST_F(ReshardingCumulativeMetricsTest, ReportContainsDeletesApplied) { @@ -258,13 +262,13 @@ TEST_F(ReshardingCumulativeMetricsTest, ReportContainsDeletesApplied) { ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); - auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("deletesApplied"), 0); + auto active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("deletesApplied"), 0); _reshardingCumulativeMetrics->onDeleteApplied(); - activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("deletesApplied"), 1); + active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("deletesApplied"), 1); } TEST_F(ReshardingCumulativeMetricsTest, ReportContainsOplogEntriesFetched) { @@ -272,21 +276,21 @@ TEST_F(ReshardingCumulativeMetricsTest, ReportContainsOplogEntriesFetched) { ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); - auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("oplogEntriesFetched"), 0); + auto active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("oplogEntriesFetched"), 0); - auto latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchesRetrieved"), 0); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchRetrievalTimeMillis"), 0); + auto latencies = getCumulativeMetricsReportForSection(kLatencies); + ASSERT_EQ(latencies.getIntField("oplogFetchingTotalRemoteBatchesRetrieved"), 0); + ASSERT_EQ(latencies.getIntField("oplogFetchingTotalRemoteBatchRetrievalTimeMillis"), 0); _reshardingCumulativeMetrics->onOplogEntriesFetched(123, Milliseconds(43)); - activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("oplogEntriesFetched"), 123); + active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("oplogEntriesFetched"), 123); - latencySection = getLatencySection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchesRetrieved"), 1); - ASSERT_EQ(latencySection.getIntField("oplogFetchingTotalRemoteBatchRetrievalTimeMillis"), 43); + latencies = getCumulativeMetricsReportForSection(kLatencies); + ASSERT_EQ(latencies.getIntField("oplogFetchingTotalRemoteBatchesRetrieved"), 1); + ASSERT_EQ(latencies.getIntField("oplogFetchingTotalRemoteBatchRetrievalTimeMillis"), 43); } TEST_F(ReshardingCumulativeMetricsTest, ReportContainsOplogEntriesApplied) { @@ -294,13 +298,13 @@ TEST_F(ReshardingCumulativeMetricsTest, ReportContainsOplogEntriesApplied) { ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; auto ignore = _reshardingCumulativeMetrics->registerInstanceMetrics(&recipient); - auto activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("oplogEntriesApplied"), 0); + auto active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("oplogEntriesApplied"), 0); _reshardingCumulativeMetrics->onOplogEntriesApplied(99); - activeSection = getActiveSection(kResharding, _reshardingCumulativeMetrics); - ASSERT_EQ(activeSection.getIntField("oplogEntriesApplied"), 99); + active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("oplogEntriesApplied"), 99); } TEST_F(ReshardingCumulativeMetricsTest, diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp index 99b9d2d7cf6..b36d4a4d3aa 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp @@ -44,6 +44,7 @@ namespace mongo { namespace { constexpr auto kRunningTime = Seconds(12345); +constexpr auto kResharding = "resharding"; const auto kShardKey = BSON("newKey" << 1); class ReshardingMetricsTest : public ShardingDataTransformMetricsTestFixture { @@ -66,6 +67,10 @@ public: getCumulativeMetrics()); } + virtual StringData getRootSectionName() override { + return kResharding; + } + const UUID& getSourceCollectionId() { static UUID id = UUID::gen(); return id; @@ -194,6 +199,18 @@ public: report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField(fieldName), getExpectedDuration()); } + + void createMetricsAndAssertIncrementsCumulativeMetricsField( + const std::function<void(ReshardingMetrics*)>& mutate, + Section section, + const StringData& fieldName) { + auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kCoordinator); + assertIncrementsCumulativeMetricsField( + metrics.get(), + [&](auto base) { mutate(dynamic_cast<ReshardingMetrics*>(base)); }, + section, + fieldName); + } }; TEST_F(ReshardingMetricsTest, ReportForCurrentOpShouldHaveReshardingMetricsDescription) { @@ -534,5 +551,136 @@ TEST_F(ReshardingMetricsTest, CurrentOpDoesNotReportRecipientEstimateIfNotSet) { ASSERT_FALSE(report.hasField("remainingOperationTimeEstimatedSecs")); } +TEST_F(ReshardingMetricsTest, OnInsertAppliedIncrementsCumulativeMetrics) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onInsertApplied(); }, Section::kActive, "insertsApplied"); +} + +TEST_F(ReshardingMetricsTest, OnUpdateAppliedIncrementsCumulativeMetrics) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onUpdateApplied(); }, Section::kActive, "updatesApplied"); +} + +TEST_F(ReshardingMetricsTest, OnDeleteAppliedIncrementsCumulativeMetrics) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onDeleteApplied(); }, Section::kActive, "deletesApplied"); +} + +TEST_F(ReshardingMetricsTest, OnOplogFetchedIncrementsCumulativeMetricsFetchedCount) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onOplogEntriesFetched(1, Milliseconds{0}); }, + Section::kActive, + "oplogEntriesFetched"); +} + +TEST_F(ReshardingMetricsTest, OnOplogFetchedIncrementsCumulativeMetricsFetchedBatchCount) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onOplogEntriesFetched(0, Milliseconds{0}); }, + Section::kLatencies, + "oplogFetchingTotalRemoteBatchesRetrieved"); +} + +TEST_F(ReshardingMetricsTest, OnOplogFetchedIncrementsCumulativeMetricsFetchedBatchTime) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onOplogEntriesFetched(0, Milliseconds{1}); }, + Section::kLatencies, + "oplogFetchingTotalRemoteBatchRetrievalTimeMillis"); +} + +TEST_F(ReshardingMetricsTest, OnLocalFetchingInsertIncrementsCumulativeMetricsFetchedBatchCount) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onLocalInsertDuringOplogFetching(Milliseconds{0}); }, + Section::kLatencies, + "oplogFetchingTotalLocalInserts"); +} + +TEST_F(ReshardingMetricsTest, OnLocalFetchingInsertIncrementsCumulativeMetricsFetchedBatchTime) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onLocalInsertDuringOplogFetching(Milliseconds{1}); }, + Section::kLatencies, + "oplogFetchingTotalLocalInsertTimeMillis"); +} + +TEST_F(ReshardingMetricsTest, OnOplogAppliedIncrementsCumulativeMetricsAppliedCount) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onOplogEntriesApplied(1); }, + Section::kActive, + "oplogEntriesApplied"); +} + +TEST_F(ReshardingMetricsTest, OnApplyingBatchRetrievedIncrementsCumulativeMetricsBatchCount) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onBatchRetrievedDuringOplogApplying(Milliseconds{0}); }, + Section::kLatencies, + "oplogApplyingTotalLocalBatchesRetrieved"); +} + +TEST_F(ReshardingMetricsTest, OnApplyingBatchRetrievedIncrementsCumulativeMetricsBatchTime) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onBatchRetrievedDuringOplogApplying(Milliseconds{1}); }, + Section::kLatencies, + "oplogApplyingTotalLocalBatchRetrievalTimeMillis"); +} + +TEST_F(ReshardingMetricsTest, OnApplyingBatchAppliedIncrementsCumulativeMetricsBatchCount) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onOplogLocalBatchApplied(Milliseconds{0}); }, + Section::kLatencies, + "oplogApplyingTotalLocalBatchesApplied"); +} + +TEST_F(ReshardingMetricsTest, OnApplyingBatchAppliedIncrementsCumulativeMetricsBatchTime) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onOplogLocalBatchApplied(Milliseconds{1}); }, + Section::kLatencies, + "oplogApplyingTotalLocalBatchApplyTimeMillis"); +} + +TEST_F(ReshardingMetricsTest, OnStateTransitionFromNoneInformsCumulativeMetrics) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { + metrics->onStateTransition( + boost::none, ReshardingMetrics::CoordinatorState{CoordinatorStateEnum::kApplying}); + }, + Section::kCurrentInSteps, + "countInstancesInCoordinatorState4Applying"); +} + +TEST_F(ReshardingMetricsTest, OnStateTransitionToNoneInformsCumulativeMetrics) { + auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kCoordinator); + auto state = ReshardingMetrics::CoordinatorState{CoordinatorStateEnum::kApplying}; + metrics->onStateTransition(boost::none, state); + assertDecrementsCumulativeMetricsField( + metrics.get(), + [=](auto base) { + auto reshardingMetrics = dynamic_cast<ReshardingMetrics*>(base); + reshardingMetrics->onStateTransition(state, boost::none); + }, + Section::kCurrentInSteps, + "countInstancesInCoordinatorState4Applying"); +} + +TEST_F(ReshardingMetricsTest, OnStateTransitionInformsCumulativeMetrics) { + auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kCoordinator); + auto initialState = ReshardingMetrics::CoordinatorState{CoordinatorStateEnum::kApplying}; + auto nextState = ReshardingMetrics::CoordinatorState{CoordinatorStateEnum::kBlockingWrites}; + metrics->onStateTransition(boost::none, initialState); + assertAltersCumulativeMetrics( + metrics.get(), + [=](auto base) { + auto reshardingMetrics = dynamic_cast<ReshardingMetrics*>(base); + reshardingMetrics->onStateTransition(initialState, nextState); + }, + [this](auto reportBefore, auto reportAfter) { + auto before = getReportSection(reportBefore, Section::kCurrentInSteps); + ASSERT_EQ(before.getIntField("countInstancesInCoordinatorState4Applying"), 1); + ASSERT_EQ(before.getIntField("countInstancesInCoordinatorState5BlockingWrites"), 0); + auto after = getReportSection(reportAfter, Section::kCurrentInSteps); + ASSERT_EQ(after.getIntField("countInstancesInCoordinatorState4Applying"), 0); + ASSERT_EQ(after.getIntField("countInstancesInCoordinatorState5BlockingWrites"), 1); + return true; + }); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp index 0b0665a6b02..f9e3bc5d2ee 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp @@ -275,8 +275,7 @@ void ShardingDataTransformCumulativeMetrics::onWriteToStashedCollections() { _writesToStashedCollections.fetchAndAdd(1); } -void ShardingDataTransformCumulativeMetrics::onCloningTotalRemoteBatchRetrieval( - Milliseconds elapsed) { +void ShardingDataTransformCumulativeMetrics::onCloningRemoteBatchRetrieval(Milliseconds elapsed) { _totalBatchRetrievedDuringClone.fetchAndAdd(1); _totalBatchRetrievedDuringCloneMillis.fetchAndAdd(durationCount<Milliseconds>(elapsed)); } diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h index 21a107c8f53..c65336e07c0 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h @@ -107,7 +107,7 @@ public: void onWriteDuringCriticalSection(); void onWriteToStashedCollections(); - void onCloningTotalRemoteBatchRetrieval(Milliseconds elapsed); + void onCloningRemoteBatchRetrieval(Milliseconds elapsed); void onInsertsDuringCloning(int64_t count, int64_t bytes, const Milliseconds& elapsedTime); protected: diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_field_name_provider.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_field_name_provider.cpp index cf0d988fda7..d50acad2640 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_field_name_provider.cpp +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_field_name_provider.cpp @@ -33,7 +33,6 @@ namespace mongo { namespace { using Provider = ShardingDataTransformCumulativeMetricsFieldNameProvider; -using Placeholder = ShardingDataTransformCumulativeMetricsFieldNamePlaceholder; constexpr auto kCountStarted = "countStarted"; constexpr auto kCountSucceeded = "countSucceeded"; constexpr auto kCountFailed = "countFailed"; @@ -106,11 +105,4 @@ StringData Provider::getForCollectionCloningTotalLocalInserts() const { return kCollectionCloningTotalLocalInserts; } -StringData Placeholder::getForDocumentsProcessed() const { - return kDocumentsCopied; -} -StringData Placeholder::getForBytesWritten() const { - return kBytesCopied; -} - } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_field_name_provider.h b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_field_name_provider.h index 51cad5ed3fb..e8e68b0cc83 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_field_name_provider.h +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_field_name_provider.h @@ -55,12 +55,4 @@ public: StringData getForCollectionCloningTotalLocalInserts() const; }; -class ShardingDataTransformCumulativeMetricsFieldNamePlaceholder - : public ShardingDataTransformCumulativeMetricsFieldNameProvider { -public: - virtual ~ShardingDataTransformCumulativeMetricsFieldNamePlaceholder() = default; - virtual StringData getForDocumentsProcessed() const override; - virtual StringData getForBytesWritten() const override; -}; - } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp index 1fa2585ac3e..b8d295461e8 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp @@ -61,24 +61,7 @@ private: ShardingDataTransformCumulativeMetrics::UniqueScopedObserver _scopedOpObserver; }; -class ShardingDataTransformCumulativeMetricsTest : public ShardingDataTransformMetricsTestFixture { -public: - static BSONObj getLatencySection(const ShardingDataTransformCumulativeMetrics& metrics) { - BSONObjBuilder bob; - metrics.reportForServerStatus(&bob); - auto report = bob.done(); - return report.getObjectField(kTestMetricsName).getObjectField("latencies").getOwned(); - } - - static BSONObj getActiveSection(const ShardingDataTransformCumulativeMetrics& metrics) { - BSONObjBuilder bob; - metrics.reportForServerStatus(&bob); - auto report = bob.done(); - return report.getObjectField(kTestMetricsName).getObjectField("active").getOwned(); - } -}; - -TEST_F(ShardingDataTransformCumulativeMetricsTest, AddAndRemoveMetrics) { +TEST_F(ShardingDataTransformMetricsTestFixture, AddAndRemoveMetrics) { auto deregister = _cumulativeMetrics->registerInstanceMetrics(getOldestObserver()); ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 1); deregister.reset(); @@ -320,23 +303,23 @@ TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsInsertsDuringCloni ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; auto ignore = _cumulativeMetrics->registerInstanceMetrics(&recipient); - auto latencySection = getLatencySection(kTestMetricsName, _cumulativeMetrics.get()); - ASSERT_EQ(latencySection.getIntField("collectionCloningTotalLocalInserts"), 0); - ASSERT_EQ(latencySection.getIntField("collectionCloningTotalLocalInsertTimeMillis"), 0); + auto latencies = getCumulativeMetricsReportForSection(kLatencies); + ASSERT_EQ(latencies.getIntField("collectionCloningTotalLocalInserts"), 0); + ASSERT_EQ(latencies.getIntField("collectionCloningTotalLocalInsertTimeMillis"), 0); - auto activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); - ASSERT_EQ(activeSection.getIntField("documentsCopied"), 0); - ASSERT_EQ(activeSection.getIntField("bytesCopied"), 0); + auto active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("documentsProcessed"), 0); + ASSERT_EQ(active.getIntField("bytesWritten"), 0); _cumulativeMetrics->onInsertsDuringCloning(140, 20763, Milliseconds(15)); - latencySection = getLatencySection(kTestMetricsName, _cumulativeMetrics.get()); - ASSERT_EQ(latencySection.getIntField("collectionCloningTotalLocalInserts"), 1); - ASSERT_EQ(latencySection.getIntField("collectionCloningTotalLocalInsertTimeMillis"), 15); + latencies = getCumulativeMetricsReportForSection(kLatencies); + ASSERT_EQ(latencies.getIntField("collectionCloningTotalLocalInserts"), 1); + ASSERT_EQ(latencies.getIntField("collectionCloningTotalLocalInsertTimeMillis"), 15); - activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); - ASSERT_EQ(activeSection.getIntField("documentsCopied"), 140); - ASSERT_EQ(activeSection.getIntField("bytesCopied"), 20763); + active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("documentsProcessed"), 140); + ASSERT_EQ(active.getIntField("bytesWritten"), 20763); } TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsReadDuringCriticalSection) { @@ -344,13 +327,13 @@ TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsReadDuringCritical ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor}; auto ignore = _cumulativeMetrics->registerInstanceMetrics(&donor); - auto activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); - ASSERT_EQ(activeSection.getIntField("countReadsDuringCriticalSection"), 0); + auto active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("countReadsDuringCriticalSection"), 0); _cumulativeMetrics->onReadDuringCriticalSection(); - activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); - ASSERT_EQ(activeSection.getIntField("countReadsDuringCriticalSection"), 1); + active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("countReadsDuringCriticalSection"), 1); } TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsWriteDuringCriticalSection) { @@ -358,13 +341,13 @@ TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsWriteDuringCritica ObserverMock donor{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kDonor}; auto ignore = _cumulativeMetrics->registerInstanceMetrics(&donor); - auto activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); - ASSERT_EQ(activeSection.getIntField("countWritesDuringCriticalSection"), 0); + auto active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("countWritesDuringCriticalSection"), 0); _cumulativeMetrics->onWriteDuringCriticalSection(); - activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); - ASSERT_EQ(activeSection.getIntField("countWritesDuringCriticalSection"), 1); + active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("countWritesDuringCriticalSection"), 1); } TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsWriteToStashedCollection) { @@ -372,13 +355,13 @@ TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsWriteToStashedColl ObserverMock recipient{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kRecipient}; auto ignore = _cumulativeMetrics->registerInstanceMetrics(&recipient); - auto activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); - ASSERT_EQ(activeSection.getIntField("countWritesToStashCollections"), 0); + auto active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("countWritesToStashCollections"), 0); _cumulativeMetrics->onWriteToStashedCollections(); - activeSection = getActiveSection(kTestMetricsName, _cumulativeMetrics.get()); - ASSERT_EQ(activeSection.getIntField("countWritesToStashCollections"), 1); + active = getCumulativeMetricsReportForSection(kActive); + ASSERT_EQ(active.getIntField("countWritesToStashCollections"), 1); } TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsBatchRetrievedDuringCloning) { @@ -386,17 +369,15 @@ TEST_F(ShardingDataTransformMetricsTestFixture, ReportContainsBatchRetrievedDuri ObserverMock recipient{Date_t::fromMillisSinceEpoch(100), 100, 100, Role::kRecipient}; auto ignore = _cumulativeMetrics->registerInstanceMetrics(&recipient); - auto latencySection = getLatencySection(kTestMetricsName, _cumulativeMetrics.get()); - ASSERT_EQ(latencySection.getIntField("collectionCloningTotalRemoteBatchesRetrieved"), 0); - ASSERT_EQ(latencySection.getIntField("collectionCloningTotalRemoteBatchRetrievalTimeMillis"), - 0); + auto latencies = getCumulativeMetricsReportForSection(kLatencies); + ASSERT_EQ(latencies.getIntField("collectionCloningTotalRemoteBatchesRetrieved"), 0); + ASSERT_EQ(latencies.getIntField("collectionCloningTotalRemoteBatchRetrievalTimeMillis"), 0); - _cumulativeMetrics->onCloningTotalRemoteBatchRetrieval(Milliseconds(19)); + _cumulativeMetrics->onCloningRemoteBatchRetrieval(Milliseconds(19)); - latencySection = getLatencySection(kTestMetricsName, _cumulativeMetrics.get()); - ASSERT_EQ(latencySection.getIntField("collectionCloningTotalRemoteBatchesRetrieved"), 1); - ASSERT_EQ(latencySection.getIntField("collectionCloningTotalRemoteBatchRetrievalTimeMillis"), - 19); + latencies = getCumulativeMetricsReportForSection(kLatencies); + ASSERT_EQ(latencies.getIntField("collectionCloningTotalRemoteBatchesRetrieved"), 1); + ASSERT_EQ(latencies.getIntField("collectionCloningTotalRemoteBatchRetrievalTimeMillis"), 19); } } // namespace diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp index 0c64c2ee65c..7f04201316e 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp @@ -318,9 +318,8 @@ void ShardingDataTransformInstanceMetrics::onReadDuringCriticalSection() { _cumulativeMetrics->onReadDuringCriticalSection(); } -void ShardingDataTransformInstanceMetrics::onCloningTotalRemoteBatchRetrieval( - Milliseconds elapsed) { - _cumulativeMetrics->onCloningTotalRemoteBatchRetrieval(elapsed); +void ShardingDataTransformInstanceMetrics::onCloningRemoteBatchRetrieval(Milliseconds elapsed) { + _cumulativeMetrics->onCloningRemoteBatchRetrieval(elapsed); } ShardingDataTransformCumulativeMetrics* diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.h b/src/mongo/db/s/sharding_data_transform_instance_metrics.h index ecd8a776afb..9c78c547150 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics.h +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.h @@ -92,7 +92,7 @@ public: void setDocumentsToProcessCounts(int64_t documentCount, int64_t totalDocumentsSizeBytes); void setCoordinatorHighEstimateRemainingTimeMillis(Milliseconds milliseconds); void setCoordinatorLowEstimateRemainingTimeMillis(Milliseconds milliseconds); - void onCloningTotalRemoteBatchRetrieval(Milliseconds elapsed); + void onCloningRemoteBatchRetrieval(Milliseconds elapsed); void onWriteToStashedCollections(); void onReadDuringCriticalSection(); diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp index a690d908879..c6fc1755328 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp @@ -164,6 +164,14 @@ public: std::move(fieldNameProvider), std::move(mock)); } + + void createMetricsAndAssertIncrementsCumulativeMetricsField( + const std::function<void(ShardingDataTransformInstanceMetrics*)>& mutate, + Section section, + const StringData& fieldName) { + auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator); + assertIncrementsCumulativeMetricsField(metrics.get(), mutate, section, fieldName); + } }; TEST_F(ShardingDataTransformInstanceMetricsTest, RegisterAndDeregisterMetrics) { @@ -362,5 +370,104 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, ASSERT_FALSE(report.hasField("allShardsLowestRemainingOperationTimeEstimatedSecs")); } +TEST_F(ShardingDataTransformInstanceMetricsTest, OnStartedIncrementsCumulativeMetrics) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onStarted(); }, Section::kRoot, "countStarted"); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, OnSuccessIncrementsCumulativeMetrics) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onSuccess(); }, Section::kRoot, "countSucceeded"); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, OnFailureIncrementsCumulativeMetrics) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onFailure(); }, Section::kRoot, "countFailed"); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, OnCanceledIncrementsCumulativeMetrics) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onCanceled(); }, Section::kRoot, "countCanceled"); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, SetChunkImbalanceIncrementsCumulativeMetrics) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->setLastOpEndingChunkImbalance(1); }, + Section::kRoot, + "lastOpEndingChunkImbalance"); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, + OnReadDuringCriticalSectionIncrementsCumulativeMetrics) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onReadDuringCriticalSection(); }, + Section::kActive, + "countReadsDuringCriticalSection"); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, + OnWriteDuringCriticalSectionIncrementsCumulativeMetrics) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onWriteDuringCriticalSection(); }, + Section::kActive, + "countWritesDuringCriticalSection"); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, + OnWriteToStashCollectionsIncrementsCumulativeMetrics) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onWriteToStashedCollections(); }, + Section::kActive, + "countWritesToStashCollections"); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, + OnCloningRemoteBatchRetrievalIncrementsCumulativeMetricsCount) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onCloningRemoteBatchRetrieval(Milliseconds{0}); }, + Section::kLatencies, + "collectionCloningTotalRemoteBatchesRetrieved"); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, + OnCloningRemoteBatchRetrievalIncrementsCumulativeMetricsTime) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onCloningRemoteBatchRetrieval(Milliseconds{1}); }, + Section::kLatencies, + "collectionCloningTotalRemoteBatchRetrievalTimeMillis"); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, + OnDocumentsProcessedIncrementsCumulativeMetricsDocumentCount) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onDocumentsProcessed(1, 0, Milliseconds{0}); }, + Section::kActive, + "documentsProcessed"); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, + OnDocumentsProcessedIncrementsCumulativeMetricsLocalInserts) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onDocumentsProcessed(1, 0, Milliseconds{0}); }, + Section::kLatencies, + "collectionCloningTotalLocalInserts"); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, + OnDocumentsProcessedIncrementsCumulativeMetricsByteCount) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onDocumentsProcessed(0, 1, Milliseconds{0}); }, + Section::kActive, + "bytesWritten"); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, + OnDocumentsProcessedIncrementsCumulativeMetricsLocalInsertTime) { + createMetricsAndAssertIncrementsCumulativeMetricsField( + [](auto metrics) { metrics->onDocumentsProcessed(0, 0, Milliseconds{1}); }, + Section::kLatencies, + "collectionCloningTotalLocalInsertTimeMillis"); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h b/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h index cc66423e231..e109215195c 100644 --- a/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h +++ b/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h @@ -87,6 +87,18 @@ private: ShardingDataTransformMetrics::Role _role; }; +class ShardingDataTransformCumulativeMetricsFieldNameProviderForTest + : public ShardingDataTransformCumulativeMetricsFieldNameProvider { +public: + virtual ~ShardingDataTransformCumulativeMetricsFieldNameProviderForTest() = default; + virtual StringData getForDocumentsProcessed() const override { + return "documentsProcessed"; + } + virtual StringData getForBytesWritten() const override { + return "bytesWritten"; + } +}; + class ShardingDataTransformMetricsTestFixture : public unittest::Test { public: @@ -138,20 +150,86 @@ protected: _cumulativeMetrics = initializeCumulativeMetrics(); } - static BSONObj getLatencySection(StringData rootName, - const ShardingDataTransformCumulativeMetrics* metrics) { - BSONObjBuilder bob; - metrics->reportForServerStatus(&bob); - auto report = bob.done(); - return report.getObjectField(rootName).getObjectField("latencies").getOwned(); + virtual StringData getRootSectionName() { + return kTestMetricsName; } - static BSONObj getActiveSection(StringData rootName, - const ShardingDataTransformCumulativeMetrics* metrics) { + enum Section { kRoot, kActive, kLatencies, kCurrentInSteps }; + + StringData getSectionName(Section section) { + switch (section) { + case kRoot: + return getRootSectionName(); + case kActive: + return "active"; + case kLatencies: + return "latencies"; + case kCurrentInSteps: + return "currentInSteps"; + } + MONGO_UNREACHABLE; + } + + BSONObj getCumulativeMetricsReport() { BSONObjBuilder bob; - metrics->reportForServerStatus(&bob); - auto report = bob.done(); - return report.getObjectField(rootName).getObjectField("active").getOwned(); + getCumulativeMetrics()->reportForServerStatus(&bob); + return bob.obj().getObjectField(getSectionName(kRoot)).getOwned(); + } + + BSONObj getReportSection(BSONObj report, Section section) { + if (section == kRoot) { + return report.getOwned(); + } + return report.getObjectField(getSectionName(section)).getOwned(); + } + + BSONObj getCumulativeMetricsReportForSection(Section section) { + return getReportSection(getCumulativeMetricsReport(), section); + } + + void assertAltersCumulativeMetrics( + ShardingDataTransformInstanceMetrics* metrics, + const std::function<void(ShardingDataTransformInstanceMetrics*)>& mutateFn, + const std::function<bool(BSONObj, BSONObj)>& verifyFn) { + auto before = getCumulativeMetricsReport(); + mutateFn(metrics); + auto after = getCumulativeMetricsReport(); + ASSERT_TRUE(verifyFn(before, after)); + } + + void assertAltersCumulativeMetricsField( + ShardingDataTransformInstanceMetrics* metrics, + const std::function<void(ShardingDataTransformInstanceMetrics*)>& mutateFn, + Section section, + const StringData& fieldName, + const std::function<bool(int, int)>& verifyFn) { + assertAltersCumulativeMetrics(metrics, mutateFn, [&](auto reportBefore, auto reportAfter) { + auto before = getReportSection(reportBefore, section).getIntField(fieldName); + auto after = getReportSection(reportAfter, section).getIntField(fieldName); + return verifyFn(before, after); + }); + } + + void assertIncrementsCumulativeMetricsField( + ShardingDataTransformInstanceMetrics* metrics, + const std::function<void(ShardingDataTransformInstanceMetrics*)>& mutateFn, + Section section, + const StringData& fieldName) { + assertAltersCumulativeMetricsField( + metrics, mutateFn, section, fieldName, [](auto before, auto after) { + return after > before; + }); + } + + void assertDecrementsCumulativeMetricsField( + ShardingDataTransformInstanceMetrics* metrics, + const std::function<void(ShardingDataTransformInstanceMetrics*)>& mutateFn, + Section section, + const StringData& fieldName) { + assertAltersCumulativeMetricsField( + metrics, mutateFn, section, fieldName, [](auto before, auto after) { + return after < before; + }); } constexpr static auto kTestMetricsName = "testMetrics"; @@ -168,7 +246,7 @@ protected: virtual std::unique_ptr<ShardingDataTransformCumulativeMetrics> initializeCumulativeMetrics() { return std::make_unique<ShardingDataTransformCumulativeMetrics>( kTestMetricsName, - std::make_unique<ShardingDataTransformCumulativeMetricsFieldNamePlaceholder>()); + std::make_unique<ShardingDataTransformCumulativeMetricsFieldNameProviderForTest>()); } const ObserverMock* getYoungestObserver() { |