summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Lingzhi Deng <lingzhi.deng@mongodb.com> 2021-03-04 17:37:12 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-03-04 19:12:57 +0000
commit afbb044c9dc7535c3c048a38056e43d81a298d97 (patch)
tree 72fa3c4975450121ea037630d05219f86f511567
parent ad77d6846c1104b282cbfdad426f68ca86b673ad (diff)
download mongo-afbb044c9dc7535c3c048a38056e43d81a298d97.tar.gz
SERVER-54912: Use o2 field for tenant migration no-op writes
-rw-r--r-- jstests/replsets/tenant_migration_resume_oplog_application.js | 8
-rw-r--r-- src/mongo/db/op_observer_impl.cpp | 21
-rw-r--r-- src/mongo/db/repl/session_update_tracker.cpp | 149
-rw-r--r-- src/mongo/db/repl/session_update_tracker.h | 17
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_service.cpp | 49
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 13
-rw-r--r-- src/mongo/db/repl/tenant_oplog_applier.cpp | 69
-rw-r--r-- src/mongo/db/repl/tenant_oplog_applier.h | 3
-rw-r--r-- src/mongo/db/repl/tenant_oplog_applier_test.cpp | 2
9 files changed, 125 insertions, 206 deletions
diff --git a/jstests/replsets/tenant_migration_resume_oplog_application.js b/jstests/replsets/tenant_migration_resume_oplog_application.js
index d39c3ea4bb4..5a8a65dc676 100644
--- a/jstests/replsets/tenant_migration_resume_oplog_application.js
+++ b/jstests/replsets/tenant_migration_resume_oplog_application.js
@@ -82,8 +82,8 @@ let resultsArr = appliedNoOps.toArray();
// We should have applied the no-op oplog entries for the first batch of documents (size 2).
assert.eq(2, appliedNoOps.count(), appliedNoOps);
// No-op entries will be in the same order.
-assert.eq(docsToApply[0], resultsArr[0].o.o, resultsArr);
-assert.eq(docsToApply[1], resultsArr[1].o.o, resultsArr);
+assert.eq(docsToApply[0], resultsArr[0].o2.o, resultsArr);
+assert.eq(docsToApply[1], resultsArr[1].o2.o, resultsArr);
// Step up a new node in the recipient set and trigger a failover. The new primary should resume
// fetching starting from the unapplied documents.
@@ -100,9 +100,9 @@ local = newRecipientPrimary.getDB("local");
appliedNoOps = local.oplog.rs.find({fromTenantMigration: migrationId, op: "n"});
resultsArr = appliedNoOps.toArray();
assert.eq(3, appliedNoOps.count(), appliedNoOps);
-assert.eq(docsToApply[2], resultsArr[2].o.o, resultsArr);
+assert.eq(docsToApply[2], resultsArr[2].o2.o, resultsArr);
tenantMigrationTest.checkTenantDBHashes(tenantId);
tenantMigrationTest.stop();
recipientRst.stopSet();
-})(); \ No newline at end of file
+})();
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 59d6696c894..5659941af9b 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -696,27 +696,6 @@ void OpObserverImpl::onInternalOpMessage(
oplogEntry.setOpTime(*slot);
}
logOperation(opCtx, &oplogEntry);
-
- if (repl::tenantMigrationRecipientInfo(opCtx) && o2MsgObj) {
- OperationSessionInfo sessionInfo;
- try {
- sessionInfo = OperationSessionInfo::parse(IDLParserErrorContext("OperationSessionInfo"),
- *o2MsgObj);
- } catch (const DBException&) {
- // Not a transaction no-op for tenant migration.
- }
- // TODO SERVER-53509: Handle retryable writes.
- if (sessionInfo.getSessionId() && sessionInfo.getTxnNumber()) {
- SessionTxnRecord sessionTxnRecord;
- sessionTxnRecord.setSessionId(*sessionInfo.getSessionId());
- sessionTxnRecord.setTxnNum(*sessionInfo.getTxnNumber());
- sessionTxnRecord.setLastWriteOpTime(repl::OpTime());
- sessionTxnRecord.setLastWriteDate(oplogEntry.getWallClockTime());
- sessionTxnRecord.setState(DurableTxnStateEnum::kCommitted);
- TransactionParticipant::get(opCtx).onWriteOpCompletedOnPrimary(
- opCtx, {}, sessionTxnRecord);
- }
- }
}
void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp
index e51155b19cc..bbd25a8b47d 100644
--- a/src/mongo/db/repl/session_update_tracker.cpp
+++ b/src/mongo/db/repl/session_update_tracker.cpp
@@ -111,7 +111,7 @@ boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate(
*
* 1) Have the 'fromTenantMigration' field set
* 2) Be a no-op entry
- * 3) Store session info inside the 'o2' field
+ * 3) Have sessionId and txnNumber
*/
bool isTransactionEntryFromTenantMigrations(OplogEntry& entry) {
if (!entry.getFromTenantMigration()) {
@@ -122,13 +122,7 @@ bool isTransactionEntryFromTenantMigrations(OplogEntry& entry) {
return false;
}
- if (!entry.getObject2()) {
- return false;
- }
- auto entryObject2 = *entry.getObject2();
-
- if (!entryObject2.hasField(OperationSessionInfo::kSessionIdFieldName) ||
- !entryObject2.hasField(OperationSessionInfo::kTxnNumberFieldName)) {
+ if (!entry.getSessionId() || !entry.getTxnNumber()) {
return false;
}
@@ -184,15 +178,7 @@ boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateSession(
// each entry originating from a multi-statement transaction. For this reason, we cannot defer
// entries originating from multi-statement transactions.
if (auto txnTableUpdate = _createTransactionTableUpdateFromTransactionOp(entry)) {
- if (entry.getFromTenantMigration()) {
- // If the entry is from tenant migrations, the session info will be stored inside of the
- // 'o2' field.
- auto sessionInfo = OperationSessionInfo::parse(
- IDLParserErrorContext("OperationSessionInfo"), *entry.getObject2());
- _sessionsToUpdate.erase(*sessionInfo.getSessionId());
- } else {
- _sessionsToUpdate.erase(*entry.getOperationSessionInfo().getSessionId());
- }
+ _sessionsToUpdate.erase(*entry.getOperationSessionInfo().getSessionId());
return boost::optional<std::vector<OplogEntry>>({*txnTableUpdate});
}
@@ -298,97 +284,68 @@ std::vector<OplogEntry> SessionUpdateTracker::_flushForQueryPredicate(
return opList;
}
-std::pair<BSONObj, LogicalSessionId> SessionUpdateTracker::_createSessionTxnRecordTenantMigration(
+boost::optional<OplogEntry> SessionUpdateTracker::_createTransactionTableUpdateFromTransactionOp(
const repl::OplogEntry& entry) {
- SessionTxnRecord newTxnRecord;
-
- // For a tenant migrations transactions entry, all information is stored in the 'o2' field of
- // the oplog entry.
- auto entryObject2 = entry.getObject2();
- invariant(entryObject2);
- auto sessionInfo =
- OperationSessionInfo::parse(IDLParserErrorContext("OperationSessionInfo"), *entryObject2);
+ auto sessionInfo = entry.getOperationSessionInfo();
+ // We only update the transaction table on the first partialTxn operation.
+ if (entry.isPartialTransaction() && !entry.getPrevWriteOpTimeInTransaction()->isNull()) {
+ return boost::none;
+ }
invariant(sessionInfo.getSessionId());
- auto sessionId = *sessionInfo.getSessionId();
- invariant(sessionInfo.getTxnNumber());
- auto txnNumber = *sessionInfo.getTxnNumber();
-
- newTxnRecord.setSessionId(sessionId);
- newTxnRecord.setTxnNum(txnNumber);
- newTxnRecord.setLastWriteOpTime(OpTime());
- newTxnRecord.setLastWriteDate(entry.getWallClockTime());
- newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
- return std::pair(newTxnRecord.toBSON(), sessionId);
-}
-
-std::pair<BSONObj, LogicalSessionId> SessionUpdateTracker::_createSessionTxnRecordGeneral(
- const repl::OplogEntry& entry) {
- SessionTxnRecord newTxnRecord;
+ const auto updateBSON = [&] {
+ SessionTxnRecord newTxnRecord;
+ newTxnRecord.setSessionId(*sessionInfo.getSessionId());
+ newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber());
+ newTxnRecord.setLastWriteOpTime(entry.getOpTime());
+ newTxnRecord.setLastWriteDate(entry.getWallClockTime());
- auto sessionInfo = entry.getOperationSessionInfo();
- invariant(sessionInfo.getSessionId());
+ if (entry.getFromTenantMigration() && entry.getOpType() == OpTypeEnum::kNoop) {
+ // For tenant migration no-ops, lastWriteOpTime is reset to a null OpTime.
+ newTxnRecord.setLastWriteOpTime(OpTime());
+ newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
+ return newTxnRecord.toBSON();
+ }
- auto sessionId = *sessionInfo.getSessionId();
- newTxnRecord.setSessionId(sessionId);
- newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber());
- newTxnRecord.setLastWriteOpTime(entry.getOpTime());
- newTxnRecord.setLastWriteDate(entry.getWallClockTime());
-
- if (entry.isPartialTransaction()) {
- invariant(entry.getPrevWriteOpTimeInTransaction()->isNull());
- newTxnRecord.setState(DurableTxnStateEnum::kInProgress);
- newTxnRecord.setStartOpTime(entry.getOpTime());
- return std::pair(newTxnRecord.toBSON(), sessionId);
- }
- switch (entry.getCommandType()) {
- case repl::OplogEntry::CommandType::kApplyOps:
- if (entry.shouldPrepare()) {
- newTxnRecord.setState(DurableTxnStateEnum::kPrepared);
- if (entry.getPrevWriteOpTimeInTransaction()->isNull()) {
- // The prepare oplog entry is the first operation of the transaction.
- newTxnRecord.setStartOpTime(entry.getOpTime());
+ if (entry.isPartialTransaction()) {
+ invariant(entry.getPrevWriteOpTimeInTransaction()->isNull());
+ newTxnRecord.setState(DurableTxnStateEnum::kInProgress);
+ newTxnRecord.setStartOpTime(entry.getOpTime());
+ return newTxnRecord.toBSON();
+ }
+ switch (entry.getCommandType()) {
+ case repl::OplogEntry::CommandType::kApplyOps:
+ if (entry.shouldPrepare()) {
+ newTxnRecord.setState(DurableTxnStateEnum::kPrepared);
+ if (entry.getPrevWriteOpTimeInTransaction()->isNull()) {
+ // The prepare oplog entry is the first operation of the transaction.
+ newTxnRecord.setStartOpTime(entry.getOpTime());
+ } else {
+ // Update the transaction record using $set to avoid overwriting the
+ // startOpTime.
+ return BSON("$set" << newTxnRecord.toBSON());
+ }
} else {
- // Update the transaction record using $set to avoid overwriting the
- // startOpTime.
- return std::pair(BSON("$set" << newTxnRecord.toBSON()), sessionId);
+ newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
}
- } else {
+ break;
+ case repl::OplogEntry::CommandType::kCommitTransaction:
newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
- }
- break;
- case repl::OplogEntry::CommandType::kCommitTransaction:
- newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
- break;
- case repl::OplogEntry::CommandType::kAbortTransaction:
- newTxnRecord.setState(DurableTxnStateEnum::kAborted);
- break;
- default:
- break;
- }
- return std::pair(newTxnRecord.toBSON(), sessionId);
-}
-
-boost::optional<OplogEntry> SessionUpdateTracker::_createTransactionTableUpdateFromTransactionOp(
- const repl::OplogEntry& entry) {
- // We only update the transaction table on the first partialTxn operation.
- if (entry.isPartialTransaction() && !entry.getPrevWriteOpTimeInTransaction()->isNull()) {
- return boost::none;
- }
-
- std::pair<BSONObj, LogicalSessionId> updatePair;
- if (entry.getOpType() == OpTypeEnum::kNoop) {
- // If the oplog entry is a no-op, we must have received it from tenant migrations.
- updatePair = _createSessionTxnRecordTenantMigration(entry);
- } else {
- updatePair = _createSessionTxnRecordGeneral(entry);
- }
+ break;
+ case repl::OplogEntry::CommandType::kAbortTransaction:
+ newTxnRecord.setState(DurableTxnStateEnum::kAborted);
+ break;
+ default:
+ break;
+ }
+ return newTxnRecord.toBSON();
+ }();
return createOplogEntryForTransactionTableUpdate(
entry.getOpTime(),
- updatePair.first,
- BSON(SessionTxnRecord::kSessionIdFieldName << updatePair.second.toBSON()),
+ updateBSON,
+ BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()),
entry.getWallClockTime());
}
diff --git a/src/mongo/db/repl/session_update_tracker.h b/src/mongo/db/repl/session_update_tracker.h
index 6d65a2a7609..4d84af6be96 100644
--- a/src/mongo/db/repl/session_update_tracker.h
+++ b/src/mongo/db/repl/session_update_tracker.h
@@ -90,23 +90,6 @@ private:
boost::optional<std::vector<OplogEntry>> _updateOrFlush(const OplogEntry& entry);
/**
- * Creates a new session transaction record from a tenant migration transaction oplog entry. The
- * oplog entry should be no-op, and all information needed to update should be stored in the
- * 'o2' field of the oplog entry.
- *
- * Returns a pair of the session transaction record in BSON and the logical session ID.
- */
- std::pair<BSONObj, LogicalSessionId> _createSessionTxnRecordTenantMigration(
- const OplogEntry& entry);
-
- /**
- * Creates a new session transaction record from a general transaction oplog entry.
- *
- * Returns a pair of the session transaction record in BSON and the logical session ID.
- */
- std::pair<BSONObj, LogicalSessionId> _createSessionTxnRecordGeneral(const OplogEntry& entry);
-
- /**
* Returns an update to the transaction table generated by a transaction operation. This returns
* boost::none if the operation is a partialTxn operation that is not the first of the
* transaction.
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index f9ec0612fdb..6efb5f53ce4 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -857,8 +857,6 @@ AggregateCommand TenantMigrationRecipientService::Instance::_makeCommittedTransa
void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntry(
const BSONObj& entry) {
- OpObserver* opObserver = cc().getServiceContext()->getOpObserver();
-
auto sessionTxnRecord =
SessionTxnRecord::parse(IDLParserErrorContext("SessionTxnRecord"), entry);
auto sessionId = sessionTxnRecord.getSessionId();
@@ -905,27 +903,28 @@ void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntr
txnParticipant.beginOrContinueTransactionUnconditionally(opCtx, txnNumber);
+ MutableOplogEntry noopEntry;
+ noopEntry.setOpType(repl::OpTypeEnum::kNoop);
+ noopEntry.setNss({});
+ noopEntry.setObject({});
+ noopEntry.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now());
+ noopEntry.setSessionId(sessionId);
+ noopEntry.setTxnNumber(txnNumber);
+
+ // Use the same wallclock time as the noop entry.
+ sessionTxnRecord.setStartOpTime(boost::none);
+ sessionTxnRecord.setLastWriteOpTime(OpTime());
+ sessionTxnRecord.setLastWriteDate(noopEntry.getWallClockTime());
+
AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite);
writeConflictRetry(
opCtx, "writeDonorCommittedTxnEntry", NamespaceString::kRsOplogNamespace.ns(), [&] {
WriteUnitOfWork wuow(opCtx);
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(sessionId);
- sessionInfo.setTxnNumber(txnNumber);
- auto sessionInfoBson = sessionInfo.toBSON();
-
- // Write the no-op entry and trigger 'config.transactions' update.
- opObserver->onInternalOpMessage(opCtx,
- NamespaceString(),
- boost::none /* uuid */,
- {} /* msgObj */,
- sessionInfoBson /* o2MsgObj */,
- boost::none /* preImageOpTime */,
- boost::none /* postImageOpTime */,
- boost::none /* prevWriteOpTimeInTransaction */,
- boost::none /* slot */
- );
+ // Write the no-op entry and update 'config.transactions'.
+ repl::logOp(opCtx, &noopEntry);
+ TransactionParticipant::get(opCtx).onWriteOpCompletedOnPrimary(
+ opCtx, {}, sessionTxnRecord);
wuow.commit();
});
@@ -933,7 +932,6 @@ void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntr
// Invalidate in-memory state so that the next time the session is checked out, it would reload
// the transaction state from 'config.transactions'.
txnParticipant.invalidate(opCtx);
- return;
}
void TenantMigrationRecipientService::Instance::_fetchCommittedTransactionsBeforeStartOpTime() {
@@ -1234,14 +1232,9 @@ OpTime TenantMigrationRecipientService::Instance::_getOplogResumeApplyingDonorOp
getMigrationUUID());
// Find the most recent no-op oplog entry from the current migration.
if (isFromCurrentMigration &&
- (oplogObj.getStringField("op") == OpType_serializer(repl::OpTypeEnum::kNoop))) {
- const auto migratedEntryObj = oplogObj.getObjectField("o");
- if (migratedEntryObj.isEmpty()) {
- // If the 'o' field is empty, this entry is a transaction no-op entry. Skip this
- // entry and continue.
- continue;
- }
-
+ (oplogObj.getStringField("op") == OpType_serializer(repl::OpTypeEnum::kNoop)) &&
+ oplogObj.hasField("o2")) {
+ const auto migratedEntryObj = oplogObj.getObjectField("o2");
const auto swDonorOpTime = repl::OpTime::parseFromOplogEntry(migratedEntryObj);
uassert(5272305,
str::stream() << "Unable to parse opTime from tenant migration oplog entry: "
@@ -2098,4 +2091,4 @@ const std::string& TenantMigrationRecipientService::Instance::getTenantId() cons
}
} // namespace repl
-} // namespace mongo \ No newline at end of file
+} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index e10c08ba493..d497254c649 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -113,7 +113,8 @@ MutableOplogEntry makeNoOpOplogEntry(OpTime opTime,
oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
oplogEntry.setOpTime(opTime);
oplogEntry.setNss(nss);
- oplogEntry.setObject(o);
+ oplogEntry.setObject({});
+ oplogEntry.setObject2(o);
oplogEntry.setWallClockTime(Date_t::now());
if (migrationUUID) {
oplogEntry.setFromTenantMigration(migrationUUID.get());
@@ -1913,11 +1914,11 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApp
// Create and insert the following into the oplog:
// - (1) An oplog entry with opTime earlier than 'cloneFinishedRecipientOpTime'.
// - (2) An oplog entry with opTime greater than 'cloneFinishedRecipientOpTime'.
- // - (3) A no-op oplog entry with an inner donor oplog entry as the 'o' field. The donor opTime
- // is less than the 'startApplyingDonorOpTime'.
- // - (4) A no-op oplog entry with an inner oplog entry as the 'o' field but no 'fromMigrate'
- // field. These oplog entries do not satisfy the conditions for the oplog applier to resume from
- // so we default to resuming from 'startDonorApplyingOpTime'.
+ // - (3) A no-op oplog entry with an inner donor oplog entry as the 'o2' field. The donor opTime
+ // is less than the 'startApplyingDonorOpTime'.
+ // - (4) A no-op oplog entry with an inner oplog entry as the 'o2' field but no
+ // 'fromTenantMigration' field. These oplog entries do not satisfy the conditions for the
+ // oplog applier to resume from so we default to resuming from 'startDonorApplyingOpTime'.
const auto insertNss = NamespaceString("tenantA_foo.bar");
const auto beforeStartApplyingOpTime = OpTime(Timestamp(1, 1), 1);
const auto entryBeforeStartApplyingOpTime = makeOplogEntry(
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index c85e053afbe..0170a8bdac8 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -477,7 +477,7 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
status = scheduleStatus;
} else {
try {
- _writeSessionNoOpsForRange(opObserver, s.second.begin(), s.second.end());
+ _writeSessionNoOpsForRange(s.second.begin(), s.second.end());
} catch (const DBException& e) {
status = e.toStatus();
}
@@ -503,7 +503,6 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
}
void TenantOplogApplier::_writeSessionNoOpsForRange(
- OpObserver* opObserver,
std::vector<TenantNoOpEntry>::const_iterator begin,
std::vector<TenantNoOpEntry>::const_iterator end) {
auto opCtx = cc().makeOperationContext();
@@ -521,14 +520,23 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
invariant(!isResumeTokenNoop(entry));
invariant(entry.getSessionId());
+ MutableOplogEntry noopEntry;
+ noopEntry.setOpType(repl::OpTypeEnum::kNoop);
+ noopEntry.setNss(entry.getNss());
+ noopEntry.setUuid(entry.getUuid());
+ noopEntry.setObject({}); // Empty 'o' field.
+ noopEntry.setObject2(entry.getEntry().toBSON());
+ noopEntry.setOpTime(*iter->second);
+ noopEntry.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now());
+
boost::optional<MongoDOperationContextSession> scopedSession;
- boost::optional<BSONObj> o2;
+ boost::optional<SessionTxnRecord> sessionTxnRecord;
if (entry.getTxnNumber() && !entry.isPartialTransaction() &&
(entry.getCommandType() == repl::OplogEntry::CommandType::kCommitTransaction ||
entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps)) {
// Final applyOp for a transaction.
- const auto& sessionId = *entry.getSessionId();
- const auto& txnNumber = *entry.getTxnNumber();
+ auto sessionId = *entry.getSessionId();
+ auto txnNumber = *entry.getTxnNumber();
opCtx->setLogicalSessionId(sessionId);
opCtx->setTxnNumber(txnNumber);
opCtx->setInMultiDocumentTransaction();
@@ -538,7 +546,8 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
"sessionId"_attr = sessionId,
"txnNumber"_attr = txnNumber,
"tenant"_attr = _tenantId,
- "migrationUuid"_attr = _migrationUuid);
+ "migrationUuid"_attr = _migrationUuid,
+ "op"_attr = redact(entry.toBSONForLogging()));
// Check out the session.
scopedSession.emplace(opCtx.get());
@@ -558,37 +567,35 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
txnParticipant.getActiveTxnNumber() < txnNumber);
txnParticipant.beginOrContinueTransactionUnconditionally(opCtx.get(), txnNumber);
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(sessionId);
- sessionInfo.setTxnNumber(txnNumber);
- o2 = sessionInfo.toBSON();
+ // Only set sessionId and txnNumber for the final applyOp in a transaction.
+ noopEntry.setSessionId(sessionId);
+ noopEntry.setTxnNumber(txnNumber);
+
+ // Use the same wallclock time as the noop entry.
+ sessionTxnRecord.emplace(sessionId, txnNumber, OpTime(), noopEntry.getWallClockTime());
+ sessionTxnRecord->setState(DurableTxnStateEnum::kCommitted);
}
+ // TODO(SERVER-53510) Correctly fill in pre-image and post-image op times.
+ const boost::optional<OpTime> preImageOpTime = boost::none;
+ const boost::optional<OpTime> postImageOpTime = boost::none;
+ // TODO(SERVER-53509) Correctly fill in prevWriteOpTime for retryable writes.
+ const boost::optional<OpTime> prevWriteOpTimeInTransaction = boost::none;
+ noopEntry.setPreImageOpTime(preImageOpTime);
+ noopEntry.setPostImageOpTime(postImageOpTime);
+ noopEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTimeInTransaction);
+
AutoGetOplog oplogWrite(opCtx.get(), OplogAccessMode::kWrite);
writeConflictRetry(
opCtx.get(), "writeTenantNoOps", NamespaceString::kRsOplogNamespace.ns(), [&] {
WriteUnitOfWork wuow(opCtx.get());
- // TODO(SERVER-53510) Correctly fill in pre-image and post-image op times.
- const boost::optional<OpTime> preImageOpTime = boost::none;
- const boost::optional<OpTime> postImageOpTime = boost::none;
- // TODO(SERVER-53509) Correctly fill in prevWriteOpTime for retryable writes.
- const boost::optional<OpTime> prevWriteOpTimeInTransaction = boost::none;
-
- // Write the noop entry and trigger config.transactions update.
- opObserver->onInternalOpMessage(
- opCtx.get(),
- entry.getNss(),
- entry.getUuid(),
- entry.getEntry().toBSON(),
- o2,
- // We link the no-ops together by recipient op time the same way the actual ops
- // were linked together by donor op time. This is to allow retryable writes
- // and changestreams to find the ops they need.
- preImageOpTime,
- postImageOpTime,
- prevWriteOpTimeInTransaction,
- *iter->second);
+ // Write the noop entry and update config.transactions.
+ repl::logOp(opCtx.get(), &noopEntry);
+ if (sessionTxnRecord) {
+ TransactionParticipant::get(opCtx.get())
+ .onWriteOpCompletedOnPrimary(opCtx.get(), {}, *sessionTxnRecord);
+ }
wuow.commit();
});
@@ -636,8 +643,8 @@ void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver,
opCtx.get(),
entry.getNss(),
entry.getUuid(),
+ {}, // Empty 'o' field.
entry.getEntry().toBSON(),
- BSONObj(),
// We link the no-ops together by recipient op time the same way the actual ops
// were linked together by donor op time. This is to allow retryable writes
// and changestreams to find the ops they need.
diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h
index 58409d61e2b..357d0f00144 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.h
+++ b/src/mongo/db/repl/tenant_oplog_applier.h
@@ -113,8 +113,7 @@ private:
void _writeNoOpsForRange(OpObserver* opObserver,
std::vector<TenantNoOpEntry>::const_iterator begin,
std::vector<TenantNoOpEntry>::const_iterator end);
- void _writeSessionNoOpsForRange(OpObserver* opObserver,
- std::vector<TenantNoOpEntry>::const_iterator begin,
+ void _writeSessionNoOpsForRange(std::vector<TenantNoOpEntry>::const_iterator begin,
std::vector<TenantNoOpEntry>::const_iterator end);
Status _applyOplogEntryOrGroupedInserts(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
index 0ddff34b1fe..8b95d3dc606 100644
--- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
@@ -160,7 +160,7 @@ public:
}
void assertNoOpMatches(const OplogEntry& op, const MutableOplogEntry& noOp) {
- ASSERT_BSONOBJ_EQ(op.getEntry().toBSON(), noOp.getObject());
+ ASSERT_BSONOBJ_EQ(op.getEntry().toBSON(), *noOp.getObject2());
ASSERT_EQ(op.getNss(), noOp.getNss());
ASSERT_EQ(op.getUuid(), noOp.getUuid());
ASSERT_EQ(_migrationUuid, noOp.getFromTenantMigration());