author     Samy Lanka <samy.lanka@mongodb.com>    2019-04-30 21:59:26 -0400
committer  Samy Lanka <samy.lanka@mongodb.com>    2019-05-06 17:23:18 -0400
commit     8ad1effc48ac5193c5f57630d1fbce8bda0cfdaf
tree       f12beec2e31fc5a189bbf1943682f2b05a77972d /src/mongo/db/repl
parent     a25226e009fa1598f3077dd7972b9be3d2368785
SERVER-36492 Reconstruct prepared transactions at the end of initial sync
Diffstat (limited to 'src/mongo/db/repl')
20 files changed, 204 insertions, 112 deletions
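
Note: taken together, the changes below remove `reconstructPreparedTransactions` from `ReplicationRecoveryImpl`, re-home it in `transaction_oplog_application.cpp` behind an `OplogApplication::Mode` parameter, and invoke it at the end of initial sync (`kInitialSync`) in addition to the existing startup-recovery and rollback call sites (`kRecovering`). As a rough orientation aid, here is a minimal standalone sketch of that reconstruction flow; every type in it (`Mode`, `TxnState`, `OplogEntry`, `TxnRecord`) is a simplified stand-in invented for illustration, not the server's real API.

```cpp
#include <cassert>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Simplified stand-ins for the server types touched by this patch. The real
// code queries config.transactions via DBDirectClient and walks the oplog
// chain with TransactionHistoryIterator.
enum class Mode { kInitialSync, kRecovering, kSecondary };
enum class TxnState { kInProgress, kPrepared, kCommitted, kAborted };

struct OplogEntry {
    long opTime;             // stand-in for the prepare entry's OpTime
    std::string operations;  // stand-in for the transaction's write ops
};

struct TxnRecord {
    TxnState state;
    long lastWriteOpTime;  // for a prepared txn, this is the prepare OpTime
};

// Stand-in for applyRecoveredPrepareTransaction(): reapply the writes and put
// the participant back into the prepared state, in the caller-supplied mode.
void applyRecoveredPrepare(const OplogEntry& entry, Mode mode) {
    assert(mode == Mode::kInitialSync || mode == Mode::kRecovering);
    std::cout << "re-preparing txn from oplog entry at ts " << entry.opTime
              << " (" << entry.operations << ")\n";
}

// Core shape of reconstructPreparedTransactions(): scan the transactions
// table for records in the prepared state and replay each prepare entry.
void reconstructPreparedTransactions(const std::vector<TxnRecord>& txnTable,
                                     const std::map<long, OplogEntry>& oplog,
                                     Mode mode) {
    for (const auto& record : txnTable) {
        if (record.state != TxnState::kPrepared)
            continue;
        applyRecoveredPrepare(oplog.at(record.lastWriteOpTime), mode);
    }
}

int main() {
    std::map<long, OplogEntry> oplog{{5, {5, "insert {_id: 1}"}}};
    std::vector<TxnRecord> txnTable{{TxnState::kCommitted, 3}, {TxnState::kPrepared, 5}};
    reconstructPreparedTransactions(txnTable, oplog, Mode::kInitialSync);
    return 0;
}
```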
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 1ca6913e8da..c206077d0f5 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1559,6 +1559,7 @@ env.Library(
         'database_cloner',
         'databases_cloner',
         'multiapplier',
+        'oplog',
         'oplog_application_interface',
         'oplog_buffer_blocking_queue',
         'oplog_entry',
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 926da2a8d22..5938c6c2615 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -478,10 +478,12 @@ Status applyApplyOpsOplogEntry(OperationContext* opCtx,
                                &resultWeDontCareAbout);
 }
 
-Status applyRecoveredPrepareApplyOpsOplogEntry(OperationContext* opCtx, const OplogEntry& entry) {
+Status applyRecoveredPrepareApplyOpsOplogEntry(OperationContext* opCtx,
+                                               const OplogEntry& entry,
+                                               repl::OplogApplication::Mode mode) {
     // We might replay a prepared transaction behind oldest timestamp.
     opCtx->recoveryUnit()->setRoundUpPreparedTimestamps(true);
 
-    return _applyPrepareTransaction(opCtx, entry, OplogApplication::Mode::kRecovering);
+    return _applyPrepareTransaction(opCtx, entry, mode);
 }
 
 Status applyOps(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/apply_ops.h b/src/mongo/db/repl/apply_ops.h
index 94ff4dfa0e5..7cbaa433a87 100644
--- a/src/mongo/db/repl/apply_ops.h
+++ b/src/mongo/db/repl/apply_ops.h
@@ -115,6 +115,8 @@ Status applyApplyOpsOplogEntry(OperationContext* opCtx,
 /**
  * Called from recovery to apply an 'applyOps' oplog entry that prepares a transaction.
  */
-Status applyRecoveredPrepareApplyOpsOplogEntry(OperationContext* opCtx, const OplogEntry& entry);
+Status applyRecoveredPrepareApplyOpsOplogEntry(OperationContext* opCtx,
+                                               const OplogEntry& entry,
+                                               repl::OplogApplication::Mode mode);
 
 }  // namespace repl
 }  // namespace mongo
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 9bfdfbd13cd..4733e69ea10 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -58,6 +58,7 @@
 #include "mongo/db/repl/replication_process.h"
 #include "mongo/db/repl/storage_interface.h"
 #include "mongo/db/repl/sync_source_selector.h"
+#include "mongo/db/repl/transaction_oplog_application.h"
 #include "mongo/db/session_txn_record_gen.h"
 #include "mongo/executor/task_executor.h"
 #include "mongo/executor/thread_pool_task_executor.h"
@@ -433,19 +434,22 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx,
         return;
     }
     const auto lastAppliedOpTime = lastApplied.getValue().opTime;
+    auto initialDataTimestamp = lastAppliedOpTime.getTimestamp();
 
     // A node coming out of initial sync must guarantee at least one oplog document is visible
-    // such that others can sync from this node. Oplog visibility is not rigorously advanced
-    // during initial sync. Correct the visibility to match the initial sync time before
-    // transitioning to steady state replication.
+    // such that others can sync from this node. Oplog visibility is only advanced when applying
+    // oplog entries during initial sync. Correct the visibility to match the initial sync time
+    // before transitioning to steady state replication.
     const bool orderedCommit = true;
-    _storage->oplogDiskLocRegister(opCtx, lastAppliedOpTime.getTimestamp(), orderedCommit);
+    _storage->oplogDiskLocRegister(opCtx, initialDataTimestamp, orderedCommit);
+
+    reconstructPreparedTransactions(opCtx, repl::OplogApplication::Mode::kInitialSync);
 
     _replicationProcess->getConsistencyMarkers()->clearInitialSyncFlag(opCtx);
 
     // All updates that represent initial sync must be completed before setting the initial data
     // timestamp.
-    _storage->setInitialDataTimestamp(opCtx->getServiceContext(), lastAppliedOpTime.getTimestamp());
+    _storage->setInitialDataTimestamp(opCtx->getServiceContext(), initialDataTimestamp);
 
     auto currentLastAppliedOpTime = _opts.getMyLastOptime();
     if (currentLastAppliedOpTime.isNull()) {
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 2eb1d27e5ef..62b6c7902d6 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -1849,6 +1849,11 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherCallbackError) {
 
 TEST_F(InitialSyncerTest,
        InitialSyncerSucceedsOnEarlyOplogFetcherCompletionIfThereAreNoOperationsToApply) {
+    // Skip reconstructing prepared transactions at the end of initial sync because
+    // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+    // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+    FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
     auto initialSyncer = &getInitialSyncer();
     auto opCtx = makeOpCtx();
 
@@ -1912,6 +1917,11 @@ TEST_F(InitialSyncerTest,
 TEST_F(
     InitialSyncerTest,
    InitialSyncerSucceedsOnEarlyOplogFetcherCompletionIfThereAreEnoughOperationsInTheOplogBufferToReachEndTimestamp) {
+    // Skip reconstructing prepared transactions at the end of initial sync because
+    // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+    // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+    FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
     auto initialSyncer = &getInitialSyncer();
     auto opCtx = makeOpCtx();
 
@@ -3143,6 +3153,11 @@ TEST_F(InitialSyncerTest,
 }
 
 TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning) {
+    // Skip reconstructing prepared transactions at the end of initial sync because
+    // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+    // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+    FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
     auto initialSyncer = &getInitialSyncer();
     auto opCtx = makeOpCtx();
 
@@ -3847,6 +3862,11 @@ void InitialSyncerTest::doSuccessfulInitialSyncWithOneBatch(bool shouldSetFCV) {
 
 TEST_F(InitialSyncerTest,
        InitialSyncerReturnsLastAppliedOnReachingStopTimestampAfterApplyingOneBatch) {
+    // Skip reconstructing prepared transactions at the end of initial sync because
+    // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+    // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+    FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
     // Tell test to setFCV=4.2 before the last rollback ID check.
     // _rollbackCheckerCheckForRollbackCallback() calls upgradeNonReplicatedUniqueIndexes
     // only if fCV is 4.2.
@@ -3859,6 +3879,11 @@ TEST_F(InitialSyncerTest,
 
 TEST_F(InitialSyncerTest,
        InitialSyncerReturnsLastAppliedOnReachingStopTimestampAfterApplyingMultipleBatches) {
+    // Skip reconstructing prepared transactions at the end of initial sync because
+    // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+    // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+    FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
     auto initialSyncer = &getInitialSyncer();
     auto opCtx = makeOpCtx();
 
@@ -3966,6 +3991,11 @@ TEST_F(InitialSyncerTest,
 TEST_F(
     InitialSyncerTest,
    InitialSyncerSchedulesLastOplogEntryFetcherToGetNewStopTimestampIfMissingDocumentsHaveBeenFetchedDuringMultiInitialSyncApply) {
+    // Skip reconstructing prepared transactions at the end of initial sync because
+    // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+    // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+    FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
     auto initialSyncer = &getInitialSyncer();
     auto opCtx = makeOpCtx();
 
@@ -4142,6 +4172,11 @@ TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) {
 }
 
 TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
+    // Skip reconstructing prepared transactions at the end of initial sync because
+    // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+    // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+    FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
     auto initialSyncer = &getInitialSyncer();
     auto opCtx = makeOpCtx();
     ASSERT_OK(ServerParameterSet::getGlobal()
@@ -4494,6 +4529,11 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressOmitsClonerStatsIfClonerStatsExc
 }
 
 TEST_F(InitialSyncerTest, InitialSyncerDoesNotCallUpgradeNonReplicatedUniqueIndexesOnFCV40) {
+    // Skip reconstructing prepared transactions at the end of initial sync because
+    // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
+    // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+    FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
+
     // In MongoDB 4.2, upgradeNonReplicatedUniqueIndexes will only be called if fCV is 4.2.
     doSuccessfulInitialSyncWithOneBatch(false);
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index a432da62bb4..2a98c73b262 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -252,7 +252,10 @@ StatusWith<boost::optional<rpc::OplogQueryMetadata>> parseOplogQueryMetadata(
 }  // namespace
 
 StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
-    const Fetcher::Documents& documents, bool first, Timestamp lastTS) {
+    const Fetcher::Documents& documents,
+    bool first,
+    Timestamp lastTS,
+    StartingPoint startingPoint) {
     if (first && documents.empty()) {
         return Status(ErrorCodes::OplogStartMissing,
                       str::stream() << "The first batch of oplog entries is empty, but expected at "
@@ -300,7 +303,7 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
     // These numbers are for the documents we will apply.
     info.toApplyDocumentCount = documents.size();
     info.toApplyDocumentBytes = info.networkDocumentBytes;
-    if (first) {
+    if (first && startingPoint == StartingPoint::kSkipFirstDoc) {
         // The count is one less since the first document found was already applied ($gte $ts query)
         // and we will not apply it again.
         --info.toApplyDocumentCount;
@@ -466,8 +469,8 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons
         }
     }
 
-    auto validateResult =
-        OplogFetcher::validateDocuments(documents, queryResponse.first, lastFetched.getTimestamp());
+    auto validateResult = OplogFetcher::validateDocuments(
+        documents, queryResponse.first, lastFetched.getTimestamp(), _startingPoint);
     if (!validateResult.isOK()) {
         return validateResult.getStatus();
     }
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 10b7d26e00a..398ac362377 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -114,9 +114,11 @@ public:
      * query.
      * On success, returns statistics on operations.
      */
-    static StatusWith<DocumentsInfo> validateDocuments(const Fetcher::Documents& documents,
-                                                       bool first,
-                                                       Timestamp lastTS);
+    static StatusWith<DocumentsInfo> validateDocuments(
+        const Fetcher::Documents& documents,
+        bool first,
+        Timestamp lastTS,
+        StartingPoint startingPoint = StartingPoint::kSkipFirstDoc);
 
     /**
      * Invariants if validation fails on any of the provided arguments.
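
The validateDocuments change above is easy to misread, so a small standalone model of just the counting rule may help: the first document of the first batch is excluded from the apply statistics only under `kSkipFirstDoc` (the default, matching the old behavior, since the `$gte` query re-fetches the last applied entry), while a fetcher configured with `kEnqueueFirstDoc` now counts it. `DocumentsInfo` here is pared down to two counters and is not the real struct.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Simplified model of OplogFetcher::validateDocuments' counting logic; the
// real function also validates timestamp ordering and computes byte totals.
enum class StartingPoint { kSkipFirstDoc, kEnqueueFirstDoc };

struct DocumentsInfo {
    size_t networkDocumentCount = 0;
    size_t toApplyDocumentCount = 0;
};

DocumentsInfo countDocuments(const std::vector<int>& batch,
                             bool first,
                             StartingPoint startingPoint) {
    DocumentsInfo info;
    info.networkDocumentCount = batch.size();
    info.toApplyDocumentCount = batch.size();
    // Only the very first batch may start with a document we already have
    // (the $gte query re-fetches the last applied entry); exclude it from the
    // apply counts unless the fetcher was configured to enqueue it.
    if (first && startingPoint == StartingPoint::kSkipFirstDoc) {
        --info.toApplyDocumentCount;
    }
    return info;
}

int main() {
    std::vector<int> batch{1, 2, 3};
    assert(countDocuments(batch, true, StartingPoint::kSkipFirstDoc).toApplyDocumentCount == 2);
    assert(countDocuments(batch, true, StartingPoint::kEnqueueFirstDoc).toApplyDocumentCount == 3);
    assert(countDocuments(batch, false, StartingPoint::kSkipFirstDoc).toApplyDocumentCount == 3);
    return 0;
}
```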
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 233735db58e..1550d0882ed 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -930,8 +930,9 @@ TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInThirdEnt
                   .getStatus());
 }
 
-TEST_F(OplogFetcherTest,
-       ValidateDocumentsExcludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatch) {
+TEST_F(
+    OplogFetcherTest,
+    ValidateDocumentsExcludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatchAndSkipFirstDoc) {
     auto firstEntry = makeNoopOplogEntry(Seconds(123));
     auto secondEntry = makeNoopOplogEntry(Seconds(456));
     auto thirdEntry = makeNoopOplogEntry(Seconds(789));
@@ -939,11 +940,37 @@ TEST_F(OplogFetcherTest,
     auto info = unittest::assertGet(OplogFetcher::validateDocuments(
         {firstEntry, secondEntry, thirdEntry},
         true,
-        unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()));
+        unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp(),
+        mongo::repl::OplogFetcher::StartingPoint::kSkipFirstDoc));
+
+    ASSERT_EQUALS(3U, info.networkDocumentCount);
+    ASSERT_EQUALS(2U, info.toApplyDocumentCount);
+    ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
+                  info.networkDocumentBytes);
+    ASSERT_EQUALS(size_t(secondEntry.objsize() + thirdEntry.objsize()), info.toApplyDocumentBytes);
+
+    ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument);
+}
+
+TEST_F(
+    OplogFetcherTest,
+    ValidateDocumentsIncludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatchAndEnqueueFirstDoc) {
+    auto firstEntry = makeNoopOplogEntry(Seconds(123));
+    auto secondEntry = makeNoopOplogEntry(Seconds(456));
+    auto thirdEntry = makeNoopOplogEntry(Seconds(789));
+
+    auto info = unittest::assertGet(OplogFetcher::validateDocuments(
+        {firstEntry, secondEntry, thirdEntry},
+        true,
+        unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp(),
+        mongo::repl::OplogFetcher::StartingPoint::kEnqueueFirstDoc));
 
     ASSERT_EQUALS(3U, info.networkDocumentCount);
+    ASSERT_EQUALS(3U, info.toApplyDocumentCount);
     ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
                   info.networkDocumentBytes);
+    ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
+                  info.toApplyDocumentBytes);
 
     ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument);
 }
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 3353c3cd506..df7e2ca7c44 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -70,6 +70,7 @@
 #include "mongo/db/repl/rslog.h"
 #include "mongo/db/repl/storage_interface.h"
 #include "mongo/db/repl/topology_coordinator.h"
+#include "mongo/db/repl/transaction_oplog_application.h"
 #include "mongo/db/repl/update_position_args.h"
 #include "mongo/db/repl/vote_requester.h"
 #include "mongo/db/server_options.h"
@@ -517,7 +518,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx)
 
     // Read the last op from the oplog after cleaning up any partially applied batches.
     const auto stableTimestamp = boost::none;
     _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx, stableTimestamp);
-    _replicationProcess->getReplicationRecovery()->reconstructPreparedTransactions(opCtx);
+    reconstructPreparedTransactions(opCtx, OplogApplication::Mode::kRecovering);
 
     const auto lastOpTimeAndWallTimeResult = _externalState->loadLastOpTimeAndWallTime(opCtx);
 
@@ -791,7 +792,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) {
         // for the recoveryTimestamp just like on replica set recovery.
         const auto stableTimestamp = boost::none;
         _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx, stableTimestamp);
-        _replicationProcess->getReplicationRecovery()->reconstructPreparedTransactions(opCtx);
+        reconstructPreparedTransactions(opCtx, OplogApplication::Mode::kRecovering);
 
         warning() << "Setting mongod to readOnly mode as a result of specifying "
                      "'recoverFromOplogAsStandalone'.";
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index 5ee9f020ebe..43f65feb274 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -171,6 +171,10 @@ void ReplCoordTest::init(const std::string& replSet) {
 }
 
 void ReplCoordTest::start() {
+    // Skip reconstructing prepared transactions at the end of startup because ReplCoordTest
+    // doesn't construct ServiceEntryPoint and this causes a segmentation fault when
+    // reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
+    FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
     invariant(!_callShutdown);
     // if we haven't initialized yet, do that first.
     if (!_repl) {
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index 41df37761a9..3232e7480f8 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -276,50 +276,6 @@ void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx,
     std::terminate();
 }
 
-void ReplicationRecoveryImpl::reconstructPreparedTransactions(OperationContext* opCtx) {
-    DBDirectClient client(opCtx);
-    const auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
-                                     {BSON("state"
-                                           << "prepared")});
-
-    // Iterate over each entry in the transactions table that has a prepared transaction.
-    while (cursor->more()) {
-        const auto txnRecordObj = cursor->next();
-        const auto txnRecord = SessionTxnRecord::parse(
-            IDLParserErrorContext("recovering prepared transaction"), txnRecordObj);
-
-        invariant(txnRecord.getState() == DurableTxnStateEnum::kPrepared);
-
-        // Get the prepareTransaction oplog entry corresponding to this transactions table entry.
-        invariant(!opCtx->recoveryUnit()->getPointInTimeReadTimestamp());
-        const auto prepareOpTime = txnRecord.getLastWriteOpTime();
-        invariant(!prepareOpTime.isNull());
-        TransactionHistoryIterator iter(prepareOpTime);
-        invariant(iter.hasNext());
-        const auto prepareOplogEntry = iter.next(opCtx);
-
-        {
-            // Make a new opCtx so that we can set the lsid when applying the prepare transaction
-            // oplog entry.
-            auto newClient =
-                opCtx->getServiceContext()->makeClient("reconstruct-prepared-transactions");
-            AlternativeClientRegion acr(newClient);
-            const auto newOpCtx = cc().makeOperationContext();
-            repl::UnreplicatedWritesBlock uwb(newOpCtx.get());
-
-            // Snapshot transaction can never conflict with the PBWM lock.
-            newOpCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
-
-            // TODO: SERVER-40177 This should be removed once it is guaranteed operations applied on
-            // recovering nodes cannot encounter unnecessary prepare conflicts.
-            newOpCtx->recoveryUnit()->setIgnorePrepared(true);
-
-            // Checks out the session, applies the operations and prepares the transactions.
-            uassertStatusOK(applyRecoveredPrepareTransaction(newOpCtx.get(), prepareOplogEntry));
-        }
-    }
-}
-
 void ReplicationRecoveryImpl::_recoverFromStableTimestamp(OperationContext* opCtx,
                                                           Timestamp stableTimestamp,
                                                           OpTime appliedThrough,
diff --git a/src/mongo/db/repl/replication_recovery.h b/src/mongo/db/repl/replication_recovery.h
index f04a651f9fe..0261fb5a74d 100644
--- a/src/mongo/db/repl/replication_recovery.h
+++ b/src/mongo/db/repl/replication_recovery.h
@@ -55,13 +55,6 @@ public:
      */
     virtual void recoverFromOplog(OperationContext* opCtx,
                                   boost::optional<Timestamp> stableTimestamp) = 0;
-
-    /**
-     * Reconstruct prepared transactions by iterating over the transactions table to see which
-     * transactions should be in the prepared state, getting the corresponding oplog entry and
-     * applying the operations.
-     */
-    virtual void reconstructPreparedTransactions(OperationContext* opCtx) = 0;
 };
 
 class ReplicationRecoveryImpl : public ReplicationRecovery {
@@ -75,8 +68,6 @@ public:
     void recoverFromOplog(OperationContext* opCtx,
                           boost::optional<Timestamp> stableTimestamp) override;
 
-    void reconstructPreparedTransactions(OperationContext* opCtx) override;
-
 private:
     /**
      * After truncating the oplog, completes recovery if we're recovering from a stable timestamp
diff --git a/src/mongo/db/repl/replication_recovery_mock.h b/src/mongo/db/repl/replication_recovery_mock.h
index b6d6bf4b572..77835215ca3 100644
--- a/src/mongo/db/repl/replication_recovery_mock.h
+++ b/src/mongo/db/repl/replication_recovery_mock.h
@@ -44,8 +44,6 @@ public:
 
     void recoverFromOplog(OperationContext* opCtx,
                           boost::optional<Timestamp> stableTimestamp) override {}
-
-    void reconstructPreparedTransactions(OperationContext* opCtx) override {}
 };
 
 }  // namespace repl
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index 0bf542ea04b..dd3cb7fcfc0 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -53,6 +53,7 @@
 #include "mongo/db/repl/replication_process.h"
 #include "mongo/db/repl/roll_back_local_operations.h"
 #include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/repl/transaction_oplog_application.h"
 #include "mongo/db/s/shard_identity_rollback_notifier.h"
 #include "mongo/db/s/type_shard_identity.h"
 #include "mongo/db/server_recovery.h"
@@ -328,7 +329,7 @@ Status RollbackImpl::runRollback(OperationContext* opCtx) {
     // transactions were aborted (i.e. the in-memory counts were rolled-back) before computing
     // collection counts, reconstruct the prepared transactions now, adding on any additional counts
     // to the now corrected record store.
-    _replicationProcess->getReplicationRecovery()->reconstructPreparedTransactions(opCtx);
+    reconstructPreparedTransactions(opCtx, OplogApplication::Mode::kRecovering);
 
     // At this point, the last applied and durable optimes on this node still point to ops on
     // the divergent branch of history. We therefore update the last optimes to the top of the
diff --git a/src/mongo/db/repl/storage_interface.cpp b/src/mongo/db/repl/storage_interface.cpp
index f7d4efb4354..093c8890747 100644
--- a/src/mongo/db/repl/storage_interface.cpp
+++ b/src/mongo/db/repl/storage_interface.cpp
@@ -56,7 +56,6 @@ StorageInterface* StorageInterface::get(OperationContext* opCtx) {
     return get(opCtx->getClient()->getServiceContext());
 }
 
-
 void StorageInterface::set(ServiceContext* service, std::unique_ptr<StorageInterface> storage) {
     auto& storageInterface = getStorageInterface(service);
     storageInterface = std::move(storage);
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 7d7e330322a..646dc45d764 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -1685,7 +1685,7 @@ DEATH_TEST_F(SyncTailTest,
     OplogApplier::Options options;
     SyncTail syncTail(nullptr,  // observer. not required by oplogApplication().
                       _consistencyMarkers.get(),
-                      _storageInterface.get(),
+                      getStorageInterface(),
                       applyOperationFn,
                       writerPool.get(),
                       options);
@@ -2769,8 +2769,6 @@ TEST_F(IdempotencyTest, CommitPreparedTransactionDataPartiallyApplied) {
 }
 
 TEST_F(IdempotencyTest, AbortPreparedTransaction) {
-    // TODO: SERVER-36492 Fix this test
-    return;
     createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
     auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
     auto lsid = makeLogicalSessionId(_opCtx.get());
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index 9ef501f89b9..045b1b00bc8 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -107,17 +107,18 @@ OplogApplier::Options SyncTailTest::makeRecoveryOptions() {
 void SyncTailTest::setUp() {
     ServiceContextMongoDTest::setUp();
 
-    auto service = getServiceContext();
+    serviceContext = getServiceContext();
     _opCtx = cc().makeOperationContext();
 
-    ReplicationCoordinator::set(service, stdx::make_unique<ReplicationCoordinatorMock>(service));
+    ReplicationCoordinator::set(serviceContext,
+                                stdx::make_unique<ReplicationCoordinatorMock>(serviceContext));
     ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY));
 
-    _storageInterface = stdx::make_unique<StorageInterfaceImpl>();
+    StorageInterface::set(serviceContext, stdx::make_unique<StorageInterfaceImpl>());
 
     DropPendingCollectionReaper::set(
-        service, stdx::make_unique<DropPendingCollectionReaper>(_storageInterface.get()));
-    repl::setOplogCollectionName(service);
+        serviceContext, stdx::make_unique<DropPendingCollectionReaper>(getStorageInterface()));
+    repl::setOplogCollectionName(serviceContext);
     repl::createOplog(_opCtx.get());
 
     _consistencyMarkers = stdx::make_unique<ReplicationConsistencyMarkersMock>();
@@ -125,7 +126,7 @@ void SyncTailTest::setUp() {
     // Set up an OpObserver to track the documents SyncTail inserts.
     auto opObserver = std::make_unique<SyncTailOpObserver>();
     _opObserver = opObserver.get();
-    auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(service->getOpObserver());
+    auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(serviceContext->getOpObserver());
     opObserverRegistry->addObserver(std::move(opObserver));
 
     // Initialize the featureCompatibilityVersion server parameter. This is necessary because this
@@ -136,12 +137,10 @@ void SyncTailTest::setUp() {
 }
 
 void SyncTailTest::tearDown() {
-    auto service = getServiceContext();
     _opCtx.reset();
-    _storageInterface = {};
     _consistencyMarkers = {};
-    DropPendingCollectionReaper::set(service, {});
-    StorageInterface::set(service, {});
+    DropPendingCollectionReaper::set(serviceContext, {});
+    StorageInterface::set(serviceContext, {});
     ServiceContextMongoDTest::tearDown();
 }
 
@@ -150,7 +149,7 @@ ReplicationConsistencyMarkers* SyncTailTest::getConsistencyMarkers() const {
 }
 
 StorageInterface* SyncTailTest::getStorageInterface() const {
-    return _storageInterface.get();
+    return StorageInterface::get(serviceContext);
 }
 
 void SyncTailTest::_testSyncApplyCrudOperation(ErrorCodes::Error expectedError,
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h
index c1362aad0f2..9b1adaea94f 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.h
+++ b/src/mongo/db/repl/sync_tail_test_fixture.h
@@ -120,7 +120,7 @@ protected:
     ServiceContext::UniqueOperationContext _opCtx;
     std::unique_ptr<ReplicationConsistencyMarkers> _consistencyMarkers;
-    std::unique_ptr<StorageInterface> _storageInterface;
+    ServiceContext* serviceContext;
     SyncTailOpObserver* _opObserver = nullptr;
 
     // Implements the SyncTail::MultiSyncApplyFn interface and does nothing.
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index 3af7319fefb..35394e783db 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -37,8 +37,10 @@
 #include "mongo/db/catalog_raii.h"
 #include "mongo/db/commands/txn_cmds_gen.h"
 #include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/dbdirectclient.h"
 #include "mongo/db/index_builds_coordinator.h"
 #include "mongo/db/repl/apply_ops.h"
+#include "mongo/db/repl/storage_interface_impl.h"
 #include "mongo/db/repl/timestamp_block.h"
 #include "mongo/db/session_catalog_mongod.h"
 #include "mongo/db/transaction_history_iterator.h"
@@ -51,6 +53,9 @@ namespace {
 // If enabled, causes _applyPrepareTransaction to hang before preparing the transaction participant.
 MONGO_FAIL_POINT_DEFINE(applyPrepareCommandHangBeforePreparingTransaction);
 
+// Failpoint that will cause reconstructPreparedTransactions to return early.
+MONGO_FAIL_POINT_DEFINE(skipReconstructPreparedTransactions);
+
 // Apply the oplog entries for a prepare or a prepared commit during recovery/initial sync.
 Status _applyOperationsForTransaction(OperationContext* opCtx,
@@ -176,6 +181,7 @@ Status applyCommitTransaction(OperationContext* opCtx,
     invariant(entry.getTxnNumber());
     opCtx->setLogicalSessionId(*entry.getSessionId());
     opCtx->setTxnNumber(*entry.getTxnNumber());
+
     // The write on transaction table may be applied concurrently, so refreshing state
     // from disk may read that write, causing starting a new transaction on an existing
     // txnNumber. Thus, we start a new transaction without refreshing state from disk.
@@ -197,13 +203,13 @@ Status applyAbortTransaction(OperationContext* opCtx,
             "abortTransaction is only used internally by secondaries.",
             mode != repl::OplogApplication::Mode::kApplyOpsCmd);
 
-    // We don't put transactions into the prepare state until the end of recovery, so there is
-    // no transaction to abort.
-    if (mode == repl::OplogApplication::Mode::kRecovering) {
+    // We don't put transactions into the prepare state until the end of recovery and initial sync,
+    // so there is no transaction to abort.
+    if (mode == repl::OplogApplication::Mode::kRecovering ||
+        mode == repl::OplogApplication::Mode::kInitialSync) {
         return Status::OK();
     }
 
-    // TODO: SERVER-36492 Only run on secondary until we support initial sync.
     invariant(mode == repl::OplogApplication::Mode::kSecondary);
 
     // Transaction operations are in its own batch, so we can modify their opCtx.
@@ -294,8 +300,8 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
 
 namespace {
 /**
- * This is the part of applyPrepareTransaction which is common to steady state and recovery
- * oplog application.
+ * This is the part of applyPrepareTransaction which is common to steady state, initial sync and
+ * recovery oplog application.
  */
 Status _applyPrepareTransaction(OperationContext* opCtx,
                                 const OplogEntry& entry,
@@ -309,7 +315,8 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
         return readTransactionOperationsFromOplogChain(opCtx, entry, {});
     }();
 
-    if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering) {
+    if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering ||
+        oplogApplicationMode == repl::OplogApplication::Mode::kInitialSync) {
         // We might replay a prepared transaction behind oldest timestamp. Note that since this is
         // scoped to the storage transaction, and readTransactionOperationsFromOplogChain implicitly
         // abandons the storage transaction when it releases the global lock, this must be done
@@ -357,6 +364,23 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
 
     return Status::OK();
 }
+
+/**
+ * Apply a prepared transaction during recovery. The OplogEntry must be an 'applyOps' with
+ * 'prepare' set or a prepareTransaction command.
+ */
+Status applyRecoveredPrepareTransaction(OperationContext* opCtx,
+                                        const OplogEntry& entry,
+                                        repl::OplogApplication::Mode mode) {
+    // Snapshot transactions never conflict with the PBWM lock.
+    invariant(!opCtx->lockState()->shouldConflictWithSecondaryBatchApplication());
+    if (entry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction) {
+        return _applyPrepareTransaction(opCtx, entry, mode);
+    } else {
+        // This is an applyOps with prepare.
+        return applyRecoveredPrepareApplyOpsOplogEntry(opCtx, entry, mode);
+    }
+}
 }  // namespace
 
 /**
@@ -396,14 +420,54 @@ Status applyPrepareTransaction(OperationContext* opCtx,
     return _applyPrepareTransaction(opCtx, entry, oplogApplicationMode);
 }
 
-Status applyRecoveredPrepareTransaction(OperationContext* opCtx, const OplogEntry& entry) {
-    // Snapshot transactions never conflict with the PBWM lock.
-    invariant(!opCtx->lockState()->shouldConflictWithSecondaryBatchApplication());
-    if (entry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction) {
-        return _applyPrepareTransaction(opCtx, entry, repl::OplogApplication::Mode::kRecovering);
-    } else {
-        // This is an applyOps with prepare.
-        return applyRecoveredPrepareApplyOpsOplogEntry(opCtx, entry);
+void reconstructPreparedTransactions(OperationContext* opCtx, repl::OplogApplication::Mode mode) {
+    if (MONGO_FAIL_POINT(skipReconstructPreparedTransactions)) {
+        log() << "Hit skipReconstructPreparedTransactions failpoint";
+        return;
+    }
+
+    // Read the transactions table with its own snapshot and read timestamp.
+    ReadSourceScope readSourceScope(opCtx);
+
+    DBDirectClient client(opCtx);
+    const auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
+                                     {BSON("state"
+                                           << "prepared")});
+
+    // Iterate over each entry in the transactions table that has a prepared transaction.
+    while (cursor->more()) {
+        const auto txnRecordObj = cursor->next();
+        const auto txnRecord = SessionTxnRecord::parse(
+            IDLParserErrorContext("recovering prepared transaction"), txnRecordObj);
+
+        invariant(txnRecord.getState() == DurableTxnStateEnum::kPrepared);
+
+        // Get the prepareTransaction oplog entry corresponding to this transactions table entry.
+        const auto prepareOpTime = txnRecord.getLastWriteOpTime();
+        invariant(!prepareOpTime.isNull());
+        TransactionHistoryIterator iter(prepareOpTime);
+        invariant(iter.hasNext());
+        auto prepareOplogEntry = iter.next(opCtx);
+
+        {
+            // Make a new opCtx so that we can set the lsid when applying the prepare transaction
+            // oplog entry.
+            auto newClient =
+                opCtx->getServiceContext()->makeClient("reconstruct-prepared-transactions");
+            AlternativeClientRegion acr(newClient);
+            const auto newOpCtx = cc().makeOperationContext();
+            repl::UnreplicatedWritesBlock uwb(newOpCtx.get());
+
+            // Snapshot transaction can never conflict with the PBWM lock.
+            newOpCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
+
+            // TODO: SERVER-40177 This should be removed once it is guaranteed operations applied on
+            // recovering nodes cannot encounter unnecessary prepare conflicts.
+            newOpCtx->recoveryUnit()->setIgnorePrepared(true);
+
+            // Checks out the session, applies the operations and prepares the transaction.
+            uassertStatusOK(
+                applyRecoveredPrepareTransaction(newOpCtx.get(), prepareOplogEntry, mode));
+        }
     }
 }
diff --git a/src/mongo/db/repl/transaction_oplog_application.h b/src/mongo/db/repl/transaction_oplog_application.h
index bc960303801..09607e820f2 100644
--- a/src/mongo/db/repl/transaction_oplog_application.h
+++ b/src/mongo/db/repl/transaction_oplog_application.h
@@ -66,10 +66,10 @@ Status applyPrepareTransaction(OperationContext* opCtx,
                                const repl::OplogEntry& entry,
                                repl::OplogApplication::Mode mode);
 
-/**
- * Apply a prepared transaction during recovery. The OplogEntry must be an 'applyOps' with
- * 'prepare' set or a prepareTransaction command.
+/*
+ * Reconstruct prepared transactions by iterating over the transactions table to see which
+ * transactions should be in the prepared state, getting the corresponding oplog entry and applying
+ * the operations. Called at the end of rollback, startup recovery and initial sync.
  */
-Status applyRecoveredPrepareTransaction(OperationContext* opCtx, const repl::OplogEntry& entry);
-
+void reconstructPreparedTransactions(OperationContext* opCtx, repl::OplogApplication::Mode mode);
 }  // namespace mongo
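
A closing note on the failpoint idiom the tests above rely on: `FailPointEnableBlock` enables a named failpoint for the lifetime of a scope, and the new `reconstructPreparedTransactions` checks it on entry and returns early. A toy standalone model of that shape follows; the real `MONGO_FAIL_POINT` machinery is registry-based and far richer than the single atomic flag used here.

```cpp
#include <atomic>
#include <iostream>

// Toy model of a named failpoint checked at the top of
// reconstructPreparedTransactions, plus an RAII enabler in the spirit of
// FailPointEnableBlock. Simplified stand-in, not the server's implementation.
std::atomic<bool> skipReconstructPreparedTransactions{false};

class FailPointEnableBlock {
public:
    explicit FailPointEnableBlock(std::atomic<bool>& fp) : _fp(fp) {
        _fp = true;  // enabled for the lifetime of this scope
    }
    ~FailPointEnableBlock() {
        _fp = false;  // disabled again when the test scope ends
    }

private:
    std::atomic<bool>& _fp;
};

void reconstructPreparedTransactions() {
    if (skipReconstructPreparedTransactions) {
        std::cout << "Hit skipReconstructPreparedTransactions failpoint\n";
        return;  // test fixtures without a ServiceEntryPoint bail out here
    }
    std::cout << "scanning transactions table...\n";
}

int main() {
    {
        FailPointEnableBlock guard(skipReconstructPreparedTransactions);
        reconstructPreparedTransactions();  // skipped
    }
    reconstructPreparedTransactions();  // runs normally
    return 0;
}
```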