summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/migration_chunk_cloner_source.h
blob: 0010f12c3bbbc113aeca4d633264109131f0512d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <set>

#include "mongo/bson/bsonobj.h"
#include "mongo/client/connection_string.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/s/migration_chunk_cloner_source.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/db/s/session_catalog_migration_source.h"
#include "mongo/db/session/logical_session_id.h"
#include "mongo/platform/mutex.h"
#include "mongo/s/request_types/move_range_request_gen.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"

namespace mongo {

class BSONArrayBuilder;
class BSONObjBuilder;
class Collection;
class CollectionPtr;
class Database;
class RecordId;

// Overhead to prevent mods buffers from being too large
const long long kFixedCommandOverhead = 32 * 1024;

/**
 * Used to commit work for LogOpForSharding. Used to keep track of changes in documents that are
 * part of a chunk being migrated.
 */
class LogTransactionOperationsForShardingHandler final : public RecoveryUnit::Change {
public:
    LogTransactionOperationsForShardingHandler(LogicalSessionId lsid,
                                               const std::vector<repl::OplogEntry>& stmts,
                                               repl::OpTime prepareOrCommitOpTime);

    LogTransactionOperationsForShardingHandler(LogicalSessionId lsid,
                                               const std::vector<repl::ReplOperation>& stmts,
                                               repl::OpTime prepareOrCommitOpTime);

    void commit(OperationContext* opCtx, boost::optional<Timestamp>);

    void rollback(OperationContext* opCtx){};

private:
    const LogicalSessionId _lsid;
    std::vector<repl::ReplOperation> _stmts;
    const repl::OpTime _prepareOrCommitOpTime;
};

/**
 * This class is responsible for producing chunk documents to be moved from donor to a recipient
 * shard and its methods represent cloning stages. Its lifetime is owned and controlled by a single
 * migration source manager which registers it for notifications from the replication subsystem
 * before calling startClone.
 *
 * Unless explicitly indicated, the methods on this class are not thread-safe.
 *
 * The pattern of using this interface is such that one thread instantiates it and registers it so
 * it begins receiving notifications from the replication subsystem through the
 * on[insert/update/delete]Op methods. It is up to the creator to decide how these methods end up
 * being called, but currently this is done through the CollectionShardingState. The creator then
 * kicks off the cloning as soon as possible by calling startClone.
 */
class MigrationChunkClonerSource {
    MigrationChunkClonerSource(const MigrationChunkClonerSource&) = delete;
    MigrationChunkClonerSource& operator=(const MigrationChunkClonerSource&) = delete;

public:
    MigrationChunkClonerSource(OperationContext* opCtx,
                               const ShardsvrMoveRange& request,
                               const WriteConcernOptions& writeConcern,
                               const BSONObj& shardKeyPattern,
                               ConnectionString donorConnStr,
                               HostAndPort recipientHost);
    ~MigrationChunkClonerSource();

    /**
     * Blocking method, which prepares the object for serving as a source for migrations and tells
     * the recipient shard to start cloning. Before calling this method, this chunk cloner must be
     * registered for notifications from the replication subsystem (not checked here).
     *
     * NOTE: Must be called without any locks and must succeed, before any other methods are called
     * (except for cancelClone and [insert/update/delete]Op).
     */
    Status startClone(OperationContext* opCtx,
                      const UUID& migrationId,
                      const LogicalSessionId& lsid,
                      TxnNumber txnNumber);

    /**
     * Blocking method, which uses some custom selected logic for deciding whether it is appropriate
     * for the donor shard to enter critical section.
     *
     * If it returns a successful status, the caller must as soon as possible stop writes (by
     * entering critical section). On failure it may return any error. Known errors are:
     *  ExceededTimeLimit - if the maxTimeToWait was exceeded
     *
     * NOTE: Must be called without any locks.
     */
    Status awaitUntilCriticalSectionIsAppropriate(OperationContext* opCtx,
                                                  Milliseconds maxTimeToWait);

