summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/keys_collection_cache.cpp10
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/apply_ops.cpp6
-rw-r--r--src/mongo/db/repl/apply_ops.h4
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp14
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp40
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp11
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h8
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp33
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp5
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.cpp4
-rw-r--r--src/mongo/db/repl/replication_recovery.cpp44
-rw-r--r--src/mongo/db/repl/replication_recovery.h9
-rw-r--r--src/mongo/db/repl/replication_recovery_mock.h2
-rw-r--r--src/mongo/db/repl/rollback_impl.cpp3
-rw-r--r--src/mongo/db/repl/storage_interface.cpp1
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp4
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.cpp21
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.h2
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.cpp94
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.h10
22 files changed, 215 insertions, 112 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 815de89dce9..98a730ab905 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1790,6 +1790,7 @@ env.Library(
'keys_collection_document',
'logical_time',
'keys_collection_client_sharded',
+ 'repl/repl_coordinator_interface',
],
)
diff --git a/src/mongo/db/keys_collection_cache.cpp b/src/mongo/db/keys_collection_cache.cpp
index ea63be756fc..f4cff0dc905 100644
--- a/src/mongo/db/keys_collection_cache.cpp
+++ b/src/mongo/db/keys_collection_cache.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/keys_collection_client.h"
#include "mongo/db/keys_collection_document.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/util/str.h"
namespace mongo {
@@ -55,6 +56,15 @@ StatusWith<KeysCollectionDocument> KeysCollectionCache::refresh(OperationContext
originalSize = _cache.size();
}
+ // Don't allow this to read during initial sync because it will read at the initialDataTimestamp
+ // and that could conflict with reconstructing prepared transactions using the
+ // initialDataTimestamp as the prepareTimestamp.
+ if (repl::ReplicationCoordinator::get(opCtx) &&
+ repl::ReplicationCoordinator::get(opCtx)->getMemberState().startup2()) {
+ return {ErrorCodes::InitialSyncActive,
+ "Cannot refresh keys collection cache during initial sync"};
+ }
+
auto refreshStatus = _client->getNewKeys(opCtx, _purpose, newerThanThis);
if (!refreshStatus.isOK()) {
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 1ca6913e8da..c206077d0f5 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1559,6 +1559,7 @@ env.Library(
'database_cloner',
'databases_cloner',
'multiapplier',
+ 'oplog',
'oplog_application_interface',
'oplog_buffer_blocking_queue',
'oplog_entry',
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 926da2a8d22..5938c6c2615 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -478,10 +478,12 @@ Status applyApplyOpsOplogEntry(OperationContext* opCtx,
&resultWeDontCareAbout);
}
-Status applyRecoveredPrepareApplyOpsOplogEntry(OperationContext* opCtx, const OplogEntry& entry) {
+Status applyRecoveredPrepareApplyOpsOplogEntry(OperationContext* opCtx,
+ const OplogEntry& entry,
+ repl::OplogApplication::Mode mode) {
// We might replay a prepared transaction behind oldest timestamp.
opCtx->recoveryUnit()->setRoundUpPreparedTimestamps(true);
- return _applyPrepareTransaction(opCtx, entry, OplogApplication::Mode::kRecovering);
+ return _applyPrepareTransaction(opCtx, entry, mode);
}
Status applyOps(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/apply_ops.h b/src/mongo/db/repl/apply_ops.h
index 94ff4dfa0e5..7cbaa433a87 100644
--- a/src/mongo/db/repl/apply_ops.h
+++ b/src/mongo/db/repl/apply_ops.h
@@ -115,6 +115,8 @@ Status applyApplyOpsOplogEntry(OperationContext* opCtx,
/**
* Called from recovery to apply an 'applyOps' oplog entry that prepares a transaction.
*/
-Status applyRecoveredPrepareApplyOpsOplogEntry(OperationContext* opCtx, const OplogEntry& entry);
+Status applyRecoveredPrepareApplyOpsOplogEntry(OperationContext* opCtx,
+ const OplogEntry& entry,
+ repl::OplogApplication::Mode mode);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 9bfdfbd13cd..4733e69ea10 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -58,6 +58,7 @@
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/sync_source_selector.h"
+#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/thread_pool_task_executor.h"
@@ -433,19 +434,22 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx,
return;
}
const auto lastAppliedOpTime = lastApplied.getValue().opTime;
+ auto initialDataTimestamp = lastAppliedOpTime.getTimestamp();
// A node coming out of initial sync must guarantee at least one oplog document is visible
- // such that others can sync from this node. Oplog visibility is not rigorously advanced
- // during initial sync. Correct the visibility to match the initial sync time before
- // transitioning to steady state replication.
+ // such that others can sync from this node. Oplog visibility is only advanced when applying
+ // oplog entries during initial sync. Correct the visibility to match the initial sync time
+ // before transitioning to steady state replication.
const bool orderedCommit = true;
- _storage->oplogDiskLocRegister(opCtx, lastAppliedOpTime.getTimestamp(), orderedCommit);
+ _storage->oplogDiskLocRegister(opCtx, initialDataTimestamp, orderedCommit);
+
+ reconstructPreparedTransactions(opCtx, repl::OplogApplication::Mode::kInitialSync);
_replicationProcess->getConsistencyMarkers()->clearInitialSyncFlag(opCtx);
// All updates that represent initial sync must be completed before setting the initial data
// timestamp.
- _storage->setInitialDataTimestamp(opCtx->getServiceContext(), lastAppliedOpTime.getTimestamp());
+ _storage->setInitialDataTimestamp(opCtx->getServiceContext(), initialDataTimestamp);
auto currentLastAppliedOpTime = _opts.getMyLastOptime();
if (currentLastAppliedOpTime.isNull()) {
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 2eb1d27e5ef..62b6c7902d6 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -1849,6 +1849,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherCallbackError) {
TEST_F(InitialSyncerTest,
InitialSyncerSucceedsOnEarlyOplogFetcherCompletionIfThereAreNoOperationsToApply) {
+ // Skip reconstructing prepared transactions at the end of initial sync because
+ // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+ // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+ FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -1912,6 +1917,11 @@ TEST_F(InitialSyncerTest,
TEST_F(
InitialSyncerTest,
InitialSyncerSucceedsOnEarlyOplogFetcherCompletionIfThereAreEnoughOperationsInTheOplogBufferToReachEndTimestamp) {
+ // Skip reconstructing prepared transactions at the end of initial sync because
+ // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+ // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+ FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -3143,6 +3153,11 @@ TEST_F(InitialSyncerTest,
}
TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning) {
+ // Skip reconstructing prepared transactions at the end of initial sync because
+ // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+ // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+ FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -3847,6 +3862,11 @@ void InitialSyncerTest::doSuccessfulInitialSyncWithOneBatch(bool shouldSetFCV) {
TEST_F(InitialSyncerTest,
InitialSyncerReturnsLastAppliedOnReachingStopTimestampAfterApplyingOneBatch) {
+ // Skip reconstructing prepared transactions at the end of initial sync because
+ // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+ // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+ FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
// Tell test to setFCV=4.2 before the last rollback ID check.
// _rollbackCheckerCheckForRollbackCallback() calls upgradeNonReplicatedUniqueIndexes
// only if fCV is 4.2.
@@ -3859,6 +3879,11 @@ TEST_F(InitialSyncerTest,
TEST_F(InitialSyncerTest,
InitialSyncerReturnsLastAppliedOnReachingStopTimestampAfterApplyingMultipleBatches) {
+ // Skip reconstructing prepared transactions at the end of initial sync because
+ // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+ // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+ FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -3966,6 +3991,11 @@ TEST_F(InitialSyncerTest,
TEST_F(
InitialSyncerTest,
InitialSyncerSchedulesLastOplogEntryFetcherToGetNewStopTimestampIfMissingDocumentsHaveBeenFetchedDuringMultiInitialSyncApply) {
+ // Skip reconstructing prepared transactions at the end of initial sync because
+ // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+ // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+ FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -4142,6 +4172,11 @@ TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) {
}
TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
+ // Skip reconstructing prepared transactions at the end of initial sync because
+ // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+ // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+ FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
ASSERT_OK(ServerParameterSet::getGlobal()
@@ -4494,6 +4529,11 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressOmitsClonerStatsIfClonerStatsExc
}
TEST_F(InitialSyncerTest, InitialSyncerDoesNotCallUpgradeNonReplicatedUniqueIndexesOnFCV40) {
+ // Skip reconstructing prepared transactions at the end of initial sync because
+ // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+ // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+ FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
// In MongoDB 4.2, upgradeNonReplicatedUniqueIndexes will only be called if fCV is 4.2.
doSuccessfulInitialSyncWithOneBatch(false);
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index a432da62bb4..2a98c73b262 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -252,7 +252,10 @@ StatusWith<boost::optional<rpc::OplogQueryMetadata>> parseOplogQueryMetadata(
} // namespace
StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
- const Fetcher::Documents& documents, bool first, Timestamp lastTS) {
+ const Fetcher::Documents& documents,
+ bool first,
+ Timestamp lastTS,
+ StartingPoint startingPoint) {
if (first && documents.empty()) {
return Status(ErrorCodes::OplogStartMissing,
str::stream() << "The first batch of oplog entries is empty, but expected at "
@@ -300,7 +303,7 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
// These numbers are for the documents we will apply.
info.toApplyDocumentCount = documents.size();
info.toApplyDocumentBytes = info.networkDocumentBytes;
- if (first) {
+ if (first && startingPoint == StartingPoint::kSkipFirstDoc) {
// The count is one less since the first document found was already applied ($gte $ts query)
// and we will not apply it again.
--info.toApplyDocumentCount;
@@ -466,8 +469,8 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons
}
}
- auto validateResult =
- OplogFetcher::validateDocuments(documents, queryResponse.first, lastFetched.getTimestamp());
+ auto validateResult = OplogFetcher::validateDocuments(
+ documents, queryResponse.first, lastFetched.getTimestamp(), _startingPoint);
if (!validateResult.isOK()) {
return validateResult.getStatus();
}
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 10b7d26e00a..398ac362377 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -114,9 +114,11 @@ public:
* query.
* On success, returns statistics on operations.
*/
- static StatusWith<DocumentsInfo> validateDocuments(const Fetcher::Documents& documents,
- bool first,
- Timestamp lastTS);
+ static StatusWith<DocumentsInfo> validateDocuments(
+ const Fetcher::Documents& documents,
+ bool first,
+ Timestamp lastTS,
+ StartingPoint startingPoint = StartingPoint::kSkipFirstDoc);
/**
* Invariants if validation fails on any of the provided arguments.
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 233735db58e..1550d0882ed 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -930,8 +930,9 @@ TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInThirdEnt
.getStatus());
}
-TEST_F(OplogFetcherTest,
- ValidateDocumentsExcludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatch) {
+TEST_F(
+ OplogFetcherTest,
+ ValidateDocumentsExcludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatchAndSkipFirstDoc) {
auto firstEntry = makeNoopOplogEntry(Seconds(123));
auto secondEntry = makeNoopOplogEntry(Seconds(456));
auto thirdEntry = makeNoopOplogEntry(Seconds(789));
@@ -939,11 +940,37 @@ TEST_F(OplogFetcherTest,
auto info = unittest::assertGet(OplogFetcher::validateDocuments(
{firstEntry, secondEntry, thirdEntry},
true,
- unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()));
+ unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp(),
+ mongo::repl::OplogFetcher::StartingPoint::kSkipFirstDoc));
+
+ ASSERT_EQUALS(3U, info.networkDocumentCount);
+ ASSERT_EQUALS(2U, info.toApplyDocumentCount);
+ ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
+ info.networkDocumentBytes);
+ ASSERT_EQUALS(size_t(secondEntry.objsize() + thirdEntry.objsize()), info.toApplyDocumentBytes);
+
+ ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument);
+}
+
+TEST_F(
+ OplogFetcherTest,
+ ValidateDocumentsIncludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatchAndEnqueueFirstDoc) {
+ auto firstEntry = makeNoopOplogEntry(Seconds(123));
+ auto secondEntry = makeNoopOplogEntry(Seconds(456));
+ auto thirdEntry = makeNoopOplogEntry(Seconds(789));
+
+ auto info = unittest::assertGet(OplogFetcher::validateDocuments(
+ {firstEntry, secondEntry, thirdEntry},
+ true,
+ unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp(),
+ mongo::repl::OplogFetcher::StartingPoint::kEnqueueFirstDoc));
ASSERT_EQUALS(3U, info.networkDocumentCount);
+ ASSERT_EQUALS(3U, info.toApplyDocumentCount);
ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
info.networkDocumentBytes);
+ ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
+ info.toApplyDocumentBytes);
ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument);
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 3353c3cd506..df7e2ca7c44 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -70,6 +70,7 @@
#include "mongo/db/repl/rslog.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/topology_coordinator.h"
+#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/server_options.h"
@@ -517,7 +518,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx)
// Read the last op from the oplog after cleaning up any partially applied batches.
const auto stableTimestamp = boost::none;
_replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx, stableTimestamp);
- _replicationProcess->getReplicationRecovery()->reconstructPreparedTransactions(opCtx);
+ reconstructPreparedTransactions(opCtx, OplogApplication::Mode::kRecovering);
const auto lastOpTimeAndWallTimeResult = _externalState->loadLastOpTimeAndWallTime(opCtx);
@@ -791,7 +792,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) {
// for the recoveryTimestamp just like on replica set recovery.
const auto stableTimestamp = boost::none;
_replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx, stableTimestamp);
- _replicationProcess->getReplicationRecovery()->reconstructPreparedTransactions(opCtx);
+ reconstructPreparedTransactions(opCtx, OplogApplication::Mode::kRecovering);
warning() << "Setting mongod to readOnly mode as a result of specifying "
"'recoverFromOplogAsStandalone'.";
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index 5ee9f020ebe..43f65feb274 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -171,6 +171,10 @@ void ReplCoordTest::init(const std::string& replSet) {
}
void ReplCoordTest::start() {
+ // Skip reconstructing prepared transactions at the end of startup because ReplCoordTest doesn't
+ // construct ServiceEntryPoint and this causes a segmentation fault when
+ // reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+ FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
invariant(!_callShutdown);
// if we haven't initialized yet, do that first.
if (!_repl) {
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index 41df37761a9..3232e7480f8 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -276,50 +276,6 @@ void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx,
std::terminate();
}
-void ReplicationRecoveryImpl::reconstructPreparedTransactions(OperationContext* opCtx) {
- DBDirectClient client(opCtx);
- const auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
- {BSON("state"
- << "prepared")});
-
- // Iterate over each entry in the transactions table that has a prepared transaction.
- while (cursor->more()) {
- const auto txnRecordObj = cursor->next();
- const auto txnRecord = SessionTxnRecord::parse(
- IDLParserErrorContext("recovering prepared transaction"), txnRecordObj);
-
- invariant(txnRecord.getState() == DurableTxnStateEnum::kPrepared);
-
- // Get the prepareTransaction oplog entry corresponding to this transactions table entry.
- invariant(!opCtx->recoveryUnit()->getPointInTimeReadTimestamp());
- const auto prepareOpTime = txnRecord.getLastWriteOpTime();
- invariant(!prepareOpTime.isNull());
- TransactionHistoryIterator iter(prepareOpTime);
- invariant(iter.hasNext());
- const auto prepareOplogEntry = iter.next(opCtx);
-
- {
- // Make a new opCtx so that we can set the lsid when applying the prepare transaction
- // oplog entry.
- auto newClient =
- opCtx->getServiceContext()->makeClient("reconstruct-prepared-transactions");
- AlternativeClientRegion acr(newClient);
- const auto newOpCtx = cc().makeOperationContext();
- repl::UnreplicatedWritesBlock uwb(newOpCtx.get());
-
- // Snapshot transaction can never conflict with the PBWM lock.
- newOpCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
-
- // TODO: SERVER-40177 This should be removed once it is guaranteed operations applied on
- // recovering nodes cannot encounter unnecessary prepare conflicts.
- newOpCtx->recoveryUnit()->setIgnorePrepared(true);
-
- // Checks out the session, applies the operations and prepares the transactions.
- uassertStatusOK(applyRecoveredPrepareTransaction(newOpCtx.get(), prepareOplogEntry));
- }
- }
-}
-
void ReplicationRecoveryImpl::_recoverFromStableTimestamp(OperationContext* opCtx,
Timestamp stableTimestamp,
OpTime appliedThrough,
diff --git a/src/mongo/db/repl/replication_recovery.h b/src/mongo/db/repl/replication_recovery.h
index f04a651f9fe..0261fb5a74d 100644
--- a/src/mongo/db/repl/replication_recovery.h
+++ b/src/mongo/db/repl/replication_recovery.h
@@ -55,13 +55,6 @@ public:
*/
virtual void recoverFromOplog(OperationContext* opCtx,
boost::optional<Timestamp> stableTimestamp) = 0;
-
- /**
- * Reconstruct prepared transactions by iterating over the transactions table to see which
- * transactions should be in the prepared state, getting the corresponding oplog entry and
- * applying the operations.
- */
- virtual void reconstructPreparedTransactions(OperationContext* opCtx) = 0;
};
class ReplicationRecoveryImpl : public ReplicationRecovery {
@@ -75,8 +68,6 @@ public:
void recoverFromOplog(OperationContext* opCtx,
boost::optional<Timestamp> stableTimestamp) override;
- void reconstructPreparedTransactions(OperationContext* opCtx) override;
-
private:
/**
* After truncating the oplog, completes recovery if we're recovering from a stable timestamp
diff --git a/src/mongo/db/repl/replication_recovery_mock.h b/src/mongo/db/repl/replication_recovery_mock.h
index b6d6bf4b572..77835215ca3 100644
--- a/src/mongo/db/repl/replication_recovery_mock.h
+++ b/src/mongo/db/repl/replication_recovery_mock.h
@@ -44,8 +44,6 @@ public:
void recoverFromOplog(OperationContext* opCtx,
boost::optional<Timestamp> stableTimestamp) override {}
-
- void reconstructPreparedTransactions(OperationContext* opCtx) override {}
};
} // namespace repl
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index 0bf542ea04b..dd3cb7fcfc0 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/roll_back_local_operations.h"
#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/s/shard_identity_rollback_notifier.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/db/server_recovery.h"
@@ -328,7 +329,7 @@ Status RollbackImpl::runRollback(OperationContext* opCtx) {
// transactions were aborted (i.e. the in-memory counts were rolled-back) before computing
// collection counts, reconstruct the prepared transactions now, adding on any additional counts
// to the now corrected record store.
- _replicationProcess->getReplicationRecovery()->reconstructPreparedTransactions(opCtx);
+ reconstructPreparedTransactions(opCtx, OplogApplication::Mode::kRecovering);
// At this point, the last applied and durable optimes on this node still point to ops on
// the divergent branch of history. We therefore update the last optimes to the top of the
diff --git a/src/mongo/db/repl/storage_interface.cpp b/src/mongo/db/repl/storage_interface.cpp
index f7d4efb4354..093c8890747 100644
--- a/src/mongo/db/repl/storage_interface.cpp
+++ b/src/mongo/db/repl/storage_interface.cpp
@@ -56,7 +56,6 @@ StorageInterface* StorageInterface::get(OperationContext* opCtx) {
return get(opCtx->getClient()->getServiceContext());
}
-
void StorageInterface::set(ServiceContext* service, std::unique_ptr<StorageInterface> storage) {
auto& storageInterface = getStorageInterface(service);
storageInterface = std::move(storage);
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 7d7e330322a..646dc45d764 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -1685,7 +1685,7 @@ DEATH_TEST_F(SyncTailTest,
OplogApplier::Options options;
SyncTail syncTail(nullptr, // observer. not required by oplogApplication().
_consistencyMarkers.get(),
- _storageInterface.get(),
+ getStorageInterface(),
applyOperationFn,
writerPool.get(),
options);
@@ -2769,8 +2769,6 @@ TEST_F(IdempotencyTest, CommitPreparedTransactionDataPartiallyApplied) {
}
TEST_F(IdempotencyTest, AbortPreparedTransaction) {
- // TODO: SERVER-36492 Fix this test
- return;
createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
auto lsid = makeLogicalSessionId(_opCtx.get());
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index 9ef501f89b9..045b1b00bc8 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -107,17 +107,18 @@ OplogApplier::Options SyncTailTest::makeRecoveryOptions() {
void SyncTailTest::setUp() {
ServiceContextMongoDTest::setUp();
- auto service = getServiceContext();
+ serviceContext = getServiceContext();
_opCtx = cc().makeOperationContext();
- ReplicationCoordinator::set(service, stdx::make_unique<ReplicationCoordinatorMock>(service));
+ ReplicationCoordinator::set(serviceContext,
+ stdx::make_unique<ReplicationCoordinatorMock>(serviceContext));
ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
- _storageInterface = stdx::make_unique<StorageInterfaceImpl>();
+ StorageInterface::set(serviceContext, stdx::make_unique<StorageInterfaceImpl>());
DropPendingCollectionReaper::set(
- service, stdx::make_unique<DropPendingCollectionReaper>(_storageInterface.get()));
- repl::setOplogCollectionName(service);
+ serviceContext, stdx::make_unique<DropPendingCollectionReaper>(getStorageInterface()));
+ repl::setOplogCollectionName(serviceContext);
repl::createOplog(_opCtx.get());
_consistencyMarkers = stdx::make_unique<ReplicationConsistencyMarkersMock>();
@@ -125,7 +126,7 @@ void SyncTailTest::setUp() {
// Set up an OpObserver to track the documents SyncTail inserts.
auto opObserver = std::make_unique<SyncTailOpObserver>();
_opObserver = opObserver.get();
- auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(service->getOpObserver());
+ auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(serviceContext->getOpObserver());
opObserverRegistry->addObserver(std::move(opObserver));
// Initialize the featureCompatibilityVersion server parameter. This is necessary because this
@@ -136,12 +137,10 @@ void SyncTailTest::setUp() {
}
void SyncTailTest::tearDown() {
- auto service = getServiceContext();
_opCtx.reset();
- _storageInterface = {};
_consistencyMarkers = {};
- DropPendingCollectionReaper::set(service, {});
- StorageInterface::set(service, {});
+ DropPendingCollectionReaper::set(serviceContext, {});
+ StorageInterface::set(serviceContext, {});
ServiceContextMongoDTest::tearDown();
}
@@ -150,7 +149,7 @@ ReplicationConsistencyMarkers* SyncTailTest::getConsistencyMarkers() const {
}
StorageInterface* SyncTailTest::getStorageInterface() const {
- return _storageInterface.get();
+ return StorageInterface::get(serviceContext);
}
void SyncTailTest::_testSyncApplyCrudOperation(ErrorCodes::Error expectedError,
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h
index c1362aad0f2..9b1adaea94f 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.h
+++ b/src/mongo/db/repl/sync_tail_test_fixture.h
@@ -120,7 +120,7 @@ protected:
ServiceContext::UniqueOperationContext _opCtx;
std::unique_ptr<ReplicationConsistencyMarkers> _consistencyMarkers;
- std::unique_ptr<StorageInterface> _storageInterface;
+ ServiceContext* serviceContext;
SyncTailOpObserver* _opObserver = nullptr;
// Implements the SyncTail::MultiSyncApplyFn interface and does nothing.
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index 3af7319fefb..35394e783db 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -37,8 +37,10 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index_builds_coordinator.h"
#include "mongo/db/repl/apply_ops.h"
+#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/timestamp_block.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/transaction_history_iterator.h"
@@ -51,6 +53,9 @@ namespace {
// If enabled, causes _applyPrepareTransaction to hang before preparing the transaction participant.
MONGO_FAIL_POINT_DEFINE(applyPrepareCommandHangBeforePreparingTransaction);
+// Failpoint that will cause reconstructPreparedTransactions to return early.
+MONGO_FAIL_POINT_DEFINE(skipReconstructPreparedTransactions);
+
// Apply the oplog entries for a prepare or a prepared commit during recovery/initial sync.
Status _applyOperationsForTransaction(OperationContext* opCtx,
@@ -176,6 +181,7 @@ Status applyCommitTransaction(OperationContext* opCtx,
invariant(entry.getTxnNumber());
opCtx->setLogicalSessionId(*entry.getSessionId());
opCtx->setTxnNumber(*entry.getTxnNumber());
+
// The write on transaction table may be applied concurrently, so refreshing state
// from disk may read that write, causing starting a new transaction on an existing
// txnNumber. Thus, we start a new transaction without refreshing state from disk.
@@ -197,13 +203,13 @@ Status applyAbortTransaction(OperationContext* opCtx,
"abortTransaction is only used internally by secondaries.",
mode != repl::OplogApplication::Mode::kApplyOpsCmd);
- // We don't put transactions into the prepare state until the end of recovery, so there is
- // no transaction to abort.
- if (mode == repl::OplogApplication::Mode::kRecovering) {
+ // We don't put transactions into the prepare state until the end of recovery and initial sync,
+ // so there is no transaction to abort.
+ if (mode == repl::OplogApplication::Mode::kRecovering ||
+ mode == repl::OplogApplication::Mode::kInitialSync) {
return Status::OK();
}
- // TODO: SERVER-36492 Only run on secondary until we support initial sync.
invariant(mode == repl::OplogApplication::Mode::kSecondary);
// Transaction operations are in its own batch, so we can modify their opCtx.
@@ -294,8 +300,8 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
namespace {
/**
- * This is the part of applyPrepareTransaction which is common to steady state and recovery
- * oplog application.
+ * This is the part of applyPrepareTransaction which is common to steady state, initial sync and
+ * recovery oplog application.
*/
Status _applyPrepareTransaction(OperationContext* opCtx,
const OplogEntry& entry,
@@ -309,7 +315,8 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
return readTransactionOperationsFromOplogChain(opCtx, entry, {});
}();
- if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering) {
+ if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering ||
+ oplogApplicationMode == repl::OplogApplication::Mode::kInitialSync) {
// We might replay a prepared transaction behind oldest timestamp. Note that since this is
// scoped to the storage transaction, and readTransactionOperationsFromOplogChain implicitly
// abandons the storage transaction when it releases the global lock, this must be done
@@ -357,6 +364,23 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
return Status::OK();
}
+
+/**
+ * Apply a prepared transaction during recovery. The OplogEntry must be an 'applyOps' with
+ * 'prepare' set or a prepareTransaction command.
+ */
+Status applyRecoveredPrepareTransaction(OperationContext* opCtx,
+ const OplogEntry& entry,
+ repl::OplogApplication::Mode mode) {
+ // Snapshot transactions never conflict with the PBWM lock.
+ invariant(!opCtx->lockState()->shouldConflictWithSecondaryBatchApplication());
+ if (entry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction) {
+ return _applyPrepareTransaction(opCtx, entry, mode);
+ } else {
+ // This is an applyOps with prepare.
+ return applyRecoveredPrepareApplyOpsOplogEntry(opCtx, entry, mode);
+ }
+}
} // namespace
/**
@@ -396,14 +420,54 @@ Status applyPrepareTransaction(OperationContext* opCtx,
return _applyPrepareTransaction(opCtx, entry, oplogApplicationMode);
}
-Status applyRecoveredPrepareTransaction(OperationContext* opCtx, const OplogEntry& entry) {
- // Snapshot transactions never conflict with the PBWM lock.
- invariant(!opCtx->lockState()->shouldConflictWithSecondaryBatchApplication());
- if (entry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction) {
- return _applyPrepareTransaction(opCtx, entry, repl::OplogApplication::Mode::kRecovering);
- } else {
- // This is an applyOps with prepare.
- return applyRecoveredPrepareApplyOpsOplogEntry(opCtx, entry);
+void reconstructPreparedTransactions(OperationContext* opCtx, repl::OplogApplication::Mode mode) {
+ if (MONGO_FAIL_POINT(skipReconstructPreparedTransactions)) {
+ log() << "Hit skipReconstructPreparedTransactions failpoint";
+ return;
+ }
+ // Read the transactions table with its own snapshot and read timestamp.
+ ReadSourceScope readSourceScope(opCtx);
+
+ DBDirectClient client(opCtx);
+ const auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
+ {BSON("state"
+ << "prepared")});
+
+ // Iterate over each entry in the transactions table that has a prepared transaction.
+ while (cursor->more()) {
+ const auto txnRecordObj = cursor->next();
+ const auto txnRecord = SessionTxnRecord::parse(
+ IDLParserErrorContext("recovering prepared transaction"), txnRecordObj);
+
+ invariant(txnRecord.getState() == DurableTxnStateEnum::kPrepared);
+
+ // Get the prepareTransaction oplog entry corresponding to this transactions table entry.
+ const auto prepareOpTime = txnRecord.getLastWriteOpTime();
+ invariant(!prepareOpTime.isNull());
+ TransactionHistoryIterator iter(prepareOpTime);
+ invariant(iter.hasNext());
+ auto prepareOplogEntry = iter.next(opCtx);
+
+ {
+ // Make a new opCtx so that we can set the lsid when applying the prepare transaction
+ // oplog entry.
+ auto newClient =
+ opCtx->getServiceContext()->makeClient("reconstruct-prepared-transactions");
+ AlternativeClientRegion acr(newClient);
+ const auto newOpCtx = cc().makeOperationContext();
+ repl::UnreplicatedWritesBlock uwb(newOpCtx.get());
+
+ // Snapshot transaction can never conflict with the PBWM lock.
+ newOpCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
+
+ // TODO: SERVER-40177 This should be removed once it is guaranteed operations applied on
+ // recovering nodes cannot encounter unnecessary prepare conflicts.
+ newOpCtx->recoveryUnit()->setIgnorePrepared(true);
+
+ // Checks out the session, applies the operations and prepares the transaction.
+ uassertStatusOK(
+ applyRecoveredPrepareTransaction(newOpCtx.get(), prepareOplogEntry, mode));
+ }
}
}
diff --git a/src/mongo/db/repl/transaction_oplog_application.h b/src/mongo/db/repl/transaction_oplog_application.h
index bc960303801..09607e820f2 100644
--- a/src/mongo/db/repl/transaction_oplog_application.h
+++ b/src/mongo/db/repl/transaction_oplog_application.h
@@ -66,10 +66,10 @@ Status applyPrepareTransaction(OperationContext* opCtx,
const repl::OplogEntry& entry,
repl::OplogApplication::Mode mode);
-/**
- * Apply a prepared transaction during recovery. The OplogEntry must be an 'applyOps' with
- * 'prepare' set or a prepareTransaction command.
+/*
+ * Reconstruct prepared transactions by iterating over the transactions table to see which
+ * transactions should be in the prepared state, getting the corresponding oplog entry and applying
+ * the operations. Called at the end of rollback, startup recovery and initial sync.
*/
-Status applyRecoveredPrepareTransaction(OperationContext* opCtx, const repl::OplogEntry& entry);
-
+void reconstructPreparedTransactions(OperationContext* opCtx, repl::OplogApplication::Mode mode);
} // namespace mongo