diff options
Diffstat (limited to 'src')
18 files changed, 328 insertions, 115 deletions
diff --git a/src/mongo/db/s/global_index_metrics.cpp b/src/mongo/db/s/global_index_metrics.cpp index f6f4a46c7cf..36d28736d03 100644 --- a/src/mongo/db/s/global_index_metrics.cpp +++ b/src/mongo/db/s/global_index_metrics.cpp @@ -229,8 +229,9 @@ GlobalIndexCumulativeMetrics* GlobalIndexMetrics::getGlobalIndexCumulativeMetric } -Milliseconds GlobalIndexMetrics::getRecipientHighEstimateRemainingTimeMillis() const { - return Milliseconds{0}; +boost::optional<Milliseconds> GlobalIndexMetrics::getRecipientHighEstimateRemainingTimeMillis() + const { + return boost::none; } } // namespace mongo diff --git a/src/mongo/db/s/global_index_metrics.h b/src/mongo/db/s/global_index_metrics.h index 9f5f53df6a9..6eb2c446565 100644 --- a/src/mongo/db/s/global_index_metrics.h +++ b/src/mongo/db/s/global_index_metrics.h @@ -212,7 +212,6 @@ public: bool unique, ServiceContext* serviceContext); - Milliseconds getRecipientHighEstimateRemainingTimeMillis() const; template <typename T> static auto initializeFrom(const T& document, ServiceContext* serviceContext) { static_assert(isStateDocument<T>); @@ -240,6 +239,9 @@ public: _stateHolder.onStateTransition(before, after); } +protected: + boost::optional<Milliseconds> getRecipientHighEstimateRemainingTimeMillis() const override; + private: GlobalIndexCumulativeMetrics* getGlobalIndexCumulativeMetrics(); std::string createOperationDescription() const noexcept override; diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp index 328850f0f6b..7827ab4896d 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics.cpp @@ -29,6 +29,7 @@ #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/exec/document_value/document.h" #include "mongo/db/s/resharding/resharding_util.h" +#include "mongo/util/optional_util.h" namespace mongo { namespace { @@ -69,6 +70,16 @@ Date_t readStartTime(const CommonReshardingMetadata& metadata, ClockSource* fall } } // namespace + +void ReshardingMetrics::ExternallyTrackedRecipientFields::accumulateFrom( + const ReshardingOplogApplierProgress& progressDoc) { + using optional_util::setOrAdd; + setOrAdd(insertsApplied, progressDoc.getInsertsApplied()); + setOrAdd(updatesApplied, progressDoc.getUpdatesApplied()); + setOrAdd(deletesApplied, progressDoc.getDeletesApplied()); + setOrAdd(writesToStashCollections, progressDoc.getWritesToStashCollections()); +} + ReshardingMetrics::ReshardingMetrics(UUID instanceId, BSONObj shardKey, NamespaceString nss, @@ -101,6 +112,7 @@ ReshardingMetrics::ReshardingMetrics(UUID instanceId, clockSource, cumulativeMetrics, std::make_unique<ReshardingMetricsFieldNameProvider>()}, + _ableToEstimateRemainingRecipientTime{!mustRestoreExternallyTrackedRecipientFields(state)}, _deletesApplied{0}, _insertsApplied{0}, _updatesApplied{0}, @@ -155,18 +167,18 @@ ReshardingCumulativeMetrics* ReshardingMetrics::getReshardingCumulativeMetrics() return dynamic_cast<ReshardingCumulativeMetrics*>(getCumulativeMetrics()); } -Milliseconds ReshardingMetrics::getRecipientHighEstimateRemainingTimeMillis() const { - auto estimate = resharding::estimateRemainingRecipientTime(_applyingStartTime.load() != kNoDate, - getBytesWrittenCount(), - getApproxBytesToScanCount(), - getCopyingElapsedTimeSecs(), - _oplogEntriesApplied.load(), - _oplogEntriesFetched.load(), - getApplyingElapsedTimeSecs()); - if (!estimate) { - return Milliseconds{0}; +boost::optional<Milliseconds> ReshardingMetrics::getRecipientHighEstimateRemainingTimeMillis() + const { + if (!_ableToEstimateRemainingRecipientTime.load()) { + return boost::none; } - return *estimate; + return resharding::estimateRemainingRecipientTime(_applyingStartTime.load() != kNoDate, + getBytesWrittenCount(), + getApproxBytesToScanCount(), + getCopyingElapsedTimeSecs(), + _oplogEntriesApplied.load(), + _oplogEntriesFetched.load(), + getApplyingElapsedTimeSecs()); } std::unique_ptr<ReshardingMetrics> ReshardingMetrics::makeInstance(UUID instanceId, @@ -223,15 +235,6 @@ BSONObj ReshardingMetrics::reportForCurrentOp() const noexcept { return builder.obj(); } -void ReshardingMetrics::accumulateFrom(const ReshardingOplogApplierProgress& progressDoc) { - invariant(_role == Role::kRecipient); - _insertsApplied.fetchAndAdd(progressDoc.getInsertsApplied()); - _updatesApplied.fetchAndAdd(progressDoc.getUpdatesApplied()); - _deletesApplied.fetchAndAdd(progressDoc.getDeletesApplied()); - - accumulateWritesToStashCollections(progressDoc.getWritesToStashCollections()); -} - void ReshardingMetrics::restoreRecipientSpecificFields( const ReshardingRecipientDocument& document) { auto metrics = document.getMetrics(); @@ -419,6 +422,33 @@ void ReshardingMetrics::restoreOplogEntriesApplied(int64_t numEntries) { _oplogEntriesApplied.store(numEntries); } +void ReshardingMetrics::restoreUpdatesApplied(int64_t count) { + _updatesApplied.store(count); +} + +void ReshardingMetrics::restoreInsertsApplied(int64_t count) { + _insertsApplied.store(count); +} + +void ReshardingMetrics::restoreDeletesApplied(int64_t count) { + _deletesApplied.store(count); +} + +void ReshardingMetrics::restoreExternallyTrackedRecipientFields( + const ExternallyTrackedRecipientFields& values) { + invokeIfAllSet(&ReshardingMetrics::restoreDocumentsProcessed, + values.documentCountCopied, + values.documentBytesCopied); + invokeIfAllSet(&ReshardingMetrics::restoreOplogEntriesFetched, values.oplogEntriesFetched); + invokeIfAllSet(&ReshardingMetrics::restoreOplogEntriesApplied, values.oplogEntriesApplied); + invokeIfAllSet(&ReshardingMetrics::restoreUpdatesApplied, values.updatesApplied); + invokeIfAllSet(&ReshardingMetrics::restoreInsertsApplied, values.insertsApplied); + invokeIfAllSet(&ReshardingMetrics::restoreDeletesApplied, values.deletesApplied); + invokeIfAllSet(&ReshardingMetrics::restoreWritesToStashCollections, + values.writesToStashCollections); + _ableToEstimateRemainingRecipientTime.store(true); +} + void ReshardingMetrics::onLocalInsertDuringOplogFetching(Milliseconds elapsed) { getReshardingCumulativeMetrics()->onLocalInsertDuringOplogFetching(elapsed); } diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h index 5b8f1a76f06..3c2bd1e57a9 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.h +++ b/src/mongo/db/s/resharding/resharding_metrics.h @@ -80,6 +80,20 @@ public: CoordinatorStateEnum _enumVal; }; + struct ExternallyTrackedRecipientFields { + public: + void accumulateFrom(const ReshardingOplogApplierProgress& progressDoc); + + boost::optional<int64_t> documentCountCopied; + boost::optional<int64_t> documentBytesCopied; + boost::optional<int64_t> oplogEntriesFetched; + boost::optional<int64_t> oplogEntriesApplied; + boost::optional<int64_t> insertsApplied; + boost::optional<int64_t> updatesApplied; + boost::optional<int64_t> deletesApplied; + boost::optional<int64_t> writesToStashCollections; + }; + ReshardingMetrics(const CommonReshardingMetadata& metadata, Role role, ClockSource* clockSource, @@ -151,28 +165,43 @@ public: void onStateTransition(T before, T after) { _stateHolder.onStateTransition(before, after); } - void accumulateFrom(const ReshardingOplogApplierProgress& progressDoc); + + template <typename StateOrStateVariant> + static bool mustRestoreExternallyTrackedRecipientFields(StateOrStateVariant stateOrVariant) { + if constexpr (std::is_same_v<StateOrStateVariant, State>) { + return stdx::visit( + [](auto v) { return mustRestoreExternallyTrackedRecipientFieldsImpl(v); }, + stateOrVariant); + } else { + return mustRestoreExternallyTrackedRecipientFieldsImpl(stateOrVariant); + } + } + BSONObj reportForCurrentOp() const noexcept override; void onUpdateApplied(); void onInsertApplied(); void onDeleteApplied(); void onOplogEntriesFetched(int64_t numEntries, Milliseconds elapsed); - void restoreOplogEntriesFetched(int64_t numEntries); void onOplogEntriesApplied(int64_t numEntries); - void restoreOplogEntriesApplied(int64_t numEntries); void onApplyingBegin(); void onApplyingEnd(); void onLocalInsertDuringOplogFetching(Milliseconds elapsed); void onBatchRetrievedDuringOplogApplying(Milliseconds elapsed); void onOplogLocalBatchApplied(Milliseconds elapsed); + void restoreExternallyTrackedRecipientFields(const ExternallyTrackedRecipientFields& values); Seconds getApplyingElapsedTimeSecs() const; Date_t getApplyingBegin() const; Date_t getApplyingEnd() const; - Milliseconds getRecipientHighEstimateRemainingTimeMillis() const; protected: + boost::optional<Milliseconds> getRecipientHighEstimateRemainingTimeMillis() const override; + void restoreOplogEntriesFetched(int64_t numEntries); + void restoreOplogEntriesApplied(int64_t numEntries); + void restoreUpdatesApplied(int64_t count); + void restoreInsertsApplied(int64_t count); + void restoreDeletesApplied(int64_t count); virtual StringData getStateString() const noexcept override; void restoreApplyingBegin(Date_t date); void restoreApplyingEnd(Date_t date); @@ -196,6 +225,16 @@ private: } template <typename T> + static bool mustRestoreExternallyTrackedRecipientFieldsImpl(T state) { + static_assert(resharding_metrics::isState<T>); + if constexpr (std::is_same_v<T, RecipientStateEnum>) { + return state > RecipientStateEnum::kAwaitingFetchTimestamp; + } else { + return false; + } + } + + template <typename T> void restorePhaseDurationFields(const T& document) { static_assert(resharding_metrics::isStateDocument<T>); auto metrics = document.getMetrics(); @@ -226,6 +265,15 @@ private: } } + template <typename MemberFn, typename... T> + void invokeIfAllSet(MemberFn&& fn, const boost::optional<T>&... args) { + if (!(args && ...)) { + return; + } + std::invoke(fn, this, *args...); + } + + AtomicWord<bool> _ableToEstimateRemainingRecipientTime; AtomicWord<int64_t> _deletesApplied; AtomicWord<int64_t> _insertsApplied; AtomicWord<int64_t> _updatesApplied; diff --git a/src/mongo/db/s/resharding/resharding_metrics_helpers.h b/src/mongo/db/s/resharding/resharding_metrics_helpers.h index d5450d12135..42d8fc01fd0 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_helpers.h +++ b/src/mongo/db/s/resharding/resharding_metrics_helpers.h @@ -41,10 +41,14 @@ namespace resharding_metrics { template <class T> inline constexpr bool isStateDocument = - std::disjunction<std::is_same<T, ReshardingRecipientDocument>, - std::is_same<T, ReshardingCoordinatorDocument>, - std::is_same<T, ReshardingDonorDocument>>::value; + std::disjunction_v<std::is_same<T, ReshardingRecipientDocument>, + std::is_same<T, ReshardingCoordinatorDocument>, + std::is_same<T, ReshardingDonorDocument>>; +template <typename T> +inline constexpr bool isState = std::disjunction_v<std::is_same<T, RecipientStateEnum>, + std::is_same<T, CoordinatorStateEnum>, + std::is_same<T, DonorStateEnum>>; template <typename T> inline constexpr auto getState(const T& document) { diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp index aa18742fb7c..99b9d2d7cf6 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp @@ -63,7 +63,7 @@ public: role, clockSource->now(), clockSource, - _cumulativeMetrics.get()); + getCumulativeMetrics()); } const UUID& getSourceCollectionId() { @@ -74,10 +74,20 @@ public: template <typename T> BSONObj getReportFromStateDocument(T document) { auto metrics = - ReshardingMetrics::initializeFrom(document, getClockSource(), _cumulativeMetrics.get()); + ReshardingMetrics::initializeFrom(document, getClockSource(), getCumulativeMetrics()); return metrics->reportForCurrentOp(); } + auto makeRecipientMetricsWithAmbiguousTimeRemaining() { + auto doc = createRecipientDocument(RecipientStateEnum::kApplying, UUID::gen()); + ReshardingRecipientMetrics metricsDoc; + ReshardingMetricsTimeInterval interval; + interval.setStart(getClockSource()->now()); + metricsDoc.setOplogApplication(interval); + doc.setMetrics(metricsDoc); + return ReshardingMetrics::initializeFrom(doc, getClockSource(), getCumulativeMetrics()); + } + ReshardingRecipientDocument createRecipientDocument(RecipientStateEnum state, const UUID& operationId) { RecipientShardContext recipientCtx; @@ -174,7 +184,7 @@ public: doc.setMetrics(metricsDoc); auto metrics = - ReshardingMetrics::initializeFrom(doc, getClockSource(), _cumulativeMetrics.get()); + ReshardingMetrics::initializeFrom(doc, getClockSource(), getCumulativeMetrics()); clock->advance(kInterval); auto report = metrics->reportForCurrentOp(); @@ -235,9 +245,12 @@ TEST_F(ReshardingMetricsTest, RestoresByteAndDocumentCountsFromRecipientStateDoc TEST_F(ReshardingMetricsTest, RestoresByteAndDocumentCountsDuringCloning) { constexpr auto kDocsCopied = 50; constexpr auto kBytesCopied = 500; + ReshardingMetrics::ExternallyTrackedRecipientFields external; + external.documentCountCopied = kDocsCopied; + external.documentBytesCopied = kBytesCopied; auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient); - metrics->restoreDocumentsProcessed(kDocsCopied, kBytesCopied); + metrics->restoreExternallyTrackedRecipientFields(external); auto report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("documentsCopied"), kDocsCopied); @@ -303,8 +316,11 @@ TEST_F(ReshardingMetricsTest, RestoresFromReshardingApplierProgressDocument) { progressDoc.setDeletesApplied(789); progressDoc.setWritesToStashCollections(800); + ReshardingMetrics::ExternallyTrackedRecipientFields external; + external.accumulateFrom(progressDoc); + auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient); - metrics->accumulateFrom(progressDoc); + metrics->restoreExternallyTrackedRecipientFields(external); auto report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("insertsApplied"), 123); @@ -399,16 +415,20 @@ TEST_F(ReshardingMetricsTest, RecipientIncrementFetchedOplogEntries) { } TEST_F(ReshardingMetricsTest, RecipientRestoreFetchedOplogEntries) { + ReshardingMetrics::ExternallyTrackedRecipientFields external; + auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient); auto report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("oplogEntriesFetched"), 0); - metrics->restoreOplogEntriesFetched(100); + external.oplogEntriesFetched = 100; + metrics->restoreExternallyTrackedRecipientFields(external); report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("oplogEntriesFetched"), 100); - metrics->restoreOplogEntriesFetched(50); + external.oplogEntriesFetched = 50; + metrics->restoreExternallyTrackedRecipientFields(external); report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("oplogEntriesFetched"), 50); } @@ -423,15 +443,11 @@ TEST_F(ReshardingMetricsTest, RecipientReportsRemainingTime) { metrics->setDocumentsToProcessCounts(0, kOpsPerIncrement * 4); metrics->onOplogEntriesFetched(kOpsPerIncrement * 4, Milliseconds(1)); - // Before cloning. - auto report = metrics->reportForCurrentOp(); - ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"), 0); - // During cloning. metrics->onCopyingBegin(); metrics->onDocumentsProcessed(0, kOpsPerIncrement, Milliseconds(1)); clock->advance(kIncrement); - report = metrics->reportForCurrentOp(); + auto report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"), kExpectedTotal - kIncrementSecs); @@ -467,16 +483,20 @@ TEST_F(ReshardingMetricsTest, RecipientReportsRemainingTime) { } TEST_F(ReshardingMetricsTest, RecipientRestoreAppliedOplogEntries) { + ReshardingMetrics::ExternallyTrackedRecipientFields external; + auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient); auto report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("oplogEntriesApplied"), 0); - metrics->restoreOplogEntriesApplied(120); + external.oplogEntriesApplied = 120; + metrics->restoreExternallyTrackedRecipientFields(external); report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("oplogEntriesApplied"), 120); - metrics->restoreOplogEntriesApplied(30); + external.oplogEntriesApplied = 30; + metrics->restoreExternallyTrackedRecipientFields(external); report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("oplogEntriesApplied"), 30); } @@ -490,5 +510,29 @@ TEST_F(ReshardingMetricsTest, CurrentOpReportsApplyingTime) { [](ReshardingMetrics* metrics) { metrics->onApplyingEnd(); }); } +TEST_F(ReshardingMetricsTest, RecipientEstimatesNoneOnNewInstance) { + auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient); + ASSERT_EQ(metrics->getHighEstimateRemainingTimeMillis(), boost::none); +} + +TEST_F(ReshardingMetricsTest, + RecipientEstimatesNoneBeforeExternalFieldsRestoredForRestoredInstance) { + auto metrics = makeRecipientMetricsWithAmbiguousTimeRemaining(); + ASSERT_EQ(metrics->getHighEstimateRemainingTimeMillis(), boost::none); +} + +TEST_F(ReshardingMetricsTest, RecipientEstimatesAfterExternalFieldsRestoredForRestoredInstance) { + auto metrics = makeRecipientMetricsWithAmbiguousTimeRemaining(); + metrics->restoreExternallyTrackedRecipientFields( + ReshardingMetrics::ExternallyTrackedRecipientFields{}); + ASSERT_EQ(metrics->getHighEstimateRemainingTimeMillis(), Milliseconds{0}); +} + +TEST_F(ReshardingMetricsTest, CurrentOpDoesNotReportRecipientEstimateIfNotSet) { + auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient); + auto report = metrics->reportForCurrentOp(); + ASSERT_FALSE(report.hasField("remainingOperationTimeEstimatedSecs")); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 5e096b1e7f2..fe0d54112ea 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -65,6 +65,7 @@ #include "mongo/s/grid.h" #include "mongo/s/stale_shard_version_helpers.h" #include "mongo/util/future_util.h" +#include "mongo/util/optional_util.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding @@ -1124,7 +1125,7 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMetrics( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& abortToken) { - if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) { + if (_metrics->mustRestoreExternallyTrackedRecipientFields(_recipientCtx.getState())) { return _restoreMetricsWithRetry(executor, abortToken); } @@ -1148,28 +1149,25 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_restore void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( const CancelableOperationContextFactory& factory) { - int64_t documentCountCopied = 0; - int64_t documentBytesCopied = 0; - int64_t oplogEntriesFetched = 0; - int64_t oplogEntriesApplied = 0; + ReshardingMetrics::ExternallyTrackedRecipientFields externalMetrics; auto opCtx = factory.makeOperationContext(&cc()); - { + [&] { AutoGetCollection tempReshardingColl( opCtx.get(), _metadata.getTempReshardingNss(), MODE_IS); - if (tempReshardingColl) { - documentBytesCopied = tempReshardingColl->dataSize(opCtx.get()); - documentCountCopied = tempReshardingColl->numRecords(opCtx.get()); + if (!tempReshardingColl) { + return; } - - if (_recipientCtx.getState() == RecipientStateEnum::kCloning) { + if (_recipientCtx.getState() != RecipientStateEnum::kCloning) { // Before cloning, these values are 0. After cloning these values are written to the // metrics section of the recipient state document and restored during metrics // initialization. This is so that applied oplog entries that add or remove documents do // not affect the cloning metrics. - _metrics->restoreDocumentsProcessed(documentCountCopied, documentBytesCopied); + return; } - } + externalMetrics.documentBytesCopied = tempReshardingColl->dataSize(opCtx.get()); + externalMetrics.documentCountCopied = tempReshardingColl->numRecords(opCtx.get()); + }(); reshardingOpCtxKilledWhileRestoringMetrics.execute( [&opCtx](const BSONObj& data) { opCtx->markKilled(); }); @@ -1183,7 +1181,8 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( _metadata.getSourceUUID(), donor.getShardId()), MODE_IS); if (oplogBufferColl) { - oplogEntriesFetched += oplogBufferColl->numRecords(opCtx.get()); + optional_util::setOrAdd(externalMetrics.oplogEntriesFetched, + oplogBufferColl->numRecords(opCtx.get())); } } @@ -1203,7 +1202,8 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( if (!result.isEmpty()) { progressDoc = ReshardingOplogApplierProgress::parse( IDLParserContext("resharding-recipient-service-progress-doc"), result); - oplogEntriesApplied += progressDoc->getNumEntriesApplied(); + optional_util::setOrAdd(externalMetrics.oplogEntriesApplied, + progressDoc->getNumEntriesApplied()); } } @@ -1223,15 +1223,14 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( continue; } - _metrics->accumulateFrom(*progressDoc); + externalMetrics.accumulateFrom(*progressDoc); auto applierMetrics = std::make_unique<ReshardingOplogApplierMetrics>(_metrics.get(), progressDoc); _applierMetricsMap.emplace(shardId, std::move(applierMetrics)); } - _metrics->restoreOplogEntriesFetched(oplogEntriesFetched); - _metrics->restoreOplogEntriesApplied(oplogEntriesApplied); + _metrics->restoreExternallyTrackedRecipientFields(externalMetrics); } CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource( diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp index e13cd3857a1..0b0665a6b02 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp @@ -42,6 +42,7 @@ constexpr auto kActive = "active"; constexpr auto kOldestActive = "oldestActive"; constexpr auto kLatencies = "latencies"; constexpr auto kCurrentInSteps = "currentInSteps"; +constexpr auto kEstimateNotAvailable = -1; struct Metrics { ReshardingCumulativeMetrics _resharding; @@ -53,7 +54,6 @@ const auto getMetrics = ServiceContext::declareDecoration<MetricsPtr>(); const auto metricsRegisterer = ServiceContext::ConstructorActionRegisterer{ "ShardingDataTransformMetrics", [](ServiceContext* ctx) { getMetrics(ctx) = std::make_unique<Metrics>(); }}; - } // namespace ShardingDataTransformCumulativeMetrics* ShardingDataTransformCumulativeMetrics::getForResharding( @@ -86,18 +86,35 @@ ShardingDataTransformCumulativeMetrics::registerInstanceMetrics(const InstanceOb int64_t ShardingDataTransformCumulativeMetrics::getOldestOperationHighEstimateRemainingTimeMillis( Role role) const { - - stdx::unique_lock guard(_mutex); - auto op = getOldestOperation(guard, role); - return op ? op->getHighEstimateRemainingTimeMillis() : 0; + return getOldestOperationEstimateRemainingTimeMillis(role, EstimateType::kHigh); } int64_t ShardingDataTransformCumulativeMetrics::getOldestOperationLowEstimateRemainingTimeMillis( Role role) const { + return getOldestOperationEstimateRemainingTimeMillis(role, EstimateType::kLow); +} + +int64_t ShardingDataTransformCumulativeMetrics::getOldestOperationEstimateRemainingTimeMillis( + Role role, EstimateType type) const { stdx::unique_lock guard(_mutex); auto op = getOldestOperation(guard, role); - return op ? op->getLowEstimateRemainingTimeMillis() : 0; + if (!op) { + return kEstimateNotAvailable; + } + auto estimate = getEstimate(op, type); + return estimate ? estimate->count() : kEstimateNotAvailable; +} + +boost::optional<Milliseconds> ShardingDataTransformCumulativeMetrics::getEstimate( + const InstanceObserver* op, EstimateType type) const { + switch (type) { + case kHigh: + return op->getHighEstimateRemainingTimeMillis(); + case kLow: + return op->getLowEstimateRemainingTimeMillis(); + } + MONGO_UNREACHABLE; } size_t ShardingDataTransformCumulativeMetrics::getObservedMetricsCount() const { diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h index 814ef0790d4..21a107c8f53 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h @@ -119,9 +119,13 @@ protected: virtual void reportCurrentInSteps(BSONObjBuilder* bob) const; private: + enum EstimateType { kHigh, kLow }; + MetricsSet& getMetricsSetForRole(Role role); const MetricsSet& getMetricsSetForRole(Role role) const; const InstanceObserver* getOldestOperation(WithLock, Role role) const; + int64_t getOldestOperationEstimateRemainingTimeMillis(Role role, EstimateType type) const; + boost::optional<Milliseconds> getEstimate(const InstanceObserver* op, EstimateType type) const; MetricsSet::iterator insertMetrics(const InstanceObserver* metrics, MetricsSet& set); void deregisterMetrics(const Role& role, const MetricsSet::iterator& metrics); diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp index 299bddb3b09..1fa2585ac3e 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp @@ -108,11 +108,11 @@ TEST_F(ShardingDataTransformMetricsTestFixture, NoServerStatusWhenNeverUsed) { ASSERT_BSONOBJ_EQ(report, BSONObj()); } -TEST_F(ShardingDataTransformMetricsTestFixture, RemainingTimeReports0WhenEmpty) { +TEST_F(ShardingDataTransformMetricsTestFixture, RemainingTimeReportsMinusOneWhenEmpty) { ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 0); ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis( ObserverMock::kDefaultRole), - 0); + -1); } TEST_F(ShardingDataTransformMetricsTestFixture, UpdatesOldestWhenOldestIsRemoved) { diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp index 96ab3d2a6fd..9c65d80b4b3 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp @@ -33,6 +33,29 @@ namespace mongo { +namespace { +constexpr auto kNoEstimate = Milliseconds{-1}; + +boost::optional<Milliseconds> readCoordinatorEstimate(const AtomicWord<Milliseconds>& field) { + auto estimate = field.load(); + if (estimate == kNoEstimate) { + return boost::none; + } + return estimate; +} + +template <typename T> +void appendOptionalMillisecondsFieldAs(BSONObjBuilder& builder, + const StringData& fieldName, + const boost::optional<Milliseconds> value) { + if (!value) { + return; + } + builder.append(fieldName, durationCount<T>(*value)); +} + +} // namespace + ShardingDataTransformInstanceMetrics::ShardingDataTransformInstanceMetrics( UUID instanceId, BSONObj originalCommand, @@ -78,30 +101,32 @@ ShardingDataTransformInstanceMetrics::ShardingDataTransformInstanceMetrics( _documentsProcessed{0}, _approxBytesToScan{0}, _bytesWritten{0}, - _coordinatorHighEstimateRemainingTimeMillis{Milliseconds{0}}, - _coordinatorLowEstimateRemainingTimeMillis{Milliseconds{0}}, + _coordinatorHighEstimateRemainingTimeMillis{kNoEstimate}, + _coordinatorLowEstimateRemainingTimeMillis{kNoEstimate}, _criticalSectionStartTime{kNoDate}, _criticalSectionEndTime{kNoDate}, _writesDuringCriticalSection{0} {} -Milliseconds ShardingDataTransformInstanceMetrics::getHighEstimateRemainingTimeMillis() const { +boost::optional<Milliseconds> +ShardingDataTransformInstanceMetrics::getHighEstimateRemainingTimeMillis() const { switch (_role) { case Role::kRecipient: return getRecipientHighEstimateRemainingTimeMillis(); case Role::kCoordinator: - return Milliseconds{_coordinatorHighEstimateRemainingTimeMillis.load()}; + return readCoordinatorEstimate(_coordinatorHighEstimateRemainingTimeMillis); case Role::kDonor: break; } MONGO_UNREACHABLE; } -Milliseconds ShardingDataTransformInstanceMetrics::getLowEstimateRemainingTimeMillis() const { +boost::optional<Milliseconds> +ShardingDataTransformInstanceMetrics::getLowEstimateRemainingTimeMillis() const { switch (_role) { case Role::kRecipient: return getHighEstimateRemainingTimeMillis(); case Role::kCoordinator: - return _coordinatorLowEstimateRemainingTimeMillis.load(); + return readCoordinatorEstimate(_coordinatorLowEstimateRemainingTimeMillis); case Role::kDonor: break; } @@ -141,10 +166,14 @@ BSONObj ShardingDataTransformInstanceMetrics::reportForCurrentOp() const noexcep builder.append(_fieldNames->getForOpTimeElapsed(), getOperationRunningTimeSecs().count()); switch (_role) { case Role::kCoordinator: - builder.append(_fieldNames->getForAllShardsHighestRemainingOperationTimeEstimatedSecs(), - durationCount<Seconds>(getHighEstimateRemainingTimeMillis())); - builder.append(_fieldNames->getForAllShardsLowestRemainingOperationTimeEstimatedSecs(), - durationCount<Seconds>(getLowEstimateRemainingTimeMillis())); + appendOptionalMillisecondsFieldAs<Seconds>( + builder, + _fieldNames->getForAllShardsHighestRemainingOperationTimeEstimatedSecs(), + getHighEstimateRemainingTimeMillis()); + appendOptionalMillisecondsFieldAs<Seconds>( + builder, + _fieldNames->getForAllShardsLowestRemainingOperationTimeEstimatedSecs(), + getLowEstimateRemainingTimeMillis()); builder.append(_fieldNames->getForCoordinatorState(), getStateString()); builder.append(_fieldNames->getForCopyTimeElapsed(), getCopyingElapsedTimeSecs().count()); @@ -164,8 +193,10 @@ BSONObj ShardingDataTransformInstanceMetrics::reportForCurrentOp() const noexcep builder.append(_fieldNames->getForRecipientState(), getStateString()); builder.append(_fieldNames->getForCopyTimeElapsed(), getCopyingElapsedTimeSecs().count()); - builder.append(_fieldNames->getForRemainingOpTimeEstimated(), - durationCount<Seconds>(getHighEstimateRemainingTimeMillis())); + appendOptionalMillisecondsFieldAs<Seconds>( + builder, + _fieldNames->getForRemainingOpTimeEstimated(), + getHighEstimateRemainingTimeMillis()); builder.append(_fieldNames->getForApproxDocumentsToProcess(), _approxDocumentsToProcess.load()); builder.append(_fieldNames->getForApproxBytesToScan(), _approxBytesToScan.load()); @@ -231,6 +262,11 @@ void ShardingDataTransformInstanceMetrics::restoreDocumentsProcessed( _bytesWritten.store(totalDocumentsSizeBytes); } +void ShardingDataTransformInstanceMetrics::restoreWritesToStashCollections( + int64_t writesToStashCollections) { + _writesToStashCollections.store(writesToStashCollections); +} + void ShardingDataTransformInstanceMetrics::setDocumentsToProcessCounts( int64_t documentCount, int64_t totalDocumentsSizeBytes) { _approxDocumentsToProcess.store(documentCount); @@ -282,11 +318,6 @@ void ShardingDataTransformInstanceMetrics::onReadDuringCriticalSection() { _cumulativeMetrics->onWriteDuringCriticalSection(); } -void ShardingDataTransformInstanceMetrics::accumulateWritesToStashCollections( - int64_t writesToStashCollections) { - _writesToStashCollections.fetchAndAdd(writesToStashCollections); -} - void ShardingDataTransformInstanceMetrics::onCloningTotalRemoteBatchRetrieval( Milliseconds elapsed) { _cumulativeMetrics->onCloningTotalRemoteBatchRetrieval(elapsed); diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.h b/src/mongo/db/s/sharding_data_transform_instance_metrics.h index 2c26f4ab480..ecd8a776afb 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics.h +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.h @@ -67,9 +67,8 @@ public: virtual BSONObj reportForCurrentOp() const noexcept; - Milliseconds getHighEstimateRemainingTimeMillis() const; - Milliseconds getLowEstimateRemainingTimeMillis() const; - virtual Milliseconds getRecipientHighEstimateRemainingTimeMillis() const = 0; + boost::optional<Milliseconds> getHighEstimateRemainingTimeMillis() const; + boost::optional<Milliseconds> getLowEstimateRemainingTimeMillis() const; Date_t getStartTimestamp() const; const UUID& getInstanceId() const; @@ -90,7 +89,6 @@ public: int64_t getDocumentsProcessedCount() const; int64_t getBytesWrittenCount() const; int64_t getApproxBytesToScanCount() const; - void restoreDocumentsProcessed(int64_t documentCount, int64_t totalDocumentsSizeBytes); void setDocumentsToProcessCounts(int64_t documentCount, int64_t totalDocumentsSizeBytes); void setCoordinatorHighEstimateRemainingTimeMillis(Milliseconds milliseconds); void setCoordinatorLowEstimateRemainingTimeMillis(Milliseconds milliseconds); @@ -130,9 +128,11 @@ protected: } void restoreCopyingBegin(Date_t date); void restoreCopyingEnd(Date_t date); + void restoreDocumentsProcessed(int64_t documentCount, int64_t totalDocumentsSizeBytes); + void restoreWritesToStashCollections(int64_t writesToStashCollections); virtual std::string createOperationDescription() const noexcept; virtual StringData getStateString() const noexcept; - void accumulateWritesToStashCollections(int64_t writesToStashCollections); + virtual boost::optional<Milliseconds> getRecipientHighEstimateRemainingTimeMillis() const = 0; ShardingDataTransformCumulativeMetrics* getCumulativeMetrics(); ClockSource* getClockSource() const; diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp index f5d4ba2df01..a690d908879 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp @@ -105,8 +105,8 @@ public: std::move(observer)}, _scopedObserver(registerInstanceMetrics()) {} - Milliseconds getRecipientHighEstimateRemainingTimeMillis() const { - return Milliseconds{0}; + boost::optional<Milliseconds> getRecipientHighEstimateRemainingTimeMillis() const { + return boost::none; } private: @@ -325,25 +325,41 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, OnWriteToStasheddShouldIncremen TEST_F(ShardingDataTransformInstanceMetricsTest, SetLowestOperationTimeShouldBeReflectedInCurrentOp) { auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator); - - auto report = metrics->reportForCurrentOp(); - ASSERT_EQ(report.getIntField("allShardsLowestRemainingOperationTimeEstimatedSecs"), 0); metrics->setCoordinatorLowEstimateRemainingTimeMillis(Milliseconds(2000)); - - report = metrics->reportForCurrentOp(); + auto report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("allShardsLowestRemainingOperationTimeEstimatedSecs"), 2); } TEST_F(ShardingDataTransformInstanceMetricsTest, SetHighestOperationTimeShouldBeReflectedInCurrentOp) { auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator); + metrics->setCoordinatorHighEstimateRemainingTimeMillis(Milliseconds(12000)); + auto report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("allShardsHighestRemainingOperationTimeEstimatedSecs"), 12); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, CoordinatorHighEstimateNoneIfNotSet) { + auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator); + ASSERT_EQ(metrics->getHighEstimateRemainingTimeMillis(), boost::none); +} + +TEST_F(ShardingDataTransformInstanceMetricsTest, CoordinatorLowEstimateNoneIfNotSet) { + auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator); + ASSERT_EQ(metrics->getLowEstimateRemainingTimeMillis(), boost::none); +} +TEST_F(ShardingDataTransformInstanceMetricsTest, + CurrentOpDoesNotReportCoordinatorHighEstimateIfNotSet) { + auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator); auto report = metrics->reportForCurrentOp(); - ASSERT_EQ(report.getIntField("allShardsHighestRemainingOperationTimeEstimatedSecs"), 0); - metrics->setCoordinatorHighEstimateRemainingTimeMillis(Milliseconds(12000)); + ASSERT_FALSE(report.hasField("allShardsHighestRemainingOperationTimeEstimatedSecs")); +} - report = metrics->reportForCurrentOp(); - ASSERT_EQ(report.getIntField("allShardsHighestRemainingOperationTimeEstimatedSecs"), 12); +TEST_F(ShardingDataTransformInstanceMetricsTest, + CurrentOpDoesNotReportCoordinatorLowEstimateIfNotSet) { + auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator); + auto report = metrics->reportForCurrentOp(); + ASSERT_FALSE(report.hasField("allShardsLowestRemainingOperationTimeEstimatedSecs")); } } // namespace diff --git a/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp b/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp index eca06d99acb..d1eadcd5038 100644 --- a/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp +++ b/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp @@ -35,12 +35,14 @@ ShardingDataTransformMetricsObserver::ShardingDataTransformMetricsObserver( ShardingDataTransformInstanceMetrics* metrics) : _metrics(metrics) {} -int64_t ShardingDataTransformMetricsObserver::getHighEstimateRemainingTimeMillis() const { - return _metrics->getHighEstimateRemainingTimeMillis().count(); +boost::optional<Milliseconds> +ShardingDataTransformMetricsObserver::getHighEstimateRemainingTimeMillis() const { + return _metrics->getHighEstimateRemainingTimeMillis(); } -int64_t ShardingDataTransformMetricsObserver::getLowEstimateRemainingTimeMillis() const { - return _metrics->getLowEstimateRemainingTimeMillis().count(); +boost::optional<Milliseconds> +ShardingDataTransformMetricsObserver::getLowEstimateRemainingTimeMillis() const { + return _metrics->getLowEstimateRemainingTimeMillis(); } Date_t ShardingDataTransformMetricsObserver::getStartTimestamp() const { diff --git a/src/mongo/db/s/sharding_data_transform_metrics_observer.h b/src/mongo/db/s/sharding_data_transform_metrics_observer.h index 6a7ad9421bd..64e52334d5a 100644 --- a/src/mongo/db/s/sharding_data_transform_metrics_observer.h +++ b/src/mongo/db/s/sharding_data_transform_metrics_observer.h @@ -38,8 +38,8 @@ namespace mongo { class ShardingDataTransformMetricsObserver : public ShardingDataTransformMetricsObserverInterface { public: ShardingDataTransformMetricsObserver(ShardingDataTransformInstanceMetrics* metrics); - int64_t getHighEstimateRemainingTimeMillis() const override; - int64_t getLowEstimateRemainingTimeMillis() const override; + boost::optional<Milliseconds> getHighEstimateRemainingTimeMillis() const override; + boost::optional<Milliseconds> getLowEstimateRemainingTimeMillis() const override; Date_t getStartTimestamp() const override; const UUID& getUuid() const override; ShardingDataTransformMetrics::Role getRole() const override; diff --git a/src/mongo/db/s/sharding_data_transform_metrics_observer_interface.h b/src/mongo/db/s/sharding_data_transform_metrics_observer_interface.h index 68c6426a6ec..de68cfddc26 100644 --- a/src/mongo/db/s/sharding_data_transform_metrics_observer_interface.h +++ b/src/mongo/db/s/sharding_data_transform_metrics_observer_interface.h @@ -36,8 +36,8 @@ namespace mongo { class ShardingDataTransformMetricsObserverInterface { public: virtual ~ShardingDataTransformMetricsObserverInterface() = default; - virtual int64_t getHighEstimateRemainingTimeMillis() const = 0; - virtual int64_t getLowEstimateRemainingTimeMillis() const = 0; + virtual boost::optional<Milliseconds> getHighEstimateRemainingTimeMillis() const = 0; + virtual boost::optional<Milliseconds> getLowEstimateRemainingTimeMillis() const = 0; virtual Date_t getStartTimestamp() const = 0; virtual const UUID& getUuid() const = 0; virtual ShardingDataTransformMetrics::Role getRole() const = 0; diff --git a/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h b/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h index a898e9fca73..cc66423e231 100644 --- a/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h +++ b/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h @@ -63,11 +63,11 @@ public: return _uuid; } - virtual int64_t getHighEstimateRemainingTimeMillis() const override { + virtual boost::optional<Milliseconds> getHighEstimateRemainingTimeMillis() const override { return _timeRemainingHigh; } - virtual int64_t getLowEstimateRemainingTimeMillis() const override { + virtual boost::optional<Milliseconds> getLowEstimateRemainingTimeMillis() const override { return _timeRemainingLow; } @@ -82,8 +82,8 @@ public: private: UUID _uuid; Date_t _startTime; - int64_t _timeRemainingHigh; - int64_t _timeRemainingLow; + Milliseconds _timeRemainingHigh; + Milliseconds _timeRemainingLow; ShardingDataTransformMetrics::Role _role; }; @@ -186,6 +186,10 @@ protected: return &clock.value(); } + ShardingDataTransformCumulativeMetrics* getCumulativeMetrics() { + return _cumulativeMetrics.get(); + } + using SpecialIndexBehaviorMap = stdx::unordered_map<int, std::function<void()>>; const SpecialIndexBehaviorMap kNoSpecialBehavior{}; SpecialIndexBehaviorMap registerAtIndex(int index, const ObserverMock* mock) { diff --git a/src/mongo/util/optional_util.h b/src/mongo/util/optional_util.h index dca4ab7b7c4..e1820af7320 100644 --- a/src/mongo/util/optional_util.h +++ b/src/mongo/util/optional_util.h @@ -139,4 +139,15 @@ template <typename T, std::enable_if_t<canStreamWithExtension<T>, int> = 0> Extension(const T& t)->Extension<T>; } // namespace optional_io + +namespace optional_util { +template <typename T, typename U> +void setOrAdd(boost::optional<T>& counter, U value) { + if (!counter) { + counter = value; + return; + } + counter = *counter + value; +} +} // namespace optional_util } // namespace mongo |