summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBlake Oler <blake.oler@mongodb.com>2021-05-24 18:21:20 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-06-01 21:11:28 +0000
commit83fea20e50872158c620b0eef720a77a173f176a (patch)
tree382ba266db43d4d860434dd261302a2e67650065
parentf5920f892b6cc0572140be6b81babf8bf4419278 (diff)
downloadmongo-83fea20e50872158c620b0eef720a77a173f176a.tar.gz
SERVER-57153 Support co-existing donors/recipients in resharding metrics
-rw-r--r--src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp3
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp39
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.cpp2
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.cpp132
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.h46
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics_test.cpp179
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp6
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp6
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp14
-rw-r--r--src/mongo/db/s/resharding_test_commands.cpp3
10 files changed, 284 insertions, 146 deletions
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
index f105299e080..f47186f8e91 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
@@ -109,7 +109,8 @@ protected:
void setUp() override {
ServiceContextTest::setUp();
_metrics = std::make_unique<ReshardingMetrics>(getServiceContext());
- _metrics->onStart(getServiceContext()->getFastClockSource()->now());
+ _metrics->onStart(ReshardingMetrics::Role::kRecipient,
+ getServiceContext()->getFastClockSource()->now());
_metrics->setRecipientState(RecipientStateEnum::kCloning);
}
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 288cc3c52c8..a57d269c2a2 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -1009,14 +1009,19 @@ void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc(
}
void markCompleted(const Status& status) {
- auto currentTime = getCurrentTime();
auto metrics = ReshardingMetrics::get(cc().getServiceContext());
- if (status.isOK())
- metrics->onCompletion(ReshardingOperationStatusEnum::kSuccess, currentTime);
- else if (status == ErrorCodes::ReshardCollectionAborted)
- metrics->onCompletion(ReshardingOperationStatusEnum::kCanceled, currentTime);
- else
- metrics->onCompletion(ReshardingOperationStatusEnum::kFailure, currentTime);
+ auto metricsOperationStatus = [&] {
+ if (status.isOK()) {
+ return ReshardingOperationStatusEnum::kSuccess;
+ } else if (status == ErrorCodes::ReshardCollectionAborted) {
+ return ReshardingOperationStatusEnum::kCanceled;
+ } else {
+ return ReshardingOperationStatusEnum::kFailure;
+ }
+ }();
+
+ metrics->onCompletion(
+ ReshardingMetrics::Role::kCoordinator, metricsOperationStatus, getCurrentTime());
}
BSONObj createFlushReshardingStateChangeCommand(const NamespaceString& nss) {
@@ -1200,7 +1205,8 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
// On stepdown or shutdown, the _scopedExecutor may have already been shut down.
// Schedule cleanup work on the parent executor.
if (_ctHolder->isSteppingOrShuttingDown()) {
- ReshardingMetrics::get(cc().getServiceContext())->onStepDown();
+ ReshardingMetrics::get(cc().getServiceContext())
+ ->onStepDown(ReshardingMetrics::Role::kCoordinator);
}
if (!status.isOK()) {
@@ -1264,12 +1270,11 @@ void ReshardingCoordinatorService::ReshardingCoordinator::abort() {
boost::optional<BSONObj> ReshardingCoordinatorService::ReshardingCoordinator::reportForCurrentOp(
MongoProcessInterface::CurrentOpConnectionsMode,
MongoProcessInterface::CurrentOpSessionsMode) noexcept {
- ReshardingMetrics::ReporterOptions options(
- ReshardingMetrics::ReporterOptions::Role::kCoordinator,
- _coordinatorDoc.getReshardingUUID(),
- _coordinatorDoc.getSourceNss(),
- _coordinatorDoc.getReshardingKey().toBSON(),
- false);
+ ReshardingMetrics::ReporterOptions options(ReshardingMetrics::Role::kCoordinator,
+ _coordinatorDoc.getReshardingUUID(),
+ _coordinatorDoc.getSourceNss(),
+ _coordinatorDoc.getReshardingKey().toBSON(),
+ false);
return ReshardingMetrics::get(cc().getServiceContext())->reportForCurrentOp(options);
}
@@ -1289,7 +1294,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::onOkayToEnterCritical(
void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChangeOrigCollEntry() {
if (_coordinatorDoc.getState() > CoordinatorStateEnum::kUnused) {
_coordinatorDocWrittenPromise.emplaceValue();
- ReshardingMetrics::get(cc().getServiceContext())->onStepUp();
+ ReshardingMetrics::get(cc().getServiceContext())
+ ->onStepUp(ReshardingMetrics::Role::kCoordinator);
return;
}
@@ -1303,7 +1309,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChan
_coordinatorDocWrittenPromise.emplaceValue();
// TODO SERVER-53914 to accommodate loading metrics for the coordinator.
- ReshardingMetrics::get(cc().getServiceContext())->onStart(getCurrentTime());
+ ReshardingMetrics::get(cc().getServiceContext())
+ ->onStart(ReshardingMetrics::Role::kCoordinator, getCurrentTime());
}
void ReshardingCoordinatorService::ReshardingCoordinator::
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index bb78368e144..684bf5475d1 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -411,7 +411,7 @@ void ReshardingDonorService::DonorStateMachine::interrupt(Status status) {}
boost::optional<BSONObj> ReshardingDonorService::DonorStateMachine::reportForCurrentOp(
MongoProcessInterface::CurrentOpConnectionsMode connMode,
MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept {
- ReshardingMetrics::ReporterOptions options(ReshardingMetrics::ReporterOptions::Role::kDonor,
+ ReshardingMetrics::ReporterOptions options(ReshardingMetrics::Role::kDonor,
_metadata.getReshardingUUID(),
_metadata.getSourceNss(),
_metadata.getReshardingKey().toBSON(),
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index 2b01ae0e77f..ad766457587 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -89,22 +89,44 @@ ReshardingMetrics* ReshardingMetrics::get(ServiceContext* ctx) noexcept {
return getMetrics(ctx).get();
}
-void ReshardingMetrics::onStart(Date_t runningOperationStartTime) noexcept {
+void ReshardingMetrics::onStart(Role role, Date_t runningOperationStartTime) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
// TODO Re-add this invariant once all breaking test cases have been fixed.
// invariant(!_currentOp.has_value(), kAnotherOperationInProgress);
+
+ if (!_currentOp) {
+ // Only incremement _started if this is the first time resharding metrics is being invoked
+ // for this resharding operation, and we're not restoring the PrimaryOnlyService from disk.
+ _started++;
+ }
+
// Create a new operation and record the time it started.
- _currentOp.emplace(_svcCtx->getFastClockSource());
- _currentOp->runningOperation.start(runningOperationStartTime);
- _currentOp->opStatus = ReshardingOperationStatusEnum::kRunning;
- _started++;
+ _emplaceCurrentOpForRole(role, runningOperationStartTime);
}
-void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status,
+void ReshardingMetrics::onCompletion(Role role,
+ ReshardingOperationStatusEnum status,
Date_t runningOperationEndTime) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- // TODO Re-add this invariant once all breaking test cases have been fixed.
+ // TODO Re-add this invariant once all breaking test cases have been fixed. Add invariant that
+ // role being completed is a role that is in progress.
// invariant(_currentOp.has_value(), kNoOperationInProgress);
+
+ if (_currentOp->donorState && _currentOp->recipientState) {
+ switch (role) {
+ case Role::kDonor:
+ _currentOp->donorState = boost::none;
+ break;
+ case Role::kRecipient:
+ _currentOp->recipientState = boost::none;
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+
+ return;
+ }
+
switch (status) {
case ReshardingOperationStatusEnum::kSuccess:
_succeeded++;
@@ -125,10 +147,9 @@ void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status,
_currentOp = boost::none;
}
-void ReshardingMetrics::onStepUp() noexcept {
+void ReshardingMetrics::onStepUp(Role role) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- invariant(!_currentOp.has_value(), kAnotherOperationInProgress);
- _currentOp.emplace(_svcCtx->getFastClockSource());
+ _emplaceCurrentOpForRole(role, boost::none);
// TODO SERVER-53913 Implement donor metrics rehydration.
// TODO SERVER-53914 Implement coordinator metrics rehydration.
@@ -136,13 +157,56 @@ void ReshardingMetrics::onStepUp() noexcept {
// TODO SERVER-57094 Resume the runningOperation duration from a timestamp stored on disk
// instead of starting from the current time.
- _currentOp->runningOperation.start(_svcCtx->getFastClockSource()->now());
- _currentOp->opStatus = ReshardingOperationStatusEnum::kRunning;
}
-void ReshardingMetrics::onStepDown() noexcept {
+void ReshardingMetrics::onStepDown(Role role) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- _currentOp = boost::none;
+ if (_currentOp && _currentOp->donorState && _currentOp->recipientState) {
+ switch (role) {
+ case Role::kDonor:
+ _currentOp->donorState = boost::none;
+ break;
+ case Role::kRecipient:
+ _currentOp->recipientState = boost::none;
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ } else {
+ _currentOp = boost::none;
+ }
+}
+
+void ReshardingMetrics::_emplaceCurrentOpForRole(
+ Role role, boost::optional<Date_t> runningOperationStartTime) noexcept {
+ // Invariants in this function ensure that the only multi-role state allowed is a combination
+ // of donor and recipient.
+ if (!_currentOp) {
+ _currentOp.emplace(_svcCtx->getFastClockSource());
+ _currentOp->runningOperation.start(runningOperationStartTime
+ ? *runningOperationStartTime
+ : _svcCtx->getFastClockSource()->now());
+ _currentOp->opStatus = ReshardingOperationStatusEnum::kRunning;
+ } else {
+ invariant(role != Role::kCoordinator, kAnotherOperationInProgress);
+ invariant(!_currentOp->coordinatorState, kAnotherOperationInProgress);
+ }
+
+ switch (role) {
+ case Role::kCoordinator:
+ _currentOp->coordinatorState.emplace(CoordinatorStateEnum::kUnused);
+ break;
+ case Role::kDonor:
+ invariant(!_currentOp->donorState, kAnotherOperationInProgress);
+ _currentOp->donorState.emplace(DonorStateEnum::kUnused);
+ break;
+ case Role::kRecipient:
+ invariant(!_currentOp->recipientState, kAnotherOperationInProgress);
+ _currentOp->recipientState.emplace(RecipientStateEnum::kUnused);
+ break;
+ default:
+ MONGO_UNREACHABLE
+ }
}
void ReshardingMetrics::setDonorState(DonorStateEnum state) noexcept {
@@ -167,12 +231,21 @@ void ReshardingMetrics::setCoordinatorState(CoordinatorStateEnum state) noexcept
_currentOp->coordinatorState = state;
}
-static StringData serializeState(RecipientStateEnum e) {
- return RecipientState_serializer(e);
+// TODO SERVER-57217 Remove special-casing for the non-existence of the boost::optional.
+static StringData serializeState(boost::optional<RecipientStateEnum> e) {
+ return e ? RecipientState_serializer(*e)
+ : RecipientState_serializer(RecipientStateEnum::kUnused);
+}
+
+// TODO SERVER-57217 Remove special-casing for the non-existence of the boost::optional.
+static StringData serializeState(boost::optional<DonorStateEnum> e) {
+ return e ? DonorState_serializer(*e) : DonorState_serializer(DonorStateEnum::kUnused);
}
-static StringData serializeState(DonorStateEnum e) {
- return DonorState_serializer(e);
+// TODO SERVER-57217 Remove special-casing for the non-existence of the boost::optional.
+static StringData serializeState(boost::optional<CoordinatorStateEnum> e) {
+ return e ? CoordinatorState_serializer(*e)
+ : CoordinatorState_serializer(CoordinatorStateEnum::kUnused);
}
template <typename T>
@@ -209,7 +282,7 @@ void ReshardingMetrics::onDocumentsCopied(int64_t documents, int64_t bytes) noex
if (!_currentOp)
return;
- invariant(checkState(_currentOp->recipientState,
+ invariant(checkState(*_currentOp->recipientState,
{RecipientStateEnum::kCloning, RecipientStateEnum::kError}));
_currentOp->documentsCopied += documents;
@@ -254,7 +327,7 @@ void ReshardingMetrics::onOplogEntriesFetched(int64_t entries) noexcept {
return;
invariant(checkState(
- _currentOp->recipientState,
+ *_currentOp->recipientState,
{RecipientStateEnum::kCloning, RecipientStateEnum::kApplying, RecipientStateEnum::kError}));
_currentOp->oplogEntriesFetched += entries;
@@ -266,7 +339,7 @@ void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept {
if (!_currentOp)
return;
- invariant(checkState(_currentOp->recipientState,
+ invariant(checkState(*_currentOp->recipientState,
{RecipientStateEnum::kApplying, RecipientStateEnum::kError}));
_currentOp->oplogEntriesApplied += entries;
@@ -278,7 +351,7 @@ void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept {
if (!_currentOp)
return;
- invariant(checkState(_currentOp->donorState,
+ invariant(checkState(*_currentOp->donorState,
{DonorStateEnum::kDonatingOplogEntries,
DonorStateEnum::kBlockingWrites,
DonorStateEnum::kError}));
@@ -333,7 +406,7 @@ void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder*
case Role::kDonor:
bob->append(kWritesDuringCritialSection, writesDuringCriticalSection);
bob->append(kCriticalSectionTimeElapsed, getElapsedTime(inCriticalSection));
- bob->append(kDonorState, DonorState_serializer(donorState));
+ bob->append(kDonorState, serializeState(donorState));
bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus));
break;
case Role::kRecipient:
@@ -346,11 +419,11 @@ void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder*
bob->append(kOplogsFetched, oplogEntriesFetched);
bob->append(kOplogsApplied, oplogEntriesApplied);
bob->append(kApplyTimeElapsed, getElapsedTime(applyingOplogEntries));
- bob->append(kRecipientState, RecipientState_serializer(recipientState));
+ bob->append(kRecipientState, serializeState(recipientState));
bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus));
break;
case Role::kCoordinator:
- bob->append(kCoordinatorState, CoordinatorState_serializer(coordinatorState));
+ bob->append(kCoordinatorState, serializeState(coordinatorState));
bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus));
break;
default:
@@ -358,8 +431,7 @@ void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder*
}
}
-void ReshardingMetrics::serializeCurrentOpMetrics(BSONObjBuilder* bob,
- ReporterOptions::Role role) const {
+void ReshardingMetrics::serializeCurrentOpMetrics(BSONObjBuilder* bob, Role role) const {
stdx::lock_guard<Latch> lk(_mutex);
if (_currentOp)
_currentOp->appendCurrentOpMetrics(bob, role);
@@ -368,11 +440,11 @@ void ReshardingMetrics::serializeCurrentOpMetrics(BSONObjBuilder* bob,
BSONObj ReshardingMetrics::reportForCurrentOp(const ReporterOptions& options) const noexcept {
const auto role = [&options] {
switch (options.role) {
- case ReporterOptions::Role::kDonor:
+ case Role::kDonor:
return "Donor";
- case ReporterOptions::Role::kRecipient:
+ case Role::kRecipient:
return "Recipient";
- case ReporterOptions::Role::kCoordinator:
+ case Role::kCoordinator:
return "Coordinator";
default:
MONGO_UNREACHABLE;
diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h
index aeb3ddf11e1..1cdc1f88766 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.h
+++ b/src/mongo/db/s/resharding/resharding_metrics.h
@@ -49,6 +49,8 @@ namespace mongo {
*/
class ReshardingMetrics final {
public:
+ enum Role { kCoordinator, kDonor, kRecipient };
+
ReshardingMetrics(const ReshardingMetrics&) = delete;
ReshardingMetrics(ReshardingMetrics&&) = delete;
@@ -57,13 +59,13 @@ public:
static ReshardingMetrics* get(ServiceContext*) noexcept;
- // Marks the beginning of a resharding operation. Not that only one resharding operation may run
- // at any time.
- void onStart(Date_t runningOperationStartTime) noexcept;
+ // Marks the beginning of a resharding operation for a particular role. Note that:
+ // * Only one resharding operation may run at any time.
+ // * The only valid co-existing roles on a process are kDonor and kRecipient.
+ void onStart(Role role, Date_t runningOperationStartTime) noexcept;
- // Marks the resumption of a resharding operation. Note that only one resharding operation may
- // run at any time.
- void onStepUp() noexcept;
+ // Marks the resumption of a resharding operation for a particular role.
+ void onStepUp(Role role) noexcept;
// So long as a resharding operation is in progress, the following may be used to update the
// state of a donor, a recipient, and a coordinator, respectively.
@@ -94,17 +96,21 @@ public:
// "donating-oplog-entries" or "blocking-writes".
void onWriteDuringCriticalSection(int64_t writes) noexcept;
- // Tears down the currentOp variable so that the node that is stepping up may continue the
- // resharding operation from disk.
- void onStepDown() noexcept;
-
- // Marks the completion of the current (active) resharding operation. Aborts the process if no
- // resharding operation is in progress.
- void onCompletion(ReshardingOperationStatusEnum status,
+ // Indicates that a role on this node is stepping down. If the role being stepped down is the
+ // last active role on this process, the function tears down the currentOp variable. The
+ // replica set primary that is stepping up continues the resharding operation from disk.
+ void onStepDown(Role role) noexcept;
+
+ // Marks the completion of the current (active) resharding operation for a particular role. If
+ // the role being completed is the last active role on this process, the function tears down
+ // the currentOp variable, indicating completion for the resharding operation on this process.
+ //
+ // Aborts the process if no resharding operation is in progress.
+ void onCompletion(Role role,
+ ReshardingOperationStatusEnum status,
Date_t runningOperationEndTime) noexcept;
struct ReporterOptions {
- enum class Role { kDonor, kRecipient, kCoordinator };
ReporterOptions(Role role, UUID id, NamespaceString nss, BSONObj shardKey, bool unique)
: role(role),
id(std::move(id)),
@@ -121,7 +127,7 @@ public:
BSONObj reportForCurrentOp(const ReporterOptions& options) const noexcept;
// Append metrics to the builder in CurrentOp format for the given `role`.
- void serializeCurrentOpMetrics(BSONObjBuilder*, ReporterOptions::Role role) const;
+ void serializeCurrentOpMetrics(BSONObjBuilder*, Role role) const;
// Append metrics to the builder in CumulativeOp (ServerStatus) format.
void serializeCumulativeOpMetrics(BSONObjBuilder*) const;
@@ -137,6 +143,9 @@ private:
mutable Mutex _mutex = MONGO_MAKE_LATCH("ReshardingMetrics::_mutex");
+ void _emplaceCurrentOpForRole(Role role,
+ boost::optional<Date_t> runningOperationStartTime) noexcept;
+
// The following maintain the number of resharding operations that have started, succeeded,
// failed with an unrecoverable error, and canceled by the user, respectively.
int64_t _started = 0;
@@ -173,7 +182,6 @@ private:
applyingOplogEntries(clockSource),
inCriticalSection(clockSource) {}
- using Role = ReporterOptions::Role;
void appendCurrentOpMetrics(BSONObjBuilder*, Role) const;
void appendCumulativeOpMetrics(BSONObjBuilder*) const;
@@ -196,9 +204,9 @@ private:
TimeInterval inCriticalSection;
int64_t writesDuringCriticalSection = 0;
- DonorStateEnum donorState = DonorStateEnum::kUnused;
- RecipientStateEnum recipientState = RecipientStateEnum::kUnused;
- CoordinatorStateEnum coordinatorState = CoordinatorStateEnum::kUnused;
+ boost::optional<DonorStateEnum> donorState;
+ boost::optional<RecipientStateEnum> recipientState;
+ boost::optional<CoordinatorStateEnum> coordinatorState;
};
boost::optional<OperationMetrics> _currentOp;
OperationMetrics _cumulativeOp;
diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
index dd1a131501d..04e6e53e17f 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
@@ -67,6 +67,23 @@ public:
return ReshardingMetrics::get(getGlobalServiceContext());
}
+ void startOperation(ReshardingMetrics::Role role) {
+ getMetrics()->onStart(role, getGlobalServiceContext()->getFastClockSource()->now());
+ }
+
+ void stepUpOperation(ReshardingMetrics::Role role) {
+ getMetrics()->onStepUp(role);
+ }
+
+ void stepDownOperation(ReshardingMetrics::Role role) {
+ getMetrics()->onStepDown(role);
+ }
+
+ void completeOperation(ReshardingMetrics::Role role, ReshardingOperationStatusEnum opStatus) {
+ getMetrics()->onCompletion(
+ role, opStatus, getGlobalServiceContext()->getFastClockSource()->now());
+ }
+
// Timer step in milliseconds
static constexpr auto kTimerStep = 100;
@@ -79,14 +96,11 @@ public:
if (reportType == OpReportType::CumulativeReport) {
getMetrics()->serializeCumulativeOpMetrics(&bob);
} else if (reportType == OpReportType::CurrentOpReportDonorRole) {
- getMetrics()->serializeCurrentOpMetrics(
- &bob, ReshardingMetrics::ReporterOptions::Role::kDonor);
+ getMetrics()->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kDonor);
} else if (reportType == OpReportType::CurrentOpReportRecipientRole) {
- getMetrics()->serializeCurrentOpMetrics(
- &bob, ReshardingMetrics::ReporterOptions::Role::kRecipient);
+ getMetrics()->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kRecipient);
} else {
- getMetrics()->serializeCurrentOpMetrics(
- &bob, ReshardingMetrics::ReporterOptions::Role::kCoordinator);
+ getMetrics()->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kCoordinator);
}
return bob.obj();
}
@@ -119,41 +133,74 @@ private:
// TODO Re-enable once underlying invariants are re-enabled
/*
DEATH_TEST_F(ReshardingMetricsTest, RunOnCompletionBeforeOnStart, "No operation is in progress") {
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
-getGlobalServiceContext()->getFastClockSource()->now());
+ completeOperation(ReshardingMetrics::Role::kRecipient,
+ ReshardingOperationStatusEnum::kSuccess);
}
DEATH_TEST_F(ReshardingMetricsTest,
RunOnStepUpAfterOnStartInvariants,
"Another operation is in progress") {
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onStepUp();
+ startOperation(ReshardingMetrics::Role::kRecipient);
+ stepUpOperation(ReshardingMetrics::Role::kRecipient);
}
DEATH_TEST_F(ReshardingMetricsTest,
RunOnCompletionAfterOnStepDownInvariants,
"No operation is in progress") {
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onStepDown();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
-getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
+ stepDownOperation(ReshardingMetrics::Role::kRecipient);
+ completeOperation(ReshardingMetrics::Role::kRecipient,
+ ReshardingOperationStatusEnum::kSuccess);
}
*/
TEST_F(ReshardingMetricsTest, RunOnStepDownAfterOnCompletionIsSafe) {
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
- getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onStepDown();
+ startOperation(ReshardingMetrics::Role::kRecipient);
+ completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess);
+ stepDownOperation(ReshardingMetrics::Role::kRecipient);
+}
+
+DEATH_TEST_F(ReshardingMetricsTest, CoordinatorThenDonor, "Another operation is in progress") {
+ startOperation(ReshardingMetrics::Role::kCoordinator);
+ startOperation(ReshardingMetrics::Role::kDonor);
+}
+
+DEATH_TEST_F(ReshardingMetricsTest, DonorThenCoordinator, "Another operation is in progress") {
+ startOperation(ReshardingMetrics::Role::kDonor);
+ startOperation(ReshardingMetrics::Role::kCoordinator);
+}
+
+DEATH_TEST_F(ReshardingMetricsTest, CoordinatorThenRecipient, "Another operation is in progress") {
+ startOperation(ReshardingMetrics::Role::kCoordinator);
+ startOperation(ReshardingMetrics::Role::kRecipient);
+}
+
+DEATH_TEST_F(ReshardingMetricsTest, RecipientThenCoordinator, "Another operation is in progress") {
+ startOperation(ReshardingMetrics::Role::kRecipient);
+ startOperation(ReshardingMetrics::Role::kCoordinator);
+}
+
+TEST_F(ReshardingMetricsTest, DonorAndRecipientCombinationIsSafe) {
+ startOperation(ReshardingMetrics::Role::kRecipient);
+ startOperation(ReshardingMetrics::Role::kDonor);
+ completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess);
+ completeOperation(ReshardingMetrics::Role::kDonor, ReshardingOperationStatusEnum::kSuccess);
+}
+
+TEST_F(ReshardingMetricsTest, DonorAndRecipientStepdownIsSafe) {
+ startOperation(ReshardingMetrics::Role::kDonor);
+ startOperation(ReshardingMetrics::Role::kRecipient);
+ stepDownOperation(ReshardingMetrics::Role::kRecipient);
+ stepDownOperation(ReshardingMetrics::Role::kDonor);
}
TEST_F(ReshardingMetricsTest, OperationStatus) {
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kCoordinator);
const auto report = getReport(OpReportType::CurrentOpReportCoordinatorRole);
ASSERT_EQ(report.getStringField("opStatus"),
ReshardingOperationStatus_serializer(ReshardingOperationStatusEnum::kRunning));
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
- getGlobalServiceContext()->getFastClockSource()->now());
+ completeOperation(ReshardingMetrics::Role::kCoordinator,
+ ReshardingOperationStatusEnum::kSuccess);
}
TEST_F(ReshardingMetricsTest, TestOperationStatus) {
@@ -162,21 +209,21 @@ TEST_F(ReshardingMetricsTest, TestOperationStatus) {
const auto kNumCanceledOps = 7;
for (auto i = 0; i < kNumSuccessfulOps; i++) {
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
- getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
+ completeOperation(ReshardingMetrics::Role::kRecipient,
+ ReshardingOperationStatusEnum::kSuccess);
}
for (auto i = 0; i < kNumFailedOps; i++) {
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure,
- getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
+ completeOperation(ReshardingMetrics::Role::kRecipient,
+ ReshardingOperationStatusEnum::kFailure);
}
for (auto i = 0; i < kNumCanceledOps; i++) {
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled,
- getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
+ completeOperation(ReshardingMetrics::Role::kRecipient,
+ ReshardingOperationStatusEnum::kCanceled);
}
checkMetrics("countReshardingSuccessful", kNumSuccessfulOps, OpReportType::CumulativeReport);
@@ -185,19 +232,20 @@ TEST_F(ReshardingMetricsTest, TestOperationStatus) {
const auto total = kNumSuccessfulOps + kNumFailedOps + kNumCanceledOps;
checkMetrics("countReshardingOperations", total, OpReportType::CumulativeReport);
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
checkMetrics("countReshardingOperations", total + 1, OpReportType::CumulativeReport);
}
TEST_F(ReshardingMetricsTest, TestElapsedTime) {
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
const auto elapsedTime = 1;
advanceTime(Seconds(elapsedTime));
checkMetrics("totalOperationTimeElapsed", elapsedTime, OpReportType::CurrentOpReportDonorRole);
}
TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) {
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
+ startOperation(ReshardingMetrics::Role::kDonor);
const auto elapsedTime = 1;
advanceTime(Seconds(elapsedTime));
@@ -223,8 +271,8 @@ TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) {
const auto currentDonorOpReport = getReport(OpReportType::CurrentOpReportDonorRole);
const auto currentRecipientOpReport = getReport(OpReportType::CurrentOpReportRecipientRole);
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
- getGlobalServiceContext()->getFastClockSource()->now());
+ completeOperation(ReshardingMetrics::Role::kDonor, ReshardingOperationStatusEnum::kSuccess);
+ completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess);
checkMetrics(currentRecipientOpReport, "totalCopyTimeElapsed", elapsedTime);
checkMetrics(currentRecipientOpReport, "bytesCopied", kBytesToCopy * kCopyProgress / 100);
@@ -251,15 +299,14 @@ TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) {
TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCompletion) {
auto constexpr kTag = "documentsCopied";
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
const auto kDocumentsToCopy = 2;
const auto kBytesToCopy = 200;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
advanceTime();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure,
- getGlobalServiceContext()->getFastClockSource()->now());
+ completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kFailure);
advanceTime();
checkMetrics(kTag,
@@ -267,22 +314,22 @@ TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCompletion) {
"Cumulative metrics are not retained",
OpReportType::CumulativeReport);
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
checkMetrics(
kTag, kDocumentsToCopy, "Cumulative metrics are reset", OpReportType::CumulativeReport);
}
TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCancellation) {
auto constexpr kTag = "documentsCopied";
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
const auto kDocumentsToCopy = 2;
const auto kBytesToCopy = 200;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
getMetrics()->startCopyingDocuments(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onDocumentsCopied(kDocumentsToCopy, kBytesToCopy);
advanceTime();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled,
- getGlobalServiceContext()->getFastClockSource()->now());
+ completeOperation(ReshardingMetrics::Role::kRecipient,
+ ReshardingOperationStatusEnum::kCanceled);
advanceTime();
checkMetrics(kTag,
@@ -290,14 +337,14 @@ TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAreRetainedAfterCancellation) {
"Cumulative metrics are not retained",
OpReportType::CumulativeReport);
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
checkMetrics(
kTag, kDocumentsToCopy, "Cumulative metrics are reset", OpReportType::CumulativeReport);
}
TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreResetAfterCompletion) {
auto constexpr kTag = "documentsCopied";
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
const auto kDocumentsToCopy = 2;
const auto kBytesToCopy = 200;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
@@ -308,18 +355,17 @@ TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreResetAfterCompletion) {
"Current metrics are not set",
OpReportType::CurrentOpReportRecipientRole);
advanceTime();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
- getGlobalServiceContext()->getFastClockSource()->now());
+ completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kSuccess);
advanceTime();
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
checkMetrics(
kTag, 0, "Current metrics are not reset", OpReportType::CurrentOpReportRecipientRole);
}
TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterCompletion) {
auto constexpr kTag = "documentsCopied";
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
const auto kDocumentsToCopy = 2;
const auto kBytesToCopy = 200;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
@@ -330,8 +376,7 @@ TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterCompletion) {
"Current metrics are not set",
OpReportType::CurrentOpReportRecipientRole);
advanceTime();
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure,
- getGlobalServiceContext()->getFastClockSource()->now());
+ completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kFailure);
advanceTime();
ASSERT_FALSE(getReport(OpReportType::CurrentOpReportRecipientRole)[kTag].ok());
@@ -339,7 +384,7 @@ TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterCompletion) {
TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterStepDown) {
auto constexpr kTag = "documentsCopied";
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
const auto kDocumentsToCopy = 2;
const auto kBytesToCopy = 200;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
@@ -350,7 +395,7 @@ TEST_F(ReshardingMetricsTest, CurrentOpMetricsAreNotRetainedAfterStepDown) {
"Current metrics are not set",
OpReportType::CurrentOpReportRecipientRole);
advanceTime();
- getMetrics()->onStepDown();
+ stepDownOperation(ReshardingMetrics::Role::kRecipient);
advanceTime();
ASSERT_FALSE(getReport(OpReportType::CurrentOpReportRecipientRole)[kTag].ok());
@@ -360,7 +405,7 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) {
auto constexpr kTag = "remainingOperationTimeEstimated";
const auto elapsedTime = 1;
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
checkMetrics(kTag, -1, OpReportType::CurrentOpReportDonorRole);
const auto kDocumentsToCopy = 2;
@@ -391,14 +436,14 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) {
TEST_F(ReshardingMetricsTest, CurrentOpReportForDonor) {
const auto kDonorState = DonorStateEnum::kDonatingOplogEntries;
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kDonor);
advanceTime(Seconds(2));
getMetrics()->setDonorState(kDonorState);
getMetrics()->startInCriticalSection(getGlobalServiceContext()->getFastClockSource()->now());
advanceTime(Seconds(3));
const ReshardingMetrics::ReporterOptions options(
- ReshardingMetrics::ReporterOptions::Role::kDonor,
+ ReshardingMetrics::Role::kDonor,
UUID::parse("12345678-1234-1234-1234-123456789abc").getValue(),
NamespaceString("db", "collection"),
BSON("id" << 1),
@@ -441,7 +486,7 @@ TEST_F(ReshardingMetricsTest, CurrentOpReportForRecipient) {
static_assert(kBytesToCopy >= kBytesCopied);
constexpr auto kDelayBeforeCloning = Seconds(2);
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
advanceTime(kDelayBeforeCloning);
constexpr auto kTimeSpentCloning = Seconds(3);
@@ -458,7 +503,7 @@ TEST_F(ReshardingMetricsTest, CurrentOpReportForRecipient) {
durationCount<Seconds>(kTimeSpentCloning) + 2 * kTimeToCopyRemainingSeconds;
const ReshardingMetrics::ReporterOptions options(
- ReshardingMetrics::ReporterOptions::Role::kRecipient,
+ ReshardingMetrics::Role::kRecipient,
UUID::parse("12345678-1234-1234-1234-123456789def").getValue(),
NamespaceString("db", "collection"),
BSON("id" << 1),
@@ -506,12 +551,12 @@ TEST_F(ReshardingMetricsTest, CurrentOpReportForCoordinator) {
const auto kCoordinatorState = CoordinatorStateEnum::kInitializing;
const auto kSomeDuration = Seconds(10);
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kCoordinator);
getMetrics()->setCoordinatorState(kCoordinatorState);
advanceTime(kSomeDuration);
const ReshardingMetrics::ReporterOptions options(
- ReshardingMetrics::ReporterOptions::Role::kCoordinator,
+ ReshardingMetrics::Role::kCoordinator,
UUID::parse("12345678-1234-1234-1234-123456789cba").getValue(),
NamespaceString("db", "collection"),
BSON("id" << 1),
@@ -544,7 +589,8 @@ TEST_F(ReshardingMetricsTest, CurrentOpReportForCoordinator) {
TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeCloning) {
// Copy N docs @ timePerDoc. Check the progression of the estimated time remaining.
auto m = getMetrics();
- m->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ m->onStart(ReshardingMetrics::Role::kRecipient,
+ getGlobalServiceContext()->getFastClockSource()->now());
auto timePerDocument = Seconds(2);
int64_t bytesPerDocument = 1024;
int64_t documentsToCopy = 409;
@@ -580,7 +626,8 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeCloning) {
TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeApplying) {
// Perform N ops @ timePerOp. Check the progression of the estimated time remaining.
auto m = getMetrics();
- m->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ m->onStart(ReshardingMetrics::Role::kRecipient,
+ getGlobalServiceContext()->getFastClockSource()->now());
m->setRecipientState(RecipientStateEnum::kApplying);
m->startApplyingOplogEntries(getGlobalServiceContext()->getFastClockSource()->now());
@@ -614,23 +661,21 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTimeApplying) {
TEST_F(ReshardingMetricsTest, CumulativeOpMetricsAccumulate) {
auto constexpr kTag = "documentsCopied";
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
const auto kDocumentsToCopy1 = 2;
const auto kBytesToCopy1 = 200;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
getMetrics()->onDocumentsCopied(kDocumentsToCopy1, kBytesToCopy1);
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure,
- getGlobalServiceContext()->getFastClockSource()->now());
+ completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kFailure);
- getMetrics()->onStart(getGlobalServiceContext()->getFastClockSource()->now());
+ startOperation(ReshardingMetrics::Role::kRecipient);
const auto kDocumentsToCopy2 = 3;
const auto kBytesToCopy2 = 400;
getMetrics()->setRecipientState(RecipientStateEnum::kCloning);
getMetrics()->onDocumentsCopied(kDocumentsToCopy2, kBytesToCopy2);
- getMetrics()->onCompletion(ReshardingOperationStatusEnum::kFailure,
- getGlobalServiceContext()->getFastClockSource()->now());
+ completeOperation(ReshardingMetrics::Role::kRecipient, ReshardingOperationStatusEnum::kFailure);
checkMetrics(kTag,
kDocumentsToCopy1 + kDocumentsToCopy2,
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
index 0d798bef5e5..8aa9b79b458 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
@@ -155,7 +155,8 @@ public:
_cm = createChunkManagerForOriginalColl();
_metrics = std::make_unique<ReshardingMetrics>(getServiceContext());
- _metrics->onStart(getServiceContext()->getFastClockSource()->now());
+ _metrics->onStart(ReshardingMetrics::Role::kRecipient,
+ getServiceContext()->getFastClockSource()->now());
_metrics->setRecipientState(RecipientStateEnum::kApplying);
}
@@ -256,8 +257,7 @@ public:
long long metricsAppliedCount() const {
BSONObjBuilder bob;
- _metrics->serializeCurrentOpMetrics(&bob,
- ReshardingMetrics::ReporterOptions::Role::kRecipient);
+ _metrics->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kRecipient);
return bob.obj()["oplogEntriesApplied"_sd].Long();
}
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
index 6d66874a296..958ac588789 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
@@ -98,7 +98,8 @@ public:
// Initialize ReshardingMetrics to a recipient state compatible with fetching.
_metrics = std::make_unique<ReshardingMetrics>(_svcCtx);
- _metrics->onStart(_svcCtx->getFastClockSource()->now());
+ _metrics->onStart(ReshardingMetrics::Role::kRecipient,
+ _svcCtx->getFastClockSource()->now());
_metrics->setRecipientState(RecipientStateEnum::kCloning);
for (const auto& shardId : kTwoShardIdList) {
@@ -314,8 +315,7 @@ public:
long long metricsFetchedCount() const {
BSONObjBuilder bob;
- _metrics->serializeCurrentOpMetrics(&bob,
- ReshardingMetrics::ReporterOptions::Role::kRecipient);
+ _metrics->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kRecipient);
return bob.obj()["oplogEntriesFetched"_sd].Long();
}
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index f5cb343b747..67a3b3ad4c4 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -341,7 +341,8 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
_cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
return ExecutorFuture<void>(**executor)
- .then([this] { _metrics()->onStart(getCurrentTime()); })
+ .then(
+ [this] { _metrics()->onStart(ReshardingMetrics::Role::kRecipient, getCurrentTime()); })
.then([this, executor, abortToken] {
return _runUntilStrictConsistencyOrErrored(executor, abortToken);
})
@@ -386,7 +387,8 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
// Interrupt occured, ensure the metrics get shut down.
// TODO SERVER-56500: Don't use ReshardingOperationStatusEnum::kCanceled here if it
// is not meant for failover cases.
- _metrics()->onCompletion(ReshardingOperationStatusEnum::kCanceled,
+ _metrics()->onCompletion(ReshardingMetrics::Role::kRecipient,
+ ReshardingOperationStatusEnum::kCanceled,
getCurrentTime());
}
@@ -412,7 +414,7 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status)
boost::optional<BSONObj> ReshardingRecipientService::RecipientStateMachine::reportForCurrentOp(
MongoProcessInterface::CurrentOpConnectionsMode,
MongoProcessInterface::CurrentOpSessionsMode) noexcept {
- ReshardingMetrics::ReporterOptions options(ReshardingMetrics::ReporterOptions::Role::kRecipient,
+ ReshardingMetrics::ReporterOptions options(ReshardingMetrics::Role::kRecipient,
_metadata.getReshardingUUID(),
_metadata.getSourceNss(),
_metadata.getReshardingKey().toBSON(),
@@ -901,10 +903,12 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument
[this, aborted](boost::optional<Timestamp> unusedCommitTime) {
stdx::lock_guard<Latch> lk(_mutex);
if (aborted) {
- _metrics()->onCompletion(ReshardingOperationStatusEnum::kFailure,
+ _metrics()->onCompletion(ReshardingMetrics::Role::kRecipient,
+ ReshardingOperationStatusEnum::kFailure,
getCurrentTime());
} else {
- _metrics()->onCompletion(ReshardingOperationStatusEnum::kSuccess,
+ _metrics()->onCompletion(ReshardingMetrics::Role::kRecipient,
+ ReshardingOperationStatusEnum::kSuccess,
getCurrentTime());
}
diff --git a/src/mongo/db/s/resharding_test_commands.cpp b/src/mongo/db/s/resharding_test_commands.cpp
index 60fdbe5691f..5efc22d97d6 100644
--- a/src/mongo/db/s/resharding_test_commands.cpp
+++ b/src/mongo/db/s/resharding_test_commands.cpp
@@ -78,7 +78,8 @@ public:
};
ReshardingMetrics metrics(opCtx->getServiceContext());
- metrics.onStart(opCtx->getServiceContext()->getFastClockSource()->now());
+ metrics.onStart(ReshardingMetrics::Role::kRecipient,
+ opCtx->getServiceContext()->getFastClockSource()->now());
metrics.setRecipientState(RecipientStateEnum::kCloning);
auto hookList = std::make_unique<rpc::EgressMetadataHookList>();