summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2022-05-24 15:40:35 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-06-07 19:17:04 +0000
commite53e5cbb1edd1575ba04698d104b0a87e29100a2 (patch)
tree274ddd07c0b37c5cedbd8bdedfc80ba2c9289da0 /src/mongo/db/s
parent1ad84362c2d2a8f08bfe3617123e02b7cd8972f6 (diff)
downloadmongo-e53e5cbb1edd1575ba04698d104b0a87e29100a2.tar.gz
SERVER-66422 Switch over to new resharding metrics
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/SConscript3
-rw-r--r--src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp7
-rw-r--r--src/mongo/db/s/resharding/resharding_collection_cloner.cpp24
-rw-r--r--src/mongo/db/s/resharding/resharding_collection_cloner.h23
-rw-r--r--src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp145
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp10
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp8
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp168
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp27
-rw-r--r--src/mongo/db/s/resharding/resharding_data_replication.cpp21
-rw-r--r--src/mongo/db/s/resharding/resharding_data_replication.h5
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp18
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.cpp106
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.h7
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service_test.cpp23
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.cpp782
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.h203
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics_helpers.cpp3
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics_new.cpp138
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics_new.h56
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics_test.cpp895
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_application.cpp39
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_application.h3
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier.cpp52
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier.h11
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp31
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp6
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp5
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp20
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher.h10
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp19
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp160
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h2
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service_test.cpp42
-rw-r--r--src/mongo/db/s/resharding/resharding_util.cpp1
-rw-r--r--src/mongo/db/s/resharding_test_commands.cpp38
-rw-r--r--src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp24
-rw-r--r--src/mongo/db/s/sharding_data_transform_cumulative_metrics.h5
-rw-r--r--src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp6
-rw-r--r--src/mongo/db/s/sharding_data_transform_instance_metrics.cpp29
-rw-r--r--src/mongo/db/s/sharding_data_transform_instance_metrics.h23
-rw-r--r--src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp15
-rw-r--r--src/mongo/db/s/sharding_data_transform_metrics.cpp5
-rw-r--r--src/mongo/db/s/sharding_server_status.cpp32
-rw-r--r--src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp27
45 files changed, 465 insertions, 2812 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 713f537e9f6..415bd49e852 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -96,7 +96,6 @@ env.Library(
'resharding/resharding_future_util.cpp',
'resharding/resharding_manual_cleanup.cpp',
'resharding/resharding_metrics_helpers.cpp',
- 'resharding/resharding_metrics.cpp',
'resharding/resharding_metrics_new.cpp',
'resharding/resharding_op_observer.cpp',
'resharding/resharding_oplog_applier.cpp',
@@ -569,7 +568,6 @@ env.CppUnitTest(
'persistent_task_queue_test.cpp',
'range_deletion_util_test.cpp',
'resharding/resharding_agg_test.cpp',
- 'resharding/resharding_collection_cloner_test.cpp',
'resharding/resharding_collection_test.cpp',
'resharding/resharding_data_replication_test.cpp',
'resharding/resharding_destined_recipient_test.cpp',
@@ -577,7 +575,6 @@ env.CppUnitTest(
'resharding/resharding_donor_recipient_common_test.cpp',
'resharding/resharding_donor_service_test.cpp',
'resharding/resharding_metrics_new_test.cpp',
- 'resharding/resharding_metrics_test.cpp',
'resharding/resharding_oplog_applier_test.cpp',
'resharding/resharding_oplog_applier_metrics_test.cpp',
'resharding/resharding_oplog_batch_applier_test.cpp',
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
index 23ea95511ba..7f284e2c642 100644
--- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
+++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
@@ -203,10 +203,9 @@ public:
std::move(existingUUID),
std::move(tempReshardingNss),
request().getKey());
- if (ShardingDataTransformMetrics::isEnabled()) {
- commonMetadata.setStartTime(
- opCtx->getServiceContext()->getFastClockSource()->now());
- }
+ commonMetadata.setStartTime(
+ opCtx->getServiceContext()->getFastClockSource()->now());
+
coordinatorDoc.setCommonReshardingMetadata(std::move(commonMetadata));
coordinatorDoc.setZones(request().getZones());
coordinatorDoc.setPresetReshardedChunks(request().get_presetReshardedChunks());
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
index f447f9dd1b1..a80bfbb88ec 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
@@ -50,7 +50,6 @@
#include "mongo/db/s/resharding/document_source_resharding_ownership_match.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_future_util.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_metrics_new.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding/resharding_util.h"
@@ -81,14 +80,14 @@ bool collectionHasSimpleCollation(OperationContext* opCtx, const NamespaceString
} // namespace
-ReshardingCollectionCloner::ReshardingCollectionCloner(std::unique_ptr<Env> env,
+ReshardingCollectionCloner::ReshardingCollectionCloner(ReshardingMetricsNew* metrics,
ShardKeyPattern newShardKeyPattern,
NamespaceString sourceNss,
const UUID& sourceUUID,
ShardId recipientShard,
Timestamp atClusterTime,
NamespaceString outputNss)
- : _env(std::move(env)),
+ : _metrics(metrics),
_newShardKeyPattern(std::move(newShardKeyPattern)),
_sourceNss(std::move(sourceNss)),
_sourceUUID(std::move(sourceUUID)),
@@ -270,12 +269,9 @@ bool ReshardingCollectionCloner::doOneBatch(OperationContext* opCtx, Pipeline& p
Timer latencyTimer;
auto batch = resharding::data_copy::fillBatchForInsert(
pipeline, resharding::gReshardingCollectionClonerBatchSizeInBytes.load());
- _env->metrics()->onCollClonerFillBatchForInsert(
+
+ _metrics->onCloningTotalRemoteBatchRetrieval(
duration_cast<Milliseconds>(latencyTimer.elapsed()));
- if (ShardingDataTransformMetrics::isEnabled()) {
- _env->metricsNew()->onCloningTotalRemoteBatchRetrieval(
- duration_cast<Milliseconds>(latencyTimer.elapsed()));
- }
if (batch.empty()) {
return false;
@@ -294,15 +290,9 @@ bool ReshardingCollectionCloner::doOneBatch(OperationContext* opCtx, Pipeline& p
int bytesInserted = resharding::data_copy::withOneStaleConfigRetry(
opCtx, [&] { return resharding::data_copy::insertBatch(opCtx, _outputNss, batch); });
- _env->metrics()->onDocumentsCopied(batch.size(), bytesInserted);
- _env->metrics()->gotInserts(batch.size());
- if (ShardingDataTransformMetrics::isEnabled()) {
- _env->metricsNew()->onDocumentsCopied(
- batch.size(), bytesInserted, Milliseconds(batchInsertTimer.millis()));
- // TODO: Remove this comment when ReshardingMetrics are replaced with ReshardingMetricsNew.
- // ReshardingMetricsNew::onInsertsApplied is intentionally not called here. Documents copied
- // are no longer considered applied inserts.
- }
+ _metrics->onDocumentsCopied(
+ batch.size(), bytesInserted, Milliseconds(batchInsertTimer.millis()));
+
return true;
}
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.h b/src/mongo/db/s/resharding/resharding_collection_cloner.h
index 06e38be80fc..e24b03c76b6 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.h
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.h
@@ -52,7 +52,6 @@ class TaskExecutor;
class OperationContext;
class MongoProcessInterface;
-class ReshardingMetrics;
class ReshardingMetricsNew;
class ServiceContext;
@@ -62,25 +61,7 @@ class ServiceContext;
*/
class ReshardingCollectionCloner {
public:
- class Env {
- public:
- explicit Env(ReshardingMetrics* metrics, ReshardingMetricsNew* metricsNew)
- : _metrics(metrics), _metricsNew(metricsNew) {}
-
- ReshardingMetrics* metrics() const {
- return _metrics;
- }
-
- ReshardingMetricsNew* metricsNew() const {
- return _metricsNew;
- }
-
- private:
- ReshardingMetrics* const _metrics;
- ReshardingMetricsNew* const _metricsNew;
- };
-
- ReshardingCollectionCloner(std::unique_ptr<Env> env,
+ ReshardingCollectionCloner(ReshardingMetricsNew* metrics,
ShardKeyPattern newShardKeyPattern,
NamespaceString sourceNss,
const UUID& sourceUUID,
@@ -118,7 +99,7 @@ private:
std::unique_ptr<Pipeline, PipelineDeleter> _restartPipeline(OperationContext* opCtx);
- const std::unique_ptr<Env> _env;
+ ReshardingMetricsNew* _metrics;
const ShardKeyPattern _newShardKeyPattern;
const NamespaceString _sourceNss;
const UUID _sourceUUID;
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
deleted file mode 100644
index 62330924df8..00000000000
--- a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Copyright (C) 2020-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-
-#include "mongo/platform/basic.h"
-
-#include <vector>
-
-#include "mongo/bson/bsonmisc.h"
-#include "mongo/bson/json.h"
-#include "mongo/db/exec/document_value/document_value_test_util.h"
-#include "mongo/db/hasher.h"
-#include "mongo/db/pipeline/document_source_mock.h"
-#include "mongo/db/s/resharding/resharding_collection_cloner.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
-#include "mongo/db/s/resharding/resharding_metrics_new.h"
-#include "mongo/db/s/resharding/resharding_util.h"
-#include "mongo/db/service_context_test_fixture.h"
-#include "mongo/unittest/unittest.h"
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
-
-
-namespace mongo {
-namespace {
-
-using Doc = Document;
-using Arr = std::vector<Value>;
-using V = Value;
-
-/**
- * Mock interface to allow specifying mock results for the 'from' collection of the $lookup stage.
- */
-class MockMongoInterface final : public StubMongoProcessInterface {
-public:
- MockMongoInterface(std::deque<DocumentSource::GetNextResult> mockResults)
- : _mockResults(std::move(mockResults)) {}
-
- std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- Pipeline* ownedPipeline,
- ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
- boost::optional<BSONObj> readConcern = boost::none) final {
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline(
- ownedPipeline, PipelineDeleter(ownedPipeline->getContext()->opCtx));
-
- pipeline->addInitialSource(
- DocumentSourceMock::createForTest(_mockResults, pipeline->getContext()));
- return pipeline;
- }
-
-private:
- std::deque<DocumentSource::GetNextResult> _mockResults;
-};
-
-class ReshardingCollectionClonerTest : public ServiceContextTest {
-protected:
- std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
- ShardKeyPattern newShardKeyPattern,
- ShardId recipientShard,
- std::deque<DocumentSource::GetNextResult> sourceCollectionData,
- std::deque<DocumentSource::GetNextResult> configCacheChunksData) {
- auto tempNss = constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID);
-
- _metricsNew =
- ReshardingMetricsNew::makeInstance(_sourceUUID,
- newShardKeyPattern.toBSON(),
- _sourceNss,
- ReshardingMetricsNew::Role::kRecipient,
- getServiceContext()->getFastClockSource()->now(),
- getServiceContext());
-
- ReshardingCollectionCloner cloner(
- std::make_unique<ReshardingCollectionCloner::Env>(_metrics.get(), _metricsNew.get()),
- std::move(newShardKeyPattern),
- _sourceNss,
- _sourceUUID,
- std::move(recipientShard),
- Timestamp(1, 0), /* dummy value */
- std::move(tempNss));
-
- auto pipeline = cloner.makePipeline(
- _opCtx.get(), std::make_shared<MockMongoInterface>(std::move(configCacheChunksData)));
-
- pipeline->addInitialSource(DocumentSourceMock::createForTest(
- std::move(sourceCollectionData), pipeline->getContext()));
-
- return pipeline;
- }
-
- template <class T>
- auto getHashedElementValue(T value) {
- return BSONElementHasher::hash64(BSON("" << value).firstElement(),
- BSONElementHasher::DEFAULT_HASH_SEED);
- }
-
- void setUp() override {
- ServiceContextTest::setUp();
- _metrics = std::make_unique<ReshardingMetrics>(getServiceContext());
- _metrics->onStart(ReshardingMetrics::Role::kRecipient,
- getServiceContext()->getFastClockSource()->now());
- _metrics->setRecipientState(RecipientStateEnum::kCloning);
- }
-
- void tearDown() override {
- _metrics = nullptr;
- ServiceContextTest::tearDown();
- }
-
-private:
- const NamespaceString _sourceNss = NamespaceString("test"_sd, "collection_being_resharded"_sd);
- const UUID _sourceUUID = UUID::gen();
-
- ServiceContext::UniqueOperationContext _opCtx = makeOperationContext();
- std::unique_ptr<ReshardingMetrics> _metrics;
- std::unique_ptr<ReshardingMetricsNew> _metricsNew;
-};
-
-} // namespace
-} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp
index 314812f7158..ae6b61fb314 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp
@@ -37,7 +37,6 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/client/read_preference.h"
#include "mongo/db/cancelable_operation_context.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/logv2/log.h"
#include "mongo/s/async_requests_sender.h"
@@ -210,13 +209,8 @@ ExecutorFuture<void> CoordinatorCommitMonitor::_makeFuture() const {
return RemainingOperationTimes{Milliseconds(0), Milliseconds::max()};
})
.then([this, anchor = shared_from_this()](RemainingOperationTimes remainingTimes) {
- auto metrics = ReshardingMetrics::get(cc().getServiceContext());
- metrics->setMinRemainingOperationTime(remainingTimes.min);
- metrics->setMaxRemainingOperationTime(remainingTimes.max);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->setCoordinatorHighEstimateRemainingTimeMillis(remainingTimes.max);
- _metricsNew->setCoordinatorLowEstimateRemainingTimeMillis(remainingTimes.min);
- }
+ _metricsNew->setCoordinatorHighEstimateRemainingTimeMillis(remainingTimes.max);
+ _metricsNew->setCoordinatorLowEstimateRemainingTimeMillis(remainingTimes.min);
// Check if all recipient shards are within the commit threshold.
if (remainingTimes.max <= _threshold)
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
index cc9431145ae..1cc717b7aec 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
@@ -40,7 +40,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/config/config_server_test_fixture.h"
#include "mongo/db/s/resharding/resharding_coordinator_commit_monitor.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
+#include "mongo/db/s/resharding/resharding_metrics_new.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/logv2/log.h"
@@ -122,11 +122,6 @@ auto makeExecutor() {
void CoordinatorCommitMonitorTest::setUp() {
ConfigServerTestFixture::setUp();
- auto clockSource = getServiceContext()->getFastClockSource();
- auto metrics = ReshardingMetrics::get(getServiceContext());
- metrics->onStart(ReshardingMetrics::Role::kCoordinator, clockSource->now());
- metrics->setCoordinatorState(CoordinatorStateEnum::kApplying);
-
auto hostNameForShard = [](const ShardId& shard) -> std::string {
return fmt::format("{}:1234", shard.toString());
};
@@ -155,6 +150,7 @@ void CoordinatorCommitMonitorTest::setUp() {
_cancellationSource = std::make_unique<CancellationSource>();
+ auto clockSource = getServiceContext()->getFastClockSource();
_metrics = std::make_shared<ReshardingMetricsNew>(
UUID::gen(),
BSON("y" << 1),
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 68ad17e3b62..cbd6232a5d1 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -48,7 +48,6 @@
#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/s/resharding/resharding_coordinator_commit_monitor.h"
#include "mongo/db/s/resharding/resharding_future_util.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding/resharding_util.h"
#include "mongo/db/s/sharding_data_transform_cumulative_metrics.h"
@@ -244,9 +243,7 @@ void writeToCoordinatorStateNss(OperationContext* opCtx,
*approxDocumentsToCopy);
}
- if (ShardingDataTransformMetrics::isEnabled()) {
- buildStateDocumentMetricsForUpdate(setBuilder, metrics, nextState);
- }
+ buildStateDocumentMetricsForUpdate(setBuilder, metrics, nextState);
if (nextState == CoordinatorStateEnum::kPreparingToDonate) {
appendShardEntriesToSetBuilder(coordinatorDoc, setBuilder);
@@ -317,9 +314,7 @@ BSONObj createReshardingFieldsUpdateForOriginalNss(
TypeCollectionReshardingFields originalEntryReshardingFields(
coordinatorDoc.getReshardingUUID());
originalEntryReshardingFields.setState(coordinatorDoc.getState());
- if (ShardingDataTransformMetrics::isEnabled()) {
- originalEntryReshardingFields.setStartTime(coordinatorDoc.getStartTime());
- }
+ originalEntryReshardingFields.setStartTime(coordinatorDoc.getStartTime());
return BSON("$set" << BSON(CollectionType::kReshardingFieldsFieldName
<< originalEntryReshardingFields.toBSON()
@@ -613,41 +608,8 @@ BSONObj makeFlushRoutingTableCacheUpdatesCmd(const NamespaceString& nss) {
BSON(WriteConcernOptions::kWriteConcernField << kMajorityWriteConcern.toBSON()));
}
-ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum toMetricsState(
- CoordinatorStateEnum enumVal) {
- switch (enumVal) {
- case CoordinatorStateEnum::kUnused:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kUnused;
-
- case CoordinatorStateEnum::kInitializing:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kInitializing;
-
- case CoordinatorStateEnum::kPreparingToDonate:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kPreparingToDonate;
-
- case CoordinatorStateEnum::kCloning:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kCloning;
-
- case CoordinatorStateEnum::kApplying:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kApplying;
-
- case CoordinatorStateEnum::kBlockingWrites:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kBlockingWrites;
-
- case CoordinatorStateEnum::kAborting:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kAborting;
-
- case CoordinatorStateEnum::kCommitting:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kCommitting;
-
- case CoordinatorStateEnum::kDone:
- return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kDone;
- default:
- invariant(false,
- str::stream() << "Unexpected resharding coordinator state: "
- << CoordinatorState_serializer(enumVal));
- MONGO_UNREACHABLE;
- }
+ReshardingMetricsNew::CoordinatorState toMetricsState(CoordinatorStateEnum state) {
+ return ReshardingMetricsNew::CoordinatorState(state);
}
} // namespace
@@ -668,9 +630,7 @@ CollectionType createTempReshardingCollectionType(
TypeCollectionReshardingFields tempEntryReshardingFields(coordinatorDoc.getReshardingUUID());
tempEntryReshardingFields.setState(coordinatorDoc.getState());
- if (ShardingDataTransformMetrics::isEnabled()) {
- tempEntryReshardingFields.setStartTime(coordinatorDoc.getStartTime());
- }
+ tempEntryReshardingFields.setStartTime(coordinatorDoc.getStartTime());
auto recipientFields = constructRecipientFields(coordinatorDoc);
tempEntryReshardingFields.setRecipientFields(std::move(recipientFields));
@@ -888,10 +848,8 @@ void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx,
},
ShardingCatalogClient::kLocalWriteConcern);
- if (metrics && ShardingDataTransformMetrics::isEnabled()) {
- metrics->onStateTransition(toMetricsState(coordinatorDoc.getState()),
- toMetricsState(updatedCoordinatorDoc.getState()));
- }
+ metrics->onStateTransition(toMetricsState(coordinatorDoc.getState()),
+ toMetricsState(updatedCoordinatorDoc.getState()));
}
} // namespace resharding
@@ -1078,10 +1036,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(
: PrimaryOnlyService::TypedInstance<ReshardingCoordinator>(),
_id(coordinatorDoc.getReshardingUUID().toBSON()),
_coordinatorService(coordinatorService),
- _metricsNew{
- ShardingDataTransformMetrics::isEnabled()
- ? ReshardingMetricsNew::initializeFrom(coordinatorDoc, getGlobalServiceContext())
- : nullptr},
+ _metricsNew{ReshardingMetricsNew::initializeFrom(coordinatorDoc, getGlobalServiceContext())},
_metadata(coordinatorDoc.getCommonReshardingMetadata()),
_coordinatorDoc(coordinatorDoc),
_markKilledExecutor(std::make_shared<ThreadPool>([] {
@@ -1100,9 +1055,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(
_reshardingCoordinatorObserver->onReshardingParticipantTransition(coordinatorDoc);
}
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onStateTransition(boost::none, toMetricsState(coordinatorDoc.getState()));
- }
+ _metricsNew->onStateTransition(boost::none, toMetricsState(coordinatorDoc.getState()));
}
void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc(
@@ -1127,10 +1080,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc(
const auto previousState = _coordinatorDoc.getState();
_coordinatorDoc = doc;
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onStateTransition(toMetricsState(previousState),
- toMetricsState(_coordinatorDoc.getState()));
- }
+ _metricsNew->onStateTransition(toMetricsState(previousState),
+ toMetricsState(_coordinatorDoc.getState()));
ShardingLogging::get(opCtx)->logChange(opCtx,
"resharding.coordinator.transition",
@@ -1139,24 +1090,13 @@ void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc(
kMajorityWriteConcern);
}
-void markCompleted(const Status& status) {
- auto metrics = ReshardingMetrics::get(cc().getServiceContext());
- auto metricsOperationStatus = [&] {
- if (status.isOK()) {
- return ReshardingOperationStatusEnum::kSuccess;
- } else if (status == ErrorCodes::ReshardCollectionAborted) {
- return ReshardingOperationStatusEnum::kCanceled;
- } else {
- return ReshardingOperationStatusEnum::kFailure;
- }
- }();
-
- metrics->onCompletion(
- ReshardingMetrics::Role::kCoordinator, metricsOperationStatus, getCurrentTime());
-
- if (ShardingDataTransformMetrics::isEnabled()) {
- ShardingDataTransformCumulativeMetrics::getForResharding(cc().getServiceContext())
- ->onCompletion(metricsOperationStatus);
+void markCompleted(const Status& status, ReshardingMetricsNew* metrics) {
+ if (status.isOK()) {
+ metrics->onSuccess();
+ } else if (status == ErrorCodes::ReshardCollectionAborted) {
+ metrics->onCanceled();
+ } else {
+ metrics->onFailure();
}
}
@@ -1380,9 +1320,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_commitAndFinishReshardOper
})
.then([this, executor] { return _awaitAllParticipantShardsDone(executor); })
.then([this, executor] {
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onCriticalSectionEnd();
- }
+ _metricsNew->onCriticalSectionEnd();
+
// Best-effort attempt to trigger a refresh on the participant shards so
// they see the collection metadata without reshardingFields and no longer
// throw ReshardCollectionInProgress. There is no guarantee this logic ever
@@ -1464,13 +1403,6 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
.onCompletion([outerStatus](Status) { return outerStatus; });
})
.onCompletion([this, self = shared_from_this()](Status status) {
- // On stepdown or shutdown, the _scopedExecutor may have already been shut down.
- // Schedule cleanup work on the parent executor.
- if (_ctHolder->isSteppingOrShuttingDown()) {
- ReshardingMetrics::get(cc().getServiceContext())
- ->onStepDown(ReshardingMetrics::Role::kCoordinator);
- }
-
if (!status.isOK()) {
{
auto lg = stdx::lock_guard(_fulfillmentMutex);
@@ -1485,10 +1417,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
_reshardingCoordinatorObserver->interrupt(status);
}
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onStateTransition(toMetricsState(_coordinatorDoc.getState()),
- boost::none);
- }
+ _metricsNew->onStateTransition(toMetricsState(_coordinatorDoc.getState()), boost::none);
})
.semi();
}
@@ -1502,9 +1431,8 @@ ExecutorFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::_onAbo
return resharding::WithAutomaticRetry([this, executor, status] {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- // Notify `ReshardingMetrics` as the operation is now complete for external
- // observers.
- markCompleted(status);
+ // Notify metrics as the operation is now complete for external observers.
+ markCompleted(status, _metricsNew.get());
// The temporary collection and its corresponding entries were never created. Only
// the coordinator document and reshardingFields require cleanup.
@@ -1578,16 +1506,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::abort() {
boost::optional<BSONObj> ReshardingCoordinatorService::ReshardingCoordinator::reportForCurrentOp(
MongoProcessInterface::CurrentOpConnectionsMode,
MongoProcessInterface::CurrentOpSessionsMode) noexcept {
- if (ShardingDataTransformMetrics::isEnabled()) {
- return _metricsNew->reportForCurrentOp();
- }
-
- ReshardingMetrics::ReporterOptions options(ReshardingMetrics::Role::kCoordinator,
- _coordinatorDoc.getReshardingUUID(),
- _coordinatorDoc.getSourceNss(),
- _coordinatorDoc.getReshardingKey().toBSON(),
- false);
- return ReshardingMetrics::get(cc().getServiceContext())->reportForCurrentOp(options);
+ return _metricsNew->reportForCurrentOp();
}
std::shared_ptr<ReshardingCoordinatorObserver>
@@ -1627,8 +1546,6 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChan
if (_coordinatorDoc.getState() > CoordinatorStateEnum::kUnused) {
if (!_coordinatorDocWrittenPromise.getFuture().isReady()) {
_coordinatorDocWrittenPromise.emplaceValue();
- ReshardingMetrics::get(cc().getServiceContext())
- ->onStepUp(ReshardingMetrics::Role::kCoordinator);
}
if (_coordinatorDoc.getState() == CoordinatorStateEnum::kAborting) {
@@ -1650,15 +1567,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChan
{
// Note: don't put blocking or interruptible code in this block.
_coordinatorDocWrittenPromise.emplaceValue();
-
- // TODO SERVER-53914 to accommodate loading metrics for the coordinator.
- ReshardingMetrics::get(cc().getServiceContext())
- ->onStart(ReshardingMetrics::Role::kCoordinator, getCurrentTime());
-
- if (ShardingDataTransformMetrics::isEnabled()) {
- ShardingDataTransformCumulativeMetrics::getForResharding(cc().getServiceContext())
- ->onStarted();
- }
+ _metricsNew->onStarted();
}
pauseBeforeInsertCoordinatorDoc.pauseWhileSet();
@@ -1750,9 +1659,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat
coordinatorDocChangedOnDisk,
highestMinFetchTimestamp,
computeApproxCopySize(coordinatorDocChangedOnDisk));
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onCopyingBegin();
- }
+ _metricsNew->onCopyingBegin();
})
.then([this] { return _waitForMajority(_ctHolder->getAbortToken()); });
}
@@ -1771,10 +1678,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
.then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kApplying,
coordinatorDocChangedOnDisk);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onCopyingEnd();
- _metricsNew->onApplyingBegin();
- }
+ _metricsNew->onCopyingEnd();
+ _metricsNew->onApplyingBegin();
})
.then([this] { return _waitForMajority(_ctHolder->getAbortToken()); });
}
@@ -1833,10 +1738,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kBlockingWrites,
_coordinatorDoc);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onApplyingEnd();
- _metricsNew->onCriticalSectionBegin();
- }
+ _metricsNew->onApplyingEnd();
+ _metricsNew->onCriticalSectionBegin();
})
.then([this] { return _waitForMajority(_ctHolder->getAbortToken()); })
.thenRunOn(**executor)
@@ -1975,8 +1878,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllParticipantShardsD
reshardingPauseCoordinatorBeforeRemovingStateDoc.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getStepdownToken());
- // Notify `ReshardingMetrics` as the operation is now complete for external observers.
- markCompleted(abortReason ? *abortReason : Status::OK());
+ // Notify metrics as the operation is now complete for external observers.
+ markCompleted(abortReason ? *abortReason : Status::OK(), _metricsNew.get());
resharding::removeCoordinatorDocAndReshardingFields(
opCtx.get(), _metricsNew.get(), coordinatorDoc, abortReason);
@@ -2133,12 +2036,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_updateChunkImbalanceM
auto imbalanceCount =
getMaxChunkImbalanceCount(routingInfo, allShardsWithOpTime.value, zoneInfo);
- ReshardingMetrics::get(opCtx->getServiceContext())
- ->setLastReshardChunkImbalanceCount(imbalanceCount);
- if (ShardingDataTransformMetrics::isEnabled()) {
- ShardingDataTransformCumulativeMetrics::getForResharding(opCtx->getServiceContext())
- ->setLastOpEndingChunkImbalance(imbalanceCount);
- }
+ _metricsNew->setLastOpEndingChunkImbalance(imbalanceCount);
} catch (const DBException& ex) {
LOGV2_WARNING(5543000,
"Encountered error while trying to update resharding chunk imbalance metrics",
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
index 7f57851d8e2..dc16d5fe271 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
@@ -42,7 +42,6 @@
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/config/config_server_test_fixture.h"
#include "mongo/db/s/resharding/resharding_coordinator_service.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_op_observer.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding/resharding_service_test_helpers.h"
@@ -198,6 +197,8 @@ public:
CoordinatorStateEnum state, boost::optional<Timestamp> fetchTimestamp = boost::none) {
CommonReshardingMetadata meta(
_reshardingUUID, _originalNss, UUID::gen(), _tempNss, _newShardKey.toBSON());
+ meta.setStartTime(getServiceContext()->getFastClockSource()->now());
+
ReshardingCoordinatorDocument doc(state,
{DonorShardEntry(ShardId("shard0000"), {})},
{RecipientShardEntry(ShardId("shard0001"), {})});
@@ -798,7 +799,6 @@ TEST_F(ReshardingCoordinatorServiceTest, StepDownStepUpEachTransition) {
stateTransitionsGuard.wait(state);
-
stepDown(opCtx);
ASSERT_EQ(coordinator->getCompletionFuture().getNoThrow(),
@@ -806,15 +806,6 @@ TEST_F(ReshardingCoordinatorServiceTest, StepDownStepUpEachTransition) {
coordinator.reset();
- // Metrics should be cleared after step down.
- {
- auto metrics = ReshardingMetrics::get(opCtx->getServiceContext());
- BSONObjBuilder metricsBuilder;
- metrics->serializeCurrentOpMetrics(&metricsBuilder,
- ReshardingMetrics::Role::kCoordinator);
- ASSERT_BSONOBJ_EQ(BSONObj(), metricsBuilder.done());
- }
-
stepUp(opCtx);
stateTransitionsGuard.unset(state);
@@ -827,12 +818,6 @@ TEST_F(ReshardingCoordinatorServiceTest, StepDownStepUpEachTransition) {
// 'done' state is never written to storage so don't wait for it.
waitUntilCommittedCoordinatorDocReach(opCtx, state);
-
- // Metrics should not be empty after step up.
- auto metrics = ReshardingMetrics::get(opCtx->getServiceContext());
- BSONObjBuilder metricsBuilder;
- metrics->serializeCurrentOpMetrics(&metricsBuilder, ReshardingMetrics::Role::kCoordinator);
- ASSERT_BSONOBJ_NE(BSONObj(), metricsBuilder.done());
}
makeDonorsProceedToDone(opCtx);
@@ -843,14 +828,6 @@ TEST_F(ReshardingCoordinatorServiceTest, StepDownStepUpEachTransition) {
coordinator->getCompletionFuture().get(opCtx);
}
- // Metrics should be cleared after commit.
- {
- auto metrics = ReshardingMetrics::get(opCtx->getServiceContext());
- BSONObjBuilder metricsBuilder;
- metrics->serializeCurrentOpMetrics(&metricsBuilder, ReshardingMetrics::Role::kCoordinator);
- ASSERT_BSONOBJ_EQ(BSONObj(), metricsBuilder.done());
- }
-
{
DBDirectClient client(opCtx);
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp
index 1b2bd3c2fa4..4143c8c0c76 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp
@@ -36,7 +36,6 @@
#include "mongo/db/s/resharding/resharding_collection_cloner.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_future_util.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_oplog_applier.h"
#include "mongo/db/s/resharding/resharding_oplog_fetcher.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
@@ -82,13 +81,12 @@ void ensureFulfilledPromise(SharedPromise<void>& sp, Status error) {
} // namespace
std::unique_ptr<ReshardingCollectionCloner> ReshardingDataReplication::_makeCollectionCloner(
- ReshardingMetrics* metrics,
ReshardingMetricsNew* metricsNew,
const CommonReshardingMetadata& metadata,
const ShardId& myShardId,
Timestamp cloneTimestamp) {
return std::make_unique<ReshardingCollectionCloner>(
- std::make_unique<ReshardingCollectionCloner::Env>(metrics, metricsNew),
+ metricsNew,
ShardKeyPattern{metadata.getReshardingKey()},
metadata.getSourceNss(),
metadata.getSourceUUID(),
@@ -114,7 +112,6 @@ std::vector<std::unique_ptr<ReshardingTxnCloner>> ReshardingDataReplication::_ma
std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication::_makeOplogFetchers(
OperationContext* opCtx,
- ReshardingMetrics* metrics,
ReshardingMetricsNew* metricsNew,
const CommonReshardingMetadata& metadata,
const std::vector<DonorShardFetchTimestamp>& donorShards,
@@ -131,8 +128,7 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication::
invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}));
oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>(
- std::make_unique<ReshardingOplogFetcher::Env>(
- opCtx->getServiceContext(), metrics, metricsNew),
+ std::make_unique<ReshardingOplogFetcher::Env>(opCtx->getServiceContext(), metricsNew),
metadata.getReshardingUUID(),
metadata.getSourceUUID(),
// The recipient fetches oplog entries from the donor starting from the largest _id
@@ -168,7 +164,6 @@ std::shared_ptr<executor::TaskExecutor> ReshardingDataReplication::_makeOplogFet
std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::_makeOplogAppliers(
OperationContext* opCtx,
- ReshardingMetrics* metrics,
ReshardingApplierMetricsMap* applierMetricsMap,
const CommonReshardingMetadata& metadata,
const std::vector<DonorShardFetchTimestamp>& donorShards,
@@ -191,8 +186,8 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::
auto applierMetrics = (*applierMetricsMap)[donorShardId].get();
oplogAppliers.emplace_back(std::make_unique<ReshardingOplogApplier>(
- std::make_unique<ReshardingOplogApplier::Env>(
- opCtx->getServiceContext(), metrics, applierMetrics),
+ std::make_unique<ReshardingOplogApplier::Env>(opCtx->getServiceContext(),
+ applierMetrics),
std::move(sourceId),
oplogBufferNss,
metadata.getTempReshardingNss(),
@@ -211,7 +206,6 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::
std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::make(
OperationContext* opCtx,
- ReshardingMetrics* metrics,
ReshardingMetricsNew* metricsNew,
ReshardingApplierMetricsMap* applierMetricsMap,
CommonReshardingMetadata metadata,
@@ -224,19 +218,16 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m
std::vector<std::unique_ptr<ReshardingTxnCloner>> txnCloners;
if (!cloningDone) {
- collectionCloner =
- _makeCollectionCloner(metrics, metricsNew, metadata, myShardId, cloneTimestamp);
+ collectionCloner = _makeCollectionCloner(metricsNew, metadata, myShardId, cloneTimestamp);
txnCloners = _makeTxnCloners(metadata, donorShards);
}
- auto oplogFetchers =
- _makeOplogFetchers(opCtx, metrics, metricsNew, metadata, donorShards, myShardId);
+ auto oplogFetchers = _makeOplogFetchers(opCtx, metricsNew, metadata, donorShards, myShardId);
auto oplogFetcherExecutor = _makeOplogFetcherExecutor(donorShards.size());
auto stashCollections = ensureStashCollectionsExist(opCtx, sourceChunkMgr, donorShards);
auto oplogAppliers = _makeOplogAppliers(opCtx,
- metrics,
applierMetricsMap,
metadata,
donorShards,
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.h b/src/mongo/db/s/resharding/resharding_data_replication.h
index 47417ae8363..f8348646758 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.h
+++ b/src/mongo/db/s/resharding/resharding_data_replication.h
@@ -49,7 +49,6 @@ namespace mongo {
class OperationContext;
class ReshardingOplogApplier;
class ReshardingCollectionCloner;
-class ReshardingMetrics;
class ReshardingOplogFetcher;
class ReshardingTxnCloner;
class ServiceContext;
@@ -141,7 +140,6 @@ private:
public:
static std::unique_ptr<ReshardingDataReplicationInterface> make(
OperationContext* opCtx,
- ReshardingMetrics* metrics,
ReshardingMetricsNew* metricsNew,
ReshardingApplierMetricsMap* applierMetricsMap,
CommonReshardingMetadata metadata,
@@ -198,7 +196,6 @@ public:
private:
static std::unique_ptr<ReshardingCollectionCloner> _makeCollectionCloner(
- ReshardingMetrics* metrics,
ReshardingMetricsNew* metricsNew,
const CommonReshardingMetadata& metadata,
const ShardId& myShardId,
@@ -210,7 +207,6 @@ private:
static std::vector<std::unique_ptr<ReshardingOplogFetcher>> _makeOplogFetchers(
OperationContext* opCtx,
- ReshardingMetrics* metrics,
ReshardingMetricsNew* metricsNew,
const CommonReshardingMetadata& metadata,
const std::vector<DonorShardFetchTimestamp>& donorShards,
@@ -220,7 +216,6 @@ private:
static std::vector<std::unique_ptr<ReshardingOplogApplier>> _makeOplogAppliers(
OperationContext* opCtx,
- ReshardingMetrics* metrics,
ReshardingApplierMetricsMap* applierMetricsMap,
const CommonReshardingMetadata& metadata,
const std::vector<DonorShardFetchTimestamp>& donorShards,
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
index 60a094f8e5e..43d91e83b97 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
@@ -252,9 +252,7 @@ ReshardingDonorDocument constructDonorDocumentFromReshardingFields(
sourceUUID,
reshardingFields.getDonorFields()->getTempReshardingNss(),
reshardingFields.getDonorFields()->getReshardingKey().toBSON());
- if (ShardingDataTransformMetrics::isEnabled()) {
- commonMetadata.setStartTime(reshardingFields.getStartTime());
- }
+ commonMetadata.setStartTime(reshardingFields.getStartTime());
donorDoc.setCommonReshardingMetadata(std::move(commonMetadata));
return donorDoc;
@@ -285,13 +283,13 @@ ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields(
sourceUUID,
nss,
metadata.getShardKeyPattern().toBSON());
- if (ShardingDataTransformMetrics::isEnabled()) {
- commonMetadata.setStartTime(reshardingFields.getStartTime());
- ReshardingRecipientMetrics metrics;
- metrics.setApproxDocumentsToCopy(recipientFields->getApproxDocumentsToCopy());
- metrics.setApproxBytesToCopy(recipientFields->getApproxBytesToCopy());
- recipientDoc.setMetrics(std::move(metrics));
- }
+ commonMetadata.setStartTime(reshardingFields.getStartTime());
+
+ ReshardingRecipientMetrics metrics;
+ metrics.setApproxDocumentsToCopy(recipientFields->getApproxDocumentsToCopy());
+ metrics.setApproxBytesToCopy(recipientFields->getApproxBytesToCopy());
+ recipientDoc.setMetrics(std::move(metrics));
+
recipientDoc.setCommonReshardingMetadata(std::move(commonMetadata));
return recipientDoc;
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 6db00e4dc7e..7f870033a1f 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -51,7 +51,6 @@
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
#include "mongo/db/s/resharding/resharding_future_util.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding/resharding_util.h"
#include "mongo/db/s/sharding_state.h"
@@ -184,39 +183,8 @@ public:
}
};
-ShardingDataTransformCumulativeMetrics::DonorStateEnum toMetricsState(DonorStateEnum enumVal) {
- using MetricsEnum = ShardingDataTransformCumulativeMetrics::DonorStateEnum;
-
- switch (enumVal) {
- case DonorStateEnum::kUnused:
- return MetricsEnum::kUnused;
-
- case DonorStateEnum::kPreparingToDonate:
- return MetricsEnum::kPreparingToDonate;
-
- case DonorStateEnum::kDonatingInitialData:
- return MetricsEnum::kDonatingInitialData;
-
- case DonorStateEnum::kDonatingOplogEntries:
- return MetricsEnum::kDonatingOplogEntries;
-
- case DonorStateEnum::kPreparingToBlockWrites:
- return MetricsEnum::kPreparingToBlockWrites;
-
- case DonorStateEnum::kError:
- return MetricsEnum::kError;
-
- case DonorStateEnum::kBlockingWrites:
- return MetricsEnum::kBlockingWrites;
-
- case DonorStateEnum::kDone:
- return MetricsEnum::kDone;
- default:
- invariant(false,
- str::stream() << "Unexpected resharding coordinator state: "
- << DonorState_serializer(enumVal));
- MONGO_UNREACHABLE;
- }
+ReshardingMetricsNew::DonorState toMetricsState(DonorStateEnum state) {
+ return ReshardingMetricsNew::DonorState(state);
}
} // namespace
@@ -241,9 +209,7 @@ ReshardingDonorService::DonorStateMachine::DonorStateMachine(
std::unique_ptr<DonorStateMachineExternalState> externalState)
: repl::PrimaryOnlyService::TypedInstance<DonorStateMachine>(),
_donorService(donorService),
- _metricsNew{ShardingDataTransformMetrics::isEnabled()
- ? ReshardingMetricsNew::initializeFrom(donorDoc, getGlobalServiceContext())
- : nullptr},
+ _metricsNew{ReshardingMetricsNew::initializeFrom(donorDoc, getGlobalServiceContext())},
_metadata{donorDoc.getCommonReshardingMetadata()},
_recipientShardIds{donorDoc.getRecipientShards()},
_donorCtx{donorDoc.getMutableState()},
@@ -267,9 +233,7 @@ ReshardingDonorService::DonorStateMachine::DonorStateMachine(
}()) {
invariant(_externalState);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onStateTransition(boost::none, toMetricsState(_donorCtx.getState()));
- }
+ _metricsNew->onStateTransition(boost::none, toMetricsState(_donorCtx.getState()));
}
ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_runUntilBlockingWritesOrErrored(
@@ -421,10 +385,7 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_finishReshardin
_critSecReason,
ShardingCatalogClient::kLocalWriteConcern);
- _metrics()->leaveCriticalSection(getCurrentTime());
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onCriticalSectionEnd();
- }
+ _metricsNew->onCriticalSectionEnd();
}
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
@@ -466,13 +427,7 @@ Status ReshardingDonorService::DonorStateMachine::_runMandatoryCleanup(
ensureFulfilledPromise(lk, _completionPromise, statusForPromise);
}
- if (stepdownToken.isCanceled()) {
- _metrics()->onStepDown(ReshardingMetrics::Role::kDonor);
- }
-
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onStateTransition(toMetricsState(_donorCtx.getState()), boost::none);
- }
+ _metricsNew->onStateTransition(toMetricsState(_donorCtx.getState()), boost::none);
return status;
}
@@ -485,7 +440,6 @@ SemiFuture<void> ReshardingDonorService::DonorStateMachine::run(
_cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
return ExecutorFuture(**executor)
- .then([this] { _startMetrics(); })
.then([this, executor, abortToken] {
return _runUntilBlockingWritesOrErrored(executor, abortToken);
})
@@ -539,16 +493,7 @@ void ReshardingDonorService::DonorStateMachine::interrupt(Status status) {}
boost::optional<BSONObj> ReshardingDonorService::DonorStateMachine::reportForCurrentOp(
MongoProcessInterface::CurrentOpConnectionsMode connMode,
MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept {
- if (ShardingDataTransformMetrics::isEnabled()) {
- return _metricsNew->reportForCurrentOp();
- }
-
- ReshardingMetrics::ReporterOptions options(ReshardingMetrics::Role::kDonor,
- _metadata.getReshardingUUID(),
- _metadata.getSourceNss(),
- _metadata.getReshardingKey().toBSON(),
- false);
- return _metrics()->reportForCurrentOp(options);
+ return _metricsNew->reportForCurrentOp();
}
void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
@@ -576,16 +521,10 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
}
void ReshardingDonorService::DonorStateMachine::onWriteDuringCriticalSection() {
- if (!ShardingDataTransformMetrics::isEnabled()) {
- return;
- }
_metricsNew->onWriteDuringCriticalSection();
}
void ReshardingDonorService::DonorStateMachine::onReadDuringCriticalSection() {
- if (!ShardingDataTransformMetrics::isEnabled()) {
- return;
- }
_metricsNew->onReadDuringCriticalSection();
}
@@ -751,10 +690,7 @@ void ReshardingDonorService::DonorStateMachine::
_critSecReason,
ShardingCatalogClient::kLocalWriteConcern);
- _metrics()->enterCriticalSection(getCurrentTime());
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onCriticalSectionBegin();
- }
+ _metricsNew->onCriticalSectionBegin();
}
{
@@ -891,11 +827,8 @@ void ReshardingDonorService::DonorStateMachine::_transitionState(DonorShardConte
auto newState = newDonorCtx.getState();
_updateDonorDocument(std::move(newDonorCtx));
- _metrics()->setDonorState(newState);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onStateTransition(toMetricsState(oldState), toMetricsState(newState));
- }
+ _metricsNew->onStateTransition(toMetricsState(oldState), toMetricsState(newState));
LOGV2_INFO(5279505,
"Transitioned resharding donor state",
@@ -1076,14 +1009,6 @@ void ReshardingDonorService::DonorStateMachine::_removeDonorDocument(
opCtx->recoveryUnit()->onCommit([this, stepdownToken, aborted](boost::optional<Timestamp>) {
stdx::lock_guard<Latch> lk(_mutex);
-
- if (!stepdownToken.isCanceled()) {
- _metrics()->onCompletion(ReshardingMetrics::Role::kDonor,
- aborted ? ReshardingOperationStatusEnum::kFailure
- : ReshardingOperationStatusEnum::kSuccess,
- getCurrentTime());
- }
-
_completionPromise.emplaceValue();
});
@@ -1098,19 +1023,6 @@ void ReshardingDonorService::DonorStateMachine::_removeDonorDocument(
});
}
-ReshardingMetrics* ReshardingDonorService::DonorStateMachine::_metrics() const {
- return ReshardingMetrics::get(cc().getServiceContext());
-}
-
-void ReshardingDonorService::DonorStateMachine::_startMetrics() {
- auto donorState = _donorCtx.getState();
- if (donorState > DonorStateEnum::kPreparingToDonate) {
- _metrics()->onStepUp(donorState, _donorMetricsToRestore);
- } else {
- _metrics()->onStart(ReshardingMetrics::Role::kDonor, getCurrentTime());
- }
-}
-
CancellationToken ReshardingDonorService::DonorStateMachine::_initAbortSource(
const CancellationToken& stepdownToken) {
{
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
index 881e74475ec..f2f4d99d2e8 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.h
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -32,7 +32,6 @@
#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/s/resharding/donor_document_gen.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_metrics_new.h"
#include "mongo/s/resharding/type_collection_fields_gen.h"
@@ -209,12 +208,6 @@ private:
// Removes the local donor document from disk.
void _removeDonorDocument(const CancellationToken& stepdownToken, bool aborted);
- // Accesses the ReshardingMetrics module for this donor's underlying mongod process.
- ReshardingMetrics* _metrics() const;
-
- // Starts the metrics subsystem for this donor's underlying mongod process.
- void _startMetrics();
-
// Initializes the _abortSource and generates a token from it to return back the caller. If an
// abort was reported prior to the initialization, automatically cancels the _abortSource before
// returning the token.
diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
index 658c9500881..0f40919d14d 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
@@ -154,6 +154,7 @@ public:
sourceUUID,
constructTemporaryReshardingNss(sourceNss.db(), sourceUUID),
BSON("newKey" << 1));
+ commonMetadata.setStartTime(getServiceContext()->getFastClockSource()->now());
doc.setCommonReshardingMetadata(std::move(commonMetadata));
return doc;
@@ -736,22 +737,11 @@ TEST_F(ReshardingDonorServiceTest, RestoreMetricsOnKBlockingWrites) {
};
doc.setMutableState(makeDonorCtx());
- auto makeMetricsTimeInterval = [&](const Date_t startTime) {
- ReshardingMetricsTimeInterval timeInterval;
- timeInterval.setStart(startTime);
- return timeInterval;
- };
-
- auto timeNow = Date_t::now();
- auto opTimeDurationSecs = 60;
- auto critSecDurationSecs = 10;
-
- ReshardingDonorMetrics reshardingDonorMetrics;
- reshardingDonorMetrics.setOperationRuntime(
- makeMetricsTimeInterval(timeNow - Seconds(opTimeDurationSecs)));
- reshardingDonorMetrics.setCriticalSection(
- makeMetricsTimeInterval(timeNow - Seconds(critSecDurationSecs)));
- doc.setMetrics(reshardingDonorMetrics);
+ auto timeNow = getServiceContext()->getFastClockSource()->now();
+ const auto opTimeDurationSecs = 60;
+ auto commonMetadata = doc.getCommonReshardingMetadata();
+ commonMetadata.setStartTime(timeNow - Seconds(opTimeDurationSecs));
+ doc.setCommonReshardingMetadata(std::move(commonMetadata));
createSourceCollection(opCtx.get(), doc);
DonorStateMachine::insertStateDocument(opCtx.get(), doc);
@@ -779,7 +769,6 @@ TEST_F(ReshardingDonorServiceTest, RestoreMetricsOnKBlockingWrites) {
ASSERT_EQ(currOp.getStringField("donorState"),
DonorState_serializer(DonorStateEnum::kBlockingWrites));
ASSERT_GTE(currOp.getField("totalOperationTimeElapsedSecs").Long(), opTimeDurationSecs);
- ASSERT_GTE(currOp.getField("totalCriticalSectionTimeElapsedSecs").Long(), critSecDurationSecs);
stateTransitionsGuard.unset(kDoneState);
ASSERT_OK(donor->getCompletionFuture().getNoThrow());
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
deleted file mode 100644
index 627f36a89e4..00000000000
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ /dev/null
@@ -1,782 +0,0 @@
-/**
- * Copyright (C) 2020-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-
-#include <algorithm>
-#include <memory>
-
-#include "mongo/db/s/resharding/resharding_metrics.h"
-#include "mongo/db/s/resharding/resharding_util.h"
-#include "mongo/logv2/log.h"
-#include "mongo/platform/compiler.h"
-#include "mongo/util/aligned.h"
-#include "mongo/util/duration.h"
-#include "mongo/util/histogram.h"
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding
-
-
-namespace mongo {
-
-namespace {
-constexpr auto kAnotherOperationInProgress = "Another operation is in progress";
-constexpr auto kNoOperationInProgress = "No operation is in progress";
-constexpr auto kMetricsSetBeforeRestore = "Expected metrics to be 0 prior to restore";
-
-constexpr auto kTotalOps = "countReshardingOperations";
-constexpr auto kSuccessfulOps = "countReshardingSuccessful";
-constexpr auto kFailedOps = "countReshardingFailures";
-constexpr auto kCanceledOps = "countReshardingCanceled";
-constexpr auto kOpTimeElapsed = "totalOperationTimeElapsedSecs";
-constexpr auto kOpTimeRemaining = "remainingOperationTimeEstimatedSecs";
-constexpr auto kDocumentsToCopy = "approxDocumentsToCopy";
-constexpr auto kDocumentsCopied = "documentsCopied";
-constexpr auto kBytesToCopy = "approxBytesToCopy";
-constexpr auto kBytesCopied = "bytesCopied";
-constexpr auto kCopyTimeElapsed = "totalCopyTimeElapsedSecs";
-constexpr auto kOplogsFetched = "oplogEntriesFetched";
-constexpr auto kOplogsApplied = "oplogEntriesApplied";
-constexpr auto kApplyTimeElapsed = "totalApplyTimeElapsedSecs";
-constexpr auto kWritesDuringCritialSection = "countWritesDuringCriticalSection";
-constexpr auto kCriticalSectionTimeElapsed = "totalCriticalSectionTimeElapsedSecs";
-constexpr auto kCoordinatorState = "coordinatorState";
-constexpr auto kDonorState = "donorState";
-constexpr auto kRecipientState = "recipientState";
-constexpr auto kOpStatus = "opStatus";
-constexpr auto kLastOpEndingChunkImbalance = "lastOpEndingChunkImbalance";
-constexpr auto kOpCounters = "opcounters";
-constexpr auto kMinRemainingOperationTime = "minShardRemainingOperationTimeEstimatedMillis";
-constexpr auto kMaxRemainingOperationTime = "maxShardRemainingOperationTimeEstimatedMillis";
-constexpr auto kOplogApplierApplyBatchLatencyMillis = "oplogApplierApplyBatchLatencyMillis";
-constexpr auto kCollClonerFillBatchForInsertLatencyMillis =
- "collClonerFillBatchForInsertLatencyMillis";
-
-using MetricsPtr = std::unique_ptr<ReshardingMetrics>;
-
-const auto getMetrics = ServiceContext::declareDecoration<MetricsPtr>();
-
-const auto reshardingMetricsRegisterer = ServiceContext::ConstructorActionRegisterer{
- "ReshardingMetrics",
- [](ServiceContext* ctx) { getMetrics(ctx) = std::make_unique<ReshardingMetrics>(ctx); }};
-
-static StringData serializeState(boost::optional<RecipientStateEnum> e) {
- return RecipientState_serializer(*e);
-}
-
-static StringData serializeState(boost::optional<DonorStateEnum> e) {
- return DonorState_serializer(*e);
-}
-
-static StringData serializeState(boost::optional<CoordinatorStateEnum> e) {
- return CoordinatorState_serializer(*e);
-}
-
-// Enables resharding to distinguish the ops it does to keep up with client writes from the
-// client workload tracked in the globalOpCounters.
-class ReshardingOpCounters {
-public:
- void gotInserts(int n) noexcept {
- _checkWrap(&ReshardingOpCounters::_insert, n);
- }
- void gotInsert() noexcept {
- _checkWrap(&ReshardingOpCounters::_insert, 1);
- }
- void gotUpdate() noexcept {
- _checkWrap(&ReshardingOpCounters::_update, 1);
- }
- void gotDelete() noexcept {
- _checkWrap(&ReshardingOpCounters::_delete, 1);
- }
-
- BSONObj getObj() const noexcept {
- BSONObjBuilder b;
- b.append("insert", _insert->loadRelaxed());
- b.append("update", _update->loadRelaxed());
- b.append("delete", _delete->loadRelaxed());
- return b.obj();
- }
-
-private:
- // Increment member `counter` by n, resetting all counters if it was > 2^60.
- void _checkWrap(CacheExclusive<AtomicWord<long long>> ReshardingOpCounters::*counter, int n) {
- static constexpr auto maxCount = 1LL << 60;
- auto oldValue = (this->*counter)->fetchAndAddRelaxed(n);
- if (oldValue > maxCount - n) {
- LOGV2(5776000,
- "ReshardingOpCounters exceeded maximum value, resetting all to 0",
- "insert"_attr = _insert->loadRelaxed(),
- "update"_attr = _update->loadRelaxed(),
- "delete"_attr = _delete->loadRelaxed());
- _insert->store(0);
- _update->store(0);
- _delete->store(0);
- }
- }
-
- CacheExclusive<AtomicWord<long long>> _insert;
- CacheExclusive<AtomicWord<long long>> _update;
- CacheExclusive<AtomicWord<long long>> _delete;
-};
-
-// Allows tracking elapsed time for the resharding operation and its sub operations (e.g.,
-// applying oplog entries).
-class TimeInterval {
-public:
- void start(Date_t d) noexcept {
- if (_start) {
- LOGV2_WARNING(5892600, "Resharding metrics already started, start() is a no-op");
- return;
- }
- _start = d;
- }
-
- void end(Date_t d) noexcept {
- invariant(_start, "Not started");
- if (_end) {
- LOGV2_WARNING(5892601, "Resharding metrics already ended, end() is a no-op");
- return;
- }
- _end = d;
- }
-
- // TODO Remove this function once all metrics classes can start from stepup.
- void forceEnd(Date_t d) noexcept {
- if (!_start)
- _start = d;
- end(d);
- }
-
- Milliseconds duration(Date_t now) const noexcept {
- return !_start ? Milliseconds(0) : ((!_end ? now : *_end) - *_start);
- }
-
-private:
- boost::optional<Date_t> _start;
- boost::optional<Date_t> _end;
-};
-
-} // namespace
-
-class ReshardingMetrics::OperationMetrics {
-public:
- void appendCurrentOpMetrics(BSONObjBuilder*, Role, Date_t now) const;
-
- boost::optional<Milliseconds> remainingOperationTime(Date_t now) const;
-
- void gotInserts(int n) noexcept;
- void gotInsert() noexcept;
- void gotUpdate() noexcept;
- void gotDelete() noexcept;
-
- TimeInterval runningOperation;
- ReshardingOperationStatusEnum opStatus = ReshardingOperationStatusEnum::kInactive;
-
- TimeInterval copyingDocuments;
- int64_t documentsToCopy = 0;
- int64_t documentsCopied = 0;
- int64_t bytesToCopy = 0;
- int64_t bytesCopied = 0;
-
- TimeInterval applyingOplogEntries;
- int64_t oplogEntriesFetched = 0;
- int64_t oplogEntriesApplied = 0;
-
- TimeInterval inCriticalSection;
- int64_t writesDuringCriticalSection = 0;
-
- int64_t chunkImbalanceCount = 0;
-
- Histogram<int64_t> oplogApplierApplyBatchLatencyMillis = getLatencyHistogram();
- Histogram<int64_t> collClonerFillBatchForInsertLatencyMillis = getLatencyHistogram();
-
- // The ops done by resharding to keep up with the client writes.
- ReshardingOpCounters opCounters;
-
- Milliseconds minRemainingOperationTime = Milliseconds(0);
- Milliseconds maxRemainingOperationTime = Milliseconds(0);
-
- boost::optional<DonorStateEnum> donorState;
- boost::optional<RecipientStateEnum> recipientState;
- boost::optional<CoordinatorStateEnum> coordinatorState;
-};
-
-void ReshardingMetrics::OperationMetrics::gotInserts(int n) noexcept {
- opCounters.gotInserts(n);
-}
-
-void ReshardingMetrics::OperationMetrics::gotInsert() noexcept {
- opCounters.gotInsert();
-}
-
-void ReshardingMetrics::OperationMetrics::gotUpdate() noexcept {
- opCounters.gotUpdate();
-}
-
-void ReshardingMetrics::OperationMetrics::gotDelete() noexcept {
- opCounters.gotDelete();
-}
-
-boost::optional<Milliseconds> ReshardingMetrics::OperationMetrics::remainingOperationTime(
- Date_t now) const {
- return estimateRemainingRecipientTime(recipientState > RecipientStateEnum::kCloning,
- bytesCopied,
- bytesToCopy,
- copyingDocuments.duration(now),
- oplogEntriesApplied,
- oplogEntriesFetched,
- applyingOplogEntries.duration(now));
-}
-
-void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder* bob,
- Role role,
- Date_t now) const {
- auto getElapsedTime = [&](const TimeInterval& interval) -> int64_t {
- return durationCount<Seconds>(interval.duration(now));
- };
-
- const auto remainingMsec = remainingOperationTime(now);
-
- bob->append(kOpTimeElapsed, getElapsedTime(runningOperation));
-
- switch (role) {
- case Role::kDonor:
- bob->append(kWritesDuringCritialSection, writesDuringCriticalSection);
- bob->append(kCriticalSectionTimeElapsed, getElapsedTime(inCriticalSection));
- bob->append(kDonorState,
- serializeState(donorState.get_value_or(DonorStateEnum::kUnused)));
- bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus));
- break;
- case Role::kRecipient:
- bob->append(kOpTimeRemaining,
- !remainingMsec ? int64_t{-1} /** -1 is a specified integer null value */
- : durationCount<Seconds>(*remainingMsec));
- bob->append(kDocumentsToCopy, documentsToCopy);
- bob->append(kDocumentsCopied, documentsCopied);
- bob->append(kBytesToCopy, bytesToCopy);
- bob->append(kBytesCopied, bytesCopied);
- bob->append(kCopyTimeElapsed, getElapsedTime(copyingDocuments));
-
- bob->append(kOplogsFetched, oplogEntriesFetched);
- bob->append(kOplogsApplied, oplogEntriesApplied);
- bob->append(kApplyTimeElapsed, getElapsedTime(applyingOplogEntries));
- bob->append(kRecipientState,
- serializeState(recipientState.get_value_or(RecipientStateEnum::kUnused)));
- bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus));
-
- appendHistogram(
- *bob, oplogApplierApplyBatchLatencyMillis, kOplogApplierApplyBatchLatencyMillis);
- appendHistogram(*bob,
- collClonerFillBatchForInsertLatencyMillis,
- kCollClonerFillBatchForInsertLatencyMillis);
- break;
- case Role::kCoordinator:
- bob->append(kCoordinatorState, serializeState(coordinatorState));
- bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus));
- break;
- default:
- MONGO_UNREACHABLE;
- }
-}
-
-ReshardingMetrics* ReshardingMetrics::get(ServiceContext* ctx) noexcept {
- return getMetrics(ctx).get();
-}
-
-void ReshardingMetrics::onStart(Role role, Date_t runningOperationStartTime) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
- // TODO Re-add this invariant once all breaking test cases have been fixed.
- // invariant(!_currentOp.has_value(), kAnotherOperationInProgress);
-
- if (!_currentOp) {
- // Only incremement _started if this is the first time resharding metrics is being invoked
- // for this resharding operation, and we're not restoring the PrimaryOnlyService from disk.
- _started++;
- }
-
- // Create a new operation and record the time it started.
- _emplaceCurrentOpForRole(role, runningOperationStartTime);
-}
-
-void ReshardingMetrics::onCompletion(Role role,
- ReshardingOperationStatusEnum status,
- Date_t runningOperationEndTime) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
- // TODO Re-add this invariant once all breaking test cases have been fixed. Add invariant that
- // role being completed is a role that is in progress.
- // invariant(_currentOp.has_value(), kNoOperationInProgress);
-
- // Reset the cumulative min and max remaining operation time to 0. Only coordinators
- // will need to report this information, so only reset it for coordinators.
- if (role == ReshardingMetrics::Role::kCoordinator) {
- _cumulativeOp->minRemainingOperationTime = Milliseconds(0);
- _cumulativeOp->maxRemainingOperationTime = Milliseconds(0);
- }
-
- if (!_currentOp) {
- return;
- }
-
- if (_currentOp->donorState && _currentOp->recipientState) {
- switch (role) {
- case Role::kDonor:
- _currentOp->donorState = boost::none;
- break;
- case Role::kRecipient:
- _currentOp->recipientState = boost::none;
- break;
- default:
- MONGO_UNREACHABLE;
- }
-
- return;
- }
-
- switch (status) {
- case ReshardingOperationStatusEnum::kSuccess:
- _succeeded++;
- break;
- case ReshardingOperationStatusEnum::kFailure:
- _failed++;
- break;
- case ReshardingOperationStatusEnum::kCanceled:
- _canceled++;
- break;
- default:
- MONGO_UNREACHABLE;
- }
-
- _currentOp->runningOperation.end(runningOperationEndTime);
-
- // Reset current op metrics.
- _currentOp = nullptr;
-}
-
-void ReshardingMetrics::onStepUp(Role role) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
- _emplaceCurrentOpForRole(role, boost::none);
- _onStepUpCalled = true;
- // TODO SERVER-53914 Implement coordinator metrics rehydration.
-
- // TODO SERVER-57094 Resume the runningOperation duration from a timestamp stored on disk
- // instead of starting from the current time.
-}
-
-void ReshardingMetrics::onStepUp(DonorStateEnum state, ReshardingDonorMetrics donorMetrics) {
- stdx::lock_guard<Latch> lk(_mutex);
- auto operationRuntime = donorMetrics.getOperationRuntime();
- _emplaceCurrentOpForRole(
- Role::kDonor, operationRuntime.has_value() ? operationRuntime->getStart() : boost::none);
- _currentOp->donorState = state;
- _onStepUpCalled = true;
-
- if (auto criticalSectionTimeInterval = donorMetrics.getCriticalSection();
- criticalSectionTimeInterval.has_value() &&
- criticalSectionTimeInterval->getStart().has_value()) {
- _currentOp->inCriticalSection.start(criticalSectionTimeInterval->getStart().get());
-
- if (auto stopTime = criticalSectionTimeInterval->getStop(); stopTime.has_value())
- _currentOp->inCriticalSection.forceEnd(stopTime.get());
- }
-}
-
-void ReshardingMetrics::onStepDown(Role role) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
- if (_currentOp && _currentOp->donorState && _currentOp->recipientState) {
- switch (role) {
- case Role::kDonor:
- _currentOp->donorState = boost::none;
- break;
- case Role::kRecipient:
- _currentOp->recipientState = boost::none;
- break;
- default:
- MONGO_UNREACHABLE;
- }
- } else {
- _currentOp = nullptr;
- }
-}
-
-void ReshardingMetrics::_emplaceCurrentOpForRole(
- Role role, boost::optional<Date_t> runningOperationStartTime) noexcept {
- // Invariants in this function ensure that the only multi-role state allowed is a combination
- // of donor and recipient.
- if (!_currentOp) {
- _currentOp = std::make_unique<OperationMetrics>();
- _currentOp->runningOperation.start(runningOperationStartTime
- ? *runningOperationStartTime
- : _svcCtx->getFastClockSource()->now());
- _currentOp->opStatus = ReshardingOperationStatusEnum::kRunning;
- } else {
- invariant(role != Role::kCoordinator, kAnotherOperationInProgress);
- invariant(!_currentOp->coordinatorState, kAnotherOperationInProgress);
- }
-
- switch (role) {
- case Role::kCoordinator:
- _currentOp->coordinatorState.emplace(CoordinatorStateEnum::kUnused);
- break;
- case Role::kDonor:
- invariant(!_currentOp->donorState, kAnotherOperationInProgress);
- _currentOp->donorState.emplace(DonorStateEnum::kUnused);
- break;
- case Role::kRecipient:
- invariant(!_currentOp->recipientState, kAnotherOperationInProgress);
- _currentOp->recipientState.emplace(RecipientStateEnum::kUnused);
- break;
- default:
- MONGO_UNREACHABLE
- }
-}
-
-void ReshardingMetrics::setDonorState(DonorStateEnum state) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp, kNoOperationInProgress);
-
- const auto oldState = std::exchange(_currentOp->donorState, state);
- invariant(oldState != state);
-}
-
-void ReshardingMetrics::setRecipientState(RecipientStateEnum state) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp, kNoOperationInProgress);
-
- const auto oldState = std::exchange(_currentOp->recipientState, state);
- invariant(oldState != state);
-}
-
-void ReshardingMetrics::setCoordinatorState(CoordinatorStateEnum state) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp, kNoOperationInProgress);
- _currentOp->coordinatorState = state;
-}
-
-template <typename T>
-static bool checkState(T state, std::initializer_list<T> validStates) {
- invariant(validStates.size());
- if (std::find(validStates.begin(), validStates.end(), state) != validStates.end())
- return true;
-
- std::stringstream ss;
- StringData sep = "";
- for (auto s : validStates) {
- ss << sep << serializeState(s);
- sep = ", "_sd;
- }
-
- LOGV2_FATAL_CONTINUE(5553300,
- "Invalid resharding state",
- "state"_attr = serializeState(state),
- "valid"_attr = ss.str());
- return false;
-}
-
-void ReshardingMetrics::setDocumentsToCopy(int64_t documents, int64_t bytes) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp, kNoOperationInProgress);
- invariant(_currentOp->recipientState == RecipientStateEnum::kCreatingCollection);
-
- setDocumentsToCopyForCurrentOp(documents, bytes);
-}
-
-void ReshardingMetrics::setDocumentsToCopyForCurrentOp(int64_t documents, int64_t bytes) noexcept {
- invariant(_currentOp, kNoOperationInProgress);
-
- _currentOp->documentsToCopy = documents;
- _currentOp->bytesToCopy = bytes;
-}
-
-void ReshardingMetrics::setLastReshardChunkImbalanceCount(int64_t newCount) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
-
- invariant(_currentOp, kNoOperationInProgress);
- invariant(_currentOp->coordinatorState);
-
- _cumulativeOp->chunkImbalanceCount = newCount;
-}
-
-void ReshardingMetrics::setMinRemainingOperationTime(Milliseconds minTime) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
- if (_currentOp) {
- _cumulativeOp->minRemainingOperationTime = minTime;
- }
-}
-
-void ReshardingMetrics::setMaxRemainingOperationTime(Milliseconds maxTime) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
- if (_currentOp) {
- _cumulativeOp->maxRemainingOperationTime = maxTime;
- }
-}
-
-void ReshardingMetrics::onDocumentsCopied(int64_t documents, int64_t bytes) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
- if (!_currentOp)
- return;
-
- invariant(checkState(*_currentOp->recipientState,
- {RecipientStateEnum::kCloning, RecipientStateEnum::kError}));
-
- _currentOp->documentsCopied += documents;
- _currentOp->bytesCopied += bytes;
- _cumulativeOp->documentsCopied += documents;
- _cumulativeOp->bytesCopied += bytes;
-}
-
-void ReshardingMetrics::gotInserts(int n) noexcept {
- _cumulativeOp->gotInserts(n);
-}
-
-void ReshardingMetrics::onOplogApplierApplyBatch(Milliseconds latency) {
- stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp, kNoOperationInProgress);
- invariant(checkState(*_currentOp->recipientState,
- {RecipientStateEnum::kApplying, RecipientStateEnum::kError}));
-
- _currentOp->oplogApplierApplyBatchLatencyMillis.increment(durationCount<Milliseconds>(latency));
- _cumulativeOp->oplogApplierApplyBatchLatencyMillis.increment(
- durationCount<Milliseconds>(latency));
-}
-
-void ReshardingMetrics::onCollClonerFillBatchForInsert(Milliseconds latency) {
- stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp, kNoOperationInProgress);
- invariant(checkState(*_currentOp->recipientState,
- {RecipientStateEnum::kCloning, RecipientStateEnum::kError}));
-
- _currentOp->collClonerFillBatchForInsertLatencyMillis.increment(
- durationCount<Milliseconds>(latency));
- _cumulativeOp->collClonerFillBatchForInsertLatencyMillis.increment(
- durationCount<Milliseconds>(latency));
-}
-
-void ReshardingMetrics::gotInsert() noexcept {
- _cumulativeOp->gotInsert();
-}
-
-void ReshardingMetrics::gotUpdate() noexcept {
- _cumulativeOp->gotUpdate();
-}
-
-void ReshardingMetrics::gotDelete() noexcept {
- _cumulativeOp->gotDelete();
-}
-
-void ReshardingMetrics::startCopyingDocuments(Date_t start) {
- stdx::lock_guard<Latch> lk(_mutex);
- _currentOp->copyingDocuments.start(start);
-}
-
-void ReshardingMetrics::endCopyingDocuments(Date_t end) {
- stdx::lock_guard<Latch> lk(_mutex);
- _currentOp->copyingDocuments.forceEnd(end);
-}
-
-void ReshardingMetrics::startApplyingOplogEntries(Date_t start) {
- stdx::lock_guard<Latch> lk(_mutex);
- _currentOp->applyingOplogEntries.start(start);
-}
-
-void ReshardingMetrics::endApplyingOplogEntries(Date_t end) {
- stdx::lock_guard<Latch> lk(_mutex);
- _currentOp->applyingOplogEntries.forceEnd(end);
-}
-
-void ReshardingMetrics::enterCriticalSection(Date_t start) {
- stdx::lock_guard<Latch> lk(_mutex);
- _currentOp->inCriticalSection.start(start);
-}
-
-void ReshardingMetrics::leaveCriticalSection(Date_t end) {
- stdx::lock_guard<Latch> lk(_mutex);
- _currentOp->inCriticalSection.forceEnd(end);
-}
-
-void ReshardingMetrics::onOplogEntriesFetched(int64_t entries) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
- if (!_currentOp)
- return;
-
- invariant(checkState(
- *_currentOp->recipientState,
- {RecipientStateEnum::kCloning, RecipientStateEnum::kApplying, RecipientStateEnum::kError}));
-
- _currentOp->oplogEntriesFetched += entries;
- _cumulativeOp->oplogEntriesFetched += entries;
-}
-
-void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
- if (!_currentOp)
- return;
-
- invariant(checkState(*_currentOp->recipientState,
- {RecipientStateEnum::kApplying, RecipientStateEnum::kError}));
-
- _currentOp->oplogEntriesApplied += entries;
- _cumulativeOp->oplogEntriesApplied += entries;
-}
-
-void ReshardingMetrics::restoreForCurrentOp(int64_t documentCountCopied,
- int64_t documentBytesCopied,
- int64_t oplogEntriesFetched,
- int64_t oplogEntriesApplied) noexcept {
- invariant(_currentOp, kNoOperationInProgress);
- invariant(_currentOp->documentsCopied == 0, kMetricsSetBeforeRestore);
- invariant(_currentOp->bytesCopied == 0, kMetricsSetBeforeRestore);
- invariant(_currentOp->oplogEntriesFetched == 0, kMetricsSetBeforeRestore);
- invariant(_currentOp->oplogEntriesApplied == 0, kMetricsSetBeforeRestore);
-
- _currentOp->documentsCopied = documentCountCopied;
- _currentOp->bytesCopied = documentBytesCopied;
- _currentOp->oplogEntriesFetched = oplogEntriesFetched;
- _currentOp->oplogEntriesApplied = oplogEntriesApplied;
-}
-
-void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept {
- stdx::lock_guard<Latch> lk(_mutex);
- if (!_currentOp)
- return;
-
- invariant(checkState(*_currentOp->donorState,
- {DonorStateEnum::kPreparingToBlockWrites,
- DonorStateEnum::kBlockingWrites,
- DonorStateEnum::kError}));
-
- onWriteDuringCriticalSectionForCurrentOp(writes);
- _cumulativeOp->writesDuringCriticalSection += writes;
-}
-
-void ReshardingMetrics::onWriteDuringCriticalSectionForCurrentOp(int64_t writes) noexcept {
- invariant(_currentOp, kNoOperationInProgress);
-
- _currentOp->writesDuringCriticalSection += writes;
-}
-
-void ReshardingMetrics::serializeCurrentOpMetrics(BSONObjBuilder* bob, Role role) const {
- stdx::lock_guard<Latch> lk(_mutex);
- if (_currentOp)
- _currentOp->appendCurrentOpMetrics(bob, role, _now());
-}
-
-BSONObj ReshardingMetrics::reportForCurrentOp(const ReporterOptions& options) const noexcept {
- auto roleName = [](Role role) {
- switch (role) {
- case Role::kDonor:
- return "Donor"_sd;
- case Role::kRecipient:
- return "Recipient"_sd;
- case Role::kCoordinator:
- return "Coordinator"_sd;
- }
- MONGO_UNREACHABLE;
- };
-
- BSONObjBuilder bob;
- bob.append("type", "op");
- bob.append(
- "desc",
- fmt::format("Resharding{}Service {}", roleName(options.role), options.id.toString()));
- bob.append("op", "command");
- bob.append("ns", options.nss.toString());
-
- {
- BSONObjBuilder originating{bob.subobjStart("originatingCommand")};
- originating.append("reshardCollection", options.nss.toString());
- originating.append("key", options.shardKey);
- originating.append("unique", options.unique);
- BSONObjBuilder{originating.subobjStart("collation")}.append("locale", "simple");
- }
-
- serializeCurrentOpMetrics(&bob, options.role);
-
- return bob.obj();
-}
-
-boost::optional<Milliseconds> ReshardingMetrics::getOperationElapsedTime() const {
- stdx::lock_guard<Latch> lk(_mutex);
- if (!_currentOp)
- return boost::none;
- return _currentOp->runningOperation.duration(_now());
-}
-
-boost::optional<Milliseconds> ReshardingMetrics::getOperationRemainingTime() const {
- stdx::lock_guard<Latch> lk(_mutex);
- if (_currentOp)
- return _currentOp->remainingOperationTime(_now());
- return boost::none;
-}
-
-bool ReshardingMetrics::wasReshardingEverAttempted() const {
- stdx::lock_guard<Latch> lk(_mutex);
- return _started != 0 || _succeeded != 0 || _failed != 0 || _canceled != 0 || _onStepUpCalled;
-}
-
-void ReshardingMetrics::serializeCumulativeOpMetrics(BSONObjBuilder* bob) const {
- stdx::lock_guard<Latch> lk(_mutex);
-
- auto getRemainingOperationTime = [&](const Milliseconds& time) -> int64_t {
- return durationCount<Milliseconds>(time);
- };
-
- bob->append(kTotalOps, _started);
- bob->append(kSuccessfulOps, _succeeded);
- bob->append(kFailedOps, _failed);
- bob->append(kCanceledOps, _canceled);
-
- const auto& ops = *_cumulativeOp;
- bob->append(kDocumentsCopied, ops.documentsCopied);
- bob->append(kBytesCopied, ops.bytesCopied);
- bob->append(kOplogsApplied, ops.oplogEntriesApplied);
- bob->append(kWritesDuringCritialSection, ops.writesDuringCriticalSection);
- bob->append(kOplogsFetched, ops.oplogEntriesFetched);
- bob->append(kLastOpEndingChunkImbalance, ops.chunkImbalanceCount);
- bob->append(kOpCounters, ops.opCounters.getObj());
- bob->append(kMinRemainingOperationTime,
- getRemainingOperationTime(ops.minRemainingOperationTime));
- bob->append(kMaxRemainingOperationTime,
- getRemainingOperationTime(ops.maxRemainingOperationTime));
-
- appendHistogram(
- *bob, ops.oplogApplierApplyBatchLatencyMillis, kOplogApplierApplyBatchLatencyMillis);
- appendHistogram(*bob,
- ops.collClonerFillBatchForInsertLatencyMillis,
- kCollClonerFillBatchForInsertLatencyMillis);
-}
-
-Date_t ReshardingMetrics::_now() const {
- return _svcCtx->getFastClockSource()->now();
-}
-
-ReshardingMetrics::ReshardingMetrics(ServiceContext* svcCtx)
- : _svcCtx{svcCtx}, _cumulativeOp{std::make_unique<OperationMetrics>()} {}
-
-ReshardingMetrics::~ReshardingMetrics() = default;
-
-} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h
deleted file mode 100644
index a6964c9d611..00000000000
--- a/src/mongo/db/s/resharding/resharding_metrics.h
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Copyright (C) 2020-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <boost/optional.hpp>
-
-#include "mongo/bson/bsonobj.h"
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/s/resharding/donor_document_gen.h"
-#include "mongo/db/service_context.h"
-#include "mongo/platform/mutex.h"
-#include "mongo/s/resharding/common_types_gen.h"
-#include "mongo/util/clock_source.h"
-#include "mongo/util/duration.h"
-#include "mongo/util/histogram.h"
-#include "mongo/util/uuid.h"
-
-namespace mongo {
-
-/*
- * Maintains the metrics for resharding operations.
- * All members of this class are thread-safe.
- */
-class ReshardingMetrics final {
-public:
- enum Role { kCoordinator, kDonor, kRecipient };
-
- static ReshardingMetrics* get(ServiceContext*) noexcept;
-
- explicit ReshardingMetrics(ServiceContext* svcCtx);
- ~ReshardingMetrics();
-
- ReshardingMetrics(const ReshardingMetrics&) = delete;
- ReshardingMetrics& operator=(const ReshardingMetrics&) = delete;
-
- // Marks the beginning of a resharding operation for a particular role. Note that:
- // * Only one resharding operation may run at any time.
- // * The only valid co-existing roles on a process are kDonor and kRecipient.
- void onStart(Role role, Date_t runningOperationStartTime) noexcept;
-
- // Marks the resumption of a resharding operation for a particular role.
- void onStepUp(Role role) noexcept;
-
- void onStepUp(DonorStateEnum state, ReshardingDonorMetrics donorMetrics);
-
- // So long as a resharding operation is in progress, the following may be used to update the
- // state of a donor, a recipient, and a coordinator, respectively.
- void setDonorState(DonorStateEnum) noexcept;
- void setRecipientState(RecipientStateEnum) noexcept;
- void setCoordinatorState(CoordinatorStateEnum) noexcept;
-
- void setDocumentsToCopy(int64_t documents, int64_t bytes) noexcept;
- void setDocumentsToCopyForCurrentOp(int64_t documents, int64_t bytes) noexcept;
- // Allows updating metrics on "documents to copy" so long as the recipient is in cloning state.
- void onDocumentsCopied(int64_t documents, int64_t bytes) noexcept;
-
- // Allows updating metrics on "opcounters";
- void gotInserts(int n) noexcept;
- void gotInsert() noexcept;
- void gotUpdate() noexcept;
- void gotDelete() noexcept;
-
- void setMinRemainingOperationTime(Milliseconds minOpTime) noexcept;
- void setMaxRemainingOperationTime(Milliseconds maxOpTime) noexcept;
-
- // Starts/ends the timers recording the times spend in the named sections.
- void startCopyingDocuments(Date_t start);
- void endCopyingDocuments(Date_t end);
-
- void startApplyingOplogEntries(Date_t start);
- void endApplyingOplogEntries(Date_t end);
-
- void enterCriticalSection(Date_t start);
- void leaveCriticalSection(Date_t end);
-
- // Records latency and throughput of calls to ReshardingOplogApplier::_applyBatch
- void onOplogApplierApplyBatch(Milliseconds latency);
-
- // Records latency and throughput of calls to resharding::data_copy::fillBatchForInsert
- // in ReshardingCollectionCloner::doOneBatch
- void onCollClonerFillBatchForInsert(Milliseconds latency);
-
- // Allows updating "oplog entries to apply" metrics when the recipient is in applying state.
- void onOplogEntriesFetched(int64_t entries) noexcept;
- // Allows restoring "oplog entries to apply" metrics.
- void onOplogEntriesApplied(int64_t entries) noexcept;
-
- void restoreForCurrentOp(int64_t documentCountCopied,
- int64_t documentBytesCopied,
- int64_t oplogEntriesFetched,
- int64_t oplogEntriesApplied) noexcept;
-
- // Allows tracking writes during a critical section when the donor's state is either of
- // "donating-oplog-entries" or "blocking-writes".
- void onWriteDuringCriticalSection(int64_t writes) noexcept;
- // Allows restoring writes during a critical section.
- void onWriteDuringCriticalSectionForCurrentOp(int64_t writes) noexcept;
-
- // Indicates that a role on this node is stepping down. If the role being stepped down is the
- // last active role on this process, the function tears down the currentOp variable. The
- // replica set primary that is stepping up continues the resharding operation from disk.
- void onStepDown(Role role) noexcept;
-
- // Marks the completion of the current (active) resharding operation for a particular role. If
- // the role being completed is the last active role on this process, the function tears down
- // the currentOp variable, indicating completion for the resharding operation on this process.
- //
- // Aborts the process if no resharding operation is in progress.
- void onCompletion(Role role,
- ReshardingOperationStatusEnum status,
- Date_t runningOperationEndTime) noexcept;
-
- // Records the chunk imbalance count for the most recent resharding operation.
- void setLastReshardChunkImbalanceCount(int64_t newCount) noexcept;
-
- struct ReporterOptions {
- ReporterOptions(Role role, UUID id, NamespaceString nss, BSONObj shardKey, bool unique)
- : role(role),
- id(std::move(id)),
- nss(std::move(nss)),
- shardKey(std::move(shardKey)),
- unique(unique) {}
-
- const Role role;
- const UUID id;
- const NamespaceString nss;
- const BSONObj shardKey;
- const bool unique;
- };
- BSONObj reportForCurrentOp(const ReporterOptions& options) const noexcept;
-
- bool wasReshardingEverAttempted() const;
-
- // Append metrics to the builder in CurrentOp format for the given `role`.
- void serializeCurrentOpMetrics(BSONObjBuilder*, Role role) const;
-
- // Append metrics to the builder in CumulativeOp (ServerStatus) format.
- void serializeCumulativeOpMetrics(BSONObjBuilder*) const;
-
- // Reports the elapsed time for the active resharding operation, or `boost::none`.
- boost::optional<Milliseconds> getOperationElapsedTime() const;
-
- // Reports the estimated remaining time for the active resharding operation, or `boost::none`.
- boost::optional<Milliseconds> getOperationRemainingTime() const;
-
- static Histogram<int64_t> getLatencyHistogram() {
- return Histogram<int64_t>({10, 100, 1000, 10000});
- }
-
-private:
- class OperationMetrics;
-
- ServiceContext* const _svcCtx;
-
- mutable Mutex _mutex = MONGO_MAKE_LATCH("ReshardingMetrics::_mutex");
-
- void _emplaceCurrentOpForRole(Role role,
- boost::optional<Date_t> runningOperationStartTime) noexcept;
-
- Date_t _now() const;
-
- bool _onStepUpCalled = false;
-
- // The following maintain the number of resharding operations that have started, succeeded,
- // failed with an unrecoverable error, and canceled by the user, respectively.
- int64_t _started = 0;
- int64_t _succeeded = 0;
- int64_t _failed = 0;
- int64_t _canceled = 0;
-
- std::unique_ptr<OperationMetrics> _currentOp;
- std::unique_ptr<OperationMetrics> _cumulativeOp;
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp b/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp
index 786bcdaa9ab..86cc76261a8 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp
@@ -42,9 +42,6 @@ namespace resharding_metrics {
namespace {
void onCriticalSectionErrorThrows(OperationContext* opCtx, const StaleConfigInfo& info) {
- if (!ShardingDataTransformMetrics::isEnabled()) {
- return;
- }
const auto& operationType = info.getDuringOperationType();
if (!operationType) {
return;
diff --git a/src/mongo/db/s/resharding/resharding_metrics_new.cpp b/src/mongo/db/s/resharding/resharding_metrics_new.cpp
index fa63eb9da3c..e07468ad1b9 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_new.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics_new.cpp
@@ -61,13 +61,9 @@ BSONObj createOriginalCommand(const NamespaceString& nss, BSONObj shardKey) {
}
Date_t readStartTime(const CommonReshardingMetadata& metadata, ClockSource* fallbackSource) {
- try {
- const auto& startTime = metadata.getStartTime();
- tassert(6503901,
- "Metadata is missing start time despite feature flag being enabled",
- startTime.has_value());
+ if (const auto& startTime = metadata.getStartTime()) {
return startTime.get();
- } catch (const DBException&) {
+ } else {
return fallbackSource->now();
}
}
@@ -170,4 +166,134 @@ void ReshardingMetricsNew::restoreCoordinatorSpecificFields(
restorePhaseDurationFields(document);
}
+ReshardingMetricsNew::DonorState::DonorState(DonorStateEnum enumVal) : _enumVal(enumVal) {}
+
+ShardingDataTransformCumulativeMetrics::DonorStateEnum ReshardingMetricsNew::DonorState::toMetrics()
+ const {
+ using MetricsEnum = ShardingDataTransformCumulativeMetrics::DonorStateEnum;
+
+ switch (_enumVal) {
+ case DonorStateEnum::kUnused:
+ return MetricsEnum::kUnused;
+
+ case DonorStateEnum::kPreparingToDonate:
+ return MetricsEnum::kPreparingToDonate;
+
+ case DonorStateEnum::kDonatingInitialData:
+ return MetricsEnum::kDonatingInitialData;
+
+ case DonorStateEnum::kDonatingOplogEntries:
+ return MetricsEnum::kDonatingOplogEntries;
+
+ case DonorStateEnum::kPreparingToBlockWrites:
+ return MetricsEnum::kPreparingToBlockWrites;
+
+ case DonorStateEnum::kError:
+ return MetricsEnum::kError;
+
+ case DonorStateEnum::kBlockingWrites:
+ return MetricsEnum::kBlockingWrites;
+
+ case DonorStateEnum::kDone:
+ return MetricsEnum::kDone;
+ default:
+ invariant(false,
+                      str::stream() << "Unexpected resharding donor state: "
+ << DonorState_serializer(_enumVal));
+ MONGO_UNREACHABLE;
+ }
+}
+
+DonorStateEnum ReshardingMetricsNew::DonorState::getState() const {
+ return _enumVal;
+}
+
+ReshardingMetricsNew::RecipientState::RecipientState(RecipientStateEnum enumVal)
+ : _enumVal(enumVal) {}
+
+ShardingDataTransformCumulativeMetrics::RecipientStateEnum
+ReshardingMetricsNew::RecipientState::toMetrics() const {
+ using MetricsEnum = ShardingDataTransformCumulativeMetrics::RecipientStateEnum;
+
+ switch (_enumVal) {
+ case RecipientStateEnum::kUnused:
+ return MetricsEnum::kUnused;
+
+ case RecipientStateEnum::kAwaitingFetchTimestamp:
+ return MetricsEnum::kAwaitingFetchTimestamp;
+
+ case RecipientStateEnum::kCreatingCollection:
+ return MetricsEnum::kCreatingCollection;
+
+ case RecipientStateEnum::kCloning:
+ return MetricsEnum::kCloning;
+
+ case RecipientStateEnum::kApplying:
+ return MetricsEnum::kApplying;
+
+ case RecipientStateEnum::kError:
+ return MetricsEnum::kError;
+
+ case RecipientStateEnum::kStrictConsistency:
+ return MetricsEnum::kStrictConsistency;
+
+ case RecipientStateEnum::kDone:
+ return MetricsEnum::kDone;
+
+ default:
+ invariant(false,
+                      str::stream() << "Unexpected resharding recipient state: "
+ << RecipientState_serializer(_enumVal));
+ MONGO_UNREACHABLE;
+ }
+}
+
+RecipientStateEnum ReshardingMetricsNew::RecipientState::getState() const {
+ return _enumVal;
+}
+
+ReshardingMetricsNew::CoordinatorState::CoordinatorState(CoordinatorStateEnum enumVal)
+ : _enumVal(enumVal) {}
+
+ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum
+ReshardingMetricsNew::CoordinatorState::toMetrics() const {
+ switch (_enumVal) {
+ case CoordinatorStateEnum::kUnused:
+ return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kUnused;
+
+ case CoordinatorStateEnum::kInitializing:
+ return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kInitializing;
+
+ case CoordinatorStateEnum::kPreparingToDonate:
+ return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kPreparingToDonate;
+
+ case CoordinatorStateEnum::kCloning:
+ return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kCloning;
+
+ case CoordinatorStateEnum::kApplying:
+ return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kApplying;
+
+ case CoordinatorStateEnum::kBlockingWrites:
+ return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kBlockingWrites;
+
+ case CoordinatorStateEnum::kAborting:
+ return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kAborting;
+
+ case CoordinatorStateEnum::kCommitting:
+ return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kCommitting;
+
+ case CoordinatorStateEnum::kDone:
+ return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kDone;
+ default:
+ invariant(false,
+ str::stream() << "Unexpected resharding coordinator state: "
+ << CoordinatorState_serializer(_enumVal));
+ MONGO_UNREACHABLE;
+ }
+}
+
+CoordinatorStateEnum ReshardingMetricsNew::CoordinatorState::getState() const {
+ return _enumVal;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_metrics_new.h b/src/mongo/db/s/resharding/resharding_metrics_new.h
index 1f060b320b0..b8e96698b0d 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_new.h
+++ b/src/mongo/db/s/resharding/resharding_metrics_new.h
@@ -42,6 +42,42 @@ class ReshardingMetricsNew : public ShardingDataTransformInstanceMetrics {
public:
using State = stdx::variant<CoordinatorStateEnum, RecipientStateEnum, DonorStateEnum>;
+ class DonorState {
+ public:
+ using MetricsType = ShardingDataTransformCumulativeMetrics::DonorStateEnum;
+
+ explicit DonorState(DonorStateEnum enumVal);
+ MetricsType toMetrics() const;
+ DonorStateEnum getState() const;
+
+ private:
+ const DonorStateEnum _enumVal;
+ };
+
+ class RecipientState {
+ public:
+ using MetricsType = ShardingDataTransformCumulativeMetrics::RecipientStateEnum;
+
+ explicit RecipientState(RecipientStateEnum enumVal);
+ MetricsType toMetrics() const;
+ RecipientStateEnum getState() const;
+
+ private:
+ RecipientStateEnum _enumVal;
+ };
+
+ class CoordinatorState {
+ public:
+ using MetricsType = ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum;
+
+ explicit CoordinatorState(CoordinatorStateEnum enumVal);
+ MetricsType toMetrics() const;
+ CoordinatorStateEnum getState() const;
+
+ private:
+ CoordinatorStateEnum _enumVal;
+ };
+
ReshardingMetricsNew(UUID instanceId,
BSONObj shardKey,
NamespaceString nss,
@@ -85,6 +121,26 @@ public:
}
template <typename T>
+ void onStateTransition(T before, boost::none_t after) {
+ getCumulativeMetrics()->onStateTransition<typename T::MetricsType>(before.toMetrics(),
+ after);
+ }
+
+ template <typename T>
+ void onStateTransition(boost::none_t before, T after) {
+ setState(after.getState());
+ getCumulativeMetrics()->onStateTransition<typename T::MetricsType>(before,
+ after.toMetrics());
+ }
+
+ template <typename T>
+ void onStateTransition(T before, T after) {
+ setState(after.getState());
+ getCumulativeMetrics()->onStateTransition<typename T::MetricsType>(before.toMetrics(),
+ after.toMetrics());
+ }
+
+ template <typename T>
void setState(T state) {
static_assert(std::is_assignable_v<State, T>);
_state.store(state);
diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
deleted file mode 100644
index 978963eb555..00000000000
--- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp
+++ /dev/null
@@ -1,895 +0,0 @@
-/**
- * Copyright (C) 2020-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-
-#include <fmt/format.h>
-
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/bson/json.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
-#include "mongo/db/service_context_test_fixture.h"
-#include "mongo/logv2/log.h"
-#include "mongo/unittest/death_test.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/clock_source_mock.h"
-#include "mongo/util/duration.h"
-#include "mongo/util/histogram.h"
-#include "mongo/util/uuid.h"
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
-
-
-namespace mongo {
-namespace {
-
-using namespace fmt::literals;
-
-constexpr auto kOpTimeRemaining = "remainingOperationTimeEstimatedSecs"_sd;
-constexpr auto kOplogApplierApplyBatchLatencyMillis = "oplogApplierApplyBatchLatencyMillis";
-constexpr auto kCollClonerFillBatchForInsertLatencyMillis =
- "collClonerFillBatchForInsertLatencyMillis";
-
-class ReshardingMetricsTest : public ServiceContextTest {
-public:
- enum OpReportType {
- CumulativeReport,
- CurrentOpReportDonorRole,
- CurrentOpReportRecipientRole,
- CurrentOpReportCoordinatorRole
- };
-
- void setUp() override {
- auto clockSource = std::make_unique<ClockSourceMock>();
- _clockSource = clockSource.get();
- getGlobalServiceContext()->setFastClockSource(std::move(clockSource));
- }
-
- auto getMetrics() {
- return ReshardingMetrics::get(getGlobalServiceContext());
- }
-
- void startOperation(ReshardingMetrics::Role role) {
- getMetrics()->onStart(role, getGlobalServiceContext()->getFastClockSource()->now());
- }
-
- void stepUpOperation(ReshardingMetrics::Role role) {
- getMetrics()->onStepUp(role);
- }
-
- void stepDownOperation(ReshardingMetrics::Role role) {
- getMetrics()->onStepDown(role);
- }
-
- void completeOperation(ReshardingMetrics::Role role, ReshardingOperationStatusEnum opStatus) {
- getMetrics()->onCompletion(
- role, opStatus, getGlobalServiceContext()->getFastClockSource()->now());
- }
-
- // Timer step in milliseconds
- static constexpr auto kTimerStep = 100;
-
- void advanceTime(Milliseconds step = Milliseconds{kTimerStep}) {
- _clockSource->advance(step);
- }
-
- auto getWasReshardingEverAttempted() {
- return getMetrics()->wasReshardingEverAttempted();
- }
-
- auto getReport(OpReportType reportType) {
- BSONObjBuilder bob;
- if (reportType == OpReportType::CumulativeReport) {
- getMetrics()->serializeCumulativeOpMetrics(&bob);
- } else if (reportType == OpReportType::CurrentOpReportDonorRole) {
- getMetrics()->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kDonor);
- } else if (reportType == OpReportType::CurrentOpReportRecipientRole) {
- getMetrics()->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kRecipient);
- } else {
- getMetrics()->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kCoordinator);
- }
- return bob.obj();
- }
-
- void checkMetrics(std::string tag, int expectedValue, OpReportType reportType) {
- const auto report = getReport(reportType);
- checkMetrics(report, std::move(tag), expectedValue);
- }
-
- void checkMetrics(std::string tag,
- int expectedValue,
- std::string errMsg,
- OpReportType reportType) {
- const auto report = getReport(reportType);
- checkMetrics(report, std::move(tag), expectedValue, std::move(errMsg));
- }
-
- void checkMetrics(const BSONObj& report,
- std::string tag,
- int expectedValue,
- std::string errMsg = "Unexpected value") const {
- ASSERT_EQ(report.getIntField(tag), expectedValue)
- << fmt::format("{}: {}", errMsg, report.toString());
- };
-
- void appendExpectedHistogramResult(BSONObjBuilder* bob,
- std::string tag,
- const std::vector<int64_t>& latencies) {
- Histogram<int64_t> hist = ReshardingMetrics::getLatencyHistogram();
- for (size_t i = 0; i < latencies.size(); i++) {
- hist.increment(latencies[i]);
- }
-
- BSONObjBuilder histogramBuilder;
- appendHistogram(histogramBuilder, hist, tag);
- BSONObj histogram = histogramBuilder.obj();
- bob->appendElements(histogram);
- }
-
-private:
- ClockSourceMock* _clockSource;
-};
-
-// TODO Re-enable once underlying invariants are re-enabled
-/*
-DEATH_TEST_F(ReshardingMetricsTest, RunOnCompletionBeforeOnStart, "No operation is in progress") {
- completeOperation(ReshardingMetrics::Role::kRecipient,
- ReshardingOperationStatusEnum::kSuccess);
-}
-
-DEATH_TEST_F(ReshardingMetricsTest,
- RunOnStepUpAfterOnStartInvariants,
- "Another operation is in progress") {
- startOperation(ReshardingMetrics::Role::kRecipient);
- stepUpOperation(ReshardingMetrics::Role::kRecipient);
-}
-
-DEATH_TEST_F(ReshardingMetricsTest,
- RunOnCompletionAfterOnStepDownInvariants,
- "No operation is in progress") {
- startOperation(ReshardingMetrics::Role::kRecipient);
- stepDownOperation(ReshardingMetrics::Role::kRecipient);
- completeOperation(ReshardingMetrics::Role::kRecipient,
- ReshardingOperationStatusEnum::kSuccess);
-}
-*/
-
-TEST_F(ReshardingMetricsTest, RunOnCompletionTwiceIsSafe) {
- startOperation(ReshardingMetrics::Role::kCoordinator);
- completeOperation(ReshardingMetrics::Role::kCoordinator,
- ReshardingOperationStatusEnum::kSuccess);
- completeOperation(ReshardingMetrics::Role::kCoordinator,
- ReshardingOperationStatusEnum::kSuccess);
-}
-
-TEST_F(ReshardingMetricsTest, RunOnStepDownAfterOnCompletionIsSafe) {
- startOperation(ReshardingMetrics::Role::kRecipient);
- completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess);
- stepDownOperation(ReshardingMetrics::Role::kRecipient);
-}
-
-DEATH_TEST_F(ReshardingMetricsTest, CoordinatorThenDonor, "Another operation is in progress") {
- startOperation(ReshardingMetrics::Role::kCoordinator);
- startOperation(ReshardingMetrics::Role::kDonor);
-}
-
-DEATH_TEST_F(ReshardingMetricsTest, DonorThenCoordinator, "Another operation is in progress") {
- startOperation(ReshardingMetrics::Role::kDonor);
- startOperation(ReshardingMetrics::Role::kCoordinator);
-}
-
-DEATH_TEST_F(ReshardingMetricsTest, CoordinatorThenRecipient, "Another operation is in progress") {
- startOperation(ReshardingMetrics::Role::kCoordinator);
- startOperation(ReshardingMetrics::Role::kRecipient);
-}
-
-DEATH_TEST_F(ReshardingMetricsTest, RecipientThenCoordinator, "Another operation is in progress") {
- startOperation(ReshardingMetrics::Role::kRecipient);
- startOperation(ReshardingMetrics::Role::kCoordinator);
-}
-
-TEST_F(ReshardingMetricsTest, DonorAndRecipientCombinationIsSafe) {
- startOperation(ReshardingMetrics::Role::kRecipient);
- startOperation(ReshardingMetrics::Role::kDonor);
- completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess);
- completeOperation(ReshardingMetrics::Role::kDonor, ReshardingOperationStatusEnum::kSuccess);
-}
-
-TEST_F(ReshardingMetricsTest, DonorAndRecipientStepdownIsSafe) {
- startOperation(ReshardingMetrics::Role::kDonor);
- startOperation(ReshardingMetrics::Role::kRecipient);
- stepDownOperation(ReshardingMetrics::Role::kRecipient);
- stepDownOperation(ReshardingMetrics::Role::kDonor);
-}
-
-TEST_F(ReshardingMetricsTest, OperationStatus) {
- startOperation(ReshardingMetrics::Role::kCoordinator);
- const auto report = getReport(OpReportType::CurrentOpReportCoordinatorRole);
- ASSERT_EQ(report.getStringField("opStatus"),
- ReshardingOperationStatus_serializer(ReshardingOperationStatusEnum::kRunning));
- completeOperation(ReshardingMetrics::Role::kCoordinator,
- ReshardingOperationStatusEnum::kSuccess);
-}
-
-TEST_F(ReshardingMetricsTest, TestOperationStatus) {
- const auto kNumSuccessfulOps = 3;
- const auto kNumFailedOps = 5;
- const auto kNumCanceledOps = 7;
-
- for (auto i = 0; i < kNumSuccessfulOps; i++) {
- startOperation(ReshardingMetrics::Role::kRecipient);
- completeOperation(ReshardingMetrics::Role::kRecipient,
- ReshardingOperationStatusEnum::kSuccess);
- }
-
- for (auto i = 0; i < kNumFailedOps; i++) {
- startOperation(ReshardingMetrics::Role::kRecipient);
- completeOperation(ReshardingMetrics::Role::kRecipient,
- ReshardingOperationStatusEnum::kFailure);
- }
-
- for (auto i = 0; i < kNumCanceledOps; i++) {
- startOperation(ReshardingMetrics::Role::kRecipient);
- completeOperation(ReshardingMetrics::Role::kRecipient,
- ReshardingOperationStatusEnum::kCanceled);
- }
-
- checkMetrics("countReshardingSuccessful", kNumSuccessfulOps, OpReportType::CumulativeReport);
- checkMetrics("countReshardingFailures", kNumFailedOps, OpReportType::CumulativeReport);
- checkMetrics("countReshardingCanceled", kNumCanceledOps, OpReportType::CumulativeReport);
-
- const auto total = kNumSuccessfulOps + kNumFailedOps + kNumCanceledOps;
- checkMetrics("countReshardingOperations", total, OpReportType::CumulativeReport);
- startOperation(ReshardingMetrics::Role::kRecipient);
- checkMetrics("countReshardingOperations", total + 1, OpReportType::CumulativeReport);
-}
-
-TEST_F(ReshardingMetricsTest, TestElapsedTime) {
- startOperation(ReshardingMetrics::Role::kDonor);
- const auto elapsedTime = 1;
- advanceTime(Seconds(elapsedTime));
- checkMetrics(
- "totalOperationTimeElapsedSecs", elapsedTime, OpReportType::CurrentOpReportDonorRole);
-}
-
-TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) {
- startOperation(ReshardingMetrics::Role::kRecipient);
- startOperation(ReshardingMetrics::Role::kDonor);
- const auto elapsedTime = 1;
-
- advanceTime(Seconds(elapsedTime));
-
- // Update metrics for donor
- const auto kWritesDuringCriticalSection = 7;
- getMetrics()->setDonorState(DonorStateEnum::kPreparingToBlockWrites);
- getMetrics()->enterCriticalSection(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onWriteDuringCriticalSection(kWritesDuringCriticalSection);
- advanceTime(Seconds(elapsedTime));
-
- // Update metrics for recipient
- const auto kDocumentsToCopy = 50;
- const auto kBytesToCopy = 740;
- const auto kCopyProgress = 50;
- getMetrics()->setRecipientState(RecipientStateEnum::kCreatingCollection);
- getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy);
- getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
- getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onDocumentsCopied(kDocumentsToCopy * kCopyProgress / 100,
- kBytesToCopy * kCopyProgress / 100);
- advanceTime(Seconds(elapsedTime));
-
- const auto currentDonorOpReport = getReport(OpReportType::CurrentOpReportDonorRole);
- const auto currentRecipientOpReport = getReport(OpReportType::CurrentOpReportRecipientRole);
- completeOperation(ReshardingMetrics::Role::kDonor, ReshardingOperationStatusEnum::kSuccess);
- completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess);
-
- checkMetrics(currentRecipientOpReport, "totalCopyTimeElapsedSecs", elapsedTime);
- checkMetrics(currentRecipientOpReport, "bytesCopied", kBytesToCopy * kCopyProgress / 100);
- checkMetrics(
- currentRecipientOpReport, "documentsCopied", kDocumentsToCopy * kCopyProgress / 100);
- checkMetrics(currentDonorOpReport, "totalCriticalSectionTimeElapsedSecs", elapsedTime * 2);
- checkMetrics(
- currentDonorOpReport, "countWritesDuringCriticalSection", kWritesDuringCriticalSection);
-
- const auto cumulativeReportAfterCompletion = getReport(OpReportType::CumulativeReport);
- checkMetrics(
- cumulativeReportAfterCompletion, "bytesCopied", kBytesToCopy * kCopyProgress / 100);
- checkMetrics(
- cumulativeReportAfterCompletion, "documentsCopied", kDocumentsToCopy * kCopyProgress / 100);
- checkMetrics(cumulativeReportAfterCompletion,
- "countWritesDuringCriticalSection",
- kWritesDuringCriticalSection);
-}
-
-TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCompletion) {
- auto constexpr kTag = "documentsCopied";
- startOperation(ReshardingMetrics::Role::kRecipient);
- const auto kDocumentsToCopy = 2;
- const auto kBytesToCopy = 200;
- getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
- getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
- advanceTime();
- completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kFailure);
- advanceTime();
-
- checkMetrics(kTag,
- kDocumentsToCopy,
- "Cumulative metrics are not retained",
- OpReportType::CumulativeReport);
-
- startOperation(ReshardingMetrics::Role::kRecipient);
- checkMetrics(
- kTag, kDocumentsToCopy, "Cumulative metrics are reset", OpReportType::CumulativeReport);
-}
-
-TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCancellation) {
- auto constexpr kTag = "documentsCopied";
- startOperation(ReshardingMetrics::Role::kRecipient);
- const auto kDocumentsToCopy = 2;
- const auto kBytesToCopy = 200;
- getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
- getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
- advanceTime();
- completeOperation(ReshardingMetrics::Role::kRecipient,
- ReshardingOperationStatusEnum::kCanceled);
- advanceTime();
-
- checkMetrics(kTag,
- kDocumentsToCopy,
- "Cumulative metrics are not retained",
- OpReportType::CumulativeReport);
-
- startOperation(ReshardingMetrics::Role::kRecipient);
- checkMetrics(
- kTag, kDocumentsToCopy, "Cumulative metrics are reset", OpReportType::CumulativeReport);
-}
-
-TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreResetAfterCompletion) {
- auto constexpr kTag = "documentsCopied";
- startOperation(ReshardingMetrics::Role::kRecipient);
- const auto kDocumentsToCopy = 2;
- const auto kBytesToCopy = 200;
- getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
- getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
- checkMetrics(kTag,
- kDocumentsToCopy,
- "Current metrics are not set",
- OpReportType::CurrentOpReportRecipientRole);
- advanceTime();
- completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess);
- advanceTime();
-
- startOperation(ReshardingMetrics::Role::kRecipient);
- checkMetrics(
- kTag, 0, "Current metrics are not reset", OpReportType::CurrentOpReportRecipientRole);
-}
-
-TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterCompletion) {
- auto constexpr kTag = "documentsCopied";
- startOperation(ReshardingMetrics::Role::kRecipient);
- const auto kDocumentsToCopy = 2;
- const auto kBytesToCopy = 200;
- getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
- getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
- checkMetrics(kTag,
- kDocumentsToCopy,
- "Current metrics are not set",
- OpReportType::CurrentOpReportRecipientRole);
- advanceTime();
- completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kFailure);
- advanceTime();
-
- ASSERT_FALSE(getReport(OpReportType::CurrentOpReportRecipientRole)[kTag].ok());
-}
-
-TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterStepDown) {
- auto constexpr kTag = "documentsCopied";
- startOperation(ReshardingMetrics::Role::kRecipient);
- const auto kDocumentsToCopy = 2;
- const auto kBytesToCopy = 200;
- getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
- getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
- checkMetrics(kTag,
- kDocumentsToCopy,
- "Current metrics are not set",
- OpReportType::CurrentOpReportRecipientRole);
- advanceTime();
- stepDownOperation(ReshardingMetrics::Role::kRecipient);
- advanceTime();
-
- ASSERT_FALSE(getReport(OpReportType::CurrentOpReportRecipientRole)[kTag].ok());
-}
-
-TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreSetAndReset) {
- const auto kExpectedMin = Milliseconds(1);
- const auto kExpectedMax = Milliseconds(8);
- auto constexpr kMinOpTime = "minShardRemainingOperationTimeEstimatedMillis";
- auto constexpr kMaxOpTime = "maxShardRemainingOperationTimeEstimatedMillis";
- startOperation(ReshardingMetrics::Role::kCoordinator);
- getMetrics()->setCoordinatorState(CoordinatorStateEnum::kBlockingWrites);
-
- getMetrics()->setMinRemainingOperationTime(kExpectedMin);
- checkMetrics(kMinOpTime,
- durationCount<Milliseconds>(kExpectedMin),
- "Cumulative metrics minimum time remaining is not set",
- OpReportType::CumulativeReport);
-
- getMetrics()->setMaxRemainingOperationTime(kExpectedMax);
- checkMetrics(kMaxOpTime,
- durationCount<Milliseconds>(kExpectedMax),
- "Cumulative metrics maximum time remaining is not set",
- OpReportType::CumulativeReport);
-
- advanceTime();
- completeOperation(ReshardingMetrics::Role::kCoordinator,
- ReshardingOperationStatusEnum::kSuccess);
- advanceTime();
- checkMetrics(kMinOpTime,
- 0,
- "Cumulative metrics minimum time remaining is not reset",
- OpReportType::CumulativeReport);
-
- checkMetrics(kMaxOpTime,
- 0,
- "Cumulative metrics maximum time remaining is not reset",
- OpReportType::CumulativeReport);
-}
-
-TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) {
- auto constexpr kTag = "remainingOperationTimeEstimatedSecs";
- const auto elapsedTime = 1;
-
- startOperation(ReshardingMetrics::Role::kRecipient);
- checkMetrics(kTag, -1, OpReportType::CurrentOpReportRecipientRole);
-
- const auto kDocumentsToCopy = 2;
- const auto kBytesToCopy = 200;
- getMetrics()->setRecipientState(RecipientStateEnum::kCreatingCollection);
- getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy);
- getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
- getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onDocumentsCopied(kDocumentsToCopy / 2, kBytesToCopy / 2);
- advanceTime(Seconds(elapsedTime));
- // Since 50% of the data is copied, the remaining copy time equals the elapsed copy time, which
- // is equal to `elapsedTime` seconds.
- checkMetrics(kTag, elapsedTime + 2 * elapsedTime, OpReportType::CurrentOpReportRecipientRole);
-
- const auto kOplogEntriesFetched = 4;
- const auto kOplogEntriesApplied = 2;
- getMetrics()->setRecipientState(RecipientStateEnum::kApplying);
- getMetrics()->endCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->startApplyingOplogEntries(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onOplogEntriesFetched(kOplogEntriesFetched);
- getMetrics()->onOplogEntriesApplied(kOplogEntriesApplied);
- advanceTime(Seconds(elapsedTime));
- // So far, the time to apply oplog entries equals `elapsedTime` seconds.
- checkMetrics(kTag,
- elapsedTime * (kOplogEntriesFetched / kOplogEntriesApplied - 1),
- OpReportType::CurrentOpReportRecipientRole);
-}
-
-TEST_F(ReshardingMetricsTest, CurrentOpReportForDonor) {
- const auto kDonorState = DonorStateEnum::kPreparingToBlockWrites;
- startOperation(ReshardingMetrics::Role::kDonor);
- advanceTime(Seconds(2));
- getMetrics()->setDonorState(kDonorState);
- getMetrics()->enterCriticalSection(getGlobalServiceContext()->getFastClockSource()->now());
- advanceTime(Seconds(3));
-
- const ReshardingMetrics::ReporterOptions options(
- ReshardingMetrics::Role::kDonor,
- UUID::parse("12345678-1234-1234-1234-123456789abc").getValue(),
- NamespaceString("db", "collection"),
- BSON("id" << 1),
- true);
-
- const auto expected =
- fromjson(fmt::format("{{ type: \"op\","
- "desc: \"ReshardingDonorService {0}\","
- "op: \"command\","
- "ns: \"{1}\","
- "originatingCommand: {{ reshardCollection: \"{1}\","
- "key: {2},"
- "unique: {3},"
- "collation: {{ locale: \"simple\" }} }},"
- "totalOperationTimeElapsedSecs: 5,"
- "countWritesDuringCriticalSection: 0,"
- "totalCriticalSectionTimeElapsedSecs : 3,"
- "donorState: \"{4}\","
- "opStatus: \"running\" }}",
- options.id.toString(),
- options.nss.toString(),
- options.shardKey.toString(),
- options.unique ? "true" : "false",
- DonorState_serializer(kDonorState)));
-
- const auto report = getMetrics()->reportForCurrentOp(options);
- ASSERT_BSONOBJ_EQ(expected, report);
-}
-
-TEST_F(ReshardingMetricsTest, CurrentOpReportForRecipient) {
- const auto kRecipientState = RecipientStateEnum::kCloning;
-
- constexpr auto kDocumentsToCopy = 500;
- constexpr auto kDocumentsCopied = kDocumentsToCopy * 0.5;
- static_assert(kDocumentsToCopy >= kDocumentsCopied);
-
- constexpr auto kBytesToCopy = 8192;
- constexpr auto kBytesCopied = kBytesToCopy * 0.5;
- static_assert(kBytesToCopy >= kBytesCopied);
-
- constexpr auto kDelayBeforeCloning = Seconds(2);
- startOperation(ReshardingMetrics::Role::kRecipient);
- advanceTime(kDelayBeforeCloning);
-
- constexpr auto kTimeSpentCloning = Seconds(3);
- getMetrics()->setRecipientState(RecipientStateEnum::kCreatingCollection);
- getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy);
- getMetrics()->setRecipientState(kRecipientState);
- getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
- advanceTime(kTimeSpentCloning);
- getMetrics()->onDocumentsCopied(kDocumentsCopied, kBytesCopied);
-
- const auto kTimeToCopyRemainingSeconds =
- durationCount<Seconds>(kTimeSpentCloning) * (kBytesToCopy / kBytesCopied - 1);
- const auto kRemainingOperationTimeSeconds =
- durationCount<Seconds>(kTimeSpentCloning) + 2 * kTimeToCopyRemainingSeconds;
-
- const ReshardingMetrics::ReporterOptions options(
- ReshardingMetrics::Role::kRecipient,
- UUID::parse("12345678-1234-1234-1234-123456789def").getValue(),
- NamespaceString("db", "collection"),
- BSON("id" << 1),
- false);
-
- BSONObj expectedPrefix =
- fromjson(fmt::format("{{ type: \"op\","
- "desc: \"ReshardingRecipientService {0}\","
- "op: \"command\","
- "ns: \"{1}\","
- "originatingCommand: {{ reshardCollection: \"{1}\","
- "key: {2},"
- "unique: {3},"
- "collation: {{ locale: \"simple\" }} }},"
- "totalOperationTimeElapsedSecs: {4},"
- "remainingOperationTimeEstimatedSecs: {5},"
- "approxDocumentsToCopy: {6},"
- "documentsCopied: {7},"
- "approxBytesToCopy: {8},"
- "bytesCopied: {9},"
- "totalCopyTimeElapsedSecs: {10},"
- "oplogEntriesFetched: 0,"
- "oplogEntriesApplied: 0,"
- "totalApplyTimeElapsedSecs: 0,"
- "recipientState: \"{11}\","
- "opStatus: \"running\" }}",
- options.id.toString(),
- options.nss.toString(),
- options.shardKey.toString(),
- options.unique ? "true" : "false",
- durationCount<Seconds>(kDelayBeforeCloning + kTimeSpentCloning),
- kRemainingOperationTimeSeconds,
- kDocumentsToCopy,
- kDocumentsCopied,
- kBytesToCopy,
- kBytesCopied,
- durationCount<Seconds>(kTimeSpentCloning),
- RecipientState_serializer(kRecipientState)));
-
- BSONObjBuilder expectedBuilder(std::move(expectedPrefix));
-
- // Append histogram latency data.
- appendExpectedHistogramResult(&expectedBuilder, kOplogApplierApplyBatchLatencyMillis, {});
- appendExpectedHistogramResult(&expectedBuilder, kCollClonerFillBatchForInsertLatencyMillis, {});
-
- BSONObj expected = expectedBuilder.done();
-
- const auto report = getMetrics()->reportForCurrentOp(options);
- ASSERT_BSONOBJ_EQ(expected, report);
-}
-
-TEST_F(ReshardingMetricsTest, TestHistogramMetricsForRecipient) {
- const std::vector<int64_t> applyLatencies_1{3, 427, 0, 6004, 320, 10056, 12300, 105, 70};
- const std::vector<int64_t> applyLatencies_2{800, 20, 5, 1025, 10567};
- const std::vector<int64_t> insertLatencies_1{120, 7, 110, 50, 0, 16500, 77000, 667, 7980};
- const std::vector<int64_t> insertLatencies_2{12450, 2400, 760, 57, 2};
-
- const auto combineLatencies = [](std::vector<int64_t>* allLatencies,
- const std::vector<int64_t>& latencies_1,
- const std::vector<int64_t>& latencies_2) {
- allLatencies->insert(allLatencies->end(), latencies_1.begin(), latencies_1.end());
- allLatencies->insert(allLatencies->end(), latencies_2.begin(), latencies_2.end());
- };
-
- std::vector<int64_t> allApplyLatencies;
- combineLatencies(&allApplyLatencies, applyLatencies_1, applyLatencies_2);
- std::vector<int64_t> allInsertLatencies;
- combineLatencies(&allInsertLatencies, insertLatencies_1, insertLatencies_2);
-
-
- const ReshardingMetrics::ReporterOptions options(
- ReshardingMetrics::Role::kRecipient,
- UUID::parse("12345678-1234-1234-1234-123456789def").getValue(),
- NamespaceString("db", "collection"),
- BSON("id" << 1),
- false);
-
- // Test that all histogram metrics appear in both currentOp and cumulativeOp.
- const size_t kNumTests = 4;
- std::vector<int64_t> testLatencies[kNumTests] = {
- applyLatencies_1, applyLatencies_2, insertLatencies_1, insertLatencies_2};
- std::vector<int64_t> expectedLatencies[kNumTests] = {
- applyLatencies_1, allApplyLatencies, insertLatencies_1, allInsertLatencies};
- OpReportType testReportTypes[kNumTests] = {OpReportType::CurrentOpReportRecipientRole,
- OpReportType::CumulativeReport,
- OpReportType::CurrentOpReportRecipientRole,
- OpReportType::CumulativeReport};
- std::string histogramTag[kNumTests] = {kOplogApplierApplyBatchLatencyMillis,
- kOplogApplierApplyBatchLatencyMillis,
- kCollClonerFillBatchForInsertLatencyMillis,
- kCollClonerFillBatchForInsertLatencyMillis};
-
- auto testLatencyHistogram = [&](std::vector<int64_t> latencies,
- OpReportType reportType,
- std::vector<int64_t> expectedLatencies,
- std::string histogramTag) {
- LOGV2(57700,
- "TestHistogramMetricsForRecipient test case",
- "reportType"_attr = reportType,
- "histogramTag"_attr = histogramTag);
-
- startOperation(ReshardingMetrics::Role::kRecipient);
-
- RecipientStateEnum state = (histogramTag.compare(kOplogApplierApplyBatchLatencyMillis) == 0
- ? RecipientStateEnum::kApplying
- : RecipientStateEnum::kCloning);
- getMetrics()->setRecipientState(state);
-
- for (size_t i = 0; i < latencies.size(); i++) {
- if (histogramTag.compare(kOplogApplierApplyBatchLatencyMillis) == 0) {
- getMetrics()->onOplogApplierApplyBatch(Milliseconds(latencies[i]));
- } else if (histogramTag.compare(kCollClonerFillBatchForInsertLatencyMillis) == 0) {
- getMetrics()->onCollClonerFillBatchForInsert(Milliseconds(latencies[i]));
- } else {
- MONGO_UNREACHABLE;
- }
- }
-
- const auto report = getReport(reportType);
- const auto buckets = report[histogramTag];
-
- BSONObjBuilder expectedBuilder;
- appendExpectedHistogramResult(&expectedBuilder, histogramTag, expectedLatencies);
- const auto expectedHist = expectedBuilder.done();
- const auto expectedBuckets = expectedHist[histogramTag];
-
- ASSERT_EQ(buckets.woCompare(expectedBuckets), 0);
-
- completeOperation(ReshardingMetrics::Role::kRecipient,
- ReshardingOperationStatusEnum::kFailure);
- };
-
- for (size_t i = 0; i < kNumTests; i++) {
- testLatencyHistogram(
- testLatencies[i], testReportTypes[i], expectedLatencies[i], histogramTag[i]);
- }
-}
-
-TEST_F(ReshardingMetricsTest, CurrentOpReportForCoordinator) {
- const auto kCoordinatorState = CoordinatorStateEnum::kInitializing;
- const auto kSomeDuration = Seconds(10);
-
- startOperation(ReshardingMetrics::Role::kCoordinator);
- getMetrics()->setCoordinatorState(kCoordinatorState);
- advanceTime(kSomeDuration);
-
- const ReshardingMetrics::ReporterOptions options(
- ReshardingMetrics::Role::kCoordinator,
- UUID::parse("12345678-1234-1234-1234-123456789cba").getValue(),
- NamespaceString("db", "collection"),
- BSON("id" << 1),
- false);
-
- const auto expected =
- fromjson(fmt::format("{{ type: \"op\","
- "desc: \"ReshardingCoordinatorService {0}\","
- "op: \"command\","
- "ns: \"{1}\","
- "originatingCommand: {{ reshardCollection: \"{1}\","
- "key: {2},"
- "unique: {3},"
- "collation: {{ locale: \"simple\" }} }},"
- "totalOperationTimeElapsedSecs: {4},"
- "coordinatorState: \"{5}\","
- "opStatus: \"running\" }}",
- options.id.toString(),
- options.nss.toString(),
- options.shardKey.toString(),
- options.unique ? "true" : "false",
- durationCount<Seconds>(kSomeDuration),
- CoordinatorState_serializer(kCoordinatorState)));
-
- const auto report = getMetrics()->reportForCurrentOp(options);
- ASSERT_BSONOBJ_EQ(expected, report);
-}
-
-TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeCloning) {
- // Copy N docs @ timePerDoc. Check the progression of the estimated time remaining.
- auto m = getMetrics();
- m->onStart(ReshardingMetrics::Role::kRecipient,
- getGlobalServiceContext()->getFastClockSource()->now());
- auto timePerDocument = Seconds(2);
- int64_t bytesPerDocument = 1024;
- int64_t documentsToCopy = 409;
- int64_t bytesToCopy = bytesPerDocument * documentsToCopy;
- m->setRecipientState(RecipientStateEnum::kCreatingCollection);
- m->setDocumentsToCopy(documentsToCopy, bytesToCopy);
- m->setRecipientState(RecipientStateEnum::kCloning);
- m->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
- auto remainingTime = 2 * timePerDocument * documentsToCopy;
- double maxAbsRelErr = 0;
- for (int64_t copied = 0; copied < documentsToCopy; ++copied) {
- double output =
- getReport(OpReportType::CurrentOpReportRecipientRole)[kOpTimeRemaining].Number();
- if (copied == 0) {
- ASSERT_EQ(output, -1);
- } else {
- ASSERT_GTE(output, 0);
- auto expected = durationCount<Seconds>(remainingTime);
- // Check that error is pretty small (it should get better as the operation progresses)
- double absRelErr = std::abs((output - expected) / expected);
- ASSERT_LT(absRelErr, 0.05)
- << "output={}, expected={}, copied={}"_format(output, expected, copied);
- maxAbsRelErr = std::max(maxAbsRelErr, absRelErr);
- }
- m->onDocumentsCopied(1, bytesPerDocument);
- advanceTime(timePerDocument);
- remainingTime -= timePerDocument;
- }
- LOGV2_DEBUG(
- 5422700, 3, "Max absolute relative error observed", "maxAbsRelErr"_attr = maxAbsRelErr);
-}
-
-TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeApplying) {
- // Perform N ops @ timePerOp. Check the progression of the estimated time remaining.
- auto m = getMetrics();
- m->onStart(ReshardingMetrics::Role::kRecipient,
- getGlobalServiceContext()->getFastClockSource()->now());
- m->setRecipientState(RecipientStateEnum::kApplying);
- m->startApplyingOplogEntries(getGlobalServiceContext()->getFastClockSource()->now());
-
- // 1 extra millisecond here because otherwise an error of just 1ms will round this down to the
- // next second.
- auto timePerOp = Milliseconds(1001);
- int64_t fetched = 10000;
- m->onOplogEntriesFetched(fetched);
- auto remainingTime = timePerOp * fetched;
- double maxAbsRelErr = 0;
- for (int64_t applied = 0; applied < fetched; ++applied) {
- double output =
- getReport(OpReportType::CurrentOpReportRecipientRole)[kOpTimeRemaining].Number();
- if (applied == 0) {
- ASSERT_EQ(output, -1);
- } else {
- auto expected = durationCount<Seconds>(remainingTime);
- // Check that error is pretty small (it should get better as the operation progresses)
- double absRelErr = std::abs((output - expected) / expected);
- ASSERT_LT(absRelErr, 0.05)
- << "output={}, expected={}, applied={}"_format(output, expected, applied);
- maxAbsRelErr = std::max(maxAbsRelErr, absRelErr);
- }
- advanceTime(timePerOp);
- m->onOplogEntriesApplied(1);
- remainingTime -= timePerOp;
- }
- LOGV2_DEBUG(
- 5422701, 3, "Max absolute relative error observed", "maxAbsRelErr"_attr = maxAbsRelErr);
-}
-
-TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAccumulate) {
- auto constexpr kTag = "documentsCopied";
- startOperation(ReshardingMetrics::Role::kRecipient);
- const auto kDocumentsToCopy1 = 2;
- const auto kBytesToCopy1 = 200;
-
- getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
- getMetrics()->onDocumentsCopied(kDocumentsToCopy1, kBytesToCopy1);
- completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kFailure);
-
- startOperation(ReshardingMetrics::Role::kRecipient);
- const auto kDocumentsToCopy2 = 3;
- const auto kBytesToCopy2 = 400;
-
- getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
- getMetrics()->onDocumentsCopied(kDocumentsToCopy2, kBytesToCopy2);
- completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kFailure);
-
- checkMetrics(kTag,
- kDocumentsToCopy1 + kDocumentsToCopy2,
- "Cumulative metrics are not accumulated",
- OpReportType::CumulativeReport);
-}
-
-TEST_F(ReshardingMetricsTest, TestWasReshardingEverAttemptedStartComplete) {
- ASSERT_FALSE(getWasReshardingEverAttempted());
-
- startOperation(ReshardingMetrics::Role::kRecipient);
- ASSERT_TRUE(getWasReshardingEverAttempted());
-
- completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess);
- ASSERT_TRUE(getWasReshardingEverAttempted());
-}
-
-TEST_F(ReshardingMetricsTest, TestWasReshardingEverAttemptedStartStepDownStepUp) {
- ASSERT_FALSE(getWasReshardingEverAttempted());
-
- startOperation(ReshardingMetrics::Role::kRecipient);
- ASSERT_TRUE(getWasReshardingEverAttempted());
-
- stepDownOperation(ReshardingMetrics::Role::kRecipient);
- ASSERT_TRUE(getWasReshardingEverAttempted());
-
- stepUpOperation(ReshardingMetrics::Role::kRecipient);
- ASSERT_TRUE(getWasReshardingEverAttempted());
-}
-
-TEST_F(ReshardingMetricsTest, TestWasReshardingEverAttemptedStepUpStepDown) {
- ASSERT_FALSE(getWasReshardingEverAttempted());
-
- stepUpOperation(ReshardingMetrics::Role::kRecipient);
- ASSERT_TRUE(getWasReshardingEverAttempted());
-
- stepDownOperation(ReshardingMetrics::Role::kRecipient);
- ASSERT_TRUE(getWasReshardingEverAttempted());
-}
-
-TEST_F(ReshardingMetricsTest, TestWasReshardingEverAttemptedStepUpComplete) {
- ASSERT_FALSE(getWasReshardingEverAttempted());
-
- stepUpOperation(ReshardingMetrics::Role::kRecipient);
- ASSERT_TRUE(getWasReshardingEverAttempted());
-
- completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess);
- ASSERT_TRUE(getWasReshardingEverAttempted());
-}
-
-TEST_F(ReshardingMetricsTest, TestOnStepUpWithDonorMetrics) {
- ASSERT_FALSE(getWasReshardingEverAttempted());
-
- getMetrics()->onStepUp(DonorStateEnum::kUnused, ReshardingDonorMetrics());
-
- ASSERT_TRUE(getWasReshardingEverAttempted());
-}
-
-} // namespace
-} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
index c2d013dfbc2..1478a3ec30c 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
@@ -45,7 +45,6 @@
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/repl/oplog_applier_utils.h"
#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/stats/counters.h"
@@ -127,7 +126,6 @@ ReshardingOplogApplicationRules::ReshardingOplogApplicationRules(
size_t myStashIdx,
ShardId donorShardId,
ChunkManager sourceChunkMgr,
- ReshardingMetrics* metrics,
ReshardingOplogApplierMetrics* applierMetrics)
: _outputNss(std::move(outputNss)),
_allStashNss(std::move(allStashNss)),
@@ -135,7 +133,6 @@ ReshardingOplogApplicationRules::ReshardingOplogApplicationRules(
_myStashNss(_allStashNss.at(_myStashIdx)),
_donorShardId(std::move(donorShardId)),
_sourceChunkMgr(std::move(sourceChunkMgr)),
- _metrics(metrics),
_applierMetrics(applierMetrics) {}
Status ReshardingOplogApplicationRules::applyOperation(OperationContext* opCtx,
@@ -176,23 +173,18 @@ Status ReshardingOplogApplicationRules::applyOperation(OperationContext* opCtx,
case repl::OpTypeEnum::kInsert:
_applyInsert_inlock(
opCtx, autoCollOutput.getDb(), *autoCollOutput, *autoCollStash, op);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _applierMetrics->onInsertApplied();
- }
+ _applierMetrics->onInsertApplied();
+
break;
case repl::OpTypeEnum::kUpdate:
_applyUpdate_inlock(
opCtx, autoCollOutput.getDb(), *autoCollOutput, *autoCollStash, op);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _applierMetrics->onUpdateApplied();
- }
+ _applierMetrics->onUpdateApplied();
break;
case repl::OpTypeEnum::kDelete:
_applyDelete_inlock(
opCtx, autoCollOutput.getDb(), *autoCollOutput, *autoCollStash, op);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _applierMetrics->onDeleteApplied();
- }
+ _applierMetrics->onDeleteApplied();
break;
default:
MONGO_UNREACHABLE;
@@ -245,7 +237,6 @@ void ReshardingOplogApplicationRules::_applyInsert_inlock(OperationContext* opCt
* 4. If there exists a document with _id == [op _id] in the output collection and it is NOT
* owned by this donor shard, insert the contents of 'op' into the conflict stash collection.
*/
- _metrics->gotInsert();
BSONObj oField = op.getObject();
@@ -273,9 +264,7 @@ void ReshardingOplogApplicationRules::_applyInsert_inlock(OperationContext* opCt
UpdateResult ur = update(opCtx, db, request);
invariant(ur.numMatched != 0);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _applierMetrics->onWriteToStashCollections();
- }
+ _applierMetrics->onWriteToStashCollections();
return;
}
@@ -318,9 +307,7 @@ void ReshardingOplogApplicationRules::_applyInsert_inlock(OperationContext* opCt
uassertStatusOK(stashColl->insertDocument(
opCtx, InsertStatement(oField), nullptr /* nullOpDebug */, false /* fromMigrate */));
- if (ShardingDataTransformMetrics::isEnabled()) {
- _applierMetrics->onWriteToStashCollections();
- }
+ _applierMetrics->onWriteToStashCollections();
}
void ReshardingOplogApplicationRules::_applyUpdate_inlock(OperationContext* opCtx,
@@ -342,7 +329,6 @@ void ReshardingOplogApplicationRules::_applyUpdate_inlock(OperationContext* opCt
* 4. If there exists a document with _id == [op _id] in the output collection and it is owned
* by this donor shard, update the document from this collection.
*/
- _metrics->gotUpdate();
BSONObj oField = op.getObject();
BSONObj o2Field;
@@ -374,9 +360,7 @@ void ReshardingOplogApplicationRules::_applyUpdate_inlock(OperationContext* opCt
invariant(ur.numMatched != 0);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _applierMetrics->onWriteToStashCollections();
- }
+ _applierMetrics->onWriteToStashCollections();
return;
}
@@ -431,7 +415,6 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(OperationContext* opCt
* _id == [op _id] arbitrarily from among all resharding conflict stash collections to delete
* from that resharding conflict stash collection and insert into the output collection.
*/
- _metrics->gotDelete();
BSONObj oField = op.getObject();
@@ -452,9 +435,7 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(OperationContext* opCt
auto nDeleted = deleteObjects(opCtx, stashColl, _myStashNss, idQuery, true /* justOne */);
invariant(nDeleted != 0);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _applierMetrics->onWriteToStashCollections();
- }
+ _applierMetrics->onWriteToStashCollections();
return;
}
@@ -540,9 +521,7 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(OperationContext* opCt
BSONObj res;
auto state = exec->getNext(&res, nullptr);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _applierMetrics->onWriteToStashCollections();
- }
+ _applierMetrics->onWriteToStashCollections();
if (PlanExecutor::ADVANCED == state) {
// We matched a document and deleted it, so break.
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.h b/src/mongo/db/s/resharding/resharding_oplog_application.h
index 5052627c81b..b8bd3942b40 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.h
@@ -41,7 +41,6 @@
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_oplog_applier_metrics.h"
#include "mongo/s/chunk_manager.h"
@@ -61,7 +60,6 @@ public:
size_t myStashIdx,
ShardId donorShardId,
ChunkManager sourceChunkMgr,
- ReshardingMetrics* metrics,
ReshardingOplogApplierMetrics* applierMetrics);
const NamespaceString& getOutputNss() const {
@@ -121,7 +119,6 @@ private:
// The chunk manager for the source namespace and original shard key.
const ChunkManager _sourceChunkMgr;
- ReshardingMetrics* _metrics;
ReshardingOplogApplierMetrics* _applierMetrics;
};
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index 3fafff24f67..cf449c4c00c 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -36,7 +36,6 @@
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h"
#include "mongo/db/s/resharding/resharding_future_util.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_util.h"
#include "mongo/logv2/log.h"
#include "mongo/s/sharding_feature_flags_gen.h"
@@ -70,7 +69,6 @@ ReshardingOplogApplier::ReshardingOplogApplier(
myStashIdx,
_sourceId.getShardId(),
std::move(sourceChunkMgr),
- _env->metrics(),
_env->applierMetrics()},
_sessionApplication{std::move(oplogBufferNss)},
_batchApplier{_crudApplication, _sessionApplication},
@@ -122,13 +120,9 @@ SemiFuture<void> ReshardingOplogApplier::_applyBatch(
return status;
})
.onCompletion([this, latencyTimer](Status status) {
- _env->metrics()->onOplogApplierApplyBatch(
+ _env->applierMetrics()->onOplogLocalBatchApplied(
duration_cast<Milliseconds>(latencyTimer.elapsed()));
- if (ShardingDataTransformMetrics::isEnabled()) {
- _env->applierMetrics()->onOplogLocalBatchApplied(
- duration_cast<Milliseconds>(latencyTimer.elapsed()));
- }
return status;
})
.semi();
@@ -154,10 +148,8 @@ SemiFuture<void> ReshardingOplogApplier::run(
.then([this, chainCtx, executor, cancelToken, factory](OplogBatch batch) {
LOGV2_DEBUG(5391002, 3, "Starting batch", "batchSize"_attr = batch.size());
- if (ShardingDataTransformMetrics::isEnabled()) {
- _env->applierMetrics()->onBatchRetrievedDuringOplogApplying(
- duration_cast<Milliseconds>(chainCtx->fetchTimer.elapsed()));
- }
+ _env->applierMetrics()->onBatchRetrievedDuringOplogApplying(
+ duration_cast<Milliseconds>(chainCtx->fetchTimer.elapsed()));
_currentBatchToApply = std::move(batch);
return _applyBatch(executor, cancelToken, factory);
@@ -177,8 +169,10 @@ SemiFuture<void> ReshardingOplogApplier::run(
.then([this, factory] {
if (_currentBatchToApply.empty()) {
// Increment the number of entries applied by 1 in order to account for
- // the final oplog entry.
- _env->metrics()->onOplogEntriesApplied(1);
+ // the final oplog entry that the iterator never returns because it's a
+ // known no-op oplog entry.
+ _env->applierMetrics()->onOplogEntriesApplied(1);
+
return false;
}
@@ -247,29 +241,25 @@ void ReshardingOplogApplier::_clearAppliedOpsAndStoreProgress(OperationContext*
BSON(ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName
<< static_cast<long long>(_currentBatchToApply.size())));
- if (ShardingDataTransformMetrics::isEnabled()) {
- builder.append("$set",
- BSON(ReshardingOplogApplierProgress::kInsertsAppliedFieldName
- << _env->applierMetrics()->getInsertsApplied()));
- builder.append("$set",
- BSON(ReshardingOplogApplierProgress::kUpdatesAppliedFieldName
- << _env->applierMetrics()->getUpdatesApplied()));
- builder.append("$set",
- BSON(ReshardingOplogApplierProgress::kDeletesAppliedFieldName
- << _env->applierMetrics()->getDeletesApplied()));
- builder.append("$set",
- BSON(ReshardingOplogApplierProgress::kWritesToStashCollectionsFieldName
- << _env->applierMetrics()->getWritesToStashCollections()));
- }
+ builder.append("$set",
+ BSON(ReshardingOplogApplierProgress::kInsertsAppliedFieldName
+ << _env->applierMetrics()->getInsertsApplied()));
+ builder.append("$set",
+ BSON(ReshardingOplogApplierProgress::kUpdatesAppliedFieldName
+ << _env->applierMetrics()->getUpdatesApplied()));
+ builder.append("$set",
+ BSON(ReshardingOplogApplierProgress::kDeletesAppliedFieldName
+ << _env->applierMetrics()->getDeletesApplied()));
+ builder.append("$set",
+ BSON(ReshardingOplogApplierProgress::kWritesToStashCollectionsFieldName
+ << _env->applierMetrics()->getWritesToStashCollections()));
store.upsert(
opCtx,
BSON(ReshardingOplogApplierProgress::kOplogSourceIdFieldName << _sourceId.toBSON()),
builder.obj());
- _env->metrics()->onOplogEntriesApplied(_currentBatchToApply.size());
- if (ShardingDataTransformMetrics::isEnabled()) {
- _env->applierMetrics()->onOplogEntriesApplied(_currentBatchToApply.size());
- }
+
+ _env->applierMetrics()->onOplogEntriesApplied(_currentBatchToApply.size());
_currentBatchToApply.clear();
_currentDerivedOpsForCrudWriters.clear();
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h
index cee7ce3af0a..56a7e9d3a0a 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h
@@ -48,7 +48,6 @@
namespace mongo {
-class ReshardingMetrics;
class ServiceContext;
/**
@@ -67,17 +66,12 @@ class ReshardingOplogApplier {
public:
class Env {
public:
- Env(ServiceContext* service,
- ReshardingMetrics* metrics,
- ReshardingOplogApplierMetrics* applierMetrics)
- : _service(service), _metrics(metrics), _applierMetrics(applierMetrics) {}
+ Env(ServiceContext* service, ReshardingOplogApplierMetrics* applierMetrics)
+ : _service(service), _applierMetrics(applierMetrics) {}
ServiceContext* service() const {
return _service;
}
- ReshardingMetrics* metrics() const {
- return _metrics;
- }
ReshardingOplogApplierMetrics* applierMetrics() {
return _applierMetrics;
@@ -85,7 +79,6 @@ public:
private:
ServiceContext* _service;
- ReshardingMetrics* _metrics;
ReshardingOplogApplierMetrics* _applierMetrics;
};
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
index c6772c93d0b..0e3f5a87504 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
@@ -44,7 +44,6 @@
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_oplog_applier.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding/resharding_util.h"
@@ -159,8 +158,7 @@ public:
_cm = createChunkManagerForOriginalColl();
- _metrics = std::make_unique<ReshardingMetrics>(getServiceContext());
- _metricsNew =
+ _metrics =
ReshardingMetricsNew::makeInstance(kCrudUUID,
BSON("y" << 1),
kCrudNs,
@@ -168,10 +166,7 @@ public:
getServiceContext()->getFastClockSource()->now(),
getServiceContext());
_applierMetrics =
- std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), boost::none);
- _metrics->onStart(ReshardingMetrics::Role::kRecipient,
- getServiceContext()->getFastClockSource()->now());
- _metrics->setRecipientState(RecipientStateEnum::kApplying);
+ std::make_unique<ReshardingOplogApplierMetrics>(_metrics.get(), boost::none);
_executor = makeTaskExecutorForApplier();
_executor->startup();
@@ -292,15 +287,12 @@ public:
}
BSONObj getMetricsOpCounters() {
- BSONObjBuilder bob;
- _metrics->serializeCumulativeOpMetrics(&bob);
- return bob.obj().getObjectField("opcounters").getOwned();
+ return _metrics->reportForCurrentOp();
}
long long metricsAppliedCount() const {
- BSONObjBuilder bob;
- _metrics->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kRecipient);
- return bob.obj()["oplogEntriesApplied"_sd].Long();
+ auto fullCurOp = _metrics->reportForCurrentOp();
+ return fullCurOp["oplogEntriesApplied"_sd].Long();
}
std::shared_ptr<executor::ThreadPoolTaskExecutor> getExecutor() {
@@ -314,8 +306,8 @@ public:
protected:
auto makeApplierEnv() {
- return std::make_unique<ReshardingOplogApplier::Env>(
- getServiceContext(), _metrics.get(), _applierMetrics.get());
+ return std::make_unique<ReshardingOplogApplier::Env>(getServiceContext(),
+ _applierMetrics.get());
}
std::shared_ptr<executor::ThreadPoolTaskExecutor> makeTaskExecutorForApplier() {
@@ -371,8 +363,7 @@ protected:
boost::optional<ChunkManager> _cm;
const ReshardingSourceId _sourceId{UUID::gen(), kMyShardId};
- std::unique_ptr<ReshardingMetrics> _metrics;
- std::unique_ptr<ReshardingMetricsNew> _metricsNew;
+ std::unique_ptr<ReshardingMetricsNew> _metrics;
std::unique_ptr<ReshardingOplogApplierMetrics> _applierMetrics;
std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor;
@@ -869,9 +860,9 @@ TEST_F(ReshardingOplogApplierTest, MetricsAreReported) {
ASSERT_OK(future.getNoThrow());
auto opCountersObj = getMetricsOpCounters();
- ASSERT_EQ(opCountersObj.getIntField("insert"), 2);
- ASSERT_EQ(opCountersObj.getIntField("update"), 1);
- ASSERT_EQ(opCountersObj.getIntField("delete"), 2);
+ ASSERT_EQ(opCountersObj.getIntField("insertsApplied"), 2);
+ ASSERT_EQ(opCountersObj.getIntField("updatesApplied"), 1);
+ ASSERT_EQ(opCountersObj.getIntField("deletesApplied"), 2);
// The in-memory metrics should show the 5 ops above + the final oplog entry, but on disk should
// not include the final entry in its count.
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
index 7e4c2b6c22f..f8af8d80998 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
@@ -46,7 +46,7 @@
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/s/op_observer_sharding_impl.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
+#include "mongo/db/s/resharding/resharding_metrics_new.h"
#include "mongo/db/s/resharding/resharding_oplog_application.h"
#include "mongo/db/s/resharding/resharding_oplog_batch_applier.h"
#include "mongo/db/s/resharding/resharding_oplog_session_application.h"
@@ -57,6 +57,7 @@
#include "mongo/db/vector_clock_metadata_hook.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/chunk_manager.h"
@@ -110,7 +111,6 @@ public:
opCtx.get(), nss, CollectionOptions{});
}
- _metrics = std::make_unique<ReshardingMetrics>(serviceContext);
_metricsNew =
ReshardingMetricsNew::makeInstance(UUID::gen(),
BSON("y" << 1),
@@ -126,7 +126,6 @@ public:
0U,
_myDonorId,
makeChunkManagerForSourceCollection(),
- _metrics.get(),
_applierMetrics.get());
_sessionApplication =
@@ -363,7 +362,6 @@ private:
getLocalConflictStashNamespace(_sourceUUID, _otherDonorId);
const NamespaceString _myOplogBufferNss = getLocalOplogBufferNamespace(_sourceUUID, _myDonorId);
- std::unique_ptr<ReshardingMetrics> _metrics;
std::unique_ptr<ReshardingMetricsNew> _metricsNew;
std::unique_ptr<ReshardingOplogApplierMetrics> _applierMetrics;
diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
index 96282217dad..4114100a5bc 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
@@ -47,7 +47,7 @@
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/op_observer_sharding_impl.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
+#include "mongo/db/s/resharding/resharding_metrics_new.h"
#include "mongo/db/s/resharding/resharding_oplog_application.h"
#include "mongo/db/s/resharding/resharding_util.h"
#include "mongo/db/s/sharding_state.h"
@@ -112,7 +112,6 @@ public:
CollectionMetadata(makeChunkManagerForOutputCollection(), _myDonorId));
}
- _metrics = std::make_unique<ReshardingMetrics>(getServiceContext());
_metricsNew =
ReshardingMetricsNew::makeInstance(_sourceUUID,
BSON(_newShardKey << 1),
@@ -128,7 +127,6 @@ public:
0U,
_myDonorId,
makeChunkManagerForSourceCollection(),
- _metrics.get(),
_oplogApplierMetrics.get());
}
}
@@ -343,7 +341,6 @@ private:
getLocalConflictStashNamespace(_sourceUUID, _otherDonorId);
std::unique_ptr<ReshardingOplogApplicationRules> _applier;
- std::unique_ptr<ReshardingMetrics> _metrics;
std::unique_ptr<ReshardingMetricsNew> _metricsNew;
std::unique_ptr<ReshardingOplogApplierMetrics> _oplogApplierMetrics;
};
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
index 94071745f24..41f87420e70 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
@@ -45,7 +45,6 @@
#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/read_concern_level.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_metrics_new.h"
#include "mongo/db/s/resharding/resharding_util.h"
#include "mongo/db/s/sharding_data_transform_cumulative_metrics.h"
@@ -327,10 +326,8 @@ bool ReshardingOplogFetcher::consume(Client* client,
[this, &batchesProcessed, &moreToCome, &opCtxRaii, &batchFetchTimer, factory](
const std::vector<BSONObj>& batch,
const boost::optional<BSONObj>& postBatchResumeToken) {
- if (ShardingDataTransformMetrics::isEnabled()) {
- _env->metricsNew()->onOplogEntriesFetched(batch.size(),
- Milliseconds(batchFetchTimer.millis()));
- }
+ _env->metricsNew()->onOplogEntriesFetched(batch.size(),
+ Milliseconds(batchFetchTimer.millis()));
ThreadClient client(fmt::format("ReshardingFetcher-{}-{}",
_reshardingUUID.toString(),
@@ -357,15 +354,11 @@ bool ReshardingOplogFetcher::consume(Client* client,
uassertStatusOK(toWriteTo->insertDocument(opCtx, InsertStatement{doc}, nullptr));
wuow.commit();
- if (ShardingDataTransformMetrics::isEnabled()) {
- _env->metricsNew()->onLocalInsertDuringOplogFetching(
- Milliseconds(insertTimer.millis()));
- }
+ _env->metricsNew()->onLocalInsertDuringOplogFetching(
+ Milliseconds(insertTimer.millis()));
++_numOplogEntriesCopied;
- _env->metrics()->onOplogEntriesFetched(1);
-
auto [p, f] = makePromiseFuture<void>();
{
stdx::lock_guard lk(_mutex);
@@ -409,10 +402,7 @@ bool ReshardingOplogFetcher::consume(Client* client,
// Also include synthetic oplog in the fetched count so it can match up with the
// total oplog applied count in the end.
- _env->metrics()->onOplogEntriesFetched(1);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _env->metricsNew()->onOplogEntriesFetched(1, Milliseconds(0));
- }
+ _env->metricsNew()->onOplogEntriesFetched(1, Milliseconds(0));
auto [p, f] = makePromiseFuture<void>();
{
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
index d0d53395647..5772c6bdfaa 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
@@ -50,28 +50,24 @@
namespace mongo {
-class ReshardingMetrics;
class ReshardingMetricsNew;
class ReshardingOplogFetcher : public resharding::OnInsertAwaitable {
public:
class Env {
public:
- Env(ServiceContext* service, ReshardingMetrics* metrics, ReshardingMetricsNew* metricsNew)
- : _service(service), _metrics(metrics), _metricsNew(metricsNew) {}
+ Env(ServiceContext* service, ReshardingMetricsNew* metricsNew)
+ : _service(service), _metricsNew(metricsNew) {}
ServiceContext* service() const {
return _service;
}
- ReshardingMetrics* metrics() const {
- return _metrics;
- }
+
ReshardingMetricsNew* metricsNew() const {
return _metricsNew;
}
private:
ServiceContext* _service;
- ReshardingMetrics* _metrics;
ReshardingMetricsNew* _metricsNew;
};
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
index 118778f0b90..17624acced9 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
@@ -45,7 +45,6 @@
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_metrics_new.h"
#include "mongo/db/s/resharding/resharding_oplog_fetcher.h"
#include "mongo/db/s/resharding/resharding_util.h"
@@ -99,12 +98,7 @@ public:
OldClientContext ctx(_opCtx, NamespaceString::kRsOplogNamespace.ns());
}
- // Initialize ReshardingMetrics to a recipient state compatible with fetching.
- _metrics = std::make_unique<ReshardingMetrics>(_svcCtx);
- _metrics->onStart(ReshardingMetrics::Role::kRecipient,
- _svcCtx->getFastClockSource()->now());
- _metrics->setRecipientState(RecipientStateEnum::kCloning);
- _metricsNew =
+ _metrics =
ReshardingMetricsNew::makeInstance(_reshardingUUID,
BSON("y" << 1),
NamespaceString{""},
@@ -138,8 +132,7 @@ public:
}
auto makeFetcherEnv() {
- return std::make_unique<ReshardingOplogFetcher::Env>(
- _svcCtx, _metrics.get(), _metricsNew.get());
+ return std::make_unique<ReshardingOplogFetcher::Env>(_svcCtx, _metrics.get());
}
/**
@@ -324,9 +317,8 @@ public:
}
long long metricsFetchedCount() const {
- BSONObjBuilder bob;
- _metrics->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kRecipient);
- return bob.obj()["oplogEntriesFetched"_sd].Long();
+ auto curOp = _metrics->reportForCurrentOp();
+ return curOp["oplogEntriesFetched"_sd].Long();
}
CancelableOperationContextFactory makeCancelableOpCtx() {
@@ -351,8 +343,7 @@ protected:
Timestamp _fetchTimestamp;
ShardId _donorShard;
ShardId _destinationShard;
- std::unique_ptr<ReshardingMetrics> _metrics;
- std::unique_ptr<ReshardingMetricsNew> _metricsNew;
+ std::unique_ptr<ReshardingMetricsNew> _metrics;
private:
static HostAndPort makeHostAndPort(const ShardId& shardId) {
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index fc225f6f801..baaf64fc5e3 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -51,7 +51,6 @@
#include "mongo/db/s/resharding/resharding_change_event_o2_field_gen.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_future_util.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_metrics_helpers.h"
#include "mongo/db/s/resharding/resharding_oplog_applier.h"
#include "mongo/db/s/resharding/resharding_recipient_service_external_state.h"
@@ -163,41 +162,8 @@ void buildStateDocumentMetricsForUpdate(BSONObjBuilder& bob,
}
}
-ShardingDataTransformCumulativeMetrics::RecipientStateEnum toMetricsState(
- RecipientStateEnum enumVal) {
- using MetricsEnum = ShardingDataTransformCumulativeMetrics::RecipientStateEnum;
-
- switch (enumVal) {
- case RecipientStateEnum::kUnused:
- return MetricsEnum::kUnused;
-
- case RecipientStateEnum::kAwaitingFetchTimestamp:
- return MetricsEnum::kAwaitingFetchTimestamp;
-
- case RecipientStateEnum::kCreatingCollection:
- return MetricsEnum::kCreatingCollection;
-
- case RecipientStateEnum::kCloning:
- return MetricsEnum::kCloning;
-
- case RecipientStateEnum::kApplying:
- return MetricsEnum::kApplying;
-
- case RecipientStateEnum::kError:
- return MetricsEnum::kError;
-
- case RecipientStateEnum::kStrictConsistency:
- return MetricsEnum::kStrictConsistency;
-
- case RecipientStateEnum::kDone:
- return MetricsEnum::kDone;
-
- default:
- invariant(false,
- str::stream() << "Unexpected resharding coordinator state: "
- << RecipientState_serializer(enumVal));
- MONGO_UNREACHABLE;
- }
+ReshardingMetricsNew::RecipientState toMetricsState(RecipientStateEnum state) {
+ return ReshardingMetricsNew::RecipientState(state);
}
} // namespace
@@ -224,10 +190,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
ReshardingDataReplicationFactory dataReplicationFactory)
: repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine>(),
_recipientService{recipientService},
- _metricsNew{
- ShardingDataTransformMetrics::isEnabled()
- ? ReshardingMetricsNew::initializeFrom(recipientDoc, getGlobalServiceContext())
- : nullptr},
+ _metricsNew{ReshardingMetricsNew::initializeFrom(recipientDoc, getGlobalServiceContext())},
_metadata{recipientDoc.getCommonReshardingMetadata()},
_minimumOperationDuration{Milliseconds{recipientDoc.getMinimumOperationDurationMillis()}},
_recipientCtx{recipientDoc.getMutableState()},
@@ -256,9 +219,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
}()) {
invariant(_externalState);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onStateTransition(boost::none, toMetricsState(_recipientCtx.getState()));
- }
+ _metricsNew->onStateTransition(boost::none, toMetricsState(_recipientCtx.getState()));
}
ExecutorFuture<void>
@@ -417,8 +378,6 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_finishR
_metadata.getSourceNss(),
_critSecReason,
ShardingCatalogClient::kLocalWriteConcern);
-
- _metrics()->leaveCriticalSection(getCurrentTime());
}
})
.then([this, executor, &factory] {
@@ -458,15 +417,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_runMand
self = shared_from_this(),
outerStatus = status,
isCanceled = stepdownToken.isCanceled()](Status dataReplicationHaltStatus) {
- if (isCanceled) {
- // Interrupt occurred, ensure the metrics get shut down.
- _metrics()->onStepDown(ReshardingMetrics::Role::kRecipient);
- }
-
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onStateTransition(toMetricsState(_recipientCtx.getState()),
- boost::none);
- }
+ _metricsNew->onStateTransition(toMetricsState(_recipientCtx.getState()), boost::none);
// If the stepdownToken was triggered, it takes priority in order to make sure that
// the promise is set with an error that the coordinator can retry with. If it ran into
@@ -553,16 +504,7 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status)
boost::optional<BSONObj> ReshardingRecipientService::RecipientStateMachine::reportForCurrentOp(
MongoProcessInterface::CurrentOpConnectionsMode,
MongoProcessInterface::CurrentOpSessionsMode) noexcept {
- if (ShardingDataTransformMetrics::isEnabled()) {
- return _metricsNew->reportForCurrentOp();
- }
-
- ReshardingMetrics::ReporterOptions options(ReshardingMetrics::Role::kRecipient,
- _metadata.getReshardingUUID(),
- _metadata.getSourceNss(),
- _metadata.getReshardingKey().toBSON(),
- false);
- return _metrics()->reportForCurrentOp(options);
+ return _metricsNew->reportForCurrentOp();
}
void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChanges(
@@ -608,12 +550,8 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
ReshardingRecipientService::RecipientStateMachine::CloneDetails cloneDetails) {
_transitionToCreatingCollection(
cloneDetails, (*executor)->now() + _minimumOperationDuration, factory);
- _metrics()->setDocumentsToCopy(cloneDetails.approxDocumentsToCopy,
- cloneDetails.approxBytesToCopy);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->setDocumentsToCopyCounts(cloneDetails.approxDocumentsToCopy,
- cloneDetails.approxBytesToCopy);
- }
+ _metricsNew->setDocumentsToCopyCounts(cloneDetails.approxDocumentsToCopy,
+ cloneDetails.approxBytesToCopy);
});
}
@@ -687,7 +625,6 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(Operatio
}
return _dataReplicationFactory(opCtx,
- _metrics(),
_metricsNew.get(),
&_applierMetricsMap,
_metadata,
@@ -806,8 +743,6 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
_metadata.getSourceNss(),
_critSecReason,
ShardingCatalogClient::kLocalWriteConcern);
-
- _metrics()->enterCriticalSection(getCurrentTime());
}
_transitionToStrictConsistency(factory);
@@ -911,11 +846,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
_updateRecipientDocument(
std::move(newRecipientCtx), std::move(cloneDetails), std::move(configStartTime), factory);
- _metrics()->setRecipientState(newState);
-
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onStateTransition(toMetricsState(oldState), toMetricsState(newState));
- }
+ _metricsNew->onStateTransition(toMetricsState(oldState), toMetricsState(newState));
LOGV2_INFO(5279506,
"Transitioned resharding recipient state",
@@ -940,13 +871,10 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToCreatingCol
void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning(
const CancelableOperationContextFactory& factory) {
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onCopyingBegin();
- }
+ _metricsNew->onCopyingBegin();
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kCloning);
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
- _metrics()->startCopyingDocuments(getCurrentTime());
}
void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying(
@@ -954,13 +882,9 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying(
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kApplying);
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
- auto currentTime = getCurrentTime();
- _metrics()->endCopyingDocuments(currentTime);
- _metrics()->startApplyingOplogEntries(currentTime);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onCopyingEnd();
- _metricsNew->onApplyingBegin();
- }
+
+ _metricsNew->onCopyingEnd();
+ _metricsNew->onApplyingBegin();
}
void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency(
@@ -968,11 +892,8 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsi
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kStrictConsistency);
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
- auto currentTime = getCurrentTime();
- _metrics()->endApplyingOplogEntries(currentTime);
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onApplyingEnd();
- }
+
+ _metricsNew->onApplyingEnd();
}
void ReshardingRecipientService::RecipientStateMachine::_transitionToError(
@@ -1131,10 +1052,8 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument
*configStartTime);
}
- if (ShardingDataTransformMetrics::isEnabled()) {
- buildStateDocumentMetricsForUpdate(
- setBuilder, _metricsNew.get(), newRecipientCtx.getState());
- }
+ buildStateDocumentMetricsForUpdate(
+ setBuilder, _metricsNew.get(), newRecipientCtx.getState());
setBuilder.doneFast();
}
@@ -1175,23 +1094,10 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument
WriteUnitOfWork wuow(opCtx.get());
- opCtx->recoveryUnit()->onCommit(
- [this, aborted](boost::optional<Timestamp> unusedCommitTime) {
- stdx::lock_guard<Latch> lk(_mutex);
- if (aborted) {
- _metrics()->onCompletion(ReshardingMetrics::Role::kRecipient,
- _userCanceled.get() == true
- ? ReshardingOperationStatusEnum::kCanceled
- : ReshardingOperationStatusEnum::kFailure,
- getCurrentTime());
- } else {
- _metrics()->onCompletion(ReshardingMetrics::Role::kRecipient,
- ReshardingOperationStatusEnum::kSuccess,
- getCurrentTime());
- }
-
- _completionPromise.emplaceValue();
- });
+ opCtx->recoveryUnit()->onCommit([this](boost::optional<Timestamp> unusedCommitTime) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _completionPromise.emplaceValue();
+ });
deleteObjects(opCtx.get(),
*coll,
@@ -1204,25 +1110,19 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument
});
}
-ReshardingMetrics* ReshardingRecipientService::RecipientStateMachine::_metrics() const {
- return ReshardingMetrics::get(cc().getServiceContext());
-}
-
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMetrics(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken) {
if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {
- _metrics()->onStepUp(ReshardingMetrics::Role::kRecipient);
return _restoreMetricsWithRetry(executor, abortToken);
}
- _metrics()->onStart(ReshardingMetrics::Role::kRecipient, getCurrentTime());
+
return ExecutorFuture<void>(**executor);
}
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_restoreMetricsWithRetry(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken) {
- _metrics()->setRecipientState(_recipientCtx.getState());
return _retryingCancelableOpCtxFactory
->withAutomaticRetry(
[this, executor, abortToken](const auto& factory) { _restoreMetrics(factory); })
@@ -1250,8 +1150,8 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
documentBytesCopied = tempReshardingColl->dataSize(opCtx.get());
documentCountCopied = tempReshardingColl->numRecords(opCtx.get());
}
- if (ShardingDataTransformMetrics::isEnabled() &&
- _recipientCtx.getState() == RecipientStateEnum::kCloning) {
+
+ if (_recipientCtx.getState() == RecipientStateEnum::kCloning) {
// Before cloning, these values are 0. After cloning these values are written to the
// metrics section of the recipient state document and restored during metrics
// initialization. This is so that applied oplog entries that add or remove documents do
@@ -1296,9 +1196,7 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
}
}
- if (ShardingDataTransformMetrics::isEnabled()) {
- progressDocList.emplace_back(donor.getShardId(), progressDoc);
- }
+ progressDocList.emplace_back(donor.getShardId(), progressDoc);
}
// Restore stats here where interrupts will never occur, this is to ensure we will only update
@@ -1321,12 +1219,8 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
_applierMetricsMap.emplace(shardId, std::move(applierMetrics));
}
- if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->restoreOplogEntriesFetched(oplogEntriesFetched);
- }
-
- _metrics()->restoreForCurrentOp(
- documentCountCopied, documentBytesCopied, oplogEntriesFetched, oplogEntriesApplied);
+ _metricsNew->restoreOplogEntriesFetched(oplogEntriesFetched);
+ _metricsNew->restoreOplogEntriesApplied(oplogEntriesApplied);
}
CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource(
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 832e1dd8083..fc41ba0e9ee 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -271,8 +271,6 @@ private:
const CancellationToken& abortToken,
const CancelableOperationContextFactory& factory);
- ReshardingMetrics* _metrics() const;
-
ExecutorFuture<void> _startMetrics(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken);
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 50d635016f7..78316aacca7 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -43,7 +43,6 @@
#include "mongo/db/s/resharding/resharding_change_event_o2_field_gen.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_data_replication.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h"
#include "mongo/db/s/resharding/resharding_recipient_service.h"
#include "mongo/db/s/resharding/resharding_recipient_service_external_state.h"
@@ -235,11 +234,6 @@ public:
return _controller.get();
}
- ReshardingMetrics* metrics() {
- auto serviceContext = getServiceContext();
- return ReshardingMetrics::get(serviceContext);
- }
-
BSONObj newShardKeyPattern() {
return BSON("newKey" << 1);
}
@@ -262,6 +256,7 @@ public:
sourceUUID,
constructTemporaryReshardingNss(sourceNss.db(), sourceUUID),
newShardKeyPattern());
+ commonMetadata.setStartTime(getServiceContext()->getFastClockSource()->now());
doc.setCommonReshardingMetadata(std::move(commonMetadata));
return doc;
@@ -511,7 +506,6 @@ DEATH_TEST_REGEX_F(ReshardingRecipientServiceTest, CommitFn, "4457001.*tripwire"
}
TEST_F(ReshardingRecipientServiceTest, DropsTemporaryReshardingCollectionOnAbort) {
- auto metrics = ReshardingRecipientServiceTest::metrics();
for (bool isAlsoDonor : {false, true}) {
LOGV2(5551107,
"Running case",
@@ -571,11 +565,6 @@ TEST_F(ReshardingRecipientServiceTest, DropsTemporaryReshardingCollectionOnAbort
ASSERT_FALSE(bool(coll));
}
}
-
- BSONObjBuilder result;
- metrics->serializeCumulativeOpMetrics(&result);
-
- ASSERT_EQ(result.obj().getField("countReshardingFailures").numberLong(), 2);
}
TEST_F(ReshardingRecipientServiceTest, RenamesTemporaryReshardingCollectionWhenDone) {
@@ -707,8 +696,6 @@ TEST_F(ReshardingRecipientServiceTest, WritesNoopOplogEntryForImplicitShardColle
}
TEST_F(ReshardingRecipientServiceTest, TruncatesXLErrorOnRecipientDocument) {
- auto metrics = ReshardingRecipientServiceTest::metrics();
-
for (bool isAlsoDonor : {false, true}) {
LOGV2(5568600,
"Running case",
@@ -761,14 +748,9 @@ TEST_F(ReshardingRecipientServiceTest, TruncatesXLErrorOnRecipientDocument) {
recipient->abort(false);
ASSERT_OK(recipient->getCompletionFuture().getNoThrow());
}
- BSONObjBuilder result;
- metrics->serializeCumulativeOpMetrics(&result);
-
- ASSERT_EQ(result.obj().getField("countReshardingFailures").numberLong(), 2);
}
TEST_F(ReshardingRecipientServiceTest, MetricsSuccessfullyShutDownOnUserCancelation) {
- auto metrics = ReshardingRecipientServiceTest::metrics();
auto doc = makeStateDocument(false);
auto opCtx = makeOperationContext();
RecipientStateMachine::insertStateDocument(opCtx.get(), doc);
@@ -781,11 +763,6 @@ TEST_F(ReshardingRecipientServiceTest, MetricsSuccessfullyShutDownOnUserCancelat
recipient->abort(true);
ASSERT_OK(recipient->getCompletionFuture().getNoThrow());
- BSONObjBuilder result;
- metrics->serializeCumulativeOpMetrics(&result);
- BSONObj obj = result.obj();
- ASSERT_EQ(obj.getField("countReshardingCanceled").numberLong(), 1);
- ASSERT_EQ(obj.getField("countReshardingFailures").numberLong(), 0);
}
TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
@@ -883,8 +860,9 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle,
MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle)
.get();
- ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L);
- ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize());
+
+ ASSERT_EQ(currOp.getField("documentsCopied").numberLong(), 1L);
+ ASSERT_EQ(currOp.getField("bytesCopied").numberLong(), (long)reshardedDoc.objsize());
ASSERT_EQ(currOp.getStringField("recipientState"),
RecipientState_serializer(RecipientStateEnum::kApplying));
} else if (state == RecipientStateEnum::kDone) {
@@ -893,11 +871,12 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle,
MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle)
.get();
- ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L);
- ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize());
- ASSERT_EQ(currOp.getField("oplogEntriesFetched").Long(),
+
+ ASSERT_EQ(currOp.getField("documentsCopied").numberLong(), 1L);
+ ASSERT_EQ(currOp.getField("bytesCopied").numberLong(), (long)reshardedDoc.objsize());
+ ASSERT_EQ(currOp.getField("oplogEntriesFetched").numberLong(),
(long)(1 * doc.getDonorShards().size()));
- ASSERT_EQ(currOp.getField("oplogEntriesApplied").Long(),
+ ASSERT_EQ(currOp.getField("oplogEntriesApplied").numberLong(),
oplogEntriesAppliedOnEachDonor * doc.getDonorShards().size());
ASSERT_EQ(currOp.getStringField("recipientState"),
RecipientState_serializer(RecipientStateEnum::kStrictConsistency));
@@ -922,9 +901,6 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
}
TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUpWithMissingProgressDoc) {
- RAIIServerParameterControllerForTest featureFlagController(
- "featureFlagShardingDataTransformMetrics", true);
-
auto doc = makeStateDocument(false);
auto instanceId =
BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName << doc.getReshardingUUID());
diff --git a/src/mongo/db/s/resharding/resharding_util.cpp b/src/mongo/db/s/resharding/resharding_util.cpp
index ce51b7d25a1..d9a04064d3c 100644
--- a/src/mongo/db/s/resharding/resharding_util.cpp
+++ b/src/mongo/db/s/resharding/resharding_util.cpp
@@ -48,7 +48,6 @@
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/resharding/document_source_resharding_add_resume_id.h"
#include "mongo/db/s/resharding/document_source_resharding_iterate_transaction.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_metrics_new.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/storage/write_unit_of_work.h"
diff --git a/src/mongo/db/s/resharding_test_commands.cpp b/src/mongo/db/s/resharding_test_commands.cpp
index ad34b6716ba..61fa835829f 100644
--- a/src/mongo/db/s/resharding_test_commands.cpp
+++ b/src/mongo/db/s/resharding_test_commands.cpp
@@ -37,7 +37,6 @@
#include "mongo/db/commands.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/s/resharding/resharding_collection_cloner.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_metrics_new.h"
#include "mongo/db/s/resharding_test_commands_gen.h"
#include "mongo/db/vector_clock_metadata_hook.h"
@@ -80,21 +79,13 @@ public:
}
};
- ReshardingMetrics metrics(opCtx->getServiceContext());
- metrics.onStart(ReshardingMetrics::Role::kRecipient,
- opCtx->getServiceContext()->getFastClockSource()->now());
- metrics.setRecipientState(RecipientStateEnum::kCloning);
-
- std::unique_ptr<ReshardingMetricsNew> metricsNew;
- if (ShardingDataTransformMetrics::isEnabled()) {
- metricsNew = ReshardingMetricsNew::makeInstance(
- request().getUuid(),
- request().getShardKey(),
- ns(),
- ReshardingMetricsNew::Role::kRecipient,
- opCtx->getServiceContext()->getFastClockSource()->now(),
- opCtx->getServiceContext());
- }
+ auto metrics = ReshardingMetricsNew::makeInstance(
+ request().getUuid(),
+ request().getShardKey(),
+ ns(),
+ ReshardingMetricsNew::Role::kRecipient,
+ opCtx->getServiceContext()->getFastClockSource()->now(),
+ opCtx->getServiceContext());
auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
hookList->addHook(
@@ -106,14 +97,13 @@ public:
"TestReshardCloneCollectionNetwork", nullptr, std::move(hookList)));
executor->startup();
- ReshardingCollectionCloner cloner(
- std::make_unique<ReshardingCollectionCloner::Env>(&metrics, metricsNew.get()),
- ShardKeyPattern(request().getShardKey()),
- ns(),
- request().getUuid(),
- request().getShardId(),
- request().getAtClusterTime(),
- request().getOutputNs());
+ ReshardingCollectionCloner cloner(metrics.get(),
+ ShardKeyPattern(request().getShardKey()),
+ ns(),
+ request().getUuid(),
+ request().getShardId(),
+ request().getAtClusterTime(),
+ request().getOutputNs());
std::shared_ptr<ThreadPool> cancelableOperationContextPool = [] {
ThreadPool::Options options;
diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp
index d532488fc79..add2ac6f728 100644
--- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp
+++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp
@@ -349,20 +349,16 @@ void ShardingDataTransformCumulativeMetrics::onStarted() {
_countStarted.fetchAndAdd(1);
}
-void ShardingDataTransformCumulativeMetrics::onCompletion(ReshardingOperationStatusEnum status) {
- switch (status) {
- case ReshardingOperationStatusEnum::kSuccess:
- _countSucceeded.fetchAndAdd(1);
- break;
- case ReshardingOperationStatusEnum::kFailure:
- _countFailed.fetchAndAdd(1);
- break;
- case ReshardingOperationStatusEnum::kCanceled:
- _countCancelled.fetchAndAdd(1);
- break;
- default:
- MONGO_UNREACHABLE;
- }
+void ShardingDataTransformCumulativeMetrics::onSuccess() {
+ _countSucceeded.fetchAndAdd(1);
+}
+
+void ShardingDataTransformCumulativeMetrics::onFailure() {
+ _countFailed.fetchAndAdd(1);
+}
+
+void ShardingDataTransformCumulativeMetrics::onCanceled() {
+ _countCancelled.fetchAndAdd(1);
}
void ShardingDataTransformCumulativeMetrics::setLastOpEndingChunkImbalance(int64_t imbalanceCount) {
diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h
index 596b90024bd..dfd8c989628 100644
--- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h
+++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h
@@ -34,7 +34,6 @@
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
-#include "mongo/s/resharding/common_types_gen.h"
#include "mongo/util/duration.h"
#include "mongo/util/functional.h"
#include <set>
@@ -97,7 +96,9 @@ public:
void reportForServerStatus(BSONObjBuilder* bob) const;
void onStarted();
- void onCompletion(ReshardingOperationStatusEnum status);
+ void onSuccess();
+ void onFailure();
+ void onCanceled();
void setLastOpEndingChunkImbalance(int64_t imbalanceCount);
diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp
index 8355fc71603..99a221b10ba 100644
--- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp
+++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp
@@ -232,7 +232,7 @@ TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsSucceededCount)
ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countSucceeded"), 0);
}
- _cumulativeMetrics.onCompletion(ReshardingOperationStatusEnum::kSuccess);
+ _cumulativeMetrics.onSuccess();
{
BSONObjBuilder bob;
@@ -254,7 +254,7 @@ TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsFailedCount) {
ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countFailed"), 0);
}
- _cumulativeMetrics.onCompletion(ReshardingOperationStatusEnum::kFailure);
+ _cumulativeMetrics.onFailure();
{
BSONObjBuilder bob;
@@ -276,7 +276,7 @@ TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsCanceledCount)
ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countCanceled"), 0);
}
- _cumulativeMetrics.onCompletion(ReshardingOperationStatusEnum::kCanceled);
+ _cumulativeMetrics.onCanceled();
{
BSONObjBuilder bob;
diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
index 630542b03fe..e74155e374b 100644
--- a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
+++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
@@ -350,6 +350,10 @@ void ShardingDataTransformInstanceMetrics::onOplogEntriesApplied(int64_t numEntr
_cumulativeMetrics->onOplogEntriesApplied(numEntries);
}
+void ShardingDataTransformInstanceMetrics::restoreOplogEntriesApplied(int64_t numEntries) {
+ _oplogEntriesApplied.store(numEntries);
+}
+
void ShardingDataTransformInstanceMetrics::onWriteDuringCriticalSection() {
_writesDuringCriticalSection.addAndFetch(1);
_cumulativeMetrics->onWriteDuringCriticalSection();
@@ -408,4 +412,29 @@ void ShardingDataTransformInstanceMetrics::onOplogLocalBatchApplied(Milliseconds
_cumulativeMetrics->onOplogLocalBatchApplied(elapsed);
}
+ShardingDataTransformCumulativeMetrics*
+ShardingDataTransformInstanceMetrics::getCumulativeMetrics() {
+ return _cumulativeMetrics;
+}
+
+void ShardingDataTransformInstanceMetrics::onStarted() {
+ _cumulativeMetrics->onStarted();
+}
+
+void ShardingDataTransformInstanceMetrics::onSuccess() {
+ _cumulativeMetrics->onSuccess();
+}
+
+void ShardingDataTransformInstanceMetrics::onFailure() {
+ _cumulativeMetrics->onFailure();
+}
+
+void ShardingDataTransformInstanceMetrics::onCanceled() {
+ _cumulativeMetrics->onCanceled();
+}
+
+void ShardingDataTransformInstanceMetrics::setLastOpEndingChunkImbalance(int64_t imbalanceCount) {
+ _cumulativeMetrics->setLastOpEndingChunkImbalance(imbalanceCount);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.h b/src/mongo/db/s/sharding_data_transform_instance_metrics.h
index 18cbcdea812..6c508bbafd8 100644
--- a/src/mongo/db/s/sharding_data_transform_instance_metrics.h
+++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.h
@@ -66,20 +66,10 @@ public:
Date_t getStartTimestamp() const;
const UUID& getInstanceId() const;
- template <typename T>
- void onStateTransition(T before, boost::none_t after) {
- _cumulativeMetrics->onStateTransition<T>(before, after);
- }
-
- template <typename T>
- void onStateTransition(boost::none_t before, T after) {
- _cumulativeMetrics->onStateTransition<T>(before, after);
- }
-
- template <typename T>
- void onStateTransition(T before, T after) {
- _cumulativeMetrics->onStateTransition<T>(before, after);
- }
+ void onStarted();
+ void onSuccess();
+ void onFailure();
+ void onCanceled();
void onCopyingBegin();
void onCopyingEnd();
@@ -106,6 +96,7 @@ public:
void onLocalInsertDuringOplogFetching(Milliseconds elapsed);
void onBatchRetrievedDuringOplogApplying(Milliseconds elapsed);
void onOplogEntriesApplied(int64_t numEntries);
+ void restoreOplogEntriesApplied(int64_t numEntries);
void onCloningTotalRemoteBatchRetrieval(Milliseconds elapsed);
void onOplogLocalBatchApplied(Milliseconds elapsed);
void onWriteToStashedCollections();
@@ -121,6 +112,8 @@ public:
Seconds getApplyingElapsedTimeSecs() const;
Seconds getCriticalSectionElapsedTimeSecs() const;
+ void setLastOpEndingChunkImbalance(int64_t imbalanceCount);
+
protected:
void restoreCopyingBegin(Date_t date);
void restoreCopyingEnd(Date_t date);
@@ -134,6 +127,8 @@ protected:
int64_t deletesApplied,
int64_t writesToStashCollections);
+ ShardingDataTransformCumulativeMetrics* getCumulativeMetrics();
+
const UUID _instanceId;
const BSONObj _originalCommand;
const NamespaceString _sourceNs;
diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp
index 13eb3ae8874..85797e5e361 100644
--- a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp
+++ b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp
@@ -371,6 +371,21 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, RecipientReportsRemainingTime)
ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"), 0);
}
+TEST_F(ShardingDataTransformInstanceMetricsTest, RecipientRestoreAppliedOplogEntries) {
+ auto metrics = createInstanceMetrics(UUID::gen(), Role::kRecipient);
+
+ auto report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("oplogEntriesApplied"), 0);
+
+ metrics->restoreOplogEntriesApplied(120);
+ report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("oplogEntriesApplied"), 120);
+
+ metrics->restoreOplogEntriesApplied(30);
+ report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("oplogEntriesApplied"), 30);
+}
+
TEST_F(ShardingDataTransformInstanceMetricsTest, CurrentOpReportsCopyingTime) {
runTimeReportTest(
"CurrentOpReportsCopyingTime",
diff --git a/src/mongo/db/s/sharding_data_transform_metrics.cpp b/src/mongo/db/s/sharding_data_transform_metrics.cpp
index 7725752a8ed..de3aab4f99e 100644
--- a/src/mongo/db/s/sharding_data_transform_metrics.cpp
+++ b/src/mongo/db/s/sharding_data_transform_metrics.cpp
@@ -47,9 +47,4 @@ StringData ShardingDataTransformMetrics::getRoleName(Role role) {
return it->second;
}
-bool ShardingDataTransformMetrics::isEnabled() {
- return feature_flags::gFeatureFlagShardingDataTransformMetrics.isEnabled(
- serverGlobalParams.featureCompatibility);
-}
-
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_server_status.cpp b/src/mongo/db/s/sharding_server_status.cpp
index bc8673000c8..8d560454382 100644
--- a/src/mongo/db/s/sharding_server_status.cpp
+++ b/src/mongo/db/s/sharding_server_status.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/commands/server_status.h"
#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/collection_sharding_state.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/sharding_data_transform_cumulative_metrics.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/sharding_statistics.h"
@@ -123,35 +122,22 @@ public:
CollectionShardingState::appendInfoForServerStatus(opCtx, &result);
}
- // The serverStatus command is run before the FCV is initialized so we ignore it when
- // checking whether the resharding feature is enabled here.
- if (resharding::gFeatureFlagResharding.isEnabledAndIgnoreFCV()) {
- if (feature_flags::gFeatureFlagShardingDataTransformMetrics.isEnabledAndIgnoreFCV()) {
- // TODO PM-2664: Switch over to using data transform metrics when they have feature
- // parity with resharding metrics.
- reportDataTransformMetrics(opCtx, &result);
- } else {
- reportReshardingMetrics(opCtx, &result);
- }
- }
+ reportDataTransformMetrics(opCtx, &result);
return result.obj();
}
- void reportReshardingMetrics(OperationContext* opCtx, BSONObjBuilder* bob) const {
- auto metrics = ReshardingMetrics::get(opCtx->getServiceContext());
- if (!metrics->wasReshardingEverAttempted()) {
- return;
- }
- BSONObjBuilder subObjBuilder(bob->subobjStart("resharding"));
- metrics->serializeCumulativeOpMetrics(&subObjBuilder);
- }
-
void reportDataTransformMetrics(OperationContext* opCtx, BSONObjBuilder* bob) const {
auto sCtx = opCtx->getServiceContext();
using Metrics = ShardingDataTransformCumulativeMetrics;
- Metrics::getForResharding(sCtx)->reportForServerStatus(bob);
- Metrics::getForGlobalIndexes(sCtx)->reportForServerStatus(bob);
+
+ // The serverStatus command is run before the FCV is initialized so we ignore it when
+ // checking whether the resharding feature is enabled here.
+ if (resharding::gFeatureFlagResharding.isEnabledAndIgnoreFCV()) {
+ Metrics::getForResharding(sCtx)->reportForServerStatus(bob);
+ // TODO SERVER-67049: move this into a separate feature flag check for global indexes.
+ Metrics::getForGlobalIndexes(sCtx)->reportForServerStatus(bob);
+ }
}
} shardingStatisticsServerStatus;
diff --git a/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp b/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp
index 0f799f6e3ac..4c3e05a7879 100644
--- a/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp
+++ b/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp
@@ -35,7 +35,6 @@
#include "mongo/db/commands.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/primary_only_service.h"
-#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_recipient_service.h"
#include "mongo/db/s/sharding_data_transform_metrics.h"
#include "mongo/db/service_context.h"
@@ -109,23 +108,17 @@ public:
}
Response typedRun(OperationContext* opCtx) {
- if (ShardingDataTransformMetrics::isEnabled()) {
- auto instances =
- getReshardingStateMachines<ReshardingRecipientService,
- ReshardingRecipientService::RecipientStateMachine>(
- opCtx, ns());
- if (instances.empty()) {
- return Response{boost::none, boost::none};
- }
- invariant(instances.size() == 1);
- const auto& metrics = instances[0]->getMetrics();
- return Response{duration_cast<Milliseconds>(metrics.getOperationRunningTimeSecs()),
- metrics.getHighEstimateRemainingTimeMillis()};
- } else {
- auto metrics = ReshardingMetrics::get(opCtx->getServiceContext());
- return Response{metrics->getOperationElapsedTime(),
- metrics->getOperationRemainingTime()};
+ auto instances =
+ getReshardingStateMachines<ReshardingRecipientService,
+ ReshardingRecipientService::RecipientStateMachine>(opCtx,
+ ns());
+ if (instances.empty()) {
+ return Response{boost::none, boost::none};
}
+ invariant(instances.size() == 1);
+ const auto& metrics = instances[0]->getMetrics();
+ return Response{duration_cast<Milliseconds>(metrics.getOperationRunningTimeSecs()),
+ metrics.getHighEstimateRemainingTimeMillis()};
}
};
} _shardsvrReshardingOperationTime;