diff options
Diffstat (limited to 'src/mongo')
6 files changed, 246 insertions, 164 deletions
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp index 000cb691264..cc5cb36ac24 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics.cpp @@ -58,7 +58,7 @@ constexpr auto kCriticalSectionTimeElapsed = "totalCriticalSectionTimeElapsed"; constexpr auto kCoordinatorState = "coordinatorState"; constexpr auto kDonorState = "donorState"; constexpr auto kRecipientState = "recipientState"; -constexpr auto kCompletionStatus = "opStatus"; +constexpr auto kOpStatus = "opStatus"; using MetricsPtr = std::unique_ptr<ReshardingMetrics>; @@ -89,8 +89,7 @@ ReshardingMetrics* ReshardingMetrics::get(ServiceContext* ctx) noexcept { void ReshardingMetrics::onStart() noexcept { stdx::lock_guard<Latch> lk(_mutex); - invariant(!_currentOp.has_value() || _currentOp->isCompleted(), kAnotherOperationInProgress); - + invariant(!_currentOp.has_value(), kAnotherOperationInProgress); // Create a new operation and record the time it started. _currentOp.emplace(_svcCtx->getFastClockSource()); _currentOp->runningOperation.start(); @@ -100,7 +99,7 @@ void ReshardingMetrics::onStart() noexcept { void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status) noexcept { stdx::lock_guard<Latch> lk(_mutex); - invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress); + invariant(_currentOp.has_value(), kNoOperationInProgress); switch (status) { case ReshardingOperationStatusEnum::kSuccess: _succeeded++; @@ -115,17 +114,13 @@ void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status) noexc MONGO_UNREACHABLE; } - // Mark the active operation as completed and ensure all timers are stopped. - _currentOp->runningOperation.end(); - _currentOp->copyingDocuments.tryEnd(); - _currentOp->applyingOplogEntries.tryEnd(); - _currentOp->inCriticalSection.tryEnd(); - _currentOp->opStatus = status; + // Reset current op metrics. + _currentOp = boost::none; } void ReshardingMetrics::setDonorState(DonorStateEnum state) noexcept { stdx::lock_guard<Latch> lk(_mutex); - invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress); + invariant(_currentOp.has_value(), kNoOperationInProgress); const auto oldState = std::exchange(_currentOp->donorState, state); invariant(oldState != state); @@ -141,7 +136,7 @@ void ReshardingMetrics::setDonorState(DonorStateEnum state) noexcept { void ReshardingMetrics::setRecipientState(RecipientStateEnum state) noexcept { stdx::lock_guard<Latch> lk(_mutex); - invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress); + invariant(_currentOp.has_value(), kNoOperationInProgress); const auto oldState = std::exchange(_currentOp->recipientState, state); invariant(oldState != state); @@ -161,13 +156,13 @@ void ReshardingMetrics::setRecipientState(RecipientStateEnum state) noexcept { void ReshardingMetrics::setCoordinatorState(CoordinatorStateEnum state) noexcept { stdx::lock_guard<Latch> lk(_mutex); - invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress); + invariant(_currentOp.has_value(), kNoOperationInProgress); _currentOp->coordinatorState = state; } void ReshardingMetrics::setDocumentsToCopy(int64_t documents, int64_t bytes) noexcept { stdx::lock_guard<Latch> lk(_mutex); - invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress); + invariant(_currentOp.has_value(), kNoOperationInProgress); _currentOp->documentsToCopy = documents; _currentOp->bytesToCopy = bytes; @@ -175,36 +170,41 @@ void ReshardingMetrics::setDocumentsToCopy(int64_t documents, int64_t bytes) noe void ReshardingMetrics::onDocumentsCopied(int64_t documents, int64_t bytes) noexcept { stdx::lock_guard<Latch> lk(_mutex); - invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress); + invariant(_currentOp.has_value(), kNoOperationInProgress); invariant(_currentOp->recipientState == RecipientStateEnum::kCloning); _currentOp->documentsCopied += documents; _currentOp->bytesCopied += bytes; + _cumulativeOp.documentsCopied += documents; + _cumulativeOp.bytesCopied += bytes; } void ReshardingMetrics::onOplogEntriesFetched(int64_t entries) noexcept { stdx::lock_guard<Latch> lk(_mutex); - invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress); + invariant(_currentOp.has_value(), kNoOperationInProgress); invariant(_currentOp->recipientState == RecipientStateEnum::kCloning || _currentOp->recipientState == RecipientStateEnum::kApplying || _currentOp->recipientState == RecipientStateEnum::kSteadyState); _currentOp->oplogEntriesFetched += entries; + _cumulativeOp.oplogEntriesFetched += entries; } void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept { stdx::lock_guard<Latch> lk(_mutex); - invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress); + invariant(_currentOp.has_value(), kNoOperationInProgress); invariant(_currentOp->recipientState == RecipientStateEnum::kApplying || _currentOp->recipientState == RecipientStateEnum::kSteadyState); _currentOp->oplogEntriesApplied += entries; + _cumulativeOp.oplogEntriesApplied += entries; } void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept { stdx::lock_guard<Latch> lk(_mutex); - invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress); + invariant(_currentOp.has_value(), kNoOperationInProgress); invariant(_currentOp->donorState == DonorStateEnum::kPreparingToBlockWrites || _currentOp->donorState == DonorStateEnum::kBlockingWrites); _currentOp->writesDuringCriticalSection += writes; + _cumulativeOp.writesDuringCriticalSection += writes; } void ReshardingMetrics::OperationMetrics::TimeInterval::start() noexcept { @@ -212,18 +212,10 @@ void ReshardingMetrics::OperationMetrics::TimeInterval::start() noexcept { _start.emplace(_clockSource->now()); } -void ReshardingMetrics::OperationMetrics::TimeInterval::tryEnd() noexcept { - if (!_start.has_value()) - return; - if (_end.has_value()) - return; - _end.emplace(_clockSource->now()); -} - void ReshardingMetrics::OperationMetrics::TimeInterval::end() noexcept { invariant(_start.has_value(), "Not started"); invariant(!_end.has_value(), "Already stopped"); - tryEnd(); + _end.emplace(_clockSource->now()); } Milliseconds ReshardingMetrics::OperationMetrics::TimeInterval::duration() const noexcept { @@ -234,12 +226,10 @@ Milliseconds ReshardingMetrics::OperationMetrics::TimeInterval::duration() const return duration_cast<Milliseconds>(_end.value() - _start.value()); } -void ReshardingMetrics::OperationMetrics::append(BSONObjBuilder* bob, Role role) const { - auto getElapsedTime = [role](const TimeInterval& interval) -> int64_t { - if (role == Role::kAll) - return durationCount<Milliseconds>(interval.duration()); - else - return durationCount<Seconds>(interval.duration()); +void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder* bob, + Role role) const { + auto getElapsedTime = [](const TimeInterval& interval) -> int64_t { + return durationCount<Seconds>(interval.duration()); }; auto remainingMsec = [&]() -> boost::optional<Milliseconds> { @@ -256,74 +246,46 @@ void ReshardingMetrics::OperationMetrics::append(BSONObjBuilder* bob, Role role) return {}; }(); + bob->append(kOpTimeElapsed, getElapsedTime(runningOperation)); - const std::string kIntervalSuffix = role == Role::kAll ? "Millis" : ""; - bob->append(kOpTimeElapsed + kIntervalSuffix, getElapsedTime(runningOperation)); - - bob->append(kOpTimeRemaining + kIntervalSuffix, + bob->append(kOpTimeRemaining, !remainingMsec ? int64_t{-1} /** -1 is a specified integer null value */ - : role == Role::kAll ? durationCount<Milliseconds>(*remainingMsec) - : durationCount<Seconds>(*remainingMsec)); - - if (role == Role::kAll || role == Role::kRecipient) { - bob->append(kDocumentsToCopy, documentsToCopy); - bob->append(kDocumentsCopied, documentsCopied); - bob->append(kBytesToCopy, bytesToCopy); - bob->append(kBytesCopied, bytesCopied); - bob->append(kCopyTimeElapsed + kIntervalSuffix, getElapsedTime(copyingDocuments)); - - bob->append(kOplogsFetched, oplogEntriesFetched); - bob->append(kOplogsApplied, oplogEntriesApplied); - bob->append(kApplyTimeElapsed + kIntervalSuffix, getElapsedTime(applyingOplogEntries)); - } - - if (role == Role::kAll || role == Role::kDonor) { - bob->append(kWritesDuringCritialSection, writesDuringCriticalSection); - bob->append(kCriticalSectionTimeElapsed + kIntervalSuffix, - getElapsedTime(inCriticalSection)); - } + : durationCount<Seconds>(*remainingMsec)); switch (role) { case Role::kDonor: + bob->append(kWritesDuringCritialSection, writesDuringCriticalSection); + bob->append(kCriticalSectionTimeElapsed, getElapsedTime(inCriticalSection)); bob->append(kDonorState, DonorState_serializer(donorState)); - bob->append(kCompletionStatus, ReshardingOperationStatus_serializer(opStatus)); + bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus)); break; case Role::kRecipient: + bob->append(kDocumentsToCopy, documentsToCopy); + bob->append(kDocumentsCopied, documentsCopied); + bob->append(kBytesToCopy, bytesToCopy); + bob->append(kBytesCopied, bytesCopied); + bob->append(kCopyTimeElapsed, getElapsedTime(copyingDocuments)); + + bob->append(kOplogsFetched, oplogEntriesFetched); + bob->append(kOplogsApplied, oplogEntriesApplied); + bob->append(kApplyTimeElapsed, getElapsedTime(applyingOplogEntries)); bob->append(kRecipientState, RecipientState_serializer(recipientState)); - bob->append(kCompletionStatus, ReshardingOperationStatus_serializer(opStatus)); + bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus)); break; case Role::kCoordinator: bob->append(kCoordinatorState, CoordinatorState_serializer(coordinatorState)); - bob->append(kCompletionStatus, ReshardingOperationStatus_serializer(opStatus)); - break; - case Role::kAll: - bob->append(kDonorState, donorState); - bob->append(kRecipientState, recipientState); - bob->append(kCoordinatorState, coordinatorState); - bob->append(kCompletionStatus, opStatus); + bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus)); break; default: MONGO_UNREACHABLE; } } -void ReshardingMetrics::serialize(BSONObjBuilder* bob, ReporterOptions::Role role) const { +void ReshardingMetrics::serializeCurrentOpMetrics(BSONObjBuilder* bob, + ReporterOptions::Role role) const { stdx::lock_guard<Latch> lk(_mutex); - - if (role == ReporterOptions::Role::kAll) { - bob->append(kTotalOps, _started); - bob->append(kSuccessfulOps, _succeeded); - bob->append(kFailedOps, _failed); - bob->append(kCanceledOps, _canceled); - } - - if (_currentOp) { - _currentOp->append(bob, role); - } else { - // There are no resharding operations in progress, so report the default metrics. - OperationMetrics opMetrics(_svcCtx->getFastClockSource()); - opMetrics.append(bob, role); - } + if (_currentOp) + _currentOp->appendCurrentOpMetrics(bob, role); } BSONObj ReshardingMetrics::reportForCurrentOp(const ReporterOptions& options) const noexcept { @@ -359,7 +321,7 @@ BSONObj ReshardingMetrics::reportForCurrentOp(const ReporterOptions& options) co bob.append("ns", options.nss.toString()); bob.append("originatingCommand", originatingCommand); - serialize(&bob, options.role); + serializeCurrentOpMetrics(&bob, options.role); return bob.obj(); } @@ -370,4 +332,23 @@ boost::optional<Milliseconds> ReshardingMetrics::getOperationElapsedTime() const return _currentOp->runningOperation.duration(); } +void ReshardingMetrics::OperationMetrics::appendCumulativeOpMetrics(BSONObjBuilder* bob) const { + bob->append(kDocumentsCopied, documentsCopied); + bob->append(kBytesCopied, bytesCopied); + bob->append(kOplogsApplied, oplogEntriesApplied); + bob->append(kWritesDuringCritialSection, writesDuringCriticalSection); + bob->append(kOplogsFetched, oplogEntriesFetched); +} + +void ReshardingMetrics::serializeCumulativeOpMetrics(BSONObjBuilder* bob) const { + stdx::lock_guard<Latch> lk(_mutex); + + bob->append(kTotalOps, _started); + bob->append(kSuccessfulOps, _succeeded); + bob->append(kFailedOps, _failed); + bob->append(kCanceledOps, _canceled); + + _cumulativeOp.appendCumulativeOpMetrics(bob); +} + } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h index 32e7e0a53c7..f2aa4fe1795 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.h +++ b/src/mongo/db/s/resharding/resharding_metrics.h @@ -52,7 +52,8 @@ public: ReshardingMetrics(const ReshardingMetrics&) = delete; ReshardingMetrics(ReshardingMetrics&&) = delete; - explicit ReshardingMetrics(ServiceContext* svcCtx) : _svcCtx(svcCtx) {} + explicit ReshardingMetrics(ServiceContext* svcCtx) + : _svcCtx(svcCtx), _cumulativeOp(svcCtx->getFastClockSource()) {} static ReshardingMetrics* get(ServiceContext*) noexcept; @@ -84,7 +85,7 @@ public: void onCompletion(ReshardingOperationStatusEnum) noexcept; struct ReporterOptions { - enum class Role { kAll, kDonor, kRecipient, kCoordinator }; + enum class Role { kDonor, kRecipient, kCoordinator }; ReporterOptions(Role role, UUID id, NamespaceString nss, BSONObj shardKey, bool unique) : role(role), id(std::move(id)), @@ -100,15 +101,11 @@ public: }; BSONObj reportForCurrentOp(const ReporterOptions& options) const noexcept; - /** - * Append metrics to the builder in CurrentOp format for the given `role`. - * If `role` is omitted, append in ServerStatus format. - * There are significant format differences. ServerStatus: - * - Uses integers instead of names for enum values to improve FTDC compression. - * - Uses millisecond time intervals and + 'Millis' field name suffixes. - * - Has no role. Any data from any available resharding role is merged in. - */ - void serialize(BSONObjBuilder*, ReporterOptions::Role role = ReporterOptions::Role::kAll) const; + // Append metrics to the builder in CurrentOp format for the given `role`. + void serializeCurrentOpMetrics(BSONObjBuilder*, ReporterOptions::Role role) const; + + // Append metrics to the builder in CumulativeOp (ServerStatus) format. + void serializeCumulativeOpMetrics(BSONObjBuilder*) const; // Reports the elapsed time for the active resharding operation, or `boost::none`. boost::optional<Milliseconds> getOperationElapsedTime() const; @@ -125,7 +122,7 @@ private: int64_t _failed = 0; int64_t _canceled = 0; - // Metrics for an active resharding operation. Accesses must be serialized using `_mutex`. + // Metrics for resharding operation. Accesses must be serialized using `_mutex`. struct OperationMetrics { // Allows tracking elapsed time for the resharding operation and its sub operations (e.g., // applying oplog entries). @@ -135,7 +132,6 @@ private: void start() noexcept; - void tryEnd() noexcept; void end() noexcept; Milliseconds duration() const noexcept; @@ -153,12 +149,9 @@ private: inCriticalSection(clockSource) {} using Role = ReporterOptions::Role; - void append(BSONObjBuilder*, Role) const; + void appendCurrentOpMetrics(BSONObjBuilder*, Role) const; - bool isCompleted() const noexcept { - return !(opStatus == ReshardingOperationStatusEnum::kRunning || - opStatus == ReshardingOperationStatusEnum::kInactive); - } + void appendCumulativeOpMetrics(BSONObjBuilder*) const; TimeInterval runningOperation; ReshardingOperationStatusEnum opStatus = ReshardingOperationStatusEnum::kInactive; @@ -181,6 +174,7 @@ private: CoordinatorStateEnum coordinatorState = CoordinatorStateEnum::kUnused; }; boost::optional<OperationMetrics> _currentOp; + OperationMetrics _cumulativeOp; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp index 6624e17e6db..1ef859eac2e 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp @@ -46,10 +46,17 @@ namespace { using namespace fmt::literals; -constexpr auto kOpTimeRemaining = "remainingOperationTimeEstimatedMillis"_sd; +constexpr auto kOpTimeRemaining = "remainingOperationTimeEstimated"_sd; class ReshardingMetricsTest : public ServiceContextTest { public: + enum OpReportType { + CumulativeReport, + CurrentOpReportDonorRole, + CurrentOpReportRecipientRole, + CurrentOpReportCoordinatorRole + }; + void setUp() override { auto clockSource = std::make_unique<ClockSourceMock>(); _clockSource = clockSource.get(); @@ -67,20 +74,34 @@ public: _clockSource->advance(step); } - auto getReport() { + auto getReport(OpReportType reportType) { BSONObjBuilder bob; - getMetrics()->serialize(&bob); + if (reportType == OpReportType::CumulativeReport) { + getMetrics()->serializeCumulativeOpMetrics(&bob); + } else if (reportType == OpReportType::CurrentOpReportDonorRole) { + getMetrics()->serializeCurrentOpMetrics( + &bob, ReshardingMetrics::ReporterOptions::Role::kDonor); + } else if (reportType == OpReportType::CurrentOpReportRecipientRole) { + getMetrics()->serializeCurrentOpMetrics( + &bob, ReshardingMetrics::ReporterOptions::Role::kRecipient); + } else { + getMetrics()->serializeCurrentOpMetrics( + &bob, ReshardingMetrics::ReporterOptions::Role::kCoordinator); + } return bob.obj(); } - void checkMetrics(std::string tag, int expectedValue) { - const auto report = getReport(); - checkMetrics(report, std::move(tag), std::move(expectedValue)); + void checkMetrics(std::string tag, int expectedValue, OpReportType reportType) { + const auto report = getReport(reportType); + checkMetrics(report, std::move(tag), expectedValue); } - void checkMetrics(std::string tag, int expectedValue, std::string errMsg) { - const auto report = getReport(); - checkMetrics(report, std::move(tag), std::move(expectedValue), std::move(errMsg)); + void checkMetrics(std::string tag, + int expectedValue, + std::string errMsg, + OpReportType reportType) { + const auto report = getReport(reportType); + checkMetrics(report, std::move(tag), expectedValue, std::move(errMsg)); } void checkMetrics(const BSONObj& report, @@ -104,17 +125,11 @@ DEATH_TEST_F(ReshardingMetricsTest, RunOnCompletionBeforeOnStart, "No operation } TEST_F(ReshardingMetricsTest, OperationStatus) { - auto constexpr kTag = "opStatus"; - // No operation has completed yet, so the status is unknown. - checkMetrics(kTag, (int)ReshardingOperationStatusEnum::kInactive); - for (auto status : {ReshardingOperationStatusEnum::kSuccess, - ReshardingOperationStatusEnum::kFailure, - ReshardingOperationStatusEnum::kCanceled}) { - getMetrics()->onStart(); - checkMetrics(kTag, (int)ReshardingOperationStatusEnum::kRunning); - getMetrics()->onCompletion(status); - checkMetrics(kTag, (int)status); - } + getMetrics()->onStart(); + const auto report = getReport(OpReportType::CurrentOpReportCoordinatorRole); + ASSERT_EQ(report.getStringField("opStatus"), + ReshardingOperationStatus_serializer(ReshardingOperationStatusEnum::kRunning)); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess); } TEST_F(ReshardingMetricsTest, TestOperationStatus) { @@ -137,33 +152,34 @@ TEST_F(ReshardingMetricsTest, TestOperationStatus) { getMetrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled); } - checkMetrics("countReshardingSuccessful", kNumSuccessfulOps); - checkMetrics("countReshardingFailures", kNumFailedOps); - checkMetrics("countReshardingCanceled", kNumCanceledOps); + checkMetrics("countReshardingSuccessful", kNumSuccessfulOps, OpReportType::CumulativeReport); + checkMetrics("countReshardingFailures", kNumFailedOps, OpReportType::CumulativeReport); + checkMetrics("countReshardingCanceled", kNumCanceledOps, OpReportType::CumulativeReport); const auto total = kNumSuccessfulOps + kNumFailedOps + kNumCanceledOps; - checkMetrics("countReshardingOperations", total); + checkMetrics("countReshardingOperations", total, OpReportType::CumulativeReport); getMetrics()->onStart(); - checkMetrics("countReshardingOperations", total + 1); + checkMetrics("countReshardingOperations", total + 1, OpReportType::CumulativeReport); } TEST_F(ReshardingMetricsTest, TestElapsedTime) { getMetrics()->onStart(); - advanceTime(); - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess); - checkMetrics("totalOperationTimeElapsedMillis", kTimerStep); + const auto elapsedTime = 1; + advanceTime(Seconds(elapsedTime)); + checkMetrics("totalOperationTimeElapsed", elapsedTime, OpReportType::CurrentOpReportDonorRole); } TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) { getMetrics()->onStart(); + const auto elapsedTime = 1; - advanceTime(); + advanceTime(Seconds(elapsedTime)); // Update metrics for donor const auto kWritesDuringCriticalSection = 7; getMetrics()->setDonorState(DonorStateEnum::kPreparingToBlockWrites); getMetrics()->onWriteDuringCriticalSection(kWritesDuringCriticalSection); - advanceTime(); + advanceTime(Seconds(elapsedTime)); // Update metrics for recipient const auto kDocumentsToCopy = 50; @@ -173,61 +189,121 @@ TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) { getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy); getMetrics()->onDocumentsCopied(kDocumentsToCopy * kCopyProgress / 100, kBytesToCopy * kCopyProgress / 100); - advanceTime(); + advanceTime(Seconds(elapsedTime)); - const auto report = getReport(); + const auto currentDonorOpReport = getReport(OpReportType::CurrentOpReportDonorRole); + const auto currentRecipientOpReport = getReport(OpReportType::CurrentOpReportRecipientRole); getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess); - checkMetrics(report, "totalCopyTimeElapsedMillis", kTimerStep); - checkMetrics(report, "bytesCopied", kBytesToCopy * kCopyProgress / 100); - checkMetrics(report, "documentsCopied", kDocumentsToCopy * kCopyProgress / 100); - checkMetrics(report, "totalCriticalSectionTimeElapsedMillis", kTimerStep * 2); - checkMetrics(report, "countWritesDuringCriticalSection", kWritesDuringCriticalSection); - - // Expected remaining time = totalCopyTimeElapsedMillis + 2 * estimated time to copy remaining - checkMetrics(report, - "remainingOperationTimeEstimatedMillis", - kTimerStep + 2 * (100 - kCopyProgress) / kCopyProgress * kTimerStep); + checkMetrics(currentRecipientOpReport, "totalCopyTimeElapsed", elapsedTime); + checkMetrics(currentRecipientOpReport, "bytesCopied", kBytesToCopy * kCopyProgress / 100); + checkMetrics( + currentRecipientOpReport, "documentsCopied", kDocumentsToCopy * kCopyProgress / 100); + checkMetrics(currentDonorOpReport, "totalCriticalSectionTimeElapsed", elapsedTime * 2); + checkMetrics( + currentDonorOpReport, "countWritesDuringCriticalSection", kWritesDuringCriticalSection); + + // Expected remaining time = totalCopyTimeElapsed + 2 * estimated time to copy remaining + checkMetrics(currentDonorOpReport, + "remainingOperationTimeEstimated", + elapsedTime + 2 * (100 - kCopyProgress) / kCopyProgress * elapsedTime); + + const auto cumulativeReportAfterCompletion = getReport(OpReportType::CumulativeReport); + checkMetrics( + cumulativeReportAfterCompletion, "bytesCopied", kBytesToCopy * kCopyProgress / 100); + checkMetrics( + cumulativeReportAfterCompletion, "documentsCopied", kDocumentsToCopy * kCopyProgress / 100); + checkMetrics(cumulativeReportAfterCompletion, + "countWritesDuringCriticalSection", + kWritesDuringCriticalSection); } -TEST_F(ReshardingMetricsTest, MetricsAreRetainedAfterCompletion) { - auto constexpr kTag = "totalOperationTimeElapsedMillis"; +TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCompletion) { + auto constexpr kTag = "documentsCopied"; + getMetrics()->onStart(); + const auto kDocumentsToCopy = 2; + const auto kBytesToCopy = 200; + getMetrics()->setRecipientState(RecipientStateEnum::kCloning); + getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy); + advanceTime(); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure); + advanceTime(); + + checkMetrics(kTag, + kDocumentsToCopy, + "Cumulative metrics are not retained", + OpReportType::CumulativeReport); + + getMetrics()->onStart(); + checkMetrics( + kTag, kDocumentsToCopy, "Cumulative metrics are reset", OpReportType::CumulativeReport); +} +TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreResetAfterCompletion) { + auto constexpr kTag = "documentsCopied"; getMetrics()->onStart(); + const auto kDocumentsToCopy = 2; + const auto kBytesToCopy = 200; + getMetrics()->setRecipientState(RecipientStateEnum::kCloning); + getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy); + checkMetrics(kTag, + kDocumentsToCopy, + "Current metrics are not set", + OpReportType::CurrentOpReportRecipientRole); advanceTime(); getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess); advanceTime(); - checkMetrics(kTag, kTimerStep, "Metrics are not retained"); + getMetrics()->onStart(); + checkMetrics( + kTag, 0, "Current metrics are not reset", OpReportType::CurrentOpReportRecipientRole); +} +TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterCompletion) { + auto constexpr kTag = "documentsCopied"; getMetrics()->onStart(); - checkMetrics(kTag, 0, "Metrics are not reset"); + const auto kDocumentsToCopy = 2; + const auto kBytesToCopy = 200; + getMetrics()->setRecipientState(RecipientStateEnum::kCloning); + getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy); + checkMetrics(kTag, + kDocumentsToCopy, + "Current metrics are not set", + OpReportType::CurrentOpReportRecipientRole); + advanceTime(); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure); + advanceTime(); + + ASSERT_FALSE(getReport(OpReportType::CurrentOpReportRecipientRole)[kTag].ok()); } TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) { - auto constexpr kTag = "remainingOperationTimeEstimatedMillis"; + auto constexpr kTag = "remainingOperationTimeEstimated"; + const auto elapsedTime = 1; getMetrics()->onStart(); - checkMetrics(kTag, -1); + checkMetrics(kTag, -1, OpReportType::CurrentOpReportDonorRole); const auto kDocumentsToCopy = 2; const auto kBytesToCopy = 200; getMetrics()->setRecipientState(RecipientStateEnum::kCloning); getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy); getMetrics()->onDocumentsCopied(kDocumentsToCopy / 2, kBytesToCopy / 2); - advanceTime(); + advanceTime(Seconds(elapsedTime)); // Since 50% of the data is copied, the remaining copy time equals the elapsed copy time, which - // is equal to `kTimerStep` milliseconds. - checkMetrics(kTag, kTimerStep + 2 * kTimerStep); + // is equal to `elapsedTime` seconds. + checkMetrics(kTag, elapsedTime + 2 * elapsedTime, OpReportType::CurrentOpReportDonorRole); const auto kOplogEntriesFetched = 4; const auto kOplogEntriesApplied = 2; getMetrics()->setRecipientState(RecipientStateEnum::kApplying); getMetrics()->onOplogEntriesFetched(kOplogEntriesFetched); getMetrics()->onOplogEntriesApplied(kOplogEntriesApplied); - advanceTime(); - // So far, the time to apply oplog entries equals `kTimerStep` milliseconds. - checkMetrics(kTag, kTimerStep * (kOplogEntriesFetched / kOplogEntriesApplied - 1)); + advanceTime(Seconds(elapsedTime)); + // So far, the time to apply oplog entries equals `elapsedTime` seconds. + checkMetrics(kTag, + elapsedTime * (kOplogEntriesFetched / kOplogEntriesApplied - 1), + OpReportType::CurrentOpReportDonorRole); } TEST_F(ReshardingMetricsTest, CurrentOpReportForDonor) { @@ -384,7 +460,7 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeCloning) { auto m = getMetrics(); m->onStart(); m->setRecipientState(RecipientStateEnum::kCloning); - auto timePerDocument = Milliseconds{123}; + auto timePerDocument = Seconds(2); int64_t bytesPerDocument = 1024; int64_t documentsToCopy = 409; int64_t bytesToCopy = bytesPerDocument * documentsToCopy; @@ -392,12 +468,13 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeCloning) { auto remainingTime = 2 * timePerDocument * documentsToCopy; double maxAbsRelErr = 0; for (int64_t copied = 0; copied < documentsToCopy; ++copied) { - double output = getReport()[kOpTimeRemaining].Number(); + double output = + getReport(OpReportType::CurrentOpReportRecipientRole)[kOpTimeRemaining].Number(); if (copied == 0) { ASSERT_EQ(output, -1); } else { ASSERT_GTE(output, 0); - auto expected = durationCount<Milliseconds>(remainingTime); + auto expected = durationCount<Seconds>(remainingTime); // Check that error is pretty small (it should get better as the operation progresses) double absRelErr = std::abs((output - expected) / expected); ASSERT_LT(absRelErr, 0.05) @@ -417,17 +494,20 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeApplying) { auto m = getMetrics(); m->onStart(); m->setRecipientState(RecipientStateEnum::kApplying); - auto timePerOp = Milliseconds{123}; + // 1 extra millisecond here because otherwise an error of just 1ms will round this down to the + // next second. + auto timePerOp = Milliseconds(1001); int64_t fetched = 10000; m->onOplogEntriesFetched(fetched); auto remainingTime = timePerOp * fetched; double maxAbsRelErr = 0; for (int64_t applied = 0; applied < fetched; ++applied) { - double output = getReport()[kOpTimeRemaining].Number(); + double output = + getReport(OpReportType::CurrentOpReportRecipientRole)[kOpTimeRemaining].Number(); if (applied == 0) { ASSERT_EQ(output, -1); } else { - auto expected = durationCount<Milliseconds>(remainingTime); + auto expected = durationCount<Seconds>(remainingTime); // Check that error is pretty small (it should get better as the operation progresses) double absRelErr = std::abs((output - expected) / expected); ASSERT_LT(absRelErr, 0.05) @@ -442,5 +522,29 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeApplying) { 5422701, 3, "Max absolute relative error observed", "maxAbsRelErr"_attr = maxAbsRelErr); } +TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAccumulate) { + auto constexpr kTag = "documentsCopied"; + getMetrics()->onStart(); + const auto kDocumentsToCopy1 = 2; + const auto kBytesToCopy1 = 200; + + getMetrics()->setRecipientState(RecipientStateEnum::kCloning); + getMetrics()->onDocumentsCopied(kDocumentsToCopy1, kBytesToCopy1); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure); + + getMetrics()->onStart(); + const auto kDocumentsToCopy2 = 3; + const auto kBytesToCopy2 = 400; + + getMetrics()->setRecipientState(RecipientStateEnum::kCloning); + getMetrics()->onDocumentsCopied(kDocumentsToCopy2, kBytesToCopy2); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure); + + checkMetrics(kTag, + kDocumentsToCopy1 + kDocumentsToCopy2, + "Cumulative metrics are not accumulated", + OpReportType::CumulativeReport); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp index e49959b7143..224adf089f8 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp @@ -280,7 +280,8 @@ public: long long metricsAppliedCount() const { BSONObjBuilder bob; - _metrics->serialize(&bob, ReshardingMetrics::ReporterOptions::Role::kRecipient); + _metrics->serializeCurrentOpMetrics(&bob, + ReshardingMetrics::ReporterOptions::Role::kRecipient); return bob.obj()["oplogEntriesApplied"_sd].Long(); } diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp index ab0932a9daf..93bf7646532 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp @@ -307,7 +307,8 @@ public: long long metricsFetchedCount() const { BSONObjBuilder bob; - _metrics->serialize(&bob, ReshardingMetrics::ReporterOptions::Role::kRecipient); + _metrics->serializeCurrentOpMetrics(&bob, + ReshardingMetrics::ReporterOptions::Role::kRecipient); return bob.obj()["oplogEntriesFetched"_sd].Long(); } diff --git a/src/mongo/db/s/sharding_server_status.cpp b/src/mongo/db/s/sharding_server_status.cpp index 19015bf5472..3489612493c 100644 --- a/src/mongo/db/s/sharding_server_status.cpp +++ b/src/mongo/db/s/sharding_server_status.cpp @@ -118,7 +118,8 @@ public: // checking whether the resharding feature is enabled here. if (resharding::gFeatureFlagResharding.isEnabledAndIgnoreFCV()) { BSONObjBuilder subObjBuilder(result.subobjStart("resharding")); - ReshardingMetrics::get(opCtx->getServiceContext())->serialize(&subObjBuilder); + ReshardingMetrics::get(opCtx->getServiceContext()) + ->serializeCumulativeOpMetrics(&subObjBuilder); } return result.obj(); |