From e53e5cbb1edd1575ba04698d104b0a87e29100a2 Mon Sep 17 00:00:00 2001 From: Randolph Tan Date: Tue, 24 May 2022 15:40:35 +0000 Subject: SERVER-66422 Switch over to new resharding metrics --- src/mongo/db/s/SConscript | 3 - .../s/config/configsvr_reshard_collection_cmd.cpp | 7 +- .../s/resharding/resharding_collection_cloner.cpp | 24 +- .../db/s/resharding/resharding_collection_cloner.h | 23 +- .../resharding_collection_cloner_test.cpp | 145 ---- .../resharding_coordinator_commit_monitor.cpp | 10 +- .../resharding_coordinator_commit_monitor_test.cpp | 8 +- .../resharding/resharding_coordinator_service.cpp | 168 +--- .../resharding_coordinator_service_test.cpp | 27 +- .../s/resharding/resharding_data_replication.cpp | 21 +- .../db/s/resharding/resharding_data_replication.h | 5 - .../resharding_donor_recipient_common.cpp | 18 +- .../db/s/resharding/resharding_donor_service.cpp | 106 +-- .../db/s/resharding/resharding_donor_service.h | 7 - .../s/resharding/resharding_donor_service_test.cpp | 23 +- src/mongo/db/s/resharding/resharding_metrics.cpp | 782 ------------------ src/mongo/db/s/resharding/resharding_metrics.h | 203 ----- .../db/s/resharding/resharding_metrics_helpers.cpp | 3 - .../db/s/resharding/resharding_metrics_new.cpp | 138 +++- src/mongo/db/s/resharding/resharding_metrics_new.h | 56 ++ .../db/s/resharding/resharding_metrics_test.cpp | 895 --------------------- .../s/resharding/resharding_oplog_application.cpp | 39 +- .../db/s/resharding/resharding_oplog_application.h | 3 - .../db/s/resharding/resharding_oplog_applier.cpp | 52 +- .../db/s/resharding/resharding_oplog_applier.h | 11 +- .../s/resharding/resharding_oplog_applier_test.cpp | 31 +- .../resharding_oplog_batch_applier_test.cpp | 6 +- .../resharding_oplog_crud_application_test.cpp | 5 +- .../db/s/resharding/resharding_oplog_fetcher.cpp | 20 +- .../db/s/resharding/resharding_oplog_fetcher.h | 10 +- .../s/resharding/resharding_oplog_fetcher_test.cpp | 19 +- .../s/resharding/resharding_recipient_service.cpp | 160 +--- .../db/s/resharding/resharding_recipient_service.h | 2 - .../resharding_recipient_service_test.cpp | 42 +- src/mongo/db/s/resharding/resharding_util.cpp | 1 - src/mongo/db/s/resharding_test_commands.cpp | 38 +- .../sharding_data_transform_cumulative_metrics.cpp | 24 +- .../s/sharding_data_transform_cumulative_metrics.h | 5 +- ...ding_data_transform_cumulative_metrics_test.cpp | 6 +- .../s/sharding_data_transform_instance_metrics.cpp | 29 + .../s/sharding_data_transform_instance_metrics.h | 23 +- ...arding_data_transform_instance_metrics_test.cpp | 15 + src/mongo/db/s/sharding_data_transform_metrics.cpp | 5 - src/mongo/db/s/sharding_server_status.cpp | 32 +- .../shardsvr_resharding_operation_time_command.cpp | 27 +- 45 files changed, 465 insertions(+), 2812 deletions(-) delete mode 100644 src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp delete mode 100644 src/mongo/db/s/resharding/resharding_metrics.cpp delete mode 100644 src/mongo/db/s/resharding/resharding_metrics.h delete mode 100644 src/mongo/db/s/resharding/resharding_metrics_test.cpp (limited to 'src/mongo/db/s') diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 713f537e9f6..415bd49e852 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -96,7 +96,6 @@ env.Library( 'resharding/resharding_future_util.cpp', 'resharding/resharding_manual_cleanup.cpp', 'resharding/resharding_metrics_helpers.cpp', - 'resharding/resharding_metrics.cpp', 'resharding/resharding_metrics_new.cpp', 'resharding/resharding_op_observer.cpp', 'resharding/resharding_oplog_applier.cpp', @@ -569,7 +568,6 @@ env.CppUnitTest( 'persistent_task_queue_test.cpp', 'range_deletion_util_test.cpp', 'resharding/resharding_agg_test.cpp', - 'resharding/resharding_collection_cloner_test.cpp', 'resharding/resharding_collection_test.cpp', 'resharding/resharding_data_replication_test.cpp', 'resharding/resharding_destined_recipient_test.cpp', @@ -577,7 +575,6 @@ env.CppUnitTest( 'resharding/resharding_donor_recipient_common_test.cpp', 'resharding/resharding_donor_service_test.cpp', 'resharding/resharding_metrics_new_test.cpp', - 'resharding/resharding_metrics_test.cpp', 'resharding/resharding_oplog_applier_test.cpp', 'resharding/resharding_oplog_applier_metrics_test.cpp', 'resharding/resharding_oplog_batch_applier_test.cpp', diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp index 23ea95511ba..7f284e2c642 100644 --- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp +++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp @@ -203,10 +203,9 @@ public: std::move(existingUUID), std::move(tempReshardingNss), request().getKey()); - if (ShardingDataTransformMetrics::isEnabled()) { - commonMetadata.setStartTime( - opCtx->getServiceContext()->getFastClockSource()->now()); - } + commonMetadata.setStartTime( + opCtx->getServiceContext()->getFastClockSource()->now()); + coordinatorDoc.setCommonReshardingMetadata(std::move(commonMetadata)); coordinatorDoc.setZones(request().getZones()); coordinatorDoc.setPresetReshardedChunks(request().get_presetReshardedChunks()); diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp index f447f9dd1b1..a80bfbb88ec 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp @@ -50,7 +50,6 @@ #include "mongo/db/s/resharding/document_source_resharding_ownership_match.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_future_util.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_metrics_new.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding/resharding_util.h" @@ -81,14 +80,14 @@ bool collectionHasSimpleCollation(OperationContext* opCtx, const NamespaceString } // namespace -ReshardingCollectionCloner::ReshardingCollectionCloner(std::unique_ptr env, +ReshardingCollectionCloner::ReshardingCollectionCloner(ReshardingMetricsNew* metrics, ShardKeyPattern newShardKeyPattern, NamespaceString sourceNss, const UUID& sourceUUID, ShardId recipientShard, Timestamp atClusterTime, NamespaceString outputNss) - : _env(std::move(env)), + : _metrics(metrics), _newShardKeyPattern(std::move(newShardKeyPattern)), _sourceNss(std::move(sourceNss)), _sourceUUID(std::move(sourceUUID)), @@ -270,12 +269,9 @@ bool ReshardingCollectionCloner::doOneBatch(OperationContext* opCtx, Pipeline& p Timer latencyTimer; auto batch = resharding::data_copy::fillBatchForInsert( pipeline, resharding::gReshardingCollectionClonerBatchSizeInBytes.load()); - _env->metrics()->onCollClonerFillBatchForInsert( + + _metrics->onCloningTotalRemoteBatchRetrieval( duration_cast(latencyTimer.elapsed())); - if (ShardingDataTransformMetrics::isEnabled()) { - _env->metricsNew()->onCloningTotalRemoteBatchRetrieval( - duration_cast(latencyTimer.elapsed())); - } if (batch.empty()) { return false; @@ -294,15 +290,9 @@ bool ReshardingCollectionCloner::doOneBatch(OperationContext* opCtx, Pipeline& p int bytesInserted = resharding::data_copy::withOneStaleConfigRetry( opCtx, [&] { return resharding::data_copy::insertBatch(opCtx, _outputNss, batch); }); - _env->metrics()->onDocumentsCopied(batch.size(), bytesInserted); - _env->metrics()->gotInserts(batch.size()); - if (ShardingDataTransformMetrics::isEnabled()) { - _env->metricsNew()->onDocumentsCopied( - batch.size(), bytesInserted, Milliseconds(batchInsertTimer.millis())); - // TODO: Remove this comment when ReshardingMetrics are replaced with ReshardingMetricsNew. - // ReshardingMetricsNew::onInsertsApplied is intentionally not called here. Documents copied - // are no longer considered applied inserts. - } + _metrics->onDocumentsCopied( + batch.size(), bytesInserted, Milliseconds(batchInsertTimer.millis())); + return true; } diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.h b/src/mongo/db/s/resharding/resharding_collection_cloner.h index 06e38be80fc..e24b03c76b6 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.h +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.h @@ -52,7 +52,6 @@ class TaskExecutor; class OperationContext; class MongoProcessInterface; -class ReshardingMetrics; class ReshardingMetricsNew; class ServiceContext; @@ -62,25 +61,7 @@ class ServiceContext; */ class ReshardingCollectionCloner { public: - class Env { - public: - explicit Env(ReshardingMetrics* metrics, ReshardingMetricsNew* metricsNew) - : _metrics(metrics), _metricsNew(metricsNew) {} - - ReshardingMetrics* metrics() const { - return _metrics; - } - - ReshardingMetricsNew* metricsNew() const { - return _metricsNew; - } - - private: - ReshardingMetrics* const _metrics; - ReshardingMetricsNew* const _metricsNew; - }; - - ReshardingCollectionCloner(std::unique_ptr env, + ReshardingCollectionCloner(ReshardingMetricsNew* metrics, ShardKeyPattern newShardKeyPattern, NamespaceString sourceNss, const UUID& sourceUUID, @@ -118,7 +99,7 @@ private: std::unique_ptr _restartPipeline(OperationContext* opCtx); - const std::unique_ptr _env; + ReshardingMetricsNew* _metrics; const ShardKeyPattern _newShardKeyPattern; const NamespaceString _sourceNss; const UUID _sourceUUID; diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp deleted file mode 100644 index 62330924df8..00000000000 --- a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Copyright (C) 2020-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - - -#include "mongo/platform/basic.h" - -#include - -#include "mongo/bson/bsonmisc.h" -#include "mongo/bson/json.h" -#include "mongo/db/exec/document_value/document_value_test_util.h" -#include "mongo/db/hasher.h" -#include "mongo/db/pipeline/document_source_mock.h" -#include "mongo/db/s/resharding/resharding_collection_cloner.h" -#include "mongo/db/s/resharding/resharding_metrics.h" -#include "mongo/db/s/resharding/resharding_metrics_new.h" -#include "mongo/db/s/resharding/resharding_util.h" -#include "mongo/db/service_context_test_fixture.h" -#include "mongo/unittest/unittest.h" - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - - -namespace mongo { -namespace { - -using Doc = Document; -using Arr = std::vector; -using V = Value; - -/** - * Mock interface to allow specifying mock results for the 'from' collection of the $lookup stage. - */ -class MockMongoInterface final : public StubMongoProcessInterface { -public: - MockMongoInterface(std::deque mockResults) - : _mockResults(std::move(mockResults)) {} - - std::unique_ptr attachCursorSourceToPipeline( - Pipeline* ownedPipeline, - ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed, - boost::optional readConcern = boost::none) final { - std::unique_ptr pipeline( - ownedPipeline, PipelineDeleter(ownedPipeline->getContext()->opCtx)); - - pipeline->addInitialSource( - DocumentSourceMock::createForTest(_mockResults, pipeline->getContext())); - return pipeline; - } - -private: - std::deque _mockResults; -}; - -class ReshardingCollectionClonerTest : public ServiceContextTest { -protected: - std::unique_ptr makePipeline( - ShardKeyPattern newShardKeyPattern, - ShardId recipientShard, - std::deque sourceCollectionData, - std::deque configCacheChunksData) { - auto tempNss = constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID); - - _metricsNew = - ReshardingMetricsNew::makeInstance(_sourceUUID, - newShardKeyPattern.toBSON(), - _sourceNss, - ReshardingMetricsNew::Role::kRecipient, - getServiceContext()->getFastClockSource()->now(), - getServiceContext()); - - ReshardingCollectionCloner cloner( - std::make_unique(_metrics.get(), _metricsNew.get()), - std::move(newShardKeyPattern), - _sourceNss, - _sourceUUID, - std::move(recipientShard), - Timestamp(1, 0), /* dummy value */ - std::move(tempNss)); - - auto pipeline = cloner.makePipeline( - _opCtx.get(), std::make_shared(std::move(configCacheChunksData))); - - pipeline->addInitialSource(DocumentSourceMock::createForTest( - std::move(sourceCollectionData), pipeline->getContext())); - - return pipeline; - } - - template - auto getHashedElementValue(T value) { - return BSONElementHasher::hash64(BSON("" << value).firstElement(), - BSONElementHasher::DEFAULT_HASH_SEED); - } - - void setUp() override { - ServiceContextTest::setUp(); - _metrics = std::make_unique(getServiceContext()); - _metrics->onStart(ReshardingMetrics::Role::kRecipient, - getServiceContext()->getFastClockSource()->now()); - _metrics->setRecipientState(RecipientStateEnum::kCloning); - } - - void tearDown() override { - _metrics = nullptr; - ServiceContextTest::tearDown(); - } - -private: - const NamespaceString _sourceNss = NamespaceString("test"_sd, "collection_being_resharded"_sd); - const UUID _sourceUUID = UUID::gen(); - - ServiceContext::UniqueOperationContext _opCtx = makeOperationContext(); - std::unique_ptr _metrics; - std::unique_ptr _metricsNew; -}; - -} // namespace -} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp index 314812f7158..ae6b61fb314 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp @@ -37,7 +37,6 @@ #include "mongo/bson/bsonobj.h" #include "mongo/client/read_preference.h" #include "mongo/db/cancelable_operation_context.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/logv2/log.h" #include "mongo/s/async_requests_sender.h" @@ -210,13 +209,8 @@ ExecutorFuture CoordinatorCommitMonitor::_makeFuture() const { return RemainingOperationTimes{Milliseconds(0), Milliseconds::max()}; }) .then([this, anchor = shared_from_this()](RemainingOperationTimes remainingTimes) { - auto metrics = ReshardingMetrics::get(cc().getServiceContext()); - metrics->setMinRemainingOperationTime(remainingTimes.min); - metrics->setMaxRemainingOperationTime(remainingTimes.max); - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->setCoordinatorHighEstimateRemainingTimeMillis(remainingTimes.max); - _metricsNew->setCoordinatorLowEstimateRemainingTimeMillis(remainingTimes.min); - } + _metricsNew->setCoordinatorHighEstimateRemainingTimeMillis(remainingTimes.max); + _metricsNew->setCoordinatorLowEstimateRemainingTimeMillis(remainingTimes.min); // Check if all recipient shards are within the commit threshold. if (remainingTimes.max <= _threshold) diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp index cc9431145ae..1cc717b7aec 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp @@ -40,7 +40,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/s/config/config_server_test_fixture.h" #include "mongo/db/s/resharding/resharding_coordinator_commit_monitor.h" -#include "mongo/db/s/resharding/resharding_metrics.h" +#include "mongo/db/s/resharding/resharding_metrics_new.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/logv2/log.h" @@ -122,11 +122,6 @@ auto makeExecutor() { void CoordinatorCommitMonitorTest::setUp() { ConfigServerTestFixture::setUp(); - auto clockSource = getServiceContext()->getFastClockSource(); - auto metrics = ReshardingMetrics::get(getServiceContext()); - metrics->onStart(ReshardingMetrics::Role::kCoordinator, clockSource->now()); - metrics->setCoordinatorState(CoordinatorStateEnum::kApplying); - auto hostNameForShard = [](const ShardId& shard) -> std::string { return fmt::format("{}:1234", shard.toString()); }; @@ -155,6 +150,7 @@ void CoordinatorCommitMonitorTest::setUp() { _cancellationSource = std::make_unique(); + auto clockSource = getServiceContext()->getFastClockSource(); _metrics = std::make_shared( UUID::gen(), BSON("y" << 1), diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 68ad17e3b62..cbd6232a5d1 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -48,7 +48,6 @@ #include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/resharding/resharding_coordinator_commit_monitor.h" #include "mongo/db/s/resharding/resharding_future_util.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding/resharding_util.h" #include "mongo/db/s/sharding_data_transform_cumulative_metrics.h" @@ -244,9 +243,7 @@ void writeToCoordinatorStateNss(OperationContext* opCtx, *approxDocumentsToCopy); } - if (ShardingDataTransformMetrics::isEnabled()) { - buildStateDocumentMetricsForUpdate(setBuilder, metrics, nextState); - } + buildStateDocumentMetricsForUpdate(setBuilder, metrics, nextState); if (nextState == CoordinatorStateEnum::kPreparingToDonate) { appendShardEntriesToSetBuilder(coordinatorDoc, setBuilder); @@ -317,9 +314,7 @@ BSONObj createReshardingFieldsUpdateForOriginalNss( TypeCollectionReshardingFields originalEntryReshardingFields( coordinatorDoc.getReshardingUUID()); originalEntryReshardingFields.setState(coordinatorDoc.getState()); - if (ShardingDataTransformMetrics::isEnabled()) { - originalEntryReshardingFields.setStartTime(coordinatorDoc.getStartTime()); - } + originalEntryReshardingFields.setStartTime(coordinatorDoc.getStartTime()); return BSON("$set" << BSON(CollectionType::kReshardingFieldsFieldName << originalEntryReshardingFields.toBSON() @@ -613,41 +608,8 @@ BSONObj makeFlushRoutingTableCacheUpdatesCmd(const NamespaceString& nss) { BSON(WriteConcernOptions::kWriteConcernField << kMajorityWriteConcern.toBSON())); } -ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum toMetricsState( - CoordinatorStateEnum enumVal) { - switch (enumVal) { - case CoordinatorStateEnum::kUnused: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kUnused; - - case CoordinatorStateEnum::kInitializing: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kInitializing; - - case CoordinatorStateEnum::kPreparingToDonate: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kPreparingToDonate; - - case CoordinatorStateEnum::kCloning: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kCloning; - - case CoordinatorStateEnum::kApplying: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kApplying; - - case CoordinatorStateEnum::kBlockingWrites: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kBlockingWrites; - - case CoordinatorStateEnum::kAborting: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kAborting; - - case CoordinatorStateEnum::kCommitting: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kCommitting; - - case CoordinatorStateEnum::kDone: - return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kDone; - default: - invariant(false, - str::stream() << "Unexpected resharding coordinator state: " - << CoordinatorState_serializer(enumVal)); - MONGO_UNREACHABLE; - } +ReshardingMetricsNew::CoordinatorState toMetricsState(CoordinatorStateEnum state) { + return ReshardingMetricsNew::CoordinatorState(state); } } // namespace @@ -668,9 +630,7 @@ CollectionType createTempReshardingCollectionType( TypeCollectionReshardingFields tempEntryReshardingFields(coordinatorDoc.getReshardingUUID()); tempEntryReshardingFields.setState(coordinatorDoc.getState()); - if (ShardingDataTransformMetrics::isEnabled()) { - tempEntryReshardingFields.setStartTime(coordinatorDoc.getStartTime()); - } + tempEntryReshardingFields.setStartTime(coordinatorDoc.getStartTime()); auto recipientFields = constructRecipientFields(coordinatorDoc); tempEntryReshardingFields.setRecipientFields(std::move(recipientFields)); @@ -888,10 +848,8 @@ void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx, }, ShardingCatalogClient::kLocalWriteConcern); - if (metrics && ShardingDataTransformMetrics::isEnabled()) { - metrics->onStateTransition(toMetricsState(coordinatorDoc.getState()), - toMetricsState(updatedCoordinatorDoc.getState())); - } + metrics->onStateTransition(toMetricsState(coordinatorDoc.getState()), + toMetricsState(updatedCoordinatorDoc.getState())); } } // namespace resharding @@ -1078,10 +1036,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator( : PrimaryOnlyService::TypedInstance(), _id(coordinatorDoc.getReshardingUUID().toBSON()), _coordinatorService(coordinatorService), - _metricsNew{ - ShardingDataTransformMetrics::isEnabled() - ? ReshardingMetricsNew::initializeFrom(coordinatorDoc, getGlobalServiceContext()) - : nullptr}, + _metricsNew{ReshardingMetricsNew::initializeFrom(coordinatorDoc, getGlobalServiceContext())}, _metadata(coordinatorDoc.getCommonReshardingMetadata()), _coordinatorDoc(coordinatorDoc), _markKilledExecutor(std::make_shared([] { @@ -1100,9 +1055,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator( _reshardingCoordinatorObserver->onReshardingParticipantTransition(coordinatorDoc); } - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onStateTransition(boost::none, toMetricsState(coordinatorDoc.getState())); - } + _metricsNew->onStateTransition(boost::none, toMetricsState(coordinatorDoc.getState())); } void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc( @@ -1127,10 +1080,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc( const auto previousState = _coordinatorDoc.getState(); _coordinatorDoc = doc; - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onStateTransition(toMetricsState(previousState), - toMetricsState(_coordinatorDoc.getState())); - } + _metricsNew->onStateTransition(toMetricsState(previousState), + toMetricsState(_coordinatorDoc.getState())); ShardingLogging::get(opCtx)->logChange(opCtx, "resharding.coordinator.transition", @@ -1139,24 +1090,13 @@ void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc( kMajorityWriteConcern); } -void markCompleted(const Status& status) { - auto metrics = ReshardingMetrics::get(cc().getServiceContext()); - auto metricsOperationStatus = [&] { - if (status.isOK()) { - return ReshardingOperationStatusEnum::kSuccess; - } else if (status == ErrorCodes::ReshardCollectionAborted) { - return ReshardingOperationStatusEnum::kCanceled; - } else { - return ReshardingOperationStatusEnum::kFailure; - } - }(); - - metrics->onCompletion( - ReshardingMetrics::Role::kCoordinator, metricsOperationStatus, getCurrentTime()); - - if (ShardingDataTransformMetrics::isEnabled()) { - ShardingDataTransformCumulativeMetrics::getForResharding(cc().getServiceContext()) - ->onCompletion(metricsOperationStatus); +void markCompleted(const Status& status, ReshardingMetricsNew* metrics) { + if (status.isOK()) { + metrics->onSuccess(); + } else if (status == ErrorCodes::ReshardCollectionAborted) { + metrics->onCanceled(); + } else { + metrics->onFailure(); } } @@ -1380,9 +1320,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_commitAndFinishReshardOper }) .then([this, executor] { return _awaitAllParticipantShardsDone(executor); }) .then([this, executor] { - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onCriticalSectionEnd(); - } + _metricsNew->onCriticalSectionEnd(); + // Best-effort attempt to trigger a refresh on the participant shards so // they see the collection metadata without reshardingFields and no longer // throw ReshardCollectionInProgress. There is no guarantee this logic ever @@ -1464,13 +1403,6 @@ SemiFuture ReshardingCoordinatorService::ReshardingCoordinator::run( .onCompletion([outerStatus](Status) { return outerStatus; }); }) .onCompletion([this, self = shared_from_this()](Status status) { - // On stepdown or shutdown, the _scopedExecutor may have already been shut down. - // Schedule cleanup work on the parent executor. - if (_ctHolder->isSteppingOrShuttingDown()) { - ReshardingMetrics::get(cc().getServiceContext()) - ->onStepDown(ReshardingMetrics::Role::kCoordinator); - } - if (!status.isOK()) { { auto lg = stdx::lock_guard(_fulfillmentMutex); @@ -1485,10 +1417,7 @@ SemiFuture ReshardingCoordinatorService::ReshardingCoordinator::run( _reshardingCoordinatorObserver->interrupt(status); } - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onStateTransition(toMetricsState(_coordinatorDoc.getState()), - boost::none); - } + _metricsNew->onStateTransition(toMetricsState(_coordinatorDoc.getState()), boost::none); }) .semi(); } @@ -1502,9 +1431,8 @@ ExecutorFuture ReshardingCoordinatorService::ReshardingCoordinator::_onAbo return resharding::WithAutomaticRetry([this, executor, status] { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - // Notify `ReshardingMetrics` as the operation is now complete for external - // observers. - markCompleted(status); + // Notify metrics as the operation is now complete for external observers. + markCompleted(status, _metricsNew.get()); // The temporary collection and its corresponding entries were never created. Only // the coordinator document and reshardingFields require cleanup. @@ -1578,16 +1506,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::abort() { boost::optional ReshardingCoordinatorService::ReshardingCoordinator::reportForCurrentOp( MongoProcessInterface::CurrentOpConnectionsMode, MongoProcessInterface::CurrentOpSessionsMode) noexcept { - if (ShardingDataTransformMetrics::isEnabled()) { - return _metricsNew->reportForCurrentOp(); - } - - ReshardingMetrics::ReporterOptions options(ReshardingMetrics::Role::kCoordinator, - _coordinatorDoc.getReshardingUUID(), - _coordinatorDoc.getSourceNss(), - _coordinatorDoc.getReshardingKey().toBSON(), - false); - return ReshardingMetrics::get(cc().getServiceContext())->reportForCurrentOp(options); + return _metricsNew->reportForCurrentOp(); } std::shared_ptr @@ -1627,8 +1546,6 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChan if (_coordinatorDoc.getState() > CoordinatorStateEnum::kUnused) { if (!_coordinatorDocWrittenPromise.getFuture().isReady()) { _coordinatorDocWrittenPromise.emplaceValue(); - ReshardingMetrics::get(cc().getServiceContext()) - ->onStepUp(ReshardingMetrics::Role::kCoordinator); } if (_coordinatorDoc.getState() == CoordinatorStateEnum::kAborting) { @@ -1650,15 +1567,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChan { // Note: don't put blocking or interruptible code in this block. _coordinatorDocWrittenPromise.emplaceValue(); - - // TODO SERVER-53914 to accommodate loading metrics for the coordinator. - ReshardingMetrics::get(cc().getServiceContext()) - ->onStart(ReshardingMetrics::Role::kCoordinator, getCurrentTime()); - - if (ShardingDataTransformMetrics::isEnabled()) { - ShardingDataTransformCumulativeMetrics::getForResharding(cc().getServiceContext()) - ->onStarted(); - } + _metricsNew->onStarted(); } pauseBeforeInsertCoordinatorDoc.pauseWhileSet(); @@ -1750,9 +1659,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat coordinatorDocChangedOnDisk, highestMinFetchTimestamp, computeApproxCopySize(coordinatorDocChangedOnDisk)); - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onCopyingBegin(); - } + _metricsNew->onCopyingBegin(); }) .then([this] { return _waitForMajority(_ctHolder->getAbortToken()); }); } @@ -1771,10 +1678,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kApplying, coordinatorDocChangedOnDisk); - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onCopyingEnd(); - _metricsNew->onApplyingBegin(); - } + _metricsNew->onCopyingEnd(); + _metricsNew->onApplyingBegin(); }) .then([this] { return _waitForMajority(_ctHolder->getAbortToken()); }); } @@ -1833,10 +1738,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kBlockingWrites, _coordinatorDoc); - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onApplyingEnd(); - _metricsNew->onCriticalSectionBegin(); - } + _metricsNew->onApplyingEnd(); + _metricsNew->onCriticalSectionBegin(); }) .then([this] { return _waitForMajority(_ctHolder->getAbortToken()); }) .thenRunOn(**executor) @@ -1975,8 +1878,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllParticipantShardsD reshardingPauseCoordinatorBeforeRemovingStateDoc.pauseWhileSetAndNotCanceled( opCtx.get(), _ctHolder->getStepdownToken()); - // Notify `ReshardingMetrics` as the operation is now complete for external observers. - markCompleted(abortReason ? *abortReason : Status::OK()); + // Notify metrics as the operation is now complete for external observers. + markCompleted(abortReason ? *abortReason : Status::OK(), _metricsNew.get()); resharding::removeCoordinatorDocAndReshardingFields( opCtx.get(), _metricsNew.get(), coordinatorDoc, abortReason); @@ -2133,12 +2036,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_updateChunkImbalanceM auto imbalanceCount = getMaxChunkImbalanceCount(routingInfo, allShardsWithOpTime.value, zoneInfo); - ReshardingMetrics::get(opCtx->getServiceContext()) - ->setLastReshardChunkImbalanceCount(imbalanceCount); - if (ShardingDataTransformMetrics::isEnabled()) { - ShardingDataTransformCumulativeMetrics::getForResharding(opCtx->getServiceContext()) - ->setLastOpEndingChunkImbalance(imbalanceCount); - } + _metricsNew->setLastOpEndingChunkImbalance(imbalanceCount); } catch (const DBException& ex) { LOGV2_WARNING(5543000, "Encountered error while trying to update resharding chunk imbalance metrics", diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp index 7f57851d8e2..dc16d5fe271 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp @@ -42,7 +42,6 @@ #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/config/config_server_test_fixture.h" #include "mongo/db/s/resharding/resharding_coordinator_service.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_op_observer.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding/resharding_service_test_helpers.h" @@ -198,6 +197,8 @@ public: CoordinatorStateEnum state, boost::optional fetchTimestamp = boost::none) { CommonReshardingMetadata meta( _reshardingUUID, _originalNss, UUID::gen(), _tempNss, _newShardKey.toBSON()); + meta.setStartTime(getServiceContext()->getFastClockSource()->now()); + ReshardingCoordinatorDocument doc(state, {DonorShardEntry(ShardId("shard0000"), {})}, {RecipientShardEntry(ShardId("shard0001"), {})}); @@ -798,7 +799,6 @@ TEST_F(ReshardingCoordinatorServiceTest, StepDownStepUpEachTransition) { stateTransitionsGuard.wait(state); - stepDown(opCtx); ASSERT_EQ(coordinator->getCompletionFuture().getNoThrow(), @@ -806,15 +806,6 @@ TEST_F(ReshardingCoordinatorServiceTest, StepDownStepUpEachTransition) { coordinator.reset(); - // Metrics should be cleared after step down. - { - auto metrics = ReshardingMetrics::get(opCtx->getServiceContext()); - BSONObjBuilder metricsBuilder; - metrics->serializeCurrentOpMetrics(&metricsBuilder, - ReshardingMetrics::Role::kCoordinator); - ASSERT_BSONOBJ_EQ(BSONObj(), metricsBuilder.done()); - } - stepUp(opCtx); stateTransitionsGuard.unset(state); @@ -827,12 +818,6 @@ TEST_F(ReshardingCoordinatorServiceTest, StepDownStepUpEachTransition) { // 'done' state is never written to storage so don't wait for it. waitUntilCommittedCoordinatorDocReach(opCtx, state); - - // Metrics should not be empty after step up. - auto metrics = ReshardingMetrics::get(opCtx->getServiceContext()); - BSONObjBuilder metricsBuilder; - metrics->serializeCurrentOpMetrics(&metricsBuilder, ReshardingMetrics::Role::kCoordinator); - ASSERT_BSONOBJ_NE(BSONObj(), metricsBuilder.done()); } makeDonorsProceedToDone(opCtx); @@ -843,14 +828,6 @@ TEST_F(ReshardingCoordinatorServiceTest, StepDownStepUpEachTransition) { coordinator->getCompletionFuture().get(opCtx); } - // Metrics should be cleared after commit. - { - auto metrics = ReshardingMetrics::get(opCtx->getServiceContext()); - BSONObjBuilder metricsBuilder; - metrics->serializeCurrentOpMetrics(&metricsBuilder, ReshardingMetrics::Role::kCoordinator); - ASSERT_BSONOBJ_EQ(BSONObj(), metricsBuilder.done()); - } - { DBDirectClient client(opCtx); diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp index 1b2bd3c2fa4..4143c8c0c76 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.cpp +++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp @@ -36,7 +36,6 @@ #include "mongo/db/s/resharding/resharding_collection_cloner.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_future_util.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_oplog_applier.h" #include "mongo/db/s/resharding/resharding_oplog_fetcher.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" @@ -82,13 +81,12 @@ void ensureFulfilledPromise(SharedPromise& sp, Status error) { } // namespace std::unique_ptr ReshardingDataReplication::_makeCollectionCloner( - ReshardingMetrics* metrics, ReshardingMetricsNew* metricsNew, const CommonReshardingMetadata& metadata, const ShardId& myShardId, Timestamp cloneTimestamp) { return std::make_unique( - std::make_unique(metrics, metricsNew), + metricsNew, ShardKeyPattern{metadata.getReshardingKey()}, metadata.getSourceNss(), metadata.getSourceUUID(), @@ -114,7 +112,6 @@ std::vector> ReshardingDataReplication::_ma std::vector> ReshardingDataReplication::_makeOplogFetchers( OperationContext* opCtx, - ReshardingMetrics* metrics, ReshardingMetricsNew* metricsNew, const CommonReshardingMetadata& metadata, const std::vector& donorShards, @@ -131,8 +128,7 @@ std::vector> ReshardingDataReplication:: invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp})); oplogFetchers.emplace_back(std::make_unique( - std::make_unique( - opCtx->getServiceContext(), metrics, metricsNew), + std::make_unique(opCtx->getServiceContext(), metricsNew), metadata.getReshardingUUID(), metadata.getSourceUUID(), // The recipient fetches oplog entries from the donor starting from the largest _id @@ -168,7 +164,6 @@ std::shared_ptr ReshardingDataReplication::_makeOplogFet std::vector> ReshardingDataReplication::_makeOplogAppliers( OperationContext* opCtx, - ReshardingMetrics* metrics, ReshardingApplierMetricsMap* applierMetricsMap, const CommonReshardingMetadata& metadata, const std::vector& donorShards, @@ -191,8 +186,8 @@ std::vector> ReshardingDataReplication:: auto applierMetrics = (*applierMetricsMap)[donorShardId].get(); oplogAppliers.emplace_back(std::make_unique( - std::make_unique( - opCtx->getServiceContext(), metrics, applierMetrics), + std::make_unique(opCtx->getServiceContext(), + applierMetrics), std::move(sourceId), oplogBufferNss, metadata.getTempReshardingNss(), @@ -211,7 +206,6 @@ std::vector> ReshardingDataReplication:: std::unique_ptr ReshardingDataReplication::make( OperationContext* opCtx, - ReshardingMetrics* metrics, ReshardingMetricsNew* metricsNew, ReshardingApplierMetricsMap* applierMetricsMap, CommonReshardingMetadata metadata, @@ -224,19 +218,16 @@ std::unique_ptr ReshardingDataReplication::m std::vector> txnCloners; if (!cloningDone) { - collectionCloner = - _makeCollectionCloner(metrics, metricsNew, metadata, myShardId, cloneTimestamp); + collectionCloner = _makeCollectionCloner(metricsNew, metadata, myShardId, cloneTimestamp); txnCloners = _makeTxnCloners(metadata, donorShards); } - auto oplogFetchers = - _makeOplogFetchers(opCtx, metrics, metricsNew, metadata, donorShards, myShardId); + auto oplogFetchers = _makeOplogFetchers(opCtx, metricsNew, metadata, donorShards, myShardId); auto oplogFetcherExecutor = _makeOplogFetcherExecutor(donorShards.size()); auto stashCollections = ensureStashCollectionsExist(opCtx, sourceChunkMgr, donorShards); auto oplogAppliers = _makeOplogAppliers(opCtx, - metrics, applierMetricsMap, metadata, donorShards, diff --git a/src/mongo/db/s/resharding/resharding_data_replication.h b/src/mongo/db/s/resharding/resharding_data_replication.h index 47417ae8363..f8348646758 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.h +++ b/src/mongo/db/s/resharding/resharding_data_replication.h @@ -49,7 +49,6 @@ namespace mongo { class OperationContext; class ReshardingOplogApplier; class ReshardingCollectionCloner; -class ReshardingMetrics; class ReshardingOplogFetcher; class ReshardingTxnCloner; class ServiceContext; @@ -141,7 +140,6 @@ private: public: static std::unique_ptr make( OperationContext* opCtx, - ReshardingMetrics* metrics, ReshardingMetricsNew* metricsNew, ReshardingApplierMetricsMap* applierMetricsMap, CommonReshardingMetadata metadata, @@ -198,7 +196,6 @@ public: private: static std::unique_ptr _makeCollectionCloner( - ReshardingMetrics* metrics, ReshardingMetricsNew* metricsNew, const CommonReshardingMetadata& metadata, const ShardId& myShardId, @@ -210,7 +207,6 @@ private: static std::vector> _makeOplogFetchers( OperationContext* opCtx, - ReshardingMetrics* metrics, ReshardingMetricsNew* metricsNew, const CommonReshardingMetadata& metadata, const std::vector& donorShards, @@ -220,7 +216,6 @@ private: static std::vector> _makeOplogAppliers( OperationContext* opCtx, - ReshardingMetrics* metrics, ReshardingApplierMetricsMap* applierMetricsMap, const CommonReshardingMetadata& metadata, const std::vector& donorShards, diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp index 60a094f8e5e..43d91e83b97 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp @@ -252,9 +252,7 @@ ReshardingDonorDocument constructDonorDocumentFromReshardingFields( sourceUUID, reshardingFields.getDonorFields()->getTempReshardingNss(), reshardingFields.getDonorFields()->getReshardingKey().toBSON()); - if (ShardingDataTransformMetrics::isEnabled()) { - commonMetadata.setStartTime(reshardingFields.getStartTime()); - } + commonMetadata.setStartTime(reshardingFields.getStartTime()); donorDoc.setCommonReshardingMetadata(std::move(commonMetadata)); return donorDoc; @@ -285,13 +283,13 @@ ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields( sourceUUID, nss, metadata.getShardKeyPattern().toBSON()); - if (ShardingDataTransformMetrics::isEnabled()) { - commonMetadata.setStartTime(reshardingFields.getStartTime()); - ReshardingRecipientMetrics metrics; - metrics.setApproxDocumentsToCopy(recipientFields->getApproxDocumentsToCopy()); - metrics.setApproxBytesToCopy(recipientFields->getApproxBytesToCopy()); - recipientDoc.setMetrics(std::move(metrics)); - } + commonMetadata.setStartTime(reshardingFields.getStartTime()); + + ReshardingRecipientMetrics metrics; + metrics.setApproxDocumentsToCopy(recipientFields->getApproxDocumentsToCopy()); + metrics.setApproxBytesToCopy(recipientFields->getApproxBytesToCopy()); + recipientDoc.setMetrics(std::move(metrics)); + recipientDoc.setCommonReshardingMetadata(std::move(commonMetadata)); return recipientDoc; diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 6db00e4dc7e..7f870033a1f 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -51,7 +51,6 @@ #include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_donor_recipient_common.h" #include "mongo/db/s/resharding/resharding_future_util.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding/resharding_util.h" #include "mongo/db/s/sharding_state.h" @@ -184,39 +183,8 @@ public: } }; -ShardingDataTransformCumulativeMetrics::DonorStateEnum toMetricsState(DonorStateEnum enumVal) { - using MetricsEnum = ShardingDataTransformCumulativeMetrics::DonorStateEnum; - - switch (enumVal) { - case DonorStateEnum::kUnused: - return MetricsEnum::kUnused; - - case DonorStateEnum::kPreparingToDonate: - return MetricsEnum::kPreparingToDonate; - - case DonorStateEnum::kDonatingInitialData: - return MetricsEnum::kDonatingInitialData; - - case DonorStateEnum::kDonatingOplogEntries: - return MetricsEnum::kDonatingOplogEntries; - - case DonorStateEnum::kPreparingToBlockWrites: - return MetricsEnum::kPreparingToBlockWrites; - - case DonorStateEnum::kError: - return MetricsEnum::kError; - - case DonorStateEnum::kBlockingWrites: - return MetricsEnum::kBlockingWrites; - - case DonorStateEnum::kDone: - return MetricsEnum::kDone; - default: - invariant(false, - str::stream() << "Unexpected resharding coordinator state: " - << DonorState_serializer(enumVal)); - MONGO_UNREACHABLE; - } +ReshardingMetricsNew::DonorState toMetricsState(DonorStateEnum state) { + return ReshardingMetricsNew::DonorState(state); } } // namespace @@ -241,9 +209,7 @@ ReshardingDonorService::DonorStateMachine::DonorStateMachine( std::unique_ptr externalState) : repl::PrimaryOnlyService::TypedInstance(), _donorService(donorService), - _metricsNew{ShardingDataTransformMetrics::isEnabled() - ? ReshardingMetricsNew::initializeFrom(donorDoc, getGlobalServiceContext()) - : nullptr}, + _metricsNew{ReshardingMetricsNew::initializeFrom(donorDoc, getGlobalServiceContext())}, _metadata{donorDoc.getCommonReshardingMetadata()}, _recipientShardIds{donorDoc.getRecipientShards()}, _donorCtx{donorDoc.getMutableState()}, @@ -267,9 +233,7 @@ ReshardingDonorService::DonorStateMachine::DonorStateMachine( }()) { invariant(_externalState); - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onStateTransition(boost::none, toMetricsState(_donorCtx.getState())); - } + _metricsNew->onStateTransition(boost::none, toMetricsState(_donorCtx.getState())); } ExecutorFuture ReshardingDonorService::DonorStateMachine::_runUntilBlockingWritesOrErrored( @@ -421,10 +385,7 @@ ExecutorFuture ReshardingDonorService::DonorStateMachine::_finishReshardin _critSecReason, ShardingCatalogClient::kLocalWriteConcern); - _metrics()->leaveCriticalSection(getCurrentTime()); - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onCriticalSectionEnd(); - } + _metricsNew->onCriticalSectionEnd(); } auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); @@ -466,13 +427,7 @@ Status ReshardingDonorService::DonorStateMachine::_runMandatoryCleanup( ensureFulfilledPromise(lk, _completionPromise, statusForPromise); } - if (stepdownToken.isCanceled()) { - _metrics()->onStepDown(ReshardingMetrics::Role::kDonor); - } - - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onStateTransition(toMetricsState(_donorCtx.getState()), boost::none); - } + _metricsNew->onStateTransition(toMetricsState(_donorCtx.getState()), boost::none); return status; } @@ -485,7 +440,6 @@ SemiFuture ReshardingDonorService::DonorStateMachine::run( _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor); return ExecutorFuture(**executor) - .then([this] { _startMetrics(); }) .then([this, executor, abortToken] { return _runUntilBlockingWritesOrErrored(executor, abortToken); }) @@ -539,16 +493,7 @@ void ReshardingDonorService::DonorStateMachine::interrupt(Status status) {} boost::optional ReshardingDonorService::DonorStateMachine::reportForCurrentOp( MongoProcessInterface::CurrentOpConnectionsMode connMode, MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept { - if (ShardingDataTransformMetrics::isEnabled()) { - return _metricsNew->reportForCurrentOp(); - } - - ReshardingMetrics::ReporterOptions options(ReshardingMetrics::Role::kDonor, - _metadata.getReshardingUUID(), - _metadata.getSourceNss(), - _metadata.getReshardingKey().toBSON(), - false); - return _metrics()->reportForCurrentOp(options); + return _metricsNew->reportForCurrentOp(); } void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( @@ -576,16 +521,10 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( } void ReshardingDonorService::DonorStateMachine::onWriteDuringCriticalSection() { - if (!ShardingDataTransformMetrics::isEnabled()) { - return; - } _metricsNew->onWriteDuringCriticalSection(); } void ReshardingDonorService::DonorStateMachine::onReadDuringCriticalSection() { - if (!ShardingDataTransformMetrics::isEnabled()) { - return; - } _metricsNew->onReadDuringCriticalSection(); } @@ -751,10 +690,7 @@ void ReshardingDonorService::DonorStateMachine:: _critSecReason, ShardingCatalogClient::kLocalWriteConcern); - _metrics()->enterCriticalSection(getCurrentTime()); - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onCriticalSectionBegin(); - } + _metricsNew->onCriticalSectionBegin(); } { @@ -891,11 +827,8 @@ void ReshardingDonorService::DonorStateMachine::_transitionState(DonorShardConte auto newState = newDonorCtx.getState(); _updateDonorDocument(std::move(newDonorCtx)); - _metrics()->setDonorState(newState); - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onStateTransition(toMetricsState(oldState), toMetricsState(newState)); - } + _metricsNew->onStateTransition(toMetricsState(oldState), toMetricsState(newState)); LOGV2_INFO(5279505, "Transitioned resharding donor state", @@ -1076,14 +1009,6 @@ void ReshardingDonorService::DonorStateMachine::_removeDonorDocument( opCtx->recoveryUnit()->onCommit([this, stepdownToken, aborted](boost::optional) { stdx::lock_guard lk(_mutex); - - if (!stepdownToken.isCanceled()) { - _metrics()->onCompletion(ReshardingMetrics::Role::kDonor, - aborted ? ReshardingOperationStatusEnum::kFailure - : ReshardingOperationStatusEnum::kSuccess, - getCurrentTime()); - } - _completionPromise.emplaceValue(); }); @@ -1098,19 +1023,6 @@ void ReshardingDonorService::DonorStateMachine::_removeDonorDocument( }); } -ReshardingMetrics* ReshardingDonorService::DonorStateMachine::_metrics() const { - return ReshardingMetrics::get(cc().getServiceContext()); -} - -void ReshardingDonorService::DonorStateMachine::_startMetrics() { - auto donorState = _donorCtx.getState(); - if (donorState > DonorStateEnum::kPreparingToDonate) { - _metrics()->onStepUp(donorState, _donorMetricsToRestore); - } else { - _metrics()->onStart(ReshardingMetrics::Role::kDonor, getCurrentTime()); - } -} - CancellationToken ReshardingDonorService::DonorStateMachine::_initAbortSource( const CancellationToken& stepdownToken) { { diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index 881e74475ec..f2f4d99d2e8 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -32,7 +32,6 @@ #include "mongo/db/cancelable_operation_context.h" #include "mongo/db/repl/primary_only_service.h" #include "mongo/db/s/resharding/donor_document_gen.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_metrics_new.h" #include "mongo/s/resharding/type_collection_fields_gen.h" @@ -209,12 +208,6 @@ private: // Removes the local donor document from disk. void _removeDonorDocument(const CancellationToken& stepdownToken, bool aborted); - // Accesses the ReshardingMetrics module for this donor's underlying mongod process. - ReshardingMetrics* _metrics() const; - - // Starts the metrics subsystem for this donor's underlying mongod process. - void _startMetrics(); - // Initializes the _abortSource and generates a token from it to return back the caller. If an // abort was reported prior to the initialization, automatically cancels the _abortSource before // returning the token. diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp index 658c9500881..0f40919d14d 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp @@ -154,6 +154,7 @@ public: sourceUUID, constructTemporaryReshardingNss(sourceNss.db(), sourceUUID), BSON("newKey" << 1)); + commonMetadata.setStartTime(getServiceContext()->getFastClockSource()->now()); doc.setCommonReshardingMetadata(std::move(commonMetadata)); return doc; @@ -736,22 +737,11 @@ TEST_F(ReshardingDonorServiceTest, RestoreMetricsOnKBlockingWrites) { }; doc.setMutableState(makeDonorCtx()); - auto makeMetricsTimeInterval = [&](const Date_t startTime) { - ReshardingMetricsTimeInterval timeInterval; - timeInterval.setStart(startTime); - return timeInterval; - }; - - auto timeNow = Date_t::now(); - auto opTimeDurationSecs = 60; - auto critSecDurationSecs = 10; - - ReshardingDonorMetrics reshardingDonorMetrics; - reshardingDonorMetrics.setOperationRuntime( - makeMetricsTimeInterval(timeNow - Seconds(opTimeDurationSecs))); - reshardingDonorMetrics.setCriticalSection( - makeMetricsTimeInterval(timeNow - Seconds(critSecDurationSecs))); - doc.setMetrics(reshardingDonorMetrics); + auto timeNow = getServiceContext()->getFastClockSource()->now(); + const auto opTimeDurationSecs = 60; + auto commonMetadata = doc.getCommonReshardingMetadata(); + commonMetadata.setStartTime(timeNow - Seconds(opTimeDurationSecs)); + doc.setCommonReshardingMetadata(std::move(commonMetadata)); createSourceCollection(opCtx.get(), doc); DonorStateMachine::insertStateDocument(opCtx.get(), doc); @@ -779,7 +769,6 @@ TEST_F(ReshardingDonorServiceTest, RestoreMetricsOnKBlockingWrites) { ASSERT_EQ(currOp.getStringField("donorState"), DonorState_serializer(DonorStateEnum::kBlockingWrites)); ASSERT_GTE(currOp.getField("totalOperationTimeElapsedSecs").Long(), opTimeDurationSecs); - ASSERT_GTE(currOp.getField("totalCriticalSectionTimeElapsedSecs").Long(), critSecDurationSecs); stateTransitionsGuard.unset(kDoneState); ASSERT_OK(donor->getCompletionFuture().getNoThrow()); diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp deleted file mode 100644 index 627f36a89e4..00000000000 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ /dev/null @@ -1,782 +0,0 @@ -/** - * Copyright (C) 2020-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - - -#include -#include - -#include "mongo/db/s/resharding/resharding_metrics.h" -#include "mongo/db/s/resharding/resharding_util.h" -#include "mongo/logv2/log.h" -#include "mongo/platform/compiler.h" -#include "mongo/util/aligned.h" -#include "mongo/util/duration.h" -#include "mongo/util/histogram.h" - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding - - -namespace mongo { - -namespace { -constexpr auto kAnotherOperationInProgress = "Another operation is in progress"; -constexpr auto kNoOperationInProgress = "No operation is in progress"; -constexpr auto kMetricsSetBeforeRestore = "Expected metrics to be 0 prior to restore"; - -constexpr auto kTotalOps = "countReshardingOperations"; -constexpr auto kSuccessfulOps = "countReshardingSuccessful"; -constexpr auto kFailedOps = "countReshardingFailures"; -constexpr auto kCanceledOps = "countReshardingCanceled"; -constexpr auto kOpTimeElapsed = "totalOperationTimeElapsedSecs"; -constexpr auto kOpTimeRemaining = "remainingOperationTimeEstimatedSecs"; -constexpr auto kDocumentsToCopy = "approxDocumentsToCopy"; -constexpr auto kDocumentsCopied = "documentsCopied"; -constexpr auto kBytesToCopy = "approxBytesToCopy"; -constexpr auto kBytesCopied = "bytesCopied"; -constexpr auto kCopyTimeElapsed = "totalCopyTimeElapsedSecs"; -constexpr auto kOplogsFetched = "oplogEntriesFetched"; -constexpr auto kOplogsApplied = "oplogEntriesApplied"; -constexpr auto kApplyTimeElapsed = "totalApplyTimeElapsedSecs"; -constexpr auto kWritesDuringCritialSection = "countWritesDuringCriticalSection"; -constexpr auto kCriticalSectionTimeElapsed = "totalCriticalSectionTimeElapsedSecs"; -constexpr auto kCoordinatorState = "coordinatorState"; -constexpr auto kDonorState = "donorState"; -constexpr auto kRecipientState = "recipientState"; -constexpr auto kOpStatus = "opStatus"; -constexpr auto kLastOpEndingChunkImbalance = "lastOpEndingChunkImbalance"; -constexpr auto kOpCounters = "opcounters"; -constexpr auto kMinRemainingOperationTime = "minShardRemainingOperationTimeEstimatedMillis"; -constexpr auto kMaxRemainingOperationTime = "maxShardRemainingOperationTimeEstimatedMillis"; -constexpr auto kOplogApplierApplyBatchLatencyMillis = "oplogApplierApplyBatchLatencyMillis"; -constexpr auto kCollClonerFillBatchForInsertLatencyMillis = - "collClonerFillBatchForInsertLatencyMillis"; - -using MetricsPtr = std::unique_ptr; - -const auto getMetrics = ServiceContext::declareDecoration(); - -const auto reshardingMetricsRegisterer = ServiceContext::ConstructorActionRegisterer{ - "ReshardingMetrics", - [](ServiceContext* ctx) { getMetrics(ctx) = std::make_unique(ctx); }}; - -static StringData serializeState(boost::optional e) { - return RecipientState_serializer(*e); -} - -static StringData serializeState(boost::optional e) { - return DonorState_serializer(*e); -} - -static StringData serializeState(boost::optional e) { - return CoordinatorState_serializer(*e); -} - -// Enables resharding to distinguish the ops it does to keep up with client writes from the -// client workload tracked in the globalOpCounters. -class ReshardingOpCounters { -public: - void gotInserts(int n) noexcept { - _checkWrap(&ReshardingOpCounters::_insert, n); - } - void gotInsert() noexcept { - _checkWrap(&ReshardingOpCounters::_insert, 1); - } - void gotUpdate() noexcept { - _checkWrap(&ReshardingOpCounters::_update, 1); - } - void gotDelete() noexcept { - _checkWrap(&ReshardingOpCounters::_delete, 1); - } - - BSONObj getObj() const noexcept { - BSONObjBuilder b; - b.append("insert", _insert->loadRelaxed()); - b.append("update", _update->loadRelaxed()); - b.append("delete", _delete->loadRelaxed()); - return b.obj(); - } - -private: - // Increment member `counter` by n, resetting all counters if it was > 2^60. - void _checkWrap(CacheExclusive> ReshardingOpCounters::*counter, int n) { - static constexpr auto maxCount = 1LL << 60; - auto oldValue = (this->*counter)->fetchAndAddRelaxed(n); - if (oldValue > maxCount - n) { - LOGV2(5776000, - "ReshardingOpCounters exceeded maximum value, resetting all to 0", - "insert"_attr = _insert->loadRelaxed(), - "update"_attr = _update->loadRelaxed(), - "delete"_attr = _delete->loadRelaxed()); - _insert->store(0); - _update->store(0); - _delete->store(0); - } - } - - CacheExclusive> _insert; - CacheExclusive> _update; - CacheExclusive> _delete; -}; - -// Allows tracking elapsed time for the resharding operation and its sub operations (e.g., -// applying oplog entries). -class TimeInterval { -public: - void start(Date_t d) noexcept { - if (_start) { - LOGV2_WARNING(5892600, "Resharding metrics already started, start() is a no-op"); - return; - } - _start = d; - } - - void end(Date_t d) noexcept { - invariant(_start, "Not started"); - if (_end) { - LOGV2_WARNING(5892601, "Resharding metrics already ended, end() is a no-op"); - return; - } - _end = d; - } - - // TODO Remove this function once all metrics classes can start from stepup. - void forceEnd(Date_t d) noexcept { - if (!_start) - _start = d; - end(d); - } - - Milliseconds duration(Date_t now) const noexcept { - return !_start ? Milliseconds(0) : ((!_end ? now : *_end) - *_start); - } - -private: - boost::optional _start; - boost::optional _end; -}; - -} // namespace - -class ReshardingMetrics::OperationMetrics { -public: - void appendCurrentOpMetrics(BSONObjBuilder*, Role, Date_t now) const; - - boost::optional remainingOperationTime(Date_t now) const; - - void gotInserts(int n) noexcept; - void gotInsert() noexcept; - void gotUpdate() noexcept; - void gotDelete() noexcept; - - TimeInterval runningOperation; - ReshardingOperationStatusEnum opStatus = ReshardingOperationStatusEnum::kInactive; - - TimeInterval copyingDocuments; - int64_t documentsToCopy = 0; - int64_t documentsCopied = 0; - int64_t bytesToCopy = 0; - int64_t bytesCopied = 0; - - TimeInterval applyingOplogEntries; - int64_t oplogEntriesFetched = 0; - int64_t oplogEntriesApplied = 0; - - TimeInterval inCriticalSection; - int64_t writesDuringCriticalSection = 0; - - int64_t chunkImbalanceCount = 0; - - Histogram oplogApplierApplyBatchLatencyMillis = getLatencyHistogram(); - Histogram collClonerFillBatchForInsertLatencyMillis = getLatencyHistogram(); - - // The ops done by resharding to keep up with the client writes. - ReshardingOpCounters opCounters; - - Milliseconds minRemainingOperationTime = Milliseconds(0); - Milliseconds maxRemainingOperationTime = Milliseconds(0); - - boost::optional donorState; - boost::optional recipientState; - boost::optional coordinatorState; -}; - -void ReshardingMetrics::OperationMetrics::gotInserts(int n) noexcept { - opCounters.gotInserts(n); -} - -void ReshardingMetrics::OperationMetrics::gotInsert() noexcept { - opCounters.gotInsert(); -} - -void ReshardingMetrics::OperationMetrics::gotUpdate() noexcept { - opCounters.gotUpdate(); -} - -void ReshardingMetrics::OperationMetrics::gotDelete() noexcept { - opCounters.gotDelete(); -} - -boost::optional ReshardingMetrics::OperationMetrics::remainingOperationTime( - Date_t now) const { - return estimateRemainingRecipientTime(recipientState > RecipientStateEnum::kCloning, - bytesCopied, - bytesToCopy, - copyingDocuments.duration(now), - oplogEntriesApplied, - oplogEntriesFetched, - applyingOplogEntries.duration(now)); -} - -void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder* bob, - Role role, - Date_t now) const { - auto getElapsedTime = [&](const TimeInterval& interval) -> int64_t { - return durationCount(interval.duration(now)); - }; - - const auto remainingMsec = remainingOperationTime(now); - - bob->append(kOpTimeElapsed, getElapsedTime(runningOperation)); - - switch (role) { - case Role::kDonor: - bob->append(kWritesDuringCritialSection, writesDuringCriticalSection); - bob->append(kCriticalSectionTimeElapsed, getElapsedTime(inCriticalSection)); - bob->append(kDonorState, - serializeState(donorState.get_value_or(DonorStateEnum::kUnused))); - bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus)); - break; - case Role::kRecipient: - bob->append(kOpTimeRemaining, - !remainingMsec ? int64_t{-1} /** -1 is a specified integer null value */ - : durationCount(*remainingMsec)); - bob->append(kDocumentsToCopy, documentsToCopy); - bob->append(kDocumentsCopied, documentsCopied); - bob->append(kBytesToCopy, bytesToCopy); - bob->append(kBytesCopied, bytesCopied); - bob->append(kCopyTimeElapsed, getElapsedTime(copyingDocuments)); - - bob->append(kOplogsFetched, oplogEntriesFetched); - bob->append(kOplogsApplied, oplogEntriesApplied); - bob->append(kApplyTimeElapsed, getElapsedTime(applyingOplogEntries)); - bob->append(kRecipientState, - serializeState(recipientState.get_value_or(RecipientStateEnum::kUnused))); - bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus)); - - appendHistogram( - *bob, oplogApplierApplyBatchLatencyMillis, kOplogApplierApplyBatchLatencyMillis); - appendHistogram(*bob, - collClonerFillBatchForInsertLatencyMillis, - kCollClonerFillBatchForInsertLatencyMillis); - break; - case Role::kCoordinator: - bob->append(kCoordinatorState, serializeState(coordinatorState)); - bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus)); - break; - default: - MONGO_UNREACHABLE; - } -} - -ReshardingMetrics* ReshardingMetrics::get(ServiceContext* ctx) noexcept { - return getMetrics(ctx).get(); -} - -void ReshardingMetrics::onStart(Role role, Date_t runningOperationStartTime) noexcept { - stdx::lock_guard lk(_mutex); - // TODO Re-add this invariant once all breaking test cases have been fixed. - // invariant(!_currentOp.has_value(), kAnotherOperationInProgress); - - if (!_currentOp) { - // Only incremement _started if this is the first time resharding metrics is being invoked - // for this resharding operation, and we're not restoring the PrimaryOnlyService from disk. - _started++; - } - - // Create a new operation and record the time it started. - _emplaceCurrentOpForRole(role, runningOperationStartTime); -} - -void ReshardingMetrics::onCompletion(Role role, - ReshardingOperationStatusEnum status, - Date_t runningOperationEndTime) noexcept { - stdx::lock_guard lk(_mutex); - // TODO Re-add this invariant once all breaking test cases have been fixed. Add invariant that - // role being completed is a role that is in progress. - // invariant(_currentOp.has_value(), kNoOperationInProgress); - - // Reset the cumulative min and max remaining operation time to 0. Only coordinators - // will need to report this information, so only reset it for coordinators. - if (role == ReshardingMetrics::Role::kCoordinator) { - _cumulativeOp->minRemainingOperationTime = Milliseconds(0); - _cumulativeOp->maxRemainingOperationTime = Milliseconds(0); - } - - if (!_currentOp) { - return; - } - - if (_currentOp->donorState && _currentOp->recipientState) { - switch (role) { - case Role::kDonor: - _currentOp->donorState = boost::none; - break; - case Role::kRecipient: - _currentOp->recipientState = boost::none; - break; - default: - MONGO_UNREACHABLE; - } - - return; - } - - switch (status) { - case ReshardingOperationStatusEnum::kSuccess: - _succeeded++; - break; - case ReshardingOperationStatusEnum::kFailure: - _failed++; - break; - case ReshardingOperationStatusEnum::kCanceled: - _canceled++; - break; - default: - MONGO_UNREACHABLE; - } - - _currentOp->runningOperation.end(runningOperationEndTime); - - // Reset current op metrics. - _currentOp = nullptr; -} - -void ReshardingMetrics::onStepUp(Role role) noexcept { - stdx::lock_guard lk(_mutex); - _emplaceCurrentOpForRole(role, boost::none); - _onStepUpCalled = true; - // TODO SERVER-53914 Implement coordinator metrics rehydration. - - // TODO SERVER-57094 Resume the runningOperation duration from a timestamp stored on disk - // instead of starting from the current time. -} - -void ReshardingMetrics::onStepUp(DonorStateEnum state, ReshardingDonorMetrics donorMetrics) { - stdx::lock_guard lk(_mutex); - auto operationRuntime = donorMetrics.getOperationRuntime(); - _emplaceCurrentOpForRole( - Role::kDonor, operationRuntime.has_value() ? operationRuntime->getStart() : boost::none); - _currentOp->donorState = state; - _onStepUpCalled = true; - - if (auto criticalSectionTimeInterval = donorMetrics.getCriticalSection(); - criticalSectionTimeInterval.has_value() && - criticalSectionTimeInterval->getStart().has_value()) { - _currentOp->inCriticalSection.start(criticalSectionTimeInterval->getStart().get()); - - if (auto stopTime = criticalSectionTimeInterval->getStop(); stopTime.has_value()) - _currentOp->inCriticalSection.forceEnd(stopTime.get()); - } -} - -void ReshardingMetrics::onStepDown(Role role) noexcept { - stdx::lock_guard lk(_mutex); - if (_currentOp && _currentOp->donorState && _currentOp->recipientState) { - switch (role) { - case Role::kDonor: - _currentOp->donorState = boost::none; - break; - case Role::kRecipient: - _currentOp->recipientState = boost::none; - break; - default: - MONGO_UNREACHABLE; - } - } else { - _currentOp = nullptr; - } -} - -void ReshardingMetrics::_emplaceCurrentOpForRole( - Role role, boost::optional runningOperationStartTime) noexcept { - // Invariants in this function ensure that the only multi-role state allowed is a combination - // of donor and recipient. - if (!_currentOp) { - _currentOp = std::make_unique(); - _currentOp->runningOperation.start(runningOperationStartTime - ? *runningOperationStartTime - : _svcCtx->getFastClockSource()->now()); - _currentOp->opStatus = ReshardingOperationStatusEnum::kRunning; - } else { - invariant(role != Role::kCoordinator, kAnotherOperationInProgress); - invariant(!_currentOp->coordinatorState, kAnotherOperationInProgress); - } - - switch (role) { - case Role::kCoordinator: - _currentOp->coordinatorState.emplace(CoordinatorStateEnum::kUnused); - break; - case Role::kDonor: - invariant(!_currentOp->donorState, kAnotherOperationInProgress); - _currentOp->donorState.emplace(DonorStateEnum::kUnused); - break; - case Role::kRecipient: - invariant(!_currentOp->recipientState, kAnotherOperationInProgress); - _currentOp->recipientState.emplace(RecipientStateEnum::kUnused); - break; - default: - MONGO_UNREACHABLE - } -} - -void ReshardingMetrics::setDonorState(DonorStateEnum state) noexcept { - stdx::lock_guard lk(_mutex); - invariant(_currentOp, kNoOperationInProgress); - - const auto oldState = std::exchange(_currentOp->donorState, state); - invariant(oldState != state); -} - -void ReshardingMetrics::setRecipientState(RecipientStateEnum state) noexcept { - stdx::lock_guard lk(_mutex); - invariant(_currentOp, kNoOperationInProgress); - - const auto oldState = std::exchange(_currentOp->recipientState, state); - invariant(oldState != state); -} - -void ReshardingMetrics::setCoordinatorState(CoordinatorStateEnum state) noexcept { - stdx::lock_guard lk(_mutex); - invariant(_currentOp, kNoOperationInProgress); - _currentOp->coordinatorState = state; -} - -template -static bool checkState(T state, std::initializer_list validStates) { - invariant(validStates.size()); - if (std::find(validStates.begin(), validStates.end(), state) != validStates.end()) - return true; - - std::stringstream ss; - StringData sep = ""; - for (auto s : validStates) { - ss << sep << serializeState(s); - sep = ", "_sd; - } - - LOGV2_FATAL_CONTINUE(5553300, - "Invalid resharding state", - "state"_attr = serializeState(state), - "valid"_attr = ss.str()); - return false; -} - -void ReshardingMetrics::setDocumentsToCopy(int64_t documents, int64_t bytes) noexcept { - stdx::lock_guard lk(_mutex); - invariant(_currentOp, kNoOperationInProgress); - invariant(_currentOp->recipientState == RecipientStateEnum::kCreatingCollection); - - setDocumentsToCopyForCurrentOp(documents, bytes); -} - -void ReshardingMetrics::setDocumentsToCopyForCurrentOp(int64_t documents, int64_t bytes) noexcept { - invariant(_currentOp, kNoOperationInProgress); - - _currentOp->documentsToCopy = documents; - _currentOp->bytesToCopy = bytes; -} - -void ReshardingMetrics::setLastReshardChunkImbalanceCount(int64_t newCount) noexcept { - stdx::lock_guard lk(_mutex); - - invariant(_currentOp, kNoOperationInProgress); - invariant(_currentOp->coordinatorState); - - _cumulativeOp->chunkImbalanceCount = newCount; -} - -void ReshardingMetrics::setMinRemainingOperationTime(Milliseconds minTime) noexcept { - stdx::lock_guard lk(_mutex); - if (_currentOp) { - _cumulativeOp->minRemainingOperationTime = minTime; - } -} - -void ReshardingMetrics::setMaxRemainingOperationTime(Milliseconds maxTime) noexcept { - stdx::lock_guard lk(_mutex); - if (_currentOp) { - _cumulativeOp->maxRemainingOperationTime = maxTime; - } -} - -void ReshardingMetrics::onDocumentsCopied(int64_t documents, int64_t bytes) noexcept { - stdx::lock_guard lk(_mutex); - if (!_currentOp) - return; - - invariant(checkState(*_currentOp->recipientState, - {RecipientStateEnum::kCloning, RecipientStateEnum::kError})); - - _currentOp->documentsCopied += documents; - _currentOp->bytesCopied += bytes; - _cumulativeOp->documentsCopied += documents; - _cumulativeOp->bytesCopied += bytes; -} - -void ReshardingMetrics::gotInserts(int n) noexcept { - _cumulativeOp->gotInserts(n); -} - -void ReshardingMetrics::onOplogApplierApplyBatch(Milliseconds latency) { - stdx::lock_guard lk(_mutex); - invariant(_currentOp, kNoOperationInProgress); - invariant(checkState(*_currentOp->recipientState, - {RecipientStateEnum::kApplying, RecipientStateEnum::kError})); - - _currentOp->oplogApplierApplyBatchLatencyMillis.increment(durationCount(latency)); - _cumulativeOp->oplogApplierApplyBatchLatencyMillis.increment( - durationCount(latency)); -} - -void ReshardingMetrics::onCollClonerFillBatchForInsert(Milliseconds latency) { - stdx::lock_guard lk(_mutex); - invariant(_currentOp, kNoOperationInProgress); - invariant(checkState(*_currentOp->recipientState, - {RecipientStateEnum::kCloning, RecipientStateEnum::kError})); - - _currentOp->collClonerFillBatchForInsertLatencyMillis.increment( - durationCount(latency)); - _cumulativeOp->collClonerFillBatchForInsertLatencyMillis.increment( - durationCount(latency)); -} - -void ReshardingMetrics::gotInsert() noexcept { - _cumulativeOp->gotInsert(); -} - -void ReshardingMetrics::gotUpdate() noexcept { - _cumulativeOp->gotUpdate(); -} - -void ReshardingMetrics::gotDelete() noexcept { - _cumulativeOp->gotDelete(); -} - -void ReshardingMetrics::startCopyingDocuments(Date_t start) { - stdx::lock_guard lk(_mutex); - _currentOp->copyingDocuments.start(start); -} - -void ReshardingMetrics::endCopyingDocuments(Date_t end) { - stdx::lock_guard lk(_mutex); - _currentOp->copyingDocuments.forceEnd(end); -} - -void ReshardingMetrics::startApplyingOplogEntries(Date_t start) { - stdx::lock_guard lk(_mutex); - _currentOp->applyingOplogEntries.start(start); -} - -void ReshardingMetrics::endApplyingOplogEntries(Date_t end) { - stdx::lock_guard lk(_mutex); - _currentOp->applyingOplogEntries.forceEnd(end); -} - -void ReshardingMetrics::enterCriticalSection(Date_t start) { - stdx::lock_guard lk(_mutex); - _currentOp->inCriticalSection.start(start); -} - -void ReshardingMetrics::leaveCriticalSection(Date_t end) { - stdx::lock_guard lk(_mutex); - _currentOp->inCriticalSection.forceEnd(end); -} - -void ReshardingMetrics::onOplogEntriesFetched(int64_t entries) noexcept { - stdx::lock_guard lk(_mutex); - if (!_currentOp) - return; - - invariant(checkState( - *_currentOp->recipientState, - {RecipientStateEnum::kCloning, RecipientStateEnum::kApplying, RecipientStateEnum::kError})); - - _currentOp->oplogEntriesFetched += entries; - _cumulativeOp->oplogEntriesFetched += entries; -} - -void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept { - stdx::lock_guard lk(_mutex); - if (!_currentOp) - return; - - invariant(checkState(*_currentOp->recipientState, - {RecipientStateEnum::kApplying, RecipientStateEnum::kError})); - - _currentOp->oplogEntriesApplied += entries; - _cumulativeOp->oplogEntriesApplied += entries; -} - -void ReshardingMetrics::restoreForCurrentOp(int64_t documentCountCopied, - int64_t documentBytesCopied, - int64_t oplogEntriesFetched, - int64_t oplogEntriesApplied) noexcept { - invariant(_currentOp, kNoOperationInProgress); - invariant(_currentOp->documentsCopied == 0, kMetricsSetBeforeRestore); - invariant(_currentOp->bytesCopied == 0, kMetricsSetBeforeRestore); - invariant(_currentOp->oplogEntriesFetched == 0, kMetricsSetBeforeRestore); - invariant(_currentOp->oplogEntriesApplied == 0, kMetricsSetBeforeRestore); - - _currentOp->documentsCopied = documentCountCopied; - _currentOp->bytesCopied = documentBytesCopied; - _currentOp->oplogEntriesFetched = oplogEntriesFetched; - _currentOp->oplogEntriesApplied = oplogEntriesApplied; -} - -void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept { - stdx::lock_guard lk(_mutex); - if (!_currentOp) - return; - - invariant(checkState(*_currentOp->donorState, - {DonorStateEnum::kPreparingToBlockWrites, - DonorStateEnum::kBlockingWrites, - DonorStateEnum::kError})); - - onWriteDuringCriticalSectionForCurrentOp(writes); - _cumulativeOp->writesDuringCriticalSection += writes; -} - -void ReshardingMetrics::onWriteDuringCriticalSectionForCurrentOp(int64_t writes) noexcept { - invariant(_currentOp, kNoOperationInProgress); - - _currentOp->writesDuringCriticalSection += writes; -} - -void ReshardingMetrics::serializeCurrentOpMetrics(BSONObjBuilder* bob, Role role) const { - stdx::lock_guard lk(_mutex); - if (_currentOp) - _currentOp->appendCurrentOpMetrics(bob, role, _now()); -} - -BSONObj ReshardingMetrics::reportForCurrentOp(const ReporterOptions& options) const noexcept { - auto roleName = [](Role role) { - switch (role) { - case Role::kDonor: - return "Donor"_sd; - case Role::kRecipient: - return "Recipient"_sd; - case Role::kCoordinator: - return "Coordinator"_sd; - } - MONGO_UNREACHABLE; - }; - - BSONObjBuilder bob; - bob.append("type", "op"); - bob.append( - "desc", - fmt::format("Resharding{}Service {}", roleName(options.role), options.id.toString())); - bob.append("op", "command"); - bob.append("ns", options.nss.toString()); - - { - BSONObjBuilder originating{bob.subobjStart("originatingCommand")}; - originating.append("reshardCollection", options.nss.toString()); - originating.append("key", options.shardKey); - originating.append("unique", options.unique); - BSONObjBuilder{originating.subobjStart("collation")}.append("locale", "simple"); - } - - serializeCurrentOpMetrics(&bob, options.role); - - return bob.obj(); -} - -boost::optional ReshardingMetrics::getOperationElapsedTime() const { - stdx::lock_guard lk(_mutex); - if (!_currentOp) - return boost::none; - return _currentOp->runningOperation.duration(_now()); -} - -boost::optional ReshardingMetrics::getOperationRemainingTime() const { - stdx::lock_guard lk(_mutex); - if (_currentOp) - return _currentOp->remainingOperationTime(_now()); - return boost::none; -} - -bool ReshardingMetrics::wasReshardingEverAttempted() const { - stdx::lock_guard lk(_mutex); - return _started != 0 || _succeeded != 0 || _failed != 0 || _canceled != 0 || _onStepUpCalled; -} - -void ReshardingMetrics::serializeCumulativeOpMetrics(BSONObjBuilder* bob) const { - stdx::lock_guard lk(_mutex); - - auto getRemainingOperationTime = [&](const Milliseconds& time) -> int64_t { - return durationCount(time); - }; - - bob->append(kTotalOps, _started); - bob->append(kSuccessfulOps, _succeeded); - bob->append(kFailedOps, _failed); - bob->append(kCanceledOps, _canceled); - - const auto& ops = *_cumulativeOp; - bob->append(kDocumentsCopied, ops.documentsCopied); - bob->append(kBytesCopied, ops.bytesCopied); - bob->append(kOplogsApplied, ops.oplogEntriesApplied); - bob->append(kWritesDuringCritialSection, ops.writesDuringCriticalSection); - bob->append(kOplogsFetched, ops.oplogEntriesFetched); - bob->append(kLastOpEndingChunkImbalance, ops.chunkImbalanceCount); - bob->append(kOpCounters, ops.opCounters.getObj()); - bob->append(kMinRemainingOperationTime, - getRemainingOperationTime(ops.minRemainingOperationTime)); - bob->append(kMaxRemainingOperationTime, - getRemainingOperationTime(ops.maxRemainingOperationTime)); - - appendHistogram( - *bob, ops.oplogApplierApplyBatchLatencyMillis, kOplogApplierApplyBatchLatencyMillis); - appendHistogram(*bob, - ops.collClonerFillBatchForInsertLatencyMillis, - kCollClonerFillBatchForInsertLatencyMillis); -} - -Date_t ReshardingMetrics::_now() const { - return _svcCtx->getFastClockSource()->now(); -} - -ReshardingMetrics::ReshardingMetrics(ServiceContext* svcCtx) - : _svcCtx{svcCtx}, _cumulativeOp{std::make_unique()} {} - -ReshardingMetrics::~ReshardingMetrics() = default; - -} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h deleted file mode 100644 index a6964c9d611..00000000000 --- a/src/mongo/db/s/resharding/resharding_metrics.h +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Copyright (C) 2020-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include - -#include "mongo/bson/bsonobj.h" -#include "mongo/bson/bsonobjbuilder.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/s/resharding/donor_document_gen.h" -#include "mongo/db/service_context.h" -#include "mongo/platform/mutex.h" -#include "mongo/s/resharding/common_types_gen.h" -#include "mongo/util/clock_source.h" -#include "mongo/util/duration.h" -#include "mongo/util/histogram.h" -#include "mongo/util/uuid.h" - -namespace mongo { - -/* - * Maintains the metrics for resharding operations. - * All members of this class are thread-safe. - */ -class ReshardingMetrics final { -public: - enum Role { kCoordinator, kDonor, kRecipient }; - - static ReshardingMetrics* get(ServiceContext*) noexcept; - - explicit ReshardingMetrics(ServiceContext* svcCtx); - ~ReshardingMetrics(); - - ReshardingMetrics(const ReshardingMetrics&) = delete; - ReshardingMetrics& operator=(const ReshardingMetrics&) = delete; - - // Marks the beginning of a resharding operation for a particular role. Note that: - // * Only one resharding operation may run at any time. - // * The only valid co-existing roles on a process are kDonor and kRecipient. - void onStart(Role role, Date_t runningOperationStartTime) noexcept; - - // Marks the resumption of a resharding operation for a particular role. - void onStepUp(Role role) noexcept; - - void onStepUp(DonorStateEnum state, ReshardingDonorMetrics donorMetrics); - - // So long as a resharding operation is in progress, the following may be used to update the - // state of a donor, a recipient, and a coordinator, respectively. - void setDonorState(DonorStateEnum) noexcept; - void setRecipientState(RecipientStateEnum) noexcept; - void setCoordinatorState(CoordinatorStateEnum) noexcept; - - void setDocumentsToCopy(int64_t documents, int64_t bytes) noexcept; - void setDocumentsToCopyForCurrentOp(int64_t documents, int64_t bytes) noexcept; - // Allows updating metrics on "documents to copy" so long as the recipient is in cloning state. - void onDocumentsCopied(int64_t documents, int64_t bytes) noexcept; - - // Allows updating metrics on "opcounters"; - void gotInserts(int n) noexcept; - void gotInsert() noexcept; - void gotUpdate() noexcept; - void gotDelete() noexcept; - - void setMinRemainingOperationTime(Milliseconds minOpTime) noexcept; - void setMaxRemainingOperationTime(Milliseconds maxOpTime) noexcept; - - // Starts/ends the timers recording the times spend in the named sections. - void startCopyingDocuments(Date_t start); - void endCopyingDocuments(Date_t end); - - void startApplyingOplogEntries(Date_t start); - void endApplyingOplogEntries(Date_t end); - - void enterCriticalSection(Date_t start); - void leaveCriticalSection(Date_t end); - - // Records latency and throughput of calls to ReshardingOplogApplier::_applyBatch - void onOplogApplierApplyBatch(Milliseconds latency); - - // Records latency and throughput of calls to resharding::data_copy::fillBatchForInsert - // in ReshardingCollectionCloner::doOneBatch - void onCollClonerFillBatchForInsert(Milliseconds latency); - - // Allows updating "oplog entries to apply" metrics when the recipient is in applying state. - void onOplogEntriesFetched(int64_t entries) noexcept; - // Allows restoring "oplog entries to apply" metrics. - void onOplogEntriesApplied(int64_t entries) noexcept; - - void restoreForCurrentOp(int64_t documentCountCopied, - int64_t documentBytesCopied, - int64_t oplogEntriesFetched, - int64_t oplogEntriesApplied) noexcept; - - // Allows tracking writes during a critical section when the donor's state is either of - // "donating-oplog-entries" or "blocking-writes". - void onWriteDuringCriticalSection(int64_t writes) noexcept; - // Allows restoring writes during a critical section. - void onWriteDuringCriticalSectionForCurrentOp(int64_t writes) noexcept; - - // Indicates that a role on this node is stepping down. If the role being stepped down is the - // last active role on this process, the function tears down the currentOp variable. The - // replica set primary that is stepping up continues the resharding operation from disk. - void onStepDown(Role role) noexcept; - - // Marks the completion of the current (active) resharding operation for a particular role. If - // the role being completed is the last active role on this process, the function tears down - // the currentOp variable, indicating completion for the resharding operation on this process. - // - // Aborts the process if no resharding operation is in progress. - void onCompletion(Role role, - ReshardingOperationStatusEnum status, - Date_t runningOperationEndTime) noexcept; - - // Records the chunk imbalance count for the most recent resharding operation. - void setLastReshardChunkImbalanceCount(int64_t newCount) noexcept; - - struct ReporterOptions { - ReporterOptions(Role role, UUID id, NamespaceString nss, BSONObj shardKey, bool unique) - : role(role), - id(std::move(id)), - nss(std::move(nss)), - shardKey(std::move(shardKey)), - unique(unique) {} - - const Role role; - const UUID id; - const NamespaceString nss; - const BSONObj shardKey; - const bool unique; - }; - BSONObj reportForCurrentOp(const ReporterOptions& options) const noexcept; - - bool wasReshardingEverAttempted() const; - - // Append metrics to the builder in CurrentOp format for the given `role`. - void serializeCurrentOpMetrics(BSONObjBuilder*, Role role) const; - - // Append metrics to the builder in CumulativeOp (ServerStatus) format. - void serializeCumulativeOpMetrics(BSONObjBuilder*) const; - - // Reports the elapsed time for the active resharding operation, or `boost::none`. - boost::optional getOperationElapsedTime() const; - - // Reports the estimated remaining time for the active resharding operation, or `boost::none`. - boost::optional getOperationRemainingTime() const; - - static Histogram getLatencyHistogram() { - return Histogram({10, 100, 1000, 10000}); - } - -private: - class OperationMetrics; - - ServiceContext* const _svcCtx; - - mutable Mutex _mutex = MONGO_MAKE_LATCH("ReshardingMetrics::_mutex"); - - void _emplaceCurrentOpForRole(Role role, - boost::optional runningOperationStartTime) noexcept; - - Date_t _now() const; - - bool _onStepUpCalled = false; - - // The following maintain the number of resharding operations that have started, succeeded, - // failed with an unrecoverable error, and canceled by the user, respectively. - int64_t _started = 0; - int64_t _succeeded = 0; - int64_t _failed = 0; - int64_t _canceled = 0; - - std::unique_ptr _currentOp; - std::unique_ptr _cumulativeOp; -}; - -} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp b/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp index 786bcdaa9ab..86cc76261a8 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp @@ -42,9 +42,6 @@ namespace resharding_metrics { namespace { void onCriticalSectionErrorThrows(OperationContext* opCtx, const StaleConfigInfo& info) { - if (!ShardingDataTransformMetrics::isEnabled()) { - return; - } const auto& operationType = info.getDuringOperationType(); if (!operationType) { return; diff --git a/src/mongo/db/s/resharding/resharding_metrics_new.cpp b/src/mongo/db/s/resharding/resharding_metrics_new.cpp index fa63eb9da3c..e07468ad1b9 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_new.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics_new.cpp @@ -61,13 +61,9 @@ BSONObj createOriginalCommand(const NamespaceString& nss, BSONObj shardKey) { } Date_t readStartTime(const CommonReshardingMetadata& metadata, ClockSource* fallbackSource) { - try { - const auto& startTime = metadata.getStartTime(); - tassert(6503901, - "Metadata is missing start time despite feature flag being enabled", - startTime.has_value()); + if (const auto& startTime = metadata.getStartTime()) { return startTime.get(); - } catch (const DBException&) { + } else { return fallbackSource->now(); } } @@ -170,4 +166,134 @@ void ReshardingMetricsNew::restoreCoordinatorSpecificFields( restorePhaseDurationFields(document); } +ReshardingMetricsNew::DonorState::DonorState(DonorStateEnum enumVal) : _enumVal(enumVal) {} + +ShardingDataTransformCumulativeMetrics::DonorStateEnum ReshardingMetricsNew::DonorState::toMetrics() + const { + using MetricsEnum = ShardingDataTransformCumulativeMetrics::DonorStateEnum; + + switch (_enumVal) { + case DonorStateEnum::kUnused: + return MetricsEnum::kUnused; + + case DonorStateEnum::kPreparingToDonate: + return MetricsEnum::kPreparingToDonate; + + case DonorStateEnum::kDonatingInitialData: + return MetricsEnum::kDonatingInitialData; + + case DonorStateEnum::kDonatingOplogEntries: + return MetricsEnum::kDonatingOplogEntries; + + case DonorStateEnum::kPreparingToBlockWrites: + return MetricsEnum::kPreparingToBlockWrites; + + case DonorStateEnum::kError: + return MetricsEnum::kError; + + case DonorStateEnum::kBlockingWrites: + return MetricsEnum::kBlockingWrites; + + case DonorStateEnum::kDone: + return MetricsEnum::kDone; + default: + invariant(false, + str::stream() << "Unexpected resharding coordinator state: " + << DonorState_serializer(_enumVal)); + MONGO_UNREACHABLE; + } +} + +DonorStateEnum ReshardingMetricsNew::DonorState::getState() const { + return _enumVal; +} + +ReshardingMetricsNew::RecipientState::RecipientState(RecipientStateEnum enumVal) + : _enumVal(enumVal) {} + +ShardingDataTransformCumulativeMetrics::RecipientStateEnum +ReshardingMetricsNew::RecipientState::toMetrics() const { + using MetricsEnum = ShardingDataTransformCumulativeMetrics::RecipientStateEnum; + + switch (_enumVal) { + case RecipientStateEnum::kUnused: + return MetricsEnum::kUnused; + + case RecipientStateEnum::kAwaitingFetchTimestamp: + return MetricsEnum::kAwaitingFetchTimestamp; + + case RecipientStateEnum::kCreatingCollection: + return MetricsEnum::kCreatingCollection; + + case RecipientStateEnum::kCloning: + return MetricsEnum::kCloning; + + case RecipientStateEnum::kApplying: + return MetricsEnum::kApplying; + + case RecipientStateEnum::kError: + return MetricsEnum::kError; + + case RecipientStateEnum::kStrictConsistency: + return MetricsEnum::kStrictConsistency; + + case RecipientStateEnum::kDone: + return MetricsEnum::kDone; + + default: + invariant(false, + str::stream() << "Unexpected resharding coordinator state: " + << RecipientState_serializer(_enumVal)); + MONGO_UNREACHABLE; + } +} + +RecipientStateEnum ReshardingMetricsNew::RecipientState::getState() const { + return _enumVal; +} + +ReshardingMetricsNew::CoordinatorState::CoordinatorState(CoordinatorStateEnum enumVal) + : _enumVal(enumVal) {} + +ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum +ReshardingMetricsNew::CoordinatorState::toMetrics() const { + switch (_enumVal) { + case CoordinatorStateEnum::kUnused: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kUnused; + + case CoordinatorStateEnum::kInitializing: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kInitializing; + + case CoordinatorStateEnum::kPreparingToDonate: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kPreparingToDonate; + + case CoordinatorStateEnum::kCloning: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kCloning; + + case CoordinatorStateEnum::kApplying: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kApplying; + + case CoordinatorStateEnum::kBlockingWrites: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kBlockingWrites; + + case CoordinatorStateEnum::kAborting: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kAborting; + + case CoordinatorStateEnum::kCommitting: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kCommitting; + + case CoordinatorStateEnum::kDone: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kDone; + default: + invariant(false, + str::stream() << "Unexpected resharding coordinator state: " + << CoordinatorState_serializer(_enumVal)); + MONGO_UNREACHABLE; + } +} + +CoordinatorStateEnum ReshardingMetricsNew::CoordinatorState::getState() const { + return _enumVal; +} + } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_metrics_new.h b/src/mongo/db/s/resharding/resharding_metrics_new.h index 1f060b320b0..b8e96698b0d 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_new.h +++ b/src/mongo/db/s/resharding/resharding_metrics_new.h @@ -42,6 +42,42 @@ class ReshardingMetricsNew : public ShardingDataTransformInstanceMetrics { public: using State = stdx::variant; + class DonorState { + public: + using MetricsType = ShardingDataTransformCumulativeMetrics::DonorStateEnum; + + explicit DonorState(DonorStateEnum enumVal); + MetricsType toMetrics() const; + DonorStateEnum getState() const; + + private: + const DonorStateEnum _enumVal; + }; + + class RecipientState { + public: + using MetricsType = ShardingDataTransformCumulativeMetrics::RecipientStateEnum; + + explicit RecipientState(RecipientStateEnum enumVal); + MetricsType toMetrics() const; + RecipientStateEnum getState() const; + + private: + RecipientStateEnum _enumVal; + }; + + class CoordinatorState { + public: + using MetricsType = ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum; + + explicit CoordinatorState(CoordinatorStateEnum enumVal); + MetricsType toMetrics() const; + CoordinatorStateEnum getState() const; + + private: + CoordinatorStateEnum _enumVal; + }; + ReshardingMetricsNew(UUID instanceId, BSONObj shardKey, NamespaceString nss, @@ -84,6 +120,26 @@ public: ShardingDataTransformCumulativeMetrics::getForResharding(serviceContext)); } + template + void onStateTransition(T before, boost::none_t after) { + getCumulativeMetrics()->onStateTransition(before.toMetrics(), + after); + } + + template + void onStateTransition(boost::none_t before, T after) { + setState(after.getState()); + getCumulativeMetrics()->onStateTransition(before, + after.toMetrics()); + } + + template + void onStateTransition(T before, T after) { + setState(after.getState()); + getCumulativeMetrics()->onStateTransition(before.toMetrics(), + after.toMetrics()); + } + template void setState(T state) { static_assert(std::is_assignable_v); diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp deleted file mode 100644 index 978963eb555..00000000000 --- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp +++ /dev/null @@ -1,895 +0,0 @@ -/** - * Copyright (C) 2020-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - - -#include - -#include "mongo/bson/bsonobjbuilder.h" -#include "mongo/bson/json.h" -#include "mongo/db/s/resharding/resharding_metrics.h" -#include "mongo/db/service_context_test_fixture.h" -#include "mongo/logv2/log.h" -#include "mongo/unittest/death_test.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/clock_source_mock.h" -#include "mongo/util/duration.h" -#include "mongo/util/histogram.h" -#include "mongo/util/uuid.h" - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - - -namespace mongo { -namespace { - -using namespace fmt::literals; - -constexpr auto kOpTimeRemaining = "remainingOperationTimeEstimatedSecs"_sd; -constexpr auto kOplogApplierApplyBatchLatencyMillis = "oplogApplierApplyBatchLatencyMillis"; -constexpr auto kCollClonerFillBatchForInsertLatencyMillis = - "collClonerFillBatchForInsertLatencyMillis"; - -class ReshardingMetricsTest : public ServiceContextTest { -public: - enum OpReportType { - CumulativeReport, - CurrentOpReportDonorRole, - CurrentOpReportRecipientRole, - CurrentOpReportCoordinatorRole - }; - - void setUp() override { - auto clockSource = std::make_unique(); - _clockSource = clockSource.get(); - getGlobalServiceContext()->setFastClockSource(std::move(clockSource)); - } - - auto getMetrics() { - return ReshardingMetrics::get(getGlobalServiceContext()); - } - - void startOperation(ReshardingMetrics::Role role) { - getMetrics()->onStart(role, getGlobalServiceContext()->getFastClockSource()->now()); - } - - void stepUpOperation(ReshardingMetrics::Role role) { - getMetrics()->onStepUp(role); - } - - void stepDownOperation(ReshardingMetrics::Role role) { - getMetrics()->onStepDown(role); - } - - void completeOperation(ReshardingMetrics::Role role, ReshardingOperationStatusEnum opStatus) { - getMetrics()->onCompletion( - role, opStatus, getGlobalServiceContext()->getFastClockSource()->now()); - } - - // Timer step in milliseconds - static constexpr auto kTimerStep = 100; - - void advanceTime(Milliseconds step = Milliseconds{kTimerStep}) { - _clockSource->advance(step); - } - - auto getWasReshardingEverAttempted() { - return getMetrics()->wasReshardingEverAttempted(); - } - - auto getReport(OpReportType reportType) { - BSONObjBuilder bob; - if (reportType == OpReportType::CumulativeReport) { - getMetrics()->serializeCumulativeOpMetrics(&bob); - } else if (reportType == OpReportType::CurrentOpReportDonorRole) { - getMetrics()->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kDonor); - } else if (reportType == OpReportType::CurrentOpReportRecipientRole) { - getMetrics()->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kRecipient); - } else { - getMetrics()->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kCoordinator); - } - return bob.obj(); - } - - void checkMetrics(std::string tag, int expectedValue, OpReportType reportType) { - const auto report = getReport(reportType); - checkMetrics(report, std::move(tag), expectedValue); - } - - void checkMetrics(std::string tag, - int expectedValue, - std::string errMsg, - OpReportType reportType) { - const auto report = getReport(reportType); - checkMetrics(report, std::move(tag), expectedValue, std::move(errMsg)); - } - - void checkMetrics(const BSONObj& report, - std::string tag, - int expectedValue, - std::string errMsg = "Unexpected value") const { - ASSERT_EQ(report.getIntField(tag), expectedValue) - << fmt::format("{}: {}", errMsg, report.toString()); - }; - - void appendExpectedHistogramResult(BSONObjBuilder* bob, - std::string tag, - const std::vector& latencies) { - Histogram hist = ReshardingMetrics::getLatencyHistogram(); - for (size_t i = 0; i < latencies.size(); i++) { - hist.increment(latencies[i]); - } - - BSONObjBuilder histogramBuilder; - appendHistogram(histogramBuilder, hist, tag); - BSONObj histogram = histogramBuilder.obj(); - bob->appendElements(histogram); - } - -private: - ClockSourceMock* _clockSource; -}; - -// TODO Re-enable once underlying invariants are re-enabled -/* -DEATH_TEST_F(ReshardingMetricsTest, RunOnCompletionBeforeOnStart, "No operation is in progress") { - completeOperation(ReshardingMetrics::Role::kRecipient, - ReshardingOperationStatusEnum::kSuccess); -} - -DEATH_TEST_F(ReshardingMetricsTest, - RunOnStepUpAfterOnStartInvariants, - "Another operation is in progress") { - startOperation(ReshardingMetrics::Role::kRecipient); - stepUpOperation(ReshardingMetrics::Role::kRecipient); -} - -DEATH_TEST_F(ReshardingMetricsTest, - RunOnCompletionAfterOnStepDownInvariants, - "No operation is in progress") { - startOperation(ReshardingMetrics::Role::kRecipient); - stepDownOperation(ReshardingMetrics::Role::kRecipient); - completeOperation(ReshardingMetrics::Role::kRecipient, - ReshardingOperationStatusEnum::kSuccess); -} -*/ - -TEST_F(ReshardingMetricsTest, RunOnCompletionTwiceIsSafe) { - startOperation(ReshardingMetrics::Role::kCoordinator); - completeOperation(ReshardingMetrics::Role::kCoordinator, - ReshardingOperationStatusEnum::kSuccess); - completeOperation(ReshardingMetrics::Role::kCoordinator, - ReshardingOperationStatusEnum::kSuccess); -} - -TEST_F(ReshardingMetricsTest, RunOnStepDownAfterOnCompletionIsSafe) { - startOperation(ReshardingMetrics::Role::kRecipient); - completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess); - stepDownOperation(ReshardingMetrics::Role::kRecipient); -} - -DEATH_TEST_F(ReshardingMetricsTest, CoordinatorThenDonor, "Another operation is in progress") { - startOperation(ReshardingMetrics::Role::kCoordinator); - startOperation(ReshardingMetrics::Role::kDonor); -} - -DEATH_TEST_F(ReshardingMetricsTest, DonorThenCoordinator, "Another operation is in progress") { - startOperation(ReshardingMetrics::Role::kDonor); - startOperation(ReshardingMetrics::Role::kCoordinator); -} - -DEATH_TEST_F(ReshardingMetricsTest, CoordinatorThenRecipient, "Another operation is in progress") { - startOperation(ReshardingMetrics::Role::kCoordinator); - startOperation(ReshardingMetrics::Role::kRecipient); -} - -DEATH_TEST_F(ReshardingMetricsTest, RecipientThenCoordinator, "Another operation is in progress") { - startOperation(ReshardingMetrics::Role::kRecipient); - startOperation(ReshardingMetrics::Role::kCoordinator); -} - -TEST_F(ReshardingMetricsTest, DonorAndRecipientCombinationIsSafe) { - startOperation(ReshardingMetrics::Role::kRecipient); - startOperation(ReshardingMetrics::Role::kDonor); - completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess); - completeOperation(ReshardingMetrics::Role::kDonor, ReshardingOperationStatusEnum::kSuccess); -} - -TEST_F(ReshardingMetricsTest, DonorAndRecipientStepdownIsSafe) { - startOperation(ReshardingMetrics::Role::kDonor); - startOperation(ReshardingMetrics::Role::kRecipient); - stepDownOperation(ReshardingMetrics::Role::kRecipient); - stepDownOperation(ReshardingMetrics::Role::kDonor); -} - -TEST_F(ReshardingMetricsTest, OperationStatus) { - startOperation(ReshardingMetrics::Role::kCoordinator); - const auto report = getReport(OpReportType::CurrentOpReportCoordinatorRole); - ASSERT_EQ(report.getStringField("opStatus"), - ReshardingOperationStatus_serializer(ReshardingOperationStatusEnum::kRunning)); - completeOperation(ReshardingMetrics::Role::kCoordinator, - ReshardingOperationStatusEnum::kSuccess); -} - -TEST_F(ReshardingMetricsTest, TestOperationStatus) { - const auto kNumSuccessfulOps = 3; - const auto kNumFailedOps = 5; - const auto kNumCanceledOps = 7; - - for (auto i = 0; i < kNumSuccessfulOps; i++) { - startOperation(ReshardingMetrics::Role::kRecipient); - completeOperation(ReshardingMetrics::Role::kRecipient, - ReshardingOperationStatusEnum::kSuccess); - } - - for (auto i = 0; i < kNumFailedOps; i++) { - startOperation(ReshardingMetrics::Role::kRecipient); - completeOperation(ReshardingMetrics::Role::kRecipient, - ReshardingOperationStatusEnum::kFailure); - } - - for (auto i = 0; i < kNumCanceledOps; i++) { - startOperation(ReshardingMetrics::Role::kRecipient); - completeOperation(ReshardingMetrics::Role::kRecipient, - ReshardingOperationStatusEnum::kCanceled); - } - - checkMetrics("countReshardingSuccessful", kNumSuccessfulOps, OpReportType::CumulativeReport); - checkMetrics("countReshardingFailures", kNumFailedOps, OpReportType::CumulativeReport); - checkMetrics("countReshardingCanceled", kNumCanceledOps, OpReportType::CumulativeReport); - - const auto total = kNumSuccessfulOps + kNumFailedOps + kNumCanceledOps; - checkMetrics("countReshardingOperations", total, OpReportType::CumulativeReport); - startOperation(ReshardingMetrics::Role::kRecipient); - checkMetrics("countReshardingOperations", total + 1, OpReportType::CumulativeReport); -} - -TEST_F(ReshardingMetricsTest, TestElapsedTime) { - startOperation(ReshardingMetrics::Role::kDonor); - const auto elapsedTime = 1; - advanceTime(Seconds(elapsedTime)); - checkMetrics( - "totalOperationTimeElapsedSecs", elapsedTime, OpReportType::CurrentOpReportDonorRole); -} - -TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) { - startOperation(ReshardingMetrics::Role::kRecipient); - startOperation(ReshardingMetrics::Role::kDonor); - const auto elapsedTime = 1; - - advanceTime(Seconds(elapsedTime)); - - // Update metrics for donor - const auto kWritesDuringCriticalSection = 7; - getMetrics()->setDonorState(DonorStateEnum::kPreparingToBlockWrites); - getMetrics()->enterCriticalSection(getGlobalServiceContext()->getFastClockSource()->now()); - getMetrics()->onWriteDuringCriticalSection(kWritesDuringCriticalSection); - advanceTime(Seconds(elapsedTime)); - - // Update metrics for recipient - const auto kDocumentsToCopy = 50; - const auto kBytesToCopy = 740; - const auto kCopyProgress = 50; - getMetrics()->setRecipientState(RecipientStateEnum::kCreatingCollection); - getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy); - getMetrics()->setRecipientState(RecipientStateEnum::kCloning); - getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); - getMetrics()->onDocumentsCopied(kDocumentsToCopy * kCopyProgress / 100, - kBytesToCopy * kCopyProgress / 100); - advanceTime(Seconds(elapsedTime)); - - const auto currentDonorOpReport = getReport(OpReportType::CurrentOpReportDonorRole); - const auto currentRecipientOpReport = getReport(OpReportType::CurrentOpReportRecipientRole); - completeOperation(ReshardingMetrics::Role::kDonor, ReshardingOperationStatusEnum::kSuccess); - completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess); - - checkMetrics(currentRecipientOpReport, "totalCopyTimeElapsedSecs", elapsedTime); - checkMetrics(currentRecipientOpReport, "bytesCopied", kBytesToCopy * kCopyProgress / 100); - checkMetrics( - currentRecipientOpReport, "documentsCopied", kDocumentsToCopy * kCopyProgress / 100); - checkMetrics(currentDonorOpReport, "totalCriticalSectionTimeElapsedSecs", elapsedTime * 2); - checkMetrics( - currentDonorOpReport, "countWritesDuringCriticalSection", kWritesDuringCriticalSection); - - const auto cumulativeReportAfterCompletion = getReport(OpReportType::CumulativeReport); - checkMetrics( - cumulativeReportAfterCompletion, "bytesCopied", kBytesToCopy * kCopyProgress / 100); - checkMetrics( - cumulativeReportAfterCompletion, "documentsCopied", kDocumentsToCopy * kCopyProgress / 100); - checkMetrics(cumulativeReportAfterCompletion, - "countWritesDuringCriticalSection", - kWritesDuringCriticalSection); -} - -TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCompletion) { - auto constexpr kTag = "documentsCopied"; - startOperation(ReshardingMetrics::Role::kRecipient); - const auto kDocumentsToCopy = 2; - const auto kBytesToCopy = 200; - getMetrics()->setRecipientState(RecipientStateEnum::kCloning); - getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); - getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy); - advanceTime(); - completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kFailure); - advanceTime(); - - checkMetrics(kTag, - kDocumentsToCopy, - "Cumulative metrics are not retained", - OpReportType::CumulativeReport); - - startOperation(ReshardingMetrics::Role::kRecipient); - checkMetrics( - kTag, kDocumentsToCopy, "Cumulative metrics are reset", OpReportType::CumulativeReport); -} - -TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCancellation) { - auto constexpr kTag = "documentsCopied"; - startOperation(ReshardingMetrics::Role::kRecipient); - const auto kDocumentsToCopy = 2; - const auto kBytesToCopy = 200; - getMetrics()->setRecipientState(RecipientStateEnum::kCloning); - getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); - getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy); - advanceTime(); - completeOperation(ReshardingMetrics::Role::kRecipient, - ReshardingOperationStatusEnum::kCanceled); - advanceTime(); - - checkMetrics(kTag, - kDocumentsToCopy, - "Cumulative metrics are not retained", - OpReportType::CumulativeReport); - - startOperation(ReshardingMetrics::Role::kRecipient); - checkMetrics( - kTag, kDocumentsToCopy, "Cumulative metrics are reset", OpReportType::CumulativeReport); -} - -TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreResetAfterCompletion) { - auto constexpr kTag = "documentsCopied"; - startOperation(ReshardingMetrics::Role::kRecipient); - const auto kDocumentsToCopy = 2; - const auto kBytesToCopy = 200; - getMetrics()->setRecipientState(RecipientStateEnum::kCloning); - getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); - getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy); - checkMetrics(kTag, - kDocumentsToCopy, - "Current metrics are not set", - OpReportType::CurrentOpReportRecipientRole); - advanceTime(); - completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess); - advanceTime(); - - startOperation(ReshardingMetrics::Role::kRecipient); - checkMetrics( - kTag, 0, "Current metrics are not reset", OpReportType::CurrentOpReportRecipientRole); -} - -TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterCompletion) { - auto constexpr kTag = "documentsCopied"; - startOperation(ReshardingMetrics::Role::kRecipient); - const auto kDocumentsToCopy = 2; - const auto kBytesToCopy = 200; - getMetrics()->setRecipientState(RecipientStateEnum::kCloning); - getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); - getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy); - checkMetrics(kTag, - kDocumentsToCopy, - "Current metrics are not set", - OpReportType::CurrentOpReportRecipientRole); - advanceTime(); - completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kFailure); - advanceTime(); - - ASSERT_FALSE(getReport(OpReportType::CurrentOpReportRecipientRole)[kTag].ok()); -} - -TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterStepDown) { - auto constexpr kTag = "documentsCopied"; - startOperation(ReshardingMetrics::Role::kRecipient); - const auto kDocumentsToCopy = 2; - const auto kBytesToCopy = 200; - getMetrics()->setRecipientState(RecipientStateEnum::kCloning); - getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); - getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy); - checkMetrics(kTag, - kDocumentsToCopy, - "Current metrics are not set", - OpReportType::CurrentOpReportRecipientRole); - advanceTime(); - stepDownOperation(ReshardingMetrics::Role::kRecipient); - advanceTime(); - - ASSERT_FALSE(getReport(OpReportType::CurrentOpReportRecipientRole)[kTag].ok()); -} - -TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreSetAndReset) { - const auto kExpectedMin = Milliseconds(1); - const auto kExpectedMax = Milliseconds(8); - auto constexpr kMinOpTime = "minShardRemainingOperationTimeEstimatedMillis"; - auto constexpr kMaxOpTime = "maxShardRemainingOperationTimeEstimatedMillis"; - startOperation(ReshardingMetrics::Role::kCoordinator); - getMetrics()->setCoordinatorState(CoordinatorStateEnum::kBlockingWrites); - - getMetrics()->setMinRemainingOperationTime(kExpectedMin); - checkMetrics(kMinOpTime, - durationCount(kExpectedMin), - "Cumulative metrics minimum time remaining is not set", - OpReportType::CumulativeReport); - - getMetrics()->setMaxRemainingOperationTime(kExpectedMax); - checkMetrics(kMaxOpTime, - durationCount(kExpectedMax), - "Cumulative metrics maximum time remaining is not set", - OpReportType::CumulativeReport); - - advanceTime(); - completeOperation(ReshardingMetrics::Role::kCoordinator, - ReshardingOperationStatusEnum::kSuccess); - advanceTime(); - checkMetrics(kMinOpTime, - 0, - "Cumulative metrics minimum time remaining is not reset", - OpReportType::CumulativeReport); - - checkMetrics(kMaxOpTime, - 0, - "Cumulative metrics maximum time remaining is not reset", - OpReportType::CumulativeReport); -} - -TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) { - auto constexpr kTag = "remainingOperationTimeEstimatedSecs"; - const auto elapsedTime = 1; - - startOperation(ReshardingMetrics::Role::kRecipient); - checkMetrics(kTag, -1, OpReportType::CurrentOpReportRecipientRole); - - const auto kDocumentsToCopy = 2; - const auto kBytesToCopy = 200; - getMetrics()->setRecipientState(RecipientStateEnum::kCreatingCollection); - getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy); - getMetrics()->setRecipientState(RecipientStateEnum::kCloning); - getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); - getMetrics()->onDocumentsCopied(kDocumentsToCopy / 2, kBytesToCopy / 2); - advanceTime(Seconds(elapsedTime)); - // Since 50% of the data is copied, the remaining copy time equals the elapsed copy time, which - // is equal to `elapsedTime` seconds. - checkMetrics(kTag, elapsedTime + 2 * elapsedTime, OpReportType::CurrentOpReportRecipientRole); - - const auto kOplogEntriesFetched = 4; - const auto kOplogEntriesApplied = 2; - getMetrics()->setRecipientState(RecipientStateEnum::kApplying); - getMetrics()->endCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); - getMetrics()->startApplyingOplogEntries(getGlobalServiceContext()->getFastClockSource()->now()); - getMetrics()->onOplogEntriesFetched(kOplogEntriesFetched); - getMetrics()->onOplogEntriesApplied(kOplogEntriesApplied); - advanceTime(Seconds(elapsedTime)); - // So far, the time to apply oplog entries equals `elapsedTime` seconds. - checkMetrics(kTag, - elapsedTime * (kOplogEntriesFetched / kOplogEntriesApplied - 1), - OpReportType::CurrentOpReportRecipientRole); -} - -TEST_F(ReshardingMetricsTest, CurrentOpReportForDonor) { - const auto kDonorState = DonorStateEnum::kPreparingToBlockWrites; - startOperation(ReshardingMetrics::Role::kDonor); - advanceTime(Seconds(2)); - getMetrics()->setDonorState(kDonorState); - getMetrics()->enterCriticalSection(getGlobalServiceContext()->getFastClockSource()->now()); - advanceTime(Seconds(3)); - - const ReshardingMetrics::ReporterOptions options( - ReshardingMetrics::Role::kDonor, - UUID::parse("12345678-1234-1234-1234-123456789abc").getValue(), - NamespaceString("db", "collection"), - BSON("id" << 1), - true); - - const auto expected = - fromjson(fmt::format("{{ type: \"op\"," - "desc: \"ReshardingDonorService {0}\"," - "op: \"command\"," - "ns: \"{1}\"," - "originatingCommand: {{ reshardCollection: \"{1}\"," - "key: {2}," - "unique: {3}," - "collation: {{ locale: \"simple\" }} }}," - "totalOperationTimeElapsedSecs: 5," - "countWritesDuringCriticalSection: 0," - "totalCriticalSectionTimeElapsedSecs : 3," - "donorState: \"{4}\"," - "opStatus: \"running\" }}", - options.id.toString(), - options.nss.toString(), - options.shardKey.toString(), - options.unique ? "true" : "false", - DonorState_serializer(kDonorState))); - - const auto report = getMetrics()->reportForCurrentOp(options); - ASSERT_BSONOBJ_EQ(expected, report); -} - -TEST_F(ReshardingMetricsTest, CurrentOpReportForRecipient) { - const auto kRecipientState = RecipientStateEnum::kCloning; - - constexpr auto kDocumentsToCopy = 500; - constexpr auto kDocumentsCopied = kDocumentsToCopy * 0.5; - static_assert(kDocumentsToCopy >= kDocumentsCopied); - - constexpr auto kBytesToCopy = 8192; - constexpr auto kBytesCopied = kBytesToCopy * 0.5; - static_assert(kBytesToCopy >= kBytesCopied); - - constexpr auto kDelayBeforeCloning = Seconds(2); - startOperation(ReshardingMetrics::Role::kRecipient); - advanceTime(kDelayBeforeCloning); - - constexpr auto kTimeSpentCloning = Seconds(3); - getMetrics()->setRecipientState(RecipientStateEnum::kCreatingCollection); - getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy); - getMetrics()->setRecipientState(kRecipientState); - getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); - advanceTime(kTimeSpentCloning); - getMetrics()->onDocumentsCopied(kDocumentsCopied, kBytesCopied); - - const auto kTimeToCopyRemainingSeconds = - durationCount(kTimeSpentCloning) * (kBytesToCopy / kBytesCopied - 1); - const auto kRemainingOperationTimeSeconds = - durationCount(kTimeSpentCloning) + 2 * kTimeToCopyRemainingSeconds; - - const ReshardingMetrics::ReporterOptions options( - ReshardingMetrics::Role::kRecipient, - UUID::parse("12345678-1234-1234-1234-123456789def").getValue(), - NamespaceString("db", "collection"), - BSON("id" << 1), - false); - - BSONObj expectedPrefix = - fromjson(fmt::format("{{ type: \"op\"," - "desc: \"ReshardingRecipientService {0}\"," - "op: \"command\"," - "ns: \"{1}\"," - "originatingCommand: {{ reshardCollection: \"{1}\"," - "key: {2}," - "unique: {3}," - "collation: {{ locale: \"simple\" }} }}," - "totalOperationTimeElapsedSecs: {4}," - "remainingOperationTimeEstimatedSecs: {5}," - "approxDocumentsToCopy: {6}," - "documentsCopied: {7}," - "approxBytesToCopy: {8}," - "bytesCopied: {9}," - "totalCopyTimeElapsedSecs: {10}," - "oplogEntriesFetched: 0," - "oplogEntriesApplied: 0," - "totalApplyTimeElapsedSecs: 0," - "recipientState: \"{11}\"," - "opStatus: \"running\" }}", - options.id.toString(), - options.nss.toString(), - options.shardKey.toString(), - options.unique ? "true" : "false", - durationCount(kDelayBeforeCloning + kTimeSpentCloning), - kRemainingOperationTimeSeconds, - kDocumentsToCopy, - kDocumentsCopied, - kBytesToCopy, - kBytesCopied, - durationCount(kTimeSpentCloning), - RecipientState_serializer(kRecipientState))); - - BSONObjBuilder expectedBuilder(std::move(expectedPrefix)); - - // Append histogram latency data. - appendExpectedHistogramResult(&expectedBuilder, kOplogApplierApplyBatchLatencyMillis, {}); - appendExpectedHistogramResult(&expectedBuilder, kCollClonerFillBatchForInsertLatencyMillis, {}); - - BSONObj expected = expectedBuilder.done(); - - const auto report = getMetrics()->reportForCurrentOp(options); - ASSERT_BSONOBJ_EQ(expected, report); -} - -TEST_F(ReshardingMetricsTest, TestHistogramMetricsForRecipient) { - const std::vector applyLatencies_1{3, 427, 0, 6004, 320, 10056, 12300, 105, 70}; - const std::vector applyLatencies_2{800, 20, 5, 1025, 10567}; - const std::vector insertLatencies_1{120, 7, 110, 50, 0, 16500, 77000, 667, 7980}; - const std::vector insertLatencies_2{12450, 2400, 760, 57, 2}; - - const auto combineLatencies = [](std::vector* allLatencies, - const std::vector& latencies_1, - const std::vector& latencies_2) { - allLatencies->insert(allLatencies->end(), latencies_1.begin(), latencies_1.end()); - allLatencies->insert(allLatencies->end(), latencies_2.begin(), latencies_2.end()); - }; - - std::vector allApplyLatencies; - combineLatencies(&allApplyLatencies, applyLatencies_1, applyLatencies_2); - std::vector allInsertLatencies; - combineLatencies(&allInsertLatencies, insertLatencies_1, insertLatencies_2); - - - const ReshardingMetrics::ReporterOptions options( - ReshardingMetrics::Role::kRecipient, - UUID::parse("12345678-1234-1234-1234-123456789def").getValue(), - NamespaceString("db", "collection"), - BSON("id" << 1), - false); - - // Test that all histogram metrics appear in both currentOp and cumulativeOp. - const size_t kNumTests = 4; - std::vector testLatencies[kNumTests] = { - applyLatencies_1, applyLatencies_2, insertLatencies_1, insertLatencies_2}; - std::vector expectedLatencies[kNumTests] = { - applyLatencies_1, allApplyLatencies, insertLatencies_1, allInsertLatencies}; - OpReportType testReportTypes[kNumTests] = {OpReportType::CurrentOpReportRecipientRole, - OpReportType::CumulativeReport, - OpReportType::CurrentOpReportRecipientRole, - OpReportType::CumulativeReport}; - std::string histogramTag[kNumTests] = {kOplogApplierApplyBatchLatencyMillis, - kOplogApplierApplyBatchLatencyMillis, - kCollClonerFillBatchForInsertLatencyMillis, - kCollClonerFillBatchForInsertLatencyMillis}; - - auto testLatencyHistogram = [&](std::vector latencies, - OpReportType reportType, - std::vector expectedLatencies, - std::string histogramTag) { - LOGV2(57700, - "TestHistogramMetricsForRecipient test case", - "reportType"_attr = reportType, - "histogramTag"_attr = histogramTag); - - startOperation(ReshardingMetrics::Role::kRecipient); - - RecipientStateEnum state = (histogramTag.compare(kOplogApplierApplyBatchLatencyMillis) == 0 - ? RecipientStateEnum::kApplying - : RecipientStateEnum::kCloning); - getMetrics()->setRecipientState(state); - - for (size_t i = 0; i < latencies.size(); i++) { - if (histogramTag.compare(kOplogApplierApplyBatchLatencyMillis) == 0) { - getMetrics()->onOplogApplierApplyBatch(Milliseconds(latencies[i])); - } else if (histogramTag.compare(kCollClonerFillBatchForInsertLatencyMillis) == 0) { - getMetrics()->onCollClonerFillBatchForInsert(Milliseconds(latencies[i])); - } else { - MONGO_UNREACHABLE; - } - } - - const auto report = getReport(reportType); - const auto buckets = report[histogramTag]; - - BSONObjBuilder expectedBuilder; - appendExpectedHistogramResult(&expectedBuilder, histogramTag, expectedLatencies); - const auto expectedHist = expectedBuilder.done(); - const auto expectedBuckets = expectedHist[histogramTag]; - - ASSERT_EQ(buckets.woCompare(expectedBuckets), 0); - - completeOperation(ReshardingMetrics::Role::kRecipient, - ReshardingOperationStatusEnum::kFailure); - }; - - for (size_t i = 0; i < kNumTests; i++) { - testLatencyHistogram( - testLatencies[i], testReportTypes[i], expectedLatencies[i], histogramTag[i]); - } -} - -TEST_F(ReshardingMetricsTest, CurrentOpReportForCoordinator) { - const auto kCoordinatorState = CoordinatorStateEnum::kInitializing; - const auto kSomeDuration = Seconds(10); - - startOperation(ReshardingMetrics::Role::kCoordinator); - getMetrics()->setCoordinatorState(kCoordinatorState); - advanceTime(kSomeDuration); - - const ReshardingMetrics::ReporterOptions options( - ReshardingMetrics::Role::kCoordinator, - UUID::parse("12345678-1234-1234-1234-123456789cba").getValue(), - NamespaceString("db", "collection"), - BSON("id" << 1), - false); - - const auto expected = - fromjson(fmt::format("{{ type: \"op\"," - "desc: \"ReshardingCoordinatorService {0}\"," - "op: \"command\"," - "ns: \"{1}\"," - "originatingCommand: {{ reshardCollection: \"{1}\"," - "key: {2}," - "unique: {3}," - "collation: {{ locale: \"simple\" }} }}," - "totalOperationTimeElapsedSecs: {4}," - "coordinatorState: \"{5}\"," - "opStatus: \"running\" }}", - options.id.toString(), - options.nss.toString(), - options.shardKey.toString(), - options.unique ? "true" : "false", - durationCount(kSomeDuration), - CoordinatorState_serializer(kCoordinatorState))); - - const auto report = getMetrics()->reportForCurrentOp(options); - ASSERT_BSONOBJ_EQ(expected, report); -} - -TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeCloning) { - // Copy N docs @ timePerDoc. Check the progression of the estimated time remaining. - auto m = getMetrics(); - m->onStart(ReshardingMetrics::Role::kRecipient, - getGlobalServiceContext()->getFastClockSource()->now()); - auto timePerDocument = Seconds(2); - int64_t bytesPerDocument = 1024; - int64_t documentsToCopy = 409; - int64_t bytesToCopy = bytesPerDocument * documentsToCopy; - m->setRecipientState(RecipientStateEnum::kCreatingCollection); - m->setDocumentsToCopy(documentsToCopy, bytesToCopy); - m->setRecipientState(RecipientStateEnum::kCloning); - m->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); - auto remainingTime = 2 * timePerDocument * documentsToCopy; - double maxAbsRelErr = 0; - for (int64_t copied = 0; copied < documentsToCopy; ++copied) { - double output = - getReport(OpReportType::CurrentOpReportRecipientRole)[kOpTimeRemaining].Number(); - if (copied == 0) { - ASSERT_EQ(output, -1); - } else { - ASSERT_GTE(output, 0); - auto expected = durationCount(remainingTime); - // Check that error is pretty small (it should get better as the operation progresses) - double absRelErr = std::abs((output - expected) / expected); - ASSERT_LT(absRelErr, 0.05) - << "output={}, expected={}, copied={}"_format(output, expected, copied); - maxAbsRelErr = std::max(maxAbsRelErr, absRelErr); - } - m->onDocumentsCopied(1, bytesPerDocument); - advanceTime(timePerDocument); - remainingTime -= timePerDocument; - } - LOGV2_DEBUG( - 5422700, 3, "Max absolute relative error observed", "maxAbsRelErr"_attr = maxAbsRelErr); -} - -TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeApplying) { - // Perform N ops @ timePerOp. Check the progression of the estimated time remaining. - auto m = getMetrics(); - m->onStart(ReshardingMetrics::Role::kRecipient, - getGlobalServiceContext()->getFastClockSource()->now()); - m->setRecipientState(RecipientStateEnum::kApplying); - m->startApplyingOplogEntries(getGlobalServiceContext()->getFastClockSource()->now()); - - // 1 extra millisecond here because otherwise an error of just 1ms will round this down to the - // next second. - auto timePerOp = Milliseconds(1001); - int64_t fetched = 10000; - m->onOplogEntriesFetched(fetched); - auto remainingTime = timePerOp * fetched; - double maxAbsRelErr = 0; - for (int64_t applied = 0; applied < fetched; ++applied) { - double output = - getReport(OpReportType::CurrentOpReportRecipientRole)[kOpTimeRemaining].Number(); - if (applied == 0) { - ASSERT_EQ(output, -1); - } else { - auto expected = durationCount(remainingTime); - // Check that error is pretty small (it should get better as the operation progresses) - double absRelErr = std::abs((output - expected) / expected); - ASSERT_LT(absRelErr, 0.05) - << "output={}, expected={}, applied={}"_format(output, expected, applied); - maxAbsRelErr = std::max(maxAbsRelErr, absRelErr); - } - advanceTime(timePerOp); - m->onOplogEntriesApplied(1); - remainingTime -= timePerOp; - } - LOGV2_DEBUG( - 5422701, 3, "Max absolute relative error observed", "maxAbsRelErr"_attr = maxAbsRelErr); -} - -TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAccumulate) { - auto constexpr kTag = "documentsCopied"; - startOperation(ReshardingMetrics::Role::kRecipient); - const auto kDocumentsToCopy1 = 2; - const auto kBytesToCopy1 = 200; - - getMetrics()->setRecipientState(RecipientStateEnum::kCloning); - getMetrics()->onDocumentsCopied(kDocumentsToCopy1, kBytesToCopy1); - completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kFailure); - - startOperation(ReshardingMetrics::Role::kRecipient); - const auto kDocumentsToCopy2 = 3; - const auto kBytesToCopy2 = 400; - - getMetrics()->setRecipientState(RecipientStateEnum::kCloning); - getMetrics()->onDocumentsCopied(kDocumentsToCopy2, kBytesToCopy2); - completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kFailure); - - checkMetrics(kTag, - kDocumentsToCopy1 + kDocumentsToCopy2, - "Cumulative metrics are not accumulated", - OpReportType::CumulativeReport); -} - -TEST_F(ReshardingMetricsTest, TestWasReshardingEverAttemptedStartComplete) { - ASSERT_FALSE(getWasReshardingEverAttempted()); - - startOperation(ReshardingMetrics::Role::kRecipient); - ASSERT_TRUE(getWasReshardingEverAttempted()); - - completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess); - ASSERT_TRUE(getWasReshardingEverAttempted()); -} - -TEST_F(ReshardingMetricsTest, TestWasReshardingEverAttemptedStartStepDownStepUp) { - ASSERT_FALSE(getWasReshardingEverAttempted()); - - startOperation(ReshardingMetrics::Role::kRecipient); - ASSERT_TRUE(getWasReshardingEverAttempted()); - - stepDownOperation(ReshardingMetrics::Role::kRecipient); - ASSERT_TRUE(getWasReshardingEverAttempted()); - - stepUpOperation(ReshardingMetrics::Role::kRecipient); - ASSERT_TRUE(getWasReshardingEverAttempted()); -} - -TEST_F(ReshardingMetricsTest, TestWasReshardingEverAttemptedStepUpStepDown) { - ASSERT_FALSE(getWasReshardingEverAttempted()); - - stepUpOperation(ReshardingMetrics::Role::kRecipient); - ASSERT_TRUE(getWasReshardingEverAttempted()); - - stepDownOperation(ReshardingMetrics::Role::kRecipient); - ASSERT_TRUE(getWasReshardingEverAttempted()); -} - -TEST_F(ReshardingMetricsTest, TestWasReshardingEverAttemptedStepUpComplete) { - ASSERT_FALSE(getWasReshardingEverAttempted()); - - stepUpOperation(ReshardingMetrics::Role::kRecipient); - ASSERT_TRUE(getWasReshardingEverAttempted()); - - completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess); - ASSERT_TRUE(getWasReshardingEverAttempted()); -} - -TEST_F(ReshardingMetricsTest, TestOnStepUpWithDonorMetrics) { - ASSERT_FALSE(getWasReshardingEverAttempted()); - - getMetrics()->onStepUp(DonorStateEnum::kUnused, ReshardingDonorMetrics()); - - ASSERT_TRUE(getWasReshardingEverAttempted()); -} - -} // namespace -} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp index c2d013dfbc2..1478a3ec30c 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -45,7 +45,6 @@ #include "mongo/db/query/plan_executor.h" #include "mongo/db/repl/oplog_applier_utils.h" #include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/stats/counters.h" @@ -127,7 +126,6 @@ ReshardingOplogApplicationRules::ReshardingOplogApplicationRules( size_t myStashIdx, ShardId donorShardId, ChunkManager sourceChunkMgr, - ReshardingMetrics* metrics, ReshardingOplogApplierMetrics* applierMetrics) : _outputNss(std::move(outputNss)), _allStashNss(std::move(allStashNss)), @@ -135,7 +133,6 @@ ReshardingOplogApplicationRules::ReshardingOplogApplicationRules( _myStashNss(_allStashNss.at(_myStashIdx)), _donorShardId(std::move(donorShardId)), _sourceChunkMgr(std::move(sourceChunkMgr)), - _metrics(metrics), _applierMetrics(applierMetrics) {} Status ReshardingOplogApplicationRules::applyOperation(OperationContext* opCtx, @@ -176,23 +173,18 @@ Status ReshardingOplogApplicationRules::applyOperation(OperationContext* opCtx, case repl::OpTypeEnum::kInsert: _applyInsert_inlock( opCtx, autoCollOutput.getDb(), *autoCollOutput, *autoCollStash, op); - if (ShardingDataTransformMetrics::isEnabled()) { - _applierMetrics->onInsertApplied(); - } + _applierMetrics->onInsertApplied(); + break; case repl::OpTypeEnum::kUpdate: _applyUpdate_inlock( opCtx, autoCollOutput.getDb(), *autoCollOutput, *autoCollStash, op); - if (ShardingDataTransformMetrics::isEnabled()) { - _applierMetrics->onUpdateApplied(); - } + _applierMetrics->onUpdateApplied(); break; case repl::OpTypeEnum::kDelete: _applyDelete_inlock( opCtx, autoCollOutput.getDb(), *autoCollOutput, *autoCollStash, op); - if (ShardingDataTransformMetrics::isEnabled()) { - _applierMetrics->onDeleteApplied(); - } + _applierMetrics->onDeleteApplied(); break; default: MONGO_UNREACHABLE; @@ -245,7 +237,6 @@ void ReshardingOplogApplicationRules::_applyInsert_inlock(OperationContext* opCt * 4. If there exists a document with _id == [op _id] in the output collection and it is NOT * owned by this donor shard, insert the contents of 'op' into the conflict stash collection. */ - _metrics->gotInsert(); BSONObj oField = op.getObject(); @@ -273,9 +264,7 @@ void ReshardingOplogApplicationRules::_applyInsert_inlock(OperationContext* opCt UpdateResult ur = update(opCtx, db, request); invariant(ur.numMatched != 0); - if (ShardingDataTransformMetrics::isEnabled()) { - _applierMetrics->onWriteToStashCollections(); - } + _applierMetrics->onWriteToStashCollections(); return; } @@ -318,9 +307,7 @@ void ReshardingOplogApplicationRules::_applyInsert_inlock(OperationContext* opCt uassertStatusOK(stashColl->insertDocument( opCtx, InsertStatement(oField), nullptr /* nullOpDebug */, false /* fromMigrate */)); - if (ShardingDataTransformMetrics::isEnabled()) { - _applierMetrics->onWriteToStashCollections(); - } + _applierMetrics->onWriteToStashCollections(); } void ReshardingOplogApplicationRules::_applyUpdate_inlock(OperationContext* opCtx, @@ -342,7 +329,6 @@ void ReshardingOplogApplicationRules::_applyUpdate_inlock(OperationContext* opCt * 4. If there exists a document with _id == [op _id] in the output collection and it is owned * by this donor shard, update the document from this collection. */ - _metrics->gotUpdate(); BSONObj oField = op.getObject(); BSONObj o2Field; @@ -374,9 +360,7 @@ void ReshardingOplogApplicationRules::_applyUpdate_inlock(OperationContext* opCt invariant(ur.numMatched != 0); - if (ShardingDataTransformMetrics::isEnabled()) { - _applierMetrics->onWriteToStashCollections(); - } + _applierMetrics->onWriteToStashCollections(); return; } @@ -431,7 +415,6 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(OperationContext* opCt * _id == [op _id] arbitrarily from among all resharding conflict stash collections to delete * from that resharding conflict stash collection and insert into the output collection. */ - _metrics->gotDelete(); BSONObj oField = op.getObject(); @@ -452,9 +435,7 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(OperationContext* opCt auto nDeleted = deleteObjects(opCtx, stashColl, _myStashNss, idQuery, true /* justOne */); invariant(nDeleted != 0); - if (ShardingDataTransformMetrics::isEnabled()) { - _applierMetrics->onWriteToStashCollections(); - } + _applierMetrics->onWriteToStashCollections(); return; } @@ -540,9 +521,7 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(OperationContext* opCt BSONObj res; auto state = exec->getNext(&res, nullptr); - if (ShardingDataTransformMetrics::isEnabled()) { - _applierMetrics->onWriteToStashCollections(); - } + _applierMetrics->onWriteToStashCollections(); if (PlanExecutor::ADVANCED == state) { // We matched a document and deleted it, so break. diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.h b/src/mongo/db/s/resharding/resharding_oplog_application.h index 5052627c81b..b8bd3942b40 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.h +++ b/src/mongo/db/s/resharding/resharding_oplog_application.h @@ -41,7 +41,6 @@ #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_oplog_applier_metrics.h" #include "mongo/s/chunk_manager.h" @@ -61,7 +60,6 @@ public: size_t myStashIdx, ShardId donorShardId, ChunkManager sourceChunkMgr, - ReshardingMetrics* metrics, ReshardingOplogApplierMetrics* applierMetrics); const NamespaceString& getOutputNss() const { @@ -121,7 +119,6 @@ private: // The chunk manager for the source namespace and original shard key. const ChunkManager _sourceChunkMgr; - ReshardingMetrics* _metrics; ReshardingOplogApplierMetrics* _applierMetrics; }; diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp index 3fafff24f67..cf449c4c00c 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp @@ -36,7 +36,6 @@ #include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h" #include "mongo/db/s/resharding/resharding_future_util.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_util.h" #include "mongo/logv2/log.h" #include "mongo/s/sharding_feature_flags_gen.h" @@ -70,7 +69,6 @@ ReshardingOplogApplier::ReshardingOplogApplier( myStashIdx, _sourceId.getShardId(), std::move(sourceChunkMgr), - _env->metrics(), _env->applierMetrics()}, _sessionApplication{std::move(oplogBufferNss)}, _batchApplier{_crudApplication, _sessionApplication}, @@ -122,13 +120,9 @@ SemiFuture ReshardingOplogApplier::_applyBatch( return status; }) .onCompletion([this, latencyTimer](Status status) { - _env->metrics()->onOplogApplierApplyBatch( + _env->applierMetrics()->onOplogLocalBatchApplied( duration_cast(latencyTimer.elapsed())); - if (ShardingDataTransformMetrics::isEnabled()) { - _env->applierMetrics()->onOplogLocalBatchApplied( - duration_cast(latencyTimer.elapsed())); - } return status; }) .semi(); @@ -154,10 +148,8 @@ SemiFuture ReshardingOplogApplier::run( .then([this, chainCtx, executor, cancelToken, factory](OplogBatch batch) { LOGV2_DEBUG(5391002, 3, "Starting batch", "batchSize"_attr = batch.size()); - if (ShardingDataTransformMetrics::isEnabled()) { - _env->applierMetrics()->onBatchRetrievedDuringOplogApplying( - duration_cast(chainCtx->fetchTimer.elapsed())); - } + _env->applierMetrics()->onBatchRetrievedDuringOplogApplying( + duration_cast(chainCtx->fetchTimer.elapsed())); _currentBatchToApply = std::move(batch); return _applyBatch(executor, cancelToken, factory); @@ -177,8 +169,10 @@ SemiFuture ReshardingOplogApplier::run( .then([this, factory] { if (_currentBatchToApply.empty()) { // Increment the number of entries applied by 1 in order to account for - // the final oplog entry. - _env->metrics()->onOplogEntriesApplied(1); + // the final oplog entry that the iterator never returns because it's a + // known no-op oplog entry. + _env->applierMetrics()->onOplogEntriesApplied(1); + return false; } @@ -247,29 +241,25 @@ void ReshardingOplogApplier::_clearAppliedOpsAndStoreProgress(OperationContext* BSON(ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName << static_cast(_currentBatchToApply.size()))); - if (ShardingDataTransformMetrics::isEnabled()) { - builder.append("$set", - BSON(ReshardingOplogApplierProgress::kInsertsAppliedFieldName - << _env->applierMetrics()->getInsertsApplied())); - builder.append("$set", - BSON(ReshardingOplogApplierProgress::kUpdatesAppliedFieldName - << _env->applierMetrics()->getUpdatesApplied())); - builder.append("$set", - BSON(ReshardingOplogApplierProgress::kDeletesAppliedFieldName - << _env->applierMetrics()->getDeletesApplied())); - builder.append("$set", - BSON(ReshardingOplogApplierProgress::kWritesToStashCollectionsFieldName - << _env->applierMetrics()->getWritesToStashCollections())); - } + builder.append("$set", + BSON(ReshardingOplogApplierProgress::kInsertsAppliedFieldName + << _env->applierMetrics()->getInsertsApplied())); + builder.append("$set", + BSON(ReshardingOplogApplierProgress::kUpdatesAppliedFieldName + << _env->applierMetrics()->getUpdatesApplied())); + builder.append("$set", + BSON(ReshardingOplogApplierProgress::kDeletesAppliedFieldName + << _env->applierMetrics()->getDeletesApplied())); + builder.append("$set", + BSON(ReshardingOplogApplierProgress::kWritesToStashCollectionsFieldName + << _env->applierMetrics()->getWritesToStashCollections())); store.upsert( opCtx, BSON(ReshardingOplogApplierProgress::kOplogSourceIdFieldName << _sourceId.toBSON()), builder.obj()); - _env->metrics()->onOplogEntriesApplied(_currentBatchToApply.size()); - if (ShardingDataTransformMetrics::isEnabled()) { - _env->applierMetrics()->onOplogEntriesApplied(_currentBatchToApply.size()); - } + + _env->applierMetrics()->onOplogEntriesApplied(_currentBatchToApply.size()); _currentBatchToApply.clear(); _currentDerivedOpsForCrudWriters.clear(); diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h index cee7ce3af0a..56a7e9d3a0a 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.h +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h @@ -48,7 +48,6 @@ namespace mongo { -class ReshardingMetrics; class ServiceContext; /** @@ -67,17 +66,12 @@ class ReshardingOplogApplier { public: class Env { public: - Env(ServiceContext* service, - ReshardingMetrics* metrics, - ReshardingOplogApplierMetrics* applierMetrics) - : _service(service), _metrics(metrics), _applierMetrics(applierMetrics) {} + Env(ServiceContext* service, ReshardingOplogApplierMetrics* applierMetrics) + : _service(service), _applierMetrics(applierMetrics) {} ServiceContext* service() const { return _service; } - ReshardingMetrics* metrics() const { - return _metrics; - } ReshardingOplogApplierMetrics* applierMetrics() { return _applierMetrics; @@ -85,7 +79,6 @@ public: private: ServiceContext* _service; - ReshardingMetrics* _metrics; ReshardingOplogApplierMetrics* _applierMetrics; }; diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp index c6772c93d0b..0e3f5a87504 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp @@ -44,7 +44,6 @@ #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_oplog_applier.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding/resharding_util.h" @@ -159,8 +158,7 @@ public: _cm = createChunkManagerForOriginalColl(); - _metrics = std::make_unique(getServiceContext()); - _metricsNew = + _metrics = ReshardingMetricsNew::makeInstance(kCrudUUID, BSON("y" << 1), kCrudNs, @@ -168,10 +166,7 @@ public: getServiceContext()->getFastClockSource()->now(), getServiceContext()); _applierMetrics = - std::make_unique(_metricsNew.get(), boost::none); - _metrics->onStart(ReshardingMetrics::Role::kRecipient, - getServiceContext()->getFastClockSource()->now()); - _metrics->setRecipientState(RecipientStateEnum::kApplying); + std::make_unique(_metrics.get(), boost::none); _executor = makeTaskExecutorForApplier(); _executor->startup(); @@ -292,15 +287,12 @@ public: } BSONObj getMetricsOpCounters() { - BSONObjBuilder bob; - _metrics->serializeCumulativeOpMetrics(&bob); - return bob.obj().getObjectField("opcounters").getOwned(); + return _metrics->reportForCurrentOp(); } long long metricsAppliedCount() const { - BSONObjBuilder bob; - _metrics->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kRecipient); - return bob.obj()["oplogEntriesApplied"_sd].Long(); + auto fullCurOp = _metrics->reportForCurrentOp(); + return fullCurOp["oplogEntriesApplied"_sd].Long(); } std::shared_ptr getExecutor() { @@ -314,8 +306,8 @@ public: protected: auto makeApplierEnv() { - return std::make_unique( - getServiceContext(), _metrics.get(), _applierMetrics.get()); + return std::make_unique(getServiceContext(), + _applierMetrics.get()); } std::shared_ptr makeTaskExecutorForApplier() { @@ -371,8 +363,7 @@ protected: boost::optional _cm; const ReshardingSourceId _sourceId{UUID::gen(), kMyShardId}; - std::unique_ptr _metrics; - std::unique_ptr _metricsNew; + std::unique_ptr _metrics; std::unique_ptr _applierMetrics; std::shared_ptr _executor; @@ -869,9 +860,9 @@ TEST_F(ReshardingOplogApplierTest, MetricsAreReported) { ASSERT_OK(future.getNoThrow()); auto opCountersObj = getMetricsOpCounters(); - ASSERT_EQ(opCountersObj.getIntField("insert"), 2); - ASSERT_EQ(opCountersObj.getIntField("update"), 1); - ASSERT_EQ(opCountersObj.getIntField("delete"), 2); + ASSERT_EQ(opCountersObj.getIntField("insertsApplied"), 2); + ASSERT_EQ(opCountersObj.getIntField("updatesApplied"), 1); + ASSERT_EQ(opCountersObj.getIntField("deletesApplied"), 2); // The in-memory metrics should show the 5 ops above + the final oplog entry, but on disk should // not include the final entry in its count. diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp index 7e4c2b6c22f..f8af8d80998 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp @@ -46,7 +46,7 @@ #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/s/op_observer_sharding_impl.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" -#include "mongo/db/s/resharding/resharding_metrics.h" +#include "mongo/db/s/resharding/resharding_metrics_new.h" #include "mongo/db/s/resharding/resharding_oplog_application.h" #include "mongo/db/s/resharding/resharding_oplog_batch_applier.h" #include "mongo/db/s/resharding/resharding_oplog_session_application.h" @@ -57,6 +57,7 @@ #include "mongo/db/vector_clock_metadata_hook.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/idl/server_parameter_test_util.h" #include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/chunk_manager.h" @@ -110,7 +111,6 @@ public: opCtx.get(), nss, CollectionOptions{}); } - _metrics = std::make_unique(serviceContext); _metricsNew = ReshardingMetricsNew::makeInstance(UUID::gen(), BSON("y" << 1), @@ -126,7 +126,6 @@ public: 0U, _myDonorId, makeChunkManagerForSourceCollection(), - _metrics.get(), _applierMetrics.get()); _sessionApplication = @@ -363,7 +362,6 @@ private: getLocalConflictStashNamespace(_sourceUUID, _otherDonorId); const NamespaceString _myOplogBufferNss = getLocalOplogBufferNamespace(_sourceUUID, _myDonorId); - std::unique_ptr _metrics; std::unique_ptr _metricsNew; std::unique_ptr _applierMetrics; diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp index 96282217dad..4114100a5bc 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp @@ -47,7 +47,7 @@ #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/op_observer_sharding_impl.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" -#include "mongo/db/s/resharding/resharding_metrics.h" +#include "mongo/db/s/resharding/resharding_metrics_new.h" #include "mongo/db/s/resharding/resharding_oplog_application.h" #include "mongo/db/s/resharding/resharding_util.h" #include "mongo/db/s/sharding_state.h" @@ -112,7 +112,6 @@ public: CollectionMetadata(makeChunkManagerForOutputCollection(), _myDonorId)); } - _metrics = std::make_unique(getServiceContext()); _metricsNew = ReshardingMetricsNew::makeInstance(_sourceUUID, BSON(_newShardKey << 1), @@ -128,7 +127,6 @@ public: 0U, _myDonorId, makeChunkManagerForSourceCollection(), - _metrics.get(), _oplogApplierMetrics.get()); } } @@ -343,7 +341,6 @@ private: getLocalConflictStashNamespace(_sourceUUID, _otherDonorId); std::unique_ptr _applier; - std::unique_ptr _metrics; std::unique_ptr _metricsNew; std::unique_ptr _oplogApplierMetrics; }; diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp index 94071745f24..41f87420e70 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp @@ -45,7 +45,6 @@ #include "mongo/db/pipeline/aggregate_command_gen.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/read_concern_level.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_metrics_new.h" #include "mongo/db/s/resharding/resharding_util.h" #include "mongo/db/s/sharding_data_transform_cumulative_metrics.h" @@ -327,10 +326,8 @@ bool ReshardingOplogFetcher::consume(Client* client, [this, &batchesProcessed, &moreToCome, &opCtxRaii, &batchFetchTimer, factory]( const std::vector& batch, const boost::optional& postBatchResumeToken) { - if (ShardingDataTransformMetrics::isEnabled()) { - _env->metricsNew()->onOplogEntriesFetched(batch.size(), - Milliseconds(batchFetchTimer.millis())); - } + _env->metricsNew()->onOplogEntriesFetched(batch.size(), + Milliseconds(batchFetchTimer.millis())); ThreadClient client(fmt::format("ReshardingFetcher-{}-{}", _reshardingUUID.toString(), @@ -357,15 +354,11 @@ bool ReshardingOplogFetcher::consume(Client* client, uassertStatusOK(toWriteTo->insertDocument(opCtx, InsertStatement{doc}, nullptr)); wuow.commit(); - if (ShardingDataTransformMetrics::isEnabled()) { - _env->metricsNew()->onLocalInsertDuringOplogFetching( - Milliseconds(insertTimer.millis())); - } + _env->metricsNew()->onLocalInsertDuringOplogFetching( + Milliseconds(insertTimer.millis())); ++_numOplogEntriesCopied; - _env->metrics()->onOplogEntriesFetched(1); - auto [p, f] = makePromiseFuture(); { stdx::lock_guard lk(_mutex); @@ -409,10 +402,7 @@ bool ReshardingOplogFetcher::consume(Client* client, // Also include synthetic oplog in the fetched count so it can match up with the // total oplog applied count in the end. - _env->metrics()->onOplogEntriesFetched(1); - if (ShardingDataTransformMetrics::isEnabled()) { - _env->metricsNew()->onOplogEntriesFetched(1, Milliseconds(0)); - } + _env->metricsNew()->onOplogEntriesFetched(1, Milliseconds(0)); auto [p, f] = makePromiseFuture(); { diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h index d0d53395647..5772c6bdfaa 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h @@ -50,28 +50,24 @@ namespace mongo { -class ReshardingMetrics; class ReshardingMetricsNew; class ReshardingOplogFetcher : public resharding::OnInsertAwaitable { public: class Env { public: - Env(ServiceContext* service, ReshardingMetrics* metrics, ReshardingMetricsNew* metricsNew) - : _service(service), _metrics(metrics), _metricsNew(metricsNew) {} + Env(ServiceContext* service, ReshardingMetricsNew* metricsNew) + : _service(service), _metricsNew(metricsNew) {} ServiceContext* service() const { return _service; } - ReshardingMetrics* metrics() const { - return _metrics; - } + ReshardingMetricsNew* metricsNew() const { return _metricsNew; } private: ServiceContext* _service; - ReshardingMetrics* _metrics; ReshardingMetricsNew* _metricsNew; }; diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp index 118778f0b90..17624acced9 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp @@ -45,7 +45,6 @@ #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_metrics_new.h" #include "mongo/db/s/resharding/resharding_oplog_fetcher.h" #include "mongo/db/s/resharding/resharding_util.h" @@ -99,12 +98,7 @@ public: OldClientContext ctx(_opCtx, NamespaceString::kRsOplogNamespace.ns()); } - // Initialize ReshardingMetrics to a recipient state compatible with fetching. - _metrics = std::make_unique(_svcCtx); - _metrics->onStart(ReshardingMetrics::Role::kRecipient, - _svcCtx->getFastClockSource()->now()); - _metrics->setRecipientState(RecipientStateEnum::kCloning); - _metricsNew = + _metrics = ReshardingMetricsNew::makeInstance(_reshardingUUID, BSON("y" << 1), NamespaceString{""}, @@ -138,8 +132,7 @@ public: } auto makeFetcherEnv() { - return std::make_unique( - _svcCtx, _metrics.get(), _metricsNew.get()); + return std::make_unique(_svcCtx, _metrics.get()); } /** @@ -324,9 +317,8 @@ public: } long long metricsFetchedCount() const { - BSONObjBuilder bob; - _metrics->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kRecipient); - return bob.obj()["oplogEntriesFetched"_sd].Long(); + auto curOp = _metrics->reportForCurrentOp(); + return curOp["oplogEntriesFetched"_sd].Long(); } CancelableOperationContextFactory makeCancelableOpCtx() { @@ -351,8 +343,7 @@ protected: Timestamp _fetchTimestamp; ShardId _donorShard; ShardId _destinationShard; - std::unique_ptr _metrics; - std::unique_ptr _metricsNew; + std::unique_ptr _metrics; private: static HostAndPort makeHostAndPort(const ShardId& shardId) { diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index fc225f6f801..baaf64fc5e3 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -51,7 +51,6 @@ #include "mongo/db/s/resharding/resharding_change_event_o2_field_gen.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_future_util.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_metrics_helpers.h" #include "mongo/db/s/resharding/resharding_oplog_applier.h" #include "mongo/db/s/resharding/resharding_recipient_service_external_state.h" @@ -163,41 +162,8 @@ void buildStateDocumentMetricsForUpdate(BSONObjBuilder& bob, } } -ShardingDataTransformCumulativeMetrics::RecipientStateEnum toMetricsState( - RecipientStateEnum enumVal) { - using MetricsEnum = ShardingDataTransformCumulativeMetrics::RecipientStateEnum; - - switch (enumVal) { - case RecipientStateEnum::kUnused: - return MetricsEnum::kUnused; - - case RecipientStateEnum::kAwaitingFetchTimestamp: - return MetricsEnum::kAwaitingFetchTimestamp; - - case RecipientStateEnum::kCreatingCollection: - return MetricsEnum::kCreatingCollection; - - case RecipientStateEnum::kCloning: - return MetricsEnum::kCloning; - - case RecipientStateEnum::kApplying: - return MetricsEnum::kApplying; - - case RecipientStateEnum::kError: - return MetricsEnum::kError; - - case RecipientStateEnum::kStrictConsistency: - return MetricsEnum::kStrictConsistency; - - case RecipientStateEnum::kDone: - return MetricsEnum::kDone; - - default: - invariant(false, - str::stream() << "Unexpected resharding coordinator state: " - << RecipientState_serializer(enumVal)); - MONGO_UNREACHABLE; - } +ReshardingMetricsNew::RecipientState toMetricsState(RecipientStateEnum state) { + return ReshardingMetricsNew::RecipientState(state); } } // namespace @@ -224,10 +190,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( ReshardingDataReplicationFactory dataReplicationFactory) : repl::PrimaryOnlyService::TypedInstance(), _recipientService{recipientService}, - _metricsNew{ - ShardingDataTransformMetrics::isEnabled() - ? ReshardingMetricsNew::initializeFrom(recipientDoc, getGlobalServiceContext()) - : nullptr}, + _metricsNew{ReshardingMetricsNew::initializeFrom(recipientDoc, getGlobalServiceContext())}, _metadata{recipientDoc.getCommonReshardingMetadata()}, _minimumOperationDuration{Milliseconds{recipientDoc.getMinimumOperationDurationMillis()}}, _recipientCtx{recipientDoc.getMutableState()}, @@ -256,9 +219,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( }()) { invariant(_externalState); - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onStateTransition(boost::none, toMetricsState(_recipientCtx.getState())); - } + _metricsNew->onStateTransition(boost::none, toMetricsState(_recipientCtx.getState())); } ExecutorFuture @@ -417,8 +378,6 @@ ExecutorFuture ReshardingRecipientService::RecipientStateMachine::_finishR _metadata.getSourceNss(), _critSecReason, ShardingCatalogClient::kLocalWriteConcern); - - _metrics()->leaveCriticalSection(getCurrentTime()); } }) .then([this, executor, &factory] { @@ -458,15 +417,7 @@ ExecutorFuture ReshardingRecipientService::RecipientStateMachine::_runMand self = shared_from_this(), outerStatus = status, isCanceled = stepdownToken.isCanceled()](Status dataReplicationHaltStatus) { - if (isCanceled) { - // Interrupt occurred, ensure the metrics get shut down. - _metrics()->onStepDown(ReshardingMetrics::Role::kRecipient); - } - - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onStateTransition(toMetricsState(_recipientCtx.getState()), - boost::none); - } + _metricsNew->onStateTransition(toMetricsState(_recipientCtx.getState()), boost::none); // If the stepdownToken was triggered, it takes priority in order to make sure that // the promise is set with an error that the coordinator can retry with. If it ran into @@ -553,16 +504,7 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status) boost::optional ReshardingRecipientService::RecipientStateMachine::reportForCurrentOp( MongoProcessInterface::CurrentOpConnectionsMode, MongoProcessInterface::CurrentOpSessionsMode) noexcept { - if (ShardingDataTransformMetrics::isEnabled()) { - return _metricsNew->reportForCurrentOp(); - } - - ReshardingMetrics::ReporterOptions options(ReshardingMetrics::Role::kRecipient, - _metadata.getReshardingUUID(), - _metadata.getSourceNss(), - _metadata.getReshardingKey().toBSON(), - false); - return _metrics()->reportForCurrentOp(options); + return _metricsNew->reportForCurrentOp(); } void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChanges( @@ -608,12 +550,8 @@ ExecutorFuture ReshardingRecipientService::RecipientStateMachine:: ReshardingRecipientService::RecipientStateMachine::CloneDetails cloneDetails) { _transitionToCreatingCollection( cloneDetails, (*executor)->now() + _minimumOperationDuration, factory); - _metrics()->setDocumentsToCopy(cloneDetails.approxDocumentsToCopy, - cloneDetails.approxBytesToCopy); - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->setDocumentsToCopyCounts(cloneDetails.approxDocumentsToCopy, - cloneDetails.approxBytesToCopy); - } + _metricsNew->setDocumentsToCopyCounts(cloneDetails.approxDocumentsToCopy, + cloneDetails.approxBytesToCopy); }); } @@ -687,7 +625,6 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(Operatio } return _dataReplicationFactory(opCtx, - _metrics(), _metricsNew.get(), &_applierMetricsMap, _metadata, @@ -806,8 +743,6 @@ ExecutorFuture ReshardingRecipientService::RecipientStateMachine:: _metadata.getSourceNss(), _critSecReason, ShardingCatalogClient::kLocalWriteConcern); - - _metrics()->enterCriticalSection(getCurrentTime()); } _transitionToStrictConsistency(factory); @@ -911,11 +846,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState( _updateRecipientDocument( std::move(newRecipientCtx), std::move(cloneDetails), std::move(configStartTime), factory); - _metrics()->setRecipientState(newState); - - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onStateTransition(toMetricsState(oldState), toMetricsState(newState)); - } + _metricsNew->onStateTransition(toMetricsState(oldState), toMetricsState(newState)); LOGV2_INFO(5279506, "Transitioned resharding recipient state", @@ -940,13 +871,10 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToCreatingCol void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning( const CancelableOperationContextFactory& factory) { - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onCopyingBegin(); - } + _metricsNew->onCopyingBegin(); auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kCloning); _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); - _metrics()->startCopyingDocuments(getCurrentTime()); } void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying( @@ -954,13 +882,9 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying( auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kApplying); _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); - auto currentTime = getCurrentTime(); - _metrics()->endCopyingDocuments(currentTime); - _metrics()->startApplyingOplogEntries(currentTime); - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onCopyingEnd(); - _metricsNew->onApplyingBegin(); - } + + _metricsNew->onCopyingEnd(); + _metricsNew->onApplyingBegin(); } void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency( @@ -968,11 +892,8 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsi auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kStrictConsistency); _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); - auto currentTime = getCurrentTime(); - _metrics()->endApplyingOplogEntries(currentTime); - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onApplyingEnd(); - } + + _metricsNew->onApplyingEnd(); } void ReshardingRecipientService::RecipientStateMachine::_transitionToError( @@ -1131,10 +1052,8 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument *configStartTime); } - if (ShardingDataTransformMetrics::isEnabled()) { - buildStateDocumentMetricsForUpdate( - setBuilder, _metricsNew.get(), newRecipientCtx.getState()); - } + buildStateDocumentMetricsForUpdate( + setBuilder, _metricsNew.get(), newRecipientCtx.getState()); setBuilder.doneFast(); } @@ -1175,23 +1094,10 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument WriteUnitOfWork wuow(opCtx.get()); - opCtx->recoveryUnit()->onCommit( - [this, aborted](boost::optional unusedCommitTime) { - stdx::lock_guard lk(_mutex); - if (aborted) { - _metrics()->onCompletion(ReshardingMetrics::Role::kRecipient, - _userCanceled.get() == true - ? ReshardingOperationStatusEnum::kCanceled - : ReshardingOperationStatusEnum::kFailure, - getCurrentTime()); - } else { - _metrics()->onCompletion(ReshardingMetrics::Role::kRecipient, - ReshardingOperationStatusEnum::kSuccess, - getCurrentTime()); - } - - _completionPromise.emplaceValue(); - }); + opCtx->recoveryUnit()->onCommit([this](boost::optional unusedCommitTime) { + stdx::lock_guard lk(_mutex); + _completionPromise.emplaceValue(); + }); deleteObjects(opCtx.get(), *coll, @@ -1204,25 +1110,19 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument }); } -ReshardingMetrics* ReshardingRecipientService::RecipientStateMachine::_metrics() const { - return ReshardingMetrics::get(cc().getServiceContext()); -} - ExecutorFuture ReshardingRecipientService::RecipientStateMachine::_startMetrics( const std::shared_ptr& executor, const CancellationToken& abortToken) { if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) { - _metrics()->onStepUp(ReshardingMetrics::Role::kRecipient); return _restoreMetricsWithRetry(executor, abortToken); } - _metrics()->onStart(ReshardingMetrics::Role::kRecipient, getCurrentTime()); + return ExecutorFuture(**executor); } ExecutorFuture ReshardingRecipientService::RecipientStateMachine::_restoreMetricsWithRetry( const std::shared_ptr& executor, const CancellationToken& abortToken) { - _metrics()->setRecipientState(_recipientCtx.getState()); return _retryingCancelableOpCtxFactory ->withAutomaticRetry( [this, executor, abortToken](const auto& factory) { _restoreMetrics(factory); }) @@ -1250,8 +1150,8 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( documentBytesCopied = tempReshardingColl->dataSize(opCtx.get()); documentCountCopied = tempReshardingColl->numRecords(opCtx.get()); } - if (ShardingDataTransformMetrics::isEnabled() && - _recipientCtx.getState() == RecipientStateEnum::kCloning) { + + if (_recipientCtx.getState() == RecipientStateEnum::kCloning) { // Before cloning, these values are 0. After cloning these values are written to the // metrics section of the recipient state document and restored during metrics // initialization. This is so that applied oplog entries that add or remove documents do @@ -1296,9 +1196,7 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( } } - if (ShardingDataTransformMetrics::isEnabled()) { - progressDocList.emplace_back(donor.getShardId(), progressDoc); - } + progressDocList.emplace_back(donor.getShardId(), progressDoc); } // Restore stats here where interrupts will never occur, this is to ensure we will only update @@ -1321,12 +1219,8 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( _applierMetricsMap.emplace(shardId, std::move(applierMetrics)); } - if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->restoreOplogEntriesFetched(oplogEntriesFetched); - } - - _metrics()->restoreForCurrentOp( - documentCountCopied, documentBytesCopied, oplogEntriesFetched, oplogEntriesApplied); + _metricsNew->restoreOplogEntriesFetched(oplogEntriesFetched); + _metricsNew->restoreOplogEntriesApplied(oplogEntriesApplied); } CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource( diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index 832e1dd8083..fc41ba0e9ee 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -271,8 +271,6 @@ private: const CancellationToken& abortToken, const CancelableOperationContextFactory& factory); - ReshardingMetrics* _metrics() const; - ExecutorFuture _startMetrics( const std::shared_ptr& executor, const CancellationToken& abortToken); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp index 50d635016f7..78316aacca7 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp @@ -43,7 +43,6 @@ #include "mongo/db/s/resharding/resharding_change_event_o2_field_gen.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_data_replication.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h" #include "mongo/db/s/resharding/resharding_recipient_service.h" #include "mongo/db/s/resharding/resharding_recipient_service_external_state.h" @@ -235,11 +234,6 @@ public: return _controller.get(); } - ReshardingMetrics* metrics() { - auto serviceContext = getServiceContext(); - return ReshardingMetrics::get(serviceContext); - } - BSONObj newShardKeyPattern() { return BSON("newKey" << 1); } @@ -262,6 +256,7 @@ public: sourceUUID, constructTemporaryReshardingNss(sourceNss.db(), sourceUUID), newShardKeyPattern()); + commonMetadata.setStartTime(getServiceContext()->getFastClockSource()->now()); doc.setCommonReshardingMetadata(std::move(commonMetadata)); return doc; @@ -511,7 +506,6 @@ DEATH_TEST_REGEX_F(ReshardingRecipientServiceTest, CommitFn, "4457001.*tripwire" } TEST_F(ReshardingRecipientServiceTest, DropsTemporaryReshardingCollectionOnAbort) { - auto metrics = ReshardingRecipientServiceTest::metrics(); for (bool isAlsoDonor : {false, true}) { LOGV2(5551107, "Running case", @@ -571,11 +565,6 @@ TEST_F(ReshardingRecipientServiceTest, DropsTemporaryReshardingCollectionOnAbort ASSERT_FALSE(bool(coll)); } } - - BSONObjBuilder result; - metrics->serializeCumulativeOpMetrics(&result); - - ASSERT_EQ(result.obj().getField("countReshardingFailures").numberLong(), 2); } TEST_F(ReshardingRecipientServiceTest, RenamesTemporaryReshardingCollectionWhenDone) { @@ -707,8 +696,6 @@ TEST_F(ReshardingRecipientServiceTest, WritesNoopOplogEntryForImplicitShardColle } TEST_F(ReshardingRecipientServiceTest, TruncatesXLErrorOnRecipientDocument) { - auto metrics = ReshardingRecipientServiceTest::metrics(); - for (bool isAlsoDonor : {false, true}) { LOGV2(5568600, "Running case", @@ -761,14 +748,9 @@ TEST_F(ReshardingRecipientServiceTest, TruncatesXLErrorOnRecipientDocument) { recipient->abort(false); ASSERT_OK(recipient->getCompletionFuture().getNoThrow()); } - BSONObjBuilder result; - metrics->serializeCumulativeOpMetrics(&result); - - ASSERT_EQ(result.obj().getField("countReshardingFailures").numberLong(), 2); } TEST_F(ReshardingRecipientServiceTest, MetricsSuccessfullyShutDownOnUserCancelation) { - auto metrics = ReshardingRecipientServiceTest::metrics(); auto doc = makeStateDocument(false); auto opCtx = makeOperationContext(); RecipientStateMachine::insertStateDocument(opCtx.get(), doc); @@ -781,11 +763,6 @@ TEST_F(ReshardingRecipientServiceTest, MetricsSuccessfullyShutDownOnUserCancelat recipient->abort(true); ASSERT_OK(recipient->getCompletionFuture().getNoThrow()); - BSONObjBuilder result; - metrics->serializeCumulativeOpMetrics(&result); - BSONObj obj = result.obj(); - ASSERT_EQ(obj.getField("countReshardingCanceled").numberLong(), 1); - ASSERT_EQ(obj.getField("countReshardingFailures").numberLong(), 0); } TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) { @@ -883,8 +860,9 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) { MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle, MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle) .get(); - ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L); - ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize()); + + ASSERT_EQ(currOp.getField("documentsCopied").numberLong(), 1L); + ASSERT_EQ(currOp.getField("bytesCopied").numberLong(), (long)reshardedDoc.objsize()); ASSERT_EQ(currOp.getStringField("recipientState"), RecipientState_serializer(RecipientStateEnum::kApplying)); } else if (state == RecipientStateEnum::kDone) { @@ -893,11 +871,12 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) { MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle, MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle) .get(); - ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L); - ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize()); - ASSERT_EQ(currOp.getField("oplogEntriesFetched").Long(), + + ASSERT_EQ(currOp.getField("documentsCopied").numberLong(), 1L); + ASSERT_EQ(currOp.getField("bytesCopied").numberLong(), (long)reshardedDoc.objsize()); + ASSERT_EQ(currOp.getField("oplogEntriesFetched").numberLong(), (long)(1 * doc.getDonorShards().size())); - ASSERT_EQ(currOp.getField("oplogEntriesApplied").Long(), + ASSERT_EQ(currOp.getField("oplogEntriesApplied").numberLong(), oplogEntriesAppliedOnEachDonor * doc.getDonorShards().size()); ASSERT_EQ(currOp.getStringField("recipientState"), RecipientState_serializer(RecipientStateEnum::kStrictConsistency)); @@ -922,9 +901,6 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) { } TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUpWithMissingProgressDoc) { - RAIIServerParameterControllerForTest featureFlagController( - "featureFlagShardingDataTransformMetrics", true); - auto doc = makeStateDocument(false); auto instanceId = BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName << doc.getReshardingUUID()); diff --git a/src/mongo/db/s/resharding/resharding_util.cpp b/src/mongo/db/s/resharding/resharding_util.cpp index ce51b7d25a1..d9a04064d3c 100644 --- a/src/mongo/db/s/resharding/resharding_util.cpp +++ b/src/mongo/db/s/resharding/resharding_util.cpp @@ -48,7 +48,6 @@ #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/resharding/document_source_resharding_add_resume_id.h" #include "mongo/db/s/resharding/document_source_resharding_iterate_transaction.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_metrics_new.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/storage/write_unit_of_work.h" diff --git a/src/mongo/db/s/resharding_test_commands.cpp b/src/mongo/db/s/resharding_test_commands.cpp index ad34b6716ba..61fa835829f 100644 --- a/src/mongo/db/s/resharding_test_commands.cpp +++ b/src/mongo/db/s/resharding_test_commands.cpp @@ -37,7 +37,6 @@ #include "mongo/db/commands.h" #include "mongo/db/operation_context.h" #include "mongo/db/s/resharding/resharding_collection_cloner.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_metrics_new.h" #include "mongo/db/s/resharding_test_commands_gen.h" #include "mongo/db/vector_clock_metadata_hook.h" @@ -80,21 +79,13 @@ public: } }; - ReshardingMetrics metrics(opCtx->getServiceContext()); - metrics.onStart(ReshardingMetrics::Role::kRecipient, - opCtx->getServiceContext()->getFastClockSource()->now()); - metrics.setRecipientState(RecipientStateEnum::kCloning); - - std::unique_ptr metricsNew; - if (ShardingDataTransformMetrics::isEnabled()) { - metricsNew = ReshardingMetricsNew::makeInstance( - request().getUuid(), - request().getShardKey(), - ns(), - ReshardingMetricsNew::Role::kRecipient, - opCtx->getServiceContext()->getFastClockSource()->now(), - opCtx->getServiceContext()); - } + auto metrics = ReshardingMetricsNew::makeInstance( + request().getUuid(), + request().getShardKey(), + ns(), + ReshardingMetricsNew::Role::kRecipient, + opCtx->getServiceContext()->getFastClockSource()->now(), + opCtx->getServiceContext()); auto hookList = std::make_unique(); hookList->addHook( @@ -106,14 +97,13 @@ public: "TestReshardCloneCollectionNetwork", nullptr, std::move(hookList))); executor->startup(); - ReshardingCollectionCloner cloner( - std::make_unique(&metrics, metricsNew.get()), - ShardKeyPattern(request().getShardKey()), - ns(), - request().getUuid(), - request().getShardId(), - request().getAtClusterTime(), - request().getOutputNs()); + ReshardingCollectionCloner cloner(metrics.get(), + ShardKeyPattern(request().getShardKey()), + ns(), + request().getUuid(), + request().getShardId(), + request().getAtClusterTime(), + request().getOutputNs()); std::shared_ptr cancelableOperationContextPool = [] { ThreadPool::Options options; diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp index d532488fc79..add2ac6f728 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp @@ -349,20 +349,16 @@ void ShardingDataTransformCumulativeMetrics::onStarted() { _countStarted.fetchAndAdd(1); } -void ShardingDataTransformCumulativeMetrics::onCompletion(ReshardingOperationStatusEnum status) { - switch (status) { - case ReshardingOperationStatusEnum::kSuccess: - _countSucceeded.fetchAndAdd(1); - break; - case ReshardingOperationStatusEnum::kFailure: - _countFailed.fetchAndAdd(1); - break; - case ReshardingOperationStatusEnum::kCanceled: - _countCancelled.fetchAndAdd(1); - break; - default: - MONGO_UNREACHABLE; - } +void ShardingDataTransformCumulativeMetrics::onSuccess() { + _countSucceeded.fetchAndAdd(1); +} + +void ShardingDataTransformCumulativeMetrics::onFailure() { + _countFailed.fetchAndAdd(1); +} + +void ShardingDataTransformCumulativeMetrics::onCanceled() { + _countCancelled.fetchAndAdd(1); } void ShardingDataTransformCumulativeMetrics::setLastOpEndingChunkImbalance(int64_t imbalanceCount) { diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h index 596b90024bd..dfd8c989628 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h @@ -34,7 +34,6 @@ #include "mongo/db/service_context.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/mutex.h" -#include "mongo/s/resharding/common_types_gen.h" #include "mongo/util/duration.h" #include "mongo/util/functional.h" #include @@ -97,7 +96,9 @@ public: void reportForServerStatus(BSONObjBuilder* bob) const; void onStarted(); - void onCompletion(ReshardingOperationStatusEnum status); + void onSuccess(); + void onFailure(); + void onCanceled(); void setLastOpEndingChunkImbalance(int64_t imbalanceCount); diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp index 8355fc71603..99a221b10ba 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp @@ -232,7 +232,7 @@ TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsSucceededCount) ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countSucceeded"), 0); } - _cumulativeMetrics.onCompletion(ReshardingOperationStatusEnum::kSuccess); + _cumulativeMetrics.onSuccess(); { BSONObjBuilder bob; @@ -254,7 +254,7 @@ TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsFailedCount) { ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countFailed"), 0); } - _cumulativeMetrics.onCompletion(ReshardingOperationStatusEnum::kFailure); + _cumulativeMetrics.onFailure(); { BSONObjBuilder bob; @@ -276,7 +276,7 @@ TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsCanceledCount) ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countCanceled"), 0); } - _cumulativeMetrics.onCompletion(ReshardingOperationStatusEnum::kCanceled); + _cumulativeMetrics.onCanceled(); { BSONObjBuilder bob; diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp index 630542b03fe..e74155e374b 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp @@ -350,6 +350,10 @@ void ShardingDataTransformInstanceMetrics::onOplogEntriesApplied(int64_t numEntr _cumulativeMetrics->onOplogEntriesApplied(numEntries); } +void ShardingDataTransformInstanceMetrics::restoreOplogEntriesApplied(int64_t numEntries) { + _oplogEntriesApplied.store(numEntries); +} + void ShardingDataTransformInstanceMetrics::onWriteDuringCriticalSection() { _writesDuringCriticalSection.addAndFetch(1); _cumulativeMetrics->onWriteDuringCriticalSection(); @@ -408,4 +412,29 @@ void ShardingDataTransformInstanceMetrics::onOplogLocalBatchApplied(Milliseconds _cumulativeMetrics->onOplogLocalBatchApplied(elapsed); } +ShardingDataTransformCumulativeMetrics* +ShardingDataTransformInstanceMetrics::getCumulativeMetrics() { + return _cumulativeMetrics; +} + +void ShardingDataTransformInstanceMetrics::onStarted() { + _cumulativeMetrics->onStarted(); +} + +void ShardingDataTransformInstanceMetrics::onSuccess() { + _cumulativeMetrics->onSuccess(); +} + +void ShardingDataTransformInstanceMetrics::onFailure() { + _cumulativeMetrics->onFailure(); +} + +void ShardingDataTransformInstanceMetrics::onCanceled() { + _cumulativeMetrics->onCanceled(); +} + +void ShardingDataTransformInstanceMetrics::setLastOpEndingChunkImbalance(int64_t imbalanceCount) { + _cumulativeMetrics->setLastOpEndingChunkImbalance(imbalanceCount); +} + } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.h b/src/mongo/db/s/sharding_data_transform_instance_metrics.h index 18cbcdea812..6c508bbafd8 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics.h +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.h @@ -66,20 +66,10 @@ public: Date_t getStartTimestamp() const; const UUID& getInstanceId() const; - template - void onStateTransition(T before, boost::none_t after) { - _cumulativeMetrics->onStateTransition(before, after); - } - - template - void onStateTransition(boost::none_t before, T after) { - _cumulativeMetrics->onStateTransition(before, after); - } - - template - void onStateTransition(T before, T after) { - _cumulativeMetrics->onStateTransition(before, after); - } + void onStarted(); + void onSuccess(); + void onFailure(); + void onCanceled(); void onCopyingBegin(); void onCopyingEnd(); @@ -106,6 +96,7 @@ public: void onLocalInsertDuringOplogFetching(Milliseconds elapsed); void onBatchRetrievedDuringOplogApplying(Milliseconds elapsed); void onOplogEntriesApplied(int64_t numEntries); + void restoreOplogEntriesApplied(int64_t numEntries); void onCloningTotalRemoteBatchRetrieval(Milliseconds elapsed); void onOplogLocalBatchApplied(Milliseconds elapsed); void onWriteToStashedCollections(); @@ -121,6 +112,8 @@ public: Seconds getApplyingElapsedTimeSecs() const; Seconds getCriticalSectionElapsedTimeSecs() const; + void setLastOpEndingChunkImbalance(int64_t imbalanceCount); + protected: void restoreCopyingBegin(Date_t date); void restoreCopyingEnd(Date_t date); @@ -134,6 +127,8 @@ protected: int64_t deletesApplied, int64_t writesToStashCollections); + ShardingDataTransformCumulativeMetrics* getCumulativeMetrics(); + const UUID _instanceId; const BSONObj _originalCommand; const NamespaceString _sourceNs; diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp index 13eb3ae8874..85797e5e361 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp @@ -371,6 +371,21 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, RecipientReportsRemainingTime) ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"), 0); } +TEST_F(ShardingDataTransformInstanceMetricsTest, RecipientRestoreAppliedOplogEntries) { + auto metrics = createInstanceMetrics(UUID::gen(), Role::kRecipient); + + auto report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("oplogEntriesApplied"), 0); + + metrics->restoreOplogEntriesApplied(120); + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("oplogEntriesApplied"), 120); + + metrics->restoreOplogEntriesApplied(30); + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("oplogEntriesApplied"), 30); +} + TEST_F(ShardingDataTransformInstanceMetricsTest, CurrentOpReportsCopyingTime) { runTimeReportTest( "CurrentOpReportsCopyingTime", diff --git a/src/mongo/db/s/sharding_data_transform_metrics.cpp b/src/mongo/db/s/sharding_data_transform_metrics.cpp index 7725752a8ed..de3aab4f99e 100644 --- a/src/mongo/db/s/sharding_data_transform_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_metrics.cpp @@ -47,9 +47,4 @@ StringData ShardingDataTransformMetrics::getRoleName(Role role) { return it->second; } -bool ShardingDataTransformMetrics::isEnabled() { - return feature_flags::gFeatureFlagShardingDataTransformMetrics.isEnabled( - serverGlobalParams.featureCompatibility); -} - } // namespace mongo diff --git a/src/mongo/db/s/sharding_server_status.cpp b/src/mongo/db/s/sharding_server_status.cpp index bc8673000c8..8d560454382 100644 --- a/src/mongo/db/s/sharding_server_status.cpp +++ b/src/mongo/db/s/sharding_server_status.cpp @@ -33,7 +33,6 @@ #include "mongo/db/commands/server_status.h" #include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/sharding_data_transform_cumulative_metrics.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/sharding_statistics.h" @@ -123,35 +122,22 @@ public: CollectionShardingState::appendInfoForServerStatus(opCtx, &result); } - // The serverStatus command is run before the FCV is initialized so we ignore it when - // checking whether the resharding feature is enabled here. - if (resharding::gFeatureFlagResharding.isEnabledAndIgnoreFCV()) { - if (feature_flags::gFeatureFlagShardingDataTransformMetrics.isEnabledAndIgnoreFCV()) { - // TODO PM-2664: Switch over to using data transform metrics when they have feature - // parity with resharding metrics. - reportDataTransformMetrics(opCtx, &result); - } else { - reportReshardingMetrics(opCtx, &result); - } - } + reportDataTransformMetrics(opCtx, &result); return result.obj(); } - void reportReshardingMetrics(OperationContext* opCtx, BSONObjBuilder* bob) const { - auto metrics = ReshardingMetrics::get(opCtx->getServiceContext()); - if (!metrics->wasReshardingEverAttempted()) { - return; - } - BSONObjBuilder subObjBuilder(bob->subobjStart("resharding")); - metrics->serializeCumulativeOpMetrics(&subObjBuilder); - } - void reportDataTransformMetrics(OperationContext* opCtx, BSONObjBuilder* bob) const { auto sCtx = opCtx->getServiceContext(); using Metrics = ShardingDataTransformCumulativeMetrics; - Metrics::getForResharding(sCtx)->reportForServerStatus(bob); - Metrics::getForGlobalIndexes(sCtx)->reportForServerStatus(bob); + + // The serverStatus command is run before the FCV is initialized so we ignore it when + // checking whether the resharding feature is enabled here. + if (resharding::gFeatureFlagResharding.isEnabledAndIgnoreFCV()) { + Metrics::getForResharding(sCtx)->reportForServerStatus(bob); + // TODO SERVER-67049: move this into a separate feature flag check for global indexes. + Metrics::getForGlobalIndexes(sCtx)->reportForServerStatus(bob); + } } } shardingStatisticsServerStatus; diff --git a/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp b/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp index 0f799f6e3ac..4c3e05a7879 100644 --- a/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp +++ b/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp @@ -35,7 +35,6 @@ #include "mongo/db/commands.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/primary_only_service.h" -#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_recipient_service.h" #include "mongo/db/s/sharding_data_transform_metrics.h" #include "mongo/db/service_context.h" @@ -109,23 +108,17 @@ public: } Response typedRun(OperationContext* opCtx) { - if (ShardingDataTransformMetrics::isEnabled()) { - auto instances = - getReshardingStateMachines( - opCtx, ns()); - if (instances.empty()) { - return Response{boost::none, boost::none}; - } - invariant(instances.size() == 1); - const auto& metrics = instances[0]->getMetrics(); - return Response{duration_cast(metrics.getOperationRunningTimeSecs()), - metrics.getHighEstimateRemainingTimeMillis()}; - } else { - auto metrics = ReshardingMetrics::get(opCtx->getServiceContext()); - return Response{metrics->getOperationElapsedTime(), - metrics->getOperationRemainingTime()}; + auto instances = + getReshardingStateMachines(opCtx, + ns()); + if (instances.empty()) { + return Response{boost::none, boost::none}; } + invariant(instances.size() == 1); + const auto& metrics = instances[0]->getMetrics(); + return Response{duration_cast(metrics.getOperationRunningTimeSecs()), + metrics.getHighEstimateRemainingTimeMillis()}; } }; } _shardsvrReshardingOperationTime; -- cgit v1.2.1