summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/global_index_metrics.cpp5
-rw-r--r--src/mongo/db/s/global_index_metrics.h4
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.cpp70
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.h56
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics_helpers.h10
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics_test.cpp72
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp35
-rw-r--r--src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp29
-rw-r--r--src/mongo/db/s/sharding_data_transform_cumulative_metrics.h4
-rw-r--r--src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp4
-rw-r--r--src/mongo/db/s/sharding_data_transform_instance_metrics.cpp65
-rw-r--r--src/mongo/db/s/sharding_data_transform_instance_metrics.h10
-rw-r--r--src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp38
-rw-r--r--src/mongo/db/s/sharding_data_transform_metrics_observer.cpp10
-rw-r--r--src/mongo/db/s/sharding_data_transform_metrics_observer.h4
-rw-r--r--src/mongo/db/s/sharding_data_transform_metrics_observer_interface.h4
-rw-r--r--src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h12
-rw-r--r--src/mongo/util/optional_util.h11
18 files changed, 328 insertions, 115 deletions
diff --git a/src/mongo/db/s/global_index_metrics.cpp b/src/mongo/db/s/global_index_metrics.cpp
index f6f4a46c7cf..36d28736d03 100644
--- a/src/mongo/db/s/global_index_metrics.cpp
+++ b/src/mongo/db/s/global_index_metrics.cpp
@@ -229,8 +229,9 @@ GlobalIndexCumulativeMetrics* GlobalIndexMetrics::getGlobalIndexCumulativeMetric
}
-Milliseconds GlobalIndexMetrics::getRecipientHighEstimateRemainingTimeMillis() const {
- return Milliseconds{0};
+boost::optional<Milliseconds> GlobalIndexMetrics::getRecipientHighEstimateRemainingTimeMillis()
+ const {
+ return boost::none;
}
} // namespace mongo
diff --git a/src/mongo/db/s/global_index_metrics.h b/src/mongo/db/s/global_index_metrics.h
index 9f5f53df6a9..6eb2c446565 100644
--- a/src/mongo/db/s/global_index_metrics.h
+++ b/src/mongo/db/s/global_index_metrics.h
@@ -212,7 +212,6 @@ public:
bool unique,
ServiceContext* serviceContext);
- Milliseconds getRecipientHighEstimateRemainingTimeMillis() const;
template <typename T>
static auto initializeFrom(const T& document, ServiceContext* serviceContext) {
static_assert(isStateDocument<T>);
@@ -240,6 +239,9 @@ public:
_stateHolder.onStateTransition(before, after);
}
+protected:
+ boost::optional<Milliseconds> getRecipientHighEstimateRemainingTimeMillis() const override;
+
private:
GlobalIndexCumulativeMetrics* getGlobalIndexCumulativeMetrics();
std::string createOperationDescription() const noexcept override;
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index 328850f0f6b..7827ab4896d 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -29,6 +29,7 @@
#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/s/resharding/resharding_util.h"
+#include "mongo/util/optional_util.h"
namespace mongo {
namespace {
@@ -69,6 +70,16 @@ Date_t readStartTime(const CommonReshardingMetadata& metadata, ClockSource* fall
}
} // namespace
+
+void ReshardingMetrics::ExternallyTrackedRecipientFields::accumulateFrom(
+ const ReshardingOplogApplierProgress& progressDoc) {
+ using optional_util::setOrAdd;
+ setOrAdd(insertsApplied, progressDoc.getInsertsApplied());
+ setOrAdd(updatesApplied, progressDoc.getUpdatesApplied());
+ setOrAdd(deletesApplied, progressDoc.getDeletesApplied());
+ setOrAdd(writesToStashCollections, progressDoc.getWritesToStashCollections());
+}
+
ReshardingMetrics::ReshardingMetrics(UUID instanceId,
BSONObj shardKey,
NamespaceString nss,
@@ -101,6 +112,7 @@ ReshardingMetrics::ReshardingMetrics(UUID instanceId,
clockSource,
cumulativeMetrics,
std::make_unique<ReshardingMetricsFieldNameProvider>()},
+ _ableToEstimateRemainingRecipientTime{!mustRestoreExternallyTrackedRecipientFields(state)},
_deletesApplied{0},
_insertsApplied{0},
_updatesApplied{0},
@@ -155,18 +167,18 @@ ReshardingCumulativeMetrics* ReshardingMetrics::getReshardingCumulativeMetrics()
return dynamic_cast<ReshardingCumulativeMetrics*>(getCumulativeMetrics());
}
-Milliseconds ReshardingMetrics::getRecipientHighEstimateRemainingTimeMillis() const {
- auto estimate = resharding::estimateRemainingRecipientTime(_applyingStartTime.load() != kNoDate,
- getBytesWrittenCount(),
- getApproxBytesToScanCount(),
- getCopyingElapsedTimeSecs(),
- _oplogEntriesApplied.load(),
- _oplogEntriesFetched.load(),
- getApplyingElapsedTimeSecs());
- if (!estimate) {
- return Milliseconds{0};
+boost::optional<Milliseconds> ReshardingMetrics::getRecipientHighEstimateRemainingTimeMillis()
+ const {
+ if (!_ableToEstimateRemainingRecipientTime.load()) {
+ return boost::none;
}
- return *estimate;
+ return resharding::estimateRemainingRecipientTime(_applyingStartTime.load() != kNoDate,
+ getBytesWrittenCount(),
+ getApproxBytesToScanCount(),
+ getCopyingElapsedTimeSecs(),
+ _oplogEntriesApplied.load(),
+ _oplogEntriesFetched.load(),
+ getApplyingElapsedTimeSecs());
}
std::unique_ptr<ReshardingMetrics> ReshardingMetrics::makeInstance(UUID instanceId,
@@ -223,15 +235,6 @@ BSONObj ReshardingMetrics::reportForCurrentOp() const noexcept {
return builder.obj();
}
-void ReshardingMetrics::accumulateFrom(const ReshardingOplogApplierProgress& progressDoc) {
- invariant(_role == Role::kRecipient);
- _insertsApplied.fetchAndAdd(progressDoc.getInsertsApplied());
- _updatesApplied.fetchAndAdd(progressDoc.getUpdatesApplied());
- _deletesApplied.fetchAndAdd(progressDoc.getDeletesApplied());
-
- accumulateWritesToStashCollections(progressDoc.getWritesToStashCollections());
-}
-
void ReshardingMetrics::restoreRecipientSpecificFields(
const ReshardingRecipientDocument& document) {
auto metrics = document.getMetrics();
@@ -419,6 +422,33 @@ void ReshardingMetrics::restoreOplogEntriesApplied(int64_t numEntries) {
_oplogEntriesApplied.store(numEntries);
}
+void ReshardingMetrics::restoreUpdatesApplied(int64_t count) {
+ _updatesApplied.store(count);
+}
+
+void ReshardingMetrics::restoreInsertsApplied(int64_t count) {
+ _insertsApplied.store(count);
+}
+
+void ReshardingMetrics::restoreDeletesApplied(int64_t count) {
+ _deletesApplied.store(count);
+}
+
+void ReshardingMetrics::restoreExternallyTrackedRecipientFields(
+ const ExternallyTrackedRecipientFields& values) {
+ invokeIfAllSet(&ReshardingMetrics::restoreDocumentsProcessed,
+ values.documentCountCopied,
+ values.documentBytesCopied);
+ invokeIfAllSet(&ReshardingMetrics::restoreOplogEntriesFetched, values.oplogEntriesFetched);
+ invokeIfAllSet(&ReshardingMetrics::restoreOplogEntriesApplied, values.oplogEntriesApplied);
+ invokeIfAllSet(&ReshardingMetrics::restoreUpdatesApplied, values.updatesApplied);
+ invokeIfAllSet(&ReshardingMetrics::restoreInsertsApplied, values.insertsApplied);
+ invokeIfAllSet(&ReshardingMetrics::restoreDeletesApplied, values.deletesApplied);
+ invokeIfAllSet(&ReshardingMetrics::restoreWritesToStashCollections,
+ values.writesToStashCollections);
+ _ableToEstimateRemainingRecipientTime.store(true);
+}
+
void ReshardingMetrics::onLocalInsertDuringOplogFetching(Milliseconds elapsed) {
getReshardingCumulativeMetrics()->onLocalInsertDuringOplogFetching(elapsed);
}
diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h
index 5b8f1a76f06..3c2bd1e57a9 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.h
+++ b/src/mongo/db/s/resharding/resharding_metrics.h
@@ -80,6 +80,20 @@ public:
CoordinatorStateEnum _enumVal;
};
+ struct ExternallyTrackedRecipientFields {
+ public:
+ void accumulateFrom(const ReshardingOplogApplierProgress& progressDoc);
+
+ boost::optional<int64_t> documentCountCopied;
+ boost::optional<int64_t> documentBytesCopied;
+ boost::optional<int64_t> oplogEntriesFetched;
+ boost::optional<int64_t> oplogEntriesApplied;
+ boost::optional<int64_t> insertsApplied;
+ boost::optional<int64_t> updatesApplied;
+ boost::optional<int64_t> deletesApplied;
+ boost::optional<int64_t> writesToStashCollections;
+ };
+
ReshardingMetrics(const CommonReshardingMetadata& metadata,
Role role,
ClockSource* clockSource,
@@ -151,28 +165,43 @@ public:
void onStateTransition(T before, T after) {
_stateHolder.onStateTransition(before, after);
}
- void accumulateFrom(const ReshardingOplogApplierProgress& progressDoc);
+
+ template <typename StateOrStateVariant>
+ static bool mustRestoreExternallyTrackedRecipientFields(StateOrStateVariant stateOrVariant) {
+ if constexpr (std::is_same_v<StateOrStateVariant, State>) {
+ return stdx::visit(
+ [](auto v) { return mustRestoreExternallyTrackedRecipientFieldsImpl(v); },
+ stateOrVariant);
+ } else {
+ return mustRestoreExternallyTrackedRecipientFieldsImpl(stateOrVariant);
+ }
+ }
+
BSONObj reportForCurrentOp() const noexcept override;
void onUpdateApplied();
void onInsertApplied();
void onDeleteApplied();
void onOplogEntriesFetched(int64_t numEntries, Milliseconds elapsed);
- void restoreOplogEntriesFetched(int64_t numEntries);
void onOplogEntriesApplied(int64_t numEntries);
- void restoreOplogEntriesApplied(int64_t numEntries);
void onApplyingBegin();
void onApplyingEnd();
void onLocalInsertDuringOplogFetching(Milliseconds elapsed);
void onBatchRetrievedDuringOplogApplying(Milliseconds elapsed);
void onOplogLocalBatchApplied(Milliseconds elapsed);
+ void restoreExternallyTrackedRecipientFields(const ExternallyTrackedRecipientFields& values);
Seconds getApplyingElapsedTimeSecs() const;
Date_t getApplyingBegin() const;
Date_t getApplyingEnd() const;
- Milliseconds getRecipientHighEstimateRemainingTimeMillis() const;
protected:
+ boost::optional<Milliseconds> getRecipientHighEstimateRemainingTimeMillis() const override;
+ void restoreOplogEntriesFetched(int64_t numEntries);
+ void restoreOplogEntriesApplied(int64_t numEntries);
+ void restoreUpdatesApplied(int64_t count);
+ void restoreInsertsApplied(int64_t count);
+ void restoreDeletesApplied(int64_t count);
virtual StringData getStateString() const noexcept override;
void restoreApplyingBegin(Date_t date);
void restoreApplyingEnd(Date_t date);
@@ -196,6 +225,16 @@ private:
}
template <typename T>
+ static bool mustRestoreExternallyTrackedRecipientFieldsImpl(T state) {
+ static_assert(resharding_metrics::isState<T>);
+ if constexpr (std::is_same_v<T, RecipientStateEnum>) {
+ return state > RecipientStateEnum::kAwaitingFetchTimestamp;
+ } else {
+ return false;
+ }
+ }
+
+ template <typename T>
void restorePhaseDurationFields(const T& document) {
static_assert(resharding_metrics::isStateDocument<T>);
auto metrics = document.getMetrics();
@@ -226,6 +265,15 @@ private:
}
}
+ template <typename MemberFn, typename... T>
+ void invokeIfAllSet(MemberFn&& fn, const boost::optional<T>&... args) {
+ if (!(args && ...)) {
+ return;
+ }
+ std::invoke(fn, this, *args...);
+ }
+
+ AtomicWord<bool> _ableToEstimateRemainingRecipientTime;
AtomicWord<int64_t> _deletesApplied;
AtomicWord<int64_t> _insertsApplied;
AtomicWord<int64_t> _updatesApplied;
diff --git a/src/mongo/db/s/resharding/resharding_metrics_helpers.h b/src/mongo/db/s/resharding/resharding_metrics_helpers.h
index d5450d12135..42d8fc01fd0 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_helpers.h
+++ b/src/mongo/db/s/resharding/resharding_metrics_helpers.h
@@ -41,10 +41,14 @@ namespace resharding_metrics {
template <class T>
inline constexpr bool isStateDocument =
- std::disjunction<std::is_same<T, ReshardingRecipientDocument>,
- std::is_same<T, ReshardingCoordinatorDocument>,
- std::is_same<T, ReshardingDonorDocument>>::value;
+ std::disjunction_v<std::is_same<T, ReshardingRecipientDocument>,
+ std::is_same<T, ReshardingCoordinatorDocument>,
+ std::is_same<T, ReshardingDonorDocument>>;
+template <typename T>
+inline constexpr bool isState = std::disjunction_v<std::is_same<T, RecipientStateEnum>,
+ std::is_same<T, CoordinatorStateEnum>,
+ std::is_same<T, DonorStateEnum>>;
template <typename T>
inline constexpr auto getState(const T& document) {
diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
index aa18742fb7c..99b9d2d7cf6 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
@@ -63,7 +63,7 @@ public:
role,
clockSource->now(),
clockSource,
- _cumulativeMetrics.get());
+ getCumulativeMetrics());
}
const UUID& getSourceCollectionId() {
@@ -74,10 +74,20 @@ public:
template <typename T>
BSONObj getReportFromStateDocument(T document) {
auto metrics =
- ReshardingMetrics::initializeFrom(document, getClockSource(), _cumulativeMetrics.get());
+ ReshardingMetrics::initializeFrom(document, getClockSource(), getCumulativeMetrics());
return metrics->reportForCurrentOp();
}
+ auto makeRecipientMetricsWithAmbiguousTimeRemaining() {
+ auto doc = createRecipientDocument(RecipientStateEnum::kApplying, UUID::gen());
+ ReshardingRecipientMetrics metricsDoc;
+ ReshardingMetricsTimeInterval interval;
+ interval.setStart(getClockSource()->now());
+ metricsDoc.setOplogApplication(interval);
+ doc.setMetrics(metricsDoc);
+ return ReshardingMetrics::initializeFrom(doc, getClockSource(), getCumulativeMetrics());
+ }
+
ReshardingRecipientDocument createRecipientDocument(RecipientStateEnum state,
const UUID& operationId) {
RecipientShardContext recipientCtx;
@@ -174,7 +184,7 @@ public:
doc.setMetrics(metricsDoc);
auto metrics =
- ReshardingMetrics::initializeFrom(doc, getClockSource(), _cumulativeMetrics.get());
+ ReshardingMetrics::initializeFrom(doc, getClockSource(), getCumulativeMetrics());
clock->advance(kInterval);
auto report = metrics->reportForCurrentOp();
@@ -235,9 +245,12 @@ TEST_F(ReshardingMetricsTest, RestoresByteAndDocumentCountsFromRecipientStateDoc
TEST_F(ReshardingMetricsTest, RestoresByteAndDocumentCountsDuringCloning) {
constexpr auto kDocsCopied = 50;
constexpr auto kBytesCopied = 500;
+ ReshardingMetrics::ExternallyTrackedRecipientFields external;
+ external.documentCountCopied = kDocsCopied;
+ external.documentBytesCopied = kBytesCopied;
auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient);
- metrics->restoreDocumentsProcessed(kDocsCopied, kBytesCopied);
+ metrics->restoreExternallyTrackedRecipientFields(external);
auto report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("documentsCopied"), kDocsCopied);
@@ -303,8 +316,11 @@ TEST_F(ReshardingMetricsTest, RestoresFromReshardingApplierProgressDocument) {
progressDoc.setDeletesApplied(789);
progressDoc.setWritesToStashCollections(800);
+ ReshardingMetrics::ExternallyTrackedRecipientFields external;
+ external.accumulateFrom(progressDoc);
+
auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient);
- metrics->accumulateFrom(progressDoc);
+ metrics->restoreExternallyTrackedRecipientFields(external);
auto report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("insertsApplied"), 123);
@@ -399,16 +415,20 @@ TEST_F(ReshardingMetricsTest, RecipientIncrementFetchedOplogEntries) {
}
TEST_F(ReshardingMetricsTest, RecipientRestoreFetchedOplogEntries) {
+ ReshardingMetrics::ExternallyTrackedRecipientFields external;
+
auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient);
auto report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("oplogEntriesFetched"), 0);
- metrics->restoreOplogEntriesFetched(100);
+ external.oplogEntriesFetched = 100;
+ metrics->restoreExternallyTrackedRecipientFields(external);
report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("oplogEntriesFetched"), 100);
- metrics->restoreOplogEntriesFetched(50);
+ external.oplogEntriesFetched = 50;
+ metrics->restoreExternallyTrackedRecipientFields(external);
report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("oplogEntriesFetched"), 50);
}
@@ -423,15 +443,11 @@ TEST_F(ReshardingMetricsTest, RecipientReportsRemainingTime) {
metrics->setDocumentsToProcessCounts(0, kOpsPerIncrement * 4);
metrics->onOplogEntriesFetched(kOpsPerIncrement * 4, Milliseconds(1));
- // Before cloning.
- auto report = metrics->reportForCurrentOp();
- ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"), 0);
-
// During cloning.
metrics->onCopyingBegin();
metrics->onDocumentsProcessed(0, kOpsPerIncrement, Milliseconds(1));
clock->advance(kIncrement);
- report = metrics->reportForCurrentOp();
+ auto report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("remainingOperationTimeEstimatedSecs"),
kExpectedTotal - kIncrementSecs);
@@ -467,16 +483,20 @@ TEST_F(ReshardingMetricsTest, RecipientReportsRemainingTime) {
}
TEST_F(ReshardingMetricsTest, RecipientRestoreAppliedOplogEntries) {
+ ReshardingMetrics::ExternallyTrackedRecipientFields external;
+
auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient);
auto report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("oplogEntriesApplied"), 0);
- metrics->restoreOplogEntriesApplied(120);
+ external.oplogEntriesApplied = 120;
+ metrics->restoreExternallyTrackedRecipientFields(external);
report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("oplogEntriesApplied"), 120);
- metrics->restoreOplogEntriesApplied(30);
+ external.oplogEntriesApplied = 30;
+ metrics->restoreExternallyTrackedRecipientFields(external);
report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("oplogEntriesApplied"), 30);
}
@@ -490,5 +510,29 @@ TEST_F(ReshardingMetricsTest, CurrentOpReportsApplyingTime) {
[](ReshardingMetrics* metrics) { metrics->onApplyingEnd(); });
}
+TEST_F(ReshardingMetricsTest, RecipientEstimatesNoneOnNewInstance) {
+ auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient);
+ ASSERT_EQ(metrics->getHighEstimateRemainingTimeMillis(), boost::none);
+}
+
+TEST_F(ReshardingMetricsTest,
+ RecipientEstimatesNoneBeforeExternalFieldsRestoredForRestoredInstance) {
+ auto metrics = makeRecipientMetricsWithAmbiguousTimeRemaining();
+ ASSERT_EQ(metrics->getHighEstimateRemainingTimeMillis(), boost::none);
+}
+
+TEST_F(ReshardingMetricsTest, RecipientEstimatesAfterExternalFieldsRestoredForRestoredInstance) {
+ auto metrics = makeRecipientMetricsWithAmbiguousTimeRemaining();
+ metrics->restoreExternallyTrackedRecipientFields(
+ ReshardingMetrics::ExternallyTrackedRecipientFields{});
+ ASSERT_EQ(metrics->getHighEstimateRemainingTimeMillis(), Milliseconds{0});
+}
+
+TEST_F(ReshardingMetricsTest, CurrentOpDoesNotReportRecipientEstimateIfNotSet) {
+ auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient);
+ auto report = metrics->reportForCurrentOp();
+ ASSERT_FALSE(report.hasField("remainingOperationTimeEstimatedSecs"));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 5e096b1e7f2..fe0d54112ea 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -65,6 +65,7 @@
#include "mongo/s/grid.h"
#include "mongo/s/stale_shard_version_helpers.h"
#include "mongo/util/future_util.h"
+#include "mongo/util/optional_util.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding
@@ -1124,7 +1125,7 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMetrics(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken) {
- if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {
+ if (_metrics->mustRestoreExternallyTrackedRecipientFields(_recipientCtx.getState())) {
return _restoreMetricsWithRetry(executor, abortToken);
}
@@ -1148,28 +1149,25 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_restore
void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
const CancelableOperationContextFactory& factory) {
- int64_t documentCountCopied = 0;
- int64_t documentBytesCopied = 0;
- int64_t oplogEntriesFetched = 0;
- int64_t oplogEntriesApplied = 0;
+ ReshardingMetrics::ExternallyTrackedRecipientFields externalMetrics;
auto opCtx = factory.makeOperationContext(&cc());
- {
+ [&] {
AutoGetCollection tempReshardingColl(
opCtx.get(), _metadata.getTempReshardingNss(), MODE_IS);
- if (tempReshardingColl) {
- documentBytesCopied = tempReshardingColl->dataSize(opCtx.get());
- documentCountCopied = tempReshardingColl->numRecords(opCtx.get());
+ if (!tempReshardingColl) {
+ return;
}
-
- if (_recipientCtx.getState() == RecipientStateEnum::kCloning) {
+ if (_recipientCtx.getState() != RecipientStateEnum::kCloning) {
// Before cloning, these values are 0. After cloning these values are written to the
// metrics section of the recipient state document and restored during metrics
// initialization. This is so that applied oplog entries that add or remove documents do
// not affect the cloning metrics.
- _metrics->restoreDocumentsProcessed(documentCountCopied, documentBytesCopied);
+ return;
}
- }
+ externalMetrics.documentBytesCopied = tempReshardingColl->dataSize(opCtx.get());
+ externalMetrics.documentCountCopied = tempReshardingColl->numRecords(opCtx.get());
+ }();
reshardingOpCtxKilledWhileRestoringMetrics.execute(
[&opCtx](const BSONObj& data) { opCtx->markKilled(); });
@@ -1183,7 +1181,8 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
_metadata.getSourceUUID(), donor.getShardId()),
MODE_IS);
if (oplogBufferColl) {
- oplogEntriesFetched += oplogBufferColl->numRecords(opCtx.get());
+ optional_util::setOrAdd(externalMetrics.oplogEntriesFetched,
+ oplogBufferColl->numRecords(opCtx.get()));
}
}
@@ -1203,7 +1202,8 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
if (!result.isEmpty()) {
progressDoc = ReshardingOplogApplierProgress::parse(
IDLParserContext("resharding-recipient-service-progress-doc"), result);
- oplogEntriesApplied += progressDoc->getNumEntriesApplied();
+ optional_util::setOrAdd(externalMetrics.oplogEntriesApplied,
+ progressDoc->getNumEntriesApplied());
}
}
@@ -1223,15 +1223,14 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
continue;
}
- _metrics->accumulateFrom(*progressDoc);
+ externalMetrics.accumulateFrom(*progressDoc);
auto applierMetrics =
std::make_unique<ReshardingOplogApplierMetrics>(_metrics.get(), progressDoc);
_applierMetricsMap.emplace(shardId, std::move(applierMetrics));
}
- _metrics->restoreOplogEntriesFetched(oplogEntriesFetched);
- _metrics->restoreOplogEntriesApplied(oplogEntriesApplied);
+ _metrics->restoreExternallyTrackedRecipientFields(externalMetrics);
}
CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource(
diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp
index e13cd3857a1..0b0665a6b02 100644
--- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp
+++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp
@@ -42,6 +42,7 @@ constexpr auto kActive = "active";
constexpr auto kOldestActive = "oldestActive";
constexpr auto kLatencies = "latencies";
constexpr auto kCurrentInSteps = "currentInSteps";
+constexpr auto kEstimateNotAvailable = -1;
struct Metrics {
ReshardingCumulativeMetrics _resharding;
@@ -53,7 +54,6 @@ const auto getMetrics = ServiceContext::declareDecoration<MetricsPtr>();
const auto metricsRegisterer = ServiceContext::ConstructorActionRegisterer{
"ShardingDataTransformMetrics",
[](ServiceContext* ctx) { getMetrics(ctx) = std::make_unique<Metrics>(); }};
-
} // namespace
ShardingDataTransformCumulativeMetrics* ShardingDataTransformCumulativeMetrics::getForResharding(
@@ -86,18 +86,35 @@ ShardingDataTransformCumulativeMetrics::registerInstanceMetrics(const InstanceOb
int64_t ShardingDataTransformCumulativeMetrics::getOldestOperationHighEstimateRemainingTimeMillis(
Role role) const {
-
- stdx::unique_lock guard(_mutex);
- auto op = getOldestOperation(guard, role);
- return op ? op->getHighEstimateRemainingTimeMillis() : 0;
+ return getOldestOperationEstimateRemainingTimeMillis(role, EstimateType::kHigh);
}
int64_t ShardingDataTransformCumulativeMetrics::getOldestOperationLowEstimateRemainingTimeMillis(
Role role) const {
+ return getOldestOperationEstimateRemainingTimeMillis(role, EstimateType::kLow);
+}
+
+int64_t ShardingDataTransformCumulativeMetrics::getOldestOperationEstimateRemainingTimeMillis(
+ Role role, EstimateType type) const {
stdx::unique_lock guard(_mutex);
auto op = getOldestOperation(guard, role);
- return op ? op->getLowEstimateRemainingTimeMillis() : 0;
+ if (!op) {
+ return kEstimateNotAvailable;
+ }
+ auto estimate = getEstimate(op, type);
+ return estimate ? estimate->count() : kEstimateNotAvailable;
+}
+
+boost::optional<Milliseconds> ShardingDataTransformCumulativeMetrics::getEstimate(
+ const InstanceObserver* op, EstimateType type) const {
+ switch (type) {
+ case kHigh:
+ return op->getHighEstimateRemainingTimeMillis();
+ case kLow:
+ return op->getLowEstimateRemainingTimeMillis();
+ }
+ MONGO_UNREACHABLE;
}
size_t ShardingDataTransformCumulativeMetrics::getObservedMetricsCount() const {
diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h
index 814ef0790d4..21a107c8f53 100644
--- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h
+++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h
@@ -119,9 +119,13 @@ protected:
virtual void reportCurrentInSteps(BSONObjBuilder* bob) const;
private:
+ enum EstimateType { kHigh, kLow };
+
MetricsSet& getMetricsSetForRole(Role role);
const MetricsSet& getMetricsSetForRole(Role role) const;
const InstanceObserver* getOldestOperation(WithLock, Role role) const;
+ int64_t getOldestOperationEstimateRemainingTimeMillis(Role role, EstimateType type) const;
+ boost::optional<Milliseconds> getEstimate(const InstanceObserver* op, EstimateType type) const;
MetricsSet::iterator insertMetrics(const InstanceObserver* metrics, MetricsSet& set);
void deregisterMetrics(const Role& role, const MetricsSet::iterator& metrics);
diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp
index 299bddb3b09..1fa2585ac3e 100644
--- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp
+++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp
@@ -108,11 +108,11 @@ TEST_F(ShardingDataTransformMetricsTestFixture, NoServerStatusWhenNeverUsed) {
ASSERT_BSONOBJ_EQ(report, BSONObj());
}
-TEST_F(ShardingDataTransformMetricsTestFixture, RemainingTimeReports0WhenEmpty) {
+TEST_F(ShardingDataTransformMetricsTestFixture, RemainingTimeReportsMinusOneWhenEmpty) {
ASSERT_EQ(_cumulativeMetrics->getObservedMetricsCount(), 0);
ASSERT_EQ(_cumulativeMetrics->getOldestOperationHighEstimateRemainingTimeMillis(
ObserverMock::kDefaultRole),
- 0);
+ -1);
}
TEST_F(ShardingDataTransformMetricsTestFixture, UpdatesOldestWhenOldestIsRemoved) {
diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
index 96ab3d2a6fd..9c65d80b4b3 100644
--- a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
+++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
@@ -33,6 +33,29 @@
namespace mongo {
+namespace {
+constexpr auto kNoEstimate = Milliseconds{-1};
+
+boost::optional<Milliseconds> readCoordinatorEstimate(const AtomicWord<Milliseconds>& field) {
+ auto estimate = field.load();
+ if (estimate == kNoEstimate) {
+ return boost::none;
+ }
+ return estimate;
+}
+
+template <typename T>
+void appendOptionalMillisecondsFieldAs(BSONObjBuilder& builder,
+ const StringData& fieldName,
+ const boost::optional<Milliseconds> value) {
+ if (!value) {
+ return;
+ }
+ builder.append(fieldName, durationCount<T>(*value));
+}
+
+} // namespace
+
ShardingDataTransformInstanceMetrics::ShardingDataTransformInstanceMetrics(
UUID instanceId,
BSONObj originalCommand,
@@ -78,30 +101,32 @@ ShardingDataTransformInstanceMetrics::ShardingDataTransformInstanceMetrics(
_documentsProcessed{0},
_approxBytesToScan{0},
_bytesWritten{0},
- _coordinatorHighEstimateRemainingTimeMillis{Milliseconds{0}},
- _coordinatorLowEstimateRemainingTimeMillis{Milliseconds{0}},
+ _coordinatorHighEstimateRemainingTimeMillis{kNoEstimate},
+ _coordinatorLowEstimateRemainingTimeMillis{kNoEstimate},
_criticalSectionStartTime{kNoDate},
_criticalSectionEndTime{kNoDate},
_writesDuringCriticalSection{0} {}
-Milliseconds ShardingDataTransformInstanceMetrics::getHighEstimateRemainingTimeMillis() const {
+boost::optional<Milliseconds>
+ShardingDataTransformInstanceMetrics::getHighEstimateRemainingTimeMillis() const {
switch (_role) {
case Role::kRecipient:
return getRecipientHighEstimateRemainingTimeMillis();
case Role::kCoordinator:
- return Milliseconds{_coordinatorHighEstimateRemainingTimeMillis.load()};
+ return readCoordinatorEstimate(_coordinatorHighEstimateRemainingTimeMillis);
case Role::kDonor:
break;
}
MONGO_UNREACHABLE;
}
-Milliseconds ShardingDataTransformInstanceMetrics::getLowEstimateRemainingTimeMillis() const {
+boost::optional<Milliseconds>
+ShardingDataTransformInstanceMetrics::getLowEstimateRemainingTimeMillis() const {
switch (_role) {
case Role::kRecipient:
return getHighEstimateRemainingTimeMillis();
case Role::kCoordinator:
- return _coordinatorLowEstimateRemainingTimeMillis.load();
+ return readCoordinatorEstimate(_coordinatorLowEstimateRemainingTimeMillis);
case Role::kDonor:
break;
}
@@ -141,10 +166,14 @@ BSONObj ShardingDataTransformInstanceMetrics::reportForCurrentOp() const noexcep
builder.append(_fieldNames->getForOpTimeElapsed(), getOperationRunningTimeSecs().count());
switch (_role) {
case Role::kCoordinator:
- builder.append(_fieldNames->getForAllShardsHighestRemainingOperationTimeEstimatedSecs(),
- durationCount<Seconds>(getHighEstimateRemainingTimeMillis()));
- builder.append(_fieldNames->getForAllShardsLowestRemainingOperationTimeEstimatedSecs(),
- durationCount<Seconds>(getLowEstimateRemainingTimeMillis()));
+ appendOptionalMillisecondsFieldAs<Seconds>(
+ builder,
+ _fieldNames->getForAllShardsHighestRemainingOperationTimeEstimatedSecs(),
+ getHighEstimateRemainingTimeMillis());
+ appendOptionalMillisecondsFieldAs<Seconds>(
+ builder,
+ _fieldNames->getForAllShardsLowestRemainingOperationTimeEstimatedSecs(),
+ getLowEstimateRemainingTimeMillis());
builder.append(_fieldNames->getForCoordinatorState(), getStateString());
builder.append(_fieldNames->getForCopyTimeElapsed(),
getCopyingElapsedTimeSecs().count());
@@ -164,8 +193,10 @@ BSONObj ShardingDataTransformInstanceMetrics::reportForCurrentOp() const noexcep
builder.append(_fieldNames->getForRecipientState(), getStateString());
builder.append(_fieldNames->getForCopyTimeElapsed(),
getCopyingElapsedTimeSecs().count());
- builder.append(_fieldNames->getForRemainingOpTimeEstimated(),
- durationCount<Seconds>(getHighEstimateRemainingTimeMillis()));
+ appendOptionalMillisecondsFieldAs<Seconds>(
+ builder,
+ _fieldNames->getForRemainingOpTimeEstimated(),
+ getHighEstimateRemainingTimeMillis());
builder.append(_fieldNames->getForApproxDocumentsToProcess(),
_approxDocumentsToProcess.load());
builder.append(_fieldNames->getForApproxBytesToScan(), _approxBytesToScan.load());
@@ -231,6 +262,11 @@ void ShardingDataTransformInstanceMetrics::restoreDocumentsProcessed(
_bytesWritten.store(totalDocumentsSizeBytes);
}
+void ShardingDataTransformInstanceMetrics::restoreWritesToStashCollections(
+ int64_t writesToStashCollections) {
+ _writesToStashCollections.store(writesToStashCollections);
+}
+
void ShardingDataTransformInstanceMetrics::setDocumentsToProcessCounts(
int64_t documentCount, int64_t totalDocumentsSizeBytes) {
_approxDocumentsToProcess.store(documentCount);
@@ -282,11 +318,6 @@ void ShardingDataTransformInstanceMetrics::onReadDuringCriticalSection() {
_cumulativeMetrics->onWriteDuringCriticalSection();
}
-void ShardingDataTransformInstanceMetrics::accumulateWritesToStashCollections(
- int64_t writesToStashCollections) {
- _writesToStashCollections.fetchAndAdd(writesToStashCollections);
-}
-
void ShardingDataTransformInstanceMetrics::onCloningTotalRemoteBatchRetrieval(
Milliseconds elapsed) {
_cumulativeMetrics->onCloningTotalRemoteBatchRetrieval(elapsed);
diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.h b/src/mongo/db/s/sharding_data_transform_instance_metrics.h
index 2c26f4ab480..ecd8a776afb 100644
--- a/src/mongo/db/s/sharding_data_transform_instance_metrics.h
+++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.h
@@ -67,9 +67,8 @@ public:
virtual BSONObj reportForCurrentOp() const noexcept;
- Milliseconds getHighEstimateRemainingTimeMillis() const;
- Milliseconds getLowEstimateRemainingTimeMillis() const;
- virtual Milliseconds getRecipientHighEstimateRemainingTimeMillis() const = 0;
+ boost::optional<Milliseconds> getHighEstimateRemainingTimeMillis() const;
+ boost::optional<Milliseconds> getLowEstimateRemainingTimeMillis() const;
Date_t getStartTimestamp() const;
const UUID& getInstanceId() const;
@@ -90,7 +89,6 @@ public:
int64_t getDocumentsProcessedCount() const;
int64_t getBytesWrittenCount() const;
int64_t getApproxBytesToScanCount() const;
- void restoreDocumentsProcessed(int64_t documentCount, int64_t totalDocumentsSizeBytes);
void setDocumentsToProcessCounts(int64_t documentCount, int64_t totalDocumentsSizeBytes);
void setCoordinatorHighEstimateRemainingTimeMillis(Milliseconds milliseconds);
void setCoordinatorLowEstimateRemainingTimeMillis(Milliseconds milliseconds);
@@ -130,9 +128,11 @@ protected:
}
void restoreCopyingBegin(Date_t date);
void restoreCopyingEnd(Date_t date);
+ void restoreDocumentsProcessed(int64_t documentCount, int64_t totalDocumentsSizeBytes);
+ void restoreWritesToStashCollections(int64_t writesToStashCollections);
virtual std::string createOperationDescription() const noexcept;
virtual StringData getStateString() const noexcept;
- void accumulateWritesToStashCollections(int64_t writesToStashCollections);
+ virtual boost::optional<Milliseconds> getRecipientHighEstimateRemainingTimeMillis() const = 0;
ShardingDataTransformCumulativeMetrics* getCumulativeMetrics();
ClockSource* getClockSource() const;
diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp
index f5d4ba2df01..a690d908879 100644
--- a/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp
+++ b/src/mongo/db/s/sharding_data_transform_instance_metrics_test.cpp
@@ -105,8 +105,8 @@ public:
std::move(observer)},
_scopedObserver(registerInstanceMetrics()) {}
- Milliseconds getRecipientHighEstimateRemainingTimeMillis() const {
- return Milliseconds{0};
+ boost::optional<Milliseconds> getRecipientHighEstimateRemainingTimeMillis() const {
+ return boost::none;
}
private:
@@ -325,25 +325,41 @@ TEST_F(ShardingDataTransformInstanceMetricsTest, OnWriteToStasheddShouldIncremen
TEST_F(ShardingDataTransformInstanceMetricsTest,
SetLowestOperationTimeShouldBeReflectedInCurrentOp) {
auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator);
-
- auto report = metrics->reportForCurrentOp();
- ASSERT_EQ(report.getIntField("allShardsLowestRemainingOperationTimeEstimatedSecs"), 0);
metrics->setCoordinatorLowEstimateRemainingTimeMillis(Milliseconds(2000));
-
- report = metrics->reportForCurrentOp();
+ auto report = metrics->reportForCurrentOp();
ASSERT_EQ(report.getIntField("allShardsLowestRemainingOperationTimeEstimatedSecs"), 2);
}
TEST_F(ShardingDataTransformInstanceMetricsTest,
SetHighestOperationTimeShouldBeReflectedInCurrentOp) {
auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator);
+ metrics->setCoordinatorHighEstimateRemainingTimeMillis(Milliseconds(12000));
+ auto report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("allShardsHighestRemainingOperationTimeEstimatedSecs"), 12);
+}
+
+TEST_F(ShardingDataTransformInstanceMetricsTest, CoordinatorHighEstimateNoneIfNotSet) {
+ auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator);
+ ASSERT_EQ(metrics->getHighEstimateRemainingTimeMillis(), boost::none);
+}
+
+TEST_F(ShardingDataTransformInstanceMetricsTest, CoordinatorLowEstimateNoneIfNotSet) {
+ auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator);
+ ASSERT_EQ(metrics->getLowEstimateRemainingTimeMillis(), boost::none);
+}
+TEST_F(ShardingDataTransformInstanceMetricsTest,
+ CurrentOpDoesNotReportCoordinatorHighEstimateIfNotSet) {
+ auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator);
auto report = metrics->reportForCurrentOp();
- ASSERT_EQ(report.getIntField("allShardsHighestRemainingOperationTimeEstimatedSecs"), 0);
- metrics->setCoordinatorHighEstimateRemainingTimeMillis(Milliseconds(12000));
+ ASSERT_FALSE(report.hasField("allShardsHighestRemainingOperationTimeEstimatedSecs"));
+}
- report = metrics->reportForCurrentOp();
- ASSERT_EQ(report.getIntField("allShardsHighestRemainingOperationTimeEstimatedSecs"), 12);
+TEST_F(ShardingDataTransformInstanceMetricsTest,
+ CurrentOpDoesNotReportCoordinatorLowEstimateIfNotSet) {
+ auto metrics = createInstanceMetrics(UUID::gen(), Role::kCoordinator);
+ auto report = metrics->reportForCurrentOp();
+ ASSERT_FALSE(report.hasField("allShardsLowestRemainingOperationTimeEstimatedSecs"));
}
} // namespace
diff --git a/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp b/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp
index eca06d99acb..d1eadcd5038 100644
--- a/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp
+++ b/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp
@@ -35,12 +35,14 @@ ShardingDataTransformMetricsObserver::ShardingDataTransformMetricsObserver(
ShardingDataTransformInstanceMetrics* metrics)
: _metrics(metrics) {}
-int64_t ShardingDataTransformMetricsObserver::getHighEstimateRemainingTimeMillis() const {
- return _metrics->getHighEstimateRemainingTimeMillis().count();
+boost::optional<Milliseconds>
+ShardingDataTransformMetricsObserver::getHighEstimateRemainingTimeMillis() const {
+ return _metrics->getHighEstimateRemainingTimeMillis();
}
-int64_t ShardingDataTransformMetricsObserver::getLowEstimateRemainingTimeMillis() const {
- return _metrics->getLowEstimateRemainingTimeMillis().count();
+boost::optional<Milliseconds>
+ShardingDataTransformMetricsObserver::getLowEstimateRemainingTimeMillis() const {
+ return _metrics->getLowEstimateRemainingTimeMillis();
}
Date_t ShardingDataTransformMetricsObserver::getStartTimestamp() const {
diff --git a/src/mongo/db/s/sharding_data_transform_metrics_observer.h b/src/mongo/db/s/sharding_data_transform_metrics_observer.h
index 6a7ad9421bd..64e52334d5a 100644
--- a/src/mongo/db/s/sharding_data_transform_metrics_observer.h
+++ b/src/mongo/db/s/sharding_data_transform_metrics_observer.h
@@ -38,8 +38,8 @@ namespace mongo {
class ShardingDataTransformMetricsObserver : public ShardingDataTransformMetricsObserverInterface {
public:
ShardingDataTransformMetricsObserver(ShardingDataTransformInstanceMetrics* metrics);
- int64_t getHighEstimateRemainingTimeMillis() const override;
- int64_t getLowEstimateRemainingTimeMillis() const override;
+ boost::optional<Milliseconds> getHighEstimateRemainingTimeMillis() const override;
+ boost::optional<Milliseconds> getLowEstimateRemainingTimeMillis() const override;
Date_t getStartTimestamp() const override;
const UUID& getUuid() const override;
ShardingDataTransformMetrics::Role getRole() const override;
diff --git a/src/mongo/db/s/sharding_data_transform_metrics_observer_interface.h b/src/mongo/db/s/sharding_data_transform_metrics_observer_interface.h
index 68c6426a6ec..de68cfddc26 100644
--- a/src/mongo/db/s/sharding_data_transform_metrics_observer_interface.h
+++ b/src/mongo/db/s/sharding_data_transform_metrics_observer_interface.h
@@ -36,8 +36,8 @@ namespace mongo {
class ShardingDataTransformMetricsObserverInterface {
public:
virtual ~ShardingDataTransformMetricsObserverInterface() = default;
- virtual int64_t getHighEstimateRemainingTimeMillis() const = 0;
- virtual int64_t getLowEstimateRemainingTimeMillis() const = 0;
+ virtual boost::optional<Milliseconds> getHighEstimateRemainingTimeMillis() const = 0;
+ virtual boost::optional<Milliseconds> getLowEstimateRemainingTimeMillis() const = 0;
virtual Date_t getStartTimestamp() const = 0;
virtual const UUID& getUuid() const = 0;
virtual ShardingDataTransformMetrics::Role getRole() const = 0;
diff --git a/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h b/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h
index a898e9fca73..cc66423e231 100644
--- a/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h
+++ b/src/mongo/db/s/sharding_data_transform_metrics_test_fixture.h
@@ -63,11 +63,11 @@ public:
return _uuid;
}
- virtual int64_t getHighEstimateRemainingTimeMillis() const override {
+ virtual boost::optional<Milliseconds> getHighEstimateRemainingTimeMillis() const override {
return _timeRemainingHigh;
}
- virtual int64_t getLowEstimateRemainingTimeMillis() const override {
+ virtual boost::optional<Milliseconds> getLowEstimateRemainingTimeMillis() const override {
return _timeRemainingLow;
}
@@ -82,8 +82,8 @@ public:
private:
UUID _uuid;
Date_t _startTime;
- int64_t _timeRemainingHigh;
- int64_t _timeRemainingLow;
+ Milliseconds _timeRemainingHigh;
+ Milliseconds _timeRemainingLow;
ShardingDataTransformMetrics::Role _role;
};
@@ -186,6 +186,10 @@ protected:
return &clock.value();
}
+ ShardingDataTransformCumulativeMetrics* getCumulativeMetrics() {
+ return _cumulativeMetrics.get();
+ }
+
using SpecialIndexBehaviorMap = stdx::unordered_map<int, std::function<void()>>;
const SpecialIndexBehaviorMap kNoSpecialBehavior{};
SpecialIndexBehaviorMap registerAtIndex(int index, const ObserverMock* mock) {
diff --git a/src/mongo/util/optional_util.h b/src/mongo/util/optional_util.h
index dca4ab7b7c4..e1820af7320 100644
--- a/src/mongo/util/optional_util.h
+++ b/src/mongo/util/optional_util.h
@@ -139,4 +139,15 @@ template <typename T, std::enable_if_t<canStreamWithExtension<T>, int> = 0>
Extension(const T& t)->Extension<T>;
} // namespace optional_io
+
+namespace optional_util {
+template <typename T, typename U>
+void setOrAdd(boost::optional<T>& counter, U value) {
+ if (!counter) {
+ counter = value;
+ return;
+ }
+ counter = *counter + value;
+}
+} // namespace optional_util
} // namespace mongo