diff options
author | A. Jesse Jiryu Davis <jesse@mongodb.com> | 2021-12-06 18:26:12 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-12-06 18:50:24 +0000 |
commit | ea6a59377c01ed48157557aaaae0bd8191b7fa4e (patch) | |
tree | 45b158df64366a4fbac7186a55b54234bd865792 | |
parent | 9707356f1afe773e29b64858835f55a4a93d35d8 (diff) | |
download | mongo-ea6a59377c01ed48157557aaaae0bd8191b7fa4e.tar.gz |
SERVER-61666 Fix migration for txns before startApplying
-rw-r--r-- | jstests/replsets/tenant_migration_transaction_boundary.js | 88 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_batcher.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_batcher.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_batcher_test.cpp | 38 |
5 files changed, 120 insertions, 24 deletions
diff --git a/jstests/replsets/tenant_migration_transaction_boundary.js b/jstests/replsets/tenant_migration_transaction_boundary.js new file mode 100644 index 00000000000..4696aa61412 --- /dev/null +++ b/jstests/replsets/tenant_migration_transaction_boundary.js @@ -0,0 +1,88 @@ +/** + * Test a (non-prepared) large-format committed transaction T, with at least one oplog entry before + * startFetchingDonorOpTime, and a commit OpTime between startFetchingDonorOpTime and + * startApplyingAfterOpTime. + * + * Donor Oplog + * *-------------------*-------------------*-------------------* + * T applyOps entry startFetching T commit entry startApplying + * | + * <-------------- prevOpTime -------------* + * + * The recipient doesn't need to recover T's oplog chain, since T committed before startApplying, + * and trying to recover T would fail because the recipient didn't fetch T's oldest entries. + * + * @tags: [ + * incompatible_with_eft, + * incompatible_with_macos, + * incompatible_with_windows_tls, + * requires_majority_read_concern, + * requires_persistence, + * ] + */ + +(function() { +"use strict"; + +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/replsets/rslib.js"); +load("jstests/libs/uuid_util.js"); + +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); + +const tenantId = "testTenantId"; +const tenantDB = tenantMigrationTest.tenantDB(tenantId, "testDB"); +const collName = "testColl"; +const tenantNS = `${tenantDB}.${collName}`; +const transactionsNS = "config.transactions"; + +const donorPrimary = tenantMigrationTest.getDonorPrimary(); +const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); + +jsTestLog("Running a migration"); +const migrationId = UUID(); +const migrationOpts = { + migrationIdString: extractUUIDFromObject(migrationId), + tenantId, +}; + +jsTestLog("Hang just before getting start opTimes from donor"); +let fp = + configureFailPoint(recipientPrimary, "fpAfterComparingRecipientAndDonorFCV", {action: "hang"}); +tenantMigrationTest.startMigration(migrationOpts); + +fp.wait(); + +{ + // This transaction will straddle startFetching - the oplog entry for the commit will have a + // timestamp equal to startFetching, and previous entries will have timestamps earlier than it. + jsTestLog("Run and commit a transaction prior to the migration"); + const session = donorPrimary.startSession({causalConsistency: false}); + const sessionDb = session.getDatabase(tenantDB); + const sessionColl = sessionDb.getCollection(collName); + + function makeLargeDoc(numMB) { + return {x: new Array(numMB * 1024 * 1024).join('A')}; + } + + session.startTransaction(); + sessionColl.insert({doc: makeLargeDoc(10)}); + sessionColl.insert({doc: makeLargeDoc(5)}); + sessionColl.insert({doc: makeLargeDoc(5)}); + let commitRes = session.commitTransaction_forTesting(); + assert.eq(1, commitRes.ok); + session.endSession(); +} + +jsTestLog("LastWriteOpTime of transaction is " + + tojson(donorPrimary.getCollection(transactionsNS) + .find({}, {"_id": -1, "lastWriteOpTime": 1}) + .toArray())); + +fp.off(); +TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); +assert.eq(recipientPrimary.getCollection(tenantNS).countDocuments({}), 3); +assert.eq(recipientPrimary.getCollection(tenantNS).count(), 3); +tenantMigrationTest.stop(); +})(); diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index 794cb4e58db..838b53807a0 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -120,8 +120,8 @@ void TenantOplogApplier::setCloneFinishedRecipientOpTime(OpTime cloneFinishedRec } Status TenantOplogApplier::_doStartup_inlock() noexcept { - _oplogBatcher = - std::make_shared<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor, _resumeBatchingTs); + _oplogBatcher = std::make_shared<TenantOplogBatcher>( + _tenantId, _oplogBuffer, _executor, _resumeBatchingTs, _beginApplyingAfterOpTime); auto status = _oplogBatcher->startup(); if (!status.isOK()) return status; diff --git a/src/mongo/db/repl/tenant_oplog_batcher.cpp b/src/mongo/db/repl/tenant_oplog_batcher.cpp index e0d7539fcf6..c7a3bf3c87b 100644 --- a/src/mongo/db/repl/tenant_oplog_batcher.cpp +++ b/src/mongo/db/repl/tenant_oplog_batcher.cpp @@ -42,11 +42,13 @@ namespace repl { TenantOplogBatcher::TenantOplogBatcher(const std::string& tenantId, RandomAccessOplogBuffer* oplogBuffer, std::shared_ptr<executor::TaskExecutor> executor, - Timestamp resumeBatchingTs) + Timestamp resumeBatchingTs, + OpTime beginApplyingAfterOpTime) : AbstractAsyncComponent(executor.get(), std::string("TenantOplogBatcher_") + tenantId), _oplogBuffer(oplogBuffer), _executor(executor), - _resumeBatchingTs(resumeBatchingTs) {} + _resumeBatchingTs(resumeBatchingTs), + _beginApplyingAfterOpTime(beginApplyingAfterOpTime) {} TenantOplogBatcher::~TenantOplogBatcher() { shutdown(); @@ -62,6 +64,10 @@ void TenantOplogBatcher::_pushEntry(OperationContext* opCtx, !op.isPreparedCommit() && (op.getCommandType() != OplogEntry::CommandType::kApplyOps || !op.shouldPrepare())); if (op.isTerminalApplyOps()) { + if (op.getOpTime() <= _beginApplyingAfterOpTime) { + // Fetched for the sake of retryable commitTransaction, don't need to apply. + return; + } // All applyOps entries are expanded and the expansions put in the batch expansion array. // The original applyOps is kept in the batch ops array. // This applies to both multi-document transactions and atomic applyOps. diff --git a/src/mongo/db/repl/tenant_oplog_batcher.h b/src/mongo/db/repl/tenant_oplog_batcher.h index a2c266061c5..cfdb57da321 100644 --- a/src/mongo/db/repl/tenant_oplog_batcher.h +++ b/src/mongo/db/repl/tenant_oplog_batcher.h @@ -76,7 +76,8 @@ public: TenantOplogBatcher(const std::string& tenantId, RandomAccessOplogBuffer* oplogBuffer, std::shared_ptr<executor::TaskExecutor> executor, - Timestamp resumeBatchingTs); + Timestamp resumeBatchingTs, + OpTime beginApplyingAfterOpTime); virtual ~TenantOplogBatcher(); @@ -118,6 +119,7 @@ private: bool _batchRequested = false; // (M) std::shared_ptr<executor::TaskExecutor> _executor; // (R) const Timestamp _resumeBatchingTs; // (R) + const OpTime _beginApplyingAfterOpTime; // (R) }; } // namespace repl diff --git a/src/mongo/db/repl/tenant_oplog_batcher_test.cpp b/src/mongo/db/repl/tenant_oplog_batcher_test.cpp index 47d02f9084a..f7f3409b2fa 100644 --- a/src/mongo/db/repl/tenant_oplog_batcher_test.cpp +++ b/src/mongo/db/repl/tenant_oplog_batcher_test.cpp @@ -102,7 +102,7 @@ constexpr auto dbName = "tenant_test"_sd; TEST_F(TenantOplogBatcherTest, CannotRequestTwoBatchesAtOnce) { auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); // We just started, no batch should be available. @@ -116,7 +116,7 @@ TEST_F(TenantOplogBatcherTest, CannotRequestTwoBatchesAtOnce) { TEST_F(TenantOplogBatcherTest, OplogBatcherGroupsCrudOps) { auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); // We just started, no batch should be available. @@ -141,7 +141,7 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherGroupsCrudOps) { TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedApplyOps) { auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); @@ -156,7 +156,7 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedApplyOps) { TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedCommit) { auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); @@ -188,7 +188,7 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsUnpreparedApplyOpsOpWith srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")).getEntry().toBSON()); auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); @@ -222,7 +222,7 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsMultipleTransactions) { srcOps.push_back(makeApplyOpsOplogEntry(2, false, innerOps2).getEntry().toBSON()); auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); @@ -266,7 +266,7 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchChecksBatchLimitsForNumberOfOp auto limits = bigBatchLimits; limits.ops = 3U; auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(limits); @@ -298,7 +298,7 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchChecksBatchLimitsForSizeOfOper auto limits = bigBatchLimits; limits.bytes = std::size_t(srcOps[0].objsize() + srcOps[1].objsize()); auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(limits); @@ -344,7 +344,7 @@ TEST_F(TenantOplogBatcherTest, LargeTransactionProcessedIndividuallyAndExpanded) _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); @@ -397,7 +397,7 @@ TEST_F(TenantOplogBatcherTest, LargeTransactionProcessedIndividuallyAndExpanded) TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPreImageOutOfOrder) { auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); // We just started, no batch should be available. @@ -432,7 +432,7 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPreImageOutOfOrder) { TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPostImageOutOfOrder) { auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); // We just started, no batch should be available. @@ -470,7 +470,7 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPostImageOutOfOrder) { TEST_F(TenantOplogBatcherTest, GetNextApplierBatchRejectsZeroBatchOpsLimits) { auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); // bigBatchLimits is a legal batch limit. auto limits = bigBatchLimits; @@ -483,7 +483,7 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchRejectsZeroBatchOpsLimits) { TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPreImageBeforeBatchStart) { auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); std::vector<BSONObj> srcOps; srcOps.push_back(makeNoopOplogEntry(1, "preImage").getEntry().toBSON()); @@ -514,7 +514,7 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPreImageBeforeBatchStart) { TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPostImageBeforeBatchStart) { auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); std::vector<BSONObj> srcOps; srcOps.push_back(makeNoopOplogEntry(1, "postImage").getEntry().toBSON()); @@ -548,7 +548,7 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPostImageBeforeBatchStart) { TEST_F(TenantOplogBatcherTest, GetNextApplierBatchRejectsZeroBatchSizeLimits) { auto batcher = std::make_shared<TenantOplogBatcher>( - "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */); + "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime()); ASSERT_OK(batcher->startup()); // bigBatchLimits is a legal batch limit. auto limits = bigBatchLimits; @@ -568,8 +568,8 @@ TEST_F(TenantOplogBatcherTest, ResumeOplogBatcherFromTimestamp) { srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")).getEntry().toBSON()); _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); - auto batcher = - std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor, Timestamp(4, 1)); + auto batcher = std::make_shared<TenantOplogBatcher>( + "tenant", &_oplogBuffer, _executor, Timestamp(4, 1), OpTime()); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); @@ -588,8 +588,8 @@ TEST_F(TenantOplogBatcherTest, ResumeOplogBatcherFromNonExistentTimestamp) { srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")).getEntry().toBSON()); _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); - auto batcher = - std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor, Timestamp(3, 1)); + auto batcher = std::make_shared<TenantOplogBatcher>( + "tenant", &_oplogBuffer, _executor, Timestamp(3, 1), OpTime()); ASSERT_OK(batcher->startup()); auto batchFuture = batcher->getNextBatch(bigBatchLimits); |