diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_metrics.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_metrics.cpp | 70 |
1 files changed, 50 insertions, 20 deletions
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp index 328850f0f6b..7827ab4896d 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics.cpp @@ -29,6 +29,7 @@ #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/exec/document_value/document.h" #include "mongo/db/s/resharding/resharding_util.h" +#include "mongo/util/optional_util.h" namespace mongo { namespace { @@ -69,6 +70,16 @@ Date_t readStartTime(const CommonReshardingMetadata& metadata, ClockSource* fall } } // namespace + +void ReshardingMetrics::ExternallyTrackedRecipientFields::accumulateFrom( + const ReshardingOplogApplierProgress& progressDoc) { + using optional_util::setOrAdd; + setOrAdd(insertsApplied, progressDoc.getInsertsApplied()); + setOrAdd(updatesApplied, progressDoc.getUpdatesApplied()); + setOrAdd(deletesApplied, progressDoc.getDeletesApplied()); + setOrAdd(writesToStashCollections, progressDoc.getWritesToStashCollections()); +} + ReshardingMetrics::ReshardingMetrics(UUID instanceId, BSONObj shardKey, NamespaceString nss, @@ -101,6 +112,7 @@ ReshardingMetrics::ReshardingMetrics(UUID instanceId, clockSource, cumulativeMetrics, std::make_unique<ReshardingMetricsFieldNameProvider>()}, + _ableToEstimateRemainingRecipientTime{!mustRestoreExternallyTrackedRecipientFields(state)}, _deletesApplied{0}, _insertsApplied{0}, _updatesApplied{0}, @@ -155,18 +167,18 @@ ReshardingCumulativeMetrics* ReshardingMetrics::getReshardingCumulativeMetrics() return dynamic_cast<ReshardingCumulativeMetrics*>(getCumulativeMetrics()); } -Milliseconds ReshardingMetrics::getRecipientHighEstimateRemainingTimeMillis() const { - auto estimate = resharding::estimateRemainingRecipientTime(_applyingStartTime.load() != kNoDate, - getBytesWrittenCount(), - getApproxBytesToScanCount(), - getCopyingElapsedTimeSecs(), - _oplogEntriesApplied.load(), - _oplogEntriesFetched.load(), - getApplyingElapsedTimeSecs()); - if (!estimate) { - return Milliseconds{0}; +boost::optional<Milliseconds> ReshardingMetrics::getRecipientHighEstimateRemainingTimeMillis() + const { + if (!_ableToEstimateRemainingRecipientTime.load()) { + return boost::none; } - return *estimate; + return resharding::estimateRemainingRecipientTime(_applyingStartTime.load() != kNoDate, + getBytesWrittenCount(), + getApproxBytesToScanCount(), + getCopyingElapsedTimeSecs(), + _oplogEntriesApplied.load(), + _oplogEntriesFetched.load(), + getApplyingElapsedTimeSecs()); } std::unique_ptr<ReshardingMetrics> ReshardingMetrics::makeInstance(UUID instanceId, @@ -223,15 +235,6 @@ BSONObj ReshardingMetrics::reportForCurrentOp() const noexcept { return builder.obj(); } -void ReshardingMetrics::accumulateFrom(const ReshardingOplogApplierProgress& progressDoc) { - invariant(_role == Role::kRecipient); - _insertsApplied.fetchAndAdd(progressDoc.getInsertsApplied()); - _updatesApplied.fetchAndAdd(progressDoc.getUpdatesApplied()); - _deletesApplied.fetchAndAdd(progressDoc.getDeletesApplied()); - - accumulateWritesToStashCollections(progressDoc.getWritesToStashCollections()); -} - void ReshardingMetrics::restoreRecipientSpecificFields( const ReshardingRecipientDocument& document) { auto metrics = document.getMetrics(); @@ -419,6 +422,33 @@ void ReshardingMetrics::restoreOplogEntriesApplied(int64_t numEntries) { _oplogEntriesApplied.store(numEntries); } +void ReshardingMetrics::restoreUpdatesApplied(int64_t count) { + _updatesApplied.store(count); +} + +void ReshardingMetrics::restoreInsertsApplied(int64_t count) { + _insertsApplied.store(count); +} + +void ReshardingMetrics::restoreDeletesApplied(int64_t count) { + _deletesApplied.store(count); +} + +void ReshardingMetrics::restoreExternallyTrackedRecipientFields( + const ExternallyTrackedRecipientFields& values) { + invokeIfAllSet(&ReshardingMetrics::restoreDocumentsProcessed, + values.documentCountCopied, + values.documentBytesCopied); + invokeIfAllSet(&ReshardingMetrics::restoreOplogEntriesFetched, values.oplogEntriesFetched); + invokeIfAllSet(&ReshardingMetrics::restoreOplogEntriesApplied, values.oplogEntriesApplied); + invokeIfAllSet(&ReshardingMetrics::restoreUpdatesApplied, values.updatesApplied); + invokeIfAllSet(&ReshardingMetrics::restoreInsertsApplied, values.insertsApplied); + invokeIfAllSet(&ReshardingMetrics::restoreDeletesApplied, values.deletesApplied); + invokeIfAllSet(&ReshardingMetrics::restoreWritesToStashCollections, + values.writesToStashCollections); + _ableToEstimateRemainingRecipientTime.store(true); +} + void ReshardingMetrics::onLocalInsertDuringOplogFetching(Milliseconds elapsed) { getReshardingCumulativeMetrics()->onLocalInsertDuringOplogFetching(elapsed); } |