summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBrett Nawrocki <brett.nawrocki@mongodb.com>2022-04-06 14:44:23 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-21 20:43:43 +0000
commit46be98147821353da22d0e647ff06fdb7e6fa5c7 (patch)
treeaf5ac4a01e4eb857ea2e03845f4cafb7e89b2e73 /src
parent51b92630cc5a6448de092c1c6a38c70d702c5153 (diff)
downloadmongo-46be98147821353da22d0e647ff06fdb7e6fa5c7.tar.gz
SERVER-64376 Add Parity Recipient Applying Fields in CurrentOp
Track and report the following fields in ShardingDataTransformInstanceMetrics for resharding $currentOp: oplogEntriesFetched totalApplyTimeElapsedSecs remainingOperationTimeEstimatedSecs
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp21
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp11
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.h7
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp8
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp6
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.h2
-rw-r--r--src/mongo/db/s/resharding/resharding_data_replication.cpp7
-rw-r--r--src/mongo/db/s/resharding/resharding_data_replication.h1
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.cpp37
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp7
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher.h9
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp12
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp4
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h14
-rw-r--r--src/mongo/db/s/resharding/resharding_util.cpp41
-rw-r--r--src/mongo/db/s/resharding/resharding_util.h32
-rw-r--r--src/mongo/db/s/sharding_data_transform_instance_metrics.cpp137
-rw-r--r--src/mongo/db/s/sharding_data_transform_instance_metrics.h30
-rw-r--r--src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp77
-rw-r--r--src/mongo/db/s/sharding_data_transform_metrics_observer.cpp4
-rw-r--r--src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp39
21 files changed, 369 insertions, 137 deletions
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
index f8a12c57f90..c30d0e12ecf 100644
--- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
+++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
@@ -60,22 +60,15 @@ boost::optional<std::shared_ptr<ReshardingCoordinatorService::ReshardingCoordina
getExistingInstanceToJoin(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& newShardKey) {
- auto reshardingCoordinatorService = checked_cast<ReshardingCoordinatorService*>(
- repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
- ->lookupServiceByName(ReshardingCoordinatorService::kServiceName));
- auto instances = reshardingCoordinatorService->getAllReshardingInstances(opCtx);
- for (auto& instance : instances) {
- auto reshardingCoordinator =
- checked_pointer_cast<ReshardingCoordinatorService::ReshardingCoordinator>(instance);
-
- auto instanceMetadata = reshardingCoordinator->getMetadata();
+ auto instances =
+ getReshardingStateMachines<ReshardingCoordinatorService,
+ ReshardingCoordinatorService::ReshardingCoordinator>(opCtx, nss);
+ for (const auto& instance : instances) {
if (SimpleBSONObjComparator::kInstance.evaluate(
- instanceMetadata.getReshardingKey().toBSON() == newShardKey) &&
- instanceMetadata.getSourceNss() == nss) {
- return reshardingCoordinator;
- };
+ instance->getMetadata().getReshardingKey().toBSON() == newShardKey)) {
+ return instance;
+ }
}
-
return boost::none;
}
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp
index a9ef25a0cc2..a898267e8a6 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.cpp
@@ -87,17 +87,17 @@ boost::optional<Milliseconds> extractOperationRemainingTime(const BSONObj& obj)
} // namespace
CoordinatorCommitMonitor::CoordinatorCommitMonitor(
+ std::shared_ptr<ReshardingMetricsNew> metricsNew,
NamespaceString ns,
std::vector<ShardId> recipientShards,
CoordinatorCommitMonitor::TaskExecutorPtr executor,
CancellationToken cancelToken,
- ReshardingMetricsNew* metrics,
Milliseconds maxDelayBetweenQueries)
- : _ns(std::move(ns)),
+ : _metricsNew{std::move(metricsNew)},
+ _ns(std::move(ns)),
_recipientShards(std::move(recipientShards)),
_executor(std::move(executor)),
_cancelToken(std::move(cancelToken)),
- _metrics(metrics),
_threshold(Milliseconds(gRemainingReshardingOperationTimeThresholdMillis.load())),
_maxDelayBetweenQueries(maxDelayBetweenQueries) {}
@@ -211,10 +211,9 @@ ExecutorFuture<void> CoordinatorCommitMonitor::_makeFuture() const {
auto metrics = ReshardingMetrics::get(cc().getServiceContext());
metrics->setMinRemainingOperationTime(remainingTimes.min);
metrics->setMaxRemainingOperationTime(remainingTimes.max);
-
if (ShardingDataTransformMetrics::isEnabled()) {
- _metrics->setLowestEstimatedRemainingOperationTime(remainingTimes.min);
- _metrics->setHighestEstimatedRemainingOperationTime(remainingTimes.max);
+ _metricsNew->setCoordinatorHighEstimateRemainingTimeMillis(remainingTimes.max);
+ _metricsNew->setCoordinatorLowEstimateRemainingTimeMillis(remainingTimes.min);
}
// Check if all recipient shards are within the commit threshold.
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.h b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.h
index a8b070fd30f..fb9f55d614f 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor.h
@@ -69,11 +69,11 @@ public:
Milliseconds max;
};
- CoordinatorCommitMonitor(NamespaceString ns,
+ CoordinatorCommitMonitor(std::shared_ptr<ReshardingMetricsNew> metricsNew,
+ NamespaceString ns,
std::vector<ShardId> recipientShards,
TaskExecutorPtr executor,
CancellationToken cancelToken,
- ReshardingMetricsNew* metrics,
Milliseconds maxDelayBetweenQueries = kMaxDelayBetweenQueries);
SemiFuture<void> waitUntilRecipientsAreWithinCommitThreshold() const;
@@ -95,13 +95,12 @@ private:
static constexpr auto kDiagnosticLogLevel = 0;
static constexpr auto kMaxDelayBetweenQueries = Seconds(30);
+ std::shared_ptr<ReshardingMetricsNew> _metricsNew;
const NamespaceString _ns;
const std::vector<ShardId> _recipientShards;
const TaskExecutorPtr _executor;
const CancellationToken _cancelToken;
- ReshardingMetricsNew* _metrics;
-
const Milliseconds _threshold;
const Milliseconds _maxDelayBetweenQueries;
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
index b8fd31b8d1d..831c137f340 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_commit_monitor_test.cpp
@@ -107,7 +107,7 @@ private:
boost::optional<Callback> _runOnMockingNextResponse;
ShardingDataTransformCumulativeMetrics _cumulativeMetrics{"dummyForTest"};
- std::unique_ptr<ReshardingMetricsNew> _metrics;
+ std::shared_ptr<ReshardingMetricsNew> _metrics;
};
auto makeExecutor() {
@@ -153,7 +153,7 @@ void CoordinatorCommitMonitorTest::setUp() {
_cancellationSource = std::make_unique<CancellationSource>();
- _metrics = std::make_unique<ReshardingMetricsNew>(
+ _metrics = std::make_shared<ReshardingMetricsNew>(
UUID::gen(),
BSON("y" << 1),
_ns,
@@ -162,11 +162,11 @@ void CoordinatorCommitMonitorTest::setUp() {
clockSource,
&_cumulativeMetrics);
- _commitMonitor = std::make_shared<CoordinatorCommitMonitor>(_ns,
+ _commitMonitor = std::make_shared<CoordinatorCommitMonitor>(_metrics,
+ _ns,
_recipientShards,
_futureExecutor,
_cancellationSource->token(),
- _metrics.get(),
Milliseconds(0));
_commitMonitor->setNetworkExecutorForTest(executor());
}
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 30acb2f9ae3..c826c3dbb85 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -1651,6 +1651,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
coordinatorDocChangedOnDisk);
if (ShardingDataTransformMetrics::isEnabled()) {
_metricsNew->onCopyingEnd();
+ _metricsNew->onApplyingBegin();
}
})
.then([this] { return _waitForMajority(_ctHolder->getAbortToken()); });
@@ -1663,11 +1664,11 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_startCommitMonitor(
}
_commitMonitor = std::make_shared<resharding::CoordinatorCommitMonitor>(
+ _metricsNew,
_coordinatorDoc.getSourceNss(),
extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()),
**executor,
- _ctHolder->getCommitMonitorToken(),
- _metricsNew.get());
+ _ctHolder->getCommitMonitorToken());
_commitMonitorQuiesced = _commitMonitor->waitUntilRecipientsAreWithinCommitThreshold()
.thenRunOn(**executor)
@@ -1711,6 +1712,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kBlockingWrites,
_coordinatorDoc);
if (ShardingDataTransformMetrics::isEnabled()) {
+ _metricsNew->onApplyingEnd();
_metricsNew->onCriticalSectionBegin();
}
})
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index 280abbac2f3..49af5739b68 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -507,7 +507,7 @@ private:
// The primary-only service instance corresponding to the coordinator instance. Not owned.
const ReshardingCoordinatorService* const _coordinatorService;
- std::unique_ptr<ReshardingMetricsNew> _metricsNew;
+ std::shared_ptr<ReshardingMetricsNew> _metricsNew;
// The in-memory representation of the immutable portion of the document in
// config.reshardingOperations.
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp
index cb468d16aa8..0b7a6c9efb9 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp
@@ -113,6 +113,7 @@ std::vector<std::unique_ptr<ReshardingTxnCloner>> ReshardingDataReplication::_ma
std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication::_makeOplogFetchers(
OperationContext* opCtx,
ReshardingMetrics* metrics,
+ ReshardingMetricsNew* metricsNew,
const CommonReshardingMetadata& metadata,
const std::vector<DonorShardFetchTimestamp>& donorShards,
const ShardId& myShardId) {
@@ -128,7 +129,8 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication::
invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}));
oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>(
- std::make_unique<ReshardingOplogFetcher::Env>(opCtx->getServiceContext(), metrics),
+ std::make_unique<ReshardingOplogFetcher::Env>(
+ opCtx->getServiceContext(), metrics, metricsNew),
metadata.getReshardingUUID(),
metadata.getSourceUUID(),
// The recipient fetches oplog entries from the donor starting from the largest _id
@@ -225,7 +227,8 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m
txnCloners = _makeTxnCloners(metadata, donorShards);
}
- auto oplogFetchers = _makeOplogFetchers(opCtx, metrics, metadata, donorShards, myShardId);
+ auto oplogFetchers =
+ _makeOplogFetchers(opCtx, metrics, metricsNew, metadata, donorShards, myShardId);
auto oplogFetcherExecutor = _makeOplogFetcherExecutor(donorShards.size());
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.h b/src/mongo/db/s/resharding/resharding_data_replication.h
index 16bee54da01..47417ae8363 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.h
+++ b/src/mongo/db/s/resharding/resharding_data_replication.h
@@ -211,6 +211,7 @@ private:
static std::vector<std::unique_ptr<ReshardingOplogFetcher>> _makeOplogFetchers(
OperationContext* opCtx,
ReshardingMetrics* metrics,
+ ReshardingMetricsNew* metricsNew,
const CommonReshardingMetadata& metadata,
const std::vector<DonorShardFetchTimestamp>& donorShards,
const ShardId& myShardId);
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index 6e4e4e041e9..fa5fea13683 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -33,6 +33,7 @@
#include <memory>
#include "mongo/db/s/resharding/resharding_metrics.h"
+#include "mongo/db/s/resharding/resharding_util.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/compiler.h"
#include "mongo/util/aligned.h"
@@ -82,20 +83,6 @@ const auto reshardingMetricsRegisterer = ServiceContext::ConstructorActionRegist
"ReshardingMetrics",
[](ServiceContext* ctx) { getMetrics(ctx) = std::make_unique<ReshardingMetrics>(ctx); }};
-/**
- * Given a constant rate of time per unit of work:
- * totalTime / totalWork == elapsedTime / elapsedWork
- * Solve for remaining time.
- * remainingTime := totalTime - elapsedTime
- * == (totalWork * (elapsedTime / elapsedWork)) - elapsedTime
- * == elapsedTime * (totalWork / elapsedWork - 1)
- */
-Milliseconds remainingTime(Milliseconds elapsedTime, double elapsedWork, double totalWork) {
- elapsedWork = std::min(elapsedWork, totalWork);
- double remainingMsec = 1.0 * elapsedTime.count() * (totalWork / elapsedWork - 1);
- return Milliseconds(Milliseconds::rep(remainingMsec));
-}
-
static StringData serializeState(boost::optional<RecipientStateEnum> e) {
return RecipientState_serializer(*e);
}
@@ -255,21 +242,13 @@ void ReshardingMetrics::OperationMetrics::gotDelete() noexcept {
boost::optional<Milliseconds> ReshardingMetrics::OperationMetrics::remainingOperationTime(
Date_t now) const {
- if (recipientState > RecipientStateEnum::kCloning && oplogEntriesFetched == 0) {
- return Milliseconds(0);
- }
-
- if (oplogEntriesApplied > 0 && oplogEntriesFetched > 0) {
- // All fetched oplogEntries must be applied. Some of them already have been.
- return remainingTime(
- applyingOplogEntries.duration(now), oplogEntriesApplied, oplogEntriesFetched);
- }
- if (bytesCopied > 0 && bytesToCopy > 0) {
- // Until the time to apply batches of oplog entries is measured, we assume that applying all
- // of them will take as long as copying did.
- return remainingTime(copyingDocuments.duration(now), bytesCopied, 2 * bytesToCopy);
- }
- return {};
+ return estimateRemainingRecipientTime(recipientState > RecipientStateEnum::kCloning,
+ bytesCopied,
+ bytesToCopy,
+ copyingDocuments.duration(now),
+ oplogEntriesApplied,
+ oplogEntriesFetched,
+ applyingOplogEntries.duration(now));
}
void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder* bob,
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
index 30811c8d2aa..5aed035817d 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/read_concern_level.h"
#include "mongo/db/s/resharding/resharding_metrics.h"
+#include "mongo/db/s/resharding/resharding_metrics_new.h"
#include "mongo/db/s/resharding/resharding_util.h"
#include "mongo/db/storage/write_unit_of_work.h"
#include "mongo/executor/task_executor.h"
@@ -342,6 +343,9 @@ bool ReshardingOplogFetcher::consume(Client* client,
++_numOplogEntriesCopied;
_env->metrics()->onOplogEntriesFetched(1);
+ if (ShardingDataTransformMetrics::isEnabled()) {
+ _env->metricsNew()->onOplogEntriesFetched(1);
+ }
auto [p, f] = makePromiseFuture<void>();
{
@@ -385,6 +389,9 @@ bool ReshardingOplogFetcher::consume(Client* client,
wuow.commit();
_env->metrics()->onOplogEntriesFetched(1);
+ if (ShardingDataTransformMetrics::isEnabled()) {
+ _env->metricsNew()->onOplogEntriesFetched(1);
+ }
auto [p, f] = makePromiseFuture<void>();
{
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
index e0aad151f78..d0d53395647 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
@@ -51,23 +51,28 @@
namespace mongo {
class ReshardingMetrics;
+class ReshardingMetricsNew;
class ReshardingOplogFetcher : public resharding::OnInsertAwaitable {
public:
class Env {
public:
- Env(ServiceContext* service, ReshardingMetrics* metrics)
- : _service(service), _metrics(metrics) {}
+ Env(ServiceContext* service, ReshardingMetrics* metrics, ReshardingMetricsNew* metricsNew)
+ : _service(service), _metrics(metrics), _metricsNew(metricsNew) {}
ServiceContext* service() const {
return _service;
}
ReshardingMetrics* metrics() const {
return _metrics;
}
+ ReshardingMetricsNew* metricsNew() const {
+ return _metricsNew;
+ }
private:
ServiceContext* _service;
ReshardingMetrics* _metrics;
+ ReshardingMetricsNew* _metricsNew;
};
// Special value to use for startAt to indicate there are no more oplog entries needing to be
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
index e7c512e2669..24d9f016e59 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/resharding/resharding_metrics.h"
+#include "mongo/db/s/resharding/resharding_metrics_new.h"
#include "mongo/db/s/resharding/resharding_oplog_fetcher.h"
#include "mongo/db/s/resharding/resharding_util.h"
#include "mongo/db/s/shard_server_test_fixture.h"
@@ -101,6 +102,13 @@ public:
_metrics->onStart(ReshardingMetrics::Role::kRecipient,
_svcCtx->getFastClockSource()->now());
_metrics->setRecipientState(RecipientStateEnum::kCloning);
+ _metricsNew =
+ ReshardingMetricsNew::makeInstance(_reshardingUUID,
+ BSON("y" << 1),
+ NamespaceString{""},
+ ReshardingMetricsNew::Role::kRecipient,
+ getServiceContext()->getFastClockSource()->now(),
+ getServiceContext());
for (const auto& shardId : kTwoShardIdList) {
auto shardTargeter = RemoteCommandTargeterMock::get(
@@ -128,7 +136,8 @@ public:
}
auto makeFetcherEnv() {
- return std::make_unique<ReshardingOplogFetcher::Env>(_svcCtx, &*_metrics);
+ return std::make_unique<ReshardingOplogFetcher::Env>(
+ _svcCtx, _metrics.get(), _metricsNew.get());
}
/**
@@ -341,6 +350,7 @@ protected:
ShardId _donorShard;
ShardId _destinationShard;
std::unique_ptr<ReshardingMetrics> _metrics;
+ std::unique_ptr<ReshardingMetricsNew> _metricsNew;
private:
static HostAndPort makeHostAndPort(const ShardId& shardId) {
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index cf6b9fe4619..67dd4a81ceb 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -829,6 +829,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying(
_metrics()->startApplyingOplogEntries(currentTime);
if (ShardingDataTransformMetrics::isEnabled()) {
_metricsNew->onCopyingEnd();
+ _metricsNew->onApplyingBegin();
}
}
@@ -839,6 +840,9 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsi
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
auto currentTime = getCurrentTime();
_metrics()->endApplyingOplogEntries(currentTime);
+ if (ShardingDataTransformMetrics::isEnabled()) {
+ _metricsNew->onApplyingEnd();
+ }
}
void ReshardingRecipientService::RecipientStateMachine::_transitionToError(
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 19dd962ace9..832e1dd8083 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -71,6 +71,11 @@ public:
std::shared_ptr<repl::PrimaryOnlyService::Instance> constructInstance(
BSONObj initialState) override;
+
+ inline std::vector<std::shared_ptr<PrimaryOnlyService::Instance>> getAllReshardingInstances(
+ OperationContext* opCtx) {
+ return getAllInstances(opCtx);
+ }
};
/**
@@ -154,6 +159,15 @@ public:
return _completionPromise.getFuture();
}
+ inline const CommonReshardingMetadata& getMetadata() const {
+ return _metadata;
+ }
+
+ inline const ReshardingMetricsNew& getMetrics() const {
+ invariant(_metricsNew);
+ return *_metricsNew;
+ }
+
boost::optional<BSONObj> reportForCurrentOp(
MongoProcessInterface::CurrentOpConnectionsMode,
MongoProcessInterface::CurrentOpSessionsMode) noexcept override;
diff --git a/src/mongo/db/s/resharding/resharding_util.cpp b/src/mongo/db/s/resharding/resharding_util.cpp
index 27d319264dd..aec59fd5075 100644
--- a/src/mongo/db/s/resharding/resharding_util.cpp
+++ b/src/mongo/db/s/resharding/resharding_util.cpp
@@ -49,6 +49,8 @@
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/resharding/document_source_resharding_add_resume_id.h"
#include "mongo/db/s/resharding/document_source_resharding_iterate_transaction.h"
+#include "mongo/db/s/resharding/resharding_metrics.h"
+#include "mongo/db/s/resharding/resharding_metrics_new.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/storage/write_unit_of_work.h"
#include "mongo/logv2/log.h"
@@ -60,6 +62,23 @@
#include "mongo/s/shard_key_pattern.h"
namespace mongo {
+
+namespace {
+/**
+ * Given a constant rate of time per unit of work:
+ * totalTime / totalWork == elapsedTime / elapsedWork
+ * Solve for remaining time.
+ * remainingTime := totalTime - elapsedTime
+ * == (totalWork * (elapsedTime / elapsedWork)) - elapsedTime
+ * == elapsedTime * (totalWork / elapsedWork - 1)
+ */
+Milliseconds estimateRemainingTime(Milliseconds elapsedTime, double elapsedWork, double totalWork) {
+ elapsedWork = std::min(elapsedWork, totalWork);
+ double remainingMsec = 1.0 * elapsedTime.count() * (totalWork / elapsedWork - 1);
+ return Milliseconds(Milliseconds::rep(std::round(remainingMsec)));
+}
+} // namespace
+
using namespace fmt::literals;
BSONObj serializeAndTruncateReshardingErrorIfNeeded(Status originalError) {
@@ -372,4 +391,26 @@ void doNoopWrite(OperationContext* opCtx, StringData opStr, const NamespaceStrin
});
}
+boost::optional<Milliseconds> estimateRemainingRecipientTime(bool applyingBegan,
+ int64_t bytesCopied,
+ int64_t bytesToCopy,
+ Milliseconds timeSpentCopying,
+ int64_t oplogEntriesApplied,
+ int64_t oplogEntriesFetched,
+ Milliseconds timeSpentApplying) {
+ if (applyingBegan && oplogEntriesFetched == 0) {
+ return Milliseconds(0);
+ }
+ if (oplogEntriesApplied > 0 && oplogEntriesFetched > 0) {
+ // All fetched oplogEntries must be applied. Some of them already have been.
+ return estimateRemainingTime(timeSpentApplying, oplogEntriesApplied, oplogEntriesFetched);
+ }
+ if (bytesCopied > 0 && bytesToCopy > 0) {
+ // Until the time to apply batches of oplog entries is measured, we assume that applying all
+ // of them will take as long as copying did.
+ return estimateRemainingTime(timeSpentCopying, bytesCopied, 2 * bytesToCopy);
+ }
+ return {};
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_util.h b/src/mongo/db/s/resharding/resharding_util.h
index 856d7cbb081..194381e7e78 100644
--- a/src/mongo/db/s/resharding/resharding_util.h
+++ b/src/mongo/db/s/resharding/resharding_util.h
@@ -37,6 +37,7 @@
#include "mongo/db/keypattern.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/resharding/coordinator_document_gen.h"
#include "mongo/db/s/resharding/donor_oplog_id_gen.h"
@@ -293,4 +294,35 @@ NamespaceString getLocalConflictStashNamespace(UUID existingUUID, ShardId donorS
void doNoopWrite(OperationContext* opCtx, StringData opStr, const NamespaceString& nss);
+boost::optional<Milliseconds> estimateRemainingRecipientTime(bool applyingBegan,
+ int64_t bytesCopied,
+ int64_t bytesToCopy,
+ Milliseconds timeSpentCopying,
+ int64_t oplogEntriesApplied,
+ int64_t oplogEntriesFetched,
+ Milliseconds timeSpentApplying);
+/**
+ * Looks up the StateMachine by namespace of the collection being resharded. If it does not exist,
+ * returns boost::none.
+ */
+template <class Service, class Instance>
+std::vector<std::shared_ptr<Instance>> getReshardingStateMachines(OperationContext* opCtx,
+ const NamespaceString& sourceNs) {
+ auto service =
+ checked_cast<Service*>(repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
+ ->lookupServiceByName(Service::kServiceName));
+ auto instances = service->getAllReshardingInstances(opCtx);
+ std::vector<std::shared_ptr<Instance>> result;
+ for (const auto& genericInstace : instances) {
+ auto instance = checked_pointer_cast<Instance>(genericInstace);
+ auto metadata = instance->getMetadata();
+ if (metadata.getSourceNss() != sourceNs) {
+ continue;
+ }
+ result.emplace_back(std::move(instance));
+ }
+ return result;
+}
+
+
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
index d5c94d46416..095f11d1e76 100644
--- a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
+++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
@@ -28,33 +28,29 @@
*/
#include "mongo/db/s/sharding_data_transform_instance_metrics.h"
+#include "mongo/db/s/resharding/resharding_util.h"
#include "mongo/db/s/sharding_data_transform_metrics_observer.h"
-namespace {
-constexpr int64_t kPlaceholderTimestampForTesting = 0;
-constexpr int64_t kPlaceholderTimeRemainingForTesting = 0;
-constexpr auto TEMP_VALUE = "placeholder";
-
-} // namespace
-
namespace mongo {
namespace {
constexpr auto kNoDate = Date_t::min();
-int64_t getElapsedTimeSeconds(const AtomicWord<Date_t>& startTime,
- const AtomicWord<Date_t>& endTime,
- ClockSource* clock) {
+template <typename T>
+T getElapsed(const AtomicWord<Date_t>& startTime,
+ const AtomicWord<Date_t>& endTime,
+ ClockSource* clock) {
auto start = startTime.load();
if (start == kNoDate) {
- return 0;
+ return T{0};
}
auto end = endTime.load();
if (end == kNoDate) {
end = clock->now();
}
- return durationCount<Seconds>(end - start);
+ return duration_cast<T>(end - start);
}
+
} // namespace
ShardingDataTransformInstanceMetrics::ShardingDataTransformInstanceMetrics(
@@ -99,15 +95,18 @@ ShardingDataTransformInstanceMetrics::ShardingDataTransformInstanceMetrics(
_documentsCopied{0},
_approxBytesToCopy{0},
_bytesCopied{0},
+ _applyingStartTime{kNoDate},
+ _applyingEndTime{kNoDate},
+ _oplogEntriesFetched{0},
_insertsApplied{0},
_updatesApplied{0},
_deletesApplied{0},
_oplogEntriesApplied{0},
+ _coordinatorHighEstimateRemainingTimeMillis{Milliseconds{0}},
+ _coordinatorLowEstimateRemainingTimeMillis{Milliseconds{0}},
_criticalSectionStartTime{kNoDate},
_criticalSectionEndTime{kNoDate},
- _writesDuringCriticalSection{0},
- _lowestEstimatedRemainingOperationTime{Milliseconds(0)},
- _highestEstimatedRemainingOperationTime{Milliseconds(0)} {}
+ _writesDuringCriticalSection{0} {}
ShardingDataTransformInstanceMetrics::~ShardingDataTransformInstanceMetrics() {
if (_deregister) {
@@ -115,12 +114,39 @@ ShardingDataTransformInstanceMetrics::~ShardingDataTransformInstanceMetrics() {
}
}
-int64_t ShardingDataTransformInstanceMetrics::getHighEstimateRemainingTimeMillis() const {
- return durationCount<Milliseconds>(_highestEstimatedRemainingOperationTime.load());
+Milliseconds ShardingDataTransformInstanceMetrics::getHighEstimateRemainingTimeMillis() const {
+ switch (_role) {
+ case Role::kRecipient: {
+ auto estimate = estimateRemainingRecipientTime(_applyingStartTime.load() != kNoDate,
+ _bytesCopied.load(),
+ _approxBytesToCopy.load(),
+ getCopyingElapsedTimeSecs(),
+ _oplogEntriesApplied.load(),
+ _oplogEntriesFetched.load(),
+ getApplyingElapsedTimeSecs());
+ if (!estimate) {
+ return Milliseconds{0};
+ }
+ return *estimate;
+ }
+ case Role::kCoordinator:
+ return Milliseconds{_coordinatorHighEstimateRemainingTimeMillis.load()};
+ case Role::kDonor:
+ break;
+ }
+ MONGO_UNREACHABLE;
}
-int64_t ShardingDataTransformInstanceMetrics::getLowEstimateRemainingTimeMillis() const {
- return durationCount<Milliseconds>(_lowestEstimatedRemainingOperationTime.load());
+Milliseconds ShardingDataTransformInstanceMetrics::getLowEstimateRemainingTimeMillis() const {
+ switch (_role) {
+ case Role::kRecipient:
+ return getHighEstimateRemainingTimeMillis();
+ case Role::kCoordinator:
+ return _coordinatorLowEstimateRemainingTimeMillis.load();
+ case Role::kDonor:
+ break;
+ }
+ MONGO_UNREACHABLE;
}
Date_t ShardingDataTransformInstanceMetrics::getStartTimestamp() const {
@@ -153,30 +179,33 @@ BSONObj ShardingDataTransformInstanceMetrics::reportForCurrentOp() const noexcep
builder.append(kOp, "command");
builder.append(kNamespace, _sourceNs.toString());
builder.append(kOriginatingCommand, _originalCommand);
- builder.append(kOpTimeElapsed, getOperationRunningTimeSecs());
+ builder.append(kOpTimeElapsed, getOperationRunningTimeSecs().count());
switch (_role) {
case Role::kCoordinator:
builder.append(kAllShardsHighestRemainingOperationTimeEstimatedSecs,
- durationCount<Seconds>(_highestEstimatedRemainingOperationTime.load()));
+ durationCount<Seconds>(getHighEstimateRemainingTimeMillis()));
builder.append(kAllShardsLowestRemainingOperationTimeEstimatedSecs,
- durationCount<Seconds>(_lowestEstimatedRemainingOperationTime.load()));
+ durationCount<Seconds>(getLowEstimateRemainingTimeMillis()));
builder.append(kCoordinatorState, getStateString());
- builder.append(kApplyTimeElapsed, TEMP_VALUE);
- builder.append(kCopyTimeElapsed, getCopyingElapsedTimeSecs());
- builder.append(kCriticalSectionTimeElapsed, getCriticalSectionElapsedTimeSecs());
+ builder.append(kApplyTimeElapsed, getApplyingElapsedTimeSecs().count());
+ builder.append(kCopyTimeElapsed, getCopyingElapsedTimeSecs().count());
+ builder.append(kCriticalSectionTimeElapsed,
+ getCriticalSectionElapsedTimeSecs().count());
break;
case Role::kDonor:
builder.append(kDonorState, getStateString());
- builder.append(kCriticalSectionTimeElapsed, getCriticalSectionElapsedTimeSecs());
+ builder.append(kCriticalSectionTimeElapsed,
+ getCriticalSectionElapsedTimeSecs().count());
builder.append(kCountWritesDuringCriticalSection, _writesDuringCriticalSection.load());
builder.append(kCountReadsDuringCriticalSection, _readsDuringCriticalSection.load());
break;
case Role::kRecipient:
builder.append(kRecipientState, getStateString());
- builder.append(kApplyTimeElapsed, TEMP_VALUE);
- builder.append(kCopyTimeElapsed, getCopyingElapsedTimeSecs());
- builder.append(kRemainingOpTimeEstimated, TEMP_VALUE);
+ builder.append(kApplyTimeElapsed, getApplyingElapsedTimeSecs().count());
+ builder.append(kCopyTimeElapsed, getCopyingElapsedTimeSecs().count());
+ builder.append(kRemainingOpTimeEstimated,
+ durationCount<Seconds>(getHighEstimateRemainingTimeMillis()));
builder.append(kApproxDocumentsToCopy, _approxDocumentsToCopy.load());
builder.append(kApproxBytesToCopy, _approxBytesToCopy.load());
builder.append(kBytesCopied, _bytesCopied.load());
@@ -185,7 +214,7 @@ BSONObj ShardingDataTransformInstanceMetrics::reportForCurrentOp() const noexcep
builder.append(kUpdatesApplied, _updatesApplied.load());
builder.append(kDeletesApplied, _deletesApplied.load());
builder.append(kOplogEntriesApplied, _oplogEntriesApplied.load());
- builder.append(kOplogEntriesFetched, TEMP_VALUE);
+ builder.append(kOplogEntriesFetched, _oplogEntriesFetched.load());
builder.append(kDocumentsCopied, _documentsCopied.load());
break;
default:
@@ -203,6 +232,14 @@ void ShardingDataTransformInstanceMetrics::onCopyingEnd() {
_copyingEndTime.store(_clockSource->now());
}
+void ShardingDataTransformInstanceMetrics::onApplyingBegin() {
+ _applyingStartTime.store(_clockSource->now());
+}
+
+void ShardingDataTransformInstanceMetrics::onApplyingEnd() {
+ _applyingEndTime.store(_clockSource->now());
+}
+
void ShardingDataTransformInstanceMetrics::onDocumentsCopied(int64_t documentCount,
int64_t totalDocumentsSizeBytes) {
_documentsCopied.addAndFetch(documentCount);
@@ -215,6 +252,16 @@ void ShardingDataTransformInstanceMetrics::setDocumentsToCopyCounts(
_approxBytesToCopy.store(totalDocumentsSizeBytes);
}
+void ShardingDataTransformInstanceMetrics::setCoordinatorHighEstimateRemainingTimeMillis(
+ Milliseconds milliseconds) {
+ _coordinatorHighEstimateRemainingTimeMillis.store(milliseconds);
+}
+
+void ShardingDataTransformInstanceMetrics::setCoordinatorLowEstimateRemainingTimeMillis(
+ Milliseconds milliseconds) {
+ _coordinatorLowEstimateRemainingTimeMillis.store(milliseconds);
+}
+
void ShardingDataTransformInstanceMetrics::onInsertApplied() {
_insertsApplied.addAndFetch(1);
}
@@ -227,6 +274,10 @@ void ShardingDataTransformInstanceMetrics::onDeleteApplied() {
_deletesApplied.addAndFetch(1);
}
+void ShardingDataTransformInstanceMetrics::onOplogEntriesFetched(int64_t numEntries) {
+ _oplogEntriesFetched.addAndFetch(numEntries);
+}
+
void ShardingDataTransformInstanceMetrics::onOplogEntriesApplied(int64_t numEntries) {
_oplogEntriesApplied.addAndFetch(numEntries);
}
@@ -243,16 +294,20 @@ void ShardingDataTransformInstanceMetrics::onCriticalSectionEnd() {
_criticalSectionEndTime.store(_clockSource->now());
}
-inline int64_t ShardingDataTransformInstanceMetrics::getOperationRunningTimeSecs() const {
- return durationCount<Seconds>(_clockSource->now() - _startTime);
+Seconds ShardingDataTransformInstanceMetrics::getOperationRunningTimeSecs() const {
+ return duration_cast<Seconds>(_clockSource->now() - _startTime);
+}
+
+Seconds ShardingDataTransformInstanceMetrics::getCopyingElapsedTimeSecs() const {
+ return getElapsed<Seconds>(_copyingStartTime, _copyingEndTime, _clockSource);
}
-int64_t ShardingDataTransformInstanceMetrics::getCriticalSectionElapsedTimeSecs() const {
- return getElapsedTimeSeconds(_criticalSectionStartTime, _criticalSectionEndTime, _clockSource);
+Seconds ShardingDataTransformInstanceMetrics::getApplyingElapsedTimeSecs() const {
+ return getElapsed<Seconds>(_applyingStartTime, _applyingEndTime, _clockSource);
}
-int64_t ShardingDataTransformInstanceMetrics::getCopyingElapsedTimeSecs() const {
- return getElapsedTimeSeconds(_copyingStartTime, _copyingEndTime, _clockSource);
+Seconds ShardingDataTransformInstanceMetrics::getCriticalSectionElapsedTimeSecs() const {
+ return getElapsed<Seconds>(_criticalSectionStartTime, _criticalSectionEndTime, _clockSource);
}
void ShardingDataTransformInstanceMetrics::onWriteToStashedCollections() {
@@ -271,14 +326,4 @@ void ShardingDataTransformInstanceMetrics::accumulateValues(int64_t insertsAppli
_deletesApplied.fetchAndAdd(deletesApplied);
}
-void ShardingDataTransformInstanceMetrics::setLowestEstimatedRemainingOperationTime(
- Milliseconds time) {
- _lowestEstimatedRemainingOperationTime.store(time);
-}
-
-void ShardingDataTransformInstanceMetrics::setHighestEstimatedRemainingOperationTime(
- Milliseconds time) {
- _highestEstimatedRemainingOperationTime.store(time);
-}
-
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.h b/src/mongo/db/s/sharding_data_transform_instance_metrics.h
index 66c6092b722..e66a80e501b 100644
--- a/src/mongo/db/s/sharding_data_transform_instance_metrics.h
+++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.h
@@ -60,17 +60,22 @@ public:
virtual ~ShardingDataTransformInstanceMetrics();
BSONObj reportForCurrentOp() const noexcept;
- int64_t getHighEstimateRemainingTimeMillis() const;
- int64_t getLowEstimateRemainingTimeMillis() const;
+ Milliseconds getHighEstimateRemainingTimeMillis() const;
+ Milliseconds getLowEstimateRemainingTimeMillis() const;
Date_t getStartTimestamp() const;
const UUID& getInstanceId() const;
void onCopyingBegin();
void onCopyingEnd();
+ void onApplyingBegin();
+ void onApplyingEnd();
void onDocumentsCopied(int64_t documentCount, int64_t totalDocumentsSizeBytes);
void setDocumentsToCopyCounts(int64_t documentCount, int64_t totalDocumentsSizeBytes);
+ void setCoordinatorHighEstimateRemainingTimeMillis(Milliseconds milliseconds);
+ void setCoordinatorLowEstimateRemainingTimeMillis(Milliseconds milliseconds);
void onInsertApplied();
void onUpdateApplied();
void onDeleteApplied();
+ void onOplogEntriesFetched(int64_t numEntries);
void onOplogEntriesApplied(int64_t numEntries);
void onWriteToStashedCollections();
@@ -79,10 +84,11 @@ public:
void onCriticalSectionBegin();
void onCriticalSectionEnd();
- void setLowestEstimatedRemainingOperationTime(Milliseconds time);
- void setHighestEstimatedRemainingOperationTime(Milliseconds time);
-
Role getRole() const;
+ Seconds getOperationRunningTimeSecs() const;
+ Seconds getCopyingElapsedTimeSecs() const;
+ Seconds getApplyingElapsedTimeSecs() const;
+ Seconds getCriticalSectionElapsedTimeSecs() const;
protected:
virtual std::string createOperationDescription() const noexcept;
@@ -125,10 +131,6 @@ protected:
"allShardsHighestRemainingOperationTimeEstimatedSecs";
private:
- inline int64_t getOperationRunningTimeSecs() const;
- int64_t getCriticalSectionElapsedTimeSecs() const;
- int64_t getCopyingElapsedTimeSecs() const;
-
ClockSource* _clockSource;
ObserverPtr _observer;
ShardingDataTransformCumulativeMetrics* _cumulativeMetrics;
@@ -143,19 +145,23 @@ private:
AtomicWord<int32_t> _approxBytesToCopy;
AtomicWord<int32_t> _bytesCopied;
+ AtomicWord<Date_t> _applyingStartTime;
+ AtomicWord<Date_t> _applyingEndTime;
+ AtomicWord<int64_t> _oplogEntriesFetched;
+
AtomicWord<int64_t> _insertsApplied;
AtomicWord<int64_t> _updatesApplied;
AtomicWord<int64_t> _deletesApplied;
AtomicWord<int64_t> _oplogEntriesApplied;
AtomicWord<int64_t> _writesToStashCollections;
+ AtomicWord<Milliseconds> _coordinatorHighEstimateRemainingTimeMillis;
+ AtomicWord<Milliseconds> _coordinatorLowEstimateRemainingTimeMillis;
+
AtomicWord<Date_t> _criticalSectionStartTime;
AtomicWord<Date_t> _criticalSectionEndTime;
AtomicWord<int64_t> _readsDuringCriticalSection;
AtomicWord<int64_t> _writesDuringCriticalSection;
-
- AtomicWord<Milliseconds> _lowestEstimatedRemainingOperationTime;
- AtomicWord<Milliseconds> _highestEstimatedRemainingOperationTime;
};
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp
index ef32091c77b..bb300261e9a 100644
--- a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp
+++ b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp
@@ -249,6 +249,17 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, DonorIncrementReadsDuringCritic
ASSERT_EQ(report.getIntField("countReadsDuringCriticalSection"), 1);
}
+TEST_F(ShardingDataTransformInstanceMetricsTest, RecipientIncrementFetchedOplogEntries) {
+ auto metrics = createInstanceMetrics(UUID::gen(), Role::kRecipient);
+
+ auto report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("oplogEntriesFetched"), 0);
+ metrics->onOplogEntriesFetched(50);
+
+ report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("oplogEntriesFetched"), 50);
+}
+
TEST_F(ShardingDataTransformInstanceMetricsTest, CurrentOpReportsCriticalSectionTime) {
runTimeReportTest(
"CurrentOpReportsCriticalSectionTime",
@@ -289,6 +300,59 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, RecipientIncrementsDocumentsAnd
ASSERT_EQ(report.getIntField("bytesCopied"), 1000);
}
+TEST_F(ShardingDataTransformInstanceMetricsTest, RecipientReportsRemainingTime) {
+ auto metrics = createInstanceMetrics(UUID::gen(), Role::kRecipient);
+ const auto& clock = getClockSource();
+ constexpr auto kIncrement = Milliseconds(5000);
+ constexpr auto kOpsPerIncrement = 25;
+ const auto kIncrementSecs = durationCount<Seconds>(kIncrement);
+ const auto kExpectedTotal = kIncrementSecs * 8;
+ metrics->setDocumentsToCopyCounts(0, kOpsPerIncrement * 4);
+ metrics->onOplogEntriesFetched(kOpsPerIncrement * 4);
+
+ // Before cloning.
+ auto report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"), 0);
+
+ // During cloning.
+ metrics->onCopyingBegin();
+ metrics->onDocumentsCopied(0, kOpsPerIncrement);
+ clock->advance(kIncrement);
+ report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"),
+ kExpectedTotal - kIncrementSecs);
+
+ metrics->onDocumentsCopied(0, kOpsPerIncrement * 2);
+ clock->advance(kIncrement * 2);
+ report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"),
+ kExpectedTotal - (kIncrementSecs * 3));
+
+ // During applying.
+ metrics->onDocumentsCopied(0, kOpsPerIncrement);
+ clock->advance(kIncrement);
+ metrics->onCopyingEnd();
+ metrics->onApplyingBegin();
+ metrics->onOplogEntriesApplied(kOpsPerIncrement);
+ clock->advance(kIncrement);
+ report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"),
+ kExpectedTotal - (kIncrementSecs * 5));
+
+ metrics->onOplogEntriesApplied(kOpsPerIncrement * 2);
+ clock->advance(kIncrement * 2);
+ report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"),
+ kExpectedTotal - (kIncrementSecs * 7));
+
+ // Done.
+ metrics->onOplogEntriesApplied(kOpsPerIncrement);
+ clock->advance(kIncrement);
+ metrics->onApplyingEnd();
+ report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"), 0);
+}
+
TEST_F(ShardingDataTransformInstanceMetricsTest, CurrentOpReportsCopyingTime) {
runTimeReportTest(
"CurrentOpReportsCopyingTime",
@@ -298,6 +362,15 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, CurrentOpReportsCopyingTime) {
[](ShardingDataTransformInstanceMetrics* metrics) { metrics->onCopyingEnd(); });
}
+TEST_F(ShardingDataTransformInstanceMetricsTest, CurrentOpReportsApplyingTime) {
+ runTimeReportTest(
+ "CurrentOpReportsApplyingTime",
+ {Role::kRecipient, Role::kCoordinator},
+ "totalApplyTimeElapsedSecs",
+ [](ShardingDataTransformInstanceMetrics* metrics) { metrics->onApplyingBegin(); },
+ [](ShardingDataTransformInstanceMetrics* metrics) { metrics->onApplyingEnd(); });
+}
+
TEST_F(ShardingDataTransformInstanceMetricsTest, CurrentOpReportsRunningTime) {
auto uuid = UUID::gen();
auto now = getClockSource()->now();
@@ -331,7 +404,7 @@ TEST_F(ShardingDataTransformInstanceMetricsTest,
auto report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("allShardsLowestRemainingOperationTimeEstimatedSecs"), 0);
- metrics->setLowestEstimatedRemainingOperationTime(Milliseconds(2000));
+ metrics->setCoordinatorLowEstimateRemainingTimeMillis(Milliseconds(2000));
report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("allShardsLowestRemainingOperationTimeEstimatedSecs"), 2);
@@ -343,7 +416,7 @@ TEST_F(ShardingDataTransformInstanceMetricsTest,
auto report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("allShardsHighestRemainingOperationTimeEstimatedSecs"), 0);
- metrics->setHighestEstimatedRemainingOperationTime(Milliseconds(12000));
+ metrics->setCoordinatorHighEstimateRemainingTimeMillis(Milliseconds(12000));
report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("allShardsHighestRemainingOperationTimeEstimatedSecs"), 12);
diff --git a/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp b/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp
index 7dfe3092b8a..eca06d99acb 100644
--- a/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp
+++ b/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp
@@ -36,11 +36,11 @@ ShardingDataTransformMetricsObserver::ShardingDataTransformMetricsObserver(
: _metrics(metrics) {}
int64_t ShardingDataTransformMetricsObserver::getHighEstimateRemainingTimeMillis() const {
- return _metrics->getHighEstimateRemainingTimeMillis();
+ return _metrics->getHighEstimateRemainingTimeMillis().count();
}
int64_t ShardingDataTransformMetricsObserver::getLowEstimateRemainingTimeMillis() const {
- return _metrics->getLowEstimateRemainingTimeMillis();
+ return _metrics->getLowEstimateRemainingTimeMillis().count();
}
Date_t ShardingDataTransformMetricsObserver::getStartTimestamp() const {
diff --git a/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp b/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp
index f15435d52e7..0f799f6e3ac 100644
--- a/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp
+++ b/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp
@@ -34,7 +34,10 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/s/resharding/resharding_metrics.h"
+#include "mongo/db/s/resharding/resharding_recipient_service.h"
+#include "mongo/db/s/sharding_data_transform_metrics.h"
#include "mongo/db/service_context.h"
#include "mongo/s/request_types/resharding_operation_time_gen.h"
#include "mongo/util/duration.h"
@@ -47,18 +50,21 @@ class ShardsvrReshardingOperationTimeCmd final
public:
class OperationTime {
public:
- explicit OperationTime(ReshardingMetrics* metrics) : _metrics(metrics) {}
+ explicit OperationTime(boost::optional<Milliseconds> elapsedMillis,
+ boost::optional<Milliseconds> remainingMillis)
+ : _elapsedMillis{elapsedMillis}, _remainingMillis{remainingMillis} {}
void serialize(BSONObjBuilder* bob) const {
- if (const auto elapsedTime = _metrics->getOperationElapsedTime()) {
- bob->append("elapsedMillis", durationCount<Milliseconds>(elapsedTime.get()));
+ if (_elapsedMillis) {
+ bob->append("elapsedMillis", _elapsedMillis->count());
}
- if (const auto remainingTime = _metrics->getOperationRemainingTime()) {
- bob->append("remainingMillis", durationCount<Milliseconds>(remainingTime.get()));
+ if (_remainingMillis) {
+ bob->append("remainingMillis", _remainingMillis->count());
}
}
private:
- ReshardingMetrics* const _metrics;
+ boost::optional<Milliseconds> _elapsedMillis;
+ boost::optional<Milliseconds> _remainingMillis;
};
using Request = _shardsvrReshardingOperationTime;
@@ -103,10 +109,23 @@ public:
}
Response typedRun(OperationContext* opCtx) {
- // Once multiple concurrent resharding operations are allowed, the following could use
- // `ns()` to choose the instance of `ReshardingMetrics` that is associated with the
- // provided namespace string.
- return Response(ReshardingMetrics::get(opCtx->getServiceContext()));
+ if (ShardingDataTransformMetrics::isEnabled()) {
+ auto instances =
+ getReshardingStateMachines<ReshardingRecipientService,
+ ReshardingRecipientService::RecipientStateMachine>(
+ opCtx, ns());
+ if (instances.empty()) {
+ return Response{boost::none, boost::none};
+ }
+ invariant(instances.size() == 1);
+ const auto& metrics = instances[0]->getMetrics();
+ return Response{duration_cast<Milliseconds>(metrics.getOperationRunningTimeSecs()),
+ metrics.getHighEstimateRemainingTimeMillis()};
+ } else {
+ auto metrics = ReshardingMetrics::get(opCtx->getServiceContext());
+ return Response{metrics->getOperationElapsedTime(),
+ metrics->getOperationRemainingTime()};
+ }
}
};
} _shardsvrReshardingOperationTime;