    /**
     * Tell the recipient shard to commit the documents it has cloned so far. Must be called only
     * when it has been ensured that there will be no more changes happening to documents on the
     * donor shard. If this is not observed, the recipient might miss changes and thus lose data.
     *
     * This must only be called once and no more methods on the cloner must be used afterwards
     * regardless of whether it succeeds or not.
     *
     * Returns statistics about the move. These are informational only and should not be
     * interpreted by the caller for any means other than reporting.
     *
     * NOTE: Must be called without any locks.
     */
    StatusWith<BSONObj> commitClone(OperationContext* opCtx);

    /**
     * Tells the recipient to abort the clone and cleanup any unused data. This method's
     * implementation should be idempotent and never throw.
     *
     * NOTE: Must be called without any locks.
     */
    void cancelClone(OperationContext* opCtx) noexcept;

    /**
     * Checks whether the specified document is within the bounds of the chunk, which this cloner
     * is responsible for.
     *
     * NOTE: Must be called with at least IS lock held on the collection.
     */
    bool isDocumentInMigratingChunk(const BSONObj& doc);

    /**
     * Notifies this cloner that an insert happened to the collection, which it owns. It is up to
     * the cloner's implementation to decide what to do with this information and it is valid for
     * the implementation to ignore it.
     *
     * NOTE: Must be called with at least IX lock held on the collection.
     */
    void onInsertOp(OperationContext* opCtx,
                    const BSONObj& insertedDoc,
                    const repl::OpTime& opTime);

    /**
     * Notifies this cloner that an update happened to the collection, which it owns. It is up to
     * the cloner's implementation to decide what to do with this information and it is valid for
     * the implementation to ignore it.
     *
     * NOTE: Must be called with at least IX lock held on the collection.
     */
    void onUpdateOp(OperationContext* opCtx,
                    boost::optional<BSONObj> preImageDoc,
                    const BSONObj& postImageDoc,
                    const repl::OpTime& opTime,
                    const repl::OpTime& prePostImageOpTime);

    /**
     * Notifies this cloner that a delede happened to the collection, which it owns. It is up to the
     * cloner's implementation to decide what to do with this information and it is valid for the
     * implementation to ignore it.
     *
     * NOTE: Must be called with at least IX lock held on the collection.
     */
    void onDeleteOp(OperationContext* opCtx,
                    const BSONObj& deletedDocId,
                    const repl::OpTime& opTime,
                    const repl::OpTime& preImageOpTime);

    /**
     * Returns the migration session id associated with this cloner, so stale sessions can be
     * disambiguated.
     */
    const MigrationSessionId& getSessionId() const {
        return _sessionId;
    }


    /**
     * Returns the rollback ID recorded at the beginning of session migration. If the underlying
     * SessionCatalogMigrationSource does not exist, that means this node is running as a standalone
     * and doesn't support retryable writes, so we return boost::none.
     */
    boost::optional<int> getRollbackIdAtInit() const {
        if (_sessionCatalogSource) {
            return _sessionCatalogSource->getRollbackIdAtInit();
        }
        return boost::none;
    }

    /**
     * Called by the recipient shard. Used to estimate how many more bytes of clone data are
     * remaining in the chunk cloner.
     */
    uint64_t getCloneBatchBufferAllocationSize();

    /**
     * Called by the recipient shard. Populates the passed BSONArrayBuilder with a set of documents,
     * which are part of the initial clone sequence. Assumes that there is only one active caller
     * to this method at a time (otherwise, it can cause corruption/crash).
     *
     * Returns OK status on success. If there were documents returned in the result argument, this
     * method should be called more times until the result is empty. If it returns failure, it is
     * not safe to call more methods on this class other than cancelClone.
     *
     * This method will return early if too much time is spent fetching the documents in order to
     * give a chance to the caller to perform some form of yielding. It does not free or acquire any
     * locks on its own.
     *
     * NOTE: Must be called with the collection lock held in at least IS mode.
     */
    Status nextCloneBatch(OperationContext* opCtx,
                          const CollectionPtr& collection,
                          BSONArrayBuilder* arrBuilder);

    /**
     * Called by the recipient shard. Transfers the accummulated local mods from source to
     * destination. Must not be called before all cloned objects have been fetched through calls to
     * nextCloneBatch.
     *
     * NOTE: Must be called with the collection lock held in at least IS mode.
     */
    Status nextModsBatch(OperationContext* opCtx, BSONObjBuilder* builder);

    /**
     * Appends to 'arrBuilder' oplog entries which wrote to the currently migrated chunk and contain
     * session information.
     *
     * If this function returns a valid OpTime, this means that the oplog appended are not
     * guaranteed to be majority committed and the caller has to wait for the returned opTime to be
     * majority committed before returning them to the donor shard.
     *
     * If the underlying SessionCatalogMigrationSource does not exist, that means this node is
     * running as a standalone and doesn't support retryable writes, so we return boost::none.
     *
     * This waiting is necessary because session migration is only allowed to send out committed
     * entries, as opposed to chunk migration, which can send out uncommitted documents. With chunk
     * migration, the uncommitted documents will not be visibile until the end of the migration
     * commits, which means that if it fails, they won't be visible, whereas session oplog entries
     * take effect immediately since they are appended to the chain.
     */
    boost::optional<repl::OpTime> nextSessionMigrationBatch(OperationContext* opCtx,
                                                            BSONArrayBuilder* arrBuilder);

    /**
     * Returns a notification that can be used to wait for new oplog that needs to be migrated.
     * If the value in the notification returns true, it means that there are no more new batches
     * that needs to be fetched because the migration has already entered the critical section or
     * aborted.
     *
     * Returns nullptr if there is no session migration associated with this migration.
     */
    std::shared_ptr<Notification<bool>> getNotificationForNextSessionMigrationBatch();

    const NamespaceString& nss() {
        return _args.getCommandParameter();
    }

    const BSONObj& getMin() {
        invariant(_args.getMin());
        return *_args.getMin();
    }

    const BSONObj& getMax() {
        invariant(_args.getMax());
        return *_args.getMax();
    }

    /**
     * Returns the number of session oplog entries that were not found but not sent to the
     * destination shard.
     */
    boost::optional<long long> getSessionOplogEntriesSkippedSoFarLowerBound();

    /**
     * Returns the number of session oplog entries that need to be sent to the destination shard.
     */
    boost::optional<long long> getSessionOplogEntriesToBeMigratedSoFar();

private:
    friend class LogOpForShardingHandler;
    friend class LogTransactionOperationsForShardingHandler;

    using RecordIdSet = std::set<RecordId>;

    /**
     * This is responsible for all the logic revolving around handling documents that needs to be
     * cloned.
     *
     * This class is multithread-safe.
     */
    class CloneList {
    public:
        /**
         * Simple container that increments the given counter when this is constructed and
         * decrements it when it is destroyed. User of this class is responsible for holding
         * necessary mutexes when counter is being modified.
         */
        class InProgressReadToken {
        public:
            InProgressReadToken(WithLock, CloneList& cloneList);
            InProgressReadToken(const InProgressReadToken&) = delete;
            InProgressReadToken(InProgressReadToken&&) = default;

            ~InProgressReadToken();

        private:
            CloneList& _cloneList;
        };

        /**
         * Container for a document that can be added to the nextCloneBatch call. As long as
         * instances of this object exist, it will prevent getNextDoc from prematurely returning
         * an empty response (which means there are no more docs left to clone).
         *
         * This assumes that _mutex is not being held when it is destroyed.
         */
        class DocumentInFlightWhileNotInLock {
        public:
            DocumentInFlightWhileNotInLock(std::unique_ptr<InProgressReadToken> inProgressReadToken,
                                           boost::optional<Snapshotted<BSONObj>> doc);
            DocumentInFlightWhileNotInLock(const DocumentInFlightWhileNotInLock&) = delete;
            DocumentInFlightWhileNotInLock(DocumentInFlightWhileNotInLock&&) = default;

            void setDoc(boost::optional<Snapshotted<BSONObj>> doc);
            const boost::optional<Snapshotted<BSONObj>>& getDoc();

        private:
            std::unique_ptr<InProgressReadToken> _inProgressReadToken;
            boost::optional<Snapshotted<BSONObj>> _doc;
        };

        /**
         * A variant of the DocumentInFlightWhileNotInLock where the _mutex should be held while it
         * has a document contained within it.
         */
        class DocumentInFlightWithLock {
        public:
            DocumentInFlightWithLock(WithLock, CloneList& clonerList);
            DocumentInFlightWithLock(const DocumentInFlightWithLock&) = delete;
            DocumentInFlightWithLock(DocumentInFlightWithLock&&) = default;

            void setDoc(boost::optional<Snapshotted<BSONObj>> doc);

            /**
             * Releases the contained document. Can only be called once for the entire lifetime
             * of this object.
             */
            std::unique_ptr<DocumentInFlightWhileNotInLock> release();

        private:
            std::unique_ptr<InProgressReadToken> _inProgressReadToken;
            boost::optional<Snapshotted<BSONObj>> _doc;
        };

        CloneList();

        /**
         * Overwrites the list of record ids to clone.
         */
        void populateList(RecordIdSet recordIds);

        /**
         * Returns a document to clone. If there are no more documents left to clone,
         * DocumentInFlightWhileNotInLock::getDoc will return boost::none.
         *
         * numRecordsNoLonger exists is an optional parameter that can be used to track
         * the number of recordIds encountered that refers to a document that no longer
         * exists.
         */
        std::unique_ptr<DocumentInFlightWhileNotInLock> getNextDoc(OperationContext* opCtx,
                                                                   const CollectionPtr& collection,
                                                                   int* numRecordsNoLongerExist);

        /**
         * Put back a document previously obtained from this CloneList instance to the overflow
         * pool.
         */
        void insertOverflowDoc(Snapshotted<BSONObj> doc);

        /**
         * Returns true if there are more documents to clone.
         */
        bool hasMore() const;

        /**
         * Returns the size of the populated record ids.
         */
        size_t size() const;

    private:
        /**
         * Increments the counter for inProgressReads.
         */
        void _startedOneInProgressRead(WithLock);

        /**
         * Decrements the counter for inProgressReads.
         */
        void _finishedOneInProgressRead();

        mutable Mutex _mutex = MONGO_MAKE_LATCH("MigrationChunkClonerSource::CloneList::_mutex");

        RecordIdSet _recordIds;

        // This iterator is a pointer into the _recordIds set.  It allows concurrent access to
        // the _recordIds set by allowing threads servicing _migrateClone requests to do the
        // following:
        //   1.  Acquire mutex "_mutex" above.
        //   2.  Copy *_recordIdsIter into its local stack frame.
        //   3.  Increment _recordIdsIter
        //   4.  Unlock "_mutex."
        //   5.  Do the I/O to fetch the document corresponding to this record Id.
        //
        // The purpose of this algorithm, is to allow different threads to concurrently start I/O
        // jobs in order to more fully saturate the disk.
        //
        // One issue with this algorithm, is that only 16MB worth of documents can be returned in
        // response to a _migrateClone request.  But, the thread does not know the size of a
        // document until it does the I/O.  At which point, if the document does not fit in the
        // response to _migrateClone request the document must be made available to a different
        // thread servicing a _migrateClone request. To solve this problem, the thread adds the
        // document to the below _overflowDocs deque.
        RecordIdSet::iterator _recordIdsIter;

        // This deque stores all documents that must be sent to the destination, but could not fit
        // in the response to a particular _migrateClone request.
        std::deque<Snapshotted<BSONObj>> _overflowDocs;

        // This integer represents how many documents are being "held" by threads servicing
        // _migrateClone requests. Any document that is "held" by a thread may be added to the
        // _overflowDocs deque if it doesn't fit in the response to a _migrateClone request.
        // This integer is necessary because it gives us a condition on when all documents to be
        // sent to the destination have been exhausted.
        //
        // If (_recordIdsIter == _recordIds.end() && _overflowDocs.empty() &&
        //     _inProgressReads == 0) then all documents have been returned to the destination.
        RecordIdSet::size_type _inProgressReads = 0;

