summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: A. Jesse Jiryu Davis <jesse@mongodb.com>  2021-12-06 18:26:12 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-12-06 18:50:24 +0000
commit: ea6a59377c01ed48157557aaaae0bd8191b7fa4e (patch)
tree: 45b158df64366a4fbac7186a55b54234bd865792
parent: 9707356f1afe773e29b64858835f55a4a93d35d8 (diff)
download: mongo-ea6a59377c01ed48157557aaaae0bd8191b7fa4e.tar.gz
SERVER-61666 Fix migration for txns before startApplying
-rw-r--r--  jstests/replsets/tenant_migration_transaction_boundary.js   88
-rw-r--r--  src/mongo/db/repl/tenant_oplog_applier.cpp                   4
-rw-r--r--  src/mongo/db/repl/tenant_oplog_batcher.cpp                  10
-rw-r--r--  src/mongo/db/repl/tenant_oplog_batcher.h                     4
-rw-r--r--  src/mongo/db/repl/tenant_oplog_batcher_test.cpp             38
5 files changed, 120 insertions(+), 24 deletions(-)
diff --git a/jstests/replsets/tenant_migration_transaction_boundary.js b/jstests/replsets/tenant_migration_transaction_boundary.js
new file mode 100644
index 00000000000..4696aa61412
--- /dev/null
+++ b/jstests/replsets/tenant_migration_transaction_boundary.js
@@ -0,0 +1,88 @@
+/**
+ * Test a (non-prepared) large-format committed transaction T, with at least one oplog entry before
+ * startFetchingDonorOpTime, and a commit OpTime between startFetchingDonorOpTime and
+ * startApplyingAfterOpTime.
+ *
+ * Donor Oplog
+ * *-------------------*-------------------*-------------------*
+ * T applyOps entry startFetching T commit entry startApplying
+ * |
+ * <-------------- prevOpTime -------------*
+ *
+ * The recipient doesn't need to recover T's oplog chain, since T committed before startApplying,
+ * and trying to recover T would fail because the recipient didn't fetch T's oldest entries.
+ *
+ * @tags: [
+ * incompatible_with_eft,
+ * incompatible_with_macos,
+ * incompatible_with_windows_tls,
+ * requires_majority_read_concern,
+ * requires_persistence,
+ * ]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/replsets/rslib.js");
+load("jstests/libs/uuid_util.js");
+
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
+
+const tenantId = "testTenantId";
+const tenantDB = tenantMigrationTest.tenantDB(tenantId, "testDB");
+const collName = "testColl";
+const tenantNS = `${tenantDB}.${collName}`;
+const transactionsNS = "config.transactions";
+
+const donorPrimary = tenantMigrationTest.getDonorPrimary();
+const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
+
+jsTestLog("Running a migration");
+const migrationId = UUID();
+const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ tenantId,
+};
+
+jsTestLog("Hang just before getting start opTimes from donor");
+let fp =
+ configureFailPoint(recipientPrimary, "fpAfterComparingRecipientAndDonorFCV", {action: "hang"});
+tenantMigrationTest.startMigration(migrationOpts);
+
+fp.wait();
+
+{
+ // This transaction will straddle startFetching - the oplog entry for the commit will have a
+ // timestamp equal to startFetching, and previous entries will have timestamps earlier than it.
+ jsTestLog("Run and commit a transaction prior to the migration");
+ const session = donorPrimary.startSession({causalConsistency: false});
+ const sessionDb = session.getDatabase(tenantDB);
+ const sessionColl = sessionDb.getCollection(collName);
+
+ function makeLargeDoc(numMB) {
+ return {x: new Array(numMB * 1024 * 1024).join('A')};
+ }
+
+ session.startTransaction();
+ sessionColl.insert({doc: makeLargeDoc(10)});
+ sessionColl.insert({doc: makeLargeDoc(5)});
+ sessionColl.insert({doc: makeLargeDoc(5)});
+ let commitRes = session.commitTransaction_forTesting();
+ assert.eq(1, commitRes.ok);
+ session.endSession();
+}
+
+jsTestLog("LastWriteOpTime of transaction is " +
+ tojson(donorPrimary.getCollection(transactionsNS)
+ .find({}, {"_id": -1, "lastWriteOpTime": 1})
+ .toArray()));
+
+fp.off();
+TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+assert.eq(recipientPrimary.getCollection(tenantNS).countDocuments({}), 3);
+assert.eq(recipientPrimary.getCollection(tenantNS).count(), 3);
+tenantMigrationTest.stop();
+})();
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index 794cb4e58db..838b53807a0 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -120,8 +120,8 @@ void TenantOplogApplier::setCloneFinishedRecipientOpTime(OpTime cloneFinishedRec
}
Status TenantOplogApplier::_doStartup_inlock() noexcept {
- _oplogBatcher =
- std::make_shared<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor, _resumeBatchingTs);
+ _oplogBatcher = std::make_shared<TenantOplogBatcher>(
+ _tenantId, _oplogBuffer, _executor, _resumeBatchingTs, _beginApplyingAfterOpTime);
auto status = _oplogBatcher->startup();
if (!status.isOK())
return status;
diff --git a/src/mongo/db/repl/tenant_oplog_batcher.cpp b/src/mongo/db/repl/tenant_oplog_batcher.cpp
index e0d7539fcf6..c7a3bf3c87b 100644
--- a/src/mongo/db/repl/tenant_oplog_batcher.cpp
+++ b/src/mongo/db/repl/tenant_oplog_batcher.cpp
@@ -42,11 +42,13 @@ namespace repl {
TenantOplogBatcher::TenantOplogBatcher(const std::string& tenantId,
RandomAccessOplogBuffer* oplogBuffer,
std::shared_ptr<executor::TaskExecutor> executor,
- Timestamp resumeBatchingTs)
+ Timestamp resumeBatchingTs,
+ OpTime beginApplyingAfterOpTime)
: AbstractAsyncComponent(executor.get(), std::string("TenantOplogBatcher_") + tenantId),
_oplogBuffer(oplogBuffer),
_executor(executor),
- _resumeBatchingTs(resumeBatchingTs) {}
+ _resumeBatchingTs(resumeBatchingTs),
+ _beginApplyingAfterOpTime(beginApplyingAfterOpTime) {}
TenantOplogBatcher::~TenantOplogBatcher() {
shutdown();
@@ -62,6 +64,10 @@ void TenantOplogBatcher::_pushEntry(OperationContext* opCtx,
!op.isPreparedCommit() &&
(op.getCommandType() != OplogEntry::CommandType::kApplyOps || !op.shouldPrepare()));
if (op.isTerminalApplyOps()) {
+ if (op.getOpTime() <= _beginApplyingAfterOpTime) {
+ // Fetched for the sake of retryable commitTransaction, don't need to apply.
+ return;
+ }
// All applyOps entries are expanded and the expansions put in the batch expansion array.
// The original applyOps is kept in the batch ops array.
// This applies to both multi-document transactions and atomic applyOps.
diff --git a/src/mongo/db/repl/tenant_oplog_batcher.h b/src/mongo/db/repl/tenant_oplog_batcher.h
index a2c266061c5..cfdb57da321 100644
--- a/src/mongo/db/repl/tenant_oplog_batcher.h
+++ b/src/mongo/db/repl/tenant_oplog_batcher.h
@@ -76,7 +76,8 @@ public:
TenantOplogBatcher(const std::string& tenantId,
RandomAccessOplogBuffer* oplogBuffer,
std::shared_ptr<executor::TaskExecutor> executor,
- Timestamp resumeBatchingTs);
+ Timestamp resumeBatchingTs,
+ OpTime beginApplyingAfterOpTime);
virtual ~TenantOplogBatcher();
@@ -118,6 +119,7 @@ private:
bool _batchRequested = false; // (M)
std::shared_ptr<executor::TaskExecutor> _executor; // (R)
const Timestamp _resumeBatchingTs; // (R)
+ const OpTime _beginApplyingAfterOpTime; // (R)
};
} // namespace repl
diff --git a/src/mongo/db/repl/tenant_oplog_batcher_test.cpp b/src/mongo/db/repl/tenant_oplog_batcher_test.cpp
index 47d02f9084a..f7f3409b2fa 100644
--- a/src/mongo/db/repl/tenant_oplog_batcher_test.cpp
+++ b/src/mongo/db/repl/tenant_oplog_batcher_test.cpp
@@ -102,7 +102,7 @@ constexpr auto dbName = "tenant_test"_sd;
TEST_F(TenantOplogBatcherTest, CannotRequestTwoBatchesAtOnce) {
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
// We just started, no batch should be available.
@@ -116,7 +116,7 @@ TEST_F(TenantOplogBatcherTest, CannotRequestTwoBatchesAtOnce) {
TEST_F(TenantOplogBatcherTest, OplogBatcherGroupsCrudOps) {
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
// We just started, no batch should be available.
@@ -141,7 +141,7 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherGroupsCrudOps) {
TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedApplyOps) {
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
@@ -156,7 +156,7 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedApplyOps) {
TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedCommit) {
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
@@ -188,7 +188,7 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsUnpreparedApplyOpsOpWith
srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")).getEntry().toBSON());
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
_oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend());
@@ -222,7 +222,7 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsMultipleTransactions) {
srcOps.push_back(makeApplyOpsOplogEntry(2, false, innerOps2).getEntry().toBSON());
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
_oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend());
@@ -266,7 +266,7 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchChecksBatchLimitsForNumberOfOp
auto limits = bigBatchLimits;
limits.ops = 3U;
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(limits);
@@ -298,7 +298,7 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchChecksBatchLimitsForSizeOfOper
auto limits = bigBatchLimits;
limits.bytes = std::size_t(srcOps[0].objsize() + srcOps[1].objsize());
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(limits);
@@ -344,7 +344,7 @@ TEST_F(TenantOplogBatcherTest, LargeTransactionProcessedIndividuallyAndExpanded)
_oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend());
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
@@ -397,7 +397,7 @@ TEST_F(TenantOplogBatcherTest, LargeTransactionProcessedIndividuallyAndExpanded)
TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPreImageOutOfOrder) {
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
// We just started, no batch should be available.
@@ -432,7 +432,7 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPreImageOutOfOrder) {
TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPostImageOutOfOrder) {
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
// We just started, no batch should be available.
@@ -470,7 +470,7 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPostImageOutOfOrder) {
TEST_F(TenantOplogBatcherTest, GetNextApplierBatchRejectsZeroBatchOpsLimits) {
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
// bigBatchLimits is a legal batch limit.
auto limits = bigBatchLimits;
@@ -483,7 +483,7 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchRejectsZeroBatchOpsLimits) {
TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPreImageBeforeBatchStart) {
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
std::vector<BSONObj> srcOps;
srcOps.push_back(makeNoopOplogEntry(1, "preImage").getEntry().toBSON());
@@ -514,7 +514,7 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPreImageBeforeBatchStart) {
TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPostImageBeforeBatchStart) {
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
std::vector<BSONObj> srcOps;
srcOps.push_back(makeNoopOplogEntry(1, "postImage").getEntry().toBSON());
@@ -548,7 +548,7 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherRetreivesPostImageBeforeBatchStart) {
TEST_F(TenantOplogBatcherTest, GetNextApplierBatchRejectsZeroBatchSizeLimits) {
auto batcher = std::make_shared<TenantOplogBatcher>(
- "tenant", &_oplogBuffer, _executor, Timestamp(0, 0) /* resumeBatchingTs */);
+ "tenant", &_oplogBuffer, _executor, Timestamp(), OpTime());
ASSERT_OK(batcher->startup());
// bigBatchLimits is a legal batch limit.
auto limits = bigBatchLimits;
@@ -568,8 +568,8 @@ TEST_F(TenantOplogBatcherTest, ResumeOplogBatcherFromTimestamp) {
srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")).getEntry().toBSON());
_oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend());
- auto batcher =
- std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor, Timestamp(4, 1));
+ auto batcher = std::make_shared<TenantOplogBatcher>(
+ "tenant", &_oplogBuffer, _executor, Timestamp(4, 1), OpTime());
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);
@@ -588,8 +588,8 @@ TEST_F(TenantOplogBatcherTest, ResumeOplogBatcherFromNonExistentTimestamp) {
srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")).getEntry().toBSON());
_oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend());
- auto batcher =
- std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor, Timestamp(3, 1));
+ auto batcher = std::make_shared<TenantOplogBatcher>(
+ "tenant", &_oplogBuffer, _executor, Timestamp(3, 1), OpTime());
ASSERT_OK(batcher->startup());
auto batchFuture = batcher->getNextBatch(bigBatchLimits);