summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding/resharding_recipient_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_recipient_service.cpp')
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp35
1 files changed, 17 insertions, 18 deletions
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 5e096b1e7f2..fe0d54112ea 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -65,6 +65,7 @@
#include "mongo/s/grid.h"
#include "mongo/s/stale_shard_version_helpers.h"
#include "mongo/util/future_util.h"
+#include "mongo/util/optional_util.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding
@@ -1124,7 +1125,7 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMetrics(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken) {
- if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {
+ if (_metrics->mustRestoreExternallyTrackedRecipientFields(_recipientCtx.getState())) {
return _restoreMetricsWithRetry(executor, abortToken);
}
@@ -1148,28 +1149,25 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_restore
void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
const CancelableOperationContextFactory& factory) {
- int64_t documentCountCopied = 0;
- int64_t documentBytesCopied = 0;
- int64_t oplogEntriesFetched = 0;
- int64_t oplogEntriesApplied = 0;
+ ReshardingMetrics::ExternallyTrackedRecipientFields externalMetrics;
auto opCtx = factory.makeOperationContext(&cc());
- {
+ [&] {
AutoGetCollection tempReshardingColl(
opCtx.get(), _metadata.getTempReshardingNss(), MODE_IS);
- if (tempReshardingColl) {
- documentBytesCopied = tempReshardingColl->dataSize(opCtx.get());
- documentCountCopied = tempReshardingColl->numRecords(opCtx.get());
+ if (!tempReshardingColl) {
+ return;
}
-
- if (_recipientCtx.getState() == RecipientStateEnum::kCloning) {
+ if (_recipientCtx.getState() != RecipientStateEnum::kCloning) {
// Before cloning, these values are 0. After cloning these values are written to the
// metrics section of the recipient state document and restored during metrics
// initialization. This is so that applied oplog entries that add or remove documents do
// not affect the cloning metrics.
- _metrics->restoreDocumentsProcessed(documentCountCopied, documentBytesCopied);
+ return;
}
- }
+ externalMetrics.documentBytesCopied = tempReshardingColl->dataSize(opCtx.get());
+ externalMetrics.documentCountCopied = tempReshardingColl->numRecords(opCtx.get());
+ }();
reshardingOpCtxKilledWhileRestoringMetrics.execute(
[&opCtx](const BSONObj& data) { opCtx->markKilled(); });
@@ -1183,7 +1181,8 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
_metadata.getSourceUUID(), donor.getShardId()),
MODE_IS);
if (oplogBufferColl) {
- oplogEntriesFetched += oplogBufferColl->numRecords(opCtx.get());
+ optional_util::setOrAdd(externalMetrics.oplogEntriesFetched,
+ oplogBufferColl->numRecords(opCtx.get()));
}
}
@@ -1203,7 +1202,8 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
if (!result.isEmpty()) {
progressDoc = ReshardingOplogApplierProgress::parse(
IDLParserContext("resharding-recipient-service-progress-doc"), result);
- oplogEntriesApplied += progressDoc->getNumEntriesApplied();
+ optional_util::setOrAdd(externalMetrics.oplogEntriesApplied,
+ progressDoc->getNumEntriesApplied());
}
}
@@ -1223,15 +1223,14 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
continue;
}
- _metrics->accumulateFrom(*progressDoc);
+ externalMetrics.accumulateFrom(*progressDoc);
auto applierMetrics =
std::make_unique<ReshardingOplogApplierMetrics>(_metrics.get(), progressDoc);
_applierMetricsMap.emplace(shardId, std::move(applierMetrics));
}
- _metrics->restoreOplogEntriesFetched(oplogEntriesFetched);
- _metrics->restoreOplogEntriesApplied(oplogEntriesApplied);
+ _metrics->restoreExternallyTrackedRecipientFields(externalMetrics);
}
CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource(