summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/tenant_migration_recipient_service.h
blob: 22de9a00fd1a46da3a0938cea0601665df1e4a36 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
/**
 *    Copyright (C) 2020-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

#include <boost/optional.hpp>
#include <memory>

#include "mongo/client/fetcher.h"
#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/repl/tenant_all_database_cloner.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/repl/tenant_oplog_applier.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/util/time_support.h"

namespace mongo {

class DBClientConnection;
class OperationContext;
class ReplicaSetMonitor;
class ServiceContext;

namespace repl {
class OplogBufferCollection;
/**
 * TenantMigrationRecipientService is a primary-only service that handles the data copy portion
 * of a multitenant migration on the recipient side.
 */
class TenantMigrationRecipientService final : public PrimaryOnlyService {
    // Disallows copying.
    TenantMigrationRecipientService(const TenantMigrationRecipientService&) = delete;
    TenantMigrationRecipientService& operator=(const TenantMigrationRecipientService&) = delete;

public:
    static constexpr StringData kTenantMigrationRecipientServiceName =
        "TenantMigrationRecipientService"_sd;
    static constexpr StringData kNoopMsg = "Resume token noop"_sd;

    explicit TenantMigrationRecipientService(ServiceContext* serviceContext);
    ~TenantMigrationRecipientService() = default;

    StringData getServiceName() const final;

    NamespaceString getStateDocumentsNS() const final;

    ThreadPool::Limits getThreadPoolLimits() const final;

    void checkIfConflictsWithOtherInstances(
        OperationContext* opCtx,
        BSONObj initialStateDoc,
        const std::vector<const PrimaryOnlyService::Instance*>& existingInstances) final;

    std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(BSONObj initialStateDoc) final;

    /**
     * Sends an abort to all tenant migration instances on this recipient.
     */
    void abortAllMigrations(OperationContext* opCtx);

    class Instance final : public PrimaryOnlyService::TypedInstance<Instance> {
    public:
        explicit Instance(ServiceContext* serviceContext,
                          const TenantMigrationRecipientService* recipientService,
                          BSONObj stateDoc);

        SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor,
                             const CancellationToken& token) noexcept final;

        /*
         * Interrupts the running instance and causes the completion future to complete with
         * 'status'.
         */
        void interrupt(Status status) override;

        /*
         * Cancels the running instance but permits waiting for forgetMigration.
         */
        void cancelMigration();

        /**
         * Interrupts the migration for garbage collection.
         */
        void onReceiveRecipientForgetMigration(OperationContext* opCtx);

        /**
         * Hands out a shared future obtained from '_dataSyncCompletionPromise'. The future is
         * resolved once the data sync associated with this Instance has completed running.
         */
        SharedSemiFuture<void> getDataSyncCompletionFuture() const {
            auto dataSyncCompletion = _dataSyncCompletionPromise.getFuture();
            return dataSyncCompletion;
        }

        /**
         * Hands out a shared future obtained from '_forgetMigrationDurablePromise'. The future is
         * resolved once the instance has been durably marked garbage collectable.
         */
        SharedSemiFuture<void> getForgetMigrationDurableFuture() const {
            auto forgetMigrationDurable = _forgetMigrationDurablePromise.getFuture();
            return forgetMigrationDurable;
        }

