diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_metrics.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_metrics.cpp | 132 |
1 files changed, 102 insertions, 30 deletions
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp index 2b01ae0e77f..ad766457587 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics.cpp @@ -89,22 +89,44 @@ ReshardingMetrics* ReshardingMetrics::get(ServiceContext* ctx) noexcept { return getMetrics(ctx).get(); } -void ReshardingMetrics::onStart(Date_t runningOperationStartTime) noexcept { +void ReshardingMetrics::onStart(Role role, Date_t runningOperationStartTime) noexcept { stdx::lock_guard<Latch> lk(_mutex); // TODO Re-add this invariant once all breaking test cases have been fixed. // invariant(!_currentOp.has_value(), kAnotherOperationInProgress); + + if (!_currentOp) { + // Only increment _started if this is the first time resharding metrics is being invoked + // for this resharding operation, and we're not restoring the PrimaryOnlyService from disk. + _started++; + } + // Create a new operation and record the time it started. - _currentOp.emplace(_svcCtx->getFastClockSource()); - _currentOp->runningOperation.start(runningOperationStartTime); - _currentOp->opStatus = ReshardingOperationStatusEnum::kRunning; - _started++; + _emplaceCurrentOpForRole(role, runningOperationStartTime); } -void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status, +void ReshardingMetrics::onCompletion(Role role, + ReshardingOperationStatusEnum status, Date_t runningOperationEndTime) noexcept { stdx::lock_guard<Latch> lk(_mutex); - // TODO Re-add this invariant once all breaking test cases have been fixed. + // TODO Re-add this invariant once all breaking test cases have been fixed. Add invariant that + // role being completed is a role that is in progress. 
// invariant(_currentOp.has_value(), kNoOperationInProgress); + + if (_currentOp->donorState && _currentOp->recipientState) { + switch (role) { + case Role::kDonor: + _currentOp->donorState = boost::none; + break; + case Role::kRecipient: + _currentOp->recipientState = boost::none; + break; + default: + MONGO_UNREACHABLE; + } + + return; + } + switch (status) { case ReshardingOperationStatusEnum::kSuccess: _succeeded++; @@ -125,10 +147,9 @@ void ReshardingMetrics::onCompletion(ReshardingOperationStatusEnum status, _currentOp = boost::none; } -void ReshardingMetrics::onStepUp() noexcept { +void ReshardingMetrics::onStepUp(Role role) noexcept { stdx::lock_guard<Latch> lk(_mutex); - invariant(!_currentOp.has_value(), kAnotherOperationInProgress); - _currentOp.emplace(_svcCtx->getFastClockSource()); + _emplaceCurrentOpForRole(role, boost::none); // TODO SERVER-53913 Implement donor metrics rehydration. // TODO SERVER-53914 Implement coordinator metrics rehydration. @@ -136,13 +157,56 @@ void ReshardingMetrics::onStepUp() noexcept { // TODO SERVER-57094 Resume the runningOperation duration from a timestamp stored on disk // instead of starting from the current time. 
- _currentOp->runningOperation.start(_svcCtx->getFastClockSource()->now()); - _currentOp->opStatus = ReshardingOperationStatusEnum::kRunning; } -void ReshardingMetrics::onStepDown() noexcept { +void ReshardingMetrics::onStepDown(Role role) noexcept { stdx::lock_guard<Latch> lk(_mutex); - _currentOp = boost::none; + if (_currentOp && _currentOp->donorState && _currentOp->recipientState) { + switch (role) { + case Role::kDonor: + _currentOp->donorState = boost::none; + break; + case Role::kRecipient: + _currentOp->recipientState = boost::none; + break; + default: + MONGO_UNREACHABLE; + } + } else { + _currentOp = boost::none; + } +} + +void ReshardingMetrics::_emplaceCurrentOpForRole( + Role role, boost::optional<Date_t> runningOperationStartTime) noexcept { + // Invariants in this function ensure that the only multi-role state allowed is a combination + // of donor and recipient. + if (!_currentOp) { + _currentOp.emplace(_svcCtx->getFastClockSource()); + _currentOp->runningOperation.start(runningOperationStartTime + ? 
*runningOperationStartTime + : _svcCtx->getFastClockSource()->now()); + _currentOp->opStatus = ReshardingOperationStatusEnum::kRunning; + } else { + invariant(role != Role::kCoordinator, kAnotherOperationInProgress); + invariant(!_currentOp->coordinatorState, kAnotherOperationInProgress); + } + + switch (role) { + case Role::kCoordinator: + _currentOp->coordinatorState.emplace(CoordinatorStateEnum::kUnused); + break; + case Role::kDonor: + invariant(!_currentOp->donorState, kAnotherOperationInProgress); + _currentOp->donorState.emplace(DonorStateEnum::kUnused); + break; + case Role::kRecipient: + invariant(!_currentOp->recipientState, kAnotherOperationInProgress); + _currentOp->recipientState.emplace(RecipientStateEnum::kUnused); + break; + default: + MONGO_UNREACHABLE + } } void ReshardingMetrics::setDonorState(DonorStateEnum state) noexcept { @@ -167,12 +231,21 @@ void ReshardingMetrics::setCoordinatorState(CoordinatorStateEnum state) noexcept _currentOp->coordinatorState = state; } -static StringData serializeState(RecipientStateEnum e) { - return RecipientState_serializer(e); +// TODO SERVER-57217 Remove special-casing for the non-existence of the boost::optional. +static StringData serializeState(boost::optional<RecipientStateEnum> e) { + return e ? RecipientState_serializer(*e) + : RecipientState_serializer(RecipientStateEnum::kUnused); +} + +// TODO SERVER-57217 Remove special-casing for the non-existence of the boost::optional. +static StringData serializeState(boost::optional<DonorStateEnum> e) { + return e ? DonorState_serializer(*e) : DonorState_serializer(DonorStateEnum::kUnused); } -static StringData serializeState(DonorStateEnum e) { - return DonorState_serializer(e); +// TODO SERVER-57217 Remove special-casing for the non-existence of the boost::optional. +static StringData serializeState(boost::optional<CoordinatorStateEnum> e) { + return e ? 
CoordinatorState_serializer(*e) + : CoordinatorState_serializer(CoordinatorStateEnum::kUnused); } template <typename T> @@ -209,7 +282,7 @@ void ReshardingMetrics::onDocumentsCopied(int64_t documents, int64_t bytes) noex if (!_currentOp) return; - invariant(checkState(_currentOp->recipientState, + invariant(checkState(*_currentOp->recipientState, {RecipientStateEnum::kCloning, RecipientStateEnum::kError})); _currentOp->documentsCopied += documents; @@ -254,7 +327,7 @@ void ReshardingMetrics::onOplogEntriesFetched(int64_t entries) noexcept { return; invariant(checkState( - _currentOp->recipientState, + *_currentOp->recipientState, {RecipientStateEnum::kCloning, RecipientStateEnum::kApplying, RecipientStateEnum::kError})); _currentOp->oplogEntriesFetched += entries; @@ -266,7 +339,7 @@ void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept { if (!_currentOp) return; - invariant(checkState(_currentOp->recipientState, + invariant(checkState(*_currentOp->recipientState, {RecipientStateEnum::kApplying, RecipientStateEnum::kError})); _currentOp->oplogEntriesApplied += entries; @@ -278,7 +351,7 @@ void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept { if (!_currentOp) return; - invariant(checkState(_currentOp->donorState, + invariant(checkState(*_currentOp->donorState, {DonorStateEnum::kDonatingOplogEntries, DonorStateEnum::kBlockingWrites, DonorStateEnum::kError})); @@ -333,7 +406,7 @@ void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder* case Role::kDonor: bob->append(kWritesDuringCritialSection, writesDuringCriticalSection); bob->append(kCriticalSectionTimeElapsed, getElapsedTime(inCriticalSection)); - bob->append(kDonorState, DonorState_serializer(donorState)); + bob->append(kDonorState, serializeState(donorState)); bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus)); break; case Role::kRecipient: @@ -346,11 +419,11 @@ void 
ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder* bob->append(kOplogsFetched, oplogEntriesFetched); bob->append(kOplogsApplied, oplogEntriesApplied); bob->append(kApplyTimeElapsed, getElapsedTime(applyingOplogEntries)); - bob->append(kRecipientState, RecipientState_serializer(recipientState)); + bob->append(kRecipientState, serializeState(recipientState)); bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus)); break; case Role::kCoordinator: - bob->append(kCoordinatorState, CoordinatorState_serializer(coordinatorState)); + bob->append(kCoordinatorState, serializeState(coordinatorState)); bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus)); break; default: @@ -358,8 +431,7 @@ void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder* } } -void ReshardingMetrics::serializeCurrentOpMetrics(BSONObjBuilder* bob, - ReporterOptions::Role role) const { +void ReshardingMetrics::serializeCurrentOpMetrics(BSONObjBuilder* bob, Role role) const { stdx::lock_guard<Latch> lk(_mutex); if (_currentOp) _currentOp->appendCurrentOpMetrics(bob, role); @@ -368,11 +440,11 @@ void ReshardingMetrics::serializeCurrentOpMetrics(BSONObjBuilder* bob, BSONObj ReshardingMetrics::reportForCurrentOp(const ReporterOptions& options) const noexcept { const auto role = [&options] { switch (options.role) { - case ReporterOptions::Role::kDonor: + case Role::kDonor: return "Donor"; - case ReporterOptions::Role::kRecipient: + case Role::kRecipient: return "Recipient"; - case ReporterOptions::Role::kCoordinator: + case Role::kCoordinator: return "Coordinator"; default: MONGO_UNREACHABLE; |