author     Blake Oler <blake.oler@mongodb.com>              2021-05-19 16:51:40 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-05-20 20:08:24 +0000
commit     fe0042206cc3ea6a0792a956a876d5793a1c67c2 (patch)
tree       2ef5996ee7c2a20348cdb959d2f7f211601b7233 /src/mongo/db/s
parent     a05828b76b27b5198ca2917ba5f979458c084358 (diff)
download   mongo-fe0042206cc3ea6a0792a956a876d5793a1c67c2.tar.gz
SERVER-56739 Rewrite resharding metrics duration component to allow for resuming from stepup
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp |   2
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.cpp    |  14
-rw-r--r--  src/mongo/db/s/resharding/resharding_metrics.cpp                 |  69
-rw-r--r--  src/mongo/db/s/resharding/resharding_metrics.h                   |  24
-rw-r--r--  src/mongo/db/s/resharding/resharding_metrics_test.cpp            | 111
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp      |   2
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp      |   2
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.cpp       |  46
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.h         |   9
-rw-r--r--  src/mongo/db/s/resharding_test_commands.cpp                      |   2
10 files changed, 190 insertions, 91 deletions
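
The pattern of the change: every duration-tracking entry point on ReshardingMetrics now takes an explicit Date_t supplied by the caller instead of reading the clock inside the class, which is what will later allow a node to resume a duration from a timestamp persisted on disk rather than from its own step-up time. Below is a minimal sketch of the new call sequence, assuming only the headers and methods touched by this diff; the wrapper function and its name are illustrative and not part of the commit.

    // Illustrative only: shows the caller-supplied-Date_t pattern introduced by
    // this commit. recordRecipientPhases() is a hypothetical wrapper, not code
    // from the MongoDB tree.
    #include "mongo/db/s/resharding/resharding_metrics.h"
    #include "mongo/db/service_context.h"

    namespace mongo {

    void recordRecipientPhases(ServiceContext* svcCtx) {
        auto* metrics = ReshardingMetrics::get(svcCtx);
        const auto now = [svcCtx] { return svcCtx->getFastClockSource()->now(); };

        metrics->onStart(now());  // caller passes the start time explicitly
        metrics->setRecipientState(RecipientStateEnum::kCloning);
        metrics->startCopyingDocuments(now());
        // ... collection cloning ...
        metrics->setRecipientState(RecipientStateEnum::kApplying);
        metrics->endCopyingDocuments(now());
        metrics->startApplyingOplogEntries(now());
        // ... oplog application ...
        metrics->endApplyingOplogEntries(now());
        metrics->onCompletion(ReshardingOperationStatusEnum::kSuccess, now());
    }

    }  // namespace mongo
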
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
index 88e16c2202f..f105299e080 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
@@ -109,7 +109,7 @@ protected:
void setUp() override {
ServiceContextTest::setUp();
_metrics = std::make_unique<ReshardingMetrics>(getServiceContext());
- _metrics->onStart();
+ _metrics->onStart(getServiceContext()->getFastClockSource()->now());
_metrics->setRecipientState(RecipientStateEnum::kCloning);
}
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 8f428d62193..6b90cead55e 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -80,6 +80,11 @@ bool shouldStopAttemptingToCreateIndex(Status status, const CancellationToken& t
return status.isOK() || token.isCanceled();
}
+Date_t getCurrentTime() {
+ const auto svcCtx = cc().getServiceContext();
+ return svcCtx->getFastClockSource()->now();
+}
+
void assertNumDocsModifiedMatchesExpected(const BatchedCommandRequest& request,
const BSONObj& response,
int expected) {
@@ -1010,13 +1015,14 @@ void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc(
}
void markCompleted(const Status& status) {
+ auto currentTime = getCurrentTime();
auto metrics = ReshardingMetrics::get(cc().getServiceContext());
if (status.isOK())
- metrics->onCompletion(ReshardingOperationStatusEnum::kSuccess);
+ metrics->onCompletion(ReshardingOperationStatusEnum::kSuccess, currentTime);
else if (status == ErrorCodes::ReshardCollectionAborted)
- metrics->onCompletion(ReshardingOperationStatusEnum::kCanceled);
+ metrics->onCompletion(ReshardingOperationStatusEnum::kCanceled, currentTime);
else
- metrics->onCompletion(ReshardingOperationStatusEnum::kFailure);
+ metrics->onCompletion(ReshardingOperationStatusEnum::kFailure, currentTime);
}
BSONObj createFlushReshardingStateChangeCommand(const NamespaceString& nss) {
@@ -1304,7 +1310,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChan
_coordinatorDocWrittenPromise.emplaceValue();
// TODO SERVER-53914 to accommodate loading metrics for the coordinator.
- ReshardingMetrics::get(cc().getServiceContext())->onStart();
+ ReshardingMetrics::get(cc().getServiceContext())->onStart(getCurrentTime());
}
void ReshardingCoordinatorService::ReshardingCoordinator::
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index 01e447b2524..2b01ae0e77f 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -89,18 +89,19 @@ ReshardingMetrics* ReshardingMetrics::get(ServiceContext* ctx) noexcept {
return getMetrics(ctx).get();
}
-void ReshardingMetrics::onStart() noexcept {
+void ReshardingMetrics::onStart(Date_t runningOperationStartTime) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
// TODO Re-add this invariant once all breaking test cases have been fixed.
// invariant(!_currentOp.has_value(), kAnotherOperationInProgress);
// Create a new operation and record the time it started.
_currentOp.emplace(_svcCtx->getFastClockSource());
- _currentOp->runningOperation.start();
+ _currentOp->runningOperation.start(runningOperationStartTime);
_currentOp->opStatus = ReshardingOperationStatusEnum::kRunning;
_started++;
}
-void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status) noexcept {
+void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status,
+ Date_t runningOperationEndTime) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
// TODO Re-add this invariant once all breaking test cases have been fixed.
// invariant(_currentOp.has_value(), kNoOperationInProgress);
@@ -118,6 +119,8 @@ void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status) noexc
MONGO_UNREACHABLE;
}
+ _currentOp->runningOperation.end(runningOperationEndTime);
+
// Reset current op metrics.
_currentOp = boost::none;
}
@@ -131,9 +134,9 @@ void ReshardingMetrics::onStepUp() noexcept {
// TODO SERVER-53914 Implement coordinator metrics rehydration.
// TODO SERVER-53912 Implement recipient metrics rehydration.
- // TODO SERVER-56739 Resume the runningOperation duration from a timestamp stored on disk
+ // TODO SERVER-57094 Resume the runningOperation duration from a timestamp stored on disk
// instead of starting from the current time.
- _currentOp->runningOperation.start();
+ _currentOp->runningOperation.start(_svcCtx->getFastClockSource()->now());
_currentOp->opStatus = ReshardingOperationStatusEnum::kRunning;
}
@@ -156,18 +159,6 @@ void ReshardingMetrics::setRecipientState(RecipientStateEnum state) noexcept {
const auto oldState = std::exchange(_currentOp->recipientState, state);
invariant(oldState != state);
-
- if (state == RecipientStateEnum::kCloning) {
- _currentOp->copyingDocuments.start();
- } else if (state == RecipientStateEnum::kApplying) {
- _currentOp->applyingOplogEntries.start();
- }
-
- if (oldState == RecipientStateEnum::kCloning) {
- _currentOp->copyingDocuments.end();
- } else if (oldState == RecipientStateEnum::kApplying) {
- _currentOp->applyingOplogEntries.end();
- }
}
void ReshardingMetrics::setCoordinatorState(CoordinatorStateEnum state) noexcept {
@@ -227,14 +218,34 @@ void ReshardingMetrics::onDocumentsCopied(int64_t documents, int64_t bytes) noex
_cumulativeOp.bytesCopied += bytes;
}
-void ReshardingMetrics::startInCriticalSection() {
+void ReshardingMetrics::startCopyingDocuments(Date_t start) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _currentOp->copyingDocuments.start(start);
+}
+
+void ReshardingMetrics::endCopyingDocuments(Date_t end) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _currentOp->copyingDocuments.forceEnd(end);
+}
+
+void ReshardingMetrics::startApplyingOplogEntries(Date_t start) {
stdx::lock_guard<Latch> lk(_mutex);
- _currentOp->inCriticalSection.start();
+ _currentOp->applyingOplogEntries.start(start);
}
-void ReshardingMetrics::endInCritcialSection() {
+void ReshardingMetrics::endApplyingOplogEntries(Date_t end) {
stdx::lock_guard<Latch> lk(_mutex);
- _currentOp->inCriticalSection.end();
+ _currentOp->applyingOplogEntries.forceEnd(end);
+}
+
+void ReshardingMetrics::startInCriticalSection(Date_t start) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _currentOp->inCriticalSection.start(start);
+}
+
+void ReshardingMetrics::endInCriticalSection(Date_t end) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _currentOp->inCriticalSection.forceEnd(end);
}
void ReshardingMetrics::onOplogEntriesFetched(int64_t entries) noexcept {
@@ -276,15 +287,23 @@ void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept {
_cumulativeOp.writesDuringCriticalSection += writes;
}
-void ReshardingMetrics::OperationMetrics::TimeInterval::start() noexcept {
+void ReshardingMetrics::OperationMetrics::TimeInterval::start(Date_t start) noexcept {
invariant(!_start.has_value(), "Already started");
- _start.emplace(_clockSource->now());
+ _start.emplace(start);
}
-void ReshardingMetrics::OperationMetrics::TimeInterval::end() noexcept {
+void ReshardingMetrics::OperationMetrics::TimeInterval::end(Date_t end) noexcept {
invariant(_start.has_value(), "Not started");
invariant(!_end.has_value(), "Already stopped");
- _end.emplace(_clockSource->now());
+ _end.emplace(end);
+}
+
+void ReshardingMetrics::OperationMetrics::TimeInterval::forceEnd(Date_t end) noexcept {
+ if (!_start.has_value()) {
+ _start.emplace(end);
+ }
+
+ this->end(end);
}
Milliseconds ReshardingMetrics::OperationMetrics::TimeInterval::duration() const noexcept {
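
The new forceEnd() is what lets a phase timer be closed even if start() was never called on this node (for example after a step-up that lands directly in a later recipient state): the interval is treated as starting at its end time and so contributes a zero duration. A self-contained sketch of that semantic, deliberately independent of the MongoDB types above (std::chrono and assert stand in for Date_t and invariant):

    #include <cassert>
    #include <chrono>
    #include <optional>

    // Standalone illustration of the start/end/forceEnd semantics added above;
    // not the MongoDB TimeInterval class itself.
    class IntervalSketch {
    public:
        using Clock = std::chrono::system_clock;

        void start(Clock::time_point t) {
            assert(!_start && "Already started");
            _start = t;
        }

        void end(Clock::time_point t) {
            assert(_start && "Not started");
            assert(!_end && "Already stopped");
            _end = t;
        }

        // Never started on this node? Treat the interval as starting at its end
        // time so it records a zero duration instead of tripping the invariant.
        void forceEnd(Clock::time_point t) {
            if (!_start) {
                _start = t;
            }
            end(t);
        }

        std::chrono::milliseconds duration() const {
            if (!_start || !_end) {
                return std::chrono::milliseconds{0};
            }
            return std::chrono::duration_cast<std::chrono::milliseconds>(*_end - *_start);
        }

    private:
        std::optional<Clock::time_point> _start;
        std::optional<Clock::time_point> _end;
    };

    int main() {
        IntervalSketch copying;
        copying.forceEnd(IntervalSketch::Clock::now());  // close a phase that never began here
        assert(copying.duration() == std::chrono::milliseconds{0});
        return 0;
    }
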
diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h
index 4d703cec82b..aeb3ddf11e1 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.h
+++ b/src/mongo/db/s/resharding/resharding_metrics.h
@@ -59,7 +59,7 @@ public:
// Marks the beginning of a resharding operation. Note that only one resharding operation may run
// at any time.
- void onStart() noexcept;
+ void onStart(Date_t runningOperationStartTime) noexcept;
// Marks the resumption of a resharding operation. Note that only one resharding operation may
// run at any time.
@@ -76,9 +76,15 @@ public:
// Allows updating metrics on "documents to copy" so long as the recipient is in cloning state.
void onDocumentsCopied(int64_t documents, int64_t bytes) noexcept;
- // Starts/ends the timer recording the time spent in the critical section.
- void startInCriticalSection();
- void endInCritcialSection();
+ // Starts/ends the timers recording the time spent in the named sections.
+ void startCopyingDocuments(Date_t start);
+ void endCopyingDocuments(Date_t end);
+
+ void startApplyingOplogEntries(Date_t start);
+ void endApplyingOplogEntries(Date_t end);
+
+ void startInCriticalSection(Date_t start);
+ void endInCriticalSection(Date_t end);
// Allows updating "oplog entries to apply" metrics when the recipient is in applying state.
void onOplogEntriesFetched(int64_t entries) noexcept;
@@ -94,7 +100,8 @@ public:
// Marks the completion of the current (active) resharding operation. Aborts the process if no
// resharding operation is in progress.
- void onCompletion(ReshardingOperationStatusEnum) noexcept;
+ void onCompletion(ReshardingOperationStatusEnum status,
+ Date_t runningOperationEndTime) noexcept;
struct ReporterOptions {
enum class Role { kDonor, kRecipient, kCoordinator };
@@ -145,9 +152,12 @@ private:
public:
explicit TimeInterval(ClockSource* clockSource) : _clockSource(clockSource) {}
- void start() noexcept;
+ void start(Date_t start) noexcept;
+
+ void end(Date_t end) noexcept;
- void end() noexcept;
+ // TODO Remove this function once all metrics classes can start from stepup.
+ void forceEnd(Date_t end) noexcept;
Milliseconds duration() const noexcept;
diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
index 4927109c79a..dd1a131501d 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
@@ -119,37 +119,41 @@ private:
// TODO Re-enable once underlying invariants are re-enabled
/*
DEATH_TEST_F(ReshardingMetricsTest, RunOnCompletionBeforeOnStart, "No operation is in progress") {
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess);
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
+getGlobalServiceContext()->getFastClockSource()->now());
}
DEATH_TEST_F(ReshardingMetricsTest,
RunOnStepUpAfterOnStartInvariants,
"Another operation is in progress") {
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onStepUp();
}
DEATH_TEST_F(ReshardingMetricsTest,
RunOnCompletionAfterOnStepDownInvariants,
"No operation is in progress") {
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onStepDown();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess);
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
+getGlobalServiceContext()->getFastClockSource()->now());
}
*/
TEST_F(ReshardingMetricsTest, RunOnStepDownAfterOnCompletionIsSafe) {
- getMetrics()->onStart();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess);
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
+ getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onStepDown();
}
TEST_F(ReshardingMetricsTest, OperationStatus) {
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
const auto report = getReport(OpReportType::CurrentOpReportCoordinatorRole);
ASSERT_EQ(report.getStringField("opStatus"),
ReshardingOperationStatus_serializer(ReshardingOperationStatusEnum::kRunning));
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess);
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
+ getGlobalServiceContext()->getFastClockSource()->now());
}
TEST_F(ReshardingMetricsTest, TestOperationStatus) {
@@ -158,18 +162,21 @@ TEST_F(ReshardingMetricsTest, TestOperationStatus) {
const auto kNumCanceledOps = 7;
for (auto i = 0; i < kNumSuccessfulOps; i++) {
- getMetrics()->onStart();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess);
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
+ getGlobalServiceContext()->getFastClockSource()->now());
}
for (auto i = 0; i < kNumFailedOps; i++) {
- getMetrics()->onStart();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure);
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure,
+ getGlobalServiceContext()->getFastClockSource()->now());
}
for (auto i = 0; i < kNumCanceledOps; i++) {
- getMetrics()->onStart();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled);
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled,
+ getGlobalServiceContext()->getFastClockSource()->now());
}
checkMetrics("countReshardingSuccessful", kNumSuccessfulOps, OpReportType::CumulativeReport);
@@ -178,19 +185,19 @@ TEST_F(ReshardingMetricsTest, TestOperationStatus) {
const auto total = kNumSuccessfulOps + kNumFailedOps + kNumCanceledOps;
checkMetrics("countReshardingOperations", total, OpReportType::CumulativeReport);
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
checkMetrics("countReshardingOperations", total + 1, OpReportType::CumulativeReport);
}
TEST_F(ReshardingMetricsTest, TestElapsedTime) {
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
const auto elapsedTime = 1;
advanceTime(Seconds(elapsedTime));
checkMetrics("totalOperationTimeElapsed", elapsedTime, OpReportType::CurrentOpReportDonorRole);
}
TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) {
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
const auto elapsedTime = 1;
advanceTime(Seconds(elapsedTime));
@@ -198,7 +205,7 @@ TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) {
// Update metrics for donor
const auto kWritesDuringCriticalSection = 7;
getMetrics()->setDonorState(DonorStateEnum::kDonatingOplogEntries);
- getMetrics()->startInCriticalSection();
+ getMetrics()->startInCriticalSection(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onWriteDuringCriticalSection(kWritesDuringCriticalSection);
advanceTime(Seconds(elapsedTime));
@@ -209,13 +216,15 @@ TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) {
getMetrics()->setRecipientState(RecipientStateEnum::kCreatingCollection);
getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy);
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
+ getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onDocumentsCopied(kDocumentsToCopy * kCopyProgress / 100,
kBytesToCopy * kCopyProgress / 100);
advanceTime(Seconds(elapsedTime));
const auto currentDonorOpReport = getReport(OpReportType::CurrentOpReportDonorRole);
const auto currentRecipientOpReport = getReport(OpReportType::CurrentOpReportRecipientRole);
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess);
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
+ getGlobalServiceContext()->getFastClockSource()->now());
checkMetrics(currentRecipientOpReport, "totalCopyTimeElapsed", elapsedTime);
checkMetrics(currentRecipientOpReport, "bytesCopied", kBytesToCopy * kCopyProgress / 100);
@@ -242,13 +251,15 @@ TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) {
TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCompletion) {
auto constexpr kTag = "documentsCopied";
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
const auto kDocumentsToCopy = 2;
const auto kBytesToCopy = 200;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
+ getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
advanceTime();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure);
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure,
+ getGlobalServiceContext()->getFastClockSource()->now());
advanceTime();
checkMetrics(kTag,
@@ -256,20 +267,22 @@ TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCompletion) {
"Cumulative metrics are not retained",
OpReportType::CumulativeReport);
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
checkMetrics(
kTag, kDocumentsToCopy, "Cumulative metrics are reset", OpReportType::CumulativeReport);
}
TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCancellation) {
auto constexpr kTag = "documentsCopied";
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
const auto kDocumentsToCopy = 2;
const auto kBytesToCopy = 200;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
+ getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
advanceTime();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled);
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled,
+ getGlobalServiceContext()->getFastClockSource()->now());
advanceTime();
checkMetrics(kTag,
@@ -277,44 +290,48 @@ TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCancellation) {
"Cumulative metrics are not retained",
OpReportType::CumulativeReport);
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
checkMetrics(
kTag, kDocumentsToCopy, "Cumulative metrics are reset", OpReportType::CumulativeReport);
}
TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreResetAfterCompletion) {
auto constexpr kTag = "documentsCopied";
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
const auto kDocumentsToCopy = 2;
const auto kBytesToCopy = 200;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
+ getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
checkMetrics(kTag,
kDocumentsToCopy,
"Current metrics are not set",
OpReportType::CurrentOpReportRecipientRole);
advanceTime();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess);
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
+ getGlobalServiceContext()->getFastClockSource()->now());
advanceTime();
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
checkMetrics(
kTag, 0, "Current metrics are not reset", OpReportType::CurrentOpReportRecipientRole);
}
TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterCompletion) {
auto constexpr kTag = "documentsCopied";
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
const auto kDocumentsToCopy = 2;
const auto kBytesToCopy = 200;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
+ getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
checkMetrics(kTag,
kDocumentsToCopy,
"Current metrics are not set",
OpReportType::CurrentOpReportRecipientRole);
advanceTime();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure);
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure,
+ getGlobalServiceContext()->getFastClockSource()->now());
advanceTime();
ASSERT_FALSE(getReport(OpReportType::CurrentOpReportRecipientRole)[kTag].ok());
@@ -322,10 +339,11 @@ TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterCompletion) {
TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterStepDown) {
auto constexpr kTag = "documentsCopied";
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
const auto kDocumentsToCopy = 2;
const auto kBytesToCopy = 200;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
+ getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
checkMetrics(kTag,
kDocumentsToCopy,
@@ -342,7 +360,7 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) {
auto constexpr kTag = "remainingOperationTimeEstimated";
const auto elapsedTime = 1;
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
checkMetrics(kTag, -1, OpReportType::CurrentOpReportDonorRole);
const auto kDocumentsToCopy = 2;
@@ -350,6 +368,7 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) {
getMetrics()->setRecipientState(RecipientStateEnum::kCreatingCollection);
getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy);
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
+ getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onDocumentsCopied(kDocumentsToCopy / 2, kBytesToCopy / 2);
advanceTime(Seconds(elapsedTime));
// Since 50% of the data is copied, the remaining copy time equals the elapsed copy time, which
@@ -359,6 +378,8 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) {
const auto kOplogEntriesFetched = 4;
const auto kOplogEntriesApplied = 2;
getMetrics()->setRecipientState(RecipientStateEnum::kApplying);
+ getMetrics()->endCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
+ getMetrics()->startApplyingOplogEntries(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onOplogEntriesFetched(kOplogEntriesFetched);
getMetrics()->onOplogEntriesApplied(kOplogEntriesApplied);
advanceTime(Seconds(elapsedTime));
@@ -370,10 +391,10 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) {
TEST_F(ReshardingMetricsTest, CurrentOpReportForDonor) {
const auto kDonorState = DonorStateEnum::kDonatingOplogEntries;
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
advanceTime(Seconds(2));
getMetrics()->setDonorState(kDonorState);
- getMetrics()->startInCriticalSection();
+ getMetrics()->startInCriticalSection(getGlobalServiceContext()->getFastClockSource()->now());
advanceTime(Seconds(3));
const ReshardingMetrics::ReporterOptions options(
@@ -420,13 +441,14 @@ TEST_F(ReshardingMetricsTest, CurrentOpReportForRecipient) {
static_assert(kBytesToCopy >= kBytesCopied);
constexpr auto kDelayBeforeCloning = Seconds(2);
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
advanceTime(kDelayBeforeCloning);
constexpr auto kTimeSpentCloning = Seconds(3);
getMetrics()->setRecipientState(RecipientStateEnum::kCreatingCollection);
getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy);
getMetrics()->setRecipientState(kRecipientState);
+ getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
advanceTime(kTimeSpentCloning);
getMetrics()->onDocumentsCopied(kDocumentsCopied, kBytesCopied);
@@ -484,7 +506,7 @@ TEST_F(ReshardingMetricsTest, CurrentOpReportForCoordinator) {
const auto kCoordinatorState = CoordinatorStateEnum::kInitializing;
const auto kSomeDuration = Seconds(10);
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->setCoordinatorState(kCoordinatorState);
advanceTime(kSomeDuration);
@@ -522,7 +544,7 @@ TEST_F(ReshardingMetricsTest, CurrentOpReportForCoordinator) {
TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeCloning) {
// Copy N docs @ timePerDoc. Check the progression of the estimated time remaining.
auto m = getMetrics();
- m->onStart();
+ m->onStart(getGlobalServiceContext()->getFastClockSource()->now());
auto timePerDocument = Seconds(2);
int64_t bytesPerDocument = 1024;
int64_t documentsToCopy = 409;
@@ -530,6 +552,7 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeCloning) {
m->setRecipientState(RecipientStateEnum::kCreatingCollection);
m->setDocumentsToCopy(documentsToCopy, bytesToCopy);
m->setRecipientState(RecipientStateEnum::kCloning);
+ m->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
auto remainingTime = 2 * timePerDocument * documentsToCopy;
double maxAbsRelErr = 0;
for (int64_t copied = 0; copied < documentsToCopy; ++copied) {
@@ -557,8 +580,10 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeCloning) {
TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeApplying) {
// Perform N ops @ timePerOp. Check the progression of the estimated time remaining.
auto m = getMetrics();
- m->onStart();
+ m->onStart(getGlobalServiceContext()->getFastClockSource()->now());
m->setRecipientState(RecipientStateEnum::kApplying);
+ m->startApplyingOplogEntries(getGlobalServiceContext()->getFastClockSource()->now());
+
// 1 extra millisecond here because otherwise an error of just 1ms will round this down to the
// next second.
auto timePerOp = Milliseconds(1001);
@@ -589,21 +614,23 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeApplying) {
TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAccumulate) {
auto constexpr kTag = "documentsCopied";
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
const auto kDocumentsToCopy1 = 2;
const auto kBytesToCopy1 = 200;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
getMetrics()->onDocumentsCopied(kDocumentsToCopy1, kBytesToCopy1);
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure);
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure,
+ getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onStart();
+ getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
const auto kDocumentsToCopy2 = 3;
const auto kBytesToCopy2 = 400;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
getMetrics()->onDocumentsCopied(kDocumentsToCopy2, kBytesToCopy2);
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure);
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure,
+ getGlobalServiceContext()->getFastClockSource()->now());
checkMetrics(kTag,
kDocumentsToCopy1 + kDocumentsToCopy2,
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
index 0d44ab77f64..0d798bef5e5 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
@@ -155,7 +155,7 @@ public:
_cm = createChunkManagerForOriginalColl();
_metrics = std::make_unique<ReshardingMetrics>(getServiceContext());
- _metrics->onStart();
+ _metrics->onStart(getServiceContext()->getFastClockSource()->now());
_metrics->setRecipientState(RecipientStateEnum::kApplying);
}
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
index a9e60df6ac1..6d66874a296 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
@@ -98,7 +98,7 @@ public:
// Initialize ReshardingMetrics to a recipient state compatible with fetching.
_metrics = std::make_unique<ReshardingMetrics>(_svcCtx);
- _metrics->onStart();
+ _metrics->onStart(_svcCtx->getFastClockSource()->now());
_metrics->setRecipientState(RecipientStateEnum::kCloning);
for (const auto& shardId : kTwoShardIdList) {
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index c3a15c40141..c3f3abf7ff8 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -77,6 +77,11 @@ namespace {
const WriteConcernOptions kNoWaitWriteConcern{1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)};
+Date_t getCurrentTime() {
+ const auto svcCtx = cc().getServiceContext();
+ return svcCtx->getFastClockSource()->now();
+}
+
/**
* Fulfills the promise if it is not already. Otherwise, does nothing.
*/
@@ -327,7 +332,7 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
_cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
return ExecutorFuture<void>(**executor)
- .then([this] { _metrics()->onStart(); })
+ .then([this] { _metrics()->onStart(getCurrentTime()); })
.then([this, executor, abortToken] {
return _runUntilStrictConsistencyOrErrored(executor, abortToken);
})
@@ -372,7 +377,8 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
// Interrupt occurred, ensure the metrics get shut down.
// TODO SERVER-56500: Don't use ReshardingOperationStatusEnum::kCanceled here if it
// is not meant for failover cases.
- _metrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled);
+ _metrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled,
+ getCurrentTime());
}
return status;
@@ -479,7 +485,7 @@ void ReshardingRecipientService::RecipientStateMachine::
});
}
- _transitionState(RecipientStateEnum::kCloning);
+ _transitionToCloning();
}
std::unique_ptr<ReshardingDataReplicationInterface>
@@ -554,7 +560,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
return future_util::withCancellation(_dataReplication->awaitCloningDone(), abortToken)
.thenRunOn(**executor)
- .then([this] { _transitionState(RecipientStateEnum::kApplying); });
+ .then([this] { _transitionToApplying(); });
}
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
@@ -603,7 +609,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
ShardingCatalogClient::kLocalWriteConcern);
}
- _transitionState(RecipientStateEnum::kStrictConsistency);
+ _transitionToStrictConsistency();
});
}
@@ -681,6 +687,30 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToCreatingCol
std::move(newRecipientCtx), std::move(cloneDetails), std::move(startConfigTxnCloneTime));
}
+void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning() {
+ auto newRecipientCtx = _recipientCtx;
+ newRecipientCtx.setState(RecipientStateEnum::kCloning);
+ _transitionState(std::move(newRecipientCtx), boost::none, boost::none);
+ _metrics()->startCopyingDocuments(getCurrentTime());
+}
+
+void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying() {
+ auto newRecipientCtx = _recipientCtx;
+ newRecipientCtx.setState(RecipientStateEnum::kApplying);
+ _transitionState(std::move(newRecipientCtx), boost::none, boost::none);
+ auto currentTime = getCurrentTime();
+ _metrics()->endCopyingDocuments(currentTime);
+ _metrics()->startApplyingOplogEntries(currentTime);
+}
+
+void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency() {
+ auto newRecipientCtx = _recipientCtx;
+ newRecipientCtx.setState(RecipientStateEnum::kStrictConsistency);
+ _transitionState(std::move(newRecipientCtx), boost::none, boost::none);
+ auto currentTime = getCurrentTime();
+ _metrics()->endApplyingOplogEntries(currentTime);
+}
+
void ReshardingRecipientService::RecipientStateMachine::_transitionToError(Status abortReason) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kError);
@@ -862,9 +892,11 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument
if (_abortReason) {
_metrics()->onCompletion(ErrorCodes::isCancellationError(_abortReason.get())
? ReshardingOperationStatusEnum::kCanceled
- : ReshardingOperationStatusEnum::kFailure);
+ : ReshardingOperationStatusEnum::kFailure,
+ getCurrentTime());
} else {
- _metrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess);
+ _metrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
+ getCurrentTime());
}
_completionPromise.emplaceValue();
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 6656a51add5..08f34aaa65f 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -176,11 +176,16 @@ private:
boost::optional<CloneDetails>&& cloneDetails,
boost::optional<mongo::Date_t> configStartTime);
- // Transitions the on-disk and in-memory state to RecipientStateEnum::kCreatingCollection.
+ // The following functions transition the on-disk and in-memory state to the named state.
void _transitionToCreatingCollection(CloneDetails cloneDetails,
boost::optional<mongo::Date_t> startConfigTxnCloneTime);
- // Transitions the on-disk and in-memory state to RecipientStateEnum::kError.
+ void _transitionToCloning();
+
+ void _transitionToApplying();
+
+ void _transitionToStrictConsistency();
+
void _transitionToError(Status abortReason);
BSONObj _makeQueryForCoordinatorUpdate(const ShardId& shardId, RecipientStateEnum newState);
diff --git a/src/mongo/db/s/resharding_test_commands.cpp b/src/mongo/db/s/resharding_test_commands.cpp
index d636c499381..60fdbe5691f 100644
--- a/src/mongo/db/s/resharding_test_commands.cpp
+++ b/src/mongo/db/s/resharding_test_commands.cpp
@@ -78,7 +78,7 @@ public:
};
ReshardingMetrics metrics(opCtx->getServiceContext());
- metrics.onStart();
+ metrics.onStart(opCtx->getServiceContext()->getFastClockSource()->now());
metrics.setRecipientState(RecipientStateEnum::kCloning);
auto hookList = std::make_unique<rpc::EgressMetadataHookList>();