diff options
author | Brett Nawrocki <brett.nawrocki@mongodb.com> | 2022-04-06 14:44:23 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-21 20:43:43 +0000 |
commit | 46be98147821353da22d0e647ff06fdb7e6fa5c7 (patch) | |
tree | af5ac4a01e4eb857ea2e03845f4cafb7e89b2e73 /src | |
parent | 51b92630cc5a6448de092c1c6a38c70d702c5153 (diff) | |
download | mongo-46be98147821353da22d0e647ff06fdb7e6fa5c7.tar.gz |
SERVER-64376 Add Parity Recipient Applying Fields in CurrentOp
Track and report the following fields in
ShardingDataTransformInstanceMetrics for resharding $currentOp:
oplogEntriesFetched
totalApplyTimeElapsedSecs
remainingOperationTimeEstimatedSecs
Diffstat (limited to 'src')
21 files changed, 369 insertions, 137 deletions
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp index f8a12c57f90..c30d0e12ecf 100644 --- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp +++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp @@ -60,22 +60,15 @@ boost::optional<std::shared_ptr<ReshardingCoordinatorService::ReshardingCoordina getExistingInstanceToJoin(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& newShardKey) { - auto reshardingCoordinatorService = checked_cast<ReshardingCoordinatorService*>( - repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) - ->lookupServiceByName(ReshardingCoordinatorService::kServiceName)); - auto instances = reshardingCoordinatorService->getAllReshardingInstances(opCtx); - for (auto& instance : instances) { - auto reshardingCoordinator = - checked_pointer_cast<ReshardingCoordinatorService::ReshardingCoordinator>(instance); - - auto instanceMetadata = reshardingCoordinator->getMetadata(); + auto instances = + getReshardingStateMachines<ReshardingCoordinatorService, + ReshardingCoordinatorService::ReshardingCoordinator>(opCtx, nss); + for (const auto& instance : instances) { if (SimpleBSONObjComparator::kInstance.evaluate( - instanceMetadata.getReshardingKey().toBSON() == newShardKey) && - instanceMetadata.getSourceNss() == nss) { - return reshardingCoordinator; - }; + instance->getMetadata().getReshardingKey().toBSON() == newShardKey)) { + return instance; + } } - return boost::none; } diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp index a9ef25a0cc2..a898267e8a6 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp @@ -87,17 +87,17 @@ boost::optional<Milliseconds> extractOperationRemainingTime(const BSONObj& obj) } // namespace CoordinatorCommitMonitor::CoordinatorCommitMonitor( + std::shared_ptr<ReshardingMetricsNew> metricsNew, NamespaceString ns, std::vector<ShardId> recipientShards, CoordinatorCommitMonitor::TaskExecutorPtr executor, CancellationToken cancelToken, - ReshardingMetricsNew* metrics, Milliseconds maxDelayBetweenQueries) - : _ns(std::move(ns)), + : _metricsNew{std::move(metricsNew)}, + _ns(std::move(ns)), _recipientShards(std::move(recipientShards)), _executor(std::move(executor)), _cancelToken(std::move(cancelToken)), - _metrics(metrics), _threshold(Milliseconds(gRemainingReshardingOperationTimeThresholdMillis.load())), _maxDelayBetweenQueries(maxDelayBetweenQueries) {} @@ -211,10 +211,9 @@ ExecutorFuture<void> CoordinatorCommitMonitor::_makeFuture() const { auto metrics = ReshardingMetrics::get(cc().getServiceContext()); metrics->setMinRemainingOperationTime(remainingTimes.min); metrics->setMaxRemainingOperationTime(remainingTimes.max); - if (ShardingDataTransformMetrics::isEnabled()) { - _metrics->setLowestEstimatedRemainingOperationTime(remainingTimes.min); - _metrics->setHighestEstimatedRemainingOperationTime(remainingTimes.max); + _metricsNew->setCoordinatorHighEstimateRemainingTimeMillis(remainingTimes.max); + _metricsNew->setCoordinatorLowEstimateRemainingTimeMillis(remainingTimes.min); } // Check if all recipient shards are within the commit threshold. diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.h b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.h index a8b070fd30f..fb9f55d614f 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.h @@ -69,11 +69,11 @@ public: Milliseconds max; }; - CoordinatorCommitMonitor(NamespaceString ns, + CoordinatorCommitMonitor(std::shared_ptr<ReshardingMetricsNew> metricsNew, + NamespaceString ns, std::vector<ShardId> recipientShards, TaskExecutorPtr executor, CancellationToken cancelToken, - ReshardingMetricsNew* metrics, Milliseconds maxDelayBetweenQueries = kMaxDelayBetweenQueries); SemiFuture<void> waitUntilRecipientsAreWithinCommitThreshold() const; @@ -95,13 +95,12 @@ private: static constexpr auto kDiagnosticLogLevel = 0; static constexpr auto kMaxDelayBetweenQueries = Seconds(30); + std::shared_ptr<ReshardingMetricsNew> _metricsNew; const NamespaceString _ns; const std::vector<ShardId> _recipientShards; const TaskExecutorPtr _executor; const CancellationToken _cancelToken; - ReshardingMetricsNew* _metrics; - const Milliseconds _threshold; const Milliseconds _maxDelayBetweenQueries; diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp index b8fd31b8d1d..831c137f340 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp @@ -107,7 +107,7 @@ private: boost::optional<Callback> _runOnMockingNextResponse; ShardingDataTransformCumulativeMetrics _cumulativeMetrics{"dummyForTest"}; - std::unique_ptr<ReshardingMetricsNew> _metrics; + std::shared_ptr<ReshardingMetricsNew> _metrics; }; auto makeExecutor() { @@ -153,7 +153,7 @@ void CoordinatorCommitMonitorTest::setUp() { _cancellationSource = std::make_unique<CancellationSource>(); - _metrics = std::make_unique<ReshardingMetricsNew>( + _metrics = std::make_shared<ReshardingMetricsNew>( UUID::gen(), BSON("y" << 1), _ns, @@ -162,11 +162,11 @@ void CoordinatorCommitMonitorTest::setUp() { clockSource, &_cumulativeMetrics); - _commitMonitor = std::make_shared<CoordinatorCommitMonitor>(_ns, + _commitMonitor = std::make_shared<CoordinatorCommitMonitor>(_metrics, + _ns, _recipientShards, _futureExecutor, _cancellationSource->token(), - _metrics.get(), Milliseconds(0)); _commitMonitor->setNetworkExecutorForTest(executor()); } diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 30acb2f9ae3..c826c3dbb85 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -1651,6 +1651,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished coordinatorDocChangedOnDisk); if (ShardingDataTransformMetrics::isEnabled()) { _metricsNew->onCopyingEnd(); + _metricsNew->onApplyingBegin(); } }) .then([this] { return _waitForMajority(_ctHolder->getAbortToken()); }); @@ -1663,11 +1664,11 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_startCommitMonitor( } _commitMonitor = std::make_shared<resharding::CoordinatorCommitMonitor>( + _metricsNew, _coordinatorDoc.getSourceNss(), extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()), **executor, - _ctHolder->getCommitMonitorToken(), - _metricsNew.get()); + _ctHolder->getCommitMonitorToken()); _commitMonitorQuiesced = _commitMonitor->waitUntilRecipientsAreWithinCommitThreshold() .thenRunOn(**executor) @@ -1711,6 +1712,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kBlockingWrites, _coordinatorDoc); if (ShardingDataTransformMetrics::isEnabled()) { + _metricsNew->onApplyingEnd(); _metricsNew->onCriticalSectionBegin(); } }) diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index 280abbac2f3..49af5739b68 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -507,7 +507,7 @@ private: // The primary-only service instance corresponding to the coordinator instance. Not owned. const ReshardingCoordinatorService* const _coordinatorService; - std::unique_ptr<ReshardingMetricsNew> _metricsNew; + std::shared_ptr<ReshardingMetricsNew> _metricsNew; // The in-memory representation of the immutable portion of the document in // config.reshardingOperations. diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp index cb468d16aa8..0b7a6c9efb9 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.cpp +++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp @@ -113,6 +113,7 @@ std::vector<std::unique_ptr<ReshardingTxnCloner>> ReshardingDataReplication::_ma std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication::_makeOplogFetchers( OperationContext* opCtx, ReshardingMetrics* metrics, + ReshardingMetricsNew* metricsNew, const CommonReshardingMetadata& metadata, const std::vector<DonorShardFetchTimestamp>& donorShards, const ShardId& myShardId) { @@ -128,7 +129,8 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication:: invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp})); oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>( - std::make_unique<ReshardingOplogFetcher::Env>(opCtx->getServiceContext(), metrics), + std::make_unique<ReshardingOplogFetcher::Env>( + opCtx->getServiceContext(), metrics, metricsNew), metadata.getReshardingUUID(), metadata.getSourceUUID(), // The recipient fetches oplog entries from the donor starting from the largest _id @@ -225,7 +227,8 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m txnCloners = _makeTxnCloners(metadata, donorShards); } - auto oplogFetchers = _makeOplogFetchers(opCtx, metrics, metadata, donorShards, myShardId); + auto oplogFetchers = + _makeOplogFetchers(opCtx, metrics, metricsNew, metadata, donorShards, myShardId); auto oplogFetcherExecutor = _makeOplogFetcherExecutor(donorShards.size()); diff --git a/src/mongo/db/s/resharding/resharding_data_replication.h b/src/mongo/db/s/resharding/resharding_data_replication.h index 16bee54da01..47417ae8363 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.h +++ b/src/mongo/db/s/resharding/resharding_data_replication.h @@ -211,6 +211,7 @@ private: static std::vector<std::unique_ptr<ReshardingOplogFetcher>> _makeOplogFetchers( OperationContext* opCtx, ReshardingMetrics* metrics, + ReshardingMetricsNew* metricsNew, const CommonReshardingMetadata& metadata, const std::vector<DonorShardFetchTimestamp>& donorShards, const ShardId& myShardId); diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp index 6e4e4e041e9..fa5fea13683 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics.cpp @@ -33,6 +33,7 @@ #include <memory> #include "mongo/db/s/resharding/resharding_metrics.h" +#include "mongo/db/s/resharding/resharding_util.h" #include "mongo/logv2/log.h" #include "mongo/platform/compiler.h" #include "mongo/util/aligned.h" @@ -82,20 +83,6 @@ const auto reshardingMetricsRegisterer = ServiceContext::ConstructorActionRegist "ReshardingMetrics", [](ServiceContext* ctx) { getMetrics(ctx) = std::make_unique<ReshardingMetrics>(ctx); }}; -/** - * Given a constant rate of time per unit of work: - * totalTime / totalWork == elapsedTime / elapsedWork - * Solve for remaining time. - * remainingTime := totalTime - elapsedTime - * == (totalWork * (elapsedTime / elapsedWork)) - elapsedTime - * == elapsedTime * (totalWork / elapsedWork - 1) - */ -Milliseconds remainingTime(Milliseconds elapsedTime, double elapsedWork, double totalWork) { - elapsedWork = std::min(elapsedWork, totalWork); - double remainingMsec = 1.0 * elapsedTime.count() * (totalWork / elapsedWork - 1); - return Milliseconds(Milliseconds::rep(remainingMsec)); -} - static StringData serializeState(boost::optional<RecipientStateEnum> e) { return RecipientState_serializer(*e); } @@ -255,21 +242,13 @@ void ReshardingMetrics::OperationMetrics::gotDelete() noexcept { boost::optional<Milliseconds> ReshardingMetrics::OperationMetrics::remainingOperationTime( Date_t now) const { - if (recipientState > RecipientStateEnum::kCloning && oplogEntriesFetched == 0) { - return Milliseconds(0); - } - - if (oplogEntriesApplied > 0 && oplogEntriesFetched > 0) { - // All fetched oplogEntries must be applied. Some of them already have been. - return remainingTime( - applyingOplogEntries.duration(now), oplogEntriesApplied, oplogEntriesFetched); - } - if (bytesCopied > 0 && bytesToCopy > 0) { - // Until the time to apply batches of oplog entries is measured, we assume that applying all - // of them will take as long as copying did. - return remainingTime(copyingDocuments.duration(now), bytesCopied, 2 * bytesToCopy); - } - return {}; + return estimateRemainingRecipientTime(recipientState > RecipientStateEnum::kCloning, + bytesCopied, + bytesToCopy, + copyingDocuments.duration(now), + oplogEntriesApplied, + oplogEntriesFetched, + applyingOplogEntries.duration(now)); } void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder* bob, diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp index 30811c8d2aa..5aed035817d 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp @@ -46,6 +46,7 @@ #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/read_concern_level.h" #include "mongo/db/s/resharding/resharding_metrics.h" +#include "mongo/db/s/resharding/resharding_metrics_new.h" #include "mongo/db/s/resharding/resharding_util.h" #include "mongo/db/storage/write_unit_of_work.h" #include "mongo/executor/task_executor.h" @@ -342,6 +343,9 @@ bool ReshardingOplogFetcher::consume(Client* client, ++_numOplogEntriesCopied; _env->metrics()->onOplogEntriesFetched(1); + if (ShardingDataTransformMetrics::isEnabled()) { + _env->metricsNew()->onOplogEntriesFetched(1); + } auto [p, f] = makePromiseFuture<void>(); { @@ -385,6 +389,9 @@ bool ReshardingOplogFetcher::consume(Client* client, wuow.commit(); _env->metrics()->onOplogEntriesFetched(1); + if (ShardingDataTransformMetrics::isEnabled()) { + _env->metricsNew()->onOplogEntriesFetched(1); + } auto [p, f] = makePromiseFuture<void>(); { diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h index e0aad151f78..d0d53395647 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h @@ -51,23 +51,28 @@ namespace mongo { class ReshardingMetrics; +class ReshardingMetricsNew; class ReshardingOplogFetcher : public resharding::OnInsertAwaitable { public: class Env { public: - Env(ServiceContext* service, ReshardingMetrics* metrics) - : _service(service), _metrics(metrics) {} + Env(ServiceContext* service, ReshardingMetrics* metrics, ReshardingMetricsNew* metricsNew) + : _service(service), _metrics(metrics), _metricsNew(metricsNew) {} ServiceContext* service() const { return _service; } ReshardingMetrics* metrics() const { return _metrics; } + ReshardingMetricsNew* metricsNew() const { + return _metricsNew; + } private: ServiceContext* _service; ReshardingMetrics* _metrics; + ReshardingMetricsNew* _metricsNew; }; // Special value to use for startAt to indicate there are no more oplog entries needing to be diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp index e7c512e2669..24d9f016e59 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp @@ -47,6 +47,7 @@ #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/resharding/resharding_metrics.h" +#include "mongo/db/s/resharding/resharding_metrics_new.h" #include "mongo/db/s/resharding/resharding_oplog_fetcher.h" #include "mongo/db/s/resharding/resharding_util.h" #include "mongo/db/s/shard_server_test_fixture.h" @@ -101,6 +102,13 @@ public: _metrics->onStart(ReshardingMetrics::Role::kRecipient, _svcCtx->getFastClockSource()->now()); _metrics->setRecipientState(RecipientStateEnum::kCloning); + _metricsNew = + ReshardingMetricsNew::makeInstance(_reshardingUUID, + BSON("y" << 1), + NamespaceString{""}, + ReshardingMetricsNew::Role::kRecipient, + getServiceContext()->getFastClockSource()->now(), + getServiceContext()); for (const auto& shardId : kTwoShardIdList) { auto shardTargeter = RemoteCommandTargeterMock::get( @@ -128,7 +136,8 @@ public: } auto makeFetcherEnv() { - return std::make_unique<ReshardingOplogFetcher::Env>(_svcCtx, &*_metrics); + return std::make_unique<ReshardingOplogFetcher::Env>( + _svcCtx, _metrics.get(), _metricsNew.get()); } /** @@ -341,6 +350,7 @@ protected: ShardId _donorShard; ShardId _destinationShard; std::unique_ptr<ReshardingMetrics> _metrics; + std::unique_ptr<ReshardingMetricsNew> _metricsNew; private: static HostAndPort makeHostAndPort(const ShardId& shardId) { diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index cf6b9fe4619..67dd4a81ceb 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -829,6 +829,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying( _metrics()->startApplyingOplogEntries(currentTime); if (ShardingDataTransformMetrics::isEnabled()) { _metricsNew->onCopyingEnd(); + _metricsNew->onApplyingBegin(); } } @@ -839,6 +840,9 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsi _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); auto currentTime = getCurrentTime(); _metrics()->endApplyingOplogEntries(currentTime); + if (ShardingDataTransformMetrics::isEnabled()) { + _metricsNew->onApplyingEnd(); + } } void ReshardingRecipientService::RecipientStateMachine::_transitionToError( diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index 19dd962ace9..832e1dd8083 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -71,6 +71,11 @@ public: std::shared_ptr<repl::PrimaryOnlyService::Instance> constructInstance( BSONObj initialState) override; + + inline std::vector<std::shared_ptr<PrimaryOnlyService::Instance>> getAllReshardingInstances( + OperationContext* opCtx) { + return getAllInstances(opCtx); + } }; /** @@ -154,6 +159,15 @@ public: return _completionPromise.getFuture(); } + inline const CommonReshardingMetadata& getMetadata() const { + return _metadata; + } + + inline const ReshardingMetricsNew& getMetrics() const { + invariant(_metricsNew); + return *_metricsNew; + } + boost::optional<BSONObj> reportForCurrentOp( MongoProcessInterface::CurrentOpConnectionsMode, MongoProcessInterface::CurrentOpSessionsMode) noexcept override; diff --git a/src/mongo/db/s/resharding/resharding_util.cpp b/src/mongo/db/s/resharding/resharding_util.cpp index 27d319264dd..aec59fd5075 100644 --- a/src/mongo/db/s/resharding/resharding_util.cpp +++ b/src/mongo/db/s/resharding/resharding_util.cpp @@ -49,6 +49,8 @@ #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/resharding/document_source_resharding_add_resume_id.h" #include "mongo/db/s/resharding/document_source_resharding_iterate_transaction.h" +#include "mongo/db/s/resharding/resharding_metrics.h" +#include "mongo/db/s/resharding/resharding_metrics_new.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/storage/write_unit_of_work.h" #include "mongo/logv2/log.h" @@ -60,6 +62,23 @@ #include "mongo/s/shard_key_pattern.h" namespace mongo { + +namespace { +/** + * Given a constant rate of time per unit of work: + * totalTime / totalWork == elapsedTime / elapsedWork + * Solve for remaining time. + * remainingTime := totalTime - elapsedTime + * == (totalWork * (elapsedTime / elapsedWork)) - elapsedTime + * == elapsedTime * (totalWork / elapsedWork - 1) + */ +Milliseconds estimateRemainingTime(Milliseconds elapsedTime, double elapsedWork, double totalWork) { + elapsedWork = std::min(elapsedWork, totalWork); + double remainingMsec = 1.0 * elapsedTime.count() * (totalWork / elapsedWork - 1); + return Milliseconds(Milliseconds::rep(std::round(remainingMsec))); +} +} // namespace + using namespace fmt::literals; BSONObj serializeAndTruncateReshardingErrorIfNeeded(Status originalError) { @@ -372,4 +391,26 @@ void doNoopWrite(OperationContext* opCtx, StringData opStr, const NamespaceStrin }); } +boost::optional<Milliseconds> estimateRemainingRecipientTime(bool applyingBegan, + int64_t bytesCopied, + int64_t bytesToCopy, + Milliseconds timeSpentCopying, + int64_t oplogEntriesApplied, + int64_t oplogEntriesFetched, + Milliseconds timeSpentApplying) { + if (applyingBegan && oplogEntriesFetched == 0) { + return Milliseconds(0); + } + if (oplogEntriesApplied > 0 && oplogEntriesFetched > 0) { + // All fetched oplogEntries must be applied. Some of them already have been. + return estimateRemainingTime(timeSpentApplying, oplogEntriesApplied, oplogEntriesFetched); + } + if (bytesCopied > 0 && bytesToCopy > 0) { + // Until the time to apply batches of oplog entries is measured, we assume that applying all + // of them will take as long as copying did. + return estimateRemainingTime(timeSpentCopying, bytesCopied, 2 * bytesToCopy); + } + return {}; +} + } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_util.h b/src/mongo/db/s/resharding/resharding_util.h index 856d7cbb081..194381e7e78 100644 --- a/src/mongo/db/s/resharding/resharding_util.h +++ b/src/mongo/db/s/resharding/resharding_util.h @@ -37,6 +37,7 @@ #include "mongo/db/keypattern.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/repl/primary_only_service.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/resharding/coordinator_document_gen.h" #include "mongo/db/s/resharding/donor_oplog_id_gen.h" @@ -293,4 +294,35 @@ NamespaceString getLocalConflictStashNamespace(UUID existingUUID, ShardId donorS void doNoopWrite(OperationContext* opCtx, StringData opStr, const NamespaceString& nss); +boost::optional<Milliseconds> estimateRemainingRecipientTime(bool applyingBegan, + int64_t bytesCopied, + int64_t bytesToCopy, + Milliseconds timeSpentCopying, + int64_t oplogEntriesApplied, + int64_t oplogEntriesFetched, + Milliseconds timeSpentApplying); +/** + * Looks up the StateMachine by namespace of the collection being resharded. If it does not exist, + * returns boost::none. + */ +template <class Service, class Instance> +std::vector<std::shared_ptr<Instance>> getReshardingStateMachines(OperationContext* opCtx, + const NamespaceString& sourceNs) { + auto service = + checked_cast<Service*>(repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) + ->lookupServiceByName(Service::kServiceName)); + auto instances = service->getAllReshardingInstances(opCtx); + std::vector<std::shared_ptr<Instance>> result; + for (const auto& genericInstace : instances) { + auto instance = checked_pointer_cast<Instance>(genericInstace); + auto metadata = instance->getMetadata(); + if (metadata.getSourceNss() != sourceNs) { + continue; + } + result.emplace_back(std::move(instance)); + } + return result; +} + + } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp index d5c94d46416..095f11d1e76 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp @@ -28,33 +28,29 @@ */ #include "mongo/db/s/sharding_data_transform_instance_metrics.h" +#include "mongo/db/s/resharding/resharding_util.h" #include "mongo/db/s/sharding_data_transform_metrics_observer.h" -namespace { -constexpr int64_t kPlaceholderTimestampForTesting = 0; -constexpr int64_t kPlaceholderTimeRemainingForTesting = 0; -constexpr auto TEMP_VALUE = "placeholder"; - -} // namespace - namespace mongo { namespace { constexpr auto kNoDate = Date_t::min(); -int64_t getElapsedTimeSeconds(const AtomicWord<Date_t>& startTime, - const AtomicWord<Date_t>& endTime, - ClockSource* clock) { +template <typename T> +T getElapsed(const AtomicWord<Date_t>& startTime, + const AtomicWord<Date_t>& endTime, + ClockSource* clock) { auto start = startTime.load(); if (start == kNoDate) { - return 0; + return T{0}; } auto end = endTime.load(); if (end == kNoDate) { end = clock->now(); } - return durationCount<Seconds>(end - start); + return duration_cast<T>(end - start); } + } // namespace ShardingDataTransformInstanceMetrics::ShardingDataTransformInstanceMetrics( @@ -99,15 +95,18 @@ ShardingDataTransformInstanceMetrics::ShardingDataTransformInstanceMetrics( _documentsCopied{0}, _approxBytesToCopy{0}, _bytesCopied{0}, + _applyingStartTime{kNoDate}, + _applyingEndTime{kNoDate}, + _oplogEntriesFetched{0}, _insertsApplied{0}, _updatesApplied{0}, _deletesApplied{0}, _oplogEntriesApplied{0}, + _coordinatorHighEstimateRemainingTimeMillis{Milliseconds{0}}, + _coordinatorLowEstimateRemainingTimeMillis{Milliseconds{0}}, _criticalSectionStartTime{kNoDate}, _criticalSectionEndTime{kNoDate}, - _writesDuringCriticalSection{0}, - _lowestEstimatedRemainingOperationTime{Milliseconds(0)}, - _highestEstimatedRemainingOperationTime{Milliseconds(0)} {} + _writesDuringCriticalSection{0} {} ShardingDataTransformInstanceMetrics::~ShardingDataTransformInstanceMetrics() { if (_deregister) { @@ -115,12 +114,39 @@ ShardingDataTransformInstanceMetrics::~ShardingDataTransformInstanceMetrics() { } } -int64_t ShardingDataTransformInstanceMetrics::getHighEstimateRemainingTimeMillis() const { - return durationCount<Milliseconds>(_highestEstimatedRemainingOperationTime.load()); +Milliseconds ShardingDataTransformInstanceMetrics::getHighEstimateRemainingTimeMillis() const { + switch (_role) { + case Role::kRecipient: { + auto estimate = estimateRemainingRecipientTime(_applyingStartTime.load() != kNoDate, + _bytesCopied.load(), + _approxBytesToCopy.load(), + getCopyingElapsedTimeSecs(), + _oplogEntriesApplied.load(), + _oplogEntriesFetched.load(), + getApplyingElapsedTimeSecs()); + if (!estimate) { + return Milliseconds{0}; + } + return *estimate; + } + case Role::kCoordinator: + return Milliseconds{_coordinatorHighEstimateRemainingTimeMillis.load()}; + case Role::kDonor: + break; + } + MONGO_UNREACHABLE; } -int64_t ShardingDataTransformInstanceMetrics::getLowEstimateRemainingTimeMillis() const { - return durationCount<Milliseconds>(_lowestEstimatedRemainingOperationTime.load()); +Milliseconds ShardingDataTransformInstanceMetrics::getLowEstimateRemainingTimeMillis() const { + switch (_role) { + case Role::kRecipient: + return getHighEstimateRemainingTimeMillis(); + case Role::kCoordinator: + return _coordinatorLowEstimateRemainingTimeMillis.load(); + case Role::kDonor: + break; + } + MONGO_UNREACHABLE; } Date_t ShardingDataTransformInstanceMetrics::getStartTimestamp() const { @@ -153,30 +179,33 @@ BSONObj ShardingDataTransformInstanceMetrics::reportForCurrentOp() const noexcep builder.append(kOp, "command"); builder.append(kNamespace, _sourceNs.toString()); builder.append(kOriginatingCommand, _originalCommand); - builder.append(kOpTimeElapsed, getOperationRunningTimeSecs()); + builder.append(kOpTimeElapsed, getOperationRunningTimeSecs().count()); switch (_role) { case Role::kCoordinator: builder.append(kAllShardsHighestRemainingOperationTimeEstimatedSecs, - durationCount<Seconds>(_highestEstimatedRemainingOperationTime.load())); + durationCount<Seconds>(getHighEstimateRemainingTimeMillis())); builder.append(kAllShardsLowestRemainingOperationTimeEstimatedSecs, - durationCount<Seconds>(_lowestEstimatedRemainingOperationTime.load())); + durationCount<Seconds>(getLowEstimateRemainingTimeMillis())); builder.append(kCoordinatorState, getStateString()); - builder.append(kApplyTimeElapsed, TEMP_VALUE); - builder.append(kCopyTimeElapsed, getCopyingElapsedTimeSecs()); - builder.append(kCriticalSectionTimeElapsed, getCriticalSectionElapsedTimeSecs()); + builder.append(kApplyTimeElapsed, getApplyingElapsedTimeSecs().count()); + builder.append(kCopyTimeElapsed, getCopyingElapsedTimeSecs().count()); + builder.append(kCriticalSectionTimeElapsed, + getCriticalSectionElapsedTimeSecs().count()); break; case Role::kDonor: builder.append(kDonorState, getStateString()); - builder.append(kCriticalSectionTimeElapsed, getCriticalSectionElapsedTimeSecs()); + builder.append(kCriticalSectionTimeElapsed, + getCriticalSectionElapsedTimeSecs().count()); builder.append(kCountWritesDuringCriticalSection, _writesDuringCriticalSection.load()); builder.append(kCountReadsDuringCriticalSection, _readsDuringCriticalSection.load()); break; case Role::kRecipient: builder.append(kRecipientState, getStateString()); - builder.append(kApplyTimeElapsed, TEMP_VALUE); - builder.append(kCopyTimeElapsed, getCopyingElapsedTimeSecs()); - builder.append(kRemainingOpTimeEstimated, TEMP_VALUE); + builder.append(kApplyTimeElapsed, getApplyingElapsedTimeSecs().count()); + builder.append(kCopyTimeElapsed, getCopyingElapsedTimeSecs().count()); + builder.append(kRemainingOpTimeEstimated, + durationCount<Seconds>(getHighEstimateRemainingTimeMillis())); builder.append(kApproxDocumentsToCopy, _approxDocumentsToCopy.load()); builder.append(kApproxBytesToCopy, _approxBytesToCopy.load()); builder.append(kBytesCopied, _bytesCopied.load()); @@ -185,7 +214,7 @@ BSONObj ShardingDataTransformInstanceMetrics::reportForCurrentOp() const noexcep builder.append(kUpdatesApplied, _updatesApplied.load()); builder.append(kDeletesApplied, _deletesApplied.load()); builder.append(kOplogEntriesApplied, _oplogEntriesApplied.load()); - builder.append(kOplogEntriesFetched, TEMP_VALUE); + builder.append(kOplogEntriesFetched, _oplogEntriesFetched.load()); builder.append(kDocumentsCopied, _documentsCopied.load()); break; default: @@ -203,6 +232,14 @@ void ShardingDataTransformInstanceMetrics::onCopyingEnd() { _copyingEndTime.store(_clockSource->now()); } +void ShardingDataTransformInstanceMetrics::onApplyingBegin() { + _applyingStartTime.store(_clockSource->now()); +} + +void ShardingDataTransformInstanceMetrics::onApplyingEnd() { + _applyingEndTime.store(_clockSource->now()); +} + void ShardingDataTransformInstanceMetrics::onDocumentsCopied(int64_t documentCount, int64_t totalDocumentsSizeBytes) { _documentsCopied.addAndFetch(documentCount); @@ -215,6 +252,16 @@ void ShardingDataTransformInstanceMetrics::setDocumentsToCopyCounts( _approxBytesToCopy.store(totalDocumentsSizeBytes); } +void ShardingDataTransformInstanceMetrics::setCoordinatorHighEstimateRemainingTimeMillis( + Milliseconds milliseconds) { + _coordinatorHighEstimateRemainingTimeMillis.store(milliseconds); +} + +void ShardingDataTransformInstanceMetrics::setCoordinatorLowEstimateRemainingTimeMillis( + Milliseconds milliseconds) { + _coordinatorLowEstimateRemainingTimeMillis.store(milliseconds); +} + void ShardingDataTransformInstanceMetrics::onInsertApplied() { _insertsApplied.addAndFetch(1); } @@ -227,6 +274,10 @@ void ShardingDataTransformInstanceMetrics::onDeleteApplied() { _deletesApplied.addAndFetch(1); } +void ShardingDataTransformInstanceMetrics::onOplogEntriesFetched(int64_t numEntries) { + _oplogEntriesFetched.addAndFetch(numEntries); +} + void ShardingDataTransformInstanceMetrics::onOplogEntriesApplied(int64_t numEntries) { _oplogEntriesApplied.addAndFetch(numEntries); } @@ -243,16 +294,20 @@ void ShardingDataTransformInstanceMetrics::onCriticalSectionEnd() { _criticalSectionEndTime.store(_clockSource->now()); } -inline int64_t ShardingDataTransformInstanceMetrics::getOperationRunningTimeSecs() const { - return durationCount<Seconds>(_clockSource->now() - _startTime); +Seconds ShardingDataTransformInstanceMetrics::getOperationRunningTimeSecs() const { + return duration_cast<Seconds>(_clockSource->now() - _startTime); +} + +Seconds ShardingDataTransformInstanceMetrics::getCopyingElapsedTimeSecs() const { + return getElapsed<Seconds>(_copyingStartTime, _copyingEndTime, _clockSource); } -int64_t ShardingDataTransformInstanceMetrics::getCriticalSectionElapsedTimeSecs() const { - return getElapsedTimeSeconds(_criticalSectionStartTime, _criticalSectionEndTime, _clockSource); +Seconds ShardingDataTransformInstanceMetrics::getApplyingElapsedTimeSecs() const { + return getElapsed<Seconds>(_applyingStartTime, _applyingEndTime, _clockSource); } -int64_t ShardingDataTransformInstanceMetrics::getCopyingElapsedTimeSecs() const { - return getElapsedTimeSeconds(_copyingStartTime, _copyingEndTime, _clockSource); +Seconds ShardingDataTransformInstanceMetrics::getCriticalSectionElapsedTimeSecs() const { + return getElapsed<Seconds>(_criticalSectionStartTime, _criticalSectionEndTime, _clockSource); } void ShardingDataTransformInstanceMetrics::onWriteToStashedCollections() { @@ -271,14 +326,4 @@ void ShardingDataTransformInstanceMetrics::accumulateValues(int64_t insertsAppli _deletesApplied.fetchAndAdd(deletesApplied); } -void ShardingDataTransformInstanceMetrics::setLowestEstimatedRemainingOperationTime( - Milliseconds time) { - _lowestEstimatedRemainingOperationTime.store(time); -} - -void ShardingDataTransformInstanceMetrics::setHighestEstimatedRemainingOperationTime( - Milliseconds time) { - _highestEstimatedRemainingOperationTime.store(time); -} - } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.h b/src/mongo/db/s/sharding_data_transform_instance_metrics.h index 66c6092b722..e66a80e501b 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics.h +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.h @@ -60,17 +60,22 @@ public: virtual ~ShardingDataTransformInstanceMetrics(); BSONObj reportForCurrentOp() const noexcept; - int64_t getHighEstimateRemainingTimeMillis() const; - int64_t getLowEstimateRemainingTimeMillis() const; + Milliseconds getHighEstimateRemainingTimeMillis() const; + Milliseconds getLowEstimateRemainingTimeMillis() const; Date_t getStartTimestamp() const; const UUID& getInstanceId() const; void onCopyingBegin(); void onCopyingEnd(); + void onApplyingBegin(); + void onApplyingEnd(); void onDocumentsCopied(int64_t documentCount, int64_t totalDocumentsSizeBytes); void setDocumentsToCopyCounts(int64_t documentCount, int64_t totalDocumentsSizeBytes); + void setCoordinatorHighEstimateRemainingTimeMillis(Milliseconds milliseconds); + void setCoordinatorLowEstimateRemainingTimeMillis(Milliseconds milliseconds); void onInsertApplied(); void onUpdateApplied(); void onDeleteApplied(); + void onOplogEntriesFetched(int64_t numEntries); void onOplogEntriesApplied(int64_t numEntries); void onWriteToStashedCollections(); @@ -79,10 +84,11 @@ public: void onCriticalSectionBegin(); void onCriticalSectionEnd(); - void setLowestEstimatedRemainingOperationTime(Milliseconds time); - void setHighestEstimatedRemainingOperationTime(Milliseconds time); - Role getRole() const; + Seconds getOperationRunningTimeSecs() const; + Seconds getCopyingElapsedTimeSecs() const; + Seconds getApplyingElapsedTimeSecs() const; + Seconds getCriticalSectionElapsedTimeSecs() const; protected: virtual std::string createOperationDescription() const noexcept; @@ -125,10 +131,6 @@ protected: "allShardsHighestRemainingOperationTimeEstimatedSecs"; private: - inline int64_t getOperationRunningTimeSecs() const; - int64_t getCriticalSectionElapsedTimeSecs() const; - int64_t getCopyingElapsedTimeSecs() const; - ClockSource* _clockSource; ObserverPtr _observer; ShardingDataTransformCumulativeMetrics* _cumulativeMetrics; @@ -143,19 +145,23 @@ private: AtomicWord<int32_t> _approxBytesToCopy; AtomicWord<int32_t> _bytesCopied; + AtomicWord<Date_t> _applyingStartTime; + AtomicWord<Date_t> _applyingEndTime; + AtomicWord<int64_t> _oplogEntriesFetched; + AtomicWord<int64_t> _insertsApplied; AtomicWord<int64_t> _updatesApplied; AtomicWord<int64_t> _deletesApplied; AtomicWord<int64_t> _oplogEntriesApplied; AtomicWord<int64_t> _writesToStashCollections; + AtomicWord<Milliseconds> _coordinatorHighEstimateRemainingTimeMillis; + AtomicWord<Milliseconds> _coordinatorLowEstimateRemainingTimeMillis; + AtomicWord<Date_t> _criticalSectionStartTime; AtomicWord<Date_t> _criticalSectionEndTime; AtomicWord<int64_t> _readsDuringCriticalSection; AtomicWord<int64_t> _writesDuringCriticalSection; - - AtomicWord<Milliseconds> _lowestEstimatedRemainingOperationTime; - AtomicWord<Milliseconds> _highestEstimatedRemainingOperationTime; }; } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp index ef32091c77b..bb300261e9a 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp @@ -249,6 +249,17 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, DonorIncrementReadsDuringCritic ASSERT_EQ(report.getIntField("countReadsDuringCriticalSection"), 1); } +TEST_F(ShardingDataTransformInstanceMetricsTest, RecipientIncrementFetchedOplogEntries) { + auto metrics = createInstanceMetrics(UUID::gen(), Role::kRecipient); + + auto report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("oplogEntriesFetched"), 0); + metrics->onOplogEntriesFetched(50); + + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("oplogEntriesFetched"), 50); +} + TEST_F(ShardingDataTransformInstanceMetricsTest, CurrentOpReportsCriticalSectionTime) { runTimeReportTest( "CurrentOpReportsCriticalSectionTime", @@ -289,6 +300,59 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, RecipientIncrementsDocumentsAnd ASSERT_EQ(report.getIntField("bytesCopied"), 1000); } +TEST_F(ShardingDataTransformInstanceMetricsTest, RecipientReportsRemainingTime) { + auto metrics = createInstanceMetrics(UUID::gen(), Role::kRecipient); + const auto& clock = getClockSource(); + constexpr auto kIncrement = Milliseconds(5000); + constexpr auto kOpsPerIncrement = 25; + const auto kIncrementSecs = durationCount<Seconds>(kIncrement); + const auto kExpectedTotal = kIncrementSecs * 8; + metrics->setDocumentsToCopyCounts(0, kOpsPerIncrement * 4); + metrics->onOplogEntriesFetched(kOpsPerIncrement * 4); + + // Before cloning. + auto report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"), 0); + + // During cloning. + metrics->onCopyingBegin(); + metrics->onDocumentsCopied(0, kOpsPerIncrement); + clock->advance(kIncrement); + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"), + kExpectedTotal - kIncrementSecs); + + metrics->onDocumentsCopied(0, kOpsPerIncrement * 2); + clock->advance(kIncrement * 2); + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"), + kExpectedTotal - (kIncrementSecs * 3)); + + // During applying. + metrics->onDocumentsCopied(0, kOpsPerIncrement); + clock->advance(kIncrement); + metrics->onCopyingEnd(); + metrics->onApplyingBegin(); + metrics->onOplogEntriesApplied(kOpsPerIncrement); + clock->advance(kIncrement); + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"), + kExpectedTotal - (kIncrementSecs * 5)); + + metrics->onOplogEntriesApplied(kOpsPerIncrement * 2); + clock->advance(kIncrement * 2); + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"), + kExpectedTotal - (kIncrementSecs * 7)); + + // Done. + metrics->onOplogEntriesApplied(kOpsPerIncrement); + clock->advance(kIncrement); + metrics->onApplyingEnd(); + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"), 0); +} + TEST_F(ShardingDataTransformInstanceMetricsTest, CurrentOpReportsCopyingTime) { runTimeReportTest( "CurrentOpReportsCopyingTime", @@ -298,6 +362,15 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, CurrentOpReportsCopyingTime) { [](ShardingDataTransformInstanceMetrics* metrics) { metrics->onCopyingEnd(); }); } +TEST_F(ShardingDataTransformInstanceMetricsTest, CurrentOpReportsApplyingTime) { + runTimeReportTest( + "CurrentOpReportsApplyingTime", + {Role::kRecipient, Role::kCoordinator}, + "totalApplyTimeElapsedSecs", + [](ShardingDataTransformInstanceMetrics* metrics) { metrics->onApplyingBegin(); }, + [](ShardingDataTransformInstanceMetrics* metrics) { metrics->onApplyingEnd(); }); +} + TEST_F(ShardingDataTransformInstanceMetricsTest, CurrentOpReportsRunningTime) { auto uuid = UUID::gen(); auto now = getClockSource()->now(); @@ -331,7 +404,7 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, auto report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("allShardsLowestRemainingOperationTimeEstimatedSecs"), 0); - metrics->setLowestEstimatedRemainingOperationTime(Milliseconds(2000)); + metrics->setCoordinatorLowEstimateRemainingTimeMillis(Milliseconds(2000)); report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("allShardsLowestRemainingOperationTimeEstimatedSecs"), 2); @@ -343,7 +416,7 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, auto report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("allShardsHighestRemainingOperationTimeEstimatedSecs"), 0); - metrics->setHighestEstimatedRemainingOperationTime(Milliseconds(12000)); + metrics->setCoordinatorHighEstimateRemainingTimeMillis(Milliseconds(12000)); report = metrics->reportForCurrentOp(); ASSERT_EQ(report.getIntField("allShardsHighestRemainingOperationTimeEstimatedSecs"), 12); diff --git a/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp b/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp index 7dfe3092b8a..eca06d99acb 100644 --- a/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp +++ b/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp @@ -36,11 +36,11 @@ ShardingDataTransformMetricsObserver::ShardingDataTransformMetricsObserver( : _metrics(metrics) {} int64_t ShardingDataTransformMetricsObserver::getHighEstimateRemainingTimeMillis() const { - return _metrics->getHighEstimateRemainingTimeMillis(); + return _metrics->getHighEstimateRemainingTimeMillis().count(); } int64_t ShardingDataTransformMetricsObserver::getLowEstimateRemainingTimeMillis() const { - return _metrics->getLowEstimateRemainingTimeMillis(); + return _metrics->getLowEstimateRemainingTimeMillis().count(); } Date_t ShardingDataTransformMetricsObserver::getStartTimestamp() const { diff --git a/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp b/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp index f15435d52e7..0f799f6e3ac 100644 --- a/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp +++ b/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp @@ -34,7 +34,10 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/primary_only_service.h" #include "mongo/db/s/resharding/resharding_metrics.h" +#include "mongo/db/s/resharding/resharding_recipient_service.h" +#include "mongo/db/s/sharding_data_transform_metrics.h" #include "mongo/db/service_context.h" #include "mongo/s/request_types/resharding_operation_time_gen.h" #include "mongo/util/duration.h" @@ -47,18 +50,21 @@ class ShardsvrReshardingOperationTimeCmd final public: class OperationTime { public: - explicit OperationTime(ReshardingMetrics* metrics) : _metrics(metrics) {} + explicit OperationTime(boost::optional<Milliseconds> elapsedMillis, + boost::optional<Milliseconds> remainingMillis) + : _elapsedMillis{elapsedMillis}, _remainingMillis{remainingMillis} {} void serialize(BSONObjBuilder* bob) const { - if (const auto elapsedTime = _metrics->getOperationElapsedTime()) { - bob->append("elapsedMillis", durationCount<Milliseconds>(elapsedTime.get())); + if (_elapsedMillis) { + bob->append("elapsedMillis", _elapsedMillis->count()); } - if (const auto remainingTime = _metrics->getOperationRemainingTime()) { - bob->append("remainingMillis", durationCount<Milliseconds>(remainingTime.get())); + if (_remainingMillis) { + bob->append("remainingMillis", _remainingMillis->count()); } } private: - ReshardingMetrics* const _metrics; + boost::optional<Milliseconds> _elapsedMillis; + boost::optional<Milliseconds> _remainingMillis; }; using Request = _shardsvrReshardingOperationTime; @@ -103,10 +109,23 @@ public: } Response typedRun(OperationContext* opCtx) { - // Once multiple concurrent resharding operations are allowed, the following could use - // `ns()` to choose the instance of `ReshardingMetrics` that is associated with the - // provided namespace string. - return Response(ReshardingMetrics::get(opCtx->getServiceContext())); + if (ShardingDataTransformMetrics::isEnabled()) { + auto instances = + getReshardingStateMachines<ReshardingRecipientService, + ReshardingRecipientService::RecipientStateMachine>( + opCtx, ns()); + if (instances.empty()) { + return Response{boost::none, boost::none}; + } + invariant(instances.size() == 1); + const auto& metrics = instances[0]->getMetrics(); + return Response{duration_cast<Milliseconds>(metrics.getOperationRunningTimeSecs()), + metrics.getHighEstimateRemainingTimeMillis()}; + } else { + auto metrics = ReshardingMetrics::get(opCtx->getServiceContext()); + return Response{metrics->getOperationElapsedTime(), + metrics->getOperationRemainingTime()}; + } } }; } _shardsvrReshardingOperationTime; |