summary | refs | log | tree | commit | diff
path: root/src/mongo/db/s/resharding/resharding_recipient_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_recipient_service.cpp')
-rw-r--r-- src/mongo/db/s/resharding/resharding_recipient_service.cpp 78
1 files changed, 42 insertions, 36 deletions
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index baaf64fc5e3..5b66d19e8bd 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -120,12 +120,12 @@ using resharding_metrics::getIntervalStartFieldName;
using DocT = ReshardingRecipientDocument;
const auto metricsPrefix = resharding_metrics::getMetricsPrefix<DocT>();
-void buildStateDocumentCloneMetricsForUpdate(BSONObjBuilder& bob, ReshardingMetricsNew* metrics) {
+void buildStateDocumentCloneMetricsForUpdate(BSONObjBuilder& bob, ReshardingMetrics* metrics) {
bob.append(getIntervalStartFieldName<DocT>(ReshardingRecipientMetrics::kDocumentCopyFieldName),
metrics->getCopyingBegin());
}
-void buildStateDocumentApplyMetricsForUpdate(BSONObjBuilder& bob, ReshardingMetricsNew* metrics) {
+void buildStateDocumentApplyMetricsForUpdate(BSONObjBuilder& bob, ReshardingMetrics* metrics) {
bob.append(getIntervalEndFieldName<DocT>(ReshardingRecipientMetrics::kDocumentCopyFieldName),
metrics->getCopyingEnd());
bob.append(
@@ -138,14 +138,14 @@ void buildStateDocumentApplyMetricsForUpdate(BSONObjBuilder& bob, ReshardingMetr
}
void buildStateDocumentStrictConsistencyMetricsForUpdate(BSONObjBuilder& bob,
- ReshardingMetricsNew* metrics) {
+ ReshardingMetrics* metrics) {
bob.append(
getIntervalEndFieldName<DocT>(ReshardingRecipientMetrics::kOplogApplicationFieldName),
metrics->getApplyingEnd());
}
void buildStateDocumentMetricsForUpdate(BSONObjBuilder& bob,
- ReshardingMetricsNew* metrics,
+ ReshardingMetrics* metrics,
RecipientStateEnum newState) {
switch (newState) {
case RecipientStateEnum::kCloning:
@@ -162,8 +162,8 @@ void buildStateDocumentMetricsForUpdate(BSONObjBuilder& bob,
}
}
-ReshardingMetricsNew::RecipientState toMetricsState(RecipientStateEnum state) {
- return ReshardingMetricsNew::RecipientState(state);
+ReshardingMetrics::RecipientState toMetricsState(RecipientStateEnum state) {
+ return ReshardingMetrics::RecipientState(state);
}
} // namespace
@@ -190,7 +190,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
ReshardingDataReplicationFactory dataReplicationFactory)
: repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine>(),
_recipientService{recipientService},
- _metricsNew{ReshardingMetricsNew::initializeFrom(recipientDoc, getGlobalServiceContext())},
+ _metrics{ReshardingMetrics::initializeFrom(recipientDoc, getGlobalServiceContext())},
_metadata{recipientDoc.getCommonReshardingMetadata()},
_minimumOperationDuration{Milliseconds{recipientDoc.getMinimumOperationDurationMillis()}},
_recipientCtx{recipientDoc.getMutableState()},
@@ -219,7 +219,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
}()) {
invariant(_externalState);
- _metricsNew->onStateTransition(boost::none, toMetricsState(_recipientCtx.getState()));
+ _metrics->onStateTransition(boost::none, toMetricsState(_recipientCtx.getState()));
}
ExecutorFuture<void>
@@ -370,7 +370,9 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_finishR
if (!_isAlsoDonor) {
auto opCtx = factory.makeOperationContext(&cc());
- _externalState->clearFilteringMetadata(opCtx.get());
+ _externalState->clearFilteringMetadata(opCtx.get(),
+ _metadata.getSourceNss(),
+ _metadata.getTempReshardingNss());
RecoverableCriticalSectionService::get(opCtx.get())
->releaseRecoverableCriticalSection(
@@ -417,7 +419,13 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_runMand
self = shared_from_this(),
outerStatus = status,
isCanceled = stepdownToken.isCanceled()](Status dataReplicationHaltStatus) {
- _metricsNew->onStateTransition(toMetricsState(_recipientCtx.getState()), boost::none);
+ _metrics->onStateTransition(toMetricsState(_recipientCtx.getState()), boost::none);
+
+ // Destroy metrics early so its lifetime will not be tied to the lifetime of this
+ // state machine. This is because future callbacks copy shared pointers to this
+ // state machine, which causes it to live longer than expected and potentially overlap
+ // with a newer instance when stepping up.
+ _metrics.reset();
// If the stepdownToken was triggered, it takes priority in order to make sure that
// the promise is set with an error that the coordinator can retry with. If it ran into
@@ -432,7 +440,6 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_runMand
// replication errors because resharding is known to have failed already.
stdx::lock_guard<Latch> lk(_mutex);
ensureFulfilledPromise(lk, _completionPromise, outerStatus);
-
return outerStatus;
});
}
@@ -504,7 +511,7 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status)
boost::optional<BSONObj> ReshardingRecipientService::RecipientStateMachine::reportForCurrentOp(
MongoProcessInterface::CurrentOpConnectionsMode,
MongoProcessInterface::CurrentOpSessionsMode) noexcept {
- return _metricsNew->reportForCurrentOp();
+ return _metrics->reportForCurrentOp();
}
void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChanges(
@@ -550,8 +557,8 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
ReshardingRecipientService::RecipientStateMachine::CloneDetails cloneDetails) {
_transitionToCreatingCollection(
cloneDetails, (*executor)->now() + _minimumOperationDuration, factory);
- _metricsNew->setDocumentsToCopyCounts(cloneDetails.approxDocumentsToCopy,
- cloneDetails.approxBytesToCopy);
+ _metrics->setDocumentsToCopyCounts(cloneDetails.approxDocumentsToCopy,
+ cloneDetails.approxBytesToCopy);
});
}
@@ -616,7 +623,7 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(Operatio
for (const auto& donor : _donorShards) {
_applierMetricsMap.emplace(
donor.getShardId(),
- std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), boost::none));
+ std::make_unique<ReshardingOplogApplierMetrics>(_metrics.get(), boost::none));
}
} else {
invariant(_applierMetricsMap.size() == _donorShards.size(),
@@ -625,7 +632,7 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(Operatio
}
return _dataReplicationFactory(opCtx,
- _metricsNew.get(),
+ _metrics.get(),
&_applierMetricsMap,
_metadata,
_donorShards,
@@ -726,8 +733,8 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
.then([this, &factory] {
auto opCtx = factory.makeOperationContext(&cc());
for (const auto& donor : _donorShards) {
- auto stashNss =
- getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor.getShardId());
+ auto stashNss = resharding::getLocalConflictStashNamespace(
+ _metadata.getSourceUUID(), donor.getShardId());
AutoGetCollection stashColl(opCtx.get(), stashNss, MODE_IS);
uassert(5356800,
"Resharding completed with non-empty stash collections",
@@ -846,7 +853,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
_updateRecipientDocument(
std::move(newRecipientCtx), std::move(cloneDetails), std::move(configStartTime), factory);
- _metricsNew->onStateTransition(toMetricsState(oldState), toMetricsState(newState));
+ _metrics->onStateTransition(toMetricsState(oldState), toMetricsState(newState));
LOGV2_INFO(5279506,
"Transitioned resharding recipient state",
@@ -871,7 +878,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToCreatingCol
void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning(
const CancelableOperationContextFactory& factory) {
- _metricsNew->onCopyingBegin();
+ _metrics->onCopyingBegin();
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kCloning);
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
@@ -883,8 +890,8 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying(
newRecipientCtx.setState(RecipientStateEnum::kApplying);
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
- _metricsNew->onCopyingEnd();
- _metricsNew->onApplyingBegin();
+ _metrics->onCopyingEnd();
+ _metrics->onApplyingBegin();
}
void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency(
@@ -893,14 +900,14 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsi
newRecipientCtx.setState(RecipientStateEnum::kStrictConsistency);
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
- _metricsNew->onApplyingEnd();
+ _metrics->onApplyingEnd();
}
void ReshardingRecipientService::RecipientStateMachine::_transitionToError(
Status abortReason, const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kError);
- emplaceTruncatedAbortReasonIfExists(newRecipientCtx, abortReason);
+ resharding::emplaceTruncatedAbortReasonIfExists(newRecipientCtx, abortReason);
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
}
@@ -1052,8 +1059,7 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument
*configStartTime);
}
- buildStateDocumentMetricsForUpdate(
- setBuilder, _metricsNew.get(), newRecipientCtx.getState());
+ buildStateDocumentMetricsForUpdate(setBuilder, _metrics.get(), newRecipientCtx.getState());
setBuilder.doneFast();
}
@@ -1156,7 +1162,7 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
// metrics section of the recipient state document and restored during metrics
// initialization. This is so that applied oplog entries that add or remove documents do
// not affect the cloning metrics.
- _metricsNew->restoreDocumentsCopied(documentCountCopied, documentBytesCopied);
+ _metrics->restoreDocumentsCopied(documentCountCopied, documentBytesCopied);
}
}
@@ -1167,10 +1173,10 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
progressDocList;
for (const auto& donor : _donorShards) {
{
- AutoGetCollection oplogBufferColl(
- opCtx.get(),
- getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor.getShardId()),
- MODE_IS);
+ AutoGetCollection oplogBufferColl(opCtx.get(),
+ resharding::getLocalOplogBufferNamespace(
+ _metadata.getSourceUUID(), donor.getShardId()),
+ MODE_IS);
if (oplogBufferColl) {
oplogEntriesFetched += oplogBufferColl->numRecords(opCtx.get());
}
@@ -1208,19 +1214,19 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
if (!progressDoc) {
_applierMetricsMap.emplace(
shardId,
- std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), boost::none));
+ std::make_unique<ReshardingOplogApplierMetrics>(_metrics.get(), boost::none));
continue;
}
- _metricsNew->accumulateFrom(*progressDoc);
+ _metrics->accumulateFrom(*progressDoc);
auto applierMetrics =
- std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), progressDoc);
+ std::make_unique<ReshardingOplogApplierMetrics>(_metrics.get(), progressDoc);
_applierMetricsMap.emplace(shardId, std::move(applierMetrics));
}
- _metricsNew->restoreOplogEntriesFetched(oplogEntriesFetched);
- _metricsNew->restoreOplogEntriesApplied(oplogEntriesApplied);
+ _metrics->restoreOplogEntriesFetched(oplogEntriesFetched);
+ _metrics->restoreOplogEntriesApplied(oplogEntriesApplied);
}
CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource(