diff options
author | Blake Oler <blake.oler@mongodb.com> | 2021-05-19 16:51:40 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-20 20:08:24 +0000 |
commit | fe0042206cc3ea6a0792a956a876d5793a1c67c2 (patch) | |
tree | 2ef5996ee7c2a20348cdb959d2f7f211601b7233 /src/mongo/db/s | |
parent | a05828b76b27b5198ca2917ba5f979458c084358 (diff) | |
download | mongo-fe0042206cc3ea6a0792a956a876d5793a1c67c2.tar.gz |
SERVER-56739 Rewrite resharding metrics duration component to allow for resuming from stepup
Diffstat (limited to 'src/mongo/db/s')
10 files changed, 190 insertions, 91 deletions
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp index 88e16c2202f..f105299e080 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp @@ -109,7 +109,7 @@ protected: void setUp() override { ServiceContextTest::setUp(); _metrics = std::make_unique<ReshardingMetrics>(getServiceContext()); - _metrics->onStart(); + _metrics->onStart(getServiceContext()->getFastClockSource()->now()); _metrics->setRecipientState(RecipientStateEnum::kCloning); } diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 8f428d62193..6b90cead55e 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -80,6 +80,11 @@ bool shouldStopAttemptingToCreateIndex(Status status, const CancellationToken& t return status.isOK() || token.isCanceled(); } +Date_t getCurrentTime() { + const auto svcCtx = cc().getServiceContext(); + return svcCtx->getFastClockSource()->now(); +} + void assertNumDocsModifiedMatchesExpected(const BatchedCommandRequest& request, const BSONObj& response, int expected) { @@ -1010,13 +1015,14 @@ void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc( } void markCompleted(const Status& status) { + auto currentTime = getCurrentTime(); auto metrics = ReshardingMetrics::get(cc().getServiceContext()); if (status.isOK()) - metrics->onCompletion(ReshardingOperationStatusEnum::kSuccess); + metrics->onCompletion(ReshardingOperationStatusEnum::kSuccess, currentTime); else if (status == ErrorCodes::ReshardCollectionAborted) - metrics->onCompletion(ReshardingOperationStatusEnum::kCanceled); + metrics->onCompletion(ReshardingOperationStatusEnum::kCanceled, currentTime); else - metrics->onCompletion(ReshardingOperationStatusEnum::kFailure); + metrics->onCompletion(ReshardingOperationStatusEnum::kFailure, currentTime); } BSONObj createFlushReshardingStateChangeCommand(const NamespaceString& nss) { @@ -1304,7 +1310,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChan _coordinatorDocWrittenPromise.emplaceValue(); // TODO SERVER-53914 to accommodate loading metrics for the coordinator. - ReshardingMetrics::get(cc().getServiceContext())->onStart(); + ReshardingMetrics::get(cc().getServiceContext())->onStart(getCurrentTime()); } void ReshardingCoordinatorService::ReshardingCoordinator:: diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp index 01e447b2524..2b01ae0e77f 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics.cpp @@ -89,18 +89,19 @@ ReshardingMetrics* ReshardingMetrics::get(ServiceContext* ctx) noexcept { return getMetrics(ctx).get(); } -void ReshardingMetrics::onStart() noexcept { +void ReshardingMetrics::onStart(Date_t runningOperationStartTime) noexcept { stdx::lock_guard<Latch> lk(_mutex); // TODO Re-add this invariant once all breaking test cases have been fixed. // invariant(!_currentOp.has_value(), kAnotherOperationInProgress); // Create a new operation and record the time it started. _currentOp.emplace(_svcCtx->getFastClockSource()); - _currentOp->runningOperation.start(); + _currentOp->runningOperation.start(runningOperationStartTime); _currentOp->opStatus = ReshardingOperationStatusEnum::kRunning; _started++; } -void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status) noexcept { +void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status, + Date_t runningOperationEndTime) noexcept { stdx::lock_guard<Latch> lk(_mutex); // TODO Re-add this invariant once all breaking test cases have been fixed. // invariant(_currentOp.has_value(), kNoOperationInProgress); @@ -118,6 +119,8 @@ void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status) noexc MONGO_UNREACHABLE; } + _currentOp->runningOperation.end(runningOperationEndTime); + // Reset current op metrics. _currentOp = boost::none; } @@ -131,9 +134,9 @@ void ReshardingMetrics::onStepUp() noexcept { // TODO SERVER-53914 Implement coordinator metrics rehydration. // TODO SERVER-53912 Implement recipient metrics rehydration. - // TODO SERVER-56739 Resume the runningOperation duration from a timestamp stored on disk + // TODO SERVER-57094 Resume the runningOperation duration from a timestamp stored on disk // instead of starting from the current time. - _currentOp->runningOperation.start(); + _currentOp->runningOperation.start(_svcCtx->getFastClockSource()->now()); _currentOp->opStatus = ReshardingOperationStatusEnum::kRunning; } @@ -156,18 +159,6 @@ void ReshardingMetrics::setRecipientState(RecipientStateEnum state) noexcept { const auto oldState = std::exchange(_currentOp->recipientState, state); invariant(oldState != state); - - if (state == RecipientStateEnum::kCloning) { - _currentOp->copyingDocuments.start(); - } else if (state == RecipientStateEnum::kApplying) { - _currentOp->applyingOplogEntries.start(); - } - - if (oldState == RecipientStateEnum::kCloning) { - _currentOp->copyingDocuments.end(); - } else if (oldState == RecipientStateEnum::kApplying) { - _currentOp->applyingOplogEntries.end(); - } } void ReshardingMetrics::setCoordinatorState(CoordinatorStateEnum state) noexcept { @@ -227,14 +218,34 @@ void ReshardingMetrics::onDocumentsCopied(int64_t documents, int64_t bytes) noex _cumulativeOp.bytesCopied += bytes; } -void ReshardingMetrics::startInCriticalSection() { +void ReshardingMetrics::startCopyingDocuments(Date_t start) { + stdx::lock_guard<Latch> lk(_mutex); + _currentOp->copyingDocuments.start(start); +} + +void ReshardingMetrics::endCopyingDocuments(Date_t end) { + stdx::lock_guard<Latch> lk(_mutex); + _currentOp->copyingDocuments.forceEnd(end); +} + +void ReshardingMetrics::startApplyingOplogEntries(Date_t start) { stdx::lock_guard<Latch> lk(_mutex); - _currentOp->inCriticalSection.start(); + _currentOp->applyingOplogEntries.start(start); } -void ReshardingMetrics::endInCritcialSection() { +void ReshardingMetrics::endApplyingOplogEntries(Date_t end) { stdx::lock_guard<Latch> lk(_mutex); - _currentOp->inCriticalSection.end(); + _currentOp->applyingOplogEntries.forceEnd(end); +} + +void ReshardingMetrics::startInCriticalSection(Date_t start) { + stdx::lock_guard<Latch> lk(_mutex); + _currentOp->inCriticalSection.start(start); +} + +void ReshardingMetrics::endInCriticalSection(Date_t end) { + stdx::lock_guard<Latch> lk(_mutex); + _currentOp->inCriticalSection.forceEnd(end); } void ReshardingMetrics::onOplogEntriesFetched(int64_t entries) noexcept { @@ -276,15 +287,23 @@ void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept { _cumulativeOp.writesDuringCriticalSection += writes; } -void ReshardingMetrics::OperationMetrics::TimeInterval::start() noexcept { +void ReshardingMetrics::OperationMetrics::TimeInterval::start(Date_t start) noexcept { invariant(!_start.has_value(), "Already started"); - _start.emplace(_clockSource->now()); + _start.emplace(start); } -void ReshardingMetrics::OperationMetrics::TimeInterval::end() noexcept { +void ReshardingMetrics::OperationMetrics::TimeInterval::end(Date_t end) noexcept { invariant(_start.has_value(), "Not started"); invariant(!_end.has_value(), "Already stopped"); - _end.emplace(_clockSource->now()); + _end.emplace(end); +} + +void ReshardingMetrics::OperationMetrics::TimeInterval::forceEnd(Date_t end) noexcept { + if (!_start.has_value()) { + _start.emplace(end); + } + + this->end(end); } Milliseconds ReshardingMetrics::OperationMetrics::TimeInterval::duration() const noexcept { diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h index 4d703cec82b..aeb3ddf11e1 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.h +++ b/src/mongo/db/s/resharding/resharding_metrics.h @@ -59,7 +59,7 @@ public: // Marks the beginning of a resharding operation. Not that only one resharding operation may run // at any time. - void onStart() noexcept; + void onStart(Date_t runningOperationStartTime) noexcept; // Marks the resumption of a resharding operation. Note that only one resharding operation may // run at any time. @@ -76,9 +76,15 @@ public: // Allows updating metrics on "documents to copy" so long as the recipient is in cloning state. void onDocumentsCopied(int64_t documents, int64_t bytes) noexcept; - // Starts/ends the timer recording the time spent in the critical section. - void startInCriticalSection(); - void endInCritcialSection(); + // Starts/ends the timers recording the times spend in the named sections. + void startCopyingDocuments(Date_t start); + void endCopyingDocuments(Date_t end); + + void startApplyingOplogEntries(Date_t start); + void endApplyingOplogEntries(Date_t end); + + void startInCriticalSection(Date_t start); + void endInCriticalSection(Date_t end); // Allows updating "oplog entries to apply" metrics when the recipient is in applying state. void onOplogEntriesFetched(int64_t entries) noexcept; @@ -94,7 +100,8 @@ public: // Marks the completion of the current (active) resharding operation. Aborts the process if no // resharding operation is in progress. - void onCompletion(ReshardingOperationStatusEnum) noexcept; + void onCompletion(ReshardingOperationStatusEnum status, + Date_t runningOperationEndTime) noexcept; struct ReporterOptions { enum class Role { kDonor, kRecipient, kCoordinator }; @@ -145,9 +152,12 @@ private: public: explicit TimeInterval(ClockSource* clockSource) : _clockSource(clockSource) {} - void start() noexcept; + void start(Date_t start) noexcept; + + void end(Date_t end) noexcept; - void end() noexcept; + // TODO Remove this function once all metrics classes can start from stepup. + void forceEnd(Date_t end) noexcept; Milliseconds duration() const noexcept; diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp index 4927109c79a..dd1a131501d 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp @@ -119,37 +119,41 @@ private: // TODO Re-enable once underlying invariants are re-enabled /* DEATH_TEST_F(ReshardingMetricsTest, RunOnCompletionBeforeOnStart, "No operation is in progress") { - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess, +getGlobalServiceContext()->getFastClockSource()->now()); } DEATH_TEST_F(ReshardingMetricsTest, RunOnStepUpAfterOnStartInvariants, "Another operation is in progress") { - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); getMetrics()->onStepUp(); } DEATH_TEST_F(ReshardingMetricsTest, RunOnCompletionAfterOnStepDownInvariants, "No operation is in progress") { - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); getMetrics()->onStepDown(); - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess, +getGlobalServiceContext()->getFastClockSource()->now()); } */ TEST_F(ReshardingMetricsTest, RunOnStepDownAfterOnCompletionIsSafe) { - getMetrics()->onStart(); - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess, + getGlobalServiceContext()->getFastClockSource()->now()); getMetrics()->onStepDown(); } TEST_F(ReshardingMetricsTest, OperationStatus) { - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); const auto report = getReport(OpReportType::CurrentOpReportCoordinatorRole); ASSERT_EQ(report.getStringField("opStatus"), ReshardingOperationStatus_serializer(ReshardingOperationStatusEnum::kRunning)); - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess, + getGlobalServiceContext()->getFastClockSource()->now()); } TEST_F(ReshardingMetricsTest, TestOperationStatus) { @@ -158,18 +162,21 @@ TEST_F(ReshardingMetricsTest, TestOperationStatus) { const auto kNumCanceledOps = 7; for (auto i = 0; i < kNumSuccessfulOps; i++) { - getMetrics()->onStart(); - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess, + getGlobalServiceContext()->getFastClockSource()->now()); } for (auto i = 0; i < kNumFailedOps; i++) { - getMetrics()->onStart(); - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure, + getGlobalServiceContext()->getFastClockSource()->now()); } for (auto i = 0; i < kNumCanceledOps; i++) { - getMetrics()->onStart(); - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled, + getGlobalServiceContext()->getFastClockSource()->now()); } checkMetrics("countReshardingSuccessful", kNumSuccessfulOps, OpReportType::CumulativeReport); @@ -178,19 +185,19 @@ TEST_F(ReshardingMetricsTest, TestOperationStatus) { const auto total = kNumSuccessfulOps + kNumFailedOps + kNumCanceledOps; checkMetrics("countReshardingOperations", total, OpReportType::CumulativeReport); - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); checkMetrics("countReshardingOperations", total + 1, OpReportType::CumulativeReport); } TEST_F(ReshardingMetricsTest, TestElapsedTime) { - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); const auto elapsedTime = 1; advanceTime(Seconds(elapsedTime)); checkMetrics("totalOperationTimeElapsed", elapsedTime, OpReportType::CurrentOpReportDonorRole); } TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) { - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); const auto elapsedTime = 1; advanceTime(Seconds(elapsedTime)); @@ -198,7 +205,7 @@ TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) { // Update metrics for donor const auto kWritesDuringCriticalSection = 7; getMetrics()->setDonorState(DonorStateEnum::kDonatingOplogEntries); - getMetrics()->startInCriticalSection(); + getMetrics()->startInCriticalSection(getGlobalServiceContext()->getFastClockSource()->now()); getMetrics()->onWriteDuringCriticalSection(kWritesDuringCriticalSection); advanceTime(Seconds(elapsedTime)); @@ -209,13 +216,15 @@ TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) { getMetrics()->setRecipientState(RecipientStateEnum::kCreatingCollection); getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy); getMetrics()->setRecipientState(RecipientStateEnum::kCloning); + getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); getMetrics()->onDocumentsCopied(kDocumentsToCopy * kCopyProgress / 100, kBytesToCopy * kCopyProgress / 100); advanceTime(Seconds(elapsedTime)); const auto currentDonorOpReport = getReport(OpReportType::CurrentOpReportDonorRole); const auto currentRecipientOpReport = getReport(OpReportType::CurrentOpReportRecipientRole); - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess, + getGlobalServiceContext()->getFastClockSource()->now()); checkMetrics(currentRecipientOpReport, "totalCopyTimeElapsed", elapsedTime); checkMetrics(currentRecipientOpReport, "bytesCopied", kBytesToCopy * kCopyProgress / 100); @@ -242,13 +251,15 @@ TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) { TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCompletion) { auto constexpr kTag = "documentsCopied"; - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); const auto kDocumentsToCopy = 2; const auto kBytesToCopy = 200; getMetrics()->setRecipientState(RecipientStateEnum::kCloning); + getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy); advanceTime(); - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure, + getGlobalServiceContext()->getFastClockSource()->now()); advanceTime(); checkMetrics(kTag, @@ -256,20 +267,22 @@ TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCompletion) { "Cumulative metrics are not retained", OpReportType::CumulativeReport); - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); checkMetrics( kTag, kDocumentsToCopy, "Cumulative metrics are reset", OpReportType::CumulativeReport); } TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCancellation) { auto constexpr kTag = "documentsCopied"; - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); const auto kDocumentsToCopy = 2; const auto kBytesToCopy = 200; getMetrics()->setRecipientState(RecipientStateEnum::kCloning); + getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy); advanceTime(); - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled, + getGlobalServiceContext()->getFastClockSource()->now()); advanceTime(); checkMetrics(kTag, @@ -277,44 +290,48 @@ TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCancellation) { "Cumulative metrics are not retained", OpReportType::CumulativeReport); - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); checkMetrics( kTag, kDocumentsToCopy, "Cumulative metrics are reset", OpReportType::CumulativeReport); } TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreResetAfterCompletion) { auto constexpr kTag = "documentsCopied"; - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); const auto kDocumentsToCopy = 2; const auto kBytesToCopy = 200; getMetrics()->setRecipientState(RecipientStateEnum::kCloning); + getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy); checkMetrics(kTag, kDocumentsToCopy, "Current metrics are not set", OpReportType::CurrentOpReportRecipientRole); advanceTime(); - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess, + getGlobalServiceContext()->getFastClockSource()->now()); advanceTime(); - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); checkMetrics( kTag, 0, "Current metrics are not reset", OpReportType::CurrentOpReportRecipientRole); } TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterCompletion) { auto constexpr kTag = "documentsCopied"; - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); const auto kDocumentsToCopy = 2; const auto kBytesToCopy = 200; getMetrics()->setRecipientState(RecipientStateEnum::kCloning); + getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy); checkMetrics(kTag, kDocumentsToCopy, "Current metrics are not set", OpReportType::CurrentOpReportRecipientRole); advanceTime(); - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure, + getGlobalServiceContext()->getFastClockSource()->now()); advanceTime(); ASSERT_FALSE(getReport(OpReportType::CurrentOpReportRecipientRole)[kTag].ok()); @@ -322,10 +339,11 @@ TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterCompletion) { TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterStepDown) { auto constexpr kTag = "documentsCopied"; - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); const auto kDocumentsToCopy = 2; const auto kBytesToCopy = 200; getMetrics()->setRecipientState(RecipientStateEnum::kCloning); + getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy); checkMetrics(kTag, kDocumentsToCopy, @@ -342,7 +360,7 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) { auto constexpr kTag = "remainingOperationTimeEstimated"; const auto elapsedTime = 1; - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); checkMetrics(kTag, -1, OpReportType::CurrentOpReportDonorRole); const auto kDocumentsToCopy = 2; @@ -350,6 +368,7 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) { getMetrics()->setRecipientState(RecipientStateEnum::kCreatingCollection); getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy); getMetrics()->setRecipientState(RecipientStateEnum::kCloning); + getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); getMetrics()->onDocumentsCopied(kDocumentsToCopy / 2, kBytesToCopy / 2); advanceTime(Seconds(elapsedTime)); // Since 50% of the data is copied, the remaining copy time equals the elapsed copy time, which @@ -359,6 +378,8 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) { const auto kOplogEntriesFetched = 4; const auto kOplogEntriesApplied = 2; getMetrics()->setRecipientState(RecipientStateEnum::kApplying); + getMetrics()->endCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); + getMetrics()->startApplyingOplogEntries(getGlobalServiceContext()->getFastClockSource()->now()); getMetrics()->onOplogEntriesFetched(kOplogEntriesFetched); getMetrics()->onOplogEntriesApplied(kOplogEntriesApplied); advanceTime(Seconds(elapsedTime)); @@ -370,10 +391,10 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) { TEST_F(ReshardingMetricsTest, CurrentOpReportForDonor) { const auto kDonorState = DonorStateEnum::kDonatingOplogEntries; - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); advanceTime(Seconds(2)); getMetrics()->setDonorState(kDonorState); - getMetrics()->startInCriticalSection(); + getMetrics()->startInCriticalSection(getGlobalServiceContext()->getFastClockSource()->now()); advanceTime(Seconds(3)); const ReshardingMetrics::ReporterOptions options( @@ -420,13 +441,14 @@ TEST_F(ReshardingMetricsTest, CurrentOpReportForRecipient) { static_assert(kBytesToCopy >= kBytesCopied); constexpr auto kDelayBeforeCloning = Seconds(2); - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); advanceTime(kDelayBeforeCloning); constexpr auto kTimeSpentCloning = Seconds(3); getMetrics()->setRecipientState(RecipientStateEnum::kCreatingCollection); getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy); getMetrics()->setRecipientState(kRecipientState); + getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); advanceTime(kTimeSpentCloning); getMetrics()->onDocumentsCopied(kDocumentsCopied, kBytesCopied); @@ -484,7 +506,7 @@ TEST_F(ReshardingMetricsTest, CurrentOpReportForCoordinator) { const auto kCoordinatorState = CoordinatorStateEnum::kInitializing; const auto kSomeDuration = Seconds(10); - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); getMetrics()->setCoordinatorState(kCoordinatorState); advanceTime(kSomeDuration); @@ -522,7 +544,7 @@ TEST_F(ReshardingMetricsTest, CurrentOpReportForCoordinator) { TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeCloning) { // Copy N docs @ timePerDoc. Check the progression of the estimated time remaining. auto m = getMetrics(); - m->onStart(); + m->onStart(getGlobalServiceContext()->getFastClockSource()->now()); auto timePerDocument = Seconds(2); int64_t bytesPerDocument = 1024; int64_t documentsToCopy = 409; @@ -530,6 +552,7 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeCloning) { m->setRecipientState(RecipientStateEnum::kCreatingCollection); m->setDocumentsToCopy(documentsToCopy, bytesToCopy); m->setRecipientState(RecipientStateEnum::kCloning); + m->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now()); auto remainingTime = 2 * timePerDocument * documentsToCopy; double maxAbsRelErr = 0; for (int64_t copied = 0; copied < documentsToCopy; ++copied) { @@ -557,8 +580,10 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeCloning) { TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeApplying) { // Perform N ops @ timePerOp. Check the progression of the estimated time remaining. auto m = getMetrics(); - m->onStart(); + m->onStart(getGlobalServiceContext()->getFastClockSource()->now()); m->setRecipientState(RecipientStateEnum::kApplying); + m->startApplyingOplogEntries(getGlobalServiceContext()->getFastClockSource()->now()); + // 1 extra millisecond here because otherwise an error of just 1ms will round this down to the // next second. auto timePerOp = Milliseconds(1001); @@ -589,21 +614,23 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeApplying) { TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAccumulate) { auto constexpr kTag = "documentsCopied"; - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); const auto kDocumentsToCopy1 = 2; const auto kBytesToCopy1 = 200; getMetrics()->setRecipientState(RecipientStateEnum::kCloning); getMetrics()->onDocumentsCopied(kDocumentsToCopy1, kBytesToCopy1); - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure, + getGlobalServiceContext()->getFastClockSource()->now()); - getMetrics()->onStart(); + getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now()); const auto kDocumentsToCopy2 = 3; const auto kBytesToCopy2 = 400; getMetrics()->setRecipientState(RecipientStateEnum::kCloning); getMetrics()->onDocumentsCopied(kDocumentsToCopy2, kBytesToCopy2); - getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure); + getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure, + getGlobalServiceContext()->getFastClockSource()->now()); checkMetrics(kTag, kDocumentsToCopy1 + kDocumentsToCopy2, diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp index 0d44ab77f64..0d798bef5e5 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp @@ -155,7 +155,7 @@ public: _cm = createChunkManagerForOriginalColl(); _metrics = std::make_unique<ReshardingMetrics>(getServiceContext()); - _metrics->onStart(); + _metrics->onStart(getServiceContext()->getFastClockSource()->now()); _metrics->setRecipientState(RecipientStateEnum::kApplying); } diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp index a9e60df6ac1..6d66874a296 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp @@ -98,7 +98,7 @@ public: // Initialize ReshardingMetrics to a recipient state compatible with fetching. _metrics = std::make_unique<ReshardingMetrics>(_svcCtx); - _metrics->onStart(); + _metrics->onStart(_svcCtx->getFastClockSource()->now()); _metrics->setRecipientState(RecipientStateEnum::kCloning); for (const auto& shardId : kTwoShardIdList) { diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index c3a15c40141..c3f3abf7ff8 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -77,6 +77,11 @@ namespace { const WriteConcernOptions kNoWaitWriteConcern{1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)}; +Date_t getCurrentTime() { + const auto svcCtx = cc().getServiceContext(); + return svcCtx->getFastClockSource()->now(); +} + /** * Fulfills the promise if it is not already. Otherwise, does nothing. */ @@ -327,7 +332,7 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor); return ExecutorFuture<void>(**executor) - .then([this] { _metrics()->onStart(); }) + .then([this] { _metrics()->onStart(getCurrentTime()); }) .then([this, executor, abortToken] { return _runUntilStrictConsistencyOrErrored(executor, abortToken); }) @@ -372,7 +377,8 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( // Interrupt occured, ensure the metrics get shut down. // TODO SERVER-56500: Don't use ReshardingOperationStatusEnum::kCanceled here if it // is not meant for failover cases. - _metrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled); + _metrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled, + getCurrentTime()); } return status; @@ -479,7 +485,7 @@ void ReshardingRecipientService::RecipientStateMachine:: }); } - _transitionState(RecipientStateEnum::kCloning); + _transitionToCloning(); } std::unique_ptr<ReshardingDataReplicationInterface> @@ -554,7 +560,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin return future_util::withCancellation(_dataReplication->awaitCloningDone(), abortToken) .thenRunOn(**executor) - .then([this] { _transitionState(RecipientStateEnum::kApplying); }); + .then([this] { _transitionToApplying(); }); } ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: @@ -603,7 +609,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: ShardingCatalogClient::kLocalWriteConcern); } - _transitionState(RecipientStateEnum::kStrictConsistency); + _transitionToStrictConsistency(); }); } @@ -681,6 +687,30 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToCreatingCol std::move(newRecipientCtx), std::move(cloneDetails), std::move(startConfigTxnCloneTime)); } +void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning() { + auto newRecipientCtx = _recipientCtx; + newRecipientCtx.setState(RecipientStateEnum::kCloning); + _transitionState(std::move(newRecipientCtx), boost::none, boost::none); + _metrics()->startCopyingDocuments(getCurrentTime()); +} + +void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying() { + auto newRecipientCtx = _recipientCtx; + newRecipientCtx.setState(RecipientStateEnum::kApplying); + _transitionState(std::move(newRecipientCtx), boost::none, boost::none); + auto currentTime = getCurrentTime(); + _metrics()->endCopyingDocuments(currentTime); + _metrics()->startApplyingOplogEntries(currentTime); +} + +void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency() { + auto newRecipientCtx = _recipientCtx; + newRecipientCtx.setState(RecipientStateEnum::kStrictConsistency); + _transitionState(std::move(newRecipientCtx), boost::none, boost::none); + auto currentTime = getCurrentTime(); + _metrics()->endApplyingOplogEntries(currentTime); +} + void ReshardingRecipientService::RecipientStateMachine::_transitionToError(Status abortReason) { auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kError); @@ -862,9 +892,11 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument if (_abortReason) { _metrics()->onCompletion(ErrorCodes::isCancellationError(_abortReason.get()) ? ReshardingOperationStatusEnum::kCanceled - : ReshardingOperationStatusEnum::kFailure); + : ReshardingOperationStatusEnum::kFailure, + getCurrentTime()); } else { - _metrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess); + _metrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess, + getCurrentTime()); } _completionPromise.emplaceValue(); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index 6656a51add5..08f34aaa65f 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -176,11 +176,16 @@ private: boost::optional<CloneDetails>&& cloneDetails, boost::optional<mongo::Date_t> configStartTime); - // Transitions the on-disk and in-memory state to RecipientStateEnum::kCreatingCollection. + // The following functions transition the on-disk and in-memory state to the named state. void _transitionToCreatingCollection(CloneDetails cloneDetails, boost::optional<mongo::Date_t> startConfigTxnCloneTime); - // Transitions the on-disk and in-memory state to RecipientStateEnum::kError. + void _transitionToCloning(); + + void _transitionToApplying(); + + void _transitionToStrictConsistency(); + void _transitionToError(Status abortReason); BSONObj _makeQueryForCoordinatorUpdate(const ShardId& shardId, RecipientStateEnum newState); diff --git a/src/mongo/db/s/resharding_test_commands.cpp b/src/mongo/db/s/resharding_test_commands.cpp index d636c499381..60fdbe5691f 100644 --- a/src/mongo/db/s/resharding_test_commands.cpp +++ b/src/mongo/db/s/resharding_test_commands.cpp @@ -78,7 +78,7 @@ public: }; ReshardingMetrics metrics(opCtx->getServiceContext()); - metrics.onStart(); + metrics.onStart(opCtx->getServiceContext()->getFastClockSource()->now()); metrics.setRecipientState(RecipientStateEnum::kCloning); auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); |