summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding/resharding_recipient_service.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_recipient_service.h')
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h61
1 files changed, 40 insertions, 21 deletions
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index e88d50a2330..3e9a56b4646 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -32,6 +32,7 @@
#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/s/resharding/recipient_document_gen.h"
#include "mongo/db/s/resharding/resharding_data_replication.h"
+#include "mongo/db/s/resharding/resharding_future_util.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/s/resharding/type_collection_fields_gen.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -176,56 +177,67 @@ private:
// The following functions correspond to the actions to take at a particular recipient state.
ExecutorFuture<void> _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken);
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory);
- void _createTemporaryReshardingCollectionThenTransitionToCloning();
+ void _createTemporaryReshardingCollectionThenTransitionToCloning(
+ const CancelableOperationContextFactory& factory);
ExecutorFuture<void> _cloneThenTransitionToApplying(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken);
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory);
ExecutorFuture<void> _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken);
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory);
- void _writeStrictConsistencyOplog();
+ void _writeStrictConsistencyOplog(const CancelableOperationContextFactory& factory);
- void _renameTemporaryReshardingCollection();
+ void _renameTemporaryReshardingCollection(const CancelableOperationContextFactory& factory);
- void _cleanupReshardingCollections(bool aborted);
+ void _cleanupReshardingCollections(bool aborted,
+ const CancelableOperationContextFactory& factory);
// Transitions the on-disk and in-memory state to 'newState'.
- void _transitionState(RecipientStateEnum newState);
+ void _transitionState(RecipientStateEnum newState,
+ const CancelableOperationContextFactory& factory);
void _transitionState(RecipientShardContext&& newRecipientCtx,
boost::optional<CloneDetails>&& cloneDetails,
- boost::optional<mongo::Date_t> configStartTime);
+ boost::optional<mongo::Date_t> configStartTime,
+ const CancelableOperationContextFactory& factory);
// The following functions transition the on-disk and in-memory state to the named state.
void _transitionToCreatingCollection(CloneDetails cloneDetails,
- boost::optional<mongo::Date_t> startConfigTxnCloneTime);
+ boost::optional<mongo::Date_t> startConfigTxnCloneTime,
+ const CancelableOperationContextFactory& factory);
- void _transitionToCloning();
+ void _transitionToCloning(const CancelableOperationContextFactory& factory);
- void _transitionToApplying();
+ void _transitionToApplying(const CancelableOperationContextFactory& factory);
- void _transitionToStrictConsistency();
+ void _transitionToStrictConsistency(const CancelableOperationContextFactory& factory);
- void _transitionToError(Status abortReason);
+ void _transitionToError(Status abortReason, const CancelableOperationContextFactory& factory);
BSONObj _makeQueryForCoordinatorUpdate(const ShardId& shardId, RecipientStateEnum newState);
ExecutorFuture<void> _updateCoordinator(
- OperationContext* opCtx, const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+ OperationContext* opCtx,
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancelableOperationContextFactory& factory);
// Updates the mutable portion of the on-disk and in-memory recipient document with
// 'newRecipientCtx', 'fetchTimestamp and 'donorShards'.
void _updateRecipientDocument(RecipientShardContext&& newRecipientCtx,
boost::optional<CloneDetails>&& cloneDetails,
- boost::optional<mongo::Date_t> configStartTime);
+ boost::optional<mongo::Date_t> configStartTime,
+ const CancelableOperationContextFactory& factory);
// Removes the local recipient document from disk.
- void _removeRecipientDocument(bool aborted);
+ void _removeRecipientDocument(bool aborted, const CancelableOperationContextFactory& factory);
std::unique_ptr<ReshardingDataReplicationInterface> _makeDataReplication(
OperationContext* opCtx, bool cloningDone);
@@ -233,14 +245,20 @@ private:
void _ensureDataReplicationStarted(
OperationContext* opCtx,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken);
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory);
ReshardingMetrics* _metrics() const;
- void _startMetrics();
+ ExecutorFuture<void> _startMetrics(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken);
// Restore metrics using the persisted metrics after stepping up.
- void _restoreMetrics();
+ ExecutorFuture<void> _restoreMetricsWithRetry(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken);
+ void _restoreMetrics(const CancelableOperationContextFactory& factory);
// Initializes the _abortSource and generates a token from it to return back the caller.
//
@@ -271,7 +289,8 @@ private:
// CancelableOperationContext must have a thread that is always available to it to mark its
// opCtx as killed when the cancelToken has been cancelled.
const std::shared_ptr<ThreadPool> _markKilledExecutor;
- boost::optional<CancelableOperationContextFactory> _cancelableOpCtxFactory;
+ boost::optional<resharding::RetryingCancelableOperationContextFactory>
+ _retryingCancelableOpCtxFactory;
const ReshardingDataReplicationFactory _dataReplicationFactory;
SharedSemiFuture<void> _dataReplicationQuiesced;