diff options
author | Randolph Tan <randolph@10gen.com> | 2022-04-12 17:53:30 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-19 14:31:49 +0000 |
commit | 572cb2ea4d872432ce95c0de1c5cf4d37641b50a (patch) | |
tree | 17bbd28b562a7b505b72ebc3b70de528d7c4775d /src | |
parent | 31864e3866ce9cc54c08463019846ded2ad9e6e5 (diff) | |
download | mongo-572cb2ea4d872432ce95c0de1c5cf4d37641b50a.tar.gz |
SERVER-64384 Track and Report Parity Coordinator Fields in ServerStatus
Diffstat (limited to 'src')
10 files changed, 265 insertions, 15 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp index 374dcd6538f..a9ef25a0cc2 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp @@ -91,11 +91,13 @@ CoordinatorCommitMonitor::CoordinatorCommitMonitor( std::vector<ShardId> recipientShards, CoordinatorCommitMonitor::TaskExecutorPtr executor, CancellationToken cancelToken, + ReshardingMetricsNew* metrics, Milliseconds maxDelayBetweenQueries) : _ns(std::move(ns)), _recipientShards(std::move(recipientShards)), _executor(std::move(executor)), _cancelToken(std::move(cancelToken)), + _metrics(metrics), _threshold(Milliseconds(gRemainingReshardingOperationTimeThresholdMillis.load())), _maxDelayBetweenQueries(maxDelayBetweenQueries) {} @@ -210,6 +212,11 @@ ExecutorFuture<void> CoordinatorCommitMonitor::_makeFuture() const { metrics->setMinRemainingOperationTime(remainingTimes.min); metrics->setMaxRemainingOperationTime(remainingTimes.max); + if (ShardingDataTransformMetrics::isEnabled()) { + _metrics->setLowestEstimatedRemainingOperationTime(remainingTimes.min); + _metrics->setHighestEstimatedRemainingOperationTime(remainingTimes.max); + } + // Check if all recipient shards are within the commit threshold. if (remainingTimes.max <= _threshold) return ExecutorFuture<void>(_executor); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.h b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.h index 64544981ae5..a8b070fd30f 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.h @@ -33,6 +33,7 @@ #include <vector> #include "mongo/db/namespace_string.h" +#include "mongo/db/s/resharding/resharding_metrics_new.h" #include "mongo/executor/task_executor.h" #include "mongo/s/shard_id.h" #include "mongo/util/cancellation.h" @@ -72,6 +73,7 @@ public: std::vector<ShardId> recipientShards, TaskExecutorPtr executor, CancellationToken cancelToken, + ReshardingMetricsNew* metrics, Milliseconds maxDelayBetweenQueries = kMaxDelayBetweenQueries); SemiFuture<void> waitUntilRecipientsAreWithinCommitThreshold() const; @@ -97,6 +99,9 @@ private: const std::vector<ShardId> _recipientShards; const TaskExecutorPtr _executor; const CancellationToken _cancelToken; + + ReshardingMetricsNew* _metrics; + const Milliseconds _threshold; const Milliseconds _maxDelayBetweenQueries; diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp index e7f90cc41fa..b8fd31b8d1d 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp @@ -105,6 +105,9 @@ private: std::shared_ptr<CoordinatorCommitMonitor> _commitMonitor; boost::optional<Callback> _runOnMockingNextResponse; + + ShardingDataTransformCumulativeMetrics _cumulativeMetrics{"dummyForTest"}; + std::unique_ptr<ReshardingMetricsNew> _metrics; }; auto makeExecutor() { @@ -117,9 +120,9 @@ auto makeExecutor() { void CoordinatorCommitMonitorTest::setUp() { ConfigServerTestFixture::setUp(); + auto clockSource = getServiceContext()->getFastClockSource(); auto metrics = ReshardingMetrics::get(getServiceContext()); - metrics->onStart(ReshardingMetrics::Role::kCoordinator, - getServiceContext()->getFastClockSource()->now()); + metrics->onStart(ReshardingMetrics::Role::kCoordinator, clockSource->now()); metrics->setCoordinatorState(CoordinatorStateEnum::kApplying); auto hostNameForShard = [](const ShardId& shard) -> std::string { @@ -149,8 +152,22 @@ void CoordinatorCommitMonitorTest::setUp() { _futureExecutor->startup(); _cancellationSource = std::make_unique<CancellationSource>(); - _commitMonitor = std::make_shared<CoordinatorCommitMonitor>( - _ns, _recipientShards, _futureExecutor, _cancellationSource->token(), Milliseconds(0)); + + _metrics = std::make_unique<ReshardingMetricsNew>( + UUID::gen(), + BSON("y" << 1), + _ns, + ShardingDataTransformInstanceMetrics::Role::kCoordinator, + clockSource->now(), + clockSource, + &_cumulativeMetrics); + + _commitMonitor = std::make_shared<CoordinatorCommitMonitor>(_ns, + _recipientShards, + _futureExecutor, + _cancellationSource->token(), + _metrics.get(), + Milliseconds(0)); _commitMonitor->setNetworkExecutorForTest(executor()); } diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 0186943be5e..30acb2f9ae3 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -52,6 +52,7 @@ #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding/resharding_util.h" +#include "mongo/db/s/sharding_data_transform_cumulative_metrics.h" #include "mongo/db/s/sharding_data_transform_metrics.h" #include "mongo/db/s/sharding_ddl_util.h" #include "mongo/db/s/sharding_logging.h" @@ -1041,6 +1042,11 @@ void markCompleted(const Status& status) { metrics->onCompletion( ReshardingMetrics::Role::kCoordinator, metricsOperationStatus, getCurrentTime()); + + if (ShardingDataTransformMetrics::isEnabled()) { + ShardingDataTransformCumulativeMetrics::getForResharding(cc().getServiceContext()) + ->onCompletion(metricsOperationStatus); + } } BSONObj createFlushReshardingStateChangeCommand(const NamespaceString& nss, @@ -1527,6 +1533,11 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChan // TODO SERVER-53914 to accommodate loading metrics for the coordinator. ReshardingMetrics::get(cc().getServiceContext()) ->onStart(ReshardingMetrics::Role::kCoordinator, getCurrentTime()); + + if (ShardingDataTransformMetrics::isEnabled()) { + ShardingDataTransformCumulativeMetrics::getForResharding(cc().getServiceContext()) + ->onStarted(); + } } pauseBeforeInsertCoordinatorDoc.pauseWhileSet(); @@ -1655,7 +1666,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_startCommitMonitor( _coordinatorDoc.getSourceNss(), extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()), **executor, - _ctHolder->getCommitMonitorToken()); + _ctHolder->getCommitMonitorToken(), + _metricsNew.get()); _commitMonitorQuiesced = _commitMonitor->waitUntilRecipientsAreWithinCommitThreshold() .thenRunOn(**executor) @@ -1998,6 +2010,10 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_updateChunkImbalanceM ReshardingMetrics::get(opCtx->getServiceContext()) ->setLastReshardChunkImbalanceCount(imbalanceCount); + if (ShardingDataTransformMetrics::isEnabled()) { + ShardingDataTransformCumulativeMetrics::getForResharding(opCtx->getServiceContext()) + ->setLastOpEndingChunkImbalance(imbalanceCount); + } } catch (const DBException& ex) { LOGV2_WARNING(5543000, "Encountered error while trying to update resharding chunk imbalance metrics", diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp index c169abfdf9a..9e53bfd346b 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp @@ -197,12 +197,14 @@ void ShardingDataTransformCumulativeMetrics::reportForServerStatus(BSONObjBuilde if (!_operationWasAttempted.load()) { return; } + BSONObjBuilder root(bob->subobjStart(_rootSectionName)); - root.append(kCountStarted, kPlaceholderLong); - root.append(kCountSucceeded, kPlaceholderLong); - root.append(kCountFailed, kPlaceholderLong); - root.append(kCountCanceled, kPlaceholderLong); - root.append(kLastOpEndingChunkImbalance, kPlaceholderLong); + root.append(kCountStarted, _countStarted.load()); + root.append(kCountSucceeded, _countSucceeded.load()); + root.append(kCountFailed, _countFailed.load()); + root.append(kCountCanceled, _countCancelled.load()); + root.append(kLastOpEndingChunkImbalance, _lastOpEndingChunkImbalance.load()); + reportActive(&root); reportOldestActive(&root); reportLatencies(&root); @@ -303,4 +305,28 @@ ShardingDataTransformCumulativeMetrics::insertMetrics(const InstanceObserver* me return it; } +void ShardingDataTransformCumulativeMetrics::onStarted() { + _countStarted.fetchAndAdd(1); +} + +void ShardingDataTransformCumulativeMetrics::onCompletion(ReshardingOperationStatusEnum status) { + switch (status) { + case ReshardingOperationStatusEnum::kSuccess: + _countSucceeded.fetchAndAdd(1); + break; + case ReshardingOperationStatusEnum::kFailure: + _countFailed.fetchAndAdd(1); + break; + case ReshardingOperationStatusEnum::kCanceled: + _countCancelled.fetchAndAdd(1); + break; + default: + MONGO_UNREACHABLE; + } +} + +void ShardingDataTransformCumulativeMetrics::setLastOpEndingChunkImbalance(int64_t imbalanceCount) { + _lastOpEndingChunkImbalance.store(imbalanceCount); +} + } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h index 930ebab2f4e..8374f018753 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h @@ -34,6 +34,7 @@ #include "mongo/db/service_context.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/mutex.h" +#include "mongo/s/resharding/common_types_gen.h" #include "mongo/util/functional.h" #include <set> @@ -56,6 +57,11 @@ public: size_t getObservedMetricsCount(Role role) const; void reportForServerStatus(BSONObjBuilder* bob) const; + void onStarted(); + void onCompletion(ReshardingOperationStatusEnum status); + + void setLastOpEndingChunkImbalance(int64_t imbalanceCount); + private: struct MetricsComparer { inline bool operator()(const InstanceObserver* a, const InstanceObserver* b) const { @@ -82,6 +88,13 @@ private: const std::string _rootSectionName; std::vector<MetricsSet> _instanceMetricsForAllRoles; AtomicWord<bool> _operationWasAttempted; + + AtomicWord<int64_t> _countStarted{0}; + AtomicWord<int64_t> _countSucceeded{0}; + AtomicWord<int64_t> _countFailed{0}; + AtomicWord<int64_t> _countCancelled{0}; + + AtomicWord<int64_t> _lastOpEndingChunkImbalance{0}; }; } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp index beb1b079301..15de8cf5199 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp @@ -175,5 +175,127 @@ TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsTimeEstimates) 300); } +TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsRunCount) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; + auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + + { + BSONObjBuilder bob; + _cumulativeMetrics.reportForServerStatus(&bob); + auto report = bob.done(); + ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countStarted"), 0); + } + + _cumulativeMetrics.onStarted(); + + { + BSONObjBuilder bob; + _cumulativeMetrics.reportForServerStatus(&bob); + auto report = bob.done(); + ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countStarted"), 1); + } +} + +TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsSucceededCount) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; + auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + + { + BSONObjBuilder bob; + _cumulativeMetrics.reportForServerStatus(&bob); + auto report = bob.done(); + ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countSucceeded"), 0); + } + + _cumulativeMetrics.onCompletion(ReshardingOperationStatusEnum::kSuccess); + + { + BSONObjBuilder bob; + _cumulativeMetrics.reportForServerStatus(&bob); + auto report = bob.done(); + ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countSucceeded"), 1); + } +} + +TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsFailedCount) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; + auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + + { + BSONObjBuilder bob; + _cumulativeMetrics.reportForServerStatus(&bob); + auto report = bob.done(); + ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countFailed"), 0); + } + + _cumulativeMetrics.onCompletion(ReshardingOperationStatusEnum::kFailure); + + { + BSONObjBuilder bob; + _cumulativeMetrics.reportForServerStatus(&bob); + auto report = bob.done(); + ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countFailed"), 1); + } +} + +TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsCanceledCount) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; + auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + + { + BSONObjBuilder bob; + _cumulativeMetrics.reportForServerStatus(&bob); + auto report = bob.done(); + ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countCanceled"), 0); + } + + _cumulativeMetrics.onCompletion(ReshardingOperationStatusEnum::kCanceled); + + { + BSONObjBuilder bob; + _cumulativeMetrics.reportForServerStatus(&bob); + auto report = bob.done(); + ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("countCanceled"), 1); + } +} + +TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsLastChunkImbalanceCount) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; + auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + + { + BSONObjBuilder bob; + _cumulativeMetrics.reportForServerStatus(&bob); + auto report = bob.done(); + ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("lastOpEndingChunkImbalance"), + 0); + } + + _cumulativeMetrics.setLastOpEndingChunkImbalance(111); + + { + BSONObjBuilder bob; + _cumulativeMetrics.reportForServerStatus(&bob); + auto report = bob.done(); + ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("lastOpEndingChunkImbalance"), + 111); + } + + _cumulativeMetrics.setLastOpEndingChunkImbalance(777); + + { + BSONObjBuilder bob; + _cumulativeMetrics.reportForServerStatus(&bob); + auto report = bob.done(); + ASSERT_EQ(report.getObjectField(kTestMetricsName).getIntField("lastOpEndingChunkImbalance"), + 777); + } +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp index c556e235daa..d5c94d46416 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp @@ -105,7 +105,9 @@ ShardingDataTransformInstanceMetrics::ShardingDataTransformInstanceMetrics( _oplogEntriesApplied{0}, _criticalSectionStartTime{kNoDate}, _criticalSectionEndTime{kNoDate}, - _writesDuringCriticalSection{0} {} + _writesDuringCriticalSection{0}, + _lowestEstimatedRemainingOperationTime{Milliseconds(0)}, + _highestEstimatedRemainingOperationTime{Milliseconds(0)} {} ShardingDataTransformInstanceMetrics::~ShardingDataTransformInstanceMetrics() { if (_deregister) { @@ -114,11 +116,11 @@ ShardingDataTransformInstanceMetrics::~ShardingDataTransformInstanceMetrics() { } int64_t ShardingDataTransformInstanceMetrics::getHighEstimateRemainingTimeMillis() const { - return kPlaceholderTimeRemainingForTesting; + return durationCount<Milliseconds>(_highestEstimatedRemainingOperationTime.load()); } int64_t ShardingDataTransformInstanceMetrics::getLowEstimateRemainingTimeMillis() const { - return kPlaceholderTimeRemainingForTesting; + return durationCount<Milliseconds>(_lowestEstimatedRemainingOperationTime.load()); } Date_t ShardingDataTransformInstanceMetrics::getStartTimestamp() const { @@ -155,8 +157,10 @@ BSONObj ShardingDataTransformInstanceMetrics::reportForCurrentOp() const noexcep switch (_role) { case Role::kCoordinator: - builder.append(kAllShardsHighestRemainingOperationTimeEstimatedSecs, TEMP_VALUE); - builder.append(kAllShardsLowestRemainingOperationTimeEstimatedSecs, TEMP_VALUE); + builder.append(kAllShardsHighestRemainingOperationTimeEstimatedSecs, + durationCount<Seconds>(_highestEstimatedRemainingOperationTime.load())); + builder.append(kAllShardsLowestRemainingOperationTimeEstimatedSecs, + durationCount<Seconds>(_lowestEstimatedRemainingOperationTime.load())); builder.append(kCoordinatorState, getStateString()); builder.append(kApplyTimeElapsed, TEMP_VALUE); builder.append(kCopyTimeElapsed, getCopyingElapsedTimeSecs()); @@ -267,4 +271,14 @@ void ShardingDataTransformInstanceMetrics::accumulateValues(int64_t insertsAppli _deletesApplied.fetchAndAdd(deletesApplied); } +void ShardingDataTransformInstanceMetrics::setLowestEstimatedRemainingOperationTime( + Milliseconds time) { + _lowestEstimatedRemainingOperationTime.store(time); +} + +void ShardingDataTransformInstanceMetrics::setHighestEstimatedRemainingOperationTime( + Milliseconds time) { + _highestEstimatedRemainingOperationTime.store(time); +} + } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.h b/src/mongo/db/s/sharding_data_transform_instance_metrics.h index 0e29562188e..66c6092b722 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics.h +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.h @@ -79,6 +79,9 @@ public: void onCriticalSectionBegin(); void onCriticalSectionEnd(); + void setLowestEstimatedRemainingOperationTime(Milliseconds time); + void setHighestEstimatedRemainingOperationTime(Milliseconds time); + Role getRole() const; protected: @@ -150,6 +153,9 @@ private: AtomicWord<Date_t> _criticalSectionEndTime; AtomicWord<int64_t> _readsDuringCriticalSection; AtomicWord<int64_t> _writesDuringCriticalSection; + + AtomicWord<Milliseconds> _lowestEstimatedRemainingOperationTime; + AtomicWord<Milliseconds> _highestEstimatedRemainingOperationTime; }; } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp index 6b1f49ecd2c..ef32091c77b 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp @@ -325,5 +325,29 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, OnWriteToStasheddShouldIncremen ASSERT_EQ(report.getIntField("countWritesToStashCollections"), 1); } +TEST_F(ShardingDataTransformInstanceMetricsTest, + SetLowestOperationTimeShouldBeReflectedInCurrentOp) { + auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator); + + auto report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("allShardsLowestRemainingOperationTimeEstimatedSecs"), 0); + metrics->setLowestEstimatedRemainingOperationTime(Milliseconds(2000)); + + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("allShardsLowestRemainingOperationTimeEstimatedSecs"), 2); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, + SetHighestOperationTimeShouldBeReflectedInCurrentOp) { + auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator); + + auto report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("allShardsHighestRemainingOperationTimeEstimatedSecs"), 0); + metrics->setHighestEstimatedRemainingOperationTime(Milliseconds(12000)); + + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("allShardsHighestRemainingOperationTimeEstimatedSecs"), 12); +} + } // namespace } // namespace mongo |