summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2022-07-17 08:23:16 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-07-17 12:52:53 +0000
commitaed8e3a74d7f7bd5fc2d607d86d79636f9127cf2 (patch)
treee267b63a0efdcff3a814856b9e0111f37f6d6430
parent52a596efa4256d42d895ee56ebfa4c7328f83164 (diff)
downloadmongo-aed8e3a74d7f7bd5fc2d607d86d79636f9127cf2.tar.gz
SERVER-67508 redirect OpObserverImpl oplog access through OplogWriter
-rw-r--r--src/mongo/db/op_observer/SConscript1
-rw-r--r--src/mongo/db/op_observer/op_observer_impl.cpp120
2 files changed, 72 insertions, 49 deletions
diff --git a/src/mongo/db/op_observer/SConscript b/src/mongo/db/op_observer/SConscript
index 8cce8f0d18d..7042f75bc9b 100644
--- a/src/mongo/db/op_observer/SConscript
+++ b/src/mongo/db/op_observer/SConscript
@@ -72,7 +72,6 @@ env.Library(
'$BUILD_DIR/mongo/db/pipeline/change_stream_preimage',
'$BUILD_DIR/mongo/db/read_write_concern_defaults',
'$BUILD_DIR/mongo/db/repl/image_collection_entry',
- '$BUILD_DIR/mongo/db/repl/oplog',
'$BUILD_DIR/mongo/db/repl/repl_server_parameters',
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/db/s/sharding_api_d',
diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp
index d697e4d34d4..055c0fc5150 100644
--- a/src/mongo/db/op_observer/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer/op_observer_impl.cpp
@@ -60,7 +60,6 @@
#include "mongo/db/pipeline/change_stream_preimage_gen.h"
#include "mongo/db/read_write_concern_defaults.h"
#include "mongo/db/repl/image_collection_entry_gen.h"
-#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry_gen.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -111,12 +110,13 @@ Date_t getWallClockTimeForOpLog(OperationContext* opCtx) {
repl::OpTime logOperation(OperationContext* opCtx,
MutableOplogEntry* oplogEntry,
- bool assignWallClockTime = true) {
+ bool assignWallClockTime,
+ OplogWriter* oplogWriter) {
if (assignWallClockTime) {
oplogEntry->setWallClockTime(getWallClockTimeForOpLog(opCtx));
}
auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
- auto opTime = repl::logOp(opCtx, oplogEntry);
+ auto opTime = oplogWriter->logOp(opCtx, oplogEntry);
times.push_back(opTime);
return opTime;
}
@@ -191,13 +191,14 @@ struct ImageBundle {
*/
OpTimeBundle replLogUpdate(OperationContext* opCtx,
const OplogUpdateEntryArgs& args,
- MutableOplogEntry* oplogEntry) {
+ MutableOplogEntry* oplogEntry,
+ OplogWriter* oplogWriter) {
oplogEntry->setTid(args.nss.tenantId());
oplogEntry->setNss(args.nss);
oplogEntry->setUuid(args.uuid);
repl::OplogLink oplogLink;
- repl::appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, args.updateArgs->stmtIds);
+ oplogWriter->appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, args.updateArgs->stmtIds);
OpTimeBundle opTimes;
// We never want to store pre- or post- images when we're migrating oplog entries from another
@@ -227,7 +228,8 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
noopEntry.setOpTime(repl::OpTime(reservedOplogSlots.front().getTimestamp(),
reservedOplogSlots.front().getTerm()));
}
- oplogLink.preImageOpTime = logOperation(opCtx, &noopEntry);
+ oplogLink.preImageOpTime =
+ logOperation(opCtx, &noopEntry, true /*assignWallClockTime*/, oplogWriter);
if (storePreImageInOplogForRetryableWrite) {
opTimes.prePostImageOpTime = oplogLink.preImageOpTime;
}
@@ -239,7 +241,8 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
MutableOplogEntry noopEntry = *oplogEntry;
noopEntry.setOpType(repl::OpTypeEnum::kNoop);
noopEntry.setObject(args.updateArgs->updatedDoc);
- oplogLink.postImageOpTime = logOperation(opCtx, &noopEntry);
+ oplogLink.postImageOpTime =
+ logOperation(opCtx, &noopEntry, true /*assignWallClockTime*/, oplogWriter);
invariant(opTimes.prePostImageOpTime.isNull());
opTimes.prePostImageOpTime = oplogLink.postImageOpTime;
}
@@ -249,11 +252,12 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
oplogEntry->setObject2(args.updateArgs->criteria);
oplogEntry->setFromMigrateIfTrue(args.updateArgs->source == OperationSource::kFromMigrate);
// oplogLink could have been changed to include pre/postImageOpTime by the previous no-op write.
- repl::appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, args.updateArgs->stmtIds);
+ oplogWriter->appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, args.updateArgs->stmtIds);
if (!args.updateArgs->oplogSlots.empty()) {
oplogEntry->setOpTime(args.updateArgs->oplogSlots.back());
}
- opTimes.writeOpTime = logOperation(opCtx, oplogEntry);
+ opTimes.writeOpTime =
+ logOperation(opCtx, oplogEntry, true /*assignWallClockTime*/, oplogWriter);
opTimes.wallClockTime = oplogEntry->getWallClockTime();
return opTimes;
}
@@ -267,14 +271,15 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
const boost::optional<UUID>& uuid,
StmtId stmtId,
bool fromMigrate,
- const boost::optional<BSONObj>& deletedDoc) {
+ const boost::optional<BSONObj>& deletedDoc,
+ OplogWriter* oplogWriter) {
oplogEntry->setTid(nss.tenantId());
oplogEntry->setNss(nss);
oplogEntry->setUuid(uuid);
oplogEntry->setDestinedRecipient(destinedRecipientDecoration(opCtx));
repl::OplogLink oplogLink;
- repl::appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, {stmtId});
+ oplogWriter->appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, {stmtId});
OpTimeBundle opTimes;
// We never want to store pre-images when we're migrating oplog entries from another
@@ -284,7 +289,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
MutableOplogEntry noopEntry = *oplogEntry;
noopEntry.setOpType(repl::OpTypeEnum::kNoop);
noopEntry.setObject(*deletedDoc);
- auto noteOplog = logOperation(opCtx, &noopEntry);
+ auto noteOplog = logOperation(opCtx, &noopEntry, true /*assignWallClockTime*/, oplogWriter);
opTimes.prePostImageOpTime = noteOplog;
oplogLink.preImageOpTime = noteOplog;
}
@@ -293,8 +298,9 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
oplogEntry->setObject(repl::documentKeyDecoration(opCtx).get().getShardKeyAndId());
oplogEntry->setFromMigrateIfTrue(fromMigrate);
// oplogLink could have been changed to include preImageOpTime by the previous no-op write.
- repl::appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, {stmtId});
- opTimes.writeOpTime = logOperation(opCtx, oplogEntry);
+ oplogWriter->appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, {stmtId});
+ opTimes.writeOpTime =
+ logOperation(opCtx, oplogEntry, true /*assignWallClockTime*/, oplogWriter);
opTimes.wallClockTime = oplogEntry->getWallClockTime();
return opTimes;
}
@@ -387,7 +393,7 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
oplogEntry.setUuid(uuid);
oplogEntry.setObject(builder.done());
oplogEntry.setFromMigrateIfTrue(fromMigrate);
- logOperation(opCtx, &oplogEntry);
+ logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get());
}
}
@@ -416,7 +422,7 @@ void OpObserverImpl::onStartIndexBuild(OperationContext* opCtx,
oplogEntry.setUuid(collUUID);
oplogEntry.setObject(oplogEntryBuilder.done());
oplogEntry.setFromMigrateIfTrue(fromMigrate);
- logOperation(opCtx, &oplogEntry);
+ logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get());
}
void OpObserverImpl::onStartIndexBuildSinglePhase(OperationContext* opCtx,
@@ -481,7 +487,7 @@ void OpObserverImpl::onCommitIndexBuild(OperationContext* opCtx,
oplogEntry.setUuid(collUUID);
oplogEntry.setObject(oplogEntryBuilder.done());
oplogEntry.setFromMigrateIfTrue(fromMigrate);
- logOperation(opCtx, &oplogEntry);
+ logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get());
}
void OpObserverImpl::onAbortIndexBuild(OperationContext* opCtx,
@@ -517,7 +523,7 @@ void OpObserverImpl::onAbortIndexBuild(OperationContext* opCtx,
oplogEntry.setUuid(collUUID);
oplogEntry.setObject(oplogEntryBuilder.done());
oplogEntry.setFromMigrateIfTrue(fromMigrate);
- logOperation(opCtx, &oplogEntry);
+ logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get());
}
void OpObserverImpl::onInserts(OperationContext* opCtx,
@@ -619,8 +625,8 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
lastWriteDate = getWallClockTimeForOpLog(opCtx);
oplogEntryTemplate.setWallClockTime(lastWriteDate);
- opTimeList =
- repl::logInsertOps(opCtx, &oplogEntryTemplate, first, last, getDestinedRecipientFn);
+ opTimeList = _oplogWriter->logInsertOps(
+ opCtx, &oplogEntryTemplate, first, last, getDestinedRecipientFn);
if (!opTimeList.empty())
lastOpTime = opTimeList.back();
@@ -809,7 +815,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
}
}
- opTime = replLogUpdate(opCtx, args, &oplogEntry);
+ opTime = replLogUpdate(opCtx, args, &oplogEntry, _oplogWriter.get());
if (oplogEntry.getNeedsRetryImage()) {
// If the oplog entry has `needsRetryImage`, copy the image into image collection.
@@ -1017,8 +1023,14 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
oplogEntry.setOpTime(args.oplogSlots.back());
}
}
- opTime = replLogDelete(
- opCtx, nss, &oplogEntry, uuid, stmtId, args.fromMigrate, deletedDocForOplog);
+ opTime = replLogDelete(opCtx,
+ nss,
+ &oplogEntry,
+ uuid,
+ stmtId,
+ args.fromMigrate,
+ deletedDocForOplog,
+ _oplogWriter.get());
if (oplogEntry.getNeedsRetryImage()) {
writeToImageCollection(opCtx,
@@ -1105,7 +1117,7 @@ void OpObserverImpl::onInternalOpMessage(
if (slot) {
oplogEntry.setOpTime(*slot);
}
- logOperation(opCtx, &oplogEntry);
+ logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get());
}
void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
@@ -1138,7 +1150,7 @@ void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
MutableOplogEntry::makeCreateCollCmdObj(collectionName, options, idIndex));
oplogEntry.setOpTime(createOpTime);
oplogEntry.setFromMigrateIfTrue(fromMigrate);
- logOperation(opCtx, &oplogEntry);
+ logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get());
}
}
@@ -1181,7 +1193,7 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx,
oplogEntry.setUuid(uuid);
oplogEntry.setObject(repl::makeCollModCmdObj(collModCmd, oldCollOptions, indexInfo));
oplogEntry.setObject2(o2Builder.done());
- logOperation(opCtx, &oplogEntry);
+ logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get());
}
// Make sure the UUID values in the Collection metadata, the Collection object, and the UUID
@@ -1206,7 +1218,7 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const DatabaseName&
oplogEntry.setTid(dbName.tenantId());
oplogEntry.setNss({dbName, "$cmd"});
oplogEntry.setObject(BSON("dropDatabase" << 1));
- logOperation(opCtx, &oplogEntry);
+ logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get());
uassert(50714,
"dropping the admin database is not allowed.",
@@ -1245,7 +1257,7 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
oplogEntry.setFromMigrateIfTrue(markFromMigrate);
oplogEntry.setObject(BSON("drop" << collectionName.coll()));
oplogEntry.setObject2(makeObject2ForDropOrRename(numRecords));
- logOperation(opCtx, &oplogEntry);
+ logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get());
}
uassert(50715,
@@ -1294,7 +1306,7 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx,
oplogEntry.setUuid(uuid);
oplogEntry.setObject(BSON("dropIndexes" << nss.coll() << "index" << indexName));
oplogEntry.setObject2(indexInfo);
- logOperation(opCtx, &oplogEntry);
+ logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get());
}
repl::OpTime OpObserverImpl::preRenameCollection(OperationContext* const opCtx,
@@ -1348,7 +1360,7 @@ repl::OpTime OpObserverImpl::preRenameCollection(OperationContext* const opCtx,
oplogEntry.setObject(builder.done());
if (dropTargetUUID)
oplogEntry.setObject2(makeObject2ForDropOrRename(numRecords));
- logOperation(opCtx, &oplogEntry);
+ logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get());
return {};
}
@@ -1418,7 +1430,7 @@ void OpObserverImpl::onImportCollection(OperationContext* opCtx,
oplogEntry.setTid(nss.tenantId());
oplogEntry.setNss(nss.getCommandNS());
oplogEntry.setObject(importCollection.toBSON());
- logOperation(opCtx, &oplogEntry);
+ logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get());
}
void OpObserverImpl::onApplyOps(OperationContext* opCtx,
@@ -1430,7 +1442,7 @@ void OpObserverImpl::onApplyOps(OperationContext* opCtx,
oplogEntry.setTid(dbName.tenantId());
oplogEntry.setNss({dbName, "$cmd"});
oplogEntry.setObject(applyOpCmd);
- logOperation(opCtx, &oplogEntry);
+ logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get());
}
void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
@@ -1445,7 +1457,7 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
oplogEntry.setNss(collectionName.getCommandNS());
oplogEntry.setUuid(uuid);
oplogEntry.setObject(BSON("emptycapped" << collectionName.coll()));
- logOperation(opCtx, &oplogEntry);
+ logOperation(opCtx, &oplogEntry, true /*assignWallClockTime*/, _oplogWriter.get());
}
}
@@ -1743,7 +1755,8 @@ OpTimeBundle logApplyOps(OperationContext* opCtx,
boost::optional<DurableTxnStateEnum> txnState,
boost::optional<repl::OpTime> startOpTime,
std::vector<StmtId> stmtIdsWritten,
- const bool updateTxnTable) {
+ const bool updateTxnTable,
+ OplogWriter* oplogWriter) {
if (!stmtIdsWritten.empty()) {
invariant(isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId()));
}
@@ -1765,7 +1778,8 @@ OpTimeBundle logApplyOps(OperationContext* opCtx,
try {
OpTimeBundle times;
- times.writeOpTime = logOperation(opCtx, oplogEntry, false /*assignWallClockTime*/);
+ times.writeOpTime =
+ logOperation(opCtx, oplogEntry, false /*assignWallClockTime*/, oplogWriter);
times.wallClockTime = oplogEntry->getWallClockTime();
if (updateTxnTable) {
SessionTxnRecord sessionTxnRecord;
@@ -1821,7 +1835,8 @@ int logOplogEntries(
boost::optional<ImageBundle>* prePostImageToWriteToImageCollection,
size_t numberOfPrePostImagesToWrite,
bool prepare,
- Date_t wallClockTime) {
+ Date_t wallClockTime,
+ OplogWriter* oplogWriter) {
invariant(!stmts->empty());
// Storage transaction commit is the last place inside a transaction that can throw an
@@ -1864,7 +1879,7 @@ int logOplogEntries(
imageEntry.setOpTime(slot);
imageEntry.setDestinedRecipient(statement.getDestinedRecipient());
- logOperation(opCtx, &imageEntry);
+ logOperation(opCtx, &imageEntry, true /*assignWallClockTime*/, oplogWriter);
};
if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) {
@@ -1986,7 +2001,8 @@ int logOplogEntries(
txnState,
startOpTime,
(lastOp ? std::move(stmtIdsWritten) : std::vector<StmtId>{}),
- updateTxnTable);
+ updateTxnTable,
+ oplogWriter);
hangAfterLoggingApplyOpsForTransaction.pauseWhileSet();
@@ -2007,7 +2023,8 @@ int logOplogEntries(
void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
MutableOplogEntry* oplogEntry,
- DurableTxnStateEnum durableState) {
+ DurableTxnStateEnum durableState,
+ OplogWriter* oplogWriter) {
const auto txnRetryCounter = *opCtx->getTxnRetryCounter();
oplogEntry->setOpType(repl::OpTypeEnum::kCommand);
@@ -2036,7 +2053,8 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
invariant(opCtx->lockState()->isWriteLocked());
WriteUnitOfWork wuow(opCtx);
- const auto oplogOpTime = logOperation(opCtx, oplogEntry);
+ const auto oplogOpTime =
+ logOperation(opCtx, oplogEntry, true /*assignWallClockTime*/, oplogWriter);
invariant(oplogEntry->getOpTime().isNull() || oplogEntry->getOpTime() == oplogOpTime);
SessionTxnRecord sessionTxnRecord;
@@ -2071,7 +2089,7 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx,
// Reserve all the optimes in advance, so we only need to get the optime mutex once. We
// reserve enough entries for all statements in the transaction.
auto oplogSlots =
- repl::getNextOpTimes(opCtx, statements->size() + numberOfPrePostImagesToWrite);
+ _oplogWriter->getNextOpTimes(opCtx, statements->size() + numberOfPrePostImagesToWrite);
// Throw TenantMigrationConflict error if the database for the transaction statements is being
// migrated. We only need check the namespace of the first statement since a transaction's
@@ -2100,7 +2118,8 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx,
&imageToWrite,
numberOfPrePostImagesToWrite,
false /* prepare*/,
- wallClockTime);
+ wallClockTime,
+ _oplogWriter.get());
// Write change stream pre-images. At this point the pre-images will be written at the
// transaction commit timestamp as driven (implicitly) by the last written "applyOps" oplog
@@ -2142,7 +2161,7 @@ void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx) {
// Reserve all the optimes in advance, so we only need to get the optime mutex once. We
// reserve enough entries for all statements in the transaction.
- auto oplogSlots = repl::getNextOpTimes(opCtx, batchedOps.size());
+ auto oplogSlots = _oplogWriter->getNextOpTimes(opCtx, batchedOps.size());
// Throw TenantMigrationConflict error if the database for the transaction statements is being
// migrated. We only need check the namespace of the first statement since a transaction's
@@ -2165,7 +2184,8 @@ void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx) {
&noPrePostImage,
0 /* numberOfPrePostImagesToWrite */,
false,
- wallClockTime);
+ wallClockTime,
+ _oplogWriter.get());
}
void OpObserverImpl::onBatchedWriteAbort(OperationContext* opCtx) {
@@ -2194,7 +2214,8 @@ void OpObserverImpl::onPreparedTransactionCommit(
cmdObj.setCommitTimestamp(commitTimestamp);
oplogEntry.setObject(cmdObj.toBSON());
- logCommitOrAbortForPreparedTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kCommitted);
+ logCommitOrAbortForPreparedTransaction(
+ opCtx, &oplogEntry, DurableTxnStateEnum::kCommitted, _oplogWriter.get());
}
std::unique_ptr<OpObserver::ApplyOpsOplogSlotAndOperationAssignment>
@@ -2260,7 +2281,8 @@ void OpObserverImpl::onTransactionPrepare(
&imageToWrite,
numberOfPrePostImagesToWrite,
true /* prepare */,
- wallClockTime);
+ wallClockTime,
+ _oplogWriter.get());
if (imageToWrite) {
writeToImageCollection(opCtx,
*opCtx->getLogicalSessionId(),
@@ -2288,7 +2310,8 @@ void OpObserverImpl::onTransactionPrepare(
DurableTxnStateEnum::kPrepared,
oplogSlot,
{},
- true /* updateTxnTable */);
+ true /* updateTxnTable */,
+ _oplogWriter.get());
}
wuow.commit();
});
@@ -2319,7 +2342,8 @@ void OpObserverImpl::onTransactionAbort(OperationContext* opCtx,
AbortTransactionOplogObject cmdObj;
oplogEntry.setObject(cmdObj.toBSON());
- logCommitOrAbortForPreparedTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kAborted);
+ logCommitOrAbortForPreparedTransaction(
+ opCtx, &oplogEntry, DurableTxnStateEnum::kAborted, _oplogWriter.get());
}
void OpObserverImpl::_onReplicationRollback(OperationContext* opCtx,