summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/README.md
blob: a3ffc9afad9bfa5f9be19f27add03635dd46d98e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
# Replication Internals

Replication is the set of systems used to continuously copy data from a primary server to secondary
servers so if the primary server fails a secondary server can take over soon. This process is
intended to be mostly transparent to the user, with drivers taking care of routing queries to the
requested replica. Replication in MongoDB is facilitated through [**replica
sets**](https://docs.mongodb.com/manual/replication/).

Replica sets are a group of nodes with one primary and multiple secondaries. The primary is
responsible for all writes. Users may specify that reads from secondaries are acceptable with a
`slaveOK` flag, but they are not by default.

# Steady State Replication

The normal running of a replica set is referred to as steady state replication. This is when there
is one primary and multiple secondaries. Each secondary is replicating data from the primary, or
another secondary off of which it is **chaining**.

## Life as a Primary

### Doing a Write

When a user does a write, all a primary node does is apply the write to the database like a
standalone would. The one difference from a standalone write is that replica set nodes have an
`OpObserver` that inserts a document to the **oplog** whenever a write to the database happens,
describing the write. The oplog is a capped collection called `oplog.rs` in the `local` database.
There are a few optimizations made for it in WiredTiger, and it is the only collection that doesn't
include an _id field.

If a write does multiple operations, each will have its own oplog entry; for example, inserts with
implicit collection creation create two oplog entries, one for the `create` and one for the
`insert`.

These entries are rewritten from the initial operation to make them idempotent; for example, updates
with `$inc` are changed to use `$set`.

Secondaries drive oplog replication via a pull process.

Writes can also specify a [**write
concern**](https://docs.mongodb.com/manual/reference/write-concern/). If a command includes a write
concern, the command will just block in its own thread until the oplog entries it generates have
been replicated to the requested number of nodes. The primary keeps track of how up-to-date the
secondaries are to know when to return. A write concern can specify a number of nodes to wait for,
or **majority**. If **majority** is specified, the write waits for that write to be in the
**committed snapshot** as well, so that it can be read with `readConcern: { level: majority }`
reads. (If this last sentence made no sense, come back to it at the end).

## Life as a Secondary

In general, secondaries just choose a node to sync from, their **sync source**, and then pull
operations from its oplog and apply those oplog entries to their own copy of the data on disk.

Secondaries also constantly update their sync source with their progress so that the primary can
satisfy write concerns.

### Oplog Fetching

A secondary keeps its data synchronized with its sync source by fetching oplog entries from its sync
source. This is done via the
[`OplogFetcher`](https://github.com/mongodb/mongo/blob/929cd5af6623bb72f05d3364942e84d053ddea0d/src/mongo/db/repl/oplog_fetcher.h).

The `OplogFetcher` first creates a connection to the sync source. Through this connection, it will
establish an **exhaust cursor** to fetch oplog entries. This means that after the initial `find` and
`getMore` are sent, the sync source will keep sending all subsequent batches without needing the
fetching node to run any additional `getMore`s.

Let’s refer to the sync source as node A and the fetching node as node B.

The `find` command that B’s `OplogFetcher` first sends to sync source A has a greater than or equal
predicate on the timestamp of the last oplog entry it has fetched. The original `find` command
should always return at least 1 document due to the greater than or equal predicate. If it does not,
that means that the A’s oplog is behind B's and thus A should not be B’s sync source. If it does
return a non-empty batch, but the first document returned does not match the last entry in B’s
oplog, that means that B's oplog has diverged from A's and it should go into
[**ROLLBACK**](https://docs.mongodb.com/manual/core/replica-set-rollbacks/).

After getting the original `find` response, secondaries check the metadata that accompanies the
response to see if the sync source is still a good sync source. Secondaries check that the node has
not rolled back since it was chosen and that it is still ahead of them.

The `OplogFetcher` specifies `awaitData: true, tailable: true` on the cursor so that subsequent
batches block until their `maxTimeMS` expires waiting for more data instead of returning
immediately. If there is no data to return at the end of `maxTimeMS`, the `OplogFetcher` receives an
empty batch and will wait on the next batch.

If the `OplogFetcher` encounters any errors while trying to connect to the sync source or get a
batch, it will use `OplogFetcherRestartDecision` to check that it has enough retries left to create
a new cursor. The connection class will automatically handle reconnecting to the sync source when
needed. Whenever the `OplogFetcher` successfully receives a batch, it will reset its retries. If it
errors enough times in a row to exhaust its retries, that might be an indication that there is
something wrong with the connection or the sync source. In that case, the `OplogFetcher` will shut
down with an error status.

The `OplogFetcher` is owned by the
[`BackgroundSync`](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/bgsync.h) thread.
The `BackgroundSync` thread runs continuously while a node is in `SECONDARY` state. `BackgroundSync`
sits in a loop, where each iteration it first chooses a sync source with the `SyncSourceResolver`
and then starts up the `OplogFetcher`. When the `OplogFetcher` terminates, `BackgroundSync` restarts
sync source selection, exits, or goes into ROLLBACK depending on the return status. The
`OplogFetcher` could terminate because the first batch implies that a rollback is required, it could
receive an error from the sync source, or it could just be shut down by its owner, such as when
`BackgroundSync` itself is shut down.

The `OplogFetcher` does not directly apply the operations it retrieves from the sync source. Rather,
it puts them into a buffer (the **`OplogBuffer`**) and another thread is in charge of taking the
operations off the buffer and applying them. That buffer uses an in-memory blocking queue for steady
state replication; there is a similar collection-backed buffer used for initial sync.

### Sync Source Selection

Whenever a node starts initial sync, creates a new `BackgroundSync` (when it stops being primary),
or errors on its current `OplogFetcher`, it must get a new sync source. Sync source selection is
done by the
[`SyncSourceResolver`](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/sync_source_resolver.h).

The `SyncSourceResolver` delegates the duty of choosing a "sync source candidate" to the
[**`ReplicationCoordinator`**](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/replication_coordinator.h),
which in turn asks the
[**`TopologyCoordinator`**](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/topology_coordinator.h)
to choose a new sync source.

#### Choosing a sync source candidate

To choose a new sync source candidate, the `TopologyCoordinator` first checks if the user requested
a specific sync source with the `replSetSyncFrom` command. In that case, the secondary chooses that
host as the sync source and resets its state so that it doesn’t use that requested sync source
again.

If **chaining** is disallowed, the secondary needs to sync from the primary, and chooses it as a
candidate.

Otherwise, it iterates through all of the nodes and sees which one is the best.

* First the secondary checks the `TopologyCoordinator`'s cached view of the replica set for the
  latest OpTime known to be on the primary. Secondaries do not sync from nodes whose newest oplog
  entry is more than
  [`maxSyncSourceLagSecs`](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/topology_coordinator.cpp#L302-L315)
  seconds behind the primary's newest oplog entry.
* Secondaries then loop through each node and choose the closest node that satisfies [various
  criteria](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/topology_coordinator.cpp#L200-L438).
  “Closest” here is determined by the lowest ping time to each node.
* If no node satisfies the necessary criteria, then the `BackgroundSync` waits 1 second and restarts
  the sync source selection process.

#### Sync Source Probing

After choosing a sync source candidate, the `SyncSourceResolver` probes the sync source candidate to
make sure it actually is able to fetch from the sync source candidate’s oplog.

* If the sync source candidate has no oplog or there is an error, the secondary blacklists that sync
  source for some time and then tries to find a new sync source candidate.
* If the oldest entry in the sync source candidate's oplog is newer than the node's newest entry,
  then the node blacklists that sync source candidate as well because the candidate is too far
  ahead.
* During initial sync, rollback, or recovery from unclean shutdown, nodes will set a specific
  OpTime, **`minValid`**, that they must reach before it is safe to read from the node and before
  the node can transition into `SECONDARY` state. If the secondary has a `minValid`, then the sync
  source candidate is checked for that `minValid` entry.
* The sync source's **RollbackID** is also fetched to be checked after the first batch is returned
  by the `OplogFetcher`.

If the secondary is too far behind all possible sync source candidates then it goes into maintenance
mode and waits for manual intervention (likely a call to `resync`). If no viable candidates were
found, `BackgroundSync` waits 1 second and attempts the entire sync source selection process again.
Otherwise, the secondary found a sync source! At that point `BackgroundSync` starts an OplogFetcher.

### Oplog Entry Application

A separate thread, `RSDataSync` is used for pulling oplog entries off of the oplog buffer and
applying them. `RSDataSync` constructs a `SyncTail` in a loop which is used to actually apply the
operations. The `SyncTail` instance does some oplog application, and terminates when there is a state
change where we need to pause oplog application. After it terminates, `RSDataSync` loops back and
decides if it should make a new `SyncTail` and continue.

`SyncTail` creates multiple threads that apply buffered oplog entries in parallel. Operations are
pulled off of the oplog buffer in batches to be applied. Nodes keep track of their “last applied
OpTime”, which is only moved forward at the end of a batch. Oplog entries within the same batch are
not necessarily applied in order. Operations on a document must be atomic and ordered, so operations
on the same document will be put on the same thread to be serialized. Additionally, command
operations are done serially in batches of size 1. Insert operations are also batched together for
improved performance.

## Replication and Topology Coordinators

The `ReplicationCoordinator` is the public api that replication presents to the rest of the code
base. It is in charge of coordinating the interaction of replication with the rest of the system.

The `ReplicationCoordinator` communicates with the storage layer and other nodes through the
[`ReplicationCoordinatorExternalState`](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/replication_coordinator_external_state.h).
The external state also manages and owns all of the replication threads.

The `TopologyCoordinator` is in charge of maintaining state about the topology of the cluster. On
significant changes (anything that affects the response to isMaster), the TopologyCoordinator updates
its TopologyVersion. The [`isMaster`](https://github.com/mongodb/mongo/blob/r4.3.3/src/mongo/db/repl/replication_info.cpp#L258) command awaits changes in the TopologyVersion before returning.

The `TopologyCoordinator` is non-blocking and does a large amount of a node's decision making
surrounding replication. Most replication command requests and responses are filled in here.

Both coordinators maintain views of the entire cluster and the state of each node, though there are
plans to merge these together.

## Communication

Each node has a copy of the **`ReplicaSetConfig`** in the `ReplicationCoordinator` that lists all
nodes in the replica set. This config lets each node talk to every other node.

Each node uses the internal client, the legacy c++ driver code in the
[`src/mongo/client`](https://github.com/mongodb/mongo/tree/r4.2.0/src/mongo/client) directory, to
talk to each other node. Nodes talk to each other by sending a mixture of external and internal
commands over the same incoming port as user commands. All commands take the same code path as
normal user commands. For security, nodes use the keyfile to authenticate to each other. You need to
be the system user to run replication commands, so nodes authenticate as the system user when
issuing remote commands to other nodes.

Each node communicates with other nodes at regular intervals to:

* Check the liveness of the other nodes (heartbeats)
* Stay up to date with the primary (oplog fetching)
* Update their sync source with their progress (`replSetUpdatePosition` commands)

Each oplog entry is assigned a unique `OpTime` to describe when it occurred so other nodes can
compare how up-to-date they are.

OpTimes include a timestamp and a term field. The term field indicates how many elections have
occurred since the replica set started.

The election protocol, known as
[protocol version 1 or PV1](https://docs.mongodb.com/manual/reference/replica-set-protocol-versions/),
is built on top of [Raft](https://raft.github.io/raft.pdf), so it is guaranteed that two primaries
will not be elected in the same term. This helps differentiate ops that occurred at the same time
but from different primaries in the case of a network partition.

### Oplog Fetcher Responses

The `OplogFetcher` just issues normal `find` and `getMore` commands, so the upstream node (the sync
source) does not get any information from the request. In the response, however, the downstream
node, the one that issues the `find` to its sync source, gets metadata that it uses to update its
view of the replica set.

There are two types of metadata, `ReplSetMetadata` and `OplogQueryMetadata`. (The
`OplogQueryMetadata` is new, so there is some temporary field duplication for backwards
compatibility.)

#### ReplSetMetadata

`ReplSetMetadata` comes with all replication commands and is processed similarly for all commands.
It includes:

1. The upstream node's last committed OpTime
2. The current term.
3. The `ReplicaSetConfig` version (this is used to determine if a reconfig has occurred on the
   upstream node that hasn't been registered by the downstream node yet).
4. The replica set ID.

If the metadata has a different config version than the downstream node's config version, then the
metadata is ignored until a reconfig command is received that synchronizes the config versions.

The node sets its term to the upstream node's term, and if it's a primary (which can only happen on
heartbeats), it steps down.

The last committed OpTime is only used in this metadata for
[arbiters](https://docs.mongodb.com/manual/core/replica-set-arbiter/), to advance their committed
OpTime and in sharding in some places. Otherwise it is ignored.

#### OplogQueryMetadata

`OplogQueryMetadata` only comes with `OplogFetcher` responses. It includes:

1. The upstream node's last committed OpTime. This is the most recent operation that would be
   reflected in the snapshot used for `readConcern: majority` reads.
2. The upstream node's last applied OpTime.
3. The index (as specified by the `ReplicaSetConfig`) of the node that the upstream node thinks is
   primary.
4. The index of the upstream node's sync source.

If the metadata says there is still a primary, the downstream node resets its election timeout into
the future.

The downstream node sets its last committed OpTime to the last committed OpTime of the upstream
node.

When it updates the last committed OpTime, it chooses a new committed snapshot if possible and tells
the storage engine to erase any old ones if necessary.

Before sending the next `getMore`, the downstream node uses the metadata to check if it should
change sync sources.

### Heartbeats

At a default of every 2 seconds, the `HeartbeatInterval`, every node sends a heartbeat to every
other node with the `replSetHeartbeat` command. This means that the number of heartbeats increases
quadratically with the number of nodes and is the reasoning behind the 50 member limit in a replica
set. The data, `ReplSetHeartbeatArgsV1` that accompanies every heartbeat is:

1. `ReplicaSetConfig` version
2. The id of the sender in the `ReplSetConfig`
3. Term
4. Replica set name
5. Sender host address

When the remote node receives the heartbeat, it first processes the heartbeat data, and then sends a
response back. First, the remote node makes sure the heartbeat is compatible with its replica set
name and its `ReplicaSetConfig` version and otherwise sends an error.

The receiving node's `TopologyCoordinator` updates the last time it received a heartbeat from the
sending node for liveness checking in its `MemberHeartbeatData` list.

If the sending node's config is higher than the receiving node's, then the receiving node schedules
a heartbeat to get the config. The receiving node's `ReplicationCoordinator` also updates its
`SlaveInfo` with the last update from the sending node and marks it as being up.

It then creates a `ReplSetHeartbeatResponse` object. This includes:

1. Replica set name
2. The receiving node's election time
3. The receiving node's last applied OpTime
4. The receiving node's last durable OpTime
5. The node the receiving node thinks is primary
6. The term of the receiving node
7. The state of the receiving node
8. The receiving node's sync source
9. The receiving node's `ReplicaSetConfig` version
10. Whether the receiving node is primary

When the sending node receives the response to the heartbeat, it first processes its
`ReplSetMetadata` like before.

The sending node postpones its election timeout if it sees a primary.

The `TopologyCoordinator` updates its `HeartbeatData`. It marks if the receiving node is up or down.

The sending node's `TopologyCoordinator` then looks at the response and decides the next action to
take: no action, priority takeover, or reconfig,

The `ReplicationCoordinator` then updates the `SlaveInfo` for the receiving node with its most
recently acquired OpTimes.

The next heartbeat is scheduled and then the next action set by the `TopologyCoordinator` is
executed.

If the action was a priority takeover, then the node ranks all of the priorities in its config and
assigns itself a priority takeover timeout proportional to its rank. After that timeout expires the
node will check if it's eligible to run for election and if so will begin an election. The timeout
is simply: `(election timeout) * (priority rank + 1)`.

### Commit Point Propagation

The replication majority **commit point** refers to an OpTime such that all oplog entries with an
OpTime earlier or equal to it have been replicated to a majority of nodes in the replica set. It is
influenced by the [`lastApplied`](#replication-timestamp-glossary) and the
[`lastDurable`](#replication-timestamp-glossary) OpTimes.

On the primary, we advance the commit point by checking what the highest `lastApplied` or
`lastDurable` is on a majority of the nodes. This OpTime must be greater than the current
`commit point` for the primary to advance it. Any threads blocking on a writeConcern are woken up
to check if they now fulfill their requested writeConcern.

When `getWriteConcernMajorityShouldJournal` is set to true, the
[`_lastCommittedOpTime`](#replication-timestamp-glossary) is set to the `lastDurable` OpTime. This
means that the server acknowledges a write operation after a majority has written to the on-disk
journal. Otherwise, `_lastCommittedOpTime` is set using the `lastApplied`.

Secondaries advance their commit point via heartbeats by checking if the commit point is in the
same term as their `lastApplied` OpTime. This ensures that the secondary is on the same branch of
history as the commit point. Additionally, they can update their commit point via the spanning tree
by taking the minimum of the learned commit point and their `lastApplied`.

### Update Position Commands

The last way that replica set nodes regularly communicate with each other is through
`replSetUpdatePosition` commands. The `ReplicationCoordinatorExternalState` creates a
[**`SyncSourceFeedback`**](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/sync_source_feedback.h)
object at startup that is responsible for sending `replSetUpdatePosition` commands.

The `SyncSourceFeedback` starts a loop. In each iteration it first waits on a condition variable
that is notified whenever the `ReplicationCoordinator` discovers that a node in the replica set has
replicated more operations and become more up-to-date. It checks that it is not in `PRIMARY` or
STARTUP state before moving on.

It then gets the node's sync source and creates a
[**`Reporter`**](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/reporter.h) that
actually sends the `replSetUpdatePosition` command to the sync source. This command keeps getting
sent every `keepAliveInterval` milliseconds (`(electionTimeout / 2)`) to maintain liveness
information about the nodes in the replica set.

`replSetUpdatePosition` commands are the primary means of maintaining liveness. Thus, if the primary
cannot communicate directly with every node, but it can communicate with every node through other
nodes, it will still stay primary.

The `replSetUpdatePosition` command contains the following information:

1. An `optimes` array containing an object for each live replica set member. This information is
   filled in by the `ReplicationCoordinator` with information from its `SlaveInfo`. Nodes that are
   believed to be down are not included. Each node contains:

    1. last durable OpTime
    2. last applied OpTime
    3. memberId
    4. `ReplicaSetConfig` version

2. `ReplSetMetadata`. Usually this only comes in responses, but here it comes in the request as
   well.

When a node receives a `replSetUpdatePosition` command, the first thing it does is have the
`ReplicationCoordinator` process the `ReplSetMetadata` as before.

For every node’s OpTime data in the `optimes` array, the receiving node updates its view of the
replicaset in the replication and topology coordinators. This updates the liveness information of
every node in the `optimes` list. If the data is about the receiving node, it ignores it. If the
`ReplSetConfig` versions don’t match, it errors. If the receiving node is a primary and it learns
that the commit point should be moved forward, it does so.

If something has changed and the receiving node itself has a sync source, it forwards its new
information to its own sync source.

The `replSetUpdatePosition` command response does not include any information unless there is an
error, such as in a `ReplSetConfig` mismatch.

## Read Concern

All reads in MongoDB are executed on snapshots of the data taken at some point in time. However, for
all read concern levels other than 'snapshot', if the storage engine yields while executing a read,
the read may continue on a newer snapshot. Thus, reads are currently not guaranteed to return all
data from one point in time. This means that some documents can be skipped if they are updated and
any updates that occurred since the read began may or may not be seen.

[Read concern](https://docs.mongodb.com/manual/reference/read-concern/) is an option sent with any
read command to specify at what consistency level the read should be satisfied. There are 5 read
concern levels:

* Local
* Majority
* Linearizable
* Snapshot
* Available

**Local** just returns whatever the most up-to-date data is on the node. On a primary, it does this
by reading from the storage engine's most recent snapshot. On a secondary, it performs a timestamped
read at the lastApplied, so that it does not see writes from the batch that is currently being
applied. For information on how local read concern works within a multi-document transaction, see
the [Read Concern Behavior Within Transactions](#read-concern-behavior-within-transactions) section.

**Majority** does a timestamped read at the stable timestamp (also called the last committed
snapshot in the code, for legacy reasons). The data read only reflects the oplog entries that have
been replicated to a majority of nodes in the replica set. Any data seen in majority reads cannot
roll back in the future. Thus majority reads prevent **dirty reads**, though they often are
**stale reads**.

Read concern majority reads usually return as fast as local reads, but sometimes will block. Read
concern majority reads do not wait for anything to be committed; they just use different snapshots
from local reads. They do block though when the node metadata (in the catalog cache) differs from
the committed snapshot. For example, index builds or drops, collection creates or drops, database
drops, or collmod’s could cause majority reads to block. If the primary receives a `createIndex`
command, subsequent majority reads will block until that index build is finished on a majority of
nodes. Majority reads also block right after startup or rollback when we do not yet have a committed
snapshot.

For information on how majority read concern works within a multi-document transaction, see the
[Read Concern Behavior Within Transactions](#read-concern-behavior-within-transactions) section.

**Linearizable** read concern actually does block for some time. Linearizability guarantees that if
one thread does a write that is acknowledged and tells another thread about that write, then that
second thread should see the write. If you transiently have 2 primaries (one has yet to step down)
and you read the data from the old primary, the new one may have newer data and you may get a stale
read.

To prevent reading from stale primaries, reads block to ensure that the current node remains the
primary after the read is complete. Nodes just write a noop to the oplog and wait for it to be
replicated to a majority of nodes. The node reads data from the most recent snapshot, and then the
noop write occurs after the fact. Thus, since we wait for the noop write to be replicated to a
majority of nodes, linearizable reads satisfy all of the same guarantees of read concern majority,
and then some. Linearizable read concern reads are only done on the primary, and they only apply to
single document reads, since linearizability is only defined as a property on single objects.

Linearizable read concern is not allowed within a multi-document transaction.

**Snapshot** read concern can only be run within a multi-document transaction. See the
[Read Concern Behavior Within Transactions](#read-concern-behavior-within-transactions) section for
more information.

**afterOpTime** is another read concern option, only used internally, only for config servers as
replica sets. **Read after optime** means that the read will block until the node has replicated
writes after a certain OpTime. This means that if read concern local is specified it will wait until
the local snapshot is beyond the specified OpTime. If read concern majority is specified it will
wait until the committed snapshot is beyond the specified OpTime.

**afterClusterTime** is a read concern option used for supporting **causal consistency**.
<!-- TODO: link to the Causal Consistency section of the Sharding Architecture Guide -->

# enableMajorityReadConcern Flag

`readConcern: majority` is enabled by default for WiredTiger in MongoDB. This can be problematic
for systems that use arbiters because it is possible for writes to be accepted but not majority
committed. Accepting writes without moving the majority commit point forward will increase WT cache
pressure until the primary suffers severe performance issues, like stalling. Arbiters cause
primaries to do this, for example, when secondaries are down for maintenance.

`enableMajorityReadConcern=false` (`eMRC=false`) is the recommended configuration for replica sets
with arbiters. When majority reads are disabled, the storage engine no longer maintains history as
far back as the majority commit point. The replication system sets the
[`stableTimestamp`](#replication-timestamp-glossary) to the newest `all_durable` timestamp, meaning
that we can take stable checkpoints that are not necessarily majority committed.

Some significant impacts of this flag in the replication system include changes to
[`Cross Shard Transactions`](#cross-shard-Transactions-and-the-prepared-state), and
[rollback](#rollback).

For more information on how this impacts `Change Streams`, please refer to the Query Architecture
Guide. <!-- TODO Link to Change Streams Section in Query Arch Guide -->

## eMRC=false and Rollback

Even though we added support for taking stable checkpoints when `eMRC=false`, we must still use the
`rollbackViaRefetch` algorithm instead of the `Recover to A Timestamp` algorithm. As aforementioned,
when `eMRC=false`, the replication system will set the `stableTimestamp` to the newest `all_durable`
timestamp. This allows us to take stable checkpoints that are not necessarily majority committed.
Consequently, the `stableTimestamp` can advance past the `majority commit point`, which will break
the `Recover to A Timestamp` algorithm.

### rollbackViaRefetch

Nodes go into rollback if after they receive the first batch of writes from their sync source, they
realize that the greater than or equal to predicate did not return the last op in their oplog. When
rolling back, nodes are in the `ROLLBACK` state and reads are prohibited. When a node goes into
rollback it drops all snapshots.

The rolling-back node first finds the common point between its oplog and its sync source's oplog.
It then goes through all of the operations in its oplog back to the common point and figures out
how to undo them.

Simply doing the "inverse" operation is sometimes impossible, such as a document remove where we do
not log the entire document that is removed. Instead, the node simply refetches the problematic
documents, or entire collections in the case of undoing a `drop`, from the sync source and replaces
the local version with that version. Some operations also have special handling, and some just
fail, such as `dropDatabase`, causing the entire node to shut down.

The node first compiles a list of documents, collections, and indexes to fetch and drop. Before
actually doing the undo steps, the node "fixes up" the operations by "cancelling out" operations
that negate each other to reduce work. The node then drops and fetches all data it needs and
replaces the local version with the remote versions.

The node gets the last applied OpTime from the sync source and the Rollback ID to check if a
rollback has happened during this rollback, in which case it fails rollback and shuts down. The
last applied OpTime is set as the `minValid` for the node and the node goes into RECOVERING state.
The node resumes fetching and applying operations like a normal secondary until it hits that
`minValid`. Only at that point does the node go into SECONDARY state.

This process is very similar to initial sync and startup after an unclean shutdown in that
operations are applied on data that may already reflect those operations and operations in the
future. This leads to all of the same idempotency concerns and index constraint relaxation.

Though we primarily use the `Recover To A Timestamp` algorithm from MongoDB 4.0 and onwards, we
must still keep the `rollbackViaRefetch` to support `eMRC=false` nodes.

## eMRC=false and Single Replica Set Transactions

[Single replica set transactions](#transactions) either do untimestamped reads or read from the
[all_durable](#replication-timestamp-glossary) when using `readConcern: snapshot`, so they do not
rely on storage engine support for reading from a majority committed snapshot. Therefore, single
replica set transactions should work the same with `readConcern: majority` disabled.

## eMRC=false and Cross Shard Transactions

It is illegal to run a [cross-shard transaction](#cross-shard-Transactions-and-the-prepared-state)
on shards that contain an arbiter or have a primary with `eMRC=false`. Shards that contain an
arbiter could indefinitely accept writes but not commit them, which affects the liveness of
cross-shard transactions. Consider a case where we try to commit a cross-shard transaction, but
are unable to put the transaction into the prepare state without a majority of the set.

Additionally, the `rollbackViaRefetch` algorithm does not support `prepare` oplog entries, so we
do not allow cross-shard transactions to run on replica sets that have `eMRC=false` nodes. We
automatically fail the `prepareTransaction` command in these cases, which will prevent a
transaction from even being prepared on an invalid shard or replica set node. This is safe because
it will cause the entire transaction to abort.

In addition to explicitly failing the `prepareTransaction` command, we also crash when replaying
the `prepare` oplog entry during [startup recovery](#startup-recovery) on `eMRC=false` nodes. This
allows us to avoid issues regarding replaying `prepare` oplog entries after recovering from an
unstable checkpoint. The only way to replay these entries and complete recovery is to restart the
node with `eMRC=true`.

# Transactions

**Multi-document transactions** were introduced in MongoDB to provide atomicity for reads and writes
to multiple documents either in the same collection or across multiple collections. Atomicity in
transactions refers to an "all-or-nothing" principle. This means that when a transaction commits,
it will not commit some of its changes while rolling back others. Likewise, when a transaction
aborts, all of its operations abort and all corresponding data changes are aborted.

## Life of a Multi-Document Transaction

All transactions are associated with a server session and at any given time, only one open
transaction can be associated with a single session. The state of a transaction is maintained
through the
[`TransactionParticipant`](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/transaction_participant.h),
which is a decoration on the session. Any thread that attempts to modify the state of the
transaction, which can include committing, aborting, or adding an operation to the transaction, must
have the correct session checked out before doing so. Only one operation can check out a session at
a time, so other operations that need to use the same session must wait for it to be checked back in.

### Starting a Transaction

Transactions are started on the server by the first operation in the transaction, indicated by a
`startTransaction: true` parameter. All operations in a transaction must include an `lsid`, which is
a unique ID for a session, a `txnNumber`, and an `autocommit:false` parameter. The `txnNumber` must
be higher than the previous `txnNumber` on this session. Otherwise, we will throw a
`TransactionTooOld` error.

When starting a new transaction, we implicitly abort the previously running transaction (if one
exists) on the session by updating our `txnNumber`. Next, we update our `txnState` to
`kInProgress`. The `txnState` maintains the state of the transaction and allows us to determine
legal state transitions. Finally, we reset the in memory state of the transaction as well as any
corresponding transaction metrics from a previous transaction.

When a node starts a transaction, it will acquire the global lock in intent exclusive mode (and as a
result, the [RSTL](#replication-state-transition-lock) in intent exclusive as well), which it will
hold for the duration of the transaction. The only exception is when
[preparing a transaction](#preparing-a-transaction-on-the-primary), which will release the RSTL and
reacquire it when [committing](#committing-a-prepared-transaction) or
[aborting](#aborting-a-prepared-transaction) the transaction. It also opens a `WriteUnitOfWork`,
which begins a storage engine transaction on the `RecoveryUnit`. The `RecoveryUnit` is responsible
for making sure data is persisted and all on-disk data must be modified through this interface. The
storage transaction is updated every time an operation comes in so that we can read our own writes
within a multi-document transaction. These changes are not visible to outside operations because the
node hasn't committed the transaction (and therefore, the WUOW) yet.

### Adding Operations to a Transaction

A user can add additional operations to an existing multi-document transaction by running more
commands on the same session. These operations are then stored in memory. Once a write completes on
the primary, we update the corresponding `sessionTxnRecord` in the transactions table
(`config.transactions`) with information about the transaction. This includes things like the
`lsid`, the `txnNumber` currently associated with the session, and the `txnState`.

This table was introduced for retryable writes and is used to keep track of retryable write and
transaction progress on a session. When checking out a session, this table can be used to restore
the transaction's state. See the
[Recovering Prepared Transactions](#recovering-prepared-transactions) section for information on how
the transactions table is used during transaction recovery.

### Committing a Single Replica Set Transaction

If we decide to commit this transaction, we retrieve those operations, group them into an `applyOps`
command and write down an `applyOps` oplog entry. Since an `applyOps` oplog entry can only be up to
16MB, transactions larger than this require multiple `applyOps` oplog entries upon committing.

If we are committing a read-only transaction, meaning that we did not modify any data, it must wait
for any data it reads to be majority committed regardless of the `readConcern` level.

Once we log the transaction oplog entries, we must commit the storage-transaction on the
`OperationContext`. This involves calling commit() on the WUOW. Once commit() is called on the WUOW
associated with a transaction, all writes that occurred during its lifetime will commit in the
storage engine.

Finally, we update the transactions table, update our local `txnState` to `kCommitted`, log any
transactions metrics, and clear our txnResources.

### Aborting a Single Replica Set Transaction

The process for aborting a multi-document transaction is simpler than committing since none of the
operations are visible at this point. We abort the storage transaction, update the
`sessionTxnRecord` in the transactions table, and write an abort oplog entry. Finally, we change
our local `txnState` to `kAbortedWithoutPrepare`. We now log any transactions metrics and reset the
in memory state of the `TransactionParticipant`.

Note that transactions can abort for reasons outside of the `abortTransaction` command. For example,
we abort non-prepared transactions that encounter write conflicts or state transitions.

## Cross-Shard Transactions and the Prepared State

In 4.2, we added support for **cross-shard transactions**, or transactions that involve data from
multiple shards in a cluster. We needed to add a **Two Phase Commit Protocol** to uphold the
atomicity of a transaction that involves multiple shards. One important part of the Two Phase Commit
Protocol is making sure that all shards participating in the transaction are in the
**prepared state**, or guaranteed to be able to commit, before actually committing the transaction.
This will allow us to avoid a situation where the transaction only commits on some of the shards and
aborts on others. Once a node puts a transaction in the prepared state, it *must* be able to commit
the transaction if we decide to commit the overall cross-shard transaction.

Another key piece of the Two Phase Commit Protocol is the **`TransactionCoordinator`**, which is
the first shard to receive an operation for a particular transaction. The `TransactionCoordinator`
will coordinate between all participating shards to ultimately commit or abort the transaction.

When the `TransactionCoordinator` is told to commit a transaction, it must first make sure that all
participating shards successfully prepare the transaction before telling them to commit the
transaction. As a result, the coordinator will issue the `prepareTransaction` command, an internal
command, on each shard participating in the transaction.

Each participating shard must majority commit the `prepareTransaction` command (thus making sure
that the prepare operation cannot be rolled back) before the `TransactionCoordinator` will send out
the `commitTransaction` command. This will help ensure that once a node prepares a transaction, it
will remain in the prepared state until the transaction is committed or aborted by the
`TransactionCoordinator`. If one of the shards fails to prepare the transaction, the
`TransactionCoordinator` will tell all participating shards to abort the transaction via the
`abortTransaction` command regardless of whether they have prepared it or not.

The durability of the prepared state is managed by the replication system, while the Two Phase
Commit Protocol is managed by the sharding system.

## Lifetime of a Prepared Transaction

Until a `prepareTransaction` command is run for a particular transaction, it follows the same path
as a single replica set transaction. But once a transaction is in the prepared state, new operations
cannot be added to it. The only way for a transaction to exit the prepared state is to either
receive a `commitTransaction` or `abortTransaction` command. This means that prepared transactions
must [survive state transitions and failovers](#state-transitions-and-failovers-with-transactions).
Additionally, there are many situations that need to be prevented to preserve prepared transactions.
For example, they cannot be killed or time out (nor can their sessions), manual updates to the
transactions table are forbidden for transactions in the prepared state, and the prepare transaction
oplog entry(s) cannot fall off the back of the oplog.

### Preparing a Transaction on the Primary

When a primary receives a `prepareTransaction` command, it will transition the associated
transaction's `txnState` to `kPrepared`. Next it will reserve an **oplog slot** (which is a unique
`OpTime`) for the `prepareTransaction` oplog entry. The `prepareTransaction` oplog entry will
contain all the operations from the transaction, which means that if the transaction is larger than
16MB (and thus requires multiple oplog entries), the node will reserve multiple oplog slots. The
`OpTime` for the `prepareTransaction` oplog entry will be used for the
[**`prepareTimestamp`**](#replication-timestamp-glossary).

The node will then set the `prepareTimestamp` on the `RecoveryUnit` and mark the storage engine's
transaction as prepared so that the storage engine can
[block conflicting reads and writes](#prepare-conflicts) until the transaction is committed or
aborted.

Next, the node will create the `prepareTransaction` oplog entry and write it to the oplog. This will
involve taking all the operations from the transaction and storing them as an `applyOps` oplog
entry (or multiple `applyOps` entries for larger transactions). The node will also make a couple
updates to the transactions table. It will update the starting `OpTime` of the transaction, which
will either be the `OpTime` of the prepare oplog entry or, in the case of larger transactions, the
`OpTime` of the first oplog entry of the transaction. It will also update that the state of the
transaction is `kPrepared`. This information will be useful if the node ever needs to recover the
prepared transaction in the event of failover.

If any of the above steps fails when trying to prepare a transaction, then the node will abort the
transaction. If that happens, the node will respond back to the `TransactionCoordinator` that the
transaction failed to prepare. This will cause the `TransactionCoordinator` to tell all other
participating shards to abort the transaction, thus preserving the atomicity of the transaction. If
this happens, it is safe to retry the entire transaction.

Finally, the node will record metrics, release the [RSTL](#replication-state-transition-lock) (while
still holding the global lock) to allow prepared transactions to survive state transitions, and
respond with the `prepareTimestamp` to the `TransactionCoordinator`.

### Prepare Conflicts

A **prepare conflict** is generated when an operation attempts to read a document that was updated
as a part of an active prepared transaction. Since the transaction is still in the prepared state,
it's not yet known whether it will commit or abort, so updates made by a prepared transaction can't
be made visible outside the transaction until it completes.

Based on the read concern, reads will do different things in this case. A read with read concern
local, available or majority (without causal consistency) will not cause a prepare conflict to be
generated by the storage engine, but instead will return the state of the data before the prepared
update. Reads using snapshot, linearizable, or afterClusterTime read concerns, will block and wait
until the transaction is committed or aborted to serve the read.

If a write attempts to modify a document that was also modified by a prepared transaction, it will
block and wait for the transaction to be committed or aborted before proceeding.

### Committing a Prepared Transaction

Committing a prepared transaction is very similar to
[committing a single replica set transaction](#committing-a-single-replica-set-transaction). One of
the main differences is that the commit oplog entry will not have any of the operations from the
transaction in it, because those were already included in the prepare oplog entry(s).

For a cross-shard transaction, the `TransactionCoordinator` will issue the `commitTransaction`
command to all participating shards when each shard has majority committed the `prepareTransaction`
command. The `commitTransaction` command must be run with a specified
[`commitTimestamp`](#replication-timestamp-glossary) so that all participating shards can commit the
transaction at the same timestamp. This will be the timestamp at which the effects of the
transaction are visible.

When a node receives the `commitTransaction` command and the transaction is in the prepared state,
it will first re-acquire the [RSTL](#replication-state-transition-lock) to prevent any state
transitions from happening while the commit is in progress. It will then reserve an oplog slot,
commit the storage transaction at the `commitTimestamp`, write the `commitTransaction` oplog entry
into the oplog, update the transactions table, transition the `txnState` to `kCommitted`, record
metrics, and clean up the transaction resources.

### Aborting a Prepared Transaction

Aborting a prepared transaction is very similar to
[aborting a non-prepared transaction](#aborting-a-single-replica-set-transaction). The only
difference is that before aborting a prepared transaction, the node must re-acquire the
[RSTL](#replication-state-transition-lock) to prevent any state transitions from happening while
the abort is in progress. Non-prepared transactions don't have to do this because the node will
still have the RSTL at this point.

## State Transitions and Failovers with Transactions

### State Transitions and Failovers with Single Replica Set Transactions

The durability of a single replica set transaction is not guaranteed until the `commitTransaction`
command is majority committed. This means that in-progress transactions are not recovered during
failover or preserved during state transitions.

Unprepared transactions that are in-progress will be aborted during stepdown. During a
[stepdown](#step-down), transactions will still be holding the
[RSTL](#replication-state-transition-lock), which will conflict with the stepdown. As a result,
after enqueueing the RSTL during stepdown, the node will abort all unprepared transactions until it
can acquire the RSTL.

Transactions that are in-progress but not in the prepared state will be aborted during
[step up](#step-up). Being in progress during step up means that the transaction requires multiple
oplog entries and that the node has not received all the oplog entries it needs to prepare or commit
the transaction. As a result, the node will abort all such transactions before it steps up.

If a node goes through a shut down, it will not recover any unprepared transactions during
[startup recovery](#startup-recovery).

### Stepdown with a Prepared Transaction

Unlike unprepared transactions, which get aborted during a stepdown, prepared transactions need to
survive stepdown because shards are relying on the `prepareTransaction` command being (and
remaining) majority committed. As a result, after preparing a transaction, the node will release the
[RSTL](#replication-state-transition-lock) so that it does not end up conflicting with state
transitions. When [stepdown](#step-down) is aborting transactions before acquiring the RSTL, it will
only abort unprepared transactions. Once stepdown finishes, the node will yield locks from all
prepared transactions since secondaries don't hold locks for their transactions.

### Step Up with a Prepared Transaction

If a secondary has a prepared transaction when it [steps up](#step-up), it will have to re-acquire
all the locks for the prepared transaction (other than the RSTL), since the primary relies on
holding these locks to prevent conflicting operations.

### Recovering Prepared Transactions

The prepare state *must* endure any state transition or failover, so they must be recovered and
reconstructed in all situations. If the in-memory state of a prepared transaction is lost, it can be
reconstructed using the information in the prepare oplog entry(s).

[Startup recovery](#startup-recovery), [rollback](#rollback), and [initial sync](#initial-sync) all
use the same algorithm to reconstruct prepared transactions. In all situations, the node will go
through a period of applying oplog entries to get the data caught up with the rest of the replica
set.

As the node applies oplog entries, it will update the transaction table every time it encounters a
`prepareTransaction` oplog entry to save that the state of the transaction is prepared. Instead of
actually applying the oplog entry and preparing the transaction, the node will wait until oplog
application has completed to reconstruct the transaction. If the node encounters a
`commitTransaction` oplog entry, it will immediately commit the transaction. If the transaction it's
about to commit was prepared, the node will find the `prepareTransaction` oplog entry(s) using the
[`TransactionHistoryIterator`](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/transaction_history_iterator.h)
to get the operations for the transaction, prepare it and then immediately commit it.

When oplog application for recovery or initial sync completes, the node will iterate
over all entries in the transactions table to see which transactions are still in the prepared
state. At that point, the node will find the `prepareTransaction` oplog entry(s) associated with the
transaction using the `TransactionHistoryIterator`. It will check out the session associated with
the transaction, apply all the operations from the oplog entry(s) and prepare the transaction.

## Read Concern Behavior Within Transactions

The read concern for all operations within a transaction should be specified when starting the
transaction. If no read concern was specified, the default read concern is local.

Reads within a transaction behave differently from reads outside of a transaction because of
**speculative** behavior. This means a transaction speculatively executes without ensuring that
the data read won't be rolled back until it commits. No matter the read concern, when a node goes to
commit a transaction, it waits for the data that it read to be majority committed *as long as the
transaction was run with write concern majority*. Because of speculative behavior, this means that
the transaction can only provide the guarantees of majority read concern, that data that it read
won't roll back, if it is run with write concern majority.

If the transaction did a write, then waiting for write concern is enough to ensure that all data
read will have since become majority committed. However, if the transaction was read-only, the node
will do a noop write and wait for that to be majority committed to provide the same guarantees.

### Local and Majority Read Concerns

There is currently no functional difference between a transaction with local and majority read
concern. The node will do untimestamped reads in either case. When the transaction is started, it
will choose the most recent snapshot to read from, so that it can read the freshest data.

The node does untimestamped reads because reading at the
[`all_durable`](#replication-timestamp-glossary) timestamp would mean that the node was reading
potentially stale data. This would also allow for more write conflicts that abort the transaction,
since there would be a larger window of time between when the read happens and when the transaction
commits for an outside write to conflict.

In theory, transactions with local read concern should not have to perform a noop write and wait for
it to be majority committed when the transaction commits. However, we intend to make "majority" the
default read concern level for transactions, in order to be consistent with the observation that any
MongoDB command that can write has speculative majority behavior (e.g. findAndModify). Until we
change transactions to have majority as their default read concern, it's important that local and
majority behave the same.

### Snapshot Read Concern

Snapshot read concern will choose a snapshot from which the transaction will read. If it is
specified with an `atClusterTime` argument, then that will be used as the transaction's read
timestamp. If `atClusterTime` is not specified, then the read timestamp of the transaction will be
the [`all_durable`](#replication-timestamp-glossary) timestamp when the transaction is started,
which ensures a snapshot with no oplog holes.

## Transaction Oplog Application

Secondaries begin replicating transaction oplog entries once the primary has either prepared or
committed the transaction. They use the `OplogApplier` to apply these entries, which then uses the
writer thread pool to schedule operations to apply.
<!-- TODO SERVER-43969: Link to oplog application section -->

Before secondaries process and apply transaction oplog entries, they will track operations that
require changes to `config.transactions`. This results in an update to the transactions table entry
(`sessionTxnRecord`) that corresponds to the oplog entry operation. For example,
`prepareTransaction`, `commitTransaction`, and `abortTransaction` will all update the `txnState`
accordingly.

### Unprepared Transactions Oplog Application

Unprepared transactions are comprised of `applyOps` oplog entries. When they are smaller than 16MB,
they will write a single `applyOps` oplog entry for the whole transaction upon commit. Since
secondaries do not need to wait for any additional entries, they can apply the entry for a small
unprepared transaction immediately.

When transactions are larger than 16MB, they use the `prevOpTime` field, which is the opTime of the
previous `applyOps` oplog entry, to link multiple `applyOps` oplog entries together. The
`partialTxn: true` field is used here to indicate that the transaction is incomplete and cannot be
applied immediately. Since the `partialTxn` field does not apply to all oplog entries, it is added
as a subfield of the 'o' field. A secondary must wait until it receives the final `applyOps` oplog
entry of a large unprepared transaction, which will have a non-empty `prevOpTime` field and no
`partialTxn` field, before applying entries. This ensures we have all the entries associated with
the transaction before applying them, allowing us to avoid failover scenarios where only part of a
large transaction is replicated.

When we see an `applyOps` oplog entry that is a part of an unprepared transaction, we will unpack
the CRUD operations and apply them in parallel by using the writer thread pool. For larger
transactions, once the secondary has received all the oplog entries, it will traverse the oplog
chain to get all the operations from the transaction and do the same thing.

Note that checking out the session is not necessary for unprepared transactions since they are
just a series of CRUD operations for secondary oplog application. The atomicity of data on disk is
guaranteed by recovery. The atomicity of visible data in memory is guaranteed by how we advance the
[`lastApplied`](#replication-timestamp-glossary) on secondaries.

### Prepared Transactions Oplog Application

Prepared transactions also write down `applyOps` oplog entries that contain all the operations for
the transaction, but do so when they are prepared. Prepared transactions smaller than 16MB only
write one of these entries, while those larger than 16MB write multiple.

We use a `prepare: true` field to indicate that an `applyOps` entry is for a prepared transaction.
For large prepared transactions, this field will be present in the last `applyOps` entry of the
oplog chain, indicating that the secondary must prepare the transaction. The timestamp of the
prepare oplog entry is referred to as the [`prepareTimestamp`](#replication-timestamp-glossary).

`prepareTransaction` oplog entries are applied in their own batches in a single WUOW. When
applying prepared operations, which are unpacked from the prepared `applyOps` entry, the applier
thread must first check out the appropriate session and **unstash** the transaction resources
(`txnResources`). `txnResources` refers to the lock state and storage state of a transaction. When
we "unstash" these resources, we transfer the management of them to the `OperationContext`. The
applier thread will then add the prepare operations to the storage transaction and finally yield
the locks used for transactions. This means that prepared transactions will only **stash** (which
transfers the management of `txnResources` back to the session) the recovery unit. Stashing the
locks would make secondary oplog application conflict with prepared transactions. These locks are
restored the next time we **unstash** `txnResources`, which would be for `commitTransaction` or
`abortTransaction`.

Prepared transactions write down separate `commitTransaction` and `abortTransaction` oplog entries.
`commitTransaction` oplog entries do not need to store the operations from the transaction since
we have already recorded them through the prepare oplog entries. These entries are also applied
in their own batches and follow the same procedure of checking out the appropriate session,
unstashing transaction resources, and either committing or aborting the storage transaction.

Note that secondaries can apply prepare oplog entries immediately but
[recovering](#recovering-prepared-transactions) nodes must wait until they finish the process or
see a commit oplog entry.

## Transaction Errors

### PreparedTransactionInProgress Errors

Starting a new transaction on a session with an already existing in-progress transaction can cause
the existing transaction to be **implicitly aborted**. Implicitly aborting a transaction happens if
the transaction is aborted without an explicit `abortTransaction` command. However, prepared
transactions cannot be implicitly aborted, since they can only complete after a `commitTransaction`
or `abortTransaction` command from the `TransactionCoordinator`. As a result, any attempt to start a
new transaction on a session that already has a prepared trasaction on it will fail with a
`PreparedTransactionInProgress` error.

Additionally, the only operations that can be run on a prepared transaction are
`prepareTransaction`, `abortTransaction`, and `commitTransaction`. If any other command is run on
the transaction, the command will fail with a `PreparedTransactionInProgress` error.

Commands run as a part of a transaction that fail with this error code will always have the
[`TransientTransactionError`](#transienttransactionerror-label) label attached to its response.

### NoSuchTransaction Errors

`NoSuchTransaction` errors are generated for commands that attempt to continue, prepare, commit, or
abort a transaction that isn't in progress. Two common reasons why this error is generated are
because the transaction has since started a new transaction or the transaction has already been
aborted.

All commands made in a transaction other than `commitTransaction` that fail with this error code
will always have the [`TransientTransactionError`](#transienttransactionerror-label) label attached
to its response. If the `commitTransaction` command failed with a `NoSuchTransaction` error without
a write concern error, then it will have the `TransientTransactionError` label attached to its
response. If it failed with a write concern error, then it wouldn't be safe to retry the entire
transaction since it could have committed on one of the nodes.

### TransactionTooOld Errors

If an attempt is made to start a transaction that is older than the current active or the last
committed transaction, then that operation will fail with `TransactionTooOld`.

### TransientTransactionError Label

A transaction could fail with one of the errors above or a different one. There are some errors that
will cause a transaction to abort with no persistent side effects. In these cases, the server will
attach the `TransientTransactionError` label to the response (which will still contain the orignal
error code), so that the caller knows that they can safely retry the entire transaction.

# Concurrency Control

## Parallel Batch Writer Mode

The **Parallel Batch Writer Mode** lock (also known as the PBWM or the Peanut Butter Lock) is a
global resource that helps manage the concurrency of running operations while a secondary is
applying a batch of oplog entries. Since secondary oplog application applies batches in parallel,
operations will not necessarily be applied in order, so a node will hold the PBWM while it is
waiting for the entire batch to be applied. For secondaries, in order to read at a consistent state
without needing the PBWM lock, a node will try to read at the
[`lastApplied`](#replication-timestamp-glossary) timestamp. Since `lastApplied` is set after a batch
is completed, it is guaranteed to be at a batch boundary. However, during initial sync there could
be changes from a background index build that occur after the `lastApplied` timestamp. Since there
is no guarantee that `lastApplied` will be advanced again, if a node sees that there are pending
changes ahead of `lastApplied`, it will acquire the PBWM to make sure that there isn't an in-progress
batch when reading, and read without a timestamp to ensure all writes are visible, including those
later than the `lastApplied`.

## Replication State Transition Lock

When a node goes through state transitions, it needs something to manage the concurrency of that
state transition with other ongoing operations. For example, a node that is stepping down used to be
able to accept writes, but shouldn't be able to do so until it becomes primary again. As a result,
there is the **Replication State Transition Lock** (or RSTL), a global resource that manages the
concurrency of state transitions.

It is acquired in exclusive mode for the following replication state transitions: `PRIMARY` to
`SECONDARY` (step down), `SECONDARY` to `PRIMARY` (step up), `SECONDARY` to `ROLLBACK` (rollback),
`ROLLBACK` to `SECONDARY`, and `SECONDARY` to `RECOVERING`. Operations can hold it when they need to
ensure that the node won't go through any of the above state transitions. Some examples of
operations that do this are [preparing](#preparing-a-transaction-on-the-primary) a transaction,
[committing](#committing-a-prepared-transaction) or [aborting](#aborting-a-prepared-transaction) a
prepared transaction, and checking/setting if the node can accept writes or serve reads.

## Global Lock Acquisition Ordering

Both the PBWM and RSTL are global resources that must be acquired before the global lock is
acquired. The node must first acquire the PBWM in [intent
shared](https://docs.mongodb.com/manual/reference/glossary/#term-intent-lock) mode. Next, it must
acquire the RSTL in intent exclusive mode. Only then can it acquire the global lock in its desired
mode.

# Elections

## Step Up

There are a number of ways that a node will run for election:
* If it hasn't seen a primary within the election timeout (which defaults to 10 seconds).
* If it realizes that it has higher priority than the primary, it will wait and run for
  election (also known as a **priority takeover**). The amount of time the node waits before calling
  an election is directly related to its priority in comparison to the priority of rest of the set
  (so higher priority nodes will call for a priority takeover faster than lower priority nodes).
  Priority takeovers allow users to specify a node that they would prefer be the primary.
* Newly elected primaries attempt to catchup to the latest applied OpTime in the replica
  set. Until this process (called primary catchup) completes, the new primary will not accept
  writes. If a secondary realizes that it is more up-to-date than the primary and the primary takes
  longer than `catchUpTakeoverDelayMillis` (default 30 seconds), it will run for election. This
  behvarior is known as a **catchup takeover**. If primary catchup is taking too long, catchup
  takeover can help allow the replica set to accept writes sooner, since a more up-to-date node will
  not spend as much time (or any time) in catchup. See the "Transitioning to `PRIMARY`" section for
  further details on primary catchup.
* The `replSetStepUp` command can be run on an eligible node to cause it to run for election
  immediately. We don't expect users to call this command, but it is run internally for election
  handoff and testing.
* When a node is stepped down via the `replSetStepDown` command, if the `enableElectionHandoff`
  parameter is set to true (the default), it will choose an eligible secondary to run the
  `replSetStepUp` command on a best-effort basis. This behavior is called **election handoff**. This
  will mean that the replica set can shorten failover time, since it skips waiting for the election
  timeout. If `replSetStepDown` was called with `force: true` or the node was stepped down while
  `enableElectionHandoff` is false, then nodes in the replica set will wait until the election
  timeout triggers to run for election.


### Candidate Perspective

A candidate node first runs a dry-run election. In a **dry-run election**, a node starts a
[`VoteRequester`](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/vote_requester.h),
which uses a
[`ScatterGatherRunner`](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/scatter_gather_runner.h)
to send a `replSetRequestVotes` command to every node asking if that node would vote for it. The
candidate node does not increase its term during a dry-run because if a primary ever sees a higher
term than its own, it steps down. By first conducting a dry-run election, we make it unlikely that
nodes will increase their own term when they would not win and prevent needless primary stepdowns.
If the node fails the dry-run election, it just continues replicating as normal. If the node wins
the dry-run election, it begins a real election.

If the candidate was stepped up as a result of an election handoff, it will skip the dry-run and
immediately call for a real election.

In the real election, the node first increments its term and votes for itself. It then follows the
same process as the dry-run to start a `VoteRequester` to send a `replSetRequestVotes` command to
every single node. Each node then decides if it should vote "aye" or "nay" and responds to the
candidate with their vote. The candidate node must be at least as up to date as a majority of voting
members in order to get elected.

If the candidate received votes from a majority of nodes, including itself, the candidate wins the
election.

### Voter Perspective

When a node receives a `replSetRequestVotes` command, it first checks if the term is up to date and
updates its own term accordingly. The `ReplicationCoordinator` then asks the `TopologyCoordinator`
if it should grant a vote. The vote is rejected if:

1. It's from an older term.
2. The config versions do not match.
3. The replica set name does not match.
4. The last applied OpTime that comes in the vote request is older than the voter's last applied
   OpTime.
5. If it's not a dry-run election and the voter has already voted in this term.
6. If the voter is an arbiter and it can see a healthy primary of greater or equal priority. This is
   to prevent primary flapping when there are two nodes that can't talk to each other and an arbiter
   that can talk to both.

Whenever a node votes for itself, or another node, it records that "LastVote" information durably to
the `local.replset.election` collection. This information is read into memory at startup and used in
future elections. This ensures that even if a node restarts, it does not vote for two nodes in the
same term.

### Transitioning to `PRIMARY`

Now that the candidate has won, it must become `PRIMARY`. First it clears its sync source and
notifies all nodes that it won the election via a round of heartbeats. Then the node checks if it
needs to catch up from the former primary. Since the node can be elected without the former
primary's vote, the primary-elect will attempt to replicate any remaining oplog entries it has not
yet replicated from any viable sync source. While these are guaranteed to not be committed, it is
still good to minimize rollback when possible.

The primary-elect uses the responses from the recent round of heartbeats to see the latest applied
OpTime of every other node. If the primary-elect’s last applied OpTime is less than the newest last
applied OpTime it sees, it will set that as its target OpTime to catch up to. At the beginning of
catchup, the primary-elect will schedule a timer for the catchup-timeout. If that timeout expires or
if the node reaches the target OpTime, then the node ends the catch-up phase. The node then clears
its sync source and stops the `OplogFetcher`.

We will ignore whether or not **chaining** is enabled for primary catchup so that the primary-elect
can find a sync source. And one thing to note is that the primary-elect will not necessarily sync
from the most up-to-date node, but its sync source will sync from a more up-to-date node. This will
mean that the primary-elect will still be able to catchup to its target OpTime. Since catchup is
best-effort, it could time out before the node has applied operations through the target OpTime.
Even if this happens, the primary-elect will not step down.

At this point, whether catchup was successful or not, the node goes into "drain mode". This is when
the node has already logged "transition to `PRIMARY`", but has not yet applied all of the oplog
entries in its oplog buffer. `replSetGetStatus` will now say the node is in `PRIMARY` state. The
applier keeps running, and when it completely drains the buffer, it signals to the
`ReplicationCoordinator` to finish the step up process. The node marks that it can begin to accept
writes. According to the Raft Protocol, we cannot update the commit point to reflect oplog entries
from previous terms until the commit point is updated to reflect an oplog entry in the current term.
The node writes a "new primary" noop oplog entry so that it can commit older writes as soon as
possible. Once the commit point is updated to reflect the "new primary" oplog entry, older writes
will automatically be part of the commit point by nature of happening before the term change.
Finally, the node drops all temporary collections, restores all locks for
[prepared transactions](#step-up-with-a-prepared-transaction), aborts all
[in progress transactions](#state-transitions-and-failovers-with-single-replica-set-transactions),
and logs “transition to primary complete”. At this point, new writes will be accepted by the
primary.

## Step Down

### Conditional

The `replSetStepDown` command is one way that a node relinquishes its position as primary. We
consider this a conditional step down because it can fail if the following conditions are not met:
* `force` is true and now > `waitUntil` deadline, which is the amount of time we will wait before
stepping down (Note: If `force` is true, only this condition needs to be met)
* The [`lastApplied`](#replication-timestamp-glossary) OpTime of the primary must be replicated to
a majority of the nodes
* At least one of the up-to-date secondaries is also electable

When a `replSetStepDown` command comes in, the node begins to check if it can step down. First, the
node attempts to acquire the [RSTL](#replication-state-transition-lock). In order to do so, it must
kill all conflicting user/system operations and abort all unprepared transactions.

Now, the node loops trying to step down. If force is `false`, it repeatedly checks if a majority of
nodes have reached the `lastApplied` optime, meaning that they are caught up. It must also check
that at least one of those nodes is electable. If force is `true`, it does not wait for these
conditions and steps down immediately after it reaches the `waitUntil` deadline.

Upon a successful stepdown, it yields locks held by
[prepared transactions](#stepdown-with-a-prepared-transaction) because we are now a secondary.
Finally, we log stepdown metrics and update our member state to `SECONDARY`.

### Unconditional

Stepdowns can also occur for the following reasons:
* If the primary learns of a higher term
* Liveness timeout: If a primary stops being able to transitively communicate with a majority of
nodes. The primary does not need to be able to communicate directly with a majority of nodes. If
primary A can’t communicate with node B, but A can communicate with C which can communicate with B,
that is okay. If you consider the minimum spanning tree on the cluster where edges are connections
from nodes to their sync source, then as long as the primary is connected to a majority of nodes, it
will stay primary.
* Force reconfig via the `replSetReconfig` command
* Force reconfig via heartbeat: If we learn of a newer config version through heartbeats, we will
schedule a replica set config change.

During unconditional stepdown, we do not check preconditions before attempting to step down. Similar
to conditional stepdowns, we must kill any conflicting user/system operations before acquiring the
RSTL and yield locks of prepared transactions following a successful stepdown.

### Concurrent Stepdown Attempts

It is possible to have concurrent conditional and unconditional stepdown attempts. In this case,
the unconditional stepdown will supercede the conditional stepdown, which causes the conditional
stepdown attempt to fail.

Because concurrent unconditional stepdowns can cause conditional stepdowns to fail, we stop
accepting writes once we confirm that we are allowed to step down. This way, if our stepdown
attempt fails, we can release the RSTL and allow secondaries to catch up without new writes coming
in.

We try to prevent concurrent conditional stepdown attempts by setting `_leaderMode` to
`kSteppingDown` in the `TopologyCoordinator`. By tracking the current stepdown state, we prevent
another conditional stepdown attempt from occurring, but still allow unconditional attempts to
supersede.

# Rollback

Rollback is the process whereby a node that diverges from its sync source gets back to a consistent
point in time on the sync source's branch of history. We currently support two rollback algorithms,
Recover To A Timestamp (RTT) and Rollback via Refetch. This section will cover the RTT method.

Situations that require rollback can occur due to network partitions. Consider a scenario where a
secondary can no longer hear from the primary and subsequently runs for an election. We now have
two primaries that can both accept writes, creating two different branches of history (one of the
primaries will detect this situation soon and step down). If the smaller half, meaning less than a
majority of the set, accepts writes during this time, those writes will be uncommitted. A node with
uncommitted writes will roll back its changes and roll forward to match its sync source. Note that a
rollback is not necessary if there are no uncommitted writes.

As of 4.0, Replication supports the [`Recover To A Timestamp`](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/rollback_impl.h#L158)
algorithm (RTT), in which a node recovers to a consistent point in time and applies operations until
it catches up to the sync source's branch of history. RTT uses the WiredTiger storage engine to
recover to a [`stable_timestamp`](#replication-timestamp-glossary), which is the highest timestamp
at which the storage engine can take a checkpoint. This can be considered a consistent, majority
committed point in time for replication and storage.

A node goes into rollback when its last fetched OpTime is greater than its sync source's last
applied OpTime, but it is in a lower term. In this case, the `OplogFetcher` will return an empty
batch and fail with an `OplogStartMissing` error.

During [rollback](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/rollback_impl.cpp#L176),
nodes first transition to the `ROLLBACK` state and kill all user operations to ensure that we can
successfully acquire [the RSTL](#replication-state-transition-lock). Reads are prohibited while
we are in the `ROLLBACK` state.

We then wait for background index builds to complete before finding the `common point` between the
rolling back node and the sync source node. The `common point` is the OpTime after which the nodes'
oplogs start to differ. During this step, we keep track of the operations that are rolled back up
until the `common point` and update necessary data structures. This includes metadata that we may
write out to rollback files and and use to roll back collection fast-counts. Then, we increment
the Rollback ID (RBID), a monotonically increasing number that is incremented every time a rollback
occurs. We can use the RBID to check if a rollback has occurred on our sync source since the
baseline RBID was set.

Now, we enter the data modification section of the rollback algorithm, which begins with
aborting prepared transactions and ends with reconstructing them at the end. If we fail at any point
during this phase, we must terminate the rollback attempt because we cannot safely recover.

Before we actually recover to the `stableTimestamp`, we must abort the storage transaction of any
prepared transaction. In doing so, we release any resources held by those transactions and
invalidate any in-memory state we recorded.

If `createRollbackDataFiles` was set to `true` (the default), we begin writing rollback files for
our rolled back documents. It is important that we do this after we abort any prepared transactions
in order to avoid unnecessary prepare conflicts when trying to read documents that were modified by
those transactions, which must be aborted for rollback anyway. Finally, if we have rolled back any
operations, we invalidate all sessions on this server.

Now, we are ready to tell the storage engine to recover to the last `stable_timestamp`. Upon
success, the storage engine restores the data reflected in the database to the data reflected at the
last `stable_timestamp`. This does not, however, revert the oplog. In order to revert the oplog,
rollback must remove all oplog entries after the `common point`. This is called the truncate point
and is written into the `oplogTruncateAfterPoint` document. Now, the recovery process knows where to
truncate the oplog on the rollback node.

During the last few steps of the data modification section, we clear the state of the
`DropPendingCollectionReaper`, which manages collections that are marked as drop-pending by the Two
Phase Drop algorithm, and make sure it aligns with what is currently on disk. After doing so, we can
run through the oplog recovery process, which truncates the oplog after the `common point` and
applies all oplog entries through the end of the sync source's oplog. See the
[Startup Recovery](#startup-recovery) section for more information on truncating the oplog and
applying oplog entries.

The last thing we do before exiting the data modification section is
[reconstruct prepared transactions](#recovering-prepared-transactions). We must also restore their
in-memory state to what it was prior to the rollback in order to fulfill the durability guarantees
of prepared transactions.

At this point, the last applied and durable OpTimes still point to the divergent branch of history,
so we must update them to be at the top of the oplog, which should be the `common point`.

Now, we can trigger the rollback `OpObserver` and notify any external subsystems that a rollback has
occurred. For example, the config server must update its shard registry in order to make sure it
does not have data that has just been rolled back. Finally, we log a summary of the rollback process
and transition to the `SECONDARY` state. This transition must succeed if we ever entered the
`ROLLBACK` state in the first place. Otherwise, we shut down.

# Initial Sync

Initial sync is the process that we use to add a new node to a replica set. Initial sync is
initiated by the `ReplicationCoordinator` and done in the
[**`InitialSyncer`**](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/initial_syncer.h).
When a node begins initial sync, it goes into the `STARTUP2` state. `STARTUP` is reserved for the
time before the node has loaded its local configuration of the replica set.

At a high level, there are two phases to initial sync: the [**data clone phase**](#data-clone-phase)
and the [**oplog application phase**](#oplog-application-phase). During the data clone phase, the
node will copy all of another node's data. After that phase is completed, it will start the oplog
application phase where it will apply all the oplog entries that were written since it started
copying data. Finally, it will reconstruct any transactions in the prepared state.

Before the data clone phase begins, the node will do the following:

1. Set the initial sync flag to record that initial sync is in progress and make it durable. If a
   node restarts while this flag is set, it will restart initial sync even though it may already
   have data because it means that initial sync didn't complete. We also check this flag to prevent
   reading from the oplog while initial sync is in progress.
2. Find a sync source.
3. Drop all of its data except for the local database and recreate the oplog.
4. Get the Rollback ID (RBID) from the sync source to ensure at the end that no rollbacks occurred
   during initial sync.
5. Query its sync source's oplog for its latest OpTime and save it as the
   `defaultBeginFetchingOpTime`. If there are no open transactions on the sync source, this will be
   used as the `beginFetchingTimestamp` or the timestamp that it begins fetching oplog entries from.
6. Query its sync source's transactions table for the oldest starting OpTime of all active
   transactions. If this timestamp exists (meaning there is an open transaction on the sync source)
   this will be used as the `beginFetchingTimestamp`. If this timestamp doesn't exist, the node will
   use the `defaultBeginFetchingOpTime` instead. This will ensure that even if a transaction was
   started on the sync source after it was queried for the oldest active transaction timestamp, the
   syncing node will have all the oplog entries associated with an active transaction in its oplog.
7. Query its sync source's oplog for its lastest OpTime. This will be the `beginApplyingTimestamp`,
   or the timestamp that it begins applying oplog entries at once it has completed the data clone
   phase. If there was no active transaction on the sync source, the `beginFetchingTimestamp` will
   be the same as the `beginApplyingTimestamp`.
8. Create an `OplogFetcher` and start fetching and buffering oplog entries from the sync source
   to be applied later. Operations are buffered to a collection so that they are not limited by the
   amount of memory available.

## Data clone phase

The new node then begins to clone data from its sync source. The `InitialSyncer` constructs an
[`AllDatabaseCloner`](https://github.com/mongodb/mongo/blob/r4.3.2/src/mongo/db/repl/all_database_cloner.h)
that's used to clone all of the databases on the upstream node. The `AllDatabaseCloner` asks the
sync source for a list of its databases and then for each one it creates and runs a
[`DatabaseCloner`](https://github.com/mongodb/mongo/blob/r4.3.2/src/mongo/db/repl/database_cloner.h)
to clone that database. Each `DatabaseCloner` asks the sync source for a list of its collections and
for each one creates and runs a
[`CollectionCloner`](https://github.com/mongodb/mongo/blob/r4.3.2/src/mongo/db/repl/collection_cloner.h)
to clone that collection. The `CollectionCloner` calls `listIndexes` on the sync source and creates
a
[`CollectionBulkLoader`](https://github.com/mongodb/mongo/blob/r4.3.2/src/mongo/db/repl/collection_bulk_loader.h)
to create all of the indexes in parallel with the data cloning. The `CollectionCloner` then uses an
**exhaust cursor** to run a `find` request on the sync source for each collection, inserting the
fetched documents each time, until it fetches all of the documents. Instead of explicitly needing to
run a `getMore` on an open cursor to get the next batch, exhaust cursors make it so that if the
`find` does not exhaust the cursor, the sync source will keep sending batches until there are none
left.

The cloners are resilient to transient errors.  If a cloner encounters an error marked with the
`RetriableError` label in
[`error_codes.yml`](https://github.com/mongodb/mongo/blob/r4.3.2/src/mongo/base/error_codes.yml), it
will retry whatever network operation it was attempting.  It will continue attempting to retry for a
length of time set by the server parameter `initialSyncTransientErrorRetryPeriodSeconds`, after
which it will consider the failure permanent.  A permanent failure means it will choose a new sync
source and retry all of initial sync, up to a number of times set by the server parameter
`numInitialSyncAttempts`.  One notable exception, where we do not retry the entire operation, is for
the actual querying of the collection data.  For querying, we use a feature called **resume
tokens**.  We set a flag on the query: `$_requestResumeToken`.  This causes each batch we receive
from the sync source to contain an opaque token which indicates our current position in the
collection.  After storing a batch of data, we store the most recent resume token in a member
variable of the `CollectionCloner`.  Then, when retrying we provide this resume token in the query,
allowing us to avoid having to re-fetch the parts of the collection we have already stored.

The `initialSyncTransientErrorRetryPeriodSeconds` is also used to control retries for the oplog
fetcher and all network operations in initial sync which take place after the data cloning has
started.

## Oplog application phase

After the cloning phase of initial sync has finished, the oplog application phase begins. The new
node first asks its sync source for its last applied OpTime and this is saved as the
`stopTimestamp`, the oplog entry it must apply before it's consistent and can become a secondary. If
the `beginFetchingTimestamp` is the same as the `stopTimestamp`, then it indicates that there are no
oplog entries that need to be written to the oplog and no operations that need to be applied. In
this case, the node will seed its oplog with the last oplog entry applied on its sync source and
finish initial sync.

Otherwise, the new node iterates through all of the buffered operations, writes them to the oplog,
and if their timestamp is after the `beginApplyingTimestamp`, applies them to the data on disk.
Oplog entries continue to be fetched and added to the buffer while this is occurring.

One notable exception is that the node will not apply `prepareTransaction` oplog entries. Similar
to how we reconstruct prepared transactions in startup and rollback recovery, we will update the
transactions table every time we see a `prepareTransaction` oplog entry. Because the nodes wrote
all oplog entries starting at the `beginFetchingTimestamp` into the oplog, the node will have all
the oplog entries it needs to
[reconstruct the state for all prepared transactions](#recovering-prepared-transactions) after the
oplog application phase is done.

## Idempotency concerns

Some of the operations that are applied may already be reflected in the data that was cloned since
we started buffering oplog entries before the collection cloning phase even started. Consider the
following:

1. Start buffering oplog entries
2. Insert `{a: 1, b: 1}` to collection `foo`
3. Insert `{a: 1, b: 2}` to collection `foo`
4. Drop collection `foo`
5. Recreate collection `foo`
6. Create unique index on field `a` in collection `foo`
7. Clone collection `foo`
8. Start applying oplog entries and try to insert both `{a: 1, b: 1}` and `{a: 1, b: 2}`

As seen here, there can be operations on collections that have since been dropped or indexes could
conflict with the data being added. As a result, many errors that occur here are ignored and assumed
to resolve themselves, such as `DuplicateKey` errors (like in the example above). If known
problematic operations such as `renameCollection` are received, where we cannot assume a drop will
come and fix them, we abort and retry initial sync.

## Finishing initial sync

The oplog application phase concludes when the node applies an oplog entry at `stopTimestamp`. The
node checks its sync source's Rollback ID to see if a rollback occurred and if so, restarts initial
sync. Otherwise, the `InitialSyncer` will begin tear down.

It will register the node's [`lastApplied`](#replication-timestamp-glossary) OpTime with the storage
engine to make sure that all oplog entries prior to that will be visible when querying the oplog.
After that it will reconstruct all prepared transactions. The node will then clear the initial sync
flag and tell the storage engine that the [`initialDataTimestamp`](#replication-timestamp-glossary)
is the node's last applied OpTime. Finally, the `InitialSyncer` shuts down and the
`ReplicationCoordinator` starts steady state replication.

# Startup Recovery

**Startup recovery** is a node's process for putting both the oplog and data into a consistent state
during startup (and happens while the node is in the `STARTUP` state). If a node has an empty or
non-existent oplog, or already has the initial sync flag set when starting up, then it will skip
startup recovery and go through [initial sync](#initial-sync) instead.

If the node already has data, it will go through
[startup recovery](https://github.com/mongodb/mongo/blob/r4.2.0/src/mongo/db/repl/replication_recovery.cpp).
It will first get the **recovery timestamp** from the storage engine, which is the timestamp through
which changes are reflected in the data at startup (and the timestamp used to set the
`initialDataTimestamp`). The recovery timestamp will be a `stable_timestamp` so that the node
recovers from a **stable checkpoint**, which is a durable view of the data at a particular timestamp.
It should be noted that due to journaling, the oplog and many collections in the local database are
an exception and are up-to-date at startup rather than reflecting the recovery timestamp.

If a node went through an unclean shutdown, then it might have been in the middle of applying
parallel writes. Each write is associated with an oplog entry. Primaries perform writes in parallel,
and batch application applies oplog entries in parallel. Since these operations are done in
parallel, they can cause temporary gaps in the oplog from entries that are not yet written, called
**oplog holes**. A node can crash while there are still **oplog holes** on disk.

During startup, a node will not be able to tell which oplog entries were successfully persisted in
the oplog and which were uncommitted on disk and disappeared. A primary may be unknowingly missing
oplog entries that the secondaries already replicated; or a secondary may lose oplog entries that it
thought it had already replicated. This would make the recently crashed node inconsistent with the
rest its replica set. To fix this, after getting the recovery timestamp, the node will truncate its
oplog to a point that it can guarantee does not have any oplog holes using the
[`oplogTruncateAfterPoint`](#replication-timestamp-glossary) document. This document is journaled
and untimestamped so that it will reflect information more recent than the latest stable checkpoint
even after a shutdown.

The `oplogTruncateAfterPoint` can be set in two scenarios. The first is during oplog batch
application. Before writing a batch of oplog entries to the oplog, the node will set the
`oplogTruncateAfterPoint` to the `lastApplied` timestamp. If the node shuts down before it finishes
writing the batch, then during startup recovery the node will truncate the oplog back to the point
saved before the batch application began. If the node successfully finishes writing the batch to the
oplog, it will reset the `oplogTruncateAfterPoint` to null since there are no oplog holes and the
oplog will not need to be truncated if the node restarts.

The second scenario for setting the `oplogTruncateAfterPoint` is while primary. A primary allows
secondaries to replicate one of its oplog entries as soon as there are no oplog holes in-memory
behind the entry. However, secondaries do not have to wait for the oplog entry to make it to disk
on the primary nor for there to be no holes behind it on disk on the primary. Therefore, some
already replicated writes may disappear from the primary if the primary crashes. The primary will
continually update the `oplogTruncateAfterPoint` in order to track and forward the no oplog holes
point on disk, in case of an unclean shutdown. Then startup recovery can take care of any oplog
inconsistency with the rest of the replica set.

After truncating the oplog, the node will see if the recovery timestamp differs from the top of the
newly truncated oplog. If it does, this means that there are oplog entries that must be applied to
make the data consistent with the oplog. The node will apply all the operations starting at the
recovery timestamp through the top of the oplog. The one exception is that it will not apply
`prepareTransaction` oplog entries. Similar to how a node reconstructs prepared transactions during
initial sync and rollback, the node will update the transactions table every time it sees a
`prepareTransaction` oplog entry. Once the node has finished applying all the oplog entries through
the top of the oplog, it will [reconstruct](#recovering-prepared-transactions) all transactions
still in the prepare state.

Finally, the node will finish loading the replica set configuration, set its `lastApplied` and
`lastDurable` timestamps to the top of the oplog and start steady state replication.

# Dropping Collections and Databases

In 3.6, the Two Phase Drop Algorithm was added in the replication layer for supporting collection
and database drops. It made it easy to support rollbacks for drop operations. In 4.2, the
implementation for collection drops was moved to the storage engine. This section will cover the
behavior for the implementation in the replication layer, which currently runs on nodes where
[`enableMajorityReadConcern=false`](#enableMajorityReadConcern-flag).

## Dropping Collections

Dropping an unreplicated collection happens immediately. However, the process for dropping a
replicated collection requires two phases.

In the first phase, if the node is the primary, it will write a "dropCollection" oplog entry. The
collection will be flagged as dropped by being added to a list in the `DropPendingCollectionReaper`
(along with its OpTime), but the storage engine won't delete the collection data yet. Every time the
`ReplicationCoordinator` advances the commit point, the node will check to see if any drop's OpTime
is before or at the majority commit point. If any are, those drops will then move to phase 2 and
the `DropPendingCollectionReaper` will tell the storage engine to drop the collection.

By waiting until the "dropCollection" oplog entry is majority committed to drop the collection, it
guarantees that only drops in phase 1 can be rolled back. This means that the storage engine will
still have the collection's data and in the case of a rollback, it can then easily restore the
collection.

## Dropping Databases

When a node receives a `dropDatabase` command, it will initiate a Two Phase Drop as described above
for each collection in the relevant database. Once all collection drops are replicated to a majority
of nodes, the node will drop the now empty database and a `dropDatabase` command oplog entry is
written to the oplog.

# Replication Timestamp Glossary

In this section, when we refer to the word "transaction" without any other qualifier, we are talking
about a storage transaction. Transactions in the replication layer will be referred to as
multi-document or prepared transactions.

**`all_durable`**: All transactions with timestamps earlier than the `all_durable` timestamp are
committed. This is the point at which the oplog has no gaps, which are created when we reserve
timestamps before executing the associated write. Since this timestamp is used to maintain the oplog
visibility point, it is important that all operations up to and including this timestamp are
committed and durable on disk. This is so that we can replicate the oplog without any gaps.

**`commit oplog entry timestamp`**: The timestamp of the ‘commitTransaction’ oplog entry for a
prepared transaction, or the timestamp of the ‘applyOps’ oplog entry for a non-prepared transaction.
In a cross-shard transaction each shard may have a different commit oplog entry timestamp. This is
guaranteed to be greater than the `prepareTimestamp`.

**`commitTimestamp`**: The timestamp at which we committed a multi-document transaction. This will
be the `commitTimestamp` field in the `commitTransaction` oplog entry for a prepared transaction, or
the timestamp of the ‘applyOps’ oplog entry for a non-prepared transaction. In a cross-shard
transaction this timestamp is the same across all shards. The effects of the transaction are visible
as of this timestamp. Note that `commitTimestamp` and the `commit oplog entry timestamp` are the
same for non-prepared transactions because we do not write down the oplog entry until we commit the
transaction. For a prepared transaction, we have the following guarantee: `prepareTimestamp` <=
`commitTimestamp` <= `commit oplog entry timestamp`

**`currentCommittedSnapshot`**: An optime maintained in `ReplicationCoordinator` that is used to
serve majority reads and is always guaranteed to be <= `lastCommittedOpTime`. When `eMRC=true`, this
is currently set to the stable optime, which is guaranteed to be in a node’s oplog. Since it is
reset every time we recalculate the stable optime, it will also be up to date.

When `eMRC=false`, this is set to the `lastCommittedOpTime`, so it may not be in the node’s oplog.
The `stable_timestamp` is not allowed to advance past the `all_durable`. So, this value shouldn’t
be ahead of `all_durable` unless `eMRC=false`.

**`initialDataTimestamp`**: A timestamp used to indicate the timestamp at which history “begins”.
When a node comes out of initial sync, we inform the storage engine that the `initialDataTimestamp`
is the node's `lastApplied`.

By setting this value to 0, it informs the storage engine to take unstable checkpoints. Stable
checkpoints can be viewed as timestamped reads that persist the data they read into a checkpoint.
Unstable checkpoints simply open a transaction and read all data that is currently committed at the
time the transaction is opened. They read a consistent snapshot of data, but the snapshot they read
from is not associated with any particular timestamp.

**`lastApplied`**: In-memory record of the latest applied oplog entry optime. It may lag behind the
optime of the newest oplog entry that is visible in the storage engine because it is updated after
a storage transaction commits.

**`lastCommittedOpTime`**: A node’s local view of the latest majority committed optime. Every time
we update this optime, we also recalculate the `stable_timestamp`. Note that the
`lastCommittedOpTime` can advance beyond a node's `lastApplied` if it has not yet replicated the
most recent majority committed oplog entry. For more information about how the `lastCommittedOpTime`
is updated and propagated, please see [Commit Point Propagation](#commit-point-propagation).

**`lastDurable`**: Optime of either the latest oplog entry (non-primary) or the latest no oplog
holes point (primary) that has been flushed to the journal. It is asynchronously updated by the
storage engine as new writes become durable. Default journaling frequency is 100ms.

**`oldest_timestamp`**: The earliest timestamp that the storage engine is guaranteed to have history
for. New transactions can never start a timestamp earlier than this timestamp. Since we advance this
as we advance the `stable_timestamp`, it will be less than or equal to the `stable_timestamp`.

**`oplogTruncateAfterPoint`**: Tracks the latest no oplog holes point. On primaries, it is updated
by the storage engine prior to flushing the journal to disk. During oplog batch application, it is
set at the start of the batch and cleared at the end of batch application. Startup recovery will use
the `oplogTruncateAfterPoint` to truncate the oplog back to an oplog point consistent with the rest
of the replica set: other nodes may have replicated in-memory data that a crashed node no longer has
and is unaware that it lacks.

**`prepareTimestamp`**: The timestamp of the ‘prepare’ oplog entry for a prepared transaction. This
is the earliest timestamp at which it is legal to commit the transaction. This timestamp is provided
to the storage engine to block reads that are trying to read prepared data until the storage engines
knows whether the prepared transaction has committed or aborted.

**`readConcernMajorityOpTime`**: Exposed in replSetGetStatus as “readConcernMajorityOpTime” but is
populated internally from the `currentCommittedSnapshot` timestamp inside `ReplicationCoordinator`.

**`stable_timestamp`**: The newest timestamp at which the storage engine is allowed to take a
checkpoint, which can be thought of as a consistent snapshot of the data. Replication informs the
storage engine of where it is safe to take its next checkpoint. This timestamp is guaranteed to be
majority committed so that RTT rollback can use it. In the case when 
[`eMRC=false`](#enableMajorityReadConcern-flag), the stable timestamp may not be majority committed,
which is why we must use the Rollback via Refetch rollback algorithm.

This timestamp is also required to increase monotonically except when `eMRC=false`, where in a
special case during rollback it is possible for the `stableTimestamp` to move backwards.