/** * Copyright (C) 2022-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, * as published by MongoDB, Inc. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * Server Side Public License for more details. * * You should have received a copy of the Server Side Public License * along with this program. If not, see * . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the Server Side Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/s/metrics_state_holder.h" #include "mongo/db/s/resharding/resharding_cumulative_metrics.h" #include "mongo/db/s/resharding/resharding_metrics_field_name_provider.h" #include "mongo/db/s/resharding/resharding_metrics_helpers.h" #include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h" #include "mongo/db/s/sharding_data_transform_instance_metrics.h" #include "mongo/util/uuid.h" namespace mongo { class ReshardingMetrics : public ShardingDataTransformInstanceMetrics { public: using State = stdx::variant; class DonorState { public: using MetricsType = ReshardingCumulativeMetrics::DonorStateEnum; explicit DonorState(DonorStateEnum enumVal); MetricsType toMetrics() const; DonorStateEnum getState() const; private: const DonorStateEnum _enumVal; }; class RecipientState { public: using MetricsType = ReshardingCumulativeMetrics::RecipientStateEnum; explicit RecipientState(RecipientStateEnum enumVal); MetricsType toMetrics() const; RecipientStateEnum getState() const; private: RecipientStateEnum _enumVal; }; class CoordinatorState { public: using MetricsType = ReshardingCumulativeMetrics::CoordinatorStateEnum; explicit CoordinatorState(CoordinatorStateEnum enumVal); MetricsType toMetrics() const; CoordinatorStateEnum getState() const; private: CoordinatorStateEnum _enumVal; }; struct ExternallyTrackedRecipientFields { public: void accumulateFrom(const ReshardingOplogApplierProgress& progressDoc); boost::optional documentCountCopied; boost::optional documentBytesCopied; boost::optional oplogEntriesFetched; boost::optional oplogEntriesApplied; boost::optional insertsApplied; boost::optional updatesApplied; boost::optional deletesApplied; boost::optional writesToStashCollections; }; ReshardingMetrics(const CommonReshardingMetadata& metadata, Role role, ClockSource* clockSource, ShardingDataTransformCumulativeMetrics* cumulativeMetrics); ReshardingMetrics(const CommonReshardingMetadata& metadata, Role role, ClockSource* clockSource, ShardingDataTransformCumulativeMetrics* cumulativeMetrics, State state); ReshardingMetrics(UUID instanceId, BSONObj shardKey, NamespaceString nss, Role role, Date_t startTime, ClockSource* clockSource, ShardingDataTransformCumulativeMetrics* cumulativeMetrics); ReshardingMetrics(UUID instanceId, BSONObj shardKey, NamespaceString nss, Role role, Date_t startTime, ClockSource* clockSource, ShardingDataTransformCumulativeMetrics* cumulativeMetrics, State state); ~ReshardingMetrics(); static std::unique_ptr makeInstance(UUID instanceId, BSONObj shardKey, NamespaceString nss, Role role, Date_t startTime, ServiceContext* serviceContext); template static auto initializeFrom(const T& document, ClockSource* clockSource, ShardingDataTransformCumulativeMetrics* cumulativeMetrics) { static_assert(resharding_metrics::isStateDocument); auto result = std::make_unique(document.getCommonReshardingMetadata(), resharding_metrics::getRoleForStateDocument(), clockSource, cumulativeMetrics, resharding_metrics::getState(document)); result->restoreRoleSpecificFields(document); return result; } template static auto initializeFrom(const T& document, ServiceContext* serviceContext) { return initializeFrom( document, serviceContext->getFastClockSource(), ShardingDataTransformCumulativeMetrics::getForResharding(serviceContext)); } template void onStateTransition(T before, boost::none_t after) { _stateHolder.onStateTransition(before, after); } template void onStateTransition(boost::none_t before, T after) { _stateHolder.onStateTransition(before, after); } template void onStateTransition(T before, T after) { _stateHolder.onStateTransition(before, after); } template static bool mustRestoreExternallyTrackedRecipientFields(StateOrStateVariant stateOrVariant) { if constexpr (std::is_same_v) { return stdx::visit( [](auto v) { return mustRestoreExternallyTrackedRecipientFieldsImpl(v); }, stateOrVariant); } else { return mustRestoreExternallyTrackedRecipientFieldsImpl(stateOrVariant); } } BSONObj reportForCurrentOp() const noexcept override; void onUpdateApplied(); void onInsertApplied(); void onDeleteApplied(); void onOplogEntriesFetched(int64_t numEntries, Milliseconds elapsed); void onOplogEntriesApplied(int64_t numEntries); void onApplyingBegin(); void onApplyingEnd(); void setApplyingBegin(Date_t date); void setApplyingEnd(Date_t date); void onLocalInsertDuringOplogFetching(Milliseconds elapsed); void onBatchRetrievedDuringOplogApplying(Milliseconds elapsed); void onOplogLocalBatchApplied(Milliseconds elapsed); void restoreExternallyTrackedRecipientFields(const ExternallyTrackedRecipientFields& values); Seconds getApplyingElapsedTimeSecs() const; Date_t getApplyingBegin() const; Date_t getApplyingEnd() const; protected: boost::optional getRecipientHighEstimateRemainingTimeMillis() const override; void restoreOplogEntriesFetched(int64_t numEntries); void restoreOplogEntriesApplied(int64_t numEntries); void restoreUpdatesApplied(int64_t count); void restoreInsertsApplied(int64_t count); void restoreDeletesApplied(int64_t count); virtual StringData getStateString() const noexcept override; void restoreApplyingBegin(Date_t date); void restoreApplyingEnd(Date_t date); private: std::string createOperationDescription() const noexcept override; void restoreRecipientSpecificFields(const ReshardingRecipientDocument& document); void restoreCoordinatorSpecificFields(const ReshardingCoordinatorDocument& document); ReshardingCumulativeMetrics* getReshardingCumulativeMetrics(); template void restoreRoleSpecificFields(const T& document) { if constexpr (std::is_same_v) { restoreRecipientSpecificFields(document); return; } if constexpr (std::is_same_v) { restoreCoordinatorSpecificFields(document); return; } } template static bool mustRestoreExternallyTrackedRecipientFieldsImpl(T state) { static_assert(resharding_metrics::isState); if constexpr (std::is_same_v) { return state > RecipientStateEnum::kAwaitingFetchTimestamp; } else { return false; } } template void restorePhaseDurationFields(const T& document) { static_assert(resharding_metrics::isStateDocument); auto metrics = document.getMetrics(); if (!metrics) { return; } auto copyDurations = metrics->getDocumentCopy(); if (copyDurations) { auto copyingBegin = copyDurations->getStart(); if (copyingBegin) { restoreCopyingBegin(*copyingBegin); } auto copyingEnd = copyDurations->getStop(); if (copyingEnd) { restoreCopyingEnd(*copyingEnd); } } auto applyDurations = metrics->getOplogApplication(); if (applyDurations) { auto applyingBegin = applyDurations->getStart(); if (applyingBegin) { restoreApplyingBegin(*applyingBegin); } auto applyingEnd = applyDurations->getStop(); if (applyingEnd) { restoreApplyingEnd(*applyingEnd); } } } template void invokeIfAllSet(MemberFn&& fn, const boost::optional&... args) { if (!(args && ...)) { return; } std::invoke(fn, this, *args...); } AtomicWord _ableToEstimateRemainingRecipientTime; AtomicWord _deletesApplied; AtomicWord _insertsApplied; AtomicWord _updatesApplied; AtomicWord _oplogEntriesApplied; AtomicWord _oplogEntriesFetched; AtomicWord _applyingStartTime; AtomicWord _applyingEndTime; MetricsStateHolder _stateHolder; ShardingDataTransformInstanceMetrics::UniqueScopedObserver _scopedObserver; ReshardingMetricsFieldNameProvider* _reshardingFieldNames; }; } // namespace mongo