diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_recipient_service.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service.cpp | 78 |
1 files changed, 42 insertions, 36 deletions
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index baaf64fc5e3..5b66d19e8bd 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -120,12 +120,12 @@ using resharding_metrics::getIntervalStartFieldName; using DocT = ReshardingRecipientDocument; const auto metricsPrefix = resharding_metrics::getMetricsPrefix<DocT>(); -void buildStateDocumentCloneMetricsForUpdate(BSONObjBuilder& bob, ReshardingMetricsNew* metrics) { +void buildStateDocumentCloneMetricsForUpdate(BSONObjBuilder& bob, ReshardingMetrics* metrics) { bob.append(getIntervalStartFieldName<DocT>(ReshardingRecipientMetrics::kDocumentCopyFieldName), metrics->getCopyingBegin()); } -void buildStateDocumentApplyMetricsForUpdate(BSONObjBuilder& bob, ReshardingMetricsNew* metrics) { +void buildStateDocumentApplyMetricsForUpdate(BSONObjBuilder& bob, ReshardingMetrics* metrics) { bob.append(getIntervalEndFieldName<DocT>(ReshardingRecipientMetrics::kDocumentCopyFieldName), metrics->getCopyingEnd()); bob.append( @@ -138,14 +138,14 @@ void buildStateDocumentApplyMetricsForUpdate(BSONObjBuilder& bob, ReshardingMetr } void buildStateDocumentStrictConsistencyMetricsForUpdate(BSONObjBuilder& bob, - ReshardingMetricsNew* metrics) { + ReshardingMetrics* metrics) { bob.append( getIntervalEndFieldName<DocT>(ReshardingRecipientMetrics::kOplogApplicationFieldName), metrics->getApplyingEnd()); } void buildStateDocumentMetricsForUpdate(BSONObjBuilder& bob, - ReshardingMetricsNew* metrics, + ReshardingMetrics* metrics, RecipientStateEnum newState) { switch (newState) { case RecipientStateEnum::kCloning: @@ -162,8 +162,8 @@ void buildStateDocumentMetricsForUpdate(BSONObjBuilder& bob, } } -ReshardingMetricsNew::RecipientState toMetricsState(RecipientStateEnum state) { - return ReshardingMetricsNew::RecipientState(state); +ReshardingMetrics::RecipientState toMetricsState(RecipientStateEnum state) { + return ReshardingMetrics::RecipientState(state); } } // namespace @@ -190,7 +190,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( ReshardingDataReplicationFactory dataReplicationFactory) : repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine>(), _recipientService{recipientService}, - _metricsNew{ReshardingMetricsNew::initializeFrom(recipientDoc, getGlobalServiceContext())}, + _metrics{ReshardingMetrics::initializeFrom(recipientDoc, getGlobalServiceContext())}, _metadata{recipientDoc.getCommonReshardingMetadata()}, _minimumOperationDuration{Milliseconds{recipientDoc.getMinimumOperationDurationMillis()}}, _recipientCtx{recipientDoc.getMutableState()}, @@ -219,7 +219,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( }()) { invariant(_externalState); - _metricsNew->onStateTransition(boost::none, toMetricsState(_recipientCtx.getState())); + _metrics->onStateTransition(boost::none, toMetricsState(_recipientCtx.getState())); } ExecutorFuture<void> @@ -370,7 +370,9 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_finishR if (!_isAlsoDonor) { auto opCtx = factory.makeOperationContext(&cc()); - _externalState->clearFilteringMetadata(opCtx.get()); + _externalState->clearFilteringMetadata(opCtx.get(), + _metadata.getSourceNss(), + _metadata.getTempReshardingNss()); RecoverableCriticalSectionService::get(opCtx.get()) ->releaseRecoverableCriticalSection( @@ -417,7 +419,13 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_runMand self = shared_from_this(), outerStatus = status, isCanceled = stepdownToken.isCanceled()](Status dataReplicationHaltStatus) { - _metricsNew->onStateTransition(toMetricsState(_recipientCtx.getState()), boost::none); + _metrics->onStateTransition(toMetricsState(_recipientCtx.getState()), boost::none); + + // Destroy metrics early so it's lifetime will not be tied to the lifetime of this + // state machine. This is because we have future callbacks copy shared pointers to this + // state machine that causes it to live longer than expected and potentially overlap + // with a newer instance when stepping up. + _metrics.reset(); // If the stepdownToken was triggered, it takes priority in order to make sure that // the promise is set with an error that the coordinator can retry with. If it ran into @@ -432,7 +440,6 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_runMand // replication errors because resharding is known to have failed already. stdx::lock_guard<Latch> lk(_mutex); ensureFulfilledPromise(lk, _completionPromise, outerStatus); - return outerStatus; }); } @@ -504,7 +511,7 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status) boost::optional<BSONObj> ReshardingRecipientService::RecipientStateMachine::reportForCurrentOp( MongoProcessInterface::CurrentOpConnectionsMode, MongoProcessInterface::CurrentOpSessionsMode) noexcept { - return _metricsNew->reportForCurrentOp(); + return _metrics->reportForCurrentOp(); } void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChanges( @@ -550,8 +557,8 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: ReshardingRecipientService::RecipientStateMachine::CloneDetails cloneDetails) { _transitionToCreatingCollection( cloneDetails, (*executor)->now() + _minimumOperationDuration, factory); - _metricsNew->setDocumentsToCopyCounts(cloneDetails.approxDocumentsToCopy, - cloneDetails.approxBytesToCopy); + _metrics->setDocumentsToCopyCounts(cloneDetails.approxDocumentsToCopy, + cloneDetails.approxBytesToCopy); }); } @@ -616,7 +623,7 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(Operatio for (const auto& donor : _donorShards) { _applierMetricsMap.emplace( donor.getShardId(), - std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), boost::none)); + std::make_unique<ReshardingOplogApplierMetrics>(_metrics.get(), boost::none)); } } else { invariant(_applierMetricsMap.size() == _donorShards.size(), @@ -625,7 +632,7 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(Operatio } return _dataReplicationFactory(opCtx, - _metricsNew.get(), + _metrics.get(), &_applierMetricsMap, _metadata, _donorShards, @@ -726,8 +733,8 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: .then([this, &factory] { auto opCtx = factory.makeOperationContext(&cc()); for (const auto& donor : _donorShards) { - auto stashNss = - getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor.getShardId()); + auto stashNss = resharding::getLocalConflictStashNamespace( + _metadata.getSourceUUID(), donor.getShardId()); AutoGetCollection stashColl(opCtx.get(), stashNss, MODE_IS); uassert(5356800, "Resharding completed with non-empty stash collections", @@ -846,7 +853,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState( _updateRecipientDocument( std::move(newRecipientCtx), std::move(cloneDetails), std::move(configStartTime), factory); - _metricsNew->onStateTransition(toMetricsState(oldState), toMetricsState(newState)); + _metrics->onStateTransition(toMetricsState(oldState), toMetricsState(newState)); LOGV2_INFO(5279506, "Transitioned resharding recipient state", @@ -871,7 +878,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToCreatingCol void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning( const CancelableOperationContextFactory& factory) { - _metricsNew->onCopyingBegin(); + _metrics->onCopyingBegin(); auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kCloning); _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); @@ -883,8 +890,8 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying( newRecipientCtx.setState(RecipientStateEnum::kApplying); _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); - _metricsNew->onCopyingEnd(); - _metricsNew->onApplyingBegin(); + _metrics->onCopyingEnd(); + _metrics->onApplyingBegin(); } void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency( @@ -893,14 +900,14 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsi newRecipientCtx.setState(RecipientStateEnum::kStrictConsistency); _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); - _metricsNew->onApplyingEnd(); + _metrics->onApplyingEnd(); } void ReshardingRecipientService::RecipientStateMachine::_transitionToError( Status abortReason, const CancelableOperationContextFactory& factory) { auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kError); - emplaceTruncatedAbortReasonIfExists(newRecipientCtx, abortReason); + resharding::emplaceTruncatedAbortReasonIfExists(newRecipientCtx, abortReason); _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); } @@ -1052,8 +1059,7 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument *configStartTime); } - buildStateDocumentMetricsForUpdate( - setBuilder, _metricsNew.get(), newRecipientCtx.getState()); + buildStateDocumentMetricsForUpdate(setBuilder, _metrics.get(), newRecipientCtx.getState()); setBuilder.doneFast(); } @@ -1156,7 +1162,7 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( // metrics section of the recipient state document and restored during metrics // initialization. This is so that applied oplog entries that add or remove documents do // not affect the cloning metrics. - _metricsNew->restoreDocumentsCopied(documentCountCopied, documentBytesCopied); + _metrics->restoreDocumentsCopied(documentCountCopied, documentBytesCopied); } } @@ -1167,10 +1173,10 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( progressDocList; for (const auto& donor : _donorShards) { { - AutoGetCollection oplogBufferColl( - opCtx.get(), - getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor.getShardId()), - MODE_IS); + AutoGetCollection oplogBufferColl(opCtx.get(), + resharding::getLocalOplogBufferNamespace( + _metadata.getSourceUUID(), donor.getShardId()), + MODE_IS); if (oplogBufferColl) { oplogEntriesFetched += oplogBufferColl->numRecords(opCtx.get()); } @@ -1208,19 +1214,19 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( if (!progressDoc) { _applierMetricsMap.emplace( shardId, - std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), boost::none)); + std::make_unique<ReshardingOplogApplierMetrics>(_metrics.get(), boost::none)); continue; } - _metricsNew->accumulateFrom(*progressDoc); + _metrics->accumulateFrom(*progressDoc); auto applierMetrics = - std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), progressDoc); + std::make_unique<ReshardingOplogApplierMetrics>(_metrics.get(), progressDoc); _applierMetricsMap.emplace(shardId, std::move(applierMetrics)); } - _metricsNew->restoreOplogEntriesFetched(oplogEntriesFetched); - _metricsNew->restoreOplogEntriesApplied(oplogEntriesApplied); + _metrics->restoreOplogEntriesFetched(oplogEntriesFetched); + _metrics->restoreOplogEntriesApplied(oplogEntriesApplied); } CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource( |