summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Cheahuychou Mao <mao.cheahuychou@gmail.com> 2022-03-16 18:46:39 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-03-16 20:13:43 +0000
commit: f801cb26949e709161b9673eb5fa9e39d641b17d (patch)
tree: 0e2282fa3a3e46ea10ddfd0d21a9d72e99f003c4
parent: 6da8828226321ef96eb05c79f3899b759844e02b (diff)
download: mongo-f801cb26949e709161b9673eb5fa9e39d641b17d.tar.gz
SERVER-64413 Make SessionCatalogMigrationSource ignore internal transactions for non-retryable writes and uncommitted internal transactions for retryable writes
-rw-r--r-- src/mongo/db/s/session_catalog_migration_source.cpp 25
-rw-r--r-- src/mongo/db/s/session_catalog_migration_source_test.cpp 337
2 files changed, 343 insertions, 19 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 75b22f61342..36785825a04 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -201,12 +201,27 @@ SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* o
// tests.
DBDirectClient client(opCtx);
FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace};
- findRequest.setSort(BSON("_id" << 1));
+ // Skip retryable internal transactions that are either aborted or still in progress so there is
+ // no write history to transfer at this point.
+ // TODO (SERVER-64331): Determine if chunk migration should migrate internal sessions for
+ // non-retryable writes.
+ findRequest.setFilter(BSON(
+ "$or" << BSON_ARRAY(BSON((SessionTxnRecord::kSessionIdFieldName + "." +
+ InternalSessionFields::kTxnUUIDFieldName)
+ << BSON("$exists" << false))
+ << BSON("$and" << BSON_ARRAY(BSON(
+ (SessionTxnRecord::kSessionIdFieldName + "." +
+ InternalSessionFields::kTxnNumberFieldName)
+ << BSON("$exists" << true)
+ << SessionTxnRecord::kStateFieldName << "committed"))))));
+ findRequest.setSort(BSON(SessionTxnRecord::kSessionIdFieldName << 1));
auto cursor = client.find(std::move(findRequest));
while (cursor->more()) {
auto nextSession = SessionTxnRecord::parse(
IDLParserErrorContext("Session migration cloning"), cursor->next());
+ const auto sessionId = nextSession.getSessionId();
+
if (!nextSession.getLastWriteOpTime().isNull()) {
_sessionOplogIterators.push_back(
std::make_unique<SessionOplogIterator>(std::move(nextSession), _rollbackIdAtInit));
@@ -486,6 +501,14 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
// If this oplog entry corresponds to transaction prepare/commit, replace it with a sentinel
// entry.
if (entryAtOpTimeType == EntryAtOpTimeType::kTransaction) {
+ const auto sessionId = *newWriteOplogEntry.getSessionId();
+
+ if (isInternalSessionForNonRetryableWrite(sessionId)) {
+ // TODO (SERVER-64331): Determine if chunk migration should migrate internal sessions
+ // for non-retryable writes.
+ return false;
+ }
+
newWriteOplogEntry =
makeSentinelOplogEntry(*newWriteOplogEntry.getSessionId(),
*newWriteOplogEntry.getTxnNumber(),
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index 0aa5aad5d0f..f2fe2b99e4a 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -181,6 +181,83 @@ repl::OplogEntry makeRewrittenOplogInSession(repl::OpTime opTime,
.get()); // optime of previous write within same transaction
};
+// Builds a DurableReplOperation of type 'opType' on 'nss' carrying 'object' and 'object2'.
+// Statement ids are set only when 'stmtIds' is non-empty. The optional retry-image and
+// pre-/post-image optimes are forwarded to the operation exactly as given.
+//
+// 'stmtIds' is taken by const reference so that callers do not pay for a vector copy per call.
+repl::DurableReplOperation makeDurableReplOp(
+    const mongo::repl::OpTypeEnum opType,
+    const NamespaceString& nss,
+    const BSONObj& object,
+    boost::optional<BSONObj> object2,
+    const std::vector<int>& stmtIds,
+    boost::optional<repl::RetryImageEnum> needsRetryImage = boost::none,
+    boost::optional<repl::OpTime> preImageOpTime = boost::none,
+    boost::optional<repl::OpTime> postImageOpTime = boost::none) {
+    auto op = repl::DurableReplOperation(opType, nss, object);
+    op.setObject2(object2);
+    if (stmtIds.size() == 1) {
+        // This is required for making BSON equality check in the tests below work.
+        op.setStatementIds({{stmtIds.front()}});
+    } else if (!stmtIds.empty()) {
+        op.setStatementIds({{stmtIds}});
+    }
+    op.setNeedsRetryImage(needsRetryImage);
+    op.setPreImageOpTime(preImageOpTime);
+    op.setPostImageOpTime(postImageOpTime);
+    return op;
+}
+
+// Builds a command oplog entry whose object is an "applyOps" command for the given session and
+// transaction number. Every operation in 'ops' is serialized into the "applyOps" array; the
+// prepare and partialTxn fields are appended (as true) only when 'isPrepare' / 'isPartial' are
+// set. The wall clock time is the current time and the namespace is default-constructed.
+repl::OplogEntry makeApplyOpsOplogEntry(repl::OpTime opTime,
+                                        repl::OpTime prevWriteOpTimeInTransaction,
+                                        std::vector<repl::DurableReplOperation> ops,
+                                        LogicalSessionId sessionId,
+                                        TxnNumber txnNumber,
+                                        bool isPrepare,
+                                        bool isPartial) {
+    BSONObjBuilder cmdBuilder;
+
+    // Serialize the inner operations first so that "applyOps" is the leading field.
+    BSONArrayBuilder innerOps(cmdBuilder.subarrayStart("applyOps"));
+    for (const auto& replOp : ops) {
+        innerOps.append(replOp.toBSON());
+    }
+    innerOps.done();
+
+    if (isPrepare) {
+        cmdBuilder.append(repl::ApplyOpsCommandInfoBase::kPrepareFieldName, true);
+    }
+    if (isPartial) {
+        cmdBuilder.append(repl::ApplyOpsCommandInfoBase::kPartialTxnFieldName, true);
+    }
+
+    repl::MutableOplogEntry oplogEntry;
+    oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+    oplogEntry.setNss({});
+    oplogEntry.setObject(cmdBuilder.obj());
+    oplogEntry.setSessionId(std::move(sessionId));
+    oplogEntry.setTxnNumber(std::move(txnNumber));
+    oplogEntry.setOpTime(opTime);
+    oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTimeInTransaction);
+    oplogEntry.setWallClockTime(Date_t::now());
+
+    return {oplogEntry.toBSON()};
+}
+
+// Builds a command oplog entry whose object is 'commandObj', stamped with 'opTime' and chained
+// to 'prevWriteOpTimeInTransaction'. The session id and transaction number are optional and are
+// forwarded as given. The wall clock time and the namespace are both passed as '{}'.
+repl::OplogEntry makeCommandOplogEntry(repl::OpTime opTime,
+                                       repl::OpTime prevWriteOpTimeInTransaction,
+                                       BSONObj commandObj,
+                                       boost::optional<LogicalSessionId> sessionId,
+                                       boost::optional<TxnNumber> txnNumber) {
+    repl::MutableOplogEntry cmdEntry;
+    cmdEntry.setOpType(repl::OpTypeEnum::kCommand);
+    cmdEntry.setNss({});
+    cmdEntry.setWallClockTime({});
+    cmdEntry.setObject(std::move(commandObj));
+    cmdEntry.setSessionId(std::move(sessionId));
+    cmdEntry.setTxnNumber(std::move(txnNumber));
+    cmdEntry.setOpTime(opTime);
+    cmdEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTimeInTransaction);
+
+    return {cmdEntry.toBSON()};
+}
+
TEST_F(SessionCatalogMigrationSourceTest, NoSessionsToTransferShouldNotHaveOplog) {
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
@@ -821,6 +898,68 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) {
ASSERT_THROWS(migrationSource.fetchNextOplog(opCtx()), AssertionException);
}
+// A new applyOps entry notified as kTransaction for a session id created with
+// makeLogicalSessionIdForTest() must be fetched as a dead-end sentinel oplog entry that carries
+// the same session id and transaction number and the single statement id
+// kIncompleteHistoryStmtId.
+TEST_F(SessionCatalogMigrationSourceTest,
+       ReturnDeadEndSentinelOplogEntryForNewCommittedNonInternalTransaction) {
+    SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+    ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+
+    const auto lsid = makeLogicalSessionIdForTest();
+    const TxnNumber txnNum{1};
+
+    // Write an unprepared, non-partial applyOps entry holding a single insert.
+    auto insertOp =
+        makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {});
+    repl::OpTime applyOpsOpTime(Timestamp(110, 1), 1);
+    auto applyOpsEntry = makeApplyOpsOplogEntry(applyOpsOpTime,
+                                                repl::OpTime(),  // prevOpTime
+                                                {insertOp},
+                                                lsid,
+                                                txnNum,
+                                                false,   // isPrepare
+                                                false);  // isPartial
+    insertOplogEntry(applyOpsEntry);
+
+    migrationSource.notifyNewWriteOpTime(
+        applyOpsEntry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
+    ASSERT_TRUE(migrationSource.hasMoreOplog());
+    ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+
+    auto fetched = migrationSource.getLastFetchedOplog();
+    ASSERT_TRUE(fetched.shouldWaitForMajority);
+    ASSERT_EQ(*fetched.oplog->getSessionId(), lsid);
+    ASSERT_EQ(*fetched.oplog->getTxnNumber(), txnNum);
+    ASSERT_BSONOBJ_EQ(*fetched.oplog->getObject2(), TransactionParticipant::kDeadEndSentinel);
+
+    auto sentinelStmtIds = fetched.oplog->getStatementIds();
+    ASSERT_EQ(sentinelStmtIds.size(), 1U);
+    ASSERT_EQ(sentinelStmtIds[0], kIncompleteHistoryStmtId);
+
+    // Nothing else is buffered after the sentinel.
+    ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+}
+
+// A new applyOps entry notified as kTransaction for a session id created with
+// makeLogicalSessionIdWithTxnUUIDForTest() is buffered (hasMoreOplog() is true) but
+// fetchNextOplog() returns false for it.
+TEST_F(SessionCatalogMigrationSourceTest,
+       IgnoreNewCommittedForInternalTransactionForNonRetryableWrite) {
+    SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+    ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+
+    const auto lsid = makeLogicalSessionIdWithTxnUUIDForTest();
+    const TxnNumber txnNum{1};
+
+    // Write an unprepared, non-partial applyOps entry holding a single insert.
+    auto insertOp =
+        makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {});
+    repl::OpTime applyOpsOpTime(Timestamp(120, 1), 1);
+    auto applyOpsEntry = makeApplyOpsOplogEntry(applyOpsOpTime,
+                                                repl::OpTime(),  // prevOpTime
+                                                {insertOp},
+                                                lsid,
+                                                txnNum,
+                                                false,   // isPrepare
+                                                false);  // isPartial
+    insertOplogEntry(applyOpsEntry);
+
+    migrationSource.notifyNewWriteOpTime(
+        applyOpsEntry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
+    ASSERT_TRUE(migrationSource.hasMoreOplog());
+    ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+}
+
TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBufferWasDepleted) {
const auto sessionId = makeLogicalSessionIdForTest();
const auto txnNumber = TxnNumber{1};
@@ -1011,11 +1150,25 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) {
}
TEST_F(SessionCatalogMigrationSourceTest,
- CommitTransactionEntriesShouldBeConvertedToDeadEndSentinel) {
+ ReturnDeadEndSentinelOplogEntryForCommittedNonInternalTransaction) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ const auto txnNumber = TxnNumber{1};
+
+ auto op = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {});
+ auto applyOpsOpTime = repl::OpTime(Timestamp(160, 1), 1);
+ auto entry = makeApplyOpsOplogEntry(applyOpsOpTime,
+ {}, // prevOpTime
+ {op},
+ sessionId,
+ txnNumber,
+ false, // isPrepare
+ false); // isPartial
+ insertOplogEntry(entry);
+
SessionTxnRecord txnRecord;
- txnRecord.setSessionId(makeLogicalSessionIdForTest());
- txnRecord.setTxnNum(20);
- txnRecord.setLastWriteOpTime(repl::OpTime(Timestamp(12, 34), 5));
+ txnRecord.setSessionId(sessionId);
+ txnRecord.setTxnNum(txnNumber);
+ txnRecord.setLastWriteOpTime(repl::OpTime(entry.getOpTime()));
txnRecord.setLastWriteDate(Date_t::now());
txnRecord.setState(DurableTxnStateEnum::kCommitted);
@@ -1036,12 +1189,57 @@ TEST_F(SessionCatalogMigrationSourceTest,
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
}
+// A committed record in the session transactions table whose session id was created with
+// makeLogicalSessionIdWithTxnUUIDForTest() must leave the migration source with nothing to fetch.
+TEST_F(SessionCatalogMigrationSourceTest, IgnoreCommittedInternalTransactionForNonRetryableWrite) {
+    const auto lsid = makeLogicalSessionIdWithTxnUUIDForTest();
+    const TxnNumber txnNum{1};
+
+    // Write an unprepared, non-partial applyOps entry holding a single insert.
+    auto insertOp =
+        makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {});
+    repl::OpTime applyOpsOpTime(Timestamp(170, 1), 1);
+    auto applyOpsEntry = makeApplyOpsOplogEntry(applyOpsOpTime,
+                                                repl::OpTime(),  // prevOpTime
+                                                {insertOp},
+                                                lsid,
+                                                txnNum,
+                                                false,   // isPrepare
+                                                false);  // isPartial
+    insertOplogEntry(applyOpsEntry);
+
+    // Persist a matching committed transaction record before the source is constructed.
+    SessionTxnRecord committedRecord;
+    committedRecord.setSessionId(lsid);
+    committedRecord.setTxnNum(txnNum);
+    committedRecord.setLastWriteOpTime(repl::OpTime(applyOpsEntry.getOpTime()));
+    committedRecord.setLastWriteDate(Date_t::now());
+    committedRecord.setState(DurableTxnStateEnum::kCommitted);
+
+    DBDirectClient client(opCtx());
+    client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+                  committedRecord.toBSON());
+
+    SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+
+    ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+    ASSERT_FALSE(migrationSource.hasMoreOplog());
+}
+
TEST_F(SessionCatalogMigrationSourceTest,
- PrepareTransactionEntriesShouldBeConvertedToDeadEndSentinel) {
+ ReturnDeadEndSentinelOplogEntryForPreparedNonInternalTransaction) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ const auto txnNumber = TxnNumber{1};
+
+ auto op = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {});
+ auto applyOpsOpTime = repl::OpTime(Timestamp(180, 1), 1);
+ auto entry = makeApplyOpsOplogEntry(applyOpsOpTime,
+ {}, // prevOpTime
+ {op},
+ sessionId,
+ txnNumber,
+ true, // isPrepare
+ false); // isPartial
+ insertOplogEntry(entry);
+
SessionTxnRecord txnRecord;
- txnRecord.setSessionId(makeLogicalSessionIdForTest());
- txnRecord.setTxnNum(20);
- txnRecord.setLastWriteOpTime(repl::OpTime(Timestamp(12, 34), 5));
+ txnRecord.setSessionId(sessionId);
+ txnRecord.setTxnNum(txnNumber);
+ txnRecord.setLastWriteOpTime(repl::OpTime(entry.getOpTime()));
txnRecord.setLastWriteDate(Date_t::now());
txnRecord.setState(DurableTxnStateEnum::kPrepared);
@@ -1062,13 +1260,27 @@ TEST_F(SessionCatalogMigrationSourceTest,
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
}
-TEST_F(SessionCatalogMigrationSourceTest, InProgressTransactionEntriesShouldBeIgnored) {
+TEST_F(SessionCatalogMigrationSourceTest, IgnorePreparedInternalTransactionForNonRetryableWrite) {
+ const auto sessionId = makeLogicalSessionIdWithTxnUUIDForTest();
+ const auto txnNumber = TxnNumber{1};
+
+ auto op = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {});
+ auto applyOpsOpTime = repl::OpTime(Timestamp(190, 1), 1);
+ auto entry = makeApplyOpsOplogEntry(applyOpsOpTime,
+ {}, // prevOpTime
+ {op},
+ sessionId,
+ txnNumber,
+ true, // isPrepare
+ false); // isPartial
+ insertOplogEntry(entry);
+
SessionTxnRecord txnRecord;
- txnRecord.setSessionId(makeLogicalSessionIdForTest());
- txnRecord.setTxnNum(20);
- txnRecord.setLastWriteOpTime(repl::OpTime(Timestamp(12, 34), 5));
+ txnRecord.setSessionId(sessionId);
+ txnRecord.setTxnNum(txnNumber);
+ txnRecord.setLastWriteOpTime(repl::OpTime(entry.getOpTime()));
txnRecord.setLastWriteDate(Date_t::now());
- txnRecord.setState(DurableTxnStateEnum::kInProgress);
+ txnRecord.setState(DurableTxnStateEnum::kPrepared);
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
@@ -1079,13 +1291,27 @@ TEST_F(SessionCatalogMigrationSourceTest, InProgressTransactionEntriesShouldBeIg
ASSERT_FALSE(migrationSource.hasMoreOplog());
}
-TEST_F(SessionCatalogMigrationSourceTest, AbortedTransactionEntriesShouldBeIgnored) {
+TEST_F(SessionCatalogMigrationSourceTest, IgnorePreparedInternalTransactionForRetryableWrite) {
+ const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
+ const auto txnNumber = TxnNumber{1};
+
+ auto op = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {1});
+ auto applyOpsOpTime = repl::OpTime(Timestamp(200, 1), 1);
+ auto entry = makeApplyOpsOplogEntry(applyOpsOpTime,
+ {}, // prevOpTime
+ {op},
+ sessionId,
+ txnNumber,
+ true, // isPrepare
+ false); // isPartial
+ insertOplogEntry(entry);
+
SessionTxnRecord txnRecord;
- txnRecord.setSessionId(makeLogicalSessionIdForTest());
- txnRecord.setTxnNum(20);
- txnRecord.setLastWriteOpTime(repl::OpTime(Timestamp(12, 34), 5));
+ txnRecord.setSessionId(sessionId);
+ txnRecord.setTxnNum(txnNumber);
+ txnRecord.setLastWriteOpTime(repl::OpTime(entry.getOpTime()));
txnRecord.setLastWriteDate(Date_t::now());
- txnRecord.setState(DurableTxnStateEnum::kAborted);
+ txnRecord.setState(DurableTxnStateEnum::kPrepared);
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
@@ -1096,6 +1322,81 @@ TEST_F(SessionCatalogMigrationSourceTest, AbortedTransactionEntriesShouldBeIgnor
ASSERT_FALSE(migrationSource.hasMoreOplog());
}
+// For each of the three session id flavours produced by the test helpers, an in-progress record
+// in the session transactions table must leave the migration source with nothing to fetch.
+TEST_F(SessionCatalogMigrationSourceTest, IgnoreInProgressTransaction) {
+    auto expectNothingToMigrate = [&](const LogicalSessionId& lsid) {
+        const TxnNumber txnNum{1};
+
+        SessionTxnRecord inProgressRecord;
+        inProgressRecord.setSessionId(lsid);
+        inProgressRecord.setTxnNum(txnNum);
+        inProgressRecord.setLastWriteOpTime(repl::OpTime(Timestamp(12, 34), 5));
+        inProgressRecord.setLastWriteDate(Date_t::now());
+        inProgressRecord.setState(DurableTxnStateEnum::kInProgress);
+
+        DBDirectClient client(opCtx());
+        client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+                      inProgressRecord.toBSON());
+
+        // The source is constructed only after the record has been inserted.
+        SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+
+        ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+        ASSERT_FALSE(migrationSource.hasMoreOplog());
+    };
+
+    expectNothingToMigrate(makeLogicalSessionIdForTest());
+    expectNothingToMigrate(makeLogicalSessionIdWithTxnUUIDForTest());
+    expectNothingToMigrate(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
+}
+
+// For each of the three session id flavours produced by the test helpers, a prepared applyOps
+// entry followed by an abortTransaction entry, together with an aborted record in the session
+// transactions table, must leave the migration source with nothing to fetch.
+TEST_F(SessionCatalogMigrationSourceTest, IgnoreAbortedTransaction) {
+    // Seconds component of the optimes; bumped after each run so the runs use distinct optimes.
+    auto opTimeSecs = 200;
+
+    auto expectNothingToMigrate = [&](const LogicalSessionId& lsid) {
+        const TxnNumber txnNum{1};
+
+        // The insert carries statement id 1 only when the helper classifies 'lsid' as an internal
+        // session for a retryable write; otherwise it carries no statement ids.
+        std::vector<StmtId> stmtIds;
+        if (isInternalSessionForRetryableWrite(lsid)) {
+            stmtIds = std::vector<StmtId>{1};
+        }
+
+        repl::OpTime prepareOpTime(Timestamp(opTimeSecs, 1), 1);
+        auto insertOp = makeDurableReplOp(
+            repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), stmtIds);
+        auto prepareEntry = makeApplyOpsOplogEntry(prepareOpTime,
+                                                   repl::OpTime(),  // prevOpTime
+                                                   {insertOp},
+                                                   lsid,
+                                                   txnNum,
+                                                   true,    // isPrepare
+                                                   false);  // isPartial
+        insertOplogEntry(prepareEntry);
+
+        // The abort entry chains back to the prepare entry through its previous-write optime.
+        repl::OpTime abortOpTime(Timestamp(opTimeSecs, 2), 1);
+        auto abortEntry = makeCommandOplogEntry(
+            abortOpTime, prepareOpTime, BSON("abortTransaction" << 1), lsid, txnNum);
+        insertOplogEntry(abortEntry);
+
+        SessionTxnRecord abortedRecord;
+        abortedRecord.setSessionId(lsid);
+        abortedRecord.setTxnNum(txnNum);
+        abortedRecord.setLastWriteOpTime(abortOpTime);
+        abortedRecord.setLastWriteDate(Date_t::now());
+        abortedRecord.setState(DurableTxnStateEnum::kAborted);
+
+        DBDirectClient client(opCtx());
+        client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+                      abortedRecord.toBSON());
+
+        SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+
+        ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+        ASSERT_FALSE(migrationSource.hasMoreOplog());
+
+        ++opTimeSecs;
+    };
+
+    expectNothingToMigrate(makeLogicalSessionIdForTest());
+    expectNothingToMigrate(makeLogicalSessionIdWithTxnUUIDForTest());
+    expectNothingToMigrate(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
+}
+
TEST_F(SessionCatalogMigrationSourceTest,
MixedTransactionEntriesAndRetryableWritesEntriesReturnCorrectResults) {
const auto sessionId1 = makeLogicalSessionIdForTest();