summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding/resharding_metrics.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_metrics.cpp')
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.cpp132
1 files changed, 102 insertions, 30 deletions
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index 2b01ae0e77f..ad766457587 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -89,22 +89,44 @@ ReshardingMetrics* ReshardingMetrics::get(ServiceContext* ctx) noexcept {
return getMetrics(ctx).get();
}
-void ReshardingMetrics::onStart(Date_t runningOperationStartTime) noexcept {
+void ReshardingMetrics::onStart(Role role, Date_t runningOperationStartTime) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
// TODO Re-add this invariant once all breaking test cases have been fixed.
// invariant(!_currentOp.has_value(), kAnotherOperationInProgress);
+
+ if (!_currentOp) {
+ // Only incremement _started if this is the first time resharding metrics is being invoked
+ // for this resharding operation, and we're not restoring the PrimaryOnlyService from disk.
+ _started++;
+ }
+
// Create a new operation and record the time it started.
- _currentOp.emplace(_svcCtx->getFastClockSource());
- _currentOp->runningOperation.start(runningOperationStartTime);
- _currentOp->opStatus = ReshardingOperationStatusEnum::kRunning;
- _started++;
+ _emplaceCurrentOpForRole(role, runningOperationStartTime);
}
-void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status,
+void ReshardingMetrics::onCompletion(Role role,
+ ReshardingOperationStatusEnum status,
Date_t runningOperationEndTime) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- // TODO Re-add this invariant once all breaking test cases have been fixed.
+ // TODO Re-add this invariant once all breaking test cases have been fixed. Add invariant that
+ // role being completed is a role that is in progress.
// invariant(_currentOp.has_value(), kNoOperationInProgress);
+
+ if (_currentOp->donorState && _currentOp->recipientState) {
+ switch (role) {
+ case Role::kDonor:
+ _currentOp->donorState = boost::none;
+ break;
+ case Role::kRecipient:
+ _currentOp->recipientState = boost::none;
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+
+ return;
+ }
+
switch (status) {
case ReshardingOperationStatusEnum::kSuccess:
_succeeded++;
@@ -125,10 +147,9 @@ void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status,
_currentOp = boost::none;
}
-void ReshardingMetrics::onStepUp() noexcept {
+void ReshardingMetrics::onStepUp(Role role) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- invariant(!_currentOp.has_value(), kAnotherOperationInProgress);
- _currentOp.emplace(_svcCtx->getFastClockSource());
+ _emplaceCurrentOpForRole(role, boost::none);
// TODO SERVER-53913 Implement donor metrics rehydration.
// TODO SERVER-53914 Implement coordinator metrics rehydration.
@@ -136,13 +157,56 @@ void ReshardingMetrics::onStepUp() noexcept {
// TODO SERVER-57094 Resume the runningOperation duration from a timestamp stored on disk
// instead of starting from the current time.
- _currentOp->runningOperation.start(_svcCtx->getFastClockSource()->now());
- _currentOp->opStatus = ReshardingOperationStatusEnum::kRunning;
}
-void ReshardingMetrics::onStepDown() noexcept {
+void ReshardingMetrics::onStepDown(Role role) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
- _currentOp = boost::none;
+ if (_currentOp && _currentOp->donorState && _currentOp->recipientState) {
+ switch (role) {
+ case Role::kDonor:
+ _currentOp->donorState = boost::none;
+ break;
+ case Role::kRecipient:
+ _currentOp->recipientState = boost::none;
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ } else {
+ _currentOp = boost::none;
+ }
+}
+
+void ReshardingMetrics::_emplaceCurrentOpForRole(
+ Role role, boost::optional<Date_t> runningOperationStartTime) noexcept {
+ // Invariants in this function ensure that the only multi-role state allowed is a combination
+ // of donor and recipient.
+ if (!_currentOp) {
+ _currentOp.emplace(_svcCtx->getFastClockSource());
+ _currentOp->runningOperation.start(runningOperationStartTime
+ ? *runningOperationStartTime
+ : _svcCtx->getFastClockSource()->now());
+ _currentOp->opStatus = ReshardingOperationStatusEnum::kRunning;
+ } else {
+ invariant(role != Role::kCoordinator, kAnotherOperationInProgress);
+ invariant(!_currentOp->coordinatorState, kAnotherOperationInProgress);
+ }
+
+ switch (role) {
+ case Role::kCoordinator:
+ _currentOp->coordinatorState.emplace(CoordinatorStateEnum::kUnused);
+ break;
+ case Role::kDonor:
+ invariant(!_currentOp->donorState, kAnotherOperationInProgress);
+ _currentOp->donorState.emplace(DonorStateEnum::kUnused);
+ break;
+ case Role::kRecipient:
+ invariant(!_currentOp->recipientState, kAnotherOperationInProgress);
+ _currentOp->recipientState.emplace(RecipientStateEnum::kUnused);
+ break;
+ default:
+ MONGO_UNREACHABLE
+ }
}
void ReshardingMetrics::setDonorState(DonorStateEnum state) noexcept {
@@ -167,12 +231,21 @@ void ReshardingMetrics::setCoordinatorState(CoordinatorStateEnum state) noexcept
_currentOp->coordinatorState = state;
}
-static StringData serializeState(RecipientStateEnum e) {
- return RecipientState_serializer(e);
+// TODO SERVER-57217 Remove special-casing for the non-existence of the boost::optional.
+static StringData serializeState(boost::optional<RecipientStateEnum> e) {
+ return e ? RecipientState_serializer(*e)
+ : RecipientState_serializer(RecipientStateEnum::kUnused);
+}
+
+// TODO SERVER-57217 Remove special-casing for the non-existence of the boost::optional.
+static StringData serializeState(boost::optional<DonorStateEnum> e) {
+ return e ? DonorState_serializer(*e) : DonorState_serializer(DonorStateEnum::kUnused);
}
-static StringData serializeState(DonorStateEnum e) {
- return DonorState_serializer(e);
+// TODO SERVER-57217 Remove special-casing for the non-existence of the boost::optional.
+static StringData serializeState(boost::optional<CoordinatorStateEnum> e) {
+ return e ? CoordinatorState_serializer(*e)
+ : CoordinatorState_serializer(CoordinatorStateEnum::kUnused);
}
template <typename T>
@@ -209,7 +282,7 @@ void ReshardingMetrics::onDocumentsCopied(int64_t documents, int64_t bytes) noex
if (!_currentOp)
return;
- invariant(checkState(_currentOp->recipientState,
+ invariant(checkState(*_currentOp->recipientState,
{RecipientStateEnum::kCloning, RecipientStateEnum::kError}));
_currentOp->documentsCopied += documents;
@@ -254,7 +327,7 @@ void ReshardingMetrics::onOplogEntriesFetched(int64_t entries) noexcept {
return;
invariant(checkState(
- _currentOp->recipientState,
+ *_currentOp->recipientState,
{RecipientStateEnum::kCloning, RecipientStateEnum::kApplying, RecipientStateEnum::kError}));
_currentOp->oplogEntriesFetched += entries;
@@ -266,7 +339,7 @@ void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept {
if (!_currentOp)
return;
- invariant(checkState(_currentOp->recipientState,
+ invariant(checkState(*_currentOp->recipientState,
{RecipientStateEnum::kApplying, RecipientStateEnum::kError}));
_currentOp->oplogEntriesApplied += entries;
@@ -278,7 +351,7 @@ void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept {
if (!_currentOp)
return;
- invariant(checkState(_currentOp->donorState,
+ invariant(checkState(*_currentOp->donorState,
{DonorStateEnum::kDonatingOplogEntries,
DonorStateEnum::kBlockingWrites,
DonorStateEnum::kError}));
@@ -333,7 +406,7 @@ void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder*
case Role::kDonor:
bob->append(kWritesDuringCritialSection, writesDuringCriticalSection);
bob->append(kCriticalSectionTimeElapsed, getElapsedTime(inCriticalSection));
- bob->append(kDonorState, DonorState_serializer(donorState));
+ bob->append(kDonorState, serializeState(donorState));
bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus));
break;
case Role::kRecipient:
@@ -346,11 +419,11 @@ void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder*
bob->append(kOplogsFetched, oplogEntriesFetched);
bob->append(kOplogsApplied, oplogEntriesApplied);
bob->append(kApplyTimeElapsed, getElapsedTime(applyingOplogEntries));
- bob->append(kRecipientState, RecipientState_serializer(recipientState));
+ bob->append(kRecipientState, serializeState(recipientState));
bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus));
break;
case Role::kCoordinator:
- bob->append(kCoordinatorState, CoordinatorState_serializer(coordinatorState));
+ bob->append(kCoordinatorState, serializeState(coordinatorState));
bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus));
break;
default:
@@ -358,8 +431,7 @@ void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder*
}
}
-void ReshardingMetrics::serializeCurrentOpMetrics(BSONObjBuilder* bob,
- ReporterOptions::Role role) const {
+void ReshardingMetrics::serializeCurrentOpMetrics(BSONObjBuilder* bob, Role role) const {
stdx::lock_guard<Latch> lk(_mutex);
if (_currentOp)
_currentOp->appendCurrentOpMetrics(bob, role);
@@ -368,11 +440,11 @@ void ReshardingMetrics::serializeCurrentOpMetrics(BSONObjBuilder* bob,
BSONObj ReshardingMetrics::reportForCurrentOp(const ReporterOptions& options) const noexcept {
const auto role = [&options] {
switch (options.role) {
- case ReporterOptions::Role::kDonor:
+ case Role::kDonor:
return "Donor";
- case ReporterOptions::Role::kRecipient:
+ case Role::kRecipient:
return "Recipient";
- case ReporterOptions::Role::kCoordinator:
+ case Role::kCoordinator:
return "Coordinator";
default:
MONGO_UNREACHABLE;