diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_recipient_service.h')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service.h | 61 |
1 files changed, 40 insertions, 21 deletions
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index e88d50a2330..3e9a56b4646 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -32,6 +32,7 @@ #include "mongo/db/repl/primary_only_service.h" #include "mongo/db/s/resharding/recipient_document_gen.h" #include "mongo/db/s/resharding/resharding_data_replication.h" +#include "mongo/db/s/resharding/resharding_future_util.h" #include "mongo/db/s/resharding_util.h" #include "mongo/s/resharding/type_collection_fields_gen.h" #include "mongo/util/concurrency/thread_pool.h" @@ -176,56 +177,67 @@ private: // The following functions correspond to the actions to take at a particular recipient state. ExecutorFuture<void> _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& abortToken); + const CancellationToken& abortToken, + const CancelableOperationContextFactory& factory); - void _createTemporaryReshardingCollectionThenTransitionToCloning(); + void _createTemporaryReshardingCollectionThenTransitionToCloning( + const CancelableOperationContextFactory& factory); ExecutorFuture<void> _cloneThenTransitionToApplying( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& abortToken); + const CancellationToken& abortToken, + const CancelableOperationContextFactory& factory); ExecutorFuture<void> _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& abortToken); + const CancellationToken& abortToken, + const CancelableOperationContextFactory& factory); - void _writeStrictConsistencyOplog(); + void _writeStrictConsistencyOplog(const CancelableOperationContextFactory& factory); - void _renameTemporaryReshardingCollection(); + void _renameTemporaryReshardingCollection(const CancelableOperationContextFactory& factory); - void _cleanupReshardingCollections(bool aborted); + void _cleanupReshardingCollections(bool aborted, + const CancelableOperationContextFactory& factory); // Transitions the on-disk and in-memory state to 'newState'. - void _transitionState(RecipientStateEnum newState); + void _transitionState(RecipientStateEnum newState, + const CancelableOperationContextFactory& factory); void _transitionState(RecipientShardContext&& newRecipientCtx, boost::optional<CloneDetails>&& cloneDetails, - boost::optional<mongo::Date_t> configStartTime); + boost::optional<mongo::Date_t> configStartTime, + const CancelableOperationContextFactory& factory); // The following functions transition the on-disk and in-memory state to the named state. void _transitionToCreatingCollection(CloneDetails cloneDetails, - boost::optional<mongo::Date_t> startConfigTxnCloneTime); + boost::optional<mongo::Date_t> startConfigTxnCloneTime, + const CancelableOperationContextFactory& factory); - void _transitionToCloning(); + void _transitionToCloning(const CancelableOperationContextFactory& factory); - void _transitionToApplying(); + void _transitionToApplying(const CancelableOperationContextFactory& factory); - void _transitionToStrictConsistency(); + void _transitionToStrictConsistency(const CancelableOperationContextFactory& factory); - void _transitionToError(Status abortReason); + void _transitionToError(Status abortReason, const CancelableOperationContextFactory& factory); BSONObj _makeQueryForCoordinatorUpdate(const ShardId& shardId, RecipientStateEnum newState); ExecutorFuture<void> _updateCoordinator( - OperationContext* opCtx, const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + OperationContext* opCtx, + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancelableOperationContextFactory& factory); // Updates the mutable portion of the on-disk and in-memory recipient document with // 'newRecipientCtx', 'fetchTimestamp and 'donorShards'. void _updateRecipientDocument(RecipientShardContext&& newRecipientCtx, boost::optional<CloneDetails>&& cloneDetails, - boost::optional<mongo::Date_t> configStartTime); + boost::optional<mongo::Date_t> configStartTime, + const CancelableOperationContextFactory& factory); // Removes the local recipient document from disk. - void _removeRecipientDocument(bool aborted); + void _removeRecipientDocument(bool aborted, const CancelableOperationContextFactory& factory); std::unique_ptr<ReshardingDataReplicationInterface> _makeDataReplication( OperationContext* opCtx, bool cloningDone); @@ -233,14 +245,20 @@ private: void _ensureDataReplicationStarted( OperationContext* opCtx, const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& abortToken); + const CancellationToken& abortToken, + const CancelableOperationContextFactory& factory); ReshardingMetrics* _metrics() const; - void _startMetrics(); + ExecutorFuture<void> _startMetrics( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken); // Restore metrics using the persisted metrics after stepping up. - void _restoreMetrics(); + ExecutorFuture<void> _restoreMetricsWithRetry( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken); + void _restoreMetrics(const CancelableOperationContextFactory& factory); // Initializes the _abortSource and generates a token from it to return back the caller. // @@ -271,7 +289,8 @@ private: // CancelableOperationContext must have a thread that is always available to it to mark its // opCtx as killed when the cancelToken has been cancelled. const std::shared_ptr<ThreadPool> _markKilledExecutor; - boost::optional<CancelableOperationContextFactory> _cancelableOpCtxFactory; + boost::optional<resharding::RetryingCancelableOperationContextFactory> + _retryingCancelableOpCtxFactory; const ReshardingDataReplicationFactory _dataReplicationFactory; SharedSemiFuture<void> _dataReplicationQuiesced; |