summaryrefslogtreecommitdiff
path: root/src/mongo/db/db_raii.cpp
blob: 85cbf4db815c5a1c2c2e59ab35bb0fcb3bdd5ab5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#include "mongo/db/db_raii.h"

#include "mongo/db/catalog/catalog_helper.h"
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/catalog/collection_uuid_mismatch.h"
#include "mongo/db/catalog/collection_yield_restore.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/concurrency/locker.h"
#include "mongo/db/curop.h"
#include "mongo/db/repl/collection_utils.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/database_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/storage/capped_snapshots.h"
#include "mongo/db/storage/snapshot_helper.h"
#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/logv2/log.h"

#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage

namespace mongo {
namespace {

// Fail points defined by this file.
// NOTE(review): none of them is referenced in the portion of the file reviewed here; judging by
// their names they pause or flag specific steps of the AutoGet* helpers for tests -- confirm at
// their use sites.
MONGO_FAIL_POINT_DEFINE(hangBeforeAutoGetCollectionLockFreeShardedStateAccess);
MONGO_FAIL_POINT_DEFINE(hangBeforeAutoGetShardVersionCheck);
MONGO_FAIL_POINT_DEFINE(reachedAutoGetLockFreeShardConsistencyRetry);

// An empty optional profiling level.
// NOTE(review): presumably passed where an optional profiling level is expected to mean "leave
// the level unchanged" -- confirm at the use sites.
const boost::optional<int> kDoNotChangeProfilingLevel = boost::none;

// TODO (SERVER-69813): Get rid of this when ShardServerCatalogCacheLoader will be removed.
// If set to false, secondary reads should wait behind the PBW lock.
// This is an OperationContext decoration holding an optional bool. The AutoGetCollectionForRead
// constructor reads it with value_or(true), so an unset value behaves like 'true'.
const auto allowSecondaryReadsDuringBatchApplication_DONT_USE =
    OperationContext::declareDecoration<boost::optional<bool>>();

/**
 * Decides whether the operation described by 'opCtx' may be served by a lock-free read.
 *
 * Returns false as soon as one of the following holds (checked in this order):
 *  - lock-free reads are disabled through 'storageGlobalParams';
 *  - the operation is running inside a multi-document transaction;
 *  - the operation's locker is write locked (nested reads under an exclusive lock);
 *  - a storage transaction is already open (the recovery unit is active) while the operation
 *    does not carry the lock-free reads operation flag.
 * Returns true otherwise.
 */
bool supportsLockFreeRead(OperationContext* opCtx) {
    if (storageGlobalParams.disableLockFreeReads) {
        return false;
    }

    if (opCtx->inMultiDocumentTransaction()) {
        return false;
    }

    if (opCtx->lockState()->isWriteLocked()) {
        return false;
    }

    const bool storageTxnOpenWithoutLockFreeFlag =
        opCtx->recoveryUnit()->isActive() && !opCtx->isLockFreeReadsOp();
    return !storageTxnOpenWithoutLockFreeFlag;
}

/**
 * Validates the special locking requirements that apply to certain namespaces.
 *
 * Currently the only rule enforced is that system.views must not be locked in MODE_IX; a
 * violation raises a user assertion with code 7195700. 'opCtx' is not used by the check.
 */
void verifyNamespaceLockingRequirements(OperationContext* opCtx,
                                        LockMode modeColl,
                                        const NamespaceString& resolvedNss) {
    // Modifications of system.views are normally expected to upgrade MODE_IX to MODE_X before
    // the lock is taken. The exception is a query of system.views by UUID inside a transaction:
    // by-name queries of system.views in a transaction are rejected earlier, but for a by-UUID
    // query the namespace is only known to be system.views once the lock has been taken, so
    // this is the last place where the situation can be caught.
    const bool locksSystemViewsInIntentExclusive =
        resolvedNss.isSystemDotViews() && modeColl == MODE_IX;

    uassert(7195700,
            "Modifications to system.views must take an exclusive lock",
            !locksSystemViewsInIntentExclusive);
}

/**
 * Reports whether 'nss' names a view according to 'catalog'.
 *
 * Returns true when the catalog's view lookup yields a definition for 'nss', false when it
 * yields none.
 */
bool isNssAView(OperationContext* opCtx,
                const CollectionCatalog* catalog,
                const NamespaceString& nss) {
    const auto viewDefinition = catalog->lookupView(opCtx, nss);
    return viewDefinition.get() != nullptr;
}

/**
 * Reports whether 'nss' is sharded, as stated by the collection description obtained from the
 * namespace's CollectionShardingState. Returns false otherwise.
 */
bool isNssSharded(OperationContext* opCtx, const NamespaceString& nss) {
    auto scopedCss = CollectionShardingState::acquire(opCtx, nss);
    auto collDesc = scopedCss->getCollectionDescription(opCtx);
    return collDesc.isSharded();
}

/**
 * Returns true if 'nss' is either a view or a sharded collection, false otherwise.
 *
 * The view lookup is only performed when 'catalog' has no collection registered under 'nss';
 * the sharding check is only performed when the namespace was not identified as a view.
 */
bool isNssAViewOrSharded(OperationContext* opCtx,
                         const CollectionCatalog* catalog,
                         const NamespaceString& nss) {
    if (!catalog->lookupCollectionByNamespace(opCtx, nss) && isNssAView(opCtx, catalog, nss)) {
        return true;
    }

    return isNssSharded(opCtx, nss);
}

/**
 * Returns true if at least one entry of 'namespaces' is a view or a sharded collection.
 *
 * Namespaces are examined in order and the scan stops at the first match. An empty vector
 * yields false.
 */
bool isAnyNssAViewOrSharded(OperationContext* opCtx,
                            const CollectionCatalog* catalog,
                            const std::vector<NamespaceString>& namespaces) {
    for (const auto& candidate : namespaces) {
        if (isNssAViewOrSharded(opCtx, catalog, candidate)) {
            return true;
        }
    }
    return false;
}

/**
 * Resolves every NamespaceStringOrUUID in 'secondaryNssOrUUIDs' to a NamespaceString by calling
 * CollectionCatalog::resolveNamespaceStringOrUUID on 'catalog'.
 *
 * All inputs are resolved first; only afterwards are the resolved namespaces checked for views
 * and sharded collections.
 *
 * Returns boost::none if any resolved namespace is a view or a sharded collection. Otherwise
 * returns the resolved namespaces, in the same order as the input.
 */
boost::optional<std::vector<NamespaceString>> resolveSecondaryNamespacesOrUUIDs(
    OperationContext* opCtx,
    const CollectionCatalog* catalog,
    const std::vector<NamespaceStringOrUUID>& secondaryNssOrUUIDs) {
    std::vector<NamespaceString> resolved;
    resolved.reserve(secondaryNssOrUUIDs.size());
    for (const auto& secondary : secondaryNssOrUUIDs) {
        resolved.emplace_back(catalog->resolveNamespaceStringOrUUID(opCtx, secondary));
    }

    if (isAnyNssAViewOrSharded(opCtx, catalog, resolved)) {
        return boost::none;
    }

    return std::move(resolved);
}

/**
 * Establishes a capped snapshot for the collection identified by 'nsOrUUID', if one is needed.
 *
 * Nothing happens when 'catalog' has no collection for 'nsOrUUID' or when the collection
 * reports that it does not use capped snapshots.
 */
void establishCappedSnapshotIfNeeded(OperationContext* opCtx,
                                     const std::shared_ptr<const CollectionCatalog>& catalog,
                                     const NamespaceStringOrUUID& nsOrUUID) {
    auto coll = catalog->lookupCollectionByNamespaceOrUUID(opCtx, nsOrUUID);
    if (!coll || !coll->usesCappedSnapshots()) {
        return;
    }

    CappedSnapshots::get(opCtx).establish(opCtx, coll);
}

/**
 * Decides whether the catalog instance and the storage snapshot obtained by a caller are
 * consistent with each other, i.e. whether the caller can stop retrying.
 *
 * Returns false if the catalog pointer or the replication term observed before opening the
 * snapshot differs from the one observed afterwards, or if any namespace in
 * 'resolvedSecondaryNamespaces' is now sharded. Returns true otherwise.
 *
 * 'resolvedSecondaryNamespaces' being boost::none means some secondary namespace was already a
 * view or sharded; in that case newly sharded namespaces are of no interest and are not checked.
 */
bool haveAcquiredConsistentCatalogAndSnapshot(
    OperationContext* opCtx,
    const CollectionCatalog* catalogBeforeSnapshot,
    const CollectionCatalog* catalogAfterSnapshot,
    long long replTermBeforeSnapshot,
    long long replTermAfterSnapshot,
    const boost::optional<std::vector<NamespaceString>>& resolvedSecondaryNamespaces) {

    const bool catalogUnchanged = catalogBeforeSnapshot == catalogAfterSnapshot;
    const bool replTermUnchanged = replTermBeforeSnapshot == replTermAfterSnapshot;
    if (!catalogUnchanged || !replTermUnchanged) {
        return false;
    }

    // The catalog has not changed, so every secondary namespace still maps to the same
    // collection/view. A collection may nevertheless have become sharded since before the
    // snapshot was opened, which would require a retry with a new snapshot.
    if (!resolvedSecondaryNamespaces) {
        return true;
    }

    for (const auto& secondaryNss : *resolvedSecondaryNamespaces) {
        if (isNssSharded(opCtx, secondaryNss)) {
            return false;
        }
    }

    // No secondary namespace became sharded: the catalog and the snapshot are consistent.
    return true;
}

/**
 * Throws a user assertion if the read concern in 'readConcernArgs' cannot be used to read
 * 'coll' with the given 'readSource'.
 *
 * Rejected combinations:
 *  - readConcern snapshot on a capped collection (ErrorCodes::SnapshotUnavailable);
 *  - on config.transactions, with a read source other than kNoTimestamp or kLastApplied:
 *    readConcern snapshot without allowTransactionTableSnapshot, or readConcern majority with
 *    an afterClusterTime (code 5557800).
 */
void assertReadConcernSupported(const CollectionPtr& coll,
                                const repl::ReadConcernArgs& readConcernArgs,
                                const RecoveryUnit::ReadSource& readSource) {
    const auto level = readConcernArgs.getLevel();
    const bool isSnapshotLevel = level == repl::ReadConcernLevel::kSnapshotReadConcern;

    // Snapshot reads are banned on capped collections.
    uassert(ErrorCodes::SnapshotUnavailable,
            "Reading from capped collections with readConcern snapshot is not supported",
            !(coll->isCapped() && isSnapshotLevel));

    // The remaining restriction only concerns config.transactions.
    if (!(coll->ns() == NamespaceString::kSessionTransactionsTableNamespace)) {
        return;
    }

    // kNoTimestamp and kLastApplied reads are allowed: given a snapshot/majority readConcern
    // they must come from internal readers (e.g. session checkout).
    if (readSource == RecoveryUnit::ReadSource::kNoTimestamp ||
        readSource == RecoveryUnit::ReadSource::kLastApplied) {
        return;
    }

    // Snapshot reads and causally consistent majority reads of config.transactions outside of
    // transactions could observe the collection at a point in time in the middle of a secondary
    // batch, which is unsafe because config.transactions updates are coalesced on secondaries.
    // Majority reads without an afterClusterTime remain allowed because they may return
    // arbitrarily stale data.
    const bool isUnsupportedRead =
        (isSnapshotLevel && !readConcernArgs.allowTransactionTableSnapshot()) ||
        (level == repl::ReadConcernLevel::kMajorityReadConcern &&
         readConcernArgs.getArgsAfterClusterTime());

    if (isUnsupportedRead) {
        uasserted(5557800,
                  "Snapshot reads and causal consistent majority reads on config.transactions "
                  "are not supported");
    }
}

/**
 * Sanity-checks the read options an operation ended up with after acquiring its locks. Both
 * checks are fatal (invariant / LOGV2_FATAL) rather than user assertions.
 *
 * @param nss the namespace being read; namespaces that must be applied in their own oplog batch
 *     are exempt from the second check.
 * @param afterClusterTime the afterClusterTime of the read concern, if any.
 * @param readSource the timestamp read source in effect.
 * @param readTimestamp the point-in-time read timestamp, if any.
 * @param callerWasConflicting whether the caller was set to conflict with secondary batch
 *     application when it entered (i.e. no ShouldNotConflict block was in scope).
 * @param shouldReadAtLastApplied whether the operation was expected to read at lastApplied.
 */
void checkInvariantsForReadOptions(const NamespaceString& nss,
                                   const boost::optional<LogicalTime>& afterClusterTime,
                                   const RecoveryUnit::ReadSource& readSource,
                                   const boost::optional<Timestamp>& readTimestamp,
                                   bool callerWasConflicting,
                                   bool shouldReadAtLastApplied) {
    if (readTimestamp && afterClusterTime) {
        // Readers that use afterClusterTime have already waited at a higher level for the
        // all_durable time to advance to a specified optime, and they assume the read timestamp
        // of the operation is at least that waited-for timestamp. For kNoOverlap, which is
        // the minimum of lastApplied and all_durable, this invariant ensures that
        // afterClusterTime reads do not choose a read timestamp older than the one requested.
        invariant(*readTimestamp >= afterClusterTime->asTimestamp(),
                  str::stream() << "read timestamp " << readTimestamp->toString()
                                << " was less than afterClusterTime: "
                                << afterClusterTime->asTimestamp().toString());
    }

    // This assertion protects operations from reading inconsistent data on secondaries when
    // using the default ReadSource of kNoTimestamp.

    // Reading at lastApplied on secondaries is the safest behavior and is enabled for all user
    // and DBDirectClient reads using 'local' and 'available' readConcerns. If an internal
    // operation wishes to read without a timestamp during a batch, a ShouldNotConflict can
    // suppress this fatal assertion with the following considerations:
    // * The operation is not reading replicated data in a replication state where batch
    //   application is active OR
    // * Reading inconsistent, out-of-order data is either inconsequential or required by
    //   the operation.

    // If the caller entered this function expecting to conflict with batch application
    // (i.e. no ShouldNotConflict block in scope), but they are reading without a timestamp and
    // not holding the PBWM lock, then there is a possibility that this reader may
    // unintentionally see inconsistent data during a batch. Certain namespaces are applied
    // serially in oplog application, and therefore can be safely read without taking the PBWM
    // lock or reading at a timestamp.
    if (readSource == RecoveryUnit::ReadSource::kNoTimestamp && callerWasConflicting &&
        !nss.mustBeAppliedInOwnOplogBatch() && shouldReadAtLastApplied) {
        LOGV2_FATAL(4728700,
                    "Reading from replicated collection on a secondary without read timestamp "
                    "or PBWM lock",
                    "collection"_attr = nss);
    }
}

}  // namespace

/**
 * Records the namespaces whose Top statistics are reported on destruction and, unless
 * 'logMode' is LogMode::kUpdateTop, enters 'nss' with 'dbProfilingLevel' on the operation's
 * CurOp while holding the Client lock.
 *
 * Secondary namespaces or UUIDs are resolved through the current CollectionCatalog and stored
 * together with 'nss' in a set, so each namespace is reported only once. The 'deadline'
 * parameter is not used by this constructor.
 */
AutoStatsTracker::AutoStatsTracker(
    OperationContext* opCtx,
    const NamespaceString& nss,
    Top::LockType lockType,
    LogMode logMode,
    int dbProfilingLevel,
    Date_t deadline,
    const std::vector<NamespaceStringOrUUID>& secondaryNssOrUUIDVector)
    : _opCtx(opCtx), _lockType(lockType), _logMode(logMode) {
    // Collect the primary and all secondary namespaces, deduplicated, for the Top update
    // performed by the destructor.
    _nssSet.insert(nss);
    auto catalog = CollectionCatalog::get(opCtx);
    for (const auto& secondary : secondaryNssOrUUIDVector) {
        _nssSet.insert(catalog->resolveNamespaceStringOrUUID(opCtx, secondary));
    }

    // In kUpdateTop mode CurOp is left untouched.
    if (_logMode != LogMode::kUpdateTop) {
        stdx::lock_guard<Client> clientLock(*_opCtx->getClient());
        CurOp::get(_opCtx)->enter_inlock(nss, dbProfilingLevel);
    }
}

/**
 * Reports the operation's statistics to Top for every tracked namespace, unless the tracker was
 * created with LogMode::kUpdateCurOp, in which case nothing is recorded.
 */
AutoStatsTracker::~AutoStatsTracker() {
    if (_logMode != LogMode::kUpdateCurOp) {
        // One record() call covers all namespaces collected by the constructor.
        auto curOp = CurOp::get(_opCtx);
        Top::get(_opCtx->getServiceContext())
            .record(_opCtx,
                    _nssSet,
                    curOp->getLogicalOp(),
                    _lockType,
                    durationCount<Microseconds>(curOp->elapsedTimeExcludingPauses()),
                    curOp->isCommand(),
                    curOp->getReadWriteType());
    }
}

/**
 * Acquires the database and collection locks needed to read 'nsOrUUID' (plus any secondary
 * namespaces listed in 'options'), then resolves the namespace, possibly adjusts the read
 * source, establishes a collection instance consistent with the read timestamp and validates
 * it.
 *
 * Outcomes:
 *  - A collection exists: its shard version is checked, the shard key pattern is stored on the
 *    collection pointer if it is sharded, and the read concern / read options are validated.
 *  - No collection but a view exists (only looked up when no expected UUID was given): the view
 *    is stored, subject to the view mode and shard version assertions below.
 *  - Neither exists: only the shard version attached to the request is validated.
 *
 * Failures are reported through uassert / uassertStatusOK; two conditions are fatal (see
 * checkInvariantsForReadOptions and the invariant on the secondary collections' database name).
 */
AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx,
                                                   const NamespaceStringOrUUID& nsOrUUID,
                                                   AutoGetCollection::Options options)
    // Captures the locker's shouldConflictWithSecondaryBatchApplication() value; it is passed to
    // checkInvariantsForReadOptions() further down.
    : _callerWasConflicting(opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()),
      // The block is engaged only if the opCtx decoration is unset or true and the storage engine
      // supports readConcern snapshot; otherwise the optional stays empty.
      _shouldNotConflictWithSecondaryBatchApplicationBlock(
          [&]() -> boost::optional<ShouldNotConflictWithSecondaryBatchApplicationBlock> {
              if (allowSecondaryReadsDuringBatchApplication_DONT_USE(opCtx).value_or(true) &&
                  opCtx->getServiceContext()->getStorageEngine()->supportsReadConcernSnapshot()) {
                  return boost::optional<ShouldNotConflictWithSecondaryBatchApplicationBlock>(
                      opCtx->lockState());
              }

              return boost::none;
          }()),
      _autoDb(AutoGetDb::createForAutoGetCollection(
          opCtx, nsOrUUID, getLockModeForQuery(opCtx, nsOrUUID.nss()), options)) {

    // Same lock mode as the one passed to AutoGetDb::createForAutoGetCollection above.
    const auto modeColl = getLockModeForQuery(opCtx, nsOrUUID.nss());
    const auto viewMode = options._viewMode;
    const auto deadline = options._deadline;
    const auto& secondaryNssOrUUIDs = options._secondaryNssOrUUIDs;

    // Acquire the collection locks. If there's only one lock, then it can simply be taken. If
    // there are many, however, the locks must be taken in _ascending_ ResourceId order to avoid
    // deadlocks across threads.
    if (secondaryNssOrUUIDs.empty()) {
        // NOTE(review): the namespace is validated only on this single-namespace path;
        // presumably the helper used in the other branch validates it itself -- confirm.
        uassertStatusOK(nsOrUUID.isNssValid());
        _collLocks.emplace_back(opCtx, nsOrUUID, modeColl, deadline);
    } else {
        catalog_helper::acquireCollectionLocksInResourceIdOrder(
            opCtx, nsOrUUID, modeColl, deadline, secondaryNssOrUUIDs, &_collLocks);
    }

    // Wait for a configured amount of time after acquiring locks if the failpoint is enabled
    catalog_helper::setAutoGetCollectionWaitFailpointExecute(
        [&](const BSONObj& data) { sleepFor(Milliseconds(data["waitForMillis"].numberInt())); });

    // The catalog instance is fetched after the locks have been acquired.
    auto catalog = CollectionCatalog::get(opCtx);

    _resolvedNss = catalog->resolveNamespaceStringOrUUID(opCtx, nsOrUUID);

    // During batch application on secondaries, there is a potential to read inconsistent states
    // that would normally be protected by the PBWM lock. In order to serve secondary reads
    // during this period, we default to not acquiring the lock (by setting
    // _shouldNotConflictWithSecondaryBatchApplicationBlock). On primaries, we always read at a
    // consistent time, so not taking the PBWM lock is not a problem. On secondaries, we have to
    // guarantee we read at a consistent state, so we must read at the lastApplied timestamp,
    // which is set after each complete batch.

    // Once we have our locks, check whether or not we should override the ReadSource that was
    // set before acquiring locks.
    const bool shouldReadAtLastApplied =
        SnapshotHelper::changeReadSourceIfNeeded(opCtx, _resolvedNss);
    // Update readSource in case it was updated.
    const auto readSource = opCtx->recoveryUnit()->getTimestampReadSource();
    const auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx);

    // Check that the collections are all safe to use. First acquire collection from our catalog
    // compatible with the specified 'readTimestamp'. Creates and places a compatible PIT collection
    // reference in the 'catalog' if needed and the collection exists at that PIT.
    _coll = CollectionPtr(catalog->establishConsistentCollection(opCtx, nsOrUUID, readTimestamp));
    _coll.makeYieldable(opCtx, LockedCollectionYieldRestore{opCtx, _coll});

    // Validate primary collection.
    checkCollectionUUIDMismatch(opCtx, _resolvedNss, _coll, options._expectedUUID);
    verifyNamespaceLockingRequirements(opCtx, modeColl, _resolvedNss);

    // Check secondary collections and verify they are valid for use.
    if (!secondaryNssOrUUIDs.empty()) {
        // Check that none of the namespaces are views or sharded collections, which are not
        // supported for secondary namespaces.
        auto resolvedSecondaryNamespaces =
            resolveSecondaryNamespacesOrUUIDs(opCtx, catalog.get(), secondaryNssOrUUIDs);
        // boost::none from the helper means at least one secondary namespace is a view or sharded.
        _secondaryNssIsAViewOrSharded = !resolvedSecondaryNamespaces.has_value();

        if (!_secondaryNssIsAViewOrSharded) {
            // Ensure that the readTimestamp is compatible with the latest Collection instances or
            // create PIT instances in the 'catalog' (if the collections existed at that PIT).
            for (const auto& secondaryNssOrUUID : secondaryNssOrUUIDs) {
                auto secondaryCollectionAtPIT = catalog->establishConsistentCollection(
                    opCtx, secondaryNssOrUUID, readTimestamp);
                if (secondaryCollectionAtPIT) {
                    // Secondary collections must live in the same database as the primary one.
                    invariant(secondaryCollectionAtPIT->ns().dbName() == _resolvedNss.dbName());
                    // Secondary collections are verified against MODE_IS, not 'modeColl'.
                    verifyNamespaceLockingRequirements(
                        opCtx, MODE_IS, secondaryCollectionAtPIT->ns());
                }
            }
        }
    }

    if (_coll) {
        // Fetch and store the sharding collection description data needed for use during the
        // operation. The shardVersion will be checked later if the shard filtering metadata is
        // fetched, ensuring both that the collection description info used here and the routing
        // table are consistent with the read request's shardVersion.
        //
        // Note: sharding versioning for an operation has no concept of multiple collections.
        auto scopedCss = CollectionShardingState::acquire(opCtx, _resolvedNss);
        scopedCss->checkShardVersionOrThrow(opCtx);

        auto collDesc = scopedCss->getCollectionDescription(opCtx);
        if (collDesc.isSharded()) {
            _coll.setShardKeyPattern(collDesc.getKeyPattern());
        }

        // The read source is fetched again from the recovery unit here rather than reusing the
        // local 'readSource'; 'readSource' is what is passed to checkInvariantsForReadOptions.
        auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
        assertReadConcernSupported(
            _coll, readConcernArgs, opCtx->recoveryUnit()->getTimestampReadSource());

        checkInvariantsForReadOptions(_coll->ns(),
                                      readConcernArgs.getArgsAfterClusterTime(),
                                      readSource,
                                      readTimestamp,
                                      _callerWasConflicting,
                                      shouldReadAtLastApplied);

        // A collection was found and validated; the view lookup below is not needed.
        return;
    }

    // No Collection found, try and lookup view.
    const auto receivedShardVersion{
        OperationShardingState::get(opCtx).getShardVersion(_resolvedNss)};

    if (!options._expectedUUID) {
        // We only need to look up a view if an expected collection UUID was not provided. If this
        // namespace were a view, the collection UUID mismatch check would have failed above.
        if ((_view = catalog->lookupView(opCtx, _resolvedNss))) {
            // Timeseries views are rejected unless views are permitted by the caller.
            uassert(ErrorCodes::CommandNotSupportedOnView,
                    str::stream() << "Taking " << _resolvedNss.toStringForErrorMsg()
                                  << " lock for timeseries is not allowed",
                    viewMode == auto_get_collection::ViewMode::kViewsPermitted ||
                        !_view->timeseries());

            // Any other view is rejected as well unless views are permitted.
            uassert(ErrorCodes::CommandNotSupportedOnView,
                    str::stream() << "Namespace " << _resolvedNss.toStringForErrorMsg()
                                  << " is a view, not a collection",
                    viewMode == auto_get_collection::ViewMode::kViewsPermitted);

            // NOTE(review): '*receivedShardVersion' in the first argument is only safe if uassert
            // evaluates that argument solely when the condition is false (the condition is true
            // whenever the optional is empty) -- confirm against the uassert macro.
            uassert(StaleConfigInfo(_resolvedNss,
                                    *receivedShardVersion,
                                    ShardVersion::UNSHARDED() /* wantedVersion */,
                                    ShardingState::get(opCtx)->shardId()),
                    str::stream() << "Namespace " << _resolvedNss.toStringForErrorMsg()
                                  << " is a view therefore the shard "
                                  << "version attached to the request must be unset or UNSHARDED",
                    !receivedShardVersion || *receivedShardVersion == ShardVersion::UNSHARDED());

            return;
        }
    }

    // There is neither a collection nor a view for the namespace, so if we reached this point
    // there are the following possibilities depending on the received shard version:
    //   1. ShardVersion::UNSHARDED: The request comes from a router and the operation entails the
    //      implicit creation of an unsharded collection. We can continue.
    //   2. ChunkVersion::IGNORED: The request comes from a router that broadcasted the same to all
    //      shards, but this shard doesn't own any chunks for the collection. We can continue.
    //   3. boost::none: The request comes from client directly connected to the shard. We can
    //      continue.
    //   4. Any other value: The request comes from a stale router on a collection or a view which
    //      was deleted some time ago (or the user manually deleted it from underneath sharding).
    //      We return a stale config error so that the router recovers.

    // NOTE(review): as above, '*receivedShardVersion' relies on the first uassert argument being
    // evaluated only when the condition fails -- confirm.
    uassert(StaleConfigInfo(_resolvedNss,
                            *receivedShardVersion,
                            boost::none /* wantedVersion */,
                            ShardingState::get(opCtx)->shardId()),
            str::stream() << "No metadata for namespace " << _resolvedNss.toStringForErrorMsg()
                          << " therefore the shard "
                          << "version attached to the request must be unset, UNSHARDED or IGNORED",
            !receivedShardVersion || *receivedShardVersion == ShardVersion::UNSHARDED() ||
                ShardVersion::isPlacementVersionIgnored(*receivedShardVersion));
}

/**
 * Returns the collection pointer established by the constructor. It evaluates to false when no
 * collection was found for the requested namespace.
 */
const CollectionPtr& AutoGetCollectionForRead::getCollection() const {
    return _coll;
}

/**
 * Returns the view definition stored by the constructor's view lookup, or nullptr if the stored
 * view pointer is empty. The returned pointer is non-owning.
 */
const ViewDefinition* AutoGetCollectionForRead::getView() const {
    return _view.get();
}

/**
 * Returns the namespace that the constructor resolved from the requested namespace or UUID.
 */
const NamespaceString& AutoGetCollectionForRead::getNss() const {
    return _resolvedNss;
}

namespace {

/**
 * Preallocates a storage snapshot on the operation's recovery unit, using the oplog-specific
 * variant when 'isForOplogRead' is true.
 */
void openSnapshot(OperationContext* opCtx, bool isForOplogRead) {
    if (!isForOplogRead) {
        opCtx->recoveryUnit()->preallocateSnapshot();
        return;
    }

    opCtx->recoveryUnit()->preallocateSnapshotForOplogRead();
}

/**
 * Used as a return value for the following function.
 *
 * NOTE(review): the function that fills this struct lies beyond the portion of the file reviewed
 * here, so the field descriptions below are inferred from the field names and types -- confirm.
 */
struct ConsistentCatalogAndSnapshot {
    // Shared reference to a CollectionCatalog instance; presumably the one found to be
    // consistent with the opened snapshot.
    std::shared_ptr<const CollectionCatalog> catalog;
    // Presumably true when at least one secondary namespace is a view or a sharded collection.
    bool isAnySecondaryNamespaceAViewOrSharded;
    // Presumably the read source in effect for the snapshot.
    RecoveryUnit::ReadSource readSource;
    // Presumably the read timestamp of the snapshot; empty when there is none.
    boost::optional<Timestamp> readTimestamp;
};

/**
 * This function is responsible for acquiring an in-memory version of the CollectionCatalog and
 * opening a snapshot such that the data contained in the in-memory CollectionCatalog matches the
 * data in the durable catalog in that snapshot.
 *
 * It is used by readers who do not have any collection locks (a.k.a lock-free readers), and so
 * there may be ongoing DDL operations concurrent with this function being called. This means we
 * must take care to handle cases where the state of the catalog changes during the course of
 * execution of this function.
 *
 * The high-level algorithm here is:
 *  * Get the latest version of the catalog
 *  * Open a snapshot
 *  * Get the latest version of the catalog and check if it changed since opening the snapshot. If
 *    it did, we need to retry, because that means that the version of the durable catalog that
 *    would be read from the snapshot would be different from the in-memory CollectionCatalog.
 *
 * Note that it is still possible for the version of the CollectionCatalog obtained to be
 * different from the durable catalog if there's a DDL operation pending commit at precisely the
 * right time. This is okay because the CollectionCatalog tracks DDL entries pending commit and
 * lock-free readers will check this state for the namespace they care about before deciding whether
 * to use an entry from the CollectionCatalog or whether to read catalog information directly from
 * the durable catalog instead.
 *
 * Also note that this retry algorithm is only necessary for readers who are not reading with a
 * timestamp. Readers at points in time in the past (e.g. readers with the kProvided ReadSource)
 * always will read directly from the durable catalog, and so it is not important for the in-memory
 * CollectionCatalog to match the durable catalog for these readers. In the future, we may want to
 * consider separating the code paths for timestamped and untimestamped reads, but for now, both
 * cases flow through this same function.
 *
 * We also check for replication state changes before and after opening a snapshot, since the
 * replication state determines whether readers without a timestamp must read from the storage
 * engine without a timestamp or whether they should read at the last applied timestamp. If the
 * replication state changes, the opened snapshot is abandoned and the process is retried.
 *
 * Returns the catalog fetched before the snapshot was opened, a flag derived from the secondary
 * namespace resolution, and the read source and point-in-time read timestamp observed on the
 * recovery unit once the snapshot was open. Rethrows NamespaceNotFound from namespace resolution
 * when the current read source is kNoTimestamp or kLastApplied.
 */
ConsistentCatalogAndSnapshot getConsistentCatalogAndSnapshot(
    OperationContext* opCtx,
    NamespaceStringOrUUID nsOrUUID,
    const std::vector<NamespaceStringOrUUID>& secondaryNssOrUUIDs,
    const repl::ReadConcernArgs& readConcernArgs,
    bool callerExpectedToConflictWithSecondaryBatchApplication) {
    // Loop until we get a consistent catalog and snapshot or throw an exception.
    while (true) {
        // The read source used can change depending on replication state, so we must fetch the repl
        // state beforehand, to compare with afterwards.
        const auto replTermBeforeSnapshot = repl::ReplicationCoordinator::get(opCtx)->getTerm();

        const auto catalogBeforeSnapshot = CollectionCatalog::get(opCtx);

        // When a query yields it releases its snapshot, and any point-in-time instantiated
        // collections stored on the snapshot decoration are destructed. At the start of a query,
        // collections are fetched using a namespace. However, when a query is restoring from
        // yield it attempts to fetch collections by UUID. It's possible for a UUID to no longer
        // resolve to a namespace in the latest collection catalog if that collection was dropped
        // while the query was yielding. This doesn't conclude that the collection is inaccessible
        // at an earlier point-in-time as the data files may still be on disk. This namespace is
        // used to determine if the read source needs to be changed and we only do this if the
        // original read source is kNoTimestamp or kLastApplied. If it's neither of the two we can
        // safely continue.
        NamespaceString nss;
        try {
            nss = catalogBeforeSnapshot->resolveNamespaceStringOrUUID(opCtx, nsOrUUID);
        } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
            // Resolution is only expected to fail for lookups by UUID.
            invariant(nsOrUUID.uuid());

            const auto readSource = opCtx->recoveryUnit()->getTimestampReadSource();
            if (readSource == RecoveryUnit::ReadSource::kNoTimestamp ||
                readSource == RecoveryUnit::ReadSource::kLastApplied) {
                throw;
            }
            // For any other read source the error is swallowed and 'nss' stays
            // default-constructed for the remainder of this iteration.
        }

        // This may modify the read source on the recovery unit for opCtx if the current read source
        // is either kNoTimestamp or kLastApplied.
        const bool shouldReadAtLastApplied = SnapshotHelper::changeReadSourceIfNeeded(opCtx, nss);

        const auto resolvedSecondaryNamespaces = resolveSecondaryNamespacesOrUUIDs(
            opCtx, catalogBeforeSnapshot.get(), secondaryNssOrUUIDs);

        // If the collection requires capped snapshots (i.e. it is unreplicated, capped, not the
        // oplog, and not clustered), establish a capped snapshot. This must happen before opening
        // the storage snapshot to ensure a reader using tailable cursors would not miss any writes.
        // See comments in getCollectionFromCatalog for why it is safe to establish the capped
        // snapshot here, on the Collection object in the latest version of the catalog, even if
        // openCollection is eventually called to construct a Collection object from the durable
        // catalog.
        establishCappedSnapshotIfNeeded(opCtx, catalogBeforeSnapshot, nsOrUUID);

        openSnapshot(opCtx, nss.isOplog());

        // Read back the read source and timestamp only after the snapshot has been opened.
        const auto readSource = opCtx->recoveryUnit()->getTimestampReadSource();
        const auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx);

        checkInvariantsForReadOptions(nss,
                                      readConcernArgs.getArgsAfterClusterTime(),
                                      readSource,
                                      readTimestamp,
                                      callerExpectedToConflictWithSecondaryBatchApplication,
                                      shouldReadAtLastApplied);

        // Re-fetch the catalog and the replication term so they can be compared against the values
        // captured before the snapshot was opened.
        const auto catalogAfterSnapshot = CollectionCatalog::get(opCtx);

        const auto replTermAfterSnapshot = repl::ReplicationCoordinator::get(opCtx)->getTerm();

        if (haveAcquiredConsistentCatalogAndSnapshot(opCtx,
                                                     catalogBeforeSnapshot.get(),
                                                     catalogAfterSnapshot.get(),
                                                     replTermBeforeSnapshot,
                                                     replTermAfterSnapshot,
                                                     resolvedSecondaryNamespaces)) {
            // An empty 'resolvedSecondaryNamespaces' is reported to the caller as "some secondary
            // namespace is sharded or a view".
            bool isAnySecondaryNssShardedOrAView = !resolvedSecondaryNamespaces.has_value();
            // Positional initialization: the order must match ConsistentCatalogAndSnapshot.
            return {
                catalogBeforeSnapshot, isAnySecondaryNssShardedOrAView, readSource, readTimestamp};
        } else {
            // The state is not consistent: drop the snapshot that was just opened and retry.
            opCtx->recoveryUnit()->abandonSnapshot();

            // NOTE(review): presumably records this retry as a yield on the CurOp -- confirm.
            CurOp::get(opCtx)->yielded();
        }
    }
}

/**
 * Acquires a CollectionCatalog instance and a storage snapshot that are consistent with each
 * other, without holding the RSTL or any collection locks. The catalog is stashed on 'opCtx' once
 * consistency has been established.
 *
 * Only meant for setting up lock-free reads in a global or database context; it is not safe to use
 * for reading on collections.
 *
 * Pass boost::none as 'dbName' to set up for a global context. When a database name is given, the
 * sharding database version is checked against it on every attempt.
 */
void acquireConsistentCatalogAndSnapshotUnsafe(OperationContext* opCtx,
                                               boost::optional<const DatabaseName&> dbName) {
    for (;;) {
        // Capture the replication term and catalog before the snapshot is opened so that both can
        // be compared with the values observed afterwards.
        const long long termBeforeSnapshot = repl::ReplicationCoordinator::get(opCtx)->getTerm();
        auto catalogBeforeSnapshot = CollectionCatalog::get(opCtx);

        if (dbName) {
            // Check that the sharding database version matches our read.
            DatabaseShardingState::assertMatchingDbVersion(opCtx, *dbName);
        }

        // The storage snapshot must be consistent with the in-memory catalog instance fetched
        // above; that is verified below and the whole sequence is retried on a mismatch.
        opCtx->recoveryUnit()->preallocateSnapshot();

        const auto catalogAfterSnapshot = CollectionCatalog::get(opCtx);
        const long long termAfterSnapshot = repl::ReplicationCoordinator::get(opCtx)->getTerm();

        const bool isConsistent =
            haveAcquiredConsistentCatalogAndSnapshot(opCtx,
                                                     catalogBeforeSnapshot.get(),
                                                     catalogAfterSnapshot.get(),
                                                     termBeforeSnapshot,
                                                     termAfterSnapshot,
                                                     boost::none);
        if (!isConsistent) {
            LOGV2_DEBUG(5067701,
                        3,
                        "Retrying acquiring state for lock-free read because collection, catalog or "
                        "replication state changed.");
            opCtx->recoveryUnit()->abandonSnapshot();
            continue;
        }

        CollectionCatalog::stash(opCtx, std::move(catalogBeforeSnapshot));
        return;
    }
}

/**
 * Looks up 'nss' as a view in 'catalog' and returns the result (possibly null).
 *
 * Throws CommandNotSupportedOnView when a view is found and 'viewMode' is not kViewsPermitted. A
 * timeseries view gets a dedicated error message.
 */
std::shared_ptr<const ViewDefinition> lookupView(
    OperationContext* opCtx,
    const std::shared_ptr<const CollectionCatalog>& catalog,
    const NamespaceString& nss,
    auto_get_collection::ViewMode viewMode) {
    auto viewDefinition = catalog->lookupView(opCtx, nss);

    // Acceptable means: nothing was found, or the caller explicitly allows views.
    const bool isAcceptable =
        !viewDefinition || viewMode == auto_get_collection::ViewMode::kViewsPermitted;

    // The timeseries check comes first so that it wins over the generic view error.
    uassert(ErrorCodes::CommandNotSupportedOnView,
            str::stream() << "Taking " << nss.toStringForErrorMsg()
                          << " lock for timeseries is not allowed",
            isAcceptable || !viewDefinition->timeseries());
    uassert(ErrorCodes::CommandNotSupportedOnView,
            str::stream() << "Namespace " << nss.toStringForErrorMsg()
                          << " is a view, not a collection",
            isAcceptable);
    return viewDefinition;
}

/**
 * Resolves 'nsOrUUID' against 'catalog' for a lock-free read at 'readTimestamp'.
 *
 * Returns the resolved namespace, the Collection (null if none was established) and, only when no
 * Collection was found, the result of looking the namespace up as a view.
 */
std::tuple<NamespaceString, const Collection*, std::shared_ptr<const ViewDefinition>>
getCollectionForLockFreeRead(OperationContext* opCtx,
                             const std::shared_ptr<const CollectionCatalog>& catalog,
                             boost::optional<Timestamp> readTimestamp,
                             const NamespaceStringOrUUID& nsOrUUID,
                             AutoGetCollection::Options options) {
    // The failpoint only pauses operations whose logical session id matches the configured "lsid".
    hangBeforeAutoGetCollectionLockFreeShardedStateAccess.executeIf(
        [&](auto&) { hangBeforeAutoGetCollectionLockFreeShardedStateAccess.pauseWhileSet(opCtx); },
        [&](const BSONObj& data) {
            const auto& lsid = opCtx->getLogicalSessionId();
            return lsid && lsid->getId() == UUID::fromCDR(data["lsid"].uuid());
        });

    // Obtain a collection reference compatible with 'readTimestamp'.
    const Collection* collection =
        catalog->establishConsistentCollection(opCtx, nsOrUUID, readTimestamp);

    // Namespace resolution is deliberately performed after establishConsistentCollection above,
    // since that call may affect the result of the resolution.
    const auto resolvedNss = catalog->resolveNamespaceStringOrUUID(opCtx, nsOrUUID);
    checkCollectionUUIDMismatch(opCtx, catalog, resolvedNss, collection, options._expectedUUID);

    // A view is only looked up when no collection exists for the namespace.
    std::shared_ptr<const ViewDefinition> view;
    if (!collection) {
        view = lookupView(opCtx, catalog, resolvedNss, options._viewMode);
    }
    return {resolvedNss, collection, std::move(view)};
}

static const Lock::GlobalLockSkipOptions kLockFreeReadsGlobalLockOptions{[] {
    Lock::GlobalLockSkipOptions options;
    options.skipRSTLLock = true;
    return options;
}()};

/**
 * Return value of acquireCatalogStateForNamespace(): the consistent catalog together with what the
 * requested namespace resolved to in it.
 */
struct CatalogStateForNamespace {
    // Catalog instance obtained from getConsistentCatalogAndSnapshot().
    std::shared_ptr<const CollectionCatalog> catalog;
    // Flag forwarded from getConsistentCatalogAndSnapshot().
    bool isAnySecondaryNssShardedOrAView;
    // Namespace the request resolved to.
    NamespaceString resolvedNss;
    // Collection found for the namespace; may be null.
    const Collection* collection;
    // View found for the namespace; only looked up when 'collection' is null.
    std::shared_ptr<const ViewDefinition> view;
};

/**
 * Acquires a consistent catalog and storage snapshot for 'nsOrUUID' and then resolves the
 * namespace to a Collection or view within that catalog.
 *
 * 'options' supplies the secondary namespaces as well as the expected UUID and view mode used
 * during resolution. Exceptions thrown by either step propagate to the caller.
 */
CatalogStateForNamespace acquireCatalogStateForNamespace(
    OperationContext* opCtx,
    const NamespaceStringOrUUID& nsOrUUID,
    const repl::ReadConcernArgs& readConcernArgs,
    bool callerExpectedToConflictWithSecondaryBatchApplication,
    AutoGetCollection::Options options) {

    auto [catalog, isAnySecondaryNssShardedOrAView, readSource, readTimestamp] =
        getConsistentCatalogAndSnapshot(opCtx,
                                        nsOrUUID,
                                        options._secondaryNssOrUUIDs,
                                        readConcernArgs,
                                        callerExpectedToConflictWithSecondaryBatchApplication);

    // This is the last use of 'options', so hand it over instead of copying it.
    auto [resolvedNss, collection, view] = getCollectionForLockFreeRead(
        opCtx, catalog, readTimestamp, nsOrUUID, std::move(options));

    // The locals are not used past this point: move them into the result to avoid shared_ptr
    // reference count traffic and a NamespaceString copy.
    return CatalogStateForNamespace{std::move(catalog),
                                    isAnySecondaryNssShardedOrAView,
                                    std::move(resolvedNss),
                                    collection,
                                    std::move(view)};
}

/**
 * Builds a ShouldNotConflictWithSecondaryBatchApplicationBlock over the operation's lock state
 * when all of the following hold: this is not a nested lock-free read, secondary reads during
 * batch application are allowed (or the setting is unset), and the storage engine supports read
 * concern snapshot. Returns boost::none otherwise.
 */
boost::optional<ShouldNotConflictWithSecondaryBatchApplicationBlock>
makeShouldNotConflictWithSecondaryBatchApplicationBlock(OperationContext* opCtx,
                                                        bool isLockFreeReadSubOperation) {
    const bool needsBlock = !isLockFreeReadSubOperation &&
        allowSecondaryReadsDuringBatchApplication_DONT_USE(opCtx).value_or(true) &&
        opCtx->getServiceContext()->getStorageEngine()->supportsReadConcernSnapshot();
    if (!needsBlock) {
        return boost::none;
    }
    return boost::optional<ShouldNotConflictWithSecondaryBatchApplicationBlock>(
        opCtx->lockState());
}

}  // namespace

/**
 * Builds the callback used to restore the Collection by UUID after a yield. The callback
 * re-acquires a consistent catalog and snapshot, refreshes '_resolvedNss' and '_view', stashes the
 * catalog on the operation context and returns the Collection, or nullptr if the UUID no longer
 * resolves to a namespace.
 */
CollectionPtr::RestoreFn AutoGetCollectionForReadLockFree::_makeRestoreFromYieldFn(
    const AutoGetCollection::Options& options,
    bool callerExpectedToConflictWithSecondaryBatchApplication,
    const DatabaseName& dbName) {
    return [this, options, callerExpectedToConflictWithSecondaryBatchApplication, dbName](
               OperationContext* opCtx, UUID uuid) -> const Collection* {
        const auto restoreTarget = NamespaceStringOrUUID(dbName, uuid);
        try {
            auto restoredState = acquireCatalogStateForNamespace(
                opCtx,
                restoreTarget,
                repl::ReadConcernArgs::get(opCtx),
                callerExpectedToConflictWithSecondaryBatchApplication,
                options);

            const Collection* const restoredCollection = restoredState.collection;
            _resolvedNss = restoredState.resolvedNss;
            _view = restoredState.view;
            CollectionCatalog::stash(opCtx, std::move(restoredState.catalog));

            return restoredCollection;
        } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
            // CollectionCatalog::resolveNamespaceStringOrUUID (reached through
            // acquireCatalogStateForNamespace) throws NamespaceNotFound when the collection for
            // the given UUID no longer exists, which can happen if it was dropped while the query
            // was yielding.
            //
            // The query subsystem expects a CollectionPtr::RestoreFn to produce nullptr in that
            // situation, so the error is translated here.
            return nullptr;
        }
    };
}

/**
 * Sets up a lock-free read on 'nsOrUUID': takes the global lock in MODE_IS using
 * kLockFreeReadsGlobalLockOptions, checks the database version, and resolves the namespace to a
 * Collection or view. A nested lock-free operation reuses the current catalog and read timestamp;
 * otherwise a consistent catalog and snapshot are acquired and the catalog is stashed on 'opCtx'.
 */
AutoGetCollectionForReadLockFree::AutoGetCollectionForReadLockFree(
    OperationContext* opCtx, NamespaceStringOrUUID nsOrUUID, AutoGetCollection::Options options)
    : _originalReadSource(opCtx->recoveryUnit()->getTimestampReadSource()),
      _isLockFreeReadSubOperation(opCtx->isLockFreeReadsOp()),  // This has to come before LFRBlock.
      _callerExpectedToConflictWithSecondaryBatchApplication(
          opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()),
      _shouldNotConflictWithSecondaryBatchApplicationBlock(
          makeShouldNotConflictWithSecondaryBatchApplicationBlock(opCtx,
                                                                  _isLockFreeReadSubOperation)),
      _lockFreeReadsBlock(opCtx),
      _globalLock(opCtx,
                  MODE_IS,
                  options._deadline,
                  Lock::InterruptBehavior::kThrow,
                  kLockFreeReadsGlobalLockOptions) {

    // Test hook: sleeps for the number of milliseconds given in "waitForMillis".
    catalog_helper::setAutoGetCollectionWaitFailpointExecute(
        [&](const BSONObj& data) { sleepFor(Milliseconds(data["waitForMillis"].numberInt())); });

    // Supported lock-free reads should only ever have an open storage snapshot prior to
    // calling this helper if it is a nested lock-free operation. The storage snapshot and
    // in-memory state used across lock-free reads must be consistent.
    invariant(supportsLockFreeRead(opCtx) &&
              (!opCtx->recoveryUnit()->isActive() || _isLockFreeReadSubOperation));

    DatabaseShardingState::assertMatchingDbVersion(opCtx, nsOrUUID.dbName());

    auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
    if (_isLockFreeReadSubOperation) {
        // If this instance is nested and lock-free, then we do not want to adjust any setting, but
        // we do need to set up the Collection reference.
        auto catalog = CollectionCatalog::get(opCtx);

        auto [resolvedNss, collection, view] =
            getCollectionForLockFreeRead(opCtx,
                                         catalog,
                                         opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx),
                                         nsOrUUID,
                                         options);
        _resolvedNss = resolvedNss;
        _view = view;

        // The lock-free reads block is released when the namespace resolved to a view.
        if (_view) {
            _lockFreeReadsBlock.reset();
        }
        _collectionPtr = CollectionPtr(collection);
        // Nested operations should never yield as we don't yield when the global lock is held
        // recursively. But this is not known when we create the Query plan for this sub operation.
        // Pretend that we are yieldable but don't allow yield to actually be called.
        _collectionPtr.makeYieldable(opCtx, [](OperationContext*, UUID) {
            MONGO_UNREACHABLE;
            return nullptr;
        });
    } else {
        auto catalogStateForNamespace =
            acquireCatalogStateForNamespace(opCtx,
                                            nsOrUUID,
                                            readConcernArgs,
                                            _callerExpectedToConflictWithSecondaryBatchApplication,
                                            options);

        _resolvedNss = catalogStateForNamespace.resolvedNss;
        _view = catalogStateForNamespace.view;

        // The lock-free reads block is released when the namespace resolved to a view.
        if (_view) {
            _lockFreeReadsBlock.reset();
        }
        // Only the 'catalog' member is moved from; the remaining members are still read below.
        CollectionCatalog::stash(opCtx, std::move(catalogStateForNamespace.catalog));
        _secondaryNssIsAViewOrSharded = catalogStateForNamespace.isAnySecondaryNssShardedOrAView;

        _collectionPtr = CollectionPtr(catalogStateForNamespace.collection);
        _collectionPtr.makeYieldable(
            opCtx,
            _makeRestoreFromYieldFn(options,
                                    _callerExpectedToConflictWithSecondaryBatchApplication,
                                    _resolvedNss.dbName()));
    }

    if (_collectionPtr) {
        assertReadConcernSupported(
            _collectionPtr, readConcernArgs, opCtx->recoveryUnit()->getTimestampReadSource());

        // Record the shard key pattern on the CollectionPtr when the collection is sharded.
        auto scopedCss = CollectionShardingState::acquire(opCtx, _collectionPtr->ns());
        auto collDesc = scopedCss->getCollectionDescription(opCtx);
        if (collDesc.isSharded()) {
            _collectionPtr.setShardKeyPattern(collDesc.getKeyPattern());
        }
    } else {
        // Without a collection, the caller must not have requested a specific UUID.
        invariant(!options._expectedUUID);
    }
}

/**
 * Constructs the lock-free helper when the operation supports lock-free reads, and the regular
 * helper otherwise. Exactly one of '_autoGet' and '_autoGetLockFree' is engaged afterwards.
 */
AutoGetCollectionForReadMaybeLockFree::AutoGetCollectionForReadMaybeLockFree(
    OperationContext* opCtx,
    const NamespaceStringOrUUID& nsOrUUID,
    AutoGetCollection::Options options) {
    if (!supportsLockFreeRead(opCtx)) {
        _autoGet.emplace(opCtx, nsOrUUID, std::move(options));
        return;
    }
    _autoGetLockFree.emplace(opCtx, nsOrUUID, std::move(options));
}

// Forwards to whichever of the two underlying helpers is engaged.
const ViewDefinition* AutoGetCollectionForReadMaybeLockFree::getView() const {
    return _autoGet ? _autoGet->getView() : _autoGetLockFree->getView();
}

// Forwards to whichever of the two underlying helpers is engaged.
const NamespaceString& AutoGetCollectionForReadMaybeLockFree::getNss() const {
    if (!_autoGet) {
        return _autoGetLockFree->getNss();
    }
    return _autoGet->getNss();
}

// Forwards to whichever of the two underlying helpers is engaged.
const CollectionPtr& AutoGetCollectionForReadMaybeLockFree::getCollection() const {
    if (!_autoGet) {
        return _autoGetLockFree->getCollection();
    }
    return _autoGet->getCollection();
}

// Forwards to whichever of the two underlying helpers is engaged.
bool AutoGetCollectionForReadMaybeLockFree::isAnySecondaryNamespaceAViewOrSharded() const {
    return _autoGet ? _autoGet->isAnySecondaryNamespaceAViewOrSharded()
                    : _autoGetLockFree->isAnySecondaryNamespaceAViewOrSharded();
}

/**
 * Acquires the collection through 'AutoGetCollectionForReadType', starts stats tracking for the
 * resolved namespace and, unless the namespace resolved to a view, checks the shard version.
 */
template <typename AutoGetCollectionForReadType>
AutoGetCollectionForReadCommandBase<AutoGetCollectionForReadType>::
    AutoGetCollectionForReadCommandBase(OperationContext* opCtx,
                                        const NamespaceStringOrUUID& nsOrUUID,
                                        AutoGetCollection::Options options,
                                        AutoStatsTracker::LogMode logMode)
    : _autoCollForRead(opCtx, nsOrUUID, options),
      _statsTracker(opCtx,
                    _autoCollForRead.getNss(),
                    Top::LockType::ReadLocked,
                    logMode,
                    CollectionCatalog::get(opCtx)->getDatabaseProfileLevel(
                        _autoCollForRead.getNss().dbName()),
                    options._deadline,
                    options._secondaryNssOrUUIDs) {
    // The failpoint only pauses operations whose logical session id matches the configured "lsid".
    hangBeforeAutoGetShardVersionCheck.executeIf(
        [&](auto&) { hangBeforeAutoGetShardVersionCheck.pauseWhileSet(opCtx); },
        [&](const BSONObj& data) {
            const auto& lsid = opCtx->getLogicalSessionId();
            return lsid && lsid->getId() == UUID::fromCDR(data["lsid"].uuid());
        });

    // The shard version is not checked when the namespace resolved to a view.
    if (_autoCollForRead.getView()) {
        return;
    }

    auto scopedCss = CollectionShardingState::acquire(opCtx, _autoCollForRead.getNss());
    scopedCss->checkShardVersionOrThrow(opCtx);
}

/**
 * Sets up the lock-free read state and retries the setup for as long as a sharded collection was
 * found while the request carries an UNSHARDED shard version for the resolved namespace.
 */
AutoGetCollectionForReadCommandLockFree::AutoGetCollectionForReadCommandLockFree(
    OperationContext* opCtx,
    const NamespaceStringOrUUID& nsOrUUID,
    AutoGetCollection::Options options,
    AutoStatsTracker::LogMode logMode) {
    for (;;) {
        _autoCollForReadCommandBase.emplace(opCtx, nsOrUUID, options, logMode);
        const auto receivedShardVersion = OperationShardingState::get(opCtx).getShardVersion(
            _autoCollForReadCommandBase->getNss());

        // A request may arrive with an UNSHARDED shard version for the namespace. Running
        // lock-free, the state set up above may have found a sharded collection, with the
        // namespace subsequently dropped and recreated UNSHARDED in time for the shard version
        // check performed in AutoGetCollectionForReadCommandBase. So verify here that the
        // collection state in use matches the shard version in the request. On a mismatch we can
        // simply retry: the scenario is very unlikely.
        //
        // The request may carry no shard version for the namespace at all. That's OK because shard
        // versioning isn't needed in that case. See SERVER-63009 for more details.
        const auto& collection = _autoCollForReadCommandBase->getCollection();
        const bool foundShardedButExpectedUnsharded = collection && collection.isSharded() &&
            receivedShardVersion && receivedShardVersion.value() == ShardVersion::UNSHARDED();
        if (!foundShardedButExpectedUnsharded) {
            return;
        }

        // The failpoint only pauses operations whose logical session id matches the configured
        // "lsid".
        reachedAutoGetLockFreeShardConsistencyRetry.executeIf(
            [&](auto&) { reachedAutoGetLockFreeShardConsistencyRetry.pauseWhileSet(opCtx); },
            [&](const BSONObj& data) {
                const auto& lsid = opCtx->getLogicalSessionId();
                return lsid && lsid->getId() == UUID::fromCDR(data["lsid"].uuid());
            });
    }
}

/**
 * Looks up (opening it if necessary) the database of 'nss', optionally checks the shard version,
 * and enters the namespace on the current operation with the database's profile level.
 */
OldClientContext::OldClientContext(OperationContext* opCtx,
                                   const NamespaceString& nss,
                                   bool doVersion)
    : _opCtx(opCtx) {
    const auto dbName = nss.dbName();
    auto* const databaseHolder = DatabaseHolder::get(opCtx);

    _db = databaseHolder->getDb(opCtx, dbName);
    if (!_db) {
        _db = databaseHolder->openDb(_opCtx, dbName, &_justCreated);
        invariant(_db);
    }

    auto const curOp = CurOp::get(_opCtx);

    if (doVersion) {
        // getMore is special and should be handled elsewhere. Update and delete check the shard
        // version as part of the write executor path, so there is no need to check them here too.
        const auto networkOp = curOp->getNetworkOp();
        const bool isCheckedElsewhere =
            networkOp == dbGetMore || networkOp == dbUpdate || networkOp == dbDelete;
        if (!isCheckedElsewhere) {
            CollectionShardingState::assertCollectionLockedAndAcquire(_opCtx, nss)
                ->checkShardVersionOrThrow(_opCtx);
        }
    }

    const auto profileLevel = CollectionCatalog::get(opCtx)->getDatabaseProfileLevel(_db->name());
    stdx::lock_guard<Client> clientLock(*_opCtx->getClient());
    curOp->enter_inlock(nss, profileLevel);
}

/**
 * Constructs the lock-free command helper when the operation supports lock-free reads, and the
 * regular one otherwise. Exactly one of '_autoGet' and '_autoGetLockFree' is engaged afterwards.
 */
AutoGetCollectionForReadCommandMaybeLockFree::AutoGetCollectionForReadCommandMaybeLockFree(
    OperationContext* opCtx,
    const NamespaceStringOrUUID& nsOrUUID,
    AutoGetCollection::Options options,
    AutoStatsTracker::LogMode logMode) {
    if (!supportsLockFreeRead(opCtx)) {
        _autoGet.emplace(opCtx, nsOrUUID, std::move(options), logMode);
        return;
    }
    _autoGetLockFree.emplace(opCtx, nsOrUUID, std::move(options), logMode);
}

// Forwards to whichever of the two underlying helpers is engaged.
const CollectionPtr& AutoGetCollectionForReadCommandMaybeLockFree::getCollection() const {
    if (!_autoGet) {
        return _autoGetLockFree->getCollection();
    }
    return _autoGet->getCollection();
}

// Forwards to whichever of the two underlying helpers is engaged.
const ViewDefinition* AutoGetCollectionForReadCommandMaybeLockFree::getView() const {
    return _autoGet ? _autoGet->getView() : _autoGetLockFree->getView();
}

// Forwards to whichever of the two underlying helpers is engaged.
const NamespaceString& AutoGetCollectionForReadCommandMaybeLockFree::getNss() const {
    if (!_autoGet) {
        return _autoGetLockFree->getNss();
    }
    return _autoGet->getNss();
}

// Forwards to whichever of the two underlying helpers is engaged.
bool AutoGetCollectionForReadCommandMaybeLockFree::isAnySecondaryNamespaceAViewOrSharded() const {
    if (_autoGet) {
        return _autoGet->isAnySecondaryNamespaceAViewOrSharded();
    }
    return _autoGetLockFree->isAnySecondaryNamespaceAViewOrSharded();
}

/**
 * Sets up a lock-free read in a global context: takes the global lock in MODE_IS with the shared
 * lock-free read options (RSTL skipped) and then acquires a consistent catalog and storage
 * snapshot.
 */
AutoReadLockFree::AutoReadLockFree(OperationContext* opCtx, Date_t deadline)
    : _lockFreeReadsBlock(opCtx),
      _globalLock(opCtx,
                  MODE_IS,
                  deadline,
                  Lock::InterruptBehavior::kThrow,
                  kLockFreeReadsGlobalLockOptions) {

    // No database name is passed, which sets the helper up for a global context.
    acquireConsistentCatalogAndSnapshotUnsafe(opCtx, /*dbName*/ boost::none);
}

/**
 * Sets up a lock-free read in the context of database 'dbName': takes the global lock in MODE_IS
 * with the shared lock-free read options (RSTL skipped) and then acquires a consistent catalog and
 * storage snapshot, which also checks the sharding database version for 'dbName'.
 */
AutoGetDbForReadLockFree::AutoGetDbForReadLockFree(OperationContext* opCtx,
                                                   const DatabaseName& dbName,
                                                   Date_t deadline)
    : _lockFreeReadsBlock(opCtx),
      _globalLock(opCtx,
                  MODE_IS,
                  deadline,
                  Lock::InterruptBehavior::kThrow,
                  kLockFreeReadsGlobalLockOptions) {

    acquireConsistentCatalogAndSnapshotUnsafe(opCtx, dbName);
}

/**
 * Constructs the lock-free database helper when the operation supports lock-free reads; otherwise
 * constructs the regular helper with MODE_IS.
 */
AutoGetDbForReadMaybeLockFree::AutoGetDbForReadMaybeLockFree(OperationContext* opCtx,
                                                             const DatabaseName& dbName,
                                                             Date_t deadline) {
    if (!supportsLockFreeRead(opCtx)) {
        _autoGet.emplace(opCtx, dbName, MODE_IS, deadline);
        return;
    }
    _autoGetLockFree.emplace(opCtx, dbName, deadline);
}

/**
 * Records the elapsed time of this context in Top for the current operation's namespace, unless
 * the operation has been killed.
 */
OldClientContext::~OldClientContext() {
    // If in an interrupt, don't record any stats.
    // It is possible to have no lock after saving the lock state and being interrupted while
    // waiting to restore.
    if (_opCtx->getKillStatus() != ErrorCodes::OK) {
        return;
    }

    invariant(_opCtx->lockState()->isLocked());

    const auto lockType = _opCtx->lockState()->isWriteLocked() ? Top::LockType::WriteLocked
                                                               : Top::LockType::ReadLocked;
    auto curOp = CurOp::get(_opCtx);
    Top::get(_opCtx->getClient()->getServiceContext())
        .record(_opCtx,
                curOp->getNS(),
                curOp->getLogicalOp(),
                lockType,
                _timer.micros(),
                curOp->isCommand(),
                curOp->getReadWriteType());
}

/**
 * Returns the lock mode a query should use: MODE_IX inside a multi-document transaction, MODE_IS
 * otherwise. Throws error 51071 when 'nss' is system.views and the operation is in a transaction.
 */
LockMode getLockModeForQuery(OperationContext* opCtx, const boost::optional<NamespaceString>& nss) {
    invariant(opCtx);

    // Outside of multi-statement transactions IS locks are sufficient.
    if (!opCtx->inMultiDocumentTransaction()) {
        return MODE_IS;
    }

    // Multi-statement transactions use IX locks and may not query system.views.
    uassert(51071,
            "Cannot query system.views within a transaction",
            !nss || !nss->isSystemDotViews());
    return MODE_IX;
}

// Saves the operation's current "allow secondary reads during batch application" setting in
// '_originalSettings' and sets it to false for the lifetime of this object.
BlockSecondaryReadsDuringBatchApplication_DONT_USE::
    BlockSecondaryReadsDuringBatchApplication_DONT_USE(OperationContext* opCtx)
    : _opCtx(opCtx) {
    auto& currentSetting = allowSecondaryReadsDuringBatchApplication_DONT_USE(opCtx);
    currentSetting.swap(_originalSettings);
    currentSetting = false;
}

// Swaps the setting saved by the constructor back into place.
BlockSecondaryReadsDuringBatchApplication_DONT_USE::
    ~BlockSecondaryReadsDuringBatchApplication_DONT_USE() {
    auto& currentSetting = allowSecondaryReadsDuringBatchApplication_DONT_USE(_opCtx);
    currentSetting.swap(_originalSettings);
}

// Explicit instantiations of AutoGetCollectionForReadCommandBase, whose constructor template is
// defined in this file, for the two read helper types it is used with here.
template class AutoGetCollectionForReadCommandBase<AutoGetCollectionForRead>;
template class AutoGetCollectionForReadCommandBase<AutoGetCollectionForReadLockFree>;

}  // namespace mongo