        // This condition variable allows us to wait on the following condition:
        //   Either we're done and the above condition is satisfied, or there is some document to
        //   return.
        stdx::condition_variable _moreDocsCV;
    };

    // Represents the states in which the cloner can be
    enum State { kNew, kCloning, kDone };

    /**
     * Idempotent method, which cleans up any previously initialized state. It is safe to be called
     * at any time, but no methods should be called after it.
     */
    void _cleanup();

    /**
     * Synchronously invokes the recipient shard with the specified command and either returns the
     * command response (if succeeded) or the status, if the command failed.
     */
    StatusWith<BSONObj> _callRecipient(OperationContext* opCtx, const BSONObj& cmdObj);

    StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getIndexScanExecutor(
        OperationContext* opCtx,
        const CollectionPtr& collection,
        InternalPlanner::IndexScanOptions scanOption);

    void _nextCloneBatchFromIndexScan(OperationContext* opCtx,
                                      const CollectionPtr& collection,
                                      BSONArrayBuilder* arrBuilder);

    void _nextCloneBatchFromCloneRecordIds(OperationContext* opCtx,
                                           const CollectionPtr& collection,
                                           BSONArrayBuilder* arrBuilder);

    /**
     * Get the recordIds that belong to the chunk migrated and sort them in _cloneRecordIds (to
     * avoid seeking disk later).
     *
     * Returns OK or any error status otherwise.
     */
    Status _storeCurrentRecordId(OperationContext* opCtx);

    /**
     * Adds the OpTime to the list of OpTimes for oplog entries that we should consider migrating as
     * part of session migration.
     */
    void _addToSessionMigrationOptimeQueue(
        const repl::OpTime& opTime,
        SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType);

    void _addToSessionMigrationOptimeQueueForTransactionCommit(
        const repl::OpTime& opTime,
        SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType);

    /*
     * Appends the relevant document changes to the appropriate internal data structures (known
     * colloquially as the 'transfer mods queue'). These structures track document changes that are
     * part of a part of a chunk being migrated. In doing so, this the method also removes the
     * corresponding operation track request from the operation track requests queue.
     */
    void _addToTransferModsQueue(const BSONObj& idObj, char op, const repl::OpTime& opTime);

    /**
     * Adds an operation to the outstanding operation track requests. Returns false if the cloner
     * is no longer accepting new operation track requests.
     */
    bool _addedOperationToOutstandingOperationTrackRequests();

    /**
     * Called to indicate a request to track an operation must be filled. The operations in
     * question indicate a change to a document in the chunk being cloned. Increments a counter
     * residing inside the MigrationChunkClonerSource class.
     *
     * There should always be a one to one match from the number of calls to this function to the
     * number of calls to the corresponding decrement* function.
     *
     * NOTE: This funtion invariants that we are currently accepting new operation track requests.
     * It is up to callers of this function to make sure that will always be the case.
     */
    void _incrementOutstandingOperationTrackRequests(WithLock);

    /**
     * Called once a request to track an operation has been filled. The operations in question
     * indicate a change to a document in the chunk being cloned. Decrements a counter residing
     * inside the MigrationChunkClonerSource class.
     *
     * There should always be a one to one match from the number of calls to this function to the
     * number of calls to the corresponding increment* function.
     */
    void _decrementOutstandingOperationTrackRequests();

    /**
     * Waits for all outstanding operation track requests to be fulfilled before returning from this
     * function. Should only be used in the cleanup for this class. Should use a lock wrapped
     * around this class's mutex.
     */
    void _drainAllOutstandingOperationTrackRequests(stdx::unique_lock<Latch>& lk);

    /**
     * Sends _recvChunkStatus to the recipient shard until it receives 'steady' from the recipient,
     * an error has occurred, or a timeout is hit.
     */
    Status _checkRecipientCloningStatus(OperationContext* opCtx, Milliseconds maxTimeToWait);

