diff options
author | Benety Goh <benety@mongodb.com> | 2022-07-17 08:23:16 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-07-17 12:52:53 +0000 |
commit | aed8e3a74d7f7bd5fc2d607d86d79636f9127cf2 (patch) | |
tree | e267b63a0efdcff3a814856b9e0111f37f6d6430 | |
parent | 52a596efa4256d42d895ee56ebfa4c7328f83164 (diff) | |
download | mongo-aed8e3a74d7f7bd5fc2d607d86d79636f9127cf2.tar.gz |
SERVER-67508 redirect OpObserverImpl oplog access through OplogWriter
-rw-r--r-- | src/mongo/db/op_observer/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/op_observer/op_observer_impl.cpp | 120 |
2 files changed, 72 insertions, 49 deletions
diff --git a/src/mongo/db/op_observer/SConscript b/src/mongo/db/op_observer/SConscript index 8cce8f0d18d..7042f75bc9b 100644 --- a/src/mongo/db/op_observer/SConscript +++ b/src/mongo/db/op_observer/SConscript @@ -72,7 +72,6 @@ env.Library( '$BUILD_DIR/mongo/db/pipeline/change_stream_preimage', '$BUILD_DIR/mongo/db/read_write_concern_defaults', '$BUILD_DIR/mongo/db/repl/image_collection_entry', - '$BUILD_DIR/mongo/db/repl/oplog', '$BUILD_DIR/mongo/db/repl/repl_server_parameters', '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', '$BUILD_DIR/mongo/db/s/sharding_api_d', diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp index d697e4d34d4..055c0fc5150 100644 --- a/src/mongo/db/op_observer/op_observer_impl.cpp +++ b/src/mongo/db/op_observer/op_observer_impl.cpp @@ -60,7 +60,6 @@ #include "mongo/db/pipeline/change_stream_preimage_gen.h" #include "mongo/db/read_write_concern_defaults.h" #include "mongo/db/repl/image_collection_entry_gen.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry_gen.h" #include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/replication_coordinator.h" @@ -111,12 +110,13 @@ Date_t getWallClockTimeForOpLog(OperationContext* opCtx) { repl::OpTime logOperation(OperationContext* opCtx, MutableOplogEntry* oplogEntry, - bool assignWallClockTime = true) { + bool assignWallClockTime, + OplogWriter* oplogWriter) { if (assignWallClockTime) { oplogEntry->setWallClockTime(getWallClockTimeForOpLog(opCtx)); } auto& times = OpObserver::Times::get(opCtx).reservedOpTimes; - auto opTime = repl::logOp(opCtx, oplogEntry); + auto opTime = oplogWriter->logOp(opCtx, oplogEntry); times.push_back(opTime); return opTime; } @@ -191,13 +191,14 @@ struct ImageBundle { */ OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args, - MutableOplogEntry* oplogEntry) { + MutableOplogEntry* oplogEntry, + OplogWriter* oplogWriter) { oplogEntry->setTid(args.nss.tenantId()); oplogEntry->setNss(args.nss); oplogEntry->setUuid(args.uuid); repl::OplogLink oplogLink; - repl::appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, args.updateArgs->stmtIds); + oplogWriter->appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, args.updateArgs->stmtIds); OpTimeBundle opTimes; // We never want to store pre- or post- images when we're migrating oplog entries from another @@ -227,7 +228,8 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, noopEntry.setOpTime(repl::OpTime(reservedOplogSlots.front().getTimestamp(), reservedOplogSlots.front().getTerm())); } - oplogLink.preImageOpTime = logOperation(opCtx, &noopEntry); + oplogLink.preImageOpTime = + logOperation(opCtx, &noopEntry, true /*assignWallClockTime*/, oplogWriter); if (storePreImageInOplogForRetryableWrite) { opTimes.prePostImageOpTime = oplogLink.preImageOpTime; } @@ -239,7 +241,8 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, MutableOplogEntry noopEntry = *oplogEntry; noopEntry.setOpType(repl::OpTypeEnum::kNoop); noopEntry.setObject(args.updateArgs->updatedDoc); - oplogLink.postImageOpTime = logOperation(opCtx, &noopEntry); + oplogLink.postImageOpTime = + logOperation(opCtx, &noopEntry, true /*assignWallClockTime*/, oplogWriter); invariant(opTimes.prePostImageOpTime.isNull()); opTimes.prePostImageOpTime = oplogLink.postImageOpTime; } @@ -249,11 +252,12 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, oplogEntry->setObject2(args.updateArgs->criteria); oplogEntry->setFromMigrateIfTrue(args.updateArgs->source == OperationSource::kFromMigrate); // oplogLink could have been changed to include pre/postImageOpTime by the previous no-op write. - repl::appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, args.updateArgs->stmtIds); + oplogWriter->appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, args.updateArgs->stmtIds); if (!args.updateArgs->oplogSlots.empty()) { oplogEntry->setOpTime(args.updateArgs->oplogSlots.back()); } - opTimes.writeOpTime = logOperation(opCtx, oplogEntry); + opTimes.writeOpTime = + logOperation(opCtx, oplogEntry, true /*assignWallClockTime*/, oplogWriter); opTimes.wallClockTime = oplogEntry->getWallClockTime(); return opTimes; } @@ -267,14 +271,15 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, const boost::optional<UUID>& uuid, StmtId stmtId, bool fromMigrate, - const boost::optional<BSONObj>& deletedDoc) { + const boost::optional<BSONObj>& deletedDoc, + OplogWriter* oplogWriter) { oplogEntry->setTid(nss.tenantId()); oplogEntry->setNss(nss); oplogEntry->setUuid(uuid); oplogEntry->setDestinedRecipient(destinedRecipientDecoration(opCtx)); repl::OplogLink oplogLink; - repl::appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, {stmtId}); + oplogWriter->appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, {stmtId}); OpTimeBundle opTimes; // We never want to store pre-images when we're migrating oplog entries from another @@ -284,7 +289,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, MutableOplogEntry noopEntry = *oplogEntry; noopEntry.setOpType(repl::OpTypeEnum::kNoop); noopEntry.setObject(*deletedDoc); - auto noteOplog = logOperation(opCtx, &noopEntry); + auto noteOplog = logOperation(opCtx, &noopEntry, true /*assignWallClockTime*/, oplogWriter); opTimes.prePostImageOpTime = noteOplog; oplogLink.preImageOpTime = noteOplog; } @@ -293,8 +298,9 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, oplogEntry->setObject(repl::documentKeyDecoration(opCtx).get().getShardKeyAndId()); oplogEntry->setFromMigrateIfTrue(fromMigrate); // oplogLink could have been changed to include preImageOpTime by the previous no-op write. - repl::appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, {stmtId}); - opTimes.writeOpTime = logOperation(opCtx, oplogEntry); + oplogWriter->appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, {stmtId}); + opTimes.writeOpTime = + logOperation(opCtx, oplogEntry, true /*assignWallClockTime*/, oplogWriter); opTimes.wallClockTime = oplogEntry->getWallClockTime(); return opTimes; } @@ -387,7 +393,7 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, oplogEntry.setUuid(uuid); oplogEntry.setObject(builder.done()); oplogEntry.setFromMigrateIfTrue(fromMigrate); - logOperation(opCtx, &oplogEntry); + logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get()); } } @@ -416,7 +422,7 @@ void OpObserverImpl::onStartIndexBuild(OperationContext* opCtx, oplogEntry.setUuid(collUUID); oplogEntry.setObject(oplogEntryBuilder.done()); oplogEntry.setFromMigrateIfTrue(fromMigrate); - logOperation(opCtx, &oplogEntry); + logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get()); } void OpObserverImpl::onStartIndexBuildSinglePhase(OperationContext* opCtx, @@ -481,7 +487,7 @@ void OpObserverImpl::onCommitIndexBuild(OperationContext* opCtx, oplogEntry.setUuid(collUUID); oplogEntry.setObject(oplogEntryBuilder.done()); oplogEntry.setFromMigrateIfTrue(fromMigrate); - logOperation(opCtx, &oplogEntry); + logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get()); } void OpObserverImpl::onAbortIndexBuild(OperationContext* opCtx, @@ -517,7 +523,7 @@ void OpObserverImpl::onAbortIndexBuild(OperationContext* opCtx, oplogEntry.setUuid(collUUID); oplogEntry.setObject(oplogEntryBuilder.done()); oplogEntry.setFromMigrateIfTrue(fromMigrate); - logOperation(opCtx, &oplogEntry); + logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get()); } void OpObserverImpl::onInserts(OperationContext* opCtx, @@ -619,8 +625,8 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, lastWriteDate = getWallClockTimeForOpLog(opCtx); oplogEntryTemplate.setWallClockTime(lastWriteDate); - opTimeList = - repl::logInsertOps(opCtx, &oplogEntryTemplate, first, last, getDestinedRecipientFn); + opTimeList = _oplogWriter->logInsertOps( + opCtx, &oplogEntryTemplate, first, last, getDestinedRecipientFn); if (!opTimeList.empty()) lastOpTime = opTimeList.back(); @@ -809,7 +815,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } } - opTime = replLogUpdate(opCtx, args, &oplogEntry); + opTime = replLogUpdate(opCtx, args, &oplogEntry, _oplogWriter.get()); if (oplogEntry.getNeedsRetryImage()) { // If the oplog entry has `needsRetryImage`, copy the image into image collection. @@ -1017,8 +1023,14 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, oplogEntry.setOpTime(args.oplogSlots.back()); } } - opTime = replLogDelete( - opCtx, nss, &oplogEntry, uuid, stmtId, args.fromMigrate, deletedDocForOplog); + opTime = replLogDelete(opCtx, + nss, + &oplogEntry, + uuid, + stmtId, + args.fromMigrate, + deletedDocForOplog, + _oplogWriter.get()); if (oplogEntry.getNeedsRetryImage()) { writeToImageCollection(opCtx, @@ -1105,7 +1117,7 @@ void OpObserverImpl::onInternalOpMessage( if (slot) { oplogEntry.setOpTime(*slot); } - logOperation(opCtx, &oplogEntry); + logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get()); } void OpObserverImpl::onCreateCollection(OperationContext* opCtx, @@ -1138,7 +1150,7 @@ void OpObserverImpl::onCreateCollection(OperationContext* opCtx, MutableOplogEntry::makeCreateCollCmdObj(collectionName, options, idIndex)); oplogEntry.setOpTime(createOpTime); oplogEntry.setFromMigrateIfTrue(fromMigrate); - logOperation(opCtx, &oplogEntry); + logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get()); } } @@ -1181,7 +1193,7 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx, oplogEntry.setUuid(uuid); oplogEntry.setObject(repl::makeCollModCmdObj(collModCmd, oldCollOptions, indexInfo)); oplogEntry.setObject2(o2Builder.done()); - logOperation(opCtx, &oplogEntry); + logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get()); } // Make sure the UUID values in the Collection metadata, the Collection object, and the UUID @@ -1206,7 +1218,7 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const DatabaseName& oplogEntry.setTid(dbName.tenantId()); oplogEntry.setNss({dbName, "$cmd"}); oplogEntry.setObject(BSON("dropDatabase" << 1)); - logOperation(opCtx, &oplogEntry); + logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get()); uassert(50714, "dropping the admin database is not allowed.", @@ -1245,7 +1257,7 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, oplogEntry.setFromMigrateIfTrue(markFromMigrate); oplogEntry.setObject(BSON("drop" << collectionName.coll())); oplogEntry.setObject2(makeObject2ForDropOrRename(numRecords)); - logOperation(opCtx, &oplogEntry); + logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get()); } uassert(50715, @@ -1294,7 +1306,7 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx, oplogEntry.setUuid(uuid); oplogEntry.setObject(BSON("dropIndexes" << nss.coll() << "index" << indexName)); oplogEntry.setObject2(indexInfo); - logOperation(opCtx, &oplogEntry); + logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get()); } repl::OpTime OpObserverImpl::preRenameCollection(OperationContext* const opCtx, @@ -1348,7 +1360,7 @@ repl::OpTime OpObserverImpl::preRenameCollection(OperationContext* const opCtx, oplogEntry.setObject(builder.done()); if (dropTargetUUID) oplogEntry.setObject2(makeObject2ForDropOrRename(numRecords)); - logOperation(opCtx, &oplogEntry); + logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get()); return {}; } @@ -1418,7 +1430,7 @@ void OpObserverImpl::onImportCollection(OperationContext* opCtx, oplogEntry.setTid(nss.tenantId()); oplogEntry.setNss(nss.getCommandNS()); oplogEntry.setObject(importCollection.toBSON()); - logOperation(opCtx, &oplogEntry); + logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get()); } void OpObserverImpl::onApplyOps(OperationContext* opCtx, @@ -1430,7 +1442,7 @@ void OpObserverImpl::onApplyOps(OperationContext* opCtx, oplogEntry.setTid(dbName.tenantId()); oplogEntry.setNss({dbName, "$cmd"}); oplogEntry.setObject(applyOpCmd); - logOperation(opCtx, &oplogEntry); + logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get()); } void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, @@ -1445,7 +1457,7 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, oplogEntry.setNss(collectionName.getCommandNS()); oplogEntry.setUuid(uuid); oplogEntry.setObject(BSON("emptycapped" << collectionName.coll())); - logOperation(opCtx, &oplogEntry); + logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get()); } } @@ -1743,7 +1755,8 @@ OpTimeBundle logApplyOps(OperationContext* opCtx, boost::optional<DurableTxnStateEnum> txnState, boost::optional<repl::OpTime> startOpTime, std::vector<StmtId> stmtIdsWritten, - const bool updateTxnTable) { + const bool updateTxnTable, + OplogWriter* oplogWriter) { if (!stmtIdsWritten.empty()) { invariant(isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId())); } @@ -1765,7 +1778,8 @@ OpTimeBundle logApplyOps(OperationContext* opCtx, try { OpTimeBundle times; - times.writeOpTime = logOperation(opCtx, oplogEntry, false /*assignWallClockTime*/); + times.writeOpTime = + logOperation(opCtx, oplogEntry, false /*assignWallClockTime*/, oplogWriter); times.wallClockTime = oplogEntry->getWallClockTime(); if (updateTxnTable) { SessionTxnRecord sessionTxnRecord; @@ -1821,7 +1835,8 @@ int logOplogEntries( boost::optional<ImageBundle>* prePostImageToWriteToImageCollection, size_t numberOfPrePostImagesToWrite, bool prepare, - Date_t wallClockTime) { + Date_t wallClockTime, + OplogWriter* oplogWriter) { invariant(!stmts->empty()); // Storage transaction commit is the last place inside a transaction that can throw an @@ -1864,7 +1879,7 @@ int logOplogEntries( imageEntry.setOpTime(slot); imageEntry.setDestinedRecipient(statement.getDestinedRecipient()); - logOperation(opCtx, &imageEntry); + logOperation(opCtx, &imageEntry, true /*assignWallClockTime*/, oplogWriter); }; if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) { @@ -1986,7 +2001,8 @@ int logOplogEntries( txnState, startOpTime, (lastOp ? std::move(stmtIdsWritten) : std::vector<StmtId>{}), - updateTxnTable); + updateTxnTable, + oplogWriter); hangAfterLoggingApplyOpsForTransaction.pauseWhileSet(); @@ -2007,7 +2023,8 @@ int logOplogEntries( void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, MutableOplogEntry* oplogEntry, - DurableTxnStateEnum durableState) { + DurableTxnStateEnum durableState, + OplogWriter* oplogWriter) { const auto txnRetryCounter = *opCtx->getTxnRetryCounter(); oplogEntry->setOpType(repl::OpTypeEnum::kCommand); @@ -2036,7 +2053,8 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, invariant(opCtx->lockState()->isWriteLocked()); WriteUnitOfWork wuow(opCtx); - const auto oplogOpTime = logOperation(opCtx, oplogEntry); + const auto oplogOpTime = + logOperation(opCtx, oplogEntry, true /*assignWallClockTime*/, oplogWriter); invariant(oplogEntry->getOpTime().isNull() || oplogEntry->getOpTime() == oplogOpTime); SessionTxnRecord sessionTxnRecord; @@ -2071,7 +2089,7 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, // Reserve all the optimes in advance, so we only need to get the optime mutex once. We // reserve enough entries for all statements in the transaction. auto oplogSlots = - repl::getNextOpTimes(opCtx, statements->size() + numberOfPrePostImagesToWrite); + _oplogWriter->getNextOpTimes(opCtx, statements->size() + numberOfPrePostImagesToWrite); // Throw TenantMigrationConflict error if the database for the transaction statements is being // migrated. We only need check the namespace of the first statement since a transaction's @@ -2100,7 +2118,8 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, &imageToWrite, numberOfPrePostImagesToWrite, false /* prepare*/, - wallClockTime); + wallClockTime, + _oplogWriter.get()); // Write change stream pre-images. At this point the pre-images will be written at the // transaction commit timestamp as driven (implicitly) by the last written "applyOps" oplog @@ -2142,7 +2161,7 @@ void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx) { // Reserve all the optimes in advance, so we only need to get the optime mutex once. We // reserve enough entries for all statements in the transaction. - auto oplogSlots = repl::getNextOpTimes(opCtx, batchedOps.size()); + auto oplogSlots = _oplogWriter->getNextOpTimes(opCtx, batchedOps.size()); // Throw TenantMigrationConflict error if the database for the transaction statements is being // migrated. We only need check the namespace of the first statement since a transaction's @@ -2165,7 +2184,8 @@ void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx) { &noPrePostImage, 0 /* numberOfPrePostImagesToWrite */, false, - wallClockTime); + wallClockTime, + _oplogWriter.get()); } void OpObserverImpl::onBatchedWriteAbort(OperationContext* opCtx) { @@ -2194,7 +2214,8 @@ void OpObserverImpl::onPreparedTransactionCommit( cmdObj.setCommitTimestamp(commitTimestamp); oplogEntry.setObject(cmdObj.toBSON()); - logCommitOrAbortForPreparedTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kCommitted); + logCommitOrAbortForPreparedTransaction( + opCtx, &oplogEntry, DurableTxnStateEnum::kCommitted, _oplogWriter.get()); } std::unique_ptr<OpObserver::ApplyOpsOplogSlotAndOperationAssignment> @@ -2260,7 +2281,8 @@ void OpObserverImpl::onTransactionPrepare( &imageToWrite, numberOfPrePostImagesToWrite, true /* prepare */, - wallClockTime); + wallClockTime, + _oplogWriter.get()); if (imageToWrite) { writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), @@ -2288,7 +2310,8 @@ void OpObserverImpl::onTransactionPrepare( DurableTxnStateEnum::kPrepared, oplogSlot, {}, - true /* updateTxnTable */); + true /* updateTxnTable */, + _oplogWriter.get()); } wuow.commit(); }); @@ -2319,7 +2342,8 @@ void OpObserverImpl::onTransactionAbort(OperationContext* opCtx, AbortTransactionOplogObject cmdObj; oplogEntry.setObject(cmdObj.toBSON()); - logCommitOrAbortForPreparedTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kAborted); + logCommitOrAbortForPreparedTransaction( + opCtx, &oplogEntry, DurableTxnStateEnum::kAborted, _oplogWriter.get()); } void OpObserverImpl::_onReplicationRollback(OperationContext* opCtx, |