diff options
author | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2021-03-04 17:37:12 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-03-04 19:12:57 +0000 |
commit | afbb044c9dc7535c3c048a38056e43d81a298d97 (patch) | |
tree | 72fa3c4975450121ea037630d05219f86f511567 | |
parent | ad77d6846c1104b282cbfdad426f68ca86b673ad (diff) | |
download | mongo-afbb044c9dc7535c3c048a38056e43d81a298d97.tar.gz |
SERVER-54912: Use o2 field for tenant migration no-op writes
-rw-r--r-- | jstests/replsets/tenant_migration_resume_oplog_application.js | 8 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/repl/session_update_tracker.cpp | 149 | ||||
-rw-r--r-- | src/mongo/db/repl/session_update_tracker.h | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.cpp | 69 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier_test.cpp | 2 |
9 files changed, 125 insertions, 206 deletions
diff --git a/jstests/replsets/tenant_migration_resume_oplog_application.js b/jstests/replsets/tenant_migration_resume_oplog_application.js index d39c3ea4bb4..5a8a65dc676 100644 --- a/jstests/replsets/tenant_migration_resume_oplog_application.js +++ b/jstests/replsets/tenant_migration_resume_oplog_application.js @@ -82,8 +82,8 @@ let resultsArr = appliedNoOps.toArray(); // We should have applied the no-op oplog entries for the first batch of documents (size 2). assert.eq(2, appliedNoOps.count(), appliedNoOps); // No-op entries will be in the same order. -assert.eq(docsToApply[0], resultsArr[0].o.o, resultsArr); -assert.eq(docsToApply[1], resultsArr[1].o.o, resultsArr); +assert.eq(docsToApply[0], resultsArr[0].o2.o, resultsArr); +assert.eq(docsToApply[1], resultsArr[1].o2.o, resultsArr); // Step up a new node in the recipient set and trigger a failover. The new primary should resume // fetching starting from the unapplied documents. @@ -100,9 +100,9 @@ local = newRecipientPrimary.getDB("local"); appliedNoOps = local.oplog.rs.find({fromTenantMigration: migrationId, op: "n"}); resultsArr = appliedNoOps.toArray(); assert.eq(3, appliedNoOps.count(), appliedNoOps); -assert.eq(docsToApply[2], resultsArr[2].o.o, resultsArr); +assert.eq(docsToApply[2], resultsArr[2].o2.o, resultsArr); tenantMigrationTest.checkTenantDBHashes(tenantId); tenantMigrationTest.stop(); recipientRst.stopSet(); -})();
\ No newline at end of file +})(); diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 59d6696c894..5659941af9b 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -696,27 +696,6 @@ void OpObserverImpl::onInternalOpMessage( oplogEntry.setOpTime(*slot); } logOperation(opCtx, &oplogEntry); - - if (repl::tenantMigrationRecipientInfo(opCtx) && o2MsgObj) { - OperationSessionInfo sessionInfo; - try { - sessionInfo = OperationSessionInfo::parse(IDLParserErrorContext("OperationSessionInfo"), - *o2MsgObj); - } catch (const DBException&) { - // Not a transaction no-op for tenant migration. - } - // TODO SERVER-53509: Handle retryable writes. - if (sessionInfo.getSessionId() && sessionInfo.getTxnNumber()) { - SessionTxnRecord sessionTxnRecord; - sessionTxnRecord.setSessionId(*sessionInfo.getSessionId()); - sessionTxnRecord.setTxnNum(*sessionInfo.getTxnNumber()); - sessionTxnRecord.setLastWriteOpTime(repl::OpTime()); - sessionTxnRecord.setLastWriteDate(oplogEntry.getWallClockTime()); - sessionTxnRecord.setState(DurableTxnStateEnum::kCommitted); - TransactionParticipant::get(opCtx).onWriteOpCompletedOnPrimary( - opCtx, {}, sessionTxnRecord); - } - } } void OpObserverImpl::onCreateCollection(OperationContext* opCtx, diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp index e51155b19cc..bbd25a8b47d 100644 --- a/src/mongo/db/repl/session_update_tracker.cpp +++ b/src/mongo/db/repl/session_update_tracker.cpp @@ -111,7 +111,7 @@ boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate( * * 1) Have the 'fromTenantMigration' field set * 2) Be a no-op entry - * 3) Store session info inside the 'o2' field + * 3) Have sessionId and txnNumber */ bool isTransactionEntryFromTenantMigrations(OplogEntry& entry) { if (!entry.getFromTenantMigration()) { @@ -122,13 +122,7 @@ bool isTransactionEntryFromTenantMigrations(OplogEntry& entry) { return false; } - if (!entry.getObject2()) { - return false; - } - auto entryObject2 = *entry.getObject2(); - - if (!entryObject2.hasField(OperationSessionInfo::kSessionIdFieldName) || - !entryObject2.hasField(OperationSessionInfo::kTxnNumberFieldName)) { + if (!entry.getSessionId() || !entry.getTxnNumber()) { return false; } @@ -184,15 +178,7 @@ boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateSession( // each entry originating from a multi-statement transaction. For this reason, we cannot defer // entries originating from multi-statement transactions. if (auto txnTableUpdate = _createTransactionTableUpdateFromTransactionOp(entry)) { - if (entry.getFromTenantMigration()) { - // If the entry is from tenant migrations, the session info will be stored inside of the - // 'o2' field. - auto sessionInfo = OperationSessionInfo::parse( - IDLParserErrorContext("OperationSessionInfo"), *entry.getObject2()); - _sessionsToUpdate.erase(*sessionInfo.getSessionId()); - } else { - _sessionsToUpdate.erase(*entry.getOperationSessionInfo().getSessionId()); - } + _sessionsToUpdate.erase(*entry.getOperationSessionInfo().getSessionId()); return boost::optional<std::vector<OplogEntry>>({*txnTableUpdate}); } @@ -298,97 +284,68 @@ std::vector<OplogEntry> SessionUpdateTracker::_flushForQueryPredicate( return opList; } -std::pair<BSONObj, LogicalSessionId> SessionUpdateTracker::_createSessionTxnRecordTenantMigration( +boost::optional<OplogEntry> SessionUpdateTracker::_createTransactionTableUpdateFromTransactionOp( const repl::OplogEntry& entry) { - SessionTxnRecord newTxnRecord; - - // For a tenant migrations transactions entry, all information is stored in the 'o2' field of - // the oplog entry. - auto entryObject2 = entry.getObject2(); - invariant(entryObject2); - auto sessionInfo = - OperationSessionInfo::parse(IDLParserErrorContext("OperationSessionInfo"), *entryObject2); + auto sessionInfo = entry.getOperationSessionInfo(); + // We only update the transaction table on the first partialTxn operation. + if (entry.isPartialTransaction() && !entry.getPrevWriteOpTimeInTransaction()->isNull()) { + return boost::none; + } invariant(sessionInfo.getSessionId()); - auto sessionId = *sessionInfo.getSessionId(); - invariant(sessionInfo.getTxnNumber()); - auto txnNumber = *sessionInfo.getTxnNumber(); - - newTxnRecord.setSessionId(sessionId); - newTxnRecord.setTxnNum(txnNumber); - newTxnRecord.setLastWriteOpTime(OpTime()); - newTxnRecord.setLastWriteDate(entry.getWallClockTime()); - newTxnRecord.setState(DurableTxnStateEnum::kCommitted); - return std::pair(newTxnRecord.toBSON(), sessionId); -} - -std::pair<BSONObj, LogicalSessionId> SessionUpdateTracker::_createSessionTxnRecordGeneral( - const repl::OplogEntry& entry) { - SessionTxnRecord newTxnRecord; + const auto updateBSON = [&] { + SessionTxnRecord newTxnRecord; + newTxnRecord.setSessionId(*sessionInfo.getSessionId()); + newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber()); + newTxnRecord.setLastWriteOpTime(entry.getOpTime()); + newTxnRecord.setLastWriteDate(entry.getWallClockTime()); - auto sessionInfo = entry.getOperationSessionInfo(); - invariant(sessionInfo.getSessionId()); + if (entry.getFromTenantMigration() && entry.getOpType() == OpTypeEnum::kNoop) { + // For tenant migration, we don't need to set the lastWriteOpTime. + newTxnRecord.setLastWriteOpTime(OpTime()); + newTxnRecord.setState(DurableTxnStateEnum::kCommitted); + return newTxnRecord.toBSON(); + } - auto sessionId = *sessionInfo.getSessionId(); - newTxnRecord.setSessionId(sessionId); - newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber()); - newTxnRecord.setLastWriteOpTime(entry.getOpTime()); - newTxnRecord.setLastWriteDate(entry.getWallClockTime()); - - if (entry.isPartialTransaction()) { - invariant(entry.getPrevWriteOpTimeInTransaction()->isNull()); - newTxnRecord.setState(DurableTxnStateEnum::kInProgress); - newTxnRecord.setStartOpTime(entry.getOpTime()); - return std::pair(newTxnRecord.toBSON(), sessionId); - } - switch (entry.getCommandType()) { - case repl::OplogEntry::CommandType::kApplyOps: - if (entry.shouldPrepare()) { - newTxnRecord.setState(DurableTxnStateEnum::kPrepared); - if (entry.getPrevWriteOpTimeInTransaction()->isNull()) { - // The prepare oplog entry is the first operation of the transaction. - newTxnRecord.setStartOpTime(entry.getOpTime()); + if (entry.isPartialTransaction()) { + invariant(entry.getPrevWriteOpTimeInTransaction()->isNull()); + newTxnRecord.setState(DurableTxnStateEnum::kInProgress); + newTxnRecord.setStartOpTime(entry.getOpTime()); + return newTxnRecord.toBSON(); + } + switch (entry.getCommandType()) { + case repl::OplogEntry::CommandType::kApplyOps: + if (entry.shouldPrepare()) { + newTxnRecord.setState(DurableTxnStateEnum::kPrepared); + if (entry.getPrevWriteOpTimeInTransaction()->isNull()) { + // The prepare oplog entry is the first operation of the transaction. + newTxnRecord.setStartOpTime(entry.getOpTime()); + } else { + // Update the transaction record using $set to avoid overwriting the + // startOpTime. + return BSON("$set" << newTxnRecord.toBSON()); + } } else { - // Update the transaction record using $set to avoid overwriting the - // startOpTime. - return std::pair(BSON("$set" << newTxnRecord.toBSON()), sessionId); + newTxnRecord.setState(DurableTxnStateEnum::kCommitted); } - } else { + break; + case repl::OplogEntry::CommandType::kCommitTransaction: newTxnRecord.setState(DurableTxnStateEnum::kCommitted); - } - break; - case repl::OplogEntry::CommandType::kCommitTransaction: - newTxnRecord.setState(DurableTxnStateEnum::kCommitted); - break; - case repl::OplogEntry::CommandType::kAbortTransaction: - newTxnRecord.setState(DurableTxnStateEnum::kAborted); - break; - default: - break; - } - return std::pair(newTxnRecord.toBSON(), sessionId); -} - -boost::optional<OplogEntry> SessionUpdateTracker::_createTransactionTableUpdateFromTransactionOp( - const repl::OplogEntry& entry) { - // We only update the transaction table on the first partialTxn operation. - if (entry.isPartialTransaction() && !entry.getPrevWriteOpTimeInTransaction()->isNull()) { - return boost::none; - } - - std::pair<BSONObj, LogicalSessionId> updatePair; - if (entry.getOpType() == OpTypeEnum::kNoop) { - // If the oplog entry is a no-op, we must have received it from tenant migrations. - updatePair = _createSessionTxnRecordTenantMigration(entry); - } else { - updatePair = _createSessionTxnRecordGeneral(entry); - } + break; + case repl::OplogEntry::CommandType::kAbortTransaction: + newTxnRecord.setState(DurableTxnStateEnum::kAborted); + break; + default: + break; + } + return newTxnRecord.toBSON(); + }(); return createOplogEntryForTransactionTableUpdate( entry.getOpTime(), - updatePair.first, - BSON(SessionTxnRecord::kSessionIdFieldName << updatePair.second.toBSON()), + updateBSON, + BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()), entry.getWallClockTime()); } diff --git a/src/mongo/db/repl/session_update_tracker.h b/src/mongo/db/repl/session_update_tracker.h index 6d65a2a7609..4d84af6be96 100644 --- a/src/mongo/db/repl/session_update_tracker.h +++ b/src/mongo/db/repl/session_update_tracker.h @@ -90,23 +90,6 @@ private: boost::optional<std::vector<OplogEntry>> _updateOrFlush(const OplogEntry& entry); /** - * Creates a new session transaction record from a tenant migration transaction oplog entry. The - * oplog entry should be no-op, and all information needed to update should be stored in the - * 'o2' field of the oplog entry. - * - * Returns a pair of the session transaction record in BSON and the logical session ID. - */ - std::pair<BSONObj, LogicalSessionId> _createSessionTxnRecordTenantMigration( - const OplogEntry& entry); - - /** - * Creates a new session transaction record from a general transaction oplog entry. - * - * Returns a pair of the session transaction record in BSON and the logical session ID. - */ - std::pair<BSONObj, LogicalSessionId> _createSessionTxnRecordGeneral(const OplogEntry& entry); - - /** * Returns an update to the transaction table generated by a transaction operation. This returns * boost::none if the operation is a partialTxn operation that is not the first of the * transaction. diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index f9ec0612fdb..6efb5f53ce4 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -857,8 +857,6 @@ AggregateCommand TenantMigrationRecipientService::Instance::_makeCommittedTransa void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntry( const BSONObj& entry) { - OpObserver* opObserver = cc().getServiceContext()->getOpObserver(); - auto sessionTxnRecord = SessionTxnRecord::parse(IDLParserErrorContext("SessionTxnRecord"), entry); auto sessionId = sessionTxnRecord.getSessionId(); @@ -905,27 +903,28 @@ void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntr txnParticipant.beginOrContinueTransactionUnconditionally(opCtx, txnNumber); + MutableOplogEntry noopEntry; + noopEntry.setOpType(repl::OpTypeEnum::kNoop); + noopEntry.setNss({}); + noopEntry.setObject({}); + noopEntry.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now()); + noopEntry.setSessionId(sessionId); + noopEntry.setTxnNumber(txnNumber); + + // Use the same wallclock time as the noop entry. + sessionTxnRecord.setStartOpTime(boost::none); + sessionTxnRecord.setLastWriteOpTime(OpTime()); + sessionTxnRecord.setLastWriteDate(noopEntry.getWallClockTime()); + AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); writeConflictRetry( opCtx, "writeDonorCommittedTxnEntry", NamespaceString::kRsOplogNamespace.ns(), [&] { WriteUnitOfWork wuow(opCtx); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(sessionId); - sessionInfo.setTxnNumber(txnNumber); - auto sessionInfoBson = sessionInfo.toBSON(); - - // Write the no-op entry and trigger 'config.transactions' update. - opObserver->onInternalOpMessage(opCtx, - NamespaceString(), - boost::none /* uuid */, - {} /* msgObj */, - sessionInfoBson /* o2MsgObj */, - boost::none /* preImageOpTime */, - boost::none /* postImageOpTime */, - boost::none /* prevWriteOpTimeInTransaction */, - boost::none /* slot */ - ); + // Write the no-op entry and update 'config.transactions'. + repl::logOp(opCtx, &noopEntry); + TransactionParticipant::get(opCtx).onWriteOpCompletedOnPrimary( + opCtx, {}, sessionTxnRecord); wuow.commit(); }); @@ -933,7 +932,6 @@ void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntr // Invalidate in-memory state so that the next time the session is checked out, it would reload // the transaction state from 'config.transactions'. txnParticipant.invalidate(opCtx); - return; } void TenantMigrationRecipientService::Instance::_fetchCommittedTransactionsBeforeStartOpTime() { @@ -1234,14 +1232,9 @@ OpTime TenantMigrationRecipientService::Instance::_getOplogResumeApplyingDonorOp getMigrationUUID()); // Find the most recent no-op oplog entry from the current migration. if (isFromCurrentMigration && - (oplogObj.getStringField("op") == OpType_serializer(repl::OpTypeEnum::kNoop))) { - const auto migratedEntryObj = oplogObj.getObjectField("o"); - if (migratedEntryObj.isEmpty()) { - // If the 'o' field is empty, this entry is a transaction no-op entry. Skip this - // entry and continue. - continue; - } - + (oplogObj.getStringField("op") == OpType_serializer(repl::OpTypeEnum::kNoop)) && + oplogObj.hasField("o2")) { + const auto migratedEntryObj = oplogObj.getObjectField("o2"); const auto swDonorOpTime = repl::OpTime::parseFromOplogEntry(migratedEntryObj); uassert(5272305, str::stream() << "Unable to parse opTime from tenant migration oplog entry: " @@ -2098,4 +2091,4 @@ const std::string& TenantMigrationRecipientService::Instance::getTenantId() cons } } // namespace repl -} // namespace mongo
\ No newline at end of file +} // namespace mongo diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp index e10c08ba493..d497254c649 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -113,7 +113,8 @@ MutableOplogEntry makeNoOpOplogEntry(OpTime opTime, oplogEntry.setOpType(repl::OpTypeEnum::kNoop); oplogEntry.setOpTime(opTime); oplogEntry.setNss(nss); - oplogEntry.setObject(o); + oplogEntry.setObject({}); + oplogEntry.setObject2(o); oplogEntry.setWallClockTime(Date_t::now()); if (migrationUUID) { oplogEntry.setFromTenantMigration(migrationUUID.get()); @@ -1913,11 +1914,11 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApp // Create and insert the following into the oplog: // - (1) An oplog entry with opTime earlier than 'cloneFinishedRecipientOpTime'. // - (2) An oplog entry with opTime greater than 'cloneFinishedRecipientOpTime'. - // - (3) A no-op oplog entry with an inner donor oplog entry as the 'o' field. The donor opTime - // is less than the 'startApplyingDonorOpTime'. - // - (4) A no-op oplog entry with an inner oplog entry as the 'o' field but no 'fromMigrate' - // field. These oplog entries do not satisfy the conditions for the oplog applier to resume from - // so we default to resuming from 'startDonorApplyingOpTime'. + // - (3) A no-op oplog entry with an inner donor oplog entry as the 'o2' field. The donor opTime + // is less than the 'startApplyingDonorOpTime'. + // - (4) A no-op oplog entry with an inner oplog entry as the 'o2' field but no + // 'fromTenantMigrate' field. These oplog entries do not satisfy the conditions for the + // oplog applier to resume from so we default to resuming from 'startDonorApplyingOpTime'. const auto insertNss = NamespaceString("tenantA_foo.bar"); const auto beforeStartApplyingOpTime = OpTime(Timestamp(1, 1), 1); const auto entryBeforeStartApplyingOpTime = makeOplogEntry( diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index c85e053afbe..0170a8bdac8 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -477,7 +477,7 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries( status = scheduleStatus; } else { try { - _writeSessionNoOpsForRange(opObserver, s.second.begin(), s.second.end()); + _writeSessionNoOpsForRange(s.second.begin(), s.second.end()); } catch (const DBException& e) { status = e.toStatus(); } @@ -503,7 +503,6 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries( } void TenantOplogApplier::_writeSessionNoOpsForRange( - OpObserver* opObserver, std::vector<TenantNoOpEntry>::const_iterator begin, std::vector<TenantNoOpEntry>::const_iterator end) { auto opCtx = cc().makeOperationContext(); @@ -521,14 +520,23 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( invariant(!isResumeTokenNoop(entry)); invariant(entry.getSessionId()); + MutableOplogEntry noopEntry; + noopEntry.setOpType(repl::OpTypeEnum::kNoop); + noopEntry.setNss(entry.getNss()); + noopEntry.setUuid(entry.getUuid()); + noopEntry.setObject({}); // Empty 'o' field. + noopEntry.setObject2(entry.getEntry().toBSON()); + noopEntry.setOpTime(*iter->second); + noopEntry.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now()); + boost::optional<MongoDOperationContextSession> scopedSession; - boost::optional<BSONObj> o2; + boost::optional<SessionTxnRecord> sessionTxnRecord; if (entry.getTxnNumber() && !entry.isPartialTransaction() && (entry.getCommandType() == repl::OplogEntry::CommandType::kCommitTransaction || entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps)) { // Final applyOp for a transaction. - const auto& sessionId = *entry.getSessionId(); - const auto& txnNumber = *entry.getTxnNumber(); + auto sessionId = *entry.getSessionId(); + auto txnNumber = *entry.getTxnNumber(); opCtx->setLogicalSessionId(sessionId); opCtx->setTxnNumber(txnNumber); opCtx->setInMultiDocumentTransaction(); @@ -538,7 +546,8 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( "sessionId"_attr = sessionId, "txnNumber"_attr = txnNumber, "tenant"_attr = _tenantId, - "migrationUuid"_attr = _migrationUuid); + "migrationUuid"_attr = _migrationUuid, + "op"_attr = redact(entry.toBSONForLogging())); // Check out the session. scopedSession.emplace(opCtx.get()); @@ -558,37 +567,35 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( txnParticipant.getActiveTxnNumber() < txnNumber); txnParticipant.beginOrContinueTransactionUnconditionally(opCtx.get(), txnNumber); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(sessionId); - sessionInfo.setTxnNumber(txnNumber); - o2 = sessionInfo.toBSON(); + // Only set sessionId and txnNumber for the final applyOp in a transaction. + noopEntry.setSessionId(sessionId); + noopEntry.setTxnNumber(txnNumber); + + // Use the same wallclock time as the noop entry. + sessionTxnRecord.emplace(sessionId, txnNumber, OpTime(), noopEntry.getWallClockTime()); + sessionTxnRecord->setState(DurableTxnStateEnum::kCommitted); } + // TODO(SERVER-53510) Correctly fill in pre-image and post-image op times. + const boost::optional<OpTime> preImageOpTime = boost::none; + const boost::optional<OpTime> postImageOpTime = boost::none; + // TODO(SERVER-53509) Correctly fill in prevWriteOpTime for retryable writes. + const boost::optional<OpTime> prevWriteOpTimeInTransaction = boost::none; + noopEntry.setPreImageOpTime(preImageOpTime); + noopEntry.setPostImageOpTime(postImageOpTime); + noopEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTimeInTransaction); + AutoGetOplog oplogWrite(opCtx.get(), OplogAccessMode::kWrite); writeConflictRetry( opCtx.get(), "writeTenantNoOps", NamespaceString::kRsOplogNamespace.ns(), [&] { WriteUnitOfWork wuow(opCtx.get()); - // TODO(SERVER-53510) Correctly fill in pre-image and post-image op times. - const boost::optional<OpTime> preImageOpTime = boost::none; - const boost::optional<OpTime> postImageOpTime = boost::none; - // TODO(SERVER-53509) Correctly fill in prevWriteOpTime for retryable writes. - const boost::optional<OpTime> prevWriteOpTimeInTransaction = boost::none; - - // Write the noop entry and trigger config.transactions update. - opObserver->onInternalOpMessage( - opCtx.get(), - entry.getNss(), - entry.getUuid(), - entry.getEntry().toBSON(), - o2, - // We link the no-ops together by recipient op time the same way the actual ops - // were linked together by donor op time. This is to allow retryable writes - // and changestreams to find the ops they need. - preImageOpTime, - postImageOpTime, - prevWriteOpTimeInTransaction, - *iter->second); + // Write the noop entry and update config.transactions. + repl::logOp(opCtx.get(), &noopEntry); + if (sessionTxnRecord) { + TransactionParticipant::get(opCtx.get()) + .onWriteOpCompletedOnPrimary(opCtx.get(), {}, *sessionTxnRecord); + } wuow.commit(); }); @@ -636,8 +643,8 @@ void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver, opCtx.get(), entry.getNss(), entry.getUuid(), + {}, // Empty 'o' field. entry.getEntry().toBSON(), - BSONObj(), // We link the no-ops together by recipient op time the same way the actual ops // were linked together by donor op time. This is to allow retryable writes // and changestreams to find the ops they need. diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h index 58409d61e2b..357d0f00144 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.h +++ b/src/mongo/db/repl/tenant_oplog_applier.h @@ -113,8 +113,7 @@ private: void _writeNoOpsForRange(OpObserver* opObserver, std::vector<TenantNoOpEntry>::const_iterator begin, std::vector<TenantNoOpEntry>::const_iterator end); - void _writeSessionNoOpsForRange(OpObserver* opObserver, - std::vector<TenantNoOpEntry>::const_iterator begin, + void _writeSessionNoOpsForRange(std::vector<TenantNoOpEntry>::const_iterator begin, std::vector<TenantNoOpEntry>::const_iterator end); Status _applyOplogEntryOrGroupedInserts(OperationContext* opCtx, diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp index 0ddff34b1fe..8b95d3dc606 100644 --- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp @@ -160,7 +160,7 @@ public: } void assertNoOpMatches(const OplogEntry& op, const MutableOplogEntry& noOp) { - ASSERT_BSONOBJ_EQ(op.getEntry().toBSON(), noOp.getObject()); + ASSERT_BSONOBJ_EQ(op.getEntry().toBSON(), *noOp.getObject2()); ASSERT_EQ(op.getNss(), noOp.getNss()); ASSERT_EQ(op.getUuid(), noOp.getUuid()); ASSERT_EQ(_migrationUuid, noOp.getFromTenantMigration()); |