    /**
     * Inspects the pre and post image document keys and determines which xferMods bucket to
     * add a new entry. Returns false if neither pre or post image document keys fall into
     * the chunk boundaries being migrated.
     */
    bool _processUpdateForXferMod(const BSONObj& preImageDocKey, const BSONObj& postImageDocKey);

    /**
     * Defer processing of update ops into xferMods entries to when nextModsBatch is called.
     */
    void _deferProcessingForXferMod(const BSONObj& preImageDocKey);

    /**
     * Converts all deferred update ops captured by the op observer into xferMods entries.
     */
    void _processDeferredXferMods(OperationContext* opCtx);

    // The original move range request
    const ShardsvrMoveRange _args;

    // The write concern associated with the move range
    const WriteConcernOptions _writeConcern;

    // The shard key associated with the namespace
    const ShardKeyPattern _shardKeyPattern;

    // The migration session id
    const MigrationSessionId _sessionId;

    // The resolved connection string of the donor shard
    const ConnectionString _donorConnStr;

    // The resolved primary of the recipient shard
    const HostAndPort _recipientHost;

    std::unique_ptr<SessionCatalogMigrationSource> _sessionCatalogSource;

    // Protects the entries below
    mutable Mutex _mutex = MONGO_MAKE_LATCH("MigrationChunkClonerSource::_mutex");

    // The current state of the cloner
    State _state{kNew};

    CloneList _cloneList;

    RecordIdSet::size_type _numRecordsCloned{0};
    RecordIdSet::size_type _numRecordsPassedOver{0};

    // The estimated average object size during the clone phase. Used for buffer size
    // pre-allocation (initial clone).
    uint64_t _averageObjectSizeForCloneRecordIds{0};

    // The estimated average object _id size during the clone phase.
    uint64_t _averageObjectIdSize{0};

    // Represents all of the requested but not yet fulfilled operations to be tracked, with regards
    // to the chunk being cloned.
    uint64_t _outstandingOperationTrackRequests{0};

    // Signals to any waiters once all unresolved operation tracking requests have completed.
    stdx::condition_variable _allOutstandingOperationTrackRequestsDrained;

    // Indicates whether new requests to track an operation are accepted.
    bool _acceptingNewOperationTrackRequests{true};

    // List of _id of documents that were modified that must be re-cloned (xfer mods)
    std::list<BSONObj> _reload;

    // Amount of upsert xfer mods that have not yet reached the recipient.
    size_t _untransferredUpsertsCounter{0};

    // List of _id of documents that were deleted during clone that should be deleted later (xfer
    // mods)
    std::list<BSONObj> _deleted;

    // Amount of delete xfer mods that have not yet reached the recipient.
    size_t _untransferredDeletesCounter{0};

    // Amount of ops that are yet to be converted to update/delete xferMods.
    size_t _deferredUntransferredOpsCounter{0};

    // Stores document keys of document that needs to be examined if we need to put in to xferMods
    // list later.
    std::vector<BSONObj> _deferredReloadOrDeletePreImageDocKeys;

    // Total bytes in _reload + _deleted (xfer mods)
    uint64_t _memoryUsed{0};

    // False if the move chunk request specified ForceJumbo::kDoNotForce, true otherwise.
    const bool _forceJumbo;
    struct JumboChunkCloneState {
        // Plan executor for collection scan used to clone docs.
        std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> clonerExec;

        // The current state of 'clonerExec'.
        PlanExecutor::ExecState clonerState;

        // Number docs in jumbo chunk cloned so far
        int docsCloned = 0;
    };

    // Set only once its discovered a chunk is jumbo
    boost::optional<JumboChunkCloneState> _jumboChunkCloneState;

protected:
    MigrationChunkClonerSource();
};

/**
 * Appends to the builder the list of documents either deleted or modified during migration.
 * Entries appended to the builder are removed from the list.
 * Returns the total size of the documents that were appended + initialSize.
 */
long long xferMods(BSONArrayBuilder* arr,
                   std::list<BSONObj>* modsList,
                   long long initialSize,
                   std::function<bool(BSONObj, BSONObj*)> extractDocToAppendFn);

}  // namespace mongo