        /**
         * Report TenantMigrationRecipientService Instances in currentOp().
         */
        boost::optional<BSONObj> reportForCurrentOp(
            MongoProcessInterface::CurrentOpConnectionsMode connMode,
            MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept final;

        /*
         *  Returns the instance id.
         */
        const UUID& getMigrationUUID() const;

        /*
         *  Returns the tenant id (database prefix).
         */
        const std::string& getTenantId() const;

        /*
         *  Returns the migration protocol.
         */
        const MigrationProtocolEnum& getProtocol() const;

        /*
         * Returns the recipient document state.
         */
        TenantMigrationRecipientDocument getState() const;

        void checkIfOptionsConflict(const BSONObj& stateDoc) const final;

        /*
         * Blocks the thread until the tenant migration reaches consistent state in an interruptible
         * mode. Returns the donor optime at which the migration reached consistent state. Throws
         * exception on error.
         */
        OpTime waitUntilMigrationReachesConsistentState(OperationContext* opCtx) const;

        /*
         * Blocks the thread until the tenant oplog applier applied data past the
         * 'returnAfterReachingTimestamp' in an interruptible mode. If the recipient's logical clock
         * has not yet reached the 'returnAfterReachingTimestamp', advances the recipient's logical
         * clock to 'returnAfterReachingTimestamp'. Finally, stores the
         * 'returnAfterReachingTimestamp' as 'rejectReadsBeforeTimestamp' in the state
         * document and waits for the write to be replicated to every node (i.e. wait for
         * 'rejectReadsBeforeTimestamp' to be set on the TenantMigrationRecipientAccessBlocker of
         * every node) to guarantee that no reads will be incorrectly accepted.
         */
        OpTime waitUntilMigrationReachesReturnAfterReachingTimestamp(
            OperationContext* opCtx, const Timestamp& returnAfterReachingTimestamp);

        /*
         * Called when a replica set member (self, or a secondary) finishes importing donated files.
         */
        void onMemberImportedFiles(const HostAndPort& host,
                                   bool success,
                                   const boost::optional<StringData>& reason = boost::none);

        /*
         * Test-only hook: replaces the oplog fetcher factory stored in '_createOplogFetcherFn',
         * to allow use of a mock oplog fetcher. Ownership of 'createOplogFetcherFn' is moved into
         * this instance.
         *
         * NOTE(review): '_mutex' is not acquired here even though '_createOplogFetcherFn' is
         * labeled (M) — presumably this must be called before the instance starts using the
         * factory; confirm against callers.
         */
        void setCreateOplogFetcherFn_forTest(
            std::unique_ptr<OplogFetcherFactory>&& createOplogFetcherFn) {
            _createOplogFetcherFn = std::move(createOplogFetcherFn);
        }

        /**
         * Stops the oplog applier without going through tenantForgetMigration.
         *
         * Acquires '_mutex'. If the oplog applier has not been created yet this is a no-op,
         * instead of dereferencing a null '_tenantOplogApplier'.
         */
        void stopOplogApplier_forTest() {
            stdx::lock_guard lk(_mutex);
            if (_tenantOplogApplier) {
                _tenantOplogApplier->shutdown();
            }
        }

        /*
         * Suppresses selecting 'host' as the donor sync source, until 'until'.
         */
        void excludeDonorHost_forTest(const HostAndPort& host, Date_t until) {
            stdx::lock_guard lk(_mutex);
            _excludeDonorHost(lk, host, until);
        }

        /*
         * Test-only accessor: returns a reference to '_excludedDonorHosts', the list of donor
         * hosts paired with the time until which each one is excluded.
         *
         * NOTE(review): '_excludedDonorHosts' is labeled (M), but '_mutex' is not acquired here
         * and a reference is handed out — presumably callers must ensure nothing modifies the
         * list concurrently; confirm.
         */
        const auto& getExcludedDonorHosts_forTest() {
            return _excludedDonorHosts;
        }

    private:
        friend class TenantMigrationRecipientServiceTest;

        const NamespaceString _stateDocumentsNS =
            NamespaceString::kTenantMigrationRecipientsNamespace;

        using ConnectionPair =
            std::pair<std::unique_ptr<DBClientConnection>, std::unique_ptr<DBClientConnection>>;

        /**
         * Represents the instance task state, together with the status supplied by the most
         * recent interrupt.
         *
         * Transitions permitted by setState():
         *   kNotStarted  -> kRunning | kInterrupted | kDone
         *   kRunning     -> kInterrupted | kDone
         *   kInterrupted -> kRunning | kDone
         *   kDone        -> (none; terminal)
         *
         * This class does no locking of its own.
         */
        class TaskState {
        public:
            // Every state occupies a distinct bit so that several states can be OR-ed together
            // into a StateSet and tested in one call to isSet().
            enum StateFlag {
                kNotStarted = 1 << 0,
                kRunning = 1 << 1,
                kInterrupted = 1 << 2,
                kDone = 1 << 3,
            };

            using StateSet = int;

            /**
             * Returns true if the current state is one of the states OR-ed into 'stateSet'.
             */
            bool isSet(StateSet stateSet) const {
                return (_state & stateSet) != 0;
            }

            /**
             * Returns true if moving from the current state to 'newState' is permitted. Does not
             * modify the state.
             */
            bool checkIfValidTransition(StateFlag newState) const {
                switch (_state) {
                    case kNotStarted:
                        return newState == kRunning || newState == kInterrupted ||
                            newState == kDone;
                    case kRunning:
                        return newState == kInterrupted || newState == kDone;
                    case kInterrupted:
                        return newState == kDone || newState == kRunning;
                    case kDone:
                        return false;
                }
                MONGO_UNREACHABLE;
            }

            /**
             * Moves to 'state'.
             *
             * 'interruptStatus' must be supplied, and be non-OK, exactly when 'state' is
             * kInterrupted; it must be omitted for every other state. Both this rule and the
             * validity of the transition are enforced with invariants.
             */
            void setState(StateFlag state, boost::optional<Status> interruptStatus = boost::none) {
                invariant(checkIfValidTransition(state),
                          str::stream() << "current state: " << toString(_state)
                                        << ", new state: " << toString(state));

                // The interruptStatus can exist (and should be non-OK) if and only if the state is
                // kInterrupted.
                invariant((state == kInterrupted && interruptStatus && !interruptStatus->isOK()) ||
                              (state != kInterrupted && !interruptStatus),
                          str::stream() << "new state: " << toString(state)
                                        << ", interruptStatus: " << interruptStatus);

                _state = state;
                // Only an interrupt carries a status; any other transition keeps the status that
                // was recorded previously.
                if (interruptStatus) {
                    _interruptStatus = *interruptStatus;
                }
            }

            bool isNotStarted() const {
                return _state == kNotStarted;
            }

            bool isRunning() const {
                return _state == kRunning;
            }

            bool isInterrupted() const {
                return _state == kInterrupted;
            }

            bool isDone() const {
                return _state == kDone;
            }

            /**
             * Returns the status recorded by the most recent transition to kInterrupted, or
             * Status::OK() if no interrupt has been recorded.
             */
            Status getInterruptStatus() const {
                return _interruptStatus;
            }

            /**
             * Returns a human-readable name for the current state.
             */
            std::string toString() const {
                return toString(_state);
            }

            /**
             * Returns a human-readable name for 'state'.
             */
            static std::string toString(StateFlag state) {
                switch (state) {
                    case kNotStarted:
                        return "Not started";
                    case kRunning:
                        return "Running";
                    case kInterrupted:
                        return "Interrupted";
                    case kDone:
                        return "Done";
                }
                MONGO_UNREACHABLE;
            }

        private:
            // task state.
            StateFlag _state = kNotStarted;
            // task interrupt status. Status::OK() until the first interrupt is recorded. It is
            // kept across later non-interrupt transitions, and is replaced if the task is
            // interrupted again (after kInterrupted -> kRunning).
            Status _interruptStatus = Status::OK();
        };

        /*
         * Helper for interrupt().
         * The _receivedRecipientForgetMigrationPromise is resolved when
         * skipWaitingForForgetMigration is set (e.g. stepDown/shutDown). We use
         * skipWaitingForForgetMigration=false for interruptions coming from the instance's task
         * chain itself (e.g. _oplogFetcherCallback).
         */
        void _interrupt(Status status, bool skipWaitingForForgetMigration);

        /*
         * Transitions the instance state to 'kStarted'.
         *
         * Persists the instance state doc and waits for it to be majority replicated.
         * Throws a user assertion on failure.
         */
        SemiFuture<void> _initializeStateDoc(WithLock);

        /*
         * Transitions the instance state to 'kDone' and sets the expireAt field.
         *
         * Persists the instance state doc and waits for it to be majority replicated.
         * Throws on shutdown / notPrimary errors.
         */
        SemiFuture<void> _markStateDocAsGarbageCollectable();

        /**
         * Deletes the state document. Does not return the opTime for the delete, since it's not
         * necessary to wait for this delete to be majority committed (this is one of the last steps
         * in the chain, and if the delete rolls back, the new primary will re-do the delete).
         */
        SemiFuture<void> _removeStateDoc(const CancellationToken& token);

        SemiFuture<void> _waitForGarbageCollectionDelayThenDeleteStateDoc(
            const CancellationToken& token);

        /**
         * Creates a client, connects it to the donor. If '_transientSSLParams' is not none, uses
         * the migration certificate to do SSL authentication. Otherwise, uses the default
         * authentication mode. Throws a user assertion on failure.
         *
         */
        std::unique_ptr<DBClientConnection> _connectAndAuth(const HostAndPort& serverAddress,
                                                            StringData applicationName);

        /**
         * Creates and connects both the oplog fetcher client and the client used for other
         * operations.
         */
        SemiFuture<void> _createAndConnectClients();

        /**
         * Fetches all key documents from the donor's admin.system.keys collection, stores them in
         * config.external_validation_keys, and refreshes the keys cache.
         */
        void _fetchAndStoreDonorClusterTimeKeyDocs(const CancellationToken& token);

        /**
         * Opens a backup cursor on the donor primary and fetches the
         * list of donor files to be cloned.
         */
        SemiFuture<void> _openBackupCursor(const CancellationToken& token);
        SemiFuture<void> _openBackupCursorWithRetry(const CancellationToken& token);

        /**
         * Keeps the donor backup cursor alive.
         */
        void _keepBackupCursorAlive(const CancellationToken& token);

        /**
         * Kills the Donor backup cursor
         */
        SemiFuture<void> _killBackupCursor();

        /**
         * Gets the backup cursor metadata info.
         */
        const BackupCursorInfo& _getDonorBackupCursorInfo(WithLock) const;

        /**
         * Get the oldest active multi-statement transaction optime by reading
         * config.transactions collection at given ReadTimestamp (i.e, equal to
         * startApplyingDonorOpTime) snapshot.
         */
        boost::optional<OpTime> _getOldestActiveTransactionAt(Timestamp ReadTimestamp);

        /**
         * Retrieves the start/fetch optimes from the donor and updates the in-memory/on-disk states
         * accordingly.
         */
        SemiFuture<void> _getStartOpTimesFromDonor();

        /**
         * Pushes documents from oplog fetcher to oplog buffer.
         *
         * Returns a status even though it always returns OK, to conform to the interface that
         * OplogFetcher expects for the EnqueueDocumentsFn.
         */
        Status _enqueueDocuments(OplogFetcher::Documents::const_iterator begin,
                                 OplogFetcher::Documents::const_iterator end,
                                 const OplogFetcher::DocumentsInfo& info);

        /**
         * Creates the oplog buffer that will be populated by donor oplog entries from the retryable
         * writes fetching stage and oplog fetching stage.
         */
        void _createOplogBuffer(WithLock, OperationContext* opCtx);

        /**
         * Runs an aggregation that gets the entire oplog chain for every retryable write entry in
         * `config.transactions`. Only returns oplog entries in the chain where
         * `ts` < `startFetchingOpTime.ts` and adds them to the oplog buffer.
         */
        SemiFuture<void> _fetchRetryableWritesOplogBeforeStartOpTime();

        /**
         * Migrates committed transactions entries into 'config.transactions'.
         */
        SemiFuture<void> _fetchCommittedTransactionsBeforeStartOpTime();

        /**
         * Opens and returns a cursor for all entries with 'lastWriteOpTime' <=
         * 'startApplyingDonorOpTime' and state 'committed'.
         */
        std::unique_ptr<DBClientCursor> _openCommittedTransactionsFindCursor();

        /**
         * Opens and returns a cursor for entries from '_makeCommittedTransactionsAggregation()'.
         */
        std::unique_ptr<DBClientCursor> _openCommittedTransactionsAggregationCursor();

        /**
         * Creates an aggregation pipeline to fetch transaction entries with 'lastWriteOpTime' <
         * 'startFetchingDonorOpTime' and 'state: committed'.
         */
        AggregateCommandRequest _makeCommittedTransactionsAggregation() const;

        /**
         * Processes a committed transaction entry from the donor. Updates the recipient's
         * 'config.transactions' collection with the entry and writes a no-op entry for the
         * recipient secondaries to replicate the entry.
         */
        void _processCommittedTransactionEntry(const BSONObj& entry);

        /**
         * Starts the tenant oplog fetcher.
         */
        void _startOplogFetcher();

        /**
         * Called when the oplog fetcher finishes.  Usually the oplog fetcher finishes only when
         * cancelled or on error.
         */
        void _oplogFetcherCallback(Status oplogFetcherStatus);

        /**
         * Returns the filter used to get only oplog documents related to the appropriate tenant.
         */
        BSONObj _getOplogFetcherFilter() const;

        /*
         * Traverse backwards through the oplog to find the optime which tenant oplog application
         * should resume from. The oplog applier should resume applying entries that have a greater
         * optime than the returned value.
         */
        OpTime _getOplogResumeApplyingDonorOptime(const OpTime& cloneFinishedRecipientOpTime) const;

        /*
         * Starts the tenant cloner.
         * Returns future that will be fulfilled when the cloner completes.
         */
        Future<void> _startTenantAllDatabaseCloner(WithLock lk);

        /*
         * Starts the tenant oplog applier.
         */
        void _startOplogApplier();

        /*
         * Waits for tenant oplog applier to stop.
         */
        SemiFuture<TenantOplogApplier::OpTimePair> _waitForOplogApplierToStop();

        /*
         * Advances the majority commit timestamp to be >= donor's backup cursor checkpoint
         * timestamp(CkptTs) by:
         * 1. Advancing the clusterTime to CkptTs.
         * 2. Writing a no-op oplog entry with ts > CkptTs
         * 3. Waiting for the majority commit timestamp to be the time of the no-op write.
         *
         * Note: This method should be called before transitioning the instance state to
         * 'kLearnedFilenames', which causes donor collections to get imported. The current import
         * rule is that the import table's checkpoint timestamp can't be later than the
         * recipient's stable timestamp. Because we don't have a mechanism to wait until a specific
         * stable timestamp is reached on a given node or set of nodes in the replica set, and
         * because the majority commit point and stable timestamp aren't updated atomically,
         * advancing the majority commit point on the recipient before the import collection stage
         * is only a best-effort attempt to prevent import retries caused by violations of that
         * rule.
         */
        SemiFuture<void> _advanceMajorityCommitTsToBkpCursorCheckpointTs(
            const CancellationToken& token);

        /*
         * Gets called when the logical/file cloner completes cloning data successfully.
         * And, it is responsible to populate the 'dataConsistentStopDonorOpTime'
         * and 'cloneFinishedRecipientOpTime' fields in the state doc.
         */
        SemiFuture<void> _onCloneSuccess();

        /*
         * Returns a future that will be fulfilled when the tenant migration reaches consistent
         * state.
         */
        SemiFuture<void> _getDataConsistentFuture();

        /*
         * Wait for the data cloned via logical cloner to be consistent.
         */
        SemiFuture<TenantOplogApplier::OpTimePair> _waitForDataToBecomeConsistent();

        /*
         * Transitions the instance state to 'kLearnedFilenames'.
         */
        SemiFuture<void> _enterLearnedFilenamesState();

        /*
         * Transitions the instance state to 'kConsistent'.
         */
        SemiFuture<void> _enterConsistentState();

        /*
         * Persists the instance state doc and waits for it to be majority replicated.
         * Throws a user assertion on failure.
         */
        SemiFuture<void> _persistConsistentState();

        /*
         * Cancels the tenant migration recipient instance task work.
         */
        void _cancelRemainingWork(WithLock lk);

        /*
         * Performs some cleanup work on sync completion, like, shutting down the components or
         * fulfilling any data-sync related instance promises.
         */
        void _cleanupOnDataSyncCompletion(Status status);

        /*
         * Suppresses selecting 'host' as the donor sync source, until 'until'.
         */
        void _excludeDonorHost(WithLock, const HostAndPort& host, Date_t until);

        /*
         * Returns a vector of currently excluded donor hosts. Also removes hosts from the list of
         * excluded donor nodes, if the exclude duration has expired.
         */
        std::vector<HostAndPort> _getExcludedDonorHosts(WithLock);

        /*
         * Makes the failpoint stop or hang the migration based on failpoint data "action" field.
         * If "action" is "hang" and 'opCtx' is not null, the failpoint will be interruptible.
         */
        void _stopOrHangOnFailPoint(FailPoint* fp, OperationContext* opCtx = nullptr);

        /**
         * Updates the state doc in the database and waits for that to be propagated to a majority.
         */
        SemiFuture<void> _updateStateDocForMajority(WithLock lk) const;

        /*
         * Returns the majority OpTime on the donor node that 'client' is connected to.
         */
        OpTime _getDonorMajorityOpTime(std::unique_ptr<mongo::DBClientConnection>& client);

        /*
         * Detects recipient FCV changes during migration.
         */
        SemiFuture<void> _checkIfFcvHasChangedSinceLastAttempt();

        /**
         * Enforces that the donor and recipient share the same featureCompatibilityVersion.
         */
        void _compareRecipientAndDonorFCV() const;

        /*
         * Sets up internal state to begin migration.
         */
        void _setup();

        SemiFuture<TenantOplogApplier::OpTimePair> _migrateUsingMTMProtocol(
            const CancellationToken& token);

        SemiFuture<TenantOplogApplier::OpTimePair> _migrateUsingShardMergeProtocol(
            const CancellationToken& token);

        mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationRecipientService::_mutex");

        // All member variables are labeled with one of the following codes indicating the
        // synchronization rules for accessing them.
        //
        // (R)  Read-only in concurrent operation; no synchronization required.
        // (S)  Self-synchronizing; access according to class's own rules.
        // (M)  Reads and writes guarded by _mutex.
        // (W)  Synchronization required only for writes.

        ServiceContext* const _serviceContext;
        const TenantMigrationRecipientService* const _recipientService;  // (R) (not owned)
        std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor;   // (M)
        TenantMigrationRecipientDocument _stateDoc;                      // (M)

        // This data is provided in the initial state doc and never changes.  We keep copies to
        // avoid having to obtain the mutex to access them.
        const std::string _tenantId;                                                     // (R)
        const MigrationProtocolEnum _protocol;                                           // (R)
        const UUID _migrationUuid;                                                       // (R)
        const std::string _donorConnectionString;                                        // (R)
        const MongoURI _donorUri;                                                        // (R)
        const ReadPreferenceSetting _readPreference;                                     // (R)
        const boost::optional<TenantMigrationPEMPayload> _recipientCertificateForDonor;  // (R)
        // TODO (SERVER-54085): Remove server parameter tenantMigrationDisableX509Auth.
        // Transient SSL params created based on the state doc if the server parameter
        // 'tenantMigrationDisableX509Auth' is false.
        const boost::optional<TransientSSLParams> _transientSSLParams = boost::none;  // (R)

        std::shared_ptr<ReplicaSetMonitor> _donorReplicaSetMonitor;  // (M)

        // Members of the donor replica set that we have excluded as a potential sync source for
        // some period of time.
        std::vector<std::pair<HostAndPort, Date_t>> _excludedDonorHosts;  // (M)

        // Because the cloners and oplog fetcher use exhaust, we need a separate connection for
        // each.  The '_client' will be used for the cloners and other operations such as fetching
        // optimes while the '_oplogFetcherClient' will be reserved for the oplog fetcher only.
        //
        // Follow DBClientCursor synchronization rules.
        std::unique_ptr<DBClientConnection> _client;              // (S)
        std::unique_ptr<DBClientConnection> _oplogFetcherClient;  // (S)

        // NOTE(review): the '(X)' code on '_backupCursorKeepAliveCancellation' is not one of the
        // codes defined in the legend above — confirm the intended synchronization rule.
        std::unique_ptr<Fetcher> _donorFilenameBackupCursorFileFetcher;  // (M)
        CancellationSource _backupCursorKeepAliveCancellation = {};      // (X)
        boost::optional<SemiFuture<void>> _backupCursorKeepAliveFuture;  // (M)

        std::unique_ptr<OplogFetcherFactory> _createOplogFetcherFn =
            std::make_unique<CreateOplogFetcherFn>();                               // (M)
        std::unique_ptr<OplogBufferCollection> _donorOplogBuffer;                   // (M)
        std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState;  // (M)
        std::unique_ptr<OplogFetcher> _donorOplogFetcher;                           // (M)
        std::unique_ptr<TenantAllDatabaseCloner> _tenantAllDatabaseCloner;          // (M)
        std::shared_ptr<TenantOplogApplier> _tenantOplogApplier;                    // (M)

        // Writer pool to do storage write operation. Used by tenant collection cloner and by
        // tenant oplog applier.
        std::unique_ptr<ThreadPool> _writerPool;  // (M)
        // Data shared by cloners. Follow TenantMigrationSharedData synchronization rules.
        std::unique_ptr<TenantMigrationSharedData> _sharedData;  // (S)
        // Tracks the state of the main task future continuation chain kicked off by run().
        TaskState _taskState;  // (M)

        // Promise that is resolved when the state document is initialized and persisted.
        SharedPromise<void> _stateDocPersistedPromise;  // (W)
        // Promise that is resolved when the instance has started the tenant database cloner and
        // the tenant oplog fetcher.
        SharedPromise<void> _dataSyncStartedPromise;  // (W)
        // Promise that is resolved when all recipient nodes have imported all donor files.
        SharedPromise<void> _importedFilesPromise;  // (W)
        // Whether we are waiting for members to import donor files.
        // NOTE(review): no synchronization code is given for this member or for
        // '_membersWhoHaveImportedFiles' below — presumably (M); confirm.
        bool _waitingForMembersToImportFiles = true;
        // Which members have imported all donor files.
        stdx::unordered_set<HostAndPort> _membersWhoHaveImportedFiles;
        // Promise that is resolved when the tenant data sync has reached consistent point.
        SharedPromise<OpTime> _dataConsistentPromise;  // (W)
        // Promise that is resolved when the data sync has completed.
        SharedPromise<void> _dataSyncCompletionPromise;  // (W)
        // Promise that is resolved when the recipientForgetMigration command is received or on
        // stepDown/shutDown with errors.
        SharedPromise<void> _receivedRecipientForgetMigrationPromise;  // (W)
        // Promise that is resolved when the instance has been durably marked garbage collectable.
        SharedPromise<void> _forgetMigrationDurablePromise;  // (W)
        // Waiters are notified when '_tenantOplogApplier' is valid on restart.
        stdx::condition_variable _restartOplogApplierCondVar;  // (M)
        // Waiters are notified when '_tenantOplogApplier' is ready to use.
        stdx::condition_variable _oplogApplierReadyCondVar;  // (M)
        // Indicates whether '_tenantOplogApplier' is ready to use or not.
        bool _oplogApplierReady = false;  // (M)
    };

private:
    ExecutorFuture<void> _rebuildService(std::shared_ptr<executor::ScopedTaskExecutor> executor,
                                         const CancellationToken& token) override;

    ServiceContext* const _serviceContext;

    /*
     * Ensures that only one Instance is able to insert the initial state doc provided by the user,
     * into NamespaceString::kTenantMigrationRecipientsNamespace collection at a time.
     *
     * No other locks should be held when locking this. RSTL/global/db/collection locks have to be
     * taken after taking this.
     */
    Lock::ResourceMutex _stateDocInsertMutex{"TenantMigrationRecipientStateDocInsert::mutex"};
};
}  // namespace repl
}  // namespace mongo