summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.cpp147
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.h30
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics_test.cpp224
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp3
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp3
-rw-r--r--src/mongo/db/s/sharding_server_status.cpp3
6 files changed, 246 insertions, 164 deletions
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index 000cb691264..cc5cb36ac24 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -58,7 +58,7 @@ constexpr auto kCriticalSectionTimeElapsed = "totalCriticalSectionTimeElapsed";
constexpr auto kCoordinatorState = "coordinatorState";
constexpr auto kDonorState = "donorState";
constexpr auto kRecipientState = "recipientState";
-constexpr auto kCompletionStatus = "opStatus";
+constexpr auto kOpStatus = "opStatus";
using MetricsPtr = std::unique_ptr<ReshardingMetrics>;
@@ -89,8 +89,7 @@ ReshardingMetrics* ReshardingMetrics::get(ServiceContext* ctx) noexcept {
void ReshardingMetrics::onStart() noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- invariant(!_currentOp.has_value() || _currentOp->isCompleted(), kAnotherOperationInProgress);
-
+ invariant(!_currentOp.has_value(), kAnotherOperationInProgress);
// Create a new operation and record the time it started.
_currentOp.emplace(_svcCtx->getFastClockSource());
_currentOp->runningOperation.start();
@@ -100,7 +99,7 @@ void ReshardingMetrics::onStart() noexcept {
void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress);
+ invariant(_currentOp.has_value(), kNoOperationInProgress);
switch (status) {
case ReshardingOperationStatusEnum::kSuccess:
_succeeded++;
@@ -115,17 +114,13 @@ void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status) noexc
MONGO_UNREACHABLE;
}
- // Mark the active operation as completed and ensure all timers are stopped.
- _currentOp->runningOperation.end();
- _currentOp->copyingDocuments.tryEnd();
- _currentOp->applyingOplogEntries.tryEnd();
- _currentOp->inCriticalSection.tryEnd();
- _currentOp->opStatus = status;
+ // Reset current op metrics.
+ _currentOp = boost::none;
}
void ReshardingMetrics::setDonorState(DonorStateEnum state) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress);
+ invariant(_currentOp.has_value(), kNoOperationInProgress);
const auto oldState = std::exchange(_currentOp->donorState, state);
invariant(oldState != state);
@@ -141,7 +136,7 @@ void ReshardingMetrics::setDonorState(DonorStateEnum state) noexcept {
void ReshardingMetrics::setRecipientState(RecipientStateEnum state) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress);
+ invariant(_currentOp.has_value(), kNoOperationInProgress);
const auto oldState = std::exchange(_currentOp->recipientState, state);
invariant(oldState != state);
@@ -161,13 +156,13 @@ void ReshardingMetrics::setRecipientState(RecipientStateEnum state) noexcept {
void ReshardingMetrics::setCoordinatorState(CoordinatorStateEnum state) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress);
+ invariant(_currentOp.has_value(), kNoOperationInProgress);
_currentOp->coordinatorState = state;
}
void ReshardingMetrics::setDocumentsToCopy(int64_t documents, int64_t bytes) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress);
+ invariant(_currentOp.has_value(), kNoOperationInProgress);
_currentOp->documentsToCopy = documents;
_currentOp->bytesToCopy = bytes;
@@ -175,36 +170,41 @@ void ReshardingMetrics::setDocumentsToCopy(int64_t documents, int64_t bytes) noe
void ReshardingMetrics::onDocumentsCopied(int64_t documents, int64_t bytes) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress);
+ invariant(_currentOp.has_value(), kNoOperationInProgress);
invariant(_currentOp->recipientState == RecipientStateEnum::kCloning);
_currentOp->documentsCopied += documents;
_currentOp->bytesCopied += bytes;
+ _cumulativeOp.documentsCopied += documents;
+ _cumulativeOp.bytesCopied += bytes;
}
void ReshardingMetrics::onOplogEntriesFetched(int64_t entries) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress);
+ invariant(_currentOp.has_value(), kNoOperationInProgress);
invariant(_currentOp->recipientState == RecipientStateEnum::kCloning ||
_currentOp->recipientState == RecipientStateEnum::kApplying ||
_currentOp->recipientState == RecipientStateEnum::kSteadyState);
_currentOp->oplogEntriesFetched += entries;
+ _cumulativeOp.oplogEntriesFetched += entries;
}
void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress);
+ invariant(_currentOp.has_value(), kNoOperationInProgress);
invariant(_currentOp->recipientState == RecipientStateEnum::kApplying ||
_currentOp->recipientState == RecipientStateEnum::kSteadyState);
_currentOp->oplogEntriesApplied += entries;
+ _cumulativeOp.oplogEntriesApplied += entries;
}
void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- invariant(_currentOp.has_value() && !_currentOp->isCompleted(), kNoOperationInProgress);
+ invariant(_currentOp.has_value(), kNoOperationInProgress);
invariant(_currentOp->donorState == DonorStateEnum::kPreparingToBlockWrites ||
_currentOp->donorState == DonorStateEnum::kBlockingWrites);
_currentOp->writesDuringCriticalSection += writes;
+ _cumulativeOp.writesDuringCriticalSection += writes;
}
void ReshardingMetrics::OperationMetrics::TimeInterval::start() noexcept {
@@ -212,18 +212,10 @@ void ReshardingMetrics::OperationMetrics::TimeInterval::start() noexcept {
_start.emplace(_clockSource->now());
}
-void ReshardingMetrics::OperationMetrics::TimeInterval::tryEnd() noexcept {
- if (!_start.has_value())
- return;
- if (_end.has_value())
- return;
- _end.emplace(_clockSource->now());
-}
-
void ReshardingMetrics::OperationMetrics::TimeInterval::end() noexcept {
invariant(_start.has_value(), "Not started");
invariant(!_end.has_value(), "Already stopped");
- tryEnd();
+ _end.emplace(_clockSource->now());
}
Milliseconds ReshardingMetrics::OperationMetrics::TimeInterval::duration() const noexcept {
@@ -234,12 +226,10 @@ Milliseconds ReshardingMetrics::OperationMetrics::TimeInterval::duration() const
return duration_cast<Milliseconds>(_end.value() - _start.value());
}
-void ReshardingMetrics::OperationMetrics::append(BSONObjBuilder* bob, Role role) const {
- auto getElapsedTime = [role](const TimeInterval& interval) -> int64_t {
- if (role == Role::kAll)
- return durationCount<Milliseconds>(interval.duration());
- else
- return durationCount<Seconds>(interval.duration());
+void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder* bob,
+ Role role) const {
+ auto getElapsedTime = [](const TimeInterval& interval) -> int64_t {
+ return durationCount<Seconds>(interval.duration());
};
auto remainingMsec = [&]() -> boost::optional<Milliseconds> {
@@ -256,74 +246,46 @@ void ReshardingMetrics::OperationMetrics::append(BSONObjBuilder* bob, Role role)
return {};
}();
+ bob->append(kOpTimeElapsed, getElapsedTime(runningOperation));
- const std::string kIntervalSuffix = role == Role::kAll ? "Millis" : "";
- bob->append(kOpTimeElapsed + kIntervalSuffix, getElapsedTime(runningOperation));
-
- bob->append(kOpTimeRemaining + kIntervalSuffix,
+ bob->append(kOpTimeRemaining,
!remainingMsec ? int64_t{-1} /** -1 is a specified integer null value */
- : role == Role::kAll ? durationCount<Milliseconds>(*remainingMsec)
- : durationCount<Seconds>(*remainingMsec));
-
- if (role == Role::kAll || role == Role::kRecipient) {
- bob->append(kDocumentsToCopy, documentsToCopy);
- bob->append(kDocumentsCopied, documentsCopied);
- bob->append(kBytesToCopy, bytesToCopy);
- bob->append(kBytesCopied, bytesCopied);
- bob->append(kCopyTimeElapsed + kIntervalSuffix, getElapsedTime(copyingDocuments));
-
- bob->append(kOplogsFetched, oplogEntriesFetched);
- bob->append(kOplogsApplied, oplogEntriesApplied);
- bob->append(kApplyTimeElapsed + kIntervalSuffix, getElapsedTime(applyingOplogEntries));
- }
-
- if (role == Role::kAll || role == Role::kDonor) {
- bob->append(kWritesDuringCritialSection, writesDuringCriticalSection);
- bob->append(kCriticalSectionTimeElapsed + kIntervalSuffix,
- getElapsedTime(inCriticalSection));
- }
+ : durationCount<Seconds>(*remainingMsec));
switch (role) {
case Role::kDonor:
+ bob->append(kWritesDuringCritialSection, writesDuringCriticalSection);
+ bob->append(kCriticalSectionTimeElapsed, getElapsedTime(inCriticalSection));
bob->append(kDonorState, DonorState_serializer(donorState));
- bob->append(kCompletionStatus, ReshardingOperationStatus_serializer(opStatus));
+ bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus));
break;
case Role::kRecipient:
+ bob->append(kDocumentsToCopy, documentsToCopy);
+ bob->append(kDocumentsCopied, documentsCopied);
+ bob->append(kBytesToCopy, bytesToCopy);
+ bob->append(kBytesCopied, bytesCopied);
+ bob->append(kCopyTimeElapsed, getElapsedTime(copyingDocuments));
+
+ bob->append(kOplogsFetched, oplogEntriesFetched);
+ bob->append(kOplogsApplied, oplogEntriesApplied);
+ bob->append(kApplyTimeElapsed, getElapsedTime(applyingOplogEntries));
bob->append(kRecipientState, RecipientState_serializer(recipientState));
- bob->append(kCompletionStatus, ReshardingOperationStatus_serializer(opStatus));
+ bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus));
break;
case Role::kCoordinator:
bob->append(kCoordinatorState, CoordinatorState_serializer(coordinatorState));
- bob->append(kCompletionStatus, ReshardingOperationStatus_serializer(opStatus));
- break;
- case Role::kAll:
- bob->append(kDonorState, donorState);
- bob->append(kRecipientState, recipientState);
- bob->append(kCoordinatorState, coordinatorState);
- bob->append(kCompletionStatus, opStatus);
+ bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus));
break;
default:
MONGO_UNREACHABLE;
}
}
-void ReshardingMetrics::serialize(BSONObjBuilder* bob, ReporterOptions::Role role) const {
+void ReshardingMetrics::serializeCurrentOpMetrics(BSONObjBuilder* bob,
+ ReporterOptions::Role role) const {
stdx::lock_guard<Latch> lk(_mutex);
-
- if (role == ReporterOptions::Role::kAll) {
- bob->append(kTotalOps, _started);
- bob->append(kSuccessfulOps, _succeeded);
- bob->append(kFailedOps, _failed);
- bob->append(kCanceledOps, _canceled);
- }
-
- if (_currentOp) {
- _currentOp->append(bob, role);
- } else {
- // There are no resharding operations in progress, so report the default metrics.
- OperationMetrics opMetrics(_svcCtx->getFastClockSource());
- opMetrics.append(bob, role);
- }
+ if (_currentOp)
+ _currentOp->appendCurrentOpMetrics(bob, role);
}
BSONObj ReshardingMetrics::reportForCurrentOp(const ReporterOptions& options) const noexcept {
@@ -359,7 +321,7 @@ BSONObj ReshardingMetrics::reportForCurrentOp(const ReporterOptions& options) co
bob.append("ns", options.nss.toString());
bob.append("originatingCommand", originatingCommand);
- serialize(&bob, options.role);
+ serializeCurrentOpMetrics(&bob, options.role);
return bob.obj();
}
@@ -370,4 +332,23 @@ boost::optional<Milliseconds> ReshardingMetrics::getOperationElapsedTime() const
return _currentOp->runningOperation.duration();
}
+void ReshardingMetrics::OperationMetrics::appendCumulativeOpMetrics(BSONObjBuilder* bob) const {
+ bob->append(kDocumentsCopied, documentsCopied);
+ bob->append(kBytesCopied, bytesCopied);
+ bob->append(kOplogsApplied, oplogEntriesApplied);
+ bob->append(kWritesDuringCritialSection, writesDuringCriticalSection);
+ bob->append(kOplogsFetched, oplogEntriesFetched);
+}
+
+void ReshardingMetrics::serializeCumulativeOpMetrics(BSONObjBuilder* bob) const {
+ stdx::lock_guard<Latch> lk(_mutex);
+
+ bob->append(kTotalOps, _started);
+ bob->append(kSuccessfulOps, _succeeded);
+ bob->append(kFailedOps, _failed);
+ bob->append(kCanceledOps, _canceled);
+
+ _cumulativeOp.appendCumulativeOpMetrics(bob);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h
index 32e7e0a53c7..f2aa4fe1795 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.h
+++ b/src/mongo/db/s/resharding/resharding_metrics.h
@@ -52,7 +52,8 @@ public:
ReshardingMetrics(const ReshardingMetrics&) = delete;
ReshardingMetrics(ReshardingMetrics&&) = delete;
- explicit ReshardingMetrics(ServiceContext* svcCtx) : _svcCtx(svcCtx) {}
+ explicit ReshardingMetrics(ServiceContext* svcCtx)
+ : _svcCtx(svcCtx), _cumulativeOp(svcCtx->getFastClockSource()) {}
static ReshardingMetrics* get(ServiceContext*) noexcept;
@@ -84,7 +85,7 @@ public:
void onCompletion(ReshardingOperationStatusEnum) noexcept;
struct ReporterOptions {
- enum class Role { kAll, kDonor, kRecipient, kCoordinator };
+ enum class Role { kDonor, kRecipient, kCoordinator };
ReporterOptions(Role role, UUID id, NamespaceString nss, BSONObj shardKey, bool unique)
: role(role),
id(std::move(id)),
@@ -100,15 +101,11 @@ public:
};
BSONObj reportForCurrentOp(const ReporterOptions& options) const noexcept;
- /**
- * Append metrics to the builder in CurrentOp format for the given `role`.
- * If `role` is omitted, append in ServerStatus format.
- * There are significant format differences. ServerStatus:
- * - Uses integers instead of names for enum values to improve FTDC compression.
- * - Uses millisecond time intervals and + 'Millis' field name suffixes.
- * - Has no role. Any data from any available resharding role is merged in.
- */
- void serialize(BSONObjBuilder*, ReporterOptions::Role role = ReporterOptions::Role::kAll) const;
+ // Append metrics to the builder in CurrentOp format for the given `role`.
+ void serializeCurrentOpMetrics(BSONObjBuilder*, ReporterOptions::Role role) const;
+
+ // Append metrics to the builder in CumulativeOp (ServerStatus) format.
+ void serializeCumulativeOpMetrics(BSONObjBuilder*) const;
// Reports the elapsed time for the active resharding operation, or `boost::none`.
boost::optional<Milliseconds> getOperationElapsedTime() const;
@@ -125,7 +122,7 @@ private:
int64_t _failed = 0;
int64_t _canceled = 0;
- // Metrics for an active resharding operation. Accesses must be serialized using `_mutex`.
+ // Metrics for resharding operation. Accesses must be serialized using `_mutex`.
struct OperationMetrics {
// Allows tracking elapsed time for the resharding operation and its sub operations (e.g.,
// applying oplog entries).
@@ -135,7 +132,6 @@ private:
void start() noexcept;
- void tryEnd() noexcept;
void end() noexcept;
Milliseconds duration() const noexcept;
@@ -153,12 +149,9 @@ private:
inCriticalSection(clockSource) {}
using Role = ReporterOptions::Role;
- void append(BSONObjBuilder*, Role) const;
+ void appendCurrentOpMetrics(BSONObjBuilder*, Role) const;
- bool isCompleted() const noexcept {
- return !(opStatus == ReshardingOperationStatusEnum::kRunning ||
- opStatus == ReshardingOperationStatusEnum::kInactive);
- }
+ void appendCumulativeOpMetrics(BSONObjBuilder*) const;
TimeInterval runningOperation;
ReshardingOperationStatusEnum opStatus = ReshardingOperationStatusEnum::kInactive;
@@ -181,6 +174,7 @@ private:
CoordinatorStateEnum coordinatorState = CoordinatorStateEnum::kUnused;
};
boost::optional<OperationMetrics> _currentOp;
+ OperationMetrics _cumulativeOp;
};
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
index 6624e17e6db..1ef859eac2e 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
@@ -46,10 +46,17 @@ namespace {
using namespace fmt::literals;
-constexpr auto kOpTimeRemaining = "remainingOperationTimeEstimatedMillis"_sd;
+constexpr auto kOpTimeRemaining = "remainingOperationTimeEstimated"_sd;
class ReshardingMetricsTest : public ServiceContextTest {
public:
+ enum OpReportType {
+ CumulativeReport,
+ CurrentOpReportDonorRole,
+ CurrentOpReportRecipientRole,
+ CurrentOpReportCoordinatorRole
+ };
+
void setUp() override {
auto clockSource = std::make_unique<ClockSourceMock>();
_clockSource = clockSource.get();
@@ -67,20 +74,34 @@ public:
_clockSource->advance(step);
}
- auto getReport() {
+ auto getReport(OpReportType reportType) {
BSONObjBuilder bob;
- getMetrics()->serialize(&bob);
+ if (reportType == OpReportType::CumulativeReport) {
+ getMetrics()->serializeCumulativeOpMetrics(&bob);
+ } else if (reportType == OpReportType::CurrentOpReportDonorRole) {
+ getMetrics()->serializeCurrentOpMetrics(
+ &bob, ReshardingMetrics::ReporterOptions::Role::kDonor);
+ } else if (reportType == OpReportType::CurrentOpReportRecipientRole) {
+ getMetrics()->serializeCurrentOpMetrics(
+ &bob, ReshardingMetrics::ReporterOptions::Role::kRecipient);
+ } else {
+ getMetrics()->serializeCurrentOpMetrics(
+ &bob, ReshardingMetrics::ReporterOptions::Role::kCoordinator);
+ }
return bob.obj();
}
- void checkMetrics(std::string tag, int expectedValue) {
- const auto report = getReport();
- checkMetrics(report, std::move(tag), std::move(expectedValue));
+ void checkMetrics(std::string tag, int expectedValue, OpReportType reportType) {
+ const auto report = getReport(reportType);
+ checkMetrics(report, std::move(tag), expectedValue);
}
- void checkMetrics(std::string tag, int expectedValue, std::string errMsg) {
- const auto report = getReport();
- checkMetrics(report, std::move(tag), std::move(expectedValue), std::move(errMsg));
+ void checkMetrics(std::string tag,
+ int expectedValue,
+ std::string errMsg,
+ OpReportType reportType) {
+ const auto report = getReport(reportType);
+ checkMetrics(report, std::move(tag), expectedValue, std::move(errMsg));
}
void checkMetrics(const BSONObj& report,
@@ -104,17 +125,11 @@ DEATH_TEST_F(ReshardingMetricsTest, RunOnCompletionBeforeOnStart, "No operation
}
TEST_F(ReshardingMetricsTest, OperationStatus) {
- auto constexpr kTag = "opStatus";
- // No operation has completed yet, so the status is unknown.
- checkMetrics(kTag, (int)ReshardingOperationStatusEnum::kInactive);
- for (auto status : {ReshardingOperationStatusEnum::kSuccess,
- ReshardingOperationStatusEnum::kFailure,
- ReshardingOperationStatusEnum::kCanceled}) {
- getMetrics()->onStart();
- checkMetrics(kTag, (int)ReshardingOperationStatusEnum::kRunning);
- getMetrics()->onCompletion(status);
- checkMetrics(kTag, (int)status);
- }
+ getMetrics()->onStart();
+ const auto report = getReport(OpReportType::CurrentOpReportCoordinatorRole);
+ ASSERT_EQ(report.getStringField("opStatus"),
+ ReshardingOperationStatus_serializer(ReshardingOperationStatusEnum::kRunning));
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess);
}
TEST_F(ReshardingMetricsTest, TestOperationStatus) {
@@ -137,33 +152,34 @@ TEST_F(ReshardingMetricsTest, TestOperationStatus) {
getMetrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled);
}
- checkMetrics("countReshardingSuccessful", kNumSuccessfulOps);
- checkMetrics("countReshardingFailures", kNumFailedOps);
- checkMetrics("countReshardingCanceled", kNumCanceledOps);
+ checkMetrics("countReshardingSuccessful", kNumSuccessfulOps, OpReportType::CumulativeReport);
+ checkMetrics("countReshardingFailures", kNumFailedOps, OpReportType::CumulativeReport);
+ checkMetrics("countReshardingCanceled", kNumCanceledOps, OpReportType::CumulativeReport);
const auto total = kNumSuccessfulOps + kNumFailedOps + kNumCanceledOps;
- checkMetrics("countReshardingOperations", total);
+ checkMetrics("countReshardingOperations", total, OpReportType::CumulativeReport);
getMetrics()->onStart();
- checkMetrics("countReshardingOperations", total + 1);
+ checkMetrics("countReshardingOperations", total + 1, OpReportType::CumulativeReport);
}
TEST_F(ReshardingMetricsTest, TestElapsedTime) {
getMetrics()->onStart();
- advanceTime();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess);
- checkMetrics("totalOperationTimeElapsedMillis", kTimerStep);
+ const auto elapsedTime = 1;
+ advanceTime(Seconds(elapsedTime));
+ checkMetrics("totalOperationTimeElapsed", elapsedTime, OpReportType::CurrentOpReportDonorRole);
}
TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) {
getMetrics()->onStart();
+ const auto elapsedTime = 1;
- advanceTime();
+ advanceTime(Seconds(elapsedTime));
// Update metrics for donor
const auto kWritesDuringCriticalSection = 7;
getMetrics()->setDonorState(DonorStateEnum::kPreparingToBlockWrites);
getMetrics()->onWriteDuringCriticalSection(kWritesDuringCriticalSection);
- advanceTime();
+ advanceTime(Seconds(elapsedTime));
// Update metrics for recipient
const auto kDocumentsToCopy = 50;
@@ -173,61 +189,121 @@ TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) {
getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy);
getMetrics()->onDocumentsCopied(kDocumentsToCopy * kCopyProgress / 100,
kBytesToCopy * kCopyProgress / 100);
- advanceTime();
+ advanceTime(Seconds(elapsedTime));
- const auto report = getReport();
+ const auto currentDonorOpReport = getReport(OpReportType::CurrentOpReportDonorRole);
+ const auto currentRecipientOpReport = getReport(OpReportType::CurrentOpReportRecipientRole);
getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess);
- checkMetrics(report, "totalCopyTimeElapsedMillis", kTimerStep);
- checkMetrics(report, "bytesCopied", kBytesToCopy * kCopyProgress / 100);
- checkMetrics(report, "documentsCopied", kDocumentsToCopy * kCopyProgress / 100);
- checkMetrics(report, "totalCriticalSectionTimeElapsedMillis", kTimerStep * 2);
- checkMetrics(report, "countWritesDuringCriticalSection", kWritesDuringCriticalSection);
-
- // Expected remaining time = totalCopyTimeElapsedMillis + 2 * estimated time to copy remaining
- checkMetrics(report,
- "remainingOperationTimeEstimatedMillis",
- kTimerStep + 2 * (100 - kCopyProgress) / kCopyProgress * kTimerStep);
+ checkMetrics(currentRecipientOpReport, "totalCopyTimeElapsed", elapsedTime);
+ checkMetrics(currentRecipientOpReport, "bytesCopied", kBytesToCopy * kCopyProgress / 100);
+ checkMetrics(
+ currentRecipientOpReport, "documentsCopied", kDocumentsToCopy * kCopyProgress / 100);
+ checkMetrics(currentDonorOpReport, "totalCriticalSectionTimeElapsed", elapsedTime * 2);
+ checkMetrics(
+ currentDonorOpReport, "countWritesDuringCriticalSection", kWritesDuringCriticalSection);
+
+ // Expected remaining time = totalCopyTimeElapsed + 2 * estimated time to copy remaining
+ checkMetrics(currentDonorOpReport,
+ "remainingOperationTimeEstimated",
+ elapsedTime + 2 * (100 - kCopyProgress) / kCopyProgress * elapsedTime);
+
+ const auto cumulativeReportAfterCompletion = getReport(OpReportType::CumulativeReport);
+ checkMetrics(
+ cumulativeReportAfterCompletion, "bytesCopied", kBytesToCopy * kCopyProgress / 100);
+ checkMetrics(
+ cumulativeReportAfterCompletion, "documentsCopied", kDocumentsToCopy * kCopyProgress / 100);
+ checkMetrics(cumulativeReportAfterCompletion,
+ "countWritesDuringCriticalSection",
+ kWritesDuringCriticalSection);
}
-TEST_F(ReshardingMetricsTest, MetricsAreRetainedAfterCompletion) {
- auto constexpr kTag = "totalOperationTimeElapsedMillis";
+TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCompletion) {
+ auto constexpr kTag = "documentsCopied";
+ getMetrics()->onStart();
+ const auto kDocumentsToCopy = 2;
+ const auto kBytesToCopy = 200;
+ getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
+ getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
+ advanceTime();
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure);
+ advanceTime();
+
+ checkMetrics(kTag,
+ kDocumentsToCopy,
+ "Cumulative metrics are not retained",
+ OpReportType::CumulativeReport);
+
+ getMetrics()->onStart();
+ checkMetrics(
+ kTag, kDocumentsToCopy, "Cumulative metrics are reset", OpReportType::CumulativeReport);
+}
+TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreResetAfterCompletion) {
+ auto constexpr kTag = "documentsCopied";
getMetrics()->onStart();
+ const auto kDocumentsToCopy = 2;
+ const auto kBytesToCopy = 200;
+ getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
+ getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
+ checkMetrics(kTag,
+ kDocumentsToCopy,
+ "Current metrics are not set",
+ OpReportType::CurrentOpReportRecipientRole);
advanceTime();
getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess);
advanceTime();
- checkMetrics(kTag, kTimerStep, "Metrics are not retained");
+ getMetrics()->onStart();
+ checkMetrics(
+ kTag, 0, "Current metrics are not reset", OpReportType::CurrentOpReportRecipientRole);
+}
+TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterCompletion) {
+ auto constexpr kTag = "documentsCopied";
getMetrics()->onStart();
- checkMetrics(kTag, 0, "Metrics are not reset");
+ const auto kDocumentsToCopy = 2;
+ const auto kBytesToCopy = 200;
+ getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
+ getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
+ checkMetrics(kTag,
+ kDocumentsToCopy,
+ "Current metrics are not set",
+ OpReportType::CurrentOpReportRecipientRole);
+ advanceTime();
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure);
+ advanceTime();
+
+ ASSERT_FALSE(getReport(OpReportType::CurrentOpReportRecipientRole)[kTag].ok());
}
TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) {
- auto constexpr kTag = "remainingOperationTimeEstimatedMillis";
+ auto constexpr kTag = "remainingOperationTimeEstimated";
+ const auto elapsedTime = 1;
getMetrics()->onStart();
- checkMetrics(kTag, -1);
+ checkMetrics(kTag, -1, OpReportType::CurrentOpReportDonorRole);
const auto kDocumentsToCopy = 2;
const auto kBytesToCopy = 200;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
getMetrics()->setDocumentsToCopy(kDocumentsToCopy, kBytesToCopy);
getMetrics()->onDocumentsCopied(kDocumentsToCopy / 2, kBytesToCopy / 2);
- advanceTime();
+ advanceTime(Seconds(elapsedTime));
// Since 50% of the data is copied, the remaining copy time equals the elapsed copy time, which
- // is equal to `kTimerStep` milliseconds.
- checkMetrics(kTag, kTimerStep + 2 * kTimerStep);
+ // is equal to `elapsedTime` seconds.
+ checkMetrics(kTag, elapsedTime + 2 * elapsedTime, OpReportType::CurrentOpReportDonorRole);
const auto kOplogEntriesFetched = 4;
const auto kOplogEntriesApplied = 2;
getMetrics()->setRecipientState(RecipientStateEnum::kApplying);
getMetrics()->onOplogEntriesFetched(kOplogEntriesFetched);
getMetrics()->onOplogEntriesApplied(kOplogEntriesApplied);
- advanceTime();
- // So far, the time to apply oplog entries equals `kTimerStep` milliseconds.
- checkMetrics(kTag, kTimerStep * (kOplogEntriesFetched / kOplogEntriesApplied - 1));
+ advanceTime(Seconds(elapsedTime));
+ // So far, the time to apply oplog entries equals `elapsedTime` seconds.
+ checkMetrics(kTag,
+ elapsedTime * (kOplogEntriesFetched / kOplogEntriesApplied - 1),
+ OpReportType::CurrentOpReportDonorRole);
}
TEST_F(ReshardingMetricsTest, CurrentOpReportForDonor) {
@@ -384,7 +460,7 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeCloning) {
auto m = getMetrics();
m->onStart();
m->setRecipientState(RecipientStateEnum::kCloning);
- auto timePerDocument = Milliseconds{123};
+ auto timePerDocument = Seconds(2);
int64_t bytesPerDocument = 1024;
int64_t documentsToCopy = 409;
int64_t bytesToCopy = bytesPerDocument * documentsToCopy;
@@ -392,12 +468,13 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeCloning) {
auto remainingTime = 2 * timePerDocument * documentsToCopy;
double maxAbsRelErr = 0;
for (int64_t copied = 0; copied < documentsToCopy; ++copied) {
- double output = getReport()[kOpTimeRemaining].Number();
+ double output =
+ getReport(OpReportType::CurrentOpReportRecipientRole)[kOpTimeRemaining].Number();
if (copied == 0) {
ASSERT_EQ(output, -1);
} else {
ASSERT_GTE(output, 0);
- auto expected = durationCount<Milliseconds>(remainingTime);
+ auto expected = durationCount<Seconds>(remainingTime);
// Check that error is pretty small (it should get better as the operation progresses)
double absRelErr = std::abs((output - expected) / expected);
ASSERT_LT(absRelErr, 0.05)
@@ -417,17 +494,20 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeApplying) {
auto m = getMetrics();
m->onStart();
m->setRecipientState(RecipientStateEnum::kApplying);
- auto timePerOp = Milliseconds{123};
+ // 1 extra millisecond here because otherwise an error of just 1ms will round this down to the
+ // next second.
+ auto timePerOp = Milliseconds(1001);
int64_t fetched = 10000;
m->onOplogEntriesFetched(fetched);
auto remainingTime = timePerOp * fetched;
double maxAbsRelErr = 0;
for (int64_t applied = 0; applied < fetched; ++applied) {
- double output = getReport()[kOpTimeRemaining].Number();
+ double output =
+ getReport(OpReportType::CurrentOpReportRecipientRole)[kOpTimeRemaining].Number();
if (applied == 0) {
ASSERT_EQ(output, -1);
} else {
- auto expected = durationCount<Milliseconds>(remainingTime);
+ auto expected = durationCount<Seconds>(remainingTime);
// Check that error is pretty small (it should get better as the operation progresses)
double absRelErr = std::abs((output - expected) / expected);
ASSERT_LT(absRelErr, 0.05)
@@ -442,5 +522,29 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeApplying) {
5422701, 3, "Max absolute relative error observed", "maxAbsRelErr"_attr = maxAbsRelErr);
}
+TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAccumulate) {
+ auto constexpr kTag = "documentsCopied";
+ getMetrics()->onStart();
+ const auto kDocumentsToCopy1 = 2;
+ const auto kBytesToCopy1 = 200;
+
+ getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
+ getMetrics()->onDocumentsCopied(kDocumentsToCopy1, kBytesToCopy1);
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure);
+
+ getMetrics()->onStart();
+ const auto kDocumentsToCopy2 = 3;
+ const auto kBytesToCopy2 = 400;
+
+ getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
+ getMetrics()->onDocumentsCopied(kDocumentsToCopy2, kBytesToCopy2);
+ getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure);
+
+ checkMetrics(kTag,
+ kDocumentsToCopy1 + kDocumentsToCopy2,
+ "Cumulative metrics are not accumulated",
+ OpReportType::CumulativeReport);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
index e49959b7143..224adf089f8 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
@@ -280,7 +280,8 @@ public:
long long metricsAppliedCount() const {
BSONObjBuilder bob;
- _metrics->serialize(&bob, ReshardingMetrics::ReporterOptions::Role::kRecipient);
+ _metrics->serializeCurrentOpMetrics(&bob,
+ ReshardingMetrics::ReporterOptions::Role::kRecipient);
return bob.obj()["oplogEntriesApplied"_sd].Long();
}
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
index ab0932a9daf..93bf7646532 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
@@ -307,7 +307,8 @@ public:
long long metricsFetchedCount() const {
BSONObjBuilder bob;
- _metrics->serialize(&bob, ReshardingMetrics::ReporterOptions::Role::kRecipient);
+ _metrics->serializeCurrentOpMetrics(&bob,
+ ReshardingMetrics::ReporterOptions::Role::kRecipient);
return bob.obj()["oplogEntriesFetched"_sd].Long();
}
diff --git a/src/mongo/db/s/sharding_server_status.cpp b/src/mongo/db/s/sharding_server_status.cpp
index 19015bf5472..3489612493c 100644
--- a/src/mongo/db/s/sharding_server_status.cpp
+++ b/src/mongo/db/s/sharding_server_status.cpp
@@ -118,7 +118,8 @@ public:
// checking whether the resharding feature is enabled here.
if (resharding::gFeatureFlagResharding.isEnabledAndIgnoreFCV()) {
BSONObjBuilder subObjBuilder(result.subobjStart("resharding"));
- ReshardingMetrics::get(opCtx->getServiceContext())->serialize(&subObjBuilder);
+ ReshardingMetrics::get(opCtx->getServiceContext())
+ ->serializeCumulativeOpMetrics(&subObjBuilder);
}
return result.obj();