diff options
author | Andrew Witten <andrew.witten@mongodb.com> | 2022-08-11 14:01:43 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-08-11 14:31:47 +0000 |
commit | bbc0bb5e9878e48b6bc3b666affbdf102b379450 (patch) | |
tree | 479875f04f1bffc34185e4dc6b43cef4db067623 /src/mongo | |
parent | 2f27abe1eaad20074706fd740dcde445001201a6 (diff) | |
download | mongo-bbc0bb5e9878e48b6bc3b666affbdf102b379450.tar.gz |
SERVER-67650 persist additional recipient resharding metrics
Diffstat (limited to 'src/mongo')
6 files changed, 163 insertions, 43 deletions
diff --git a/src/mongo/db/s/resharding/recipient_document.idl b/src/mongo/db/s/resharding/recipient_document.idl index e3128de0db5..1eda4620b8c 100644 --- a/src/mongo/db/s/resharding/recipient_document.idl +++ b/src/mongo/db/s/resharding/recipient_document.idl @@ -82,6 +82,11 @@ structs: startConfigTxnCloneTime: type: date optional: true + approxBytesToCopy: + type: long + description: >- + Approximate number of bytes to copy during cloning + optional: true metrics: type: ReshardingRecipientMetrics description: "Metrics related to this recipient." diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp index 6e4e4e041e9..48576c26f45 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics.cpp @@ -406,6 +406,60 @@ void ReshardingMetrics::onStepUp(Role role) noexcept { // instead of starting from the current time. } +void ReshardingMetrics::onStepUp(RecipientStateEnum state, + const ReshardingRecipientCountsAndMetrics& recipientMetrics) { + stdx::lock_guard<Latch> lk(_mutex); + + _emplaceCurrentOpForRole(Role::kRecipient, boost::none); + _onStepUpCalled = true; + + invariant(_currentOp, kNoOperationInProgress); + invariant(_currentOp->documentsCopied == 0, kMetricsSetBeforeRestore); + invariant(_currentOp->bytesCopied == 0, kMetricsSetBeforeRestore); + invariant(_currentOp->oplogEntriesFetched == 0, kMetricsSetBeforeRestore); + invariant(_currentOp->oplogEntriesApplied == 0, kMetricsSetBeforeRestore); + + _currentOp->recipientState = state; + _currentOp->documentsCopied = recipientMetrics.documentCountCopied; + _currentOp->bytesCopied = recipientMetrics.documentBytesCopied; + _currentOp->oplogEntriesFetched = recipientMetrics.oplogEntriesFetched; + _currentOp->oplogEntriesApplied = recipientMetrics.oplogEntriesApplied; + + if (recipientMetrics.approxBytesToCopy) + _currentOp->bytesToCopy = recipientMetrics.approxBytesToCopy.get(); + + + const auto& timeIntervals = recipientMetrics.metrics; + + // Restore in memory state of document copy metrics. + // Not calling startCopyingDocuments or endCopyingDocuments because they acquire a mutex that we + // already have. + // + // Also, note that it is possible for documentCopyInterval->getStart() to be none and for + // documentCopyInterval->getStop() to be not none. That can happen if the cluster is upgraded + // to include code for persisting time intervals during a resharding operation. + // In that case, restore neither the start nor stop time. The resharding coordinator will still + // treat this scenario as the recipient shard being completely caught up after a primary + // failover and engage the critical section too early. + const auto& documentCopyInterval = timeIntervals.getDocumentCopy(); + if (documentCopyInterval && documentCopyInterval->getStart()) { + _currentOp->copyingDocuments.start(documentCopyInterval->getStart().get()); + if (documentCopyInterval->getStop()) { + _currentOp->copyingDocuments.end(documentCopyInterval->getStop().get()); + } + } + // Restore in memory state of oplog application metrics. + // Not calling startApplyingOplogEntries or endApplyingOplogEntries because they acquire a mutex + // that we already have. + const auto& oplogApplicationInterval = timeIntervals.getOplogApplication(); + if (oplogApplicationInterval && oplogApplicationInterval->getStart()) { + _currentOp->applyingOplogEntries.start(oplogApplicationInterval->getStart().get()); + if (oplogApplicationInterval->getStop()) { + _currentOp->applyingOplogEntries.end(oplogApplicationInterval->getStop().get()); + } + } +} + void ReshardingMetrics::onStepUp(DonorStateEnum state, ReshardingDonorMetrics donorMetrics) { stdx::lock_guard<Latch> lk(_mutex); auto operationRuntime = donorMetrics.getOperationRuntime(); @@ -662,22 +716,6 @@ void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept { _cumulativeOp->oplogEntriesApplied += entries; } -void ReshardingMetrics::restoreForCurrentOp(int64_t documentCountCopied, - int64_t documentBytesCopied, - int64_t oplogEntriesFetched, - int64_t oplogEntriesApplied) noexcept { - invariant(_currentOp, kNoOperationInProgress); - invariant(_currentOp->documentsCopied == 0, kMetricsSetBeforeRestore); - invariant(_currentOp->bytesCopied == 0, kMetricsSetBeforeRestore); - invariant(_currentOp->oplogEntriesFetched == 0, kMetricsSetBeforeRestore); - invariant(_currentOp->oplogEntriesApplied == 0, kMetricsSetBeforeRestore); - - _currentOp->documentsCopied = documentCountCopied; - _currentOp->bytesCopied = documentBytesCopied; - _currentOp->oplogEntriesFetched = oplogEntriesFetched; - _currentOp->oplogEntriesApplied = oplogEntriesApplied; -} - void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept { stdx::lock_guard<Latch> lk(_mutex); if (!_currentOp) diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h index a6964c9d611..9ba80d35417 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.h +++ b/src/mongo/db/s/resharding/resharding_metrics.h @@ -35,6 +35,7 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/namespace_string.h" #include "mongo/db/s/resharding/donor_document_gen.h" +#include "mongo/db/s/resharding/recipient_document_gen.h" #include "mongo/db/service_context.h" #include "mongo/platform/mutex.h" #include "mongo/s/resharding/common_types_gen.h" @@ -71,6 +72,29 @@ public: void onStepUp(DonorStateEnum state, ReshardingDonorMetrics donorMetrics); + struct ReshardingRecipientCountsAndMetrics { + ReshardingRecipientCountsAndMetrics(int64_t documentCountCopied, + int64_t documentBytesCopied, + int64_t oplogEntriesFetched, + int64_t oplogEntriesApplied, + boost::optional<int64_t> approxBytesToCopy, + ReshardingRecipientMetrics metrics) + : documentCountCopied{documentCountCopied}, + documentBytesCopied{documentBytesCopied}, + oplogEntriesFetched{oplogEntriesFetched}, + oplogEntriesApplied{oplogEntriesApplied}, + approxBytesToCopy{approxBytesToCopy}, + metrics{metrics} {} + int64_t documentCountCopied; + int64_t documentBytesCopied; + int64_t oplogEntriesFetched; + int64_t oplogEntriesApplied; + boost::optional<int64_t> approxBytesToCopy; + ReshardingRecipientMetrics metrics; + }; + + void onStepUp(RecipientStateEnum, const ReshardingRecipientCountsAndMetrics&); + // So long as a resharding operation is in progress, the following may be used to update the // state of a donor, a recipient, and a coordinator, respectively. void setDonorState(DonorStateEnum) noexcept; @@ -113,11 +137,6 @@ public: // Allows restoring "oplog entries to apply" metrics. void onOplogEntriesApplied(int64_t entries) noexcept; - void restoreForCurrentOp(int64_t documentCountCopied, - int64_t documentBytesCopied, - int64_t oplogEntriesFetched, - int64_t oplogEntriesApplied) noexcept; - // Allows tracking writes during a critical section when the donor's state is either of // "donating-oplog-entries" or "blocking-writes". void onWriteDuringCriticalSection(int64_t writes) noexcept; diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index ccfd5fefdea..7c07564a688 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -145,6 +145,8 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( _recipientCtx{recipientDoc.getMutableState()}, _donorShards{recipientDoc.getDonorShards()}, _cloneTimestamp{recipientDoc.getCloneTimestamp()}, + _timeIntervals{recipientDoc.getMetrics().get_value_or({})}, + _approxBytesToCopy{recipientDoc.getApproxBytesToCopy()}, _externalState{std::move(externalState)}, _startConfigTxnCloneAt{recipientDoc.getStartConfigTxnCloneTime()}, _markKilledExecutor(std::make_shared<ThreadPool>([] { @@ -166,6 +168,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( return donor.getShardId() == myShardId; }) != _donorShards.end(); }()) { + invariant(_externalState); } @@ -813,27 +816,53 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning( const CancelableOperationContextFactory& factory) { auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kCloning); + auto cloningStartTime = getCurrentTime(); + + // Record cloning start time. + ReshardingMetricsTimeInterval interval; + interval.setStart(cloningStartTime); + _timeIntervals.setDocumentCopy(interval); + _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); - _metrics()->startCopyingDocuments(getCurrentTime()); + _metrics()->startCopyingDocuments(cloningStartTime); } void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying( const CancelableOperationContextFactory& factory) { auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kApplying); + auto oplogApplicationStartTime = getCurrentTime(); + + // Record oplog application start time. + ReshardingMetricsTimeInterval interval; + interval.setStart(oplogApplicationStartTime); + _timeIntervals.setOplogApplication(interval); + + // Record document copy stop time. + ReshardingMetricsTimeInterval documentCopy{_timeIntervals.getDocumentCopy().get_value_or({})}; + documentCopy.setStop(oplogApplicationStartTime); + _timeIntervals.setDocumentCopy(documentCopy); + _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); - auto currentTime = getCurrentTime(); - _metrics()->endCopyingDocuments(currentTime); - _metrics()->startApplyingOplogEntries(currentTime); + _metrics()->endCopyingDocuments(oplogApplicationStartTime); + _metrics()->startApplyingOplogEntries(oplogApplicationStartTime); } void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency( const CancelableOperationContextFactory& factory) { auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kStrictConsistency); + auto oplogApplicationStopTime = getCurrentTime(); + + // Record oplog application stop time + ReshardingMetricsTimeInterval oplogApplication{ + _timeIntervals.getOplogApplication().get_value_or({})}; + oplogApplication.setStop(oplogApplicationStopTime); + _timeIntervals.setOplogApplication(oplogApplication); + + _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); - auto currentTime = getCurrentTime(); - _metrics()->endApplyingOplogEntries(currentTime); + _metrics()->endApplyingOplogEntries(oplogApplicationStopTime); } void ReshardingRecipientService::RecipientStateMachine::_transitionToError( @@ -985,6 +1014,9 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument setBuilder.append(ReshardingRecipientDocument::kDonorShardsFieldName, donorShardsArrayBuilder.arr()); + + setBuilder.append(ReshardingRecipientDocument::kApproxBytesToCopyFieldName, + cloneDetails->approxBytesToCopy); } if (configStartTime) { @@ -992,6 +1024,8 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument *configStartTime); } + setBuilder.append(ReshardingRecipientDocument::kMetricsFieldName, _timeIntervals.toBSON()); + setBuilder.doneFast(); } @@ -1009,6 +1043,7 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument if (cloneDetails) { _cloneTimestamp = cloneDetails->cloneTimestamp; _donorShards = std::move(cloneDetails->donorShards); + _approxBytesToCopy = cloneDetails->approxBytesToCopy; } if (configStartTime) { @@ -1068,7 +1103,6 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMe const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& abortToken) { if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) { - _metrics()->onStepUp(ReshardingMetrics::Role::kRecipient); return _restoreMetricsWithRetry(executor, abortToken); } _metrics()->onStart(ReshardingMetrics::Role::kRecipient, getCurrentTime()); @@ -1078,7 +1112,6 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMe ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_restoreMetricsWithRetry( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& abortToken) { - _metrics()->setRecipientState(_recipientCtx.getState()); return _retryingCancelableOpCtxFactory ->withAutomaticRetry( [this, executor, abortToken](const auto& factory) { _restoreMetrics(factory); }) @@ -1144,8 +1177,13 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( } } - _metrics()->restoreForCurrentOp( - documentCountCopied, documentBytesCopied, oplogEntriesFetched, oplogEntriesApplied); + _metrics()->onStepUp(_recipientCtx.getState(), + ReshardingMetrics::ReshardingRecipientCountsAndMetrics{documentCountCopied, + documentBytesCopied, + oplogEntriesFetched, + oplogEntriesApplied, + _approxBytesToCopy, + _timeIntervals}); } CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource( diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index e1fe504a970..86d3236b9c9 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -288,6 +288,8 @@ private: RecipientShardContext _recipientCtx; std::vector<DonorShardFetchTimestamp> _donorShards; boost::optional<Timestamp> _cloneTimestamp; + ReshardingRecipientMetrics _timeIntervals; + boost::optional<int64_t> _approxBytesToCopy; const std::unique_ptr<RecipientStateMachineExternalState> _externalState; diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp index 43ccab32bec..b243a433995 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp @@ -50,8 +50,10 @@ #include "mongo/db/s/resharding/resharding_service_test_helpers.h" #include "mongo/logv2/log.h" #include "mongo/unittest/death_test.h" +#include "mongo/util/clock_source_mock.h" #include "mongo/util/fail_point.h" + namespace mongo { namespace { @@ -208,6 +210,8 @@ public: */ class ReshardingRecipientServiceTest : public repl::PrimaryOnlyServiceMongoDTest { public: + ReshardingRecipientServiceTest() : PrimaryOnlyServiceMongoDTest(Options{}.useMockClock(true)) {} + using RecipientStateMachine = ReshardingRecipientService::RecipientStateMachine; std::unique_ptr<repl::PrimaryOnlyService> makeService(ServiceContext* serviceContext) override { @@ -222,7 +226,6 @@ public: repl::DropPendingCollectionReaper::set( serviceContext, std::make_unique<repl::DropPendingCollectionReaper>(storageMock.get())); repl::StorageInterface::set(serviceContext, std::move(storageMock)); - _controller = std::make_shared<RecipientStateTransitionController>(); _opObserverRegistry->addObserver(std::make_unique<RecipientOpObserverForTest>(_controller)); } @@ -816,22 +819,33 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) { } // Step down before the transition to state can complete. stateTransitionsGuard.wait(state); - if (state == RecipientStateEnum::kStrictConsistency) { - auto currOp = recipient - ->reportForCurrentOp( - MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle, - MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle) - .get(); + + dynamic_cast<ClockSourceMock*>(getServiceContext()->getFastClockSource()) + ->advance(Seconds(1)); + auto currOp = + recipient + ->reportForCurrentOp(MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle, + MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle) + .get(); + + + if (state == RecipientStateEnum::kApplying) { + ASSERT_EQ(currOp.getField("totalApplyTimeElapsedSecs").Long(), 0); + ASSERT_EQ(currOp.getStringField("recipientState"), + RecipientState_serializer(RecipientStateEnum::kCloning)); + ASSERT_GT(currOp.getField("totalCopyTimeElapsedSecs").Long(), 0); + ASSERT_GT(currOp.getField("approxBytesToCopy").Long(), 0); + + } else if (state == RecipientStateEnum::kStrictConsistency) { ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L); ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize()); ASSERT_EQ(currOp.getStringField("recipientState"), RecipientState_serializer(RecipientStateEnum::kApplying)); + ASSERT_GT(currOp.getField("totalCopyTimeElapsedSecs").Long(), 0); + ASSERT_GT(currOp.getField("totalApplyTimeElapsedSecs").Long(), 0); + ASSERT_GT(currOp.getField("approxBytesToCopy").Long(), 0); + } else if (state == RecipientStateEnum::kDone) { - auto currOp = recipient - ->reportForCurrentOp( - MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle, - MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle) - .get(); ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L); ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize()); ASSERT_EQ(currOp.getField("oplogEntriesFetched").Long(), @@ -840,7 +854,11 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) { oplogEntriesAppliedOnEachDonor * doc.getDonorShards().size()); ASSERT_EQ(currOp.getStringField("recipientState"), RecipientState_serializer(RecipientStateEnum::kStrictConsistency)); + ASSERT_GT(currOp.getField("totalCopyTimeElapsedSecs").Long(), 0); + ASSERT_GT(currOp.getField("totalApplyTimeElapsedSecs").Long(), 0); + ASSERT_GT(currOp.getField("approxBytesToCopy").Long(), 0); } + stepDown(); ASSERT_EQ(recipient->getCompletionFuture().getNoThrow(), |