diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_recipient_service.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service.cpp | 35 |
1 files changed, 17 insertions, 18 deletions
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 5e096b1e7f2..fe0d54112ea 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -65,6 +65,7 @@ #include "mongo/s/grid.h" #include "mongo/s/stale_shard_version_helpers.h" #include "mongo/util/future_util.h" +#include "mongo/util/optional_util.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding @@ -1124,7 +1125,7 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMetrics( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& abortToken) { - if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) { + if (_metrics->mustRestoreExternallyTrackedRecipientFields(_recipientCtx.getState())) { return _restoreMetricsWithRetry(executor, abortToken); } @@ -1148,28 +1149,25 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_restore void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( const CancelableOperationContextFactory& factory) { - int64_t documentCountCopied = 0; - int64_t documentBytesCopied = 0; - int64_t oplogEntriesFetched = 0; - int64_t oplogEntriesApplied = 0; + ReshardingMetrics::ExternallyTrackedRecipientFields externalMetrics; auto opCtx = factory.makeOperationContext(&cc()); - { + [&] { AutoGetCollection tempReshardingColl( opCtx.get(), _metadata.getTempReshardingNss(), MODE_IS); - if (tempReshardingColl) { - documentBytesCopied = tempReshardingColl->dataSize(opCtx.get()); - documentCountCopied = tempReshardingColl->numRecords(opCtx.get()); + if (!tempReshardingColl) { + return; } - - if (_recipientCtx.getState() == RecipientStateEnum::kCloning) { + if (_recipientCtx.getState() != RecipientStateEnum::kCloning) { // Before cloning, these values are 0. After cloning these values are written to the // metrics section of the recipient state document and restored during metrics // initialization. This is so that applied oplog entries that add or remove documents do // not affect the cloning metrics. - _metrics->restoreDocumentsProcessed(documentCountCopied, documentBytesCopied); + return; } - } + externalMetrics.documentBytesCopied = tempReshardingColl->dataSize(opCtx.get()); + externalMetrics.documentCountCopied = tempReshardingColl->numRecords(opCtx.get()); + }(); reshardingOpCtxKilledWhileRestoringMetrics.execute( [&opCtx](const BSONObj& data) { opCtx->markKilled(); }); @@ -1183,7 +1181,8 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( _metadata.getSourceUUID(), donor.getShardId()), MODE_IS); if (oplogBufferColl) { - oplogEntriesFetched += oplogBufferColl->numRecords(opCtx.get()); + optional_util::setOrAdd(externalMetrics.oplogEntriesFetched, + oplogBufferColl->numRecords(opCtx.get())); } } @@ -1203,7 +1202,8 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( if (!result.isEmpty()) { progressDoc = ReshardingOplogApplierProgress::parse( IDLParserContext("resharding-recipient-service-progress-doc"), result); - oplogEntriesApplied += progressDoc->getNumEntriesApplied(); + optional_util::setOrAdd(externalMetrics.oplogEntriesApplied, + progressDoc->getNumEntriesApplied()); } } @@ -1223,15 +1223,14 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( continue; } - _metrics->accumulateFrom(*progressDoc); + externalMetrics.accumulateFrom(*progressDoc); auto applierMetrics = std::make_unique<ReshardingOplogApplierMetrics>(_metrics.get(), progressDoc); _applierMetricsMap.emplace(shardId, std::move(applierMetrics)); } - _metrics->restoreOplogEntriesFetched(oplogEntriesFetched); - _metrics->restoreOplogEntriesApplied(oplogEntriesApplied); + _metrics->restoreExternallyTrackedRecipientFields(externalMetrics); } CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource( |