diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_metrics_test.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_metrics_test.cpp | 343 |
1 files changed, 343 insertions, 0 deletions
diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp new file mode 100644 index 00000000000..e57581cf8dd --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp @@ -0,0 +1,343 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/resharding/resharding_metrics.h" +#include "mongo/db/s/resharding/resharding_service_test_helpers.h" +#include "mongo/db/s/resharding/resharding_util.h" +#include "mongo/db/s/sharding_data_transform_cumulative_metrics.h" +#include "mongo/db/s/sharding_data_transform_metrics_test_fixture.h" +#include "mongo/unittest/unittest.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + + +namespace mongo { +namespace { + +constexpr auto kRunningTime = Seconds(12345); +const auto kShardKey = BSON("newKey" << 1); + +class ReshardingMetricsTest : public ShardingDataTransformMetricsTestFixture { + +public: + std::unique_ptr<ReshardingMetrics> createInstanceMetrics(ClockSource* clockSource, + UUID instanceId = UUID::gen(), + Role role = Role::kDonor) { + return std::make_unique<ReshardingMetrics>(instanceId, + BSON("y" << 1), + kTestNamespace, + role, + clockSource->now(), + clockSource, + &_cumulativeMetrics); + } + + const UUID& getSourceCollectionId() { + static UUID id = UUID::gen(); + return id; + } + + template <typename T> + BSONObj getReportFromStateDocument(T document) { + auto metrics = + ReshardingMetrics::initializeFrom(document, getClockSource(), &_cumulativeMetrics); + return metrics->reportForCurrentOp(); + } + + ReshardingRecipientDocument createRecipientDocument(RecipientStateEnum state, + const UUID& operationId) { + RecipientShardContext recipientCtx; + recipientCtx.setState(state); + ReshardingRecipientDocument doc{std::move(recipientCtx), {ShardId{"donor1"}}, 5}; + doc.setCommonReshardingMetadata(createCommonReshardingMetadata(operationId)); + return doc; + } + + ReshardingDonorDocument createDonorDocument(DonorStateEnum state, const UUID& operationId) { + DonorShardContext donorCtx; + donorCtx.setState(state); + ReshardingDonorDocument doc{std::move(donorCtx), {ShardId{"recipient1"}}}; + doc.setCommonReshardingMetadata(createCommonReshardingMetadata(operationId)); + return doc; + } + + ReshardingCoordinatorDocument createCoordinatorDocument(CoordinatorStateEnum state, + const UUID& operationId) { + ReshardingCoordinatorDocument doc{state, {}, {}}; + doc.setCommonReshardingMetadata(createCommonReshardingMetadata(operationId)); + return doc; + } + + CommonReshardingMetadata createCommonReshardingMetadata(const UUID& operationId) { + CommonReshardingMetadata metadata{operationId, + kTestNamespace, + getSourceCollectionId(), + resharding::constructTemporaryReshardingNss( + kTestNamespace.db(), getSourceCollectionId()), + kShardKey}; + metadata.setStartTime(getClockSource()->now() - kRunningTime); + return metadata; + } + + void verifyCommonCurrentOpFields(const BSONObj& report) { + ASSERT_EQ(report.getStringField("type"), "op"); + ASSERT_EQ(report.getStringField("op"), "command"); + auto originalCommand = report.getObjectField("originatingCommand"); + ASSERT_EQ(originalCommand.getStringField("reshardCollection"), kTestNamespace.toString()); + ASSERT_EQ(originalCommand.getObjectField("key").woCompare(kShardKey), 0); + ASSERT_EQ(originalCommand.getStringField("unique"), "false"); + ASSERT_EQ(originalCommand.getObjectField("collation") + .woCompare(BSON("locale" + << "simple")), + 0); + ASSERT_EQ(report.getIntField("totalOperationTimeElapsedSecs"), kRunningTime.count()); + } + + template <typename MetricsDocument, typename Document> + void doRestoreOngoingPhaseTest( + const std::function<Document()>& createDocument, + const std::function<void(MetricsDocument&, ReshardingMetricsTimeInterval)>& setInterval, + const std::string& fieldName) { + doRestorePhaseTestImpl<MetricsDocument>(createDocument, setInterval, fieldName, false); + } + + template <typename MetricsDocument, typename Document> + void doRestoreCompletedPhaseTest( + const std::function<Document()>& createDocument, + const std::function<void(MetricsDocument&, ReshardingMetricsTimeInterval)>& setInterval, + const std::string& fieldName) { + doRestorePhaseTestImpl<MetricsDocument>(createDocument, setInterval, fieldName, true); + } + + template <typename MetricsDocument, typename Document> + void doRestorePhaseTestImpl( + const std::function<Document()>& createDocument, + const std::function<void(MetricsDocument&, ReshardingMetricsTimeInterval)>& setInterval, + const std::string& fieldName, + bool completed) { + constexpr auto kInterval = Milliseconds{5000}; + auto clock = getClockSource(); + const auto start = clock->now(); + boost::optional<long long> finishedPhaseDuration; + + auto getExpectedDuration = [&] { + if (finishedPhaseDuration) { + return *finishedPhaseDuration; + } + return durationCount<Seconds>(clock->now() - start); + }; + + ReshardingMetricsTimeInterval interval; + interval.setStart(start); + if (completed) { + clock->advance(kInterval); + interval.setStop(clock->now()); + finishedPhaseDuration = durationCount<Seconds>(kInterval); + } + MetricsDocument metricsDoc; + setInterval(metricsDoc, std::move(interval)); + auto doc = createDocument(); + doc.setMetrics(metricsDoc); + + auto metrics = + ReshardingMetrics::initializeFrom(doc, getClockSource(), &_cumulativeMetrics); + + clock->advance(kInterval); + auto report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField(fieldName), getExpectedDuration()); + + clock->advance(kInterval); + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField(fieldName), getExpectedDuration()); + } +}; + + +TEST_F(ReshardingMetricsTest, ReportForCurrentOpShouldHaveGlobalIndexDescription) { + std::vector<Role> roles{Role::kCoordinator, Role::kDonor, Role::kRecipient}; + + std::for_each(roles.begin(), roles.end(), [&](Role role) { + auto instanceId = UUID::gen(); + auto metrics = createInstanceMetrics(getClockSource(), instanceId, role); + auto report = metrics->reportForCurrentOp(); + + ASSERT_EQ(report.getStringField("desc").toString(), + fmt::format("ReshardingMetrics{}Service {}", + ShardingDataTransformMetrics::getRoleName(role), + instanceId.toString())); + }); +} + +TEST_F(ReshardingMetricsTest, RestoresGeneralFieldsFromRecipientStateDocument) { + auto state = RecipientStateEnum::kAwaitingFetchTimestamp; + auto opId = UUID::gen(); + auto report = getReportFromStateDocument(createRecipientDocument(state, opId)); + + verifyCommonCurrentOpFields(report); + ASSERT_EQ(report.getStringField("desc"), + "ReshardingMetricsRecipientService " + opId.toString()); + ASSERT_EQ(report.getStringField("recipientState").toString(), RecipientState_serializer(state)); +} + +TEST_F(ReshardingMetricsTest, RestoresByteAndDocumentCountsFromRecipientStateDocument) { + constexpr auto kDocsToCopy = 100; + constexpr auto kBytesToCopy = 1000; + constexpr auto kDocsCopied = 101; + constexpr auto kBytesCopied = 1001; + ReshardingRecipientMetrics metrics; + metrics.setApproxDocumentsToCopy(kDocsToCopy); + metrics.setApproxBytesToCopy(kBytesToCopy); + metrics.setFinalBytesCopiedCount(kBytesCopied); + metrics.setFinalDocumentsCopiedCount(kDocsCopied); + auto doc = createRecipientDocument(RecipientStateEnum::kApplying, UUID::gen()); + doc.setMetrics(metrics); + auto report = getReportFromStateDocument(std::move(doc)); + + ASSERT_EQ(report.getIntField("approxDocumentsToCopy"), kDocsToCopy); + ASSERT_EQ(report.getIntField("approxBytesToCopy"), kBytesToCopy); + ASSERT_EQ(report.getIntField("documentsCopied"), kDocsCopied); + ASSERT_EQ(report.getIntField("bytesCopied"), kBytesCopied); +} + +TEST_F(ReshardingMetricsTest, RestoresByteAndDocumentCountsDuringCloning) { + constexpr auto kDocsCopied = 50; + constexpr auto kBytesCopied = 500; + + auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient); + metrics->restoreDocumentsCopied(kDocsCopied, kBytesCopied); + auto report = metrics->reportForCurrentOp(); + + ASSERT_EQ(report.getIntField("documentsCopied"), kDocsCopied); + ASSERT_EQ(report.getIntField("bytesCopied"), kBytesCopied); +} + +TEST_F(ReshardingMetricsTest, RestoresOngoingCloningTimeFromRecipientStateDocument) { + doRestoreOngoingPhaseTest<ReshardingRecipientMetrics, ReshardingRecipientDocument>( + [this] { return createRecipientDocument(RecipientStateEnum::kCloning, UUID::gen()); }, + [this](auto& doc, auto interval) { doc.setDocumentCopy(std::move(interval)); }, + "totalCopyTimeElapsedSecs"); +} + +TEST_F(ReshardingMetricsTest, RestoresFinishedCloningTimeFromRecipientStateDocument) { + doRestoreCompletedPhaseTest<ReshardingRecipientMetrics, ReshardingRecipientDocument>( + [this] { return createRecipientDocument(RecipientStateEnum::kApplying, UUID::gen()); }, + [this](auto& doc, auto interval) { doc.setDocumentCopy(std::move(interval)); }, + "totalCopyTimeElapsedSecs"); +} + +TEST_F(ReshardingMetricsTest, RestoresOngoingApplyingTimeFromRecipientStateDocument) { + doRestoreOngoingPhaseTest<ReshardingRecipientMetrics, ReshardingRecipientDocument>( + [this] { return createRecipientDocument(RecipientStateEnum::kApplying, UUID::gen()); }, + [this](auto& doc, auto interval) { doc.setOplogApplication(std::move(interval)); }, + "totalApplyTimeElapsedSecs"); +} + +TEST_F(ReshardingMetricsTest, RestoresFinishedApplyingTimeFromRecipientStateDocument) { + doRestoreCompletedPhaseTest<ReshardingRecipientMetrics, ReshardingRecipientDocument>( + [this] { + return createRecipientDocument(RecipientStateEnum::kStrictConsistency, UUID::gen()); + }, + [this](auto& doc, auto interval) { doc.setOplogApplication(std::move(interval)); }, + "totalApplyTimeElapsedSecs"); +} + +TEST_F(ReshardingMetricsTest, RestoresGeneralFieldsFromDonorStateDocument) { + auto state = DonorStateEnum::kDonatingInitialData; + auto opId = UUID::gen(); + auto report = getReportFromStateDocument(createDonorDocument(state, opId)); + + verifyCommonCurrentOpFields(report); + ASSERT_EQ(report.getStringField("desc"), "ReshardingMetricsDonorService " + opId.toString()); + ASSERT_EQ(report.getStringField("donorState").toString(), DonorState_serializer(state)); +} + +TEST_F(ReshardingMetricsTest, RestoresGeneralFieldsFromCoordinatorStateDocument) { + auto state = CoordinatorStateEnum::kPreparingToDonate; + auto opId = UUID::gen(); + auto report = getReportFromStateDocument(createCoordinatorDocument(state, opId)); + + verifyCommonCurrentOpFields(report); + ASSERT_EQ(report.getStringField("desc"), + "ReshardingMetricsCoordinatorService " + opId.toString()); + ASSERT_EQ(report.getStringField("coordinatorState").toString(), + CoordinatorState_serializer(state)); +} + +TEST_F(ReshardingMetricsTest, RestoresFromReshardingApplierProgressDocument) { + ReshardingOplogApplierProgress progressDoc; + progressDoc.setInsertsApplied(123); + progressDoc.setUpdatesApplied(456); + progressDoc.setDeletesApplied(789); + progressDoc.setWritesToStashCollections(800); + + auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient); + metrics->accumulateFrom(progressDoc); + auto report = metrics->reportForCurrentOp(); + + ASSERT_EQ(report.getIntField("insertsApplied"), 123); + ASSERT_EQ(report.getIntField("updatesApplied"), 456); + ASSERT_EQ(report.getIntField("deletesApplied"), 789); + ASSERT_EQ(report.getIntField("countWritesToStashCollections"), 800); +} + +TEST_F(ReshardingMetricsTest, RestoresOngoingCloningTimeFromCoordinatorStateDocument) { + doRestoreOngoingPhaseTest<ReshardingCoordinatorMetrics, ReshardingCoordinatorDocument>( + [this] { return createCoordinatorDocument(CoordinatorStateEnum::kCloning, UUID::gen()); }, + [this](auto& doc, auto interval) { doc.setDocumentCopy(std::move(interval)); }, + "totalCopyTimeElapsedSecs"); +} + +TEST_F(ReshardingMetricsTest, RestoresFinishedCloningTimeFromCoordinatorStateDocument) { + doRestoreCompletedPhaseTest<ReshardingCoordinatorMetrics, ReshardingCoordinatorDocument>( + [this] { return createCoordinatorDocument(CoordinatorStateEnum::kApplying, UUID::gen()); }, + [this](auto& doc, auto interval) { doc.setDocumentCopy(std::move(interval)); }, + "totalCopyTimeElapsedSecs"); +} + +TEST_F(ReshardingMetricsTest, RestoresOngoingApplyingTimeFromCoordinatorStateDocument) { + doRestoreOngoingPhaseTest<ReshardingCoordinatorMetrics, ReshardingCoordinatorDocument>( + [this] { return createCoordinatorDocument(CoordinatorStateEnum::kApplying, UUID::gen()); }, + [this](auto& doc, auto interval) { doc.setOplogApplication(std::move(interval)); }, + "totalApplyTimeElapsedSecs"); +} + +TEST_F(ReshardingMetricsTest, RestoresFinishedApplyingTimeFromCoordinatorStateDocument) { + doRestoreCompletedPhaseTest<ReshardingCoordinatorMetrics, ReshardingCoordinatorDocument>( + [this] { + return createCoordinatorDocument(CoordinatorStateEnum::kBlockingWrites, UUID::gen()); + }, + [this](auto& doc, auto interval) { doc.setOplogApplication(std::move(interval)); }, + "totalApplyTimeElapsedSecs"); +} + +} // namespace +} // namespace mongo |