summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/catalog/collection.h5
-rw-r--r--src/mongo/db/catalog/collection_impl.cpp9
-rw-r--r--src/mongo/db/catalog/collection_impl.h5
-rw-r--r--src/mongo/db/catalog/collection_mock.h5
-rw-r--r--src/mongo/db/commands/dbcheck.cpp20
-rw-r--r--src/mongo/db/op_observer_impl.cpp736
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp10
-rw-r--r--src/mongo/db/repl/oplog.cpp306
-rw-r--r--src/mongo/db/repl/oplog.h67
-rw-r--r--src/mongo/db/repl/oplog_applier.cpp2
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp55
-rw-r--r--src/mongo/db/repl/oplog_entry.h195
-rw-r--r--src/mongo/db/repl/oplog_entry.idl2
-rw-r--r--src/mongo/db/repl/oplog_test.cpp37
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp90
-rw-r--r--src/mongo/db/storage/biggie/biggie_record_store.cpp47
-rw-r--r--src/mongo/db/storage/biggie/biggie_record_store.h6
-rw-r--r--src/mongo/db/storage/devnull/devnull_kv_engine.cpp14
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp41
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h6
-rw-r--r--src/mongo/db/storage/mobile/mobile_record_store.cpp24
-rw-r--r--src/mongo/db/storage/mobile/mobile_record_store.h6
-rw-r--r--src/mongo/db/storage/record_store.h44
-rw-r--r--src/mongo/db/storage/record_store_test_docwriter.h61
-rw-r--r--src/mongo/db/storage/record_store_test_harness.cpp43
-rw-r--r--src/mongo/db/storage/record_store_test_insertrecord.cpp68
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp42
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h6
-rw-r--r--src/mongo/db/transaction_participant_retryable_writes_test.cpp83
29 files changed, 643 insertions, 1392 deletions
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index 7cbaa9d5ced..15057e3124d 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -277,9 +277,8 @@ public:
* this method.
*/
virtual Status insertDocumentsForOplog(OperationContext* const opCtx,
- const DocWriter* const* const docs,
- Timestamp* timestamps,
- const size_t nDocs) = 0;
+ std::vector<Record>* records,
+ const std::vector<Timestamp>& timestamps) = 0;
/**
* Inserts a document into the record store for a bulk loader that manages the index building
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index 884f1ca87f3..fdde33beeb5 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -354,18 +354,15 @@ StatusWithMatchExpression CollectionImpl::parseValidator(
}
Status CollectionImpl::insertDocumentsForOplog(OperationContext* opCtx,
- const DocWriter* const* docs,
- Timestamp* timestamps,
- size_t nDocs) {
+ std::vector<Record>* records,
+ const std::vector<Timestamp>& timestamps) {
dassert(opCtx->lockState()->isWriteLocked());
// Since this is only for the OpLog, we can assume these for simplicity.
- // This also means that we do not need to forward this object to the OpObserver, which is good
- // because it would defeat the purpose of using DocWriter.
invariant(!_validator);
invariant(!_indexCatalog->haveAnyIndexes());
- Status status = _recordStore->insertRecordsWithDocWriter(opCtx, docs, timestamps, nDocs);
+ Status status = _recordStore->insertRecords(opCtx, records, timestamps);
if (!status.isOK())
return status;
diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h
index 180becf62d8..a9a2c6fbfde 100644
--- a/src/mongo/db/catalog/collection_impl.h
+++ b/src/mongo/db/catalog/collection_impl.h
@@ -167,9 +167,8 @@ public:
* this method.
*/
Status insertDocumentsForOplog(OperationContext* opCtx,
- const DocWriter* const* docs,
- Timestamp* timestamps,
- size_t nDocs) final;
+ std::vector<Record>* records,
+ const std::vector<Timestamp>& timestamps) final;
/**
* Inserts a document into the record store for a bulk loader that manages the index building
diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h
index a3efa28633c..cb78c52b72f 100644
--- a/src/mongo/db/catalog/collection_mock.h
+++ b/src/mongo/db/catalog/collection_mock.h
@@ -122,9 +122,8 @@ public:
}
Status insertDocumentsForOplog(OperationContext* opCtx,
- const DocWriter* const* docs,
- Timestamp* timestamps,
- size_t nDocs) {
+ std::vector<Record>* records,
+ const std::vector<Timestamp>& timestamps) {
std::abort();
}
diff --git a/src/mongo/db/commands/dbcheck.cpp b/src/mongo/db/commands/dbcheck.cpp
index 5f58079b200..6cbfc7b6bf9 100644
--- a/src/mongo/db/commands/dbcheck.cpp
+++ b/src/mongo/db/commands/dbcheck.cpp
@@ -467,24 +467,18 @@ private:
const NamespaceString& nss,
OptionalCollectionUUID uuid,
const BSONObj& obj) {
+ repl::MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(nss);
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(obj);
return writeConflictRetry(
opCtx, "dbCheck oplog entry", NamespaceString::kRsOplogNamespace.ns(), [&] {
auto const clockSource = opCtx->getServiceContext()->getFastClockSource();
- const auto wallClockTime = clockSource->now();
+ oplogEntry.setWallClockTime(clockSource->now());
WriteUnitOfWork uow(opCtx);
- repl::OpTime result = repl::logOp(opCtx,
- "c",
- nss,
- uuid,
- obj,
- nullptr,
- false,
- wallClockTime,
- {},
- kUninitializedStmtId,
- {},
- OplogSlot());
+ repl::OpTime result = repl::logOp(opCtx, &oplogEntry);
uow.commit();
return result;
});
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index b541cd20946..0e08c5f1065 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -65,6 +65,7 @@
namespace mongo {
using repl::OplogEntry;
+using repl::MutableOplogEntry;
namespace {
@@ -76,32 +77,15 @@ constexpr auto kNumRecordsFieldName = "numRecords"_sd;
constexpr auto kMsgFieldName = "msg"_sd;
constexpr long long kInvalidNumRecords = -1LL;
-repl::OpTime logOperation(OperationContext* opCtx,
- const char* opstr,
- const NamespaceString& ns,
- OptionalCollectionUUID uuid,
- const BSONObj& obj,
- const BSONObj* o2,
- bool fromMigrate,
- Date_t wallClockTime,
- const OperationSessionInfo& sessionInfo,
- boost::optional<StmtId> stmtId,
- const repl::OplogLink& oplogLink,
- const OplogSlot& oplogSlot) {
- auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
- auto opTime = repl::logOp(opCtx,
- opstr,
- ns,
- uuid,
- obj,
- o2,
- fromMigrate,
- wallClockTime,
- sessionInfo,
- stmtId,
- oplogLink,
- oplogSlot);
+Date_t getWallClockTimeForOpLog(OperationContext* opCtx) {
+ auto const clockSource = opCtx->getServiceContext()->getFastClockSource();
+ return clockSource->now();
+}
+repl::OpTime logOperation(OperationContext* opCtx, MutableOplogEntry* oplogEntry) {
+ oplogEntry->setWallClockTime(getWallClockTimeForOpLog(opCtx));
+ auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
+ auto opTime = repl::logOp(opCtx, oplogEntry);
times.push_back(opTime);
return opTime;
}
@@ -152,11 +136,6 @@ BSONObj makeObject2ForDropOrRename(uint64_t numRecords) {
return obj;
}
-Date_t getWallClockTimeForOpLog(OperationContext* opCtx) {
- auto const clockSource = opCtx->getServiceContext()->getFastClockSource();
- return clockSource->now();
-}
-
struct OpTimeBundle {
repl::OpTime writeOpTime;
repl::OpTime prePostImageOpTime;
@@ -175,32 +154,20 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs&
storeObj = args.updateArgs.updatedDoc;
}
- OperationSessionInfo sessionInfo;
- repl::OplogLink oplogLink;
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setNss(args.nss);
+ oplogEntry.setUuid(args.uuid);
- const auto txnParticipant = TransactionParticipant::get(opCtx);
- if (txnParticipant) {
- sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
- sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- oplogLink.prevOpTime = txnParticipant.getLastWriteOpTime();
- }
+ repl::OplogLink oplogLink;
+ repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId);
OpTimeBundle opTimes;
- opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx);
if (!storeObj.isEmpty() && opCtx->getTxnNumber()) {
- auto noteUpdateOpTime = logOperation(opCtx,
- "n",
- args.nss,
- args.uuid,
- storeObj,
- nullptr,
- false,
- opTimes.wallClockTime,
- sessionInfo,
- args.updateArgs.stmtId,
- {},
- OplogSlot());
+ MutableOplogEntry noopEntry = oplogEntry;
+ noopEntry.setOpType(repl::OpTypeEnum::kNoop);
+ noopEntry.setObject(std::move(storeObj));
+ auto noteUpdateOpTime = logOperation(opCtx, &noopEntry);
opTimes.prePostImageOpTime = noteUpdateOpTime;
@@ -212,19 +179,14 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs&
}
}
- opTimes.writeOpTime = logOperation(opCtx,
- "u",
- args.nss,
- args.uuid,
- args.updateArgs.update,
- &args.updateArgs.criteria,
- args.updateArgs.fromMigrate,
- opTimes.wallClockTime,
- sessionInfo,
- args.updateArgs.stmtId,
- oplogLink,
- OplogSlot());
-
+ oplogEntry.setOpType(repl::OpTypeEnum::kUpdate);
+ oplogEntry.setObject(args.updateArgs.update);
+ oplogEntry.setObject2(args.updateArgs.criteria);
+ oplogEntry.setFromMigrateIfTrue(args.updateArgs.fromMigrate);
+ // oplogLink could have been changed to include pre/postImageOpTime by the previous no-op write.
+ repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId);
+ opTimes.writeOpTime = logOperation(opCtx, &oplogEntry);
+ opTimes.wallClockTime = oplogEntry.getWallClockTime().get();
return opTimes;
}
@@ -237,79 +199,34 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
StmtId stmtId,
bool fromMigrate,
const boost::optional<BSONObj>& deletedDoc) {
- OperationSessionInfo sessionInfo;
- repl::OplogLink oplogLink;
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setNss(nss);
+ oplogEntry.setUuid(uuid);
- const auto txnParticipant = TransactionParticipant::get(opCtx);
- if (txnParticipant) {
- sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
- sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- oplogLink.prevOpTime = txnParticipant.getLastWriteOpTime();
- }
+ repl::OplogLink oplogLink;
+ repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, stmtId);
OpTimeBundle opTimes;
- opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx);
if (deletedDoc && opCtx->getTxnNumber()) {
- auto noteOplog = logOperation(opCtx,
- "n",
- nss,
- uuid,
- deletedDoc.get(),
- nullptr,
- false,
- opTimes.wallClockTime,
- sessionInfo,
- stmtId,
- {},
- OplogSlot());
+ MutableOplogEntry noopEntry = oplogEntry;
+ noopEntry.setOpType(repl::OpTypeEnum::kNoop);
+ noopEntry.setObject(deletedDoc.get());
+ auto noteOplog = logOperation(opCtx, &noopEntry);
opTimes.prePostImageOpTime = noteOplog;
oplogLink.preImageOpTime = noteOplog;
}
- auto& documentKey = documentKeyDecoration(opCtx);
- opTimes.writeOpTime = logOperation(opCtx,
- "d",
- nss,
- uuid,
- documentKey,
- nullptr,
- fromMigrate,
- opTimes.wallClockTime,
- sessionInfo,
- stmtId,
- oplogLink,
- OplogSlot());
+ oplogEntry.setOpType(repl::OpTypeEnum::kDelete);
+ oplogEntry.setObject(documentKeyDecoration(opCtx));
+ oplogEntry.setFromMigrateIfTrue(fromMigrate);
+ // oplogLink could have been changed to include preImageOpTime by the previous no-op write.
+ repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, stmtId);
+ opTimes.writeOpTime = logOperation(opCtx, &oplogEntry);
+ opTimes.wallClockTime = oplogEntry.getWallClockTime().get();
return opTimes;
}
-/**
- * Write oplog entry for applyOps/atomic transaction operations.
- */
-OpTimeBundle replLogApplyOps(OperationContext* opCtx,
- const NamespaceString& cmdNss,
- const BSONObj& applyOpCmd,
- const OperationSessionInfo& sessionInfo,
- boost::optional<StmtId> stmtId,
- const repl::OplogLink& oplogLink,
- const OplogSlot& oplogSlot) {
- OpTimeBundle times;
- times.wallClockTime = getWallClockTimeForOpLog(opCtx);
- times.writeOpTime = logOperation(opCtx,
- "c",
- cmdNss,
- {},
- applyOpCmd,
- nullptr,
- false,
- times.wallClockTime,
- sessionInfo,
- stmtId,
- oplogLink,
- oplogSlot);
- return times;
-}
-
} // namespace
BSONObj OpObserverImpl::getDocumentKey(OperationContext* opCtx,
@@ -324,7 +241,6 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
CollectionUUID uuid,
BSONObj indexDoc,
bool fromMigrate) {
-
BSONObjBuilder builder;
builder.append("createIndexes", nss.coll());
@@ -333,18 +249,13 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
builder.append(e);
}
- logOperation(opCtx,
- "c",
- nss.getCommandNS(),
- uuid,
- builder.done(),
- nullptr,
- fromMigrate,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {},
- OplogSlot());
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(nss.getCommandNS());
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(builder.done());
+ oplogEntry.setFromMigrateIfTrue(fromMigrate);
+ logOperation(opCtx, &oplogEntry);
}
void OpObserverImpl::onStartIndexBuild(OperationContext* opCtx,
@@ -369,18 +280,13 @@ void OpObserverImpl::onStartIndexBuild(OperationContext* opCtx,
}
indexesArr.done();
- logOperation(opCtx,
- "c",
- nss.getCommandNS(),
- collUUID,
- oplogEntryBuilder.done(),
- nullptr,
- fromMigrate,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {},
- OplogSlot());
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(nss.getCommandNS());
+ oplogEntry.setUuid(collUUID);
+ oplogEntry.setObject(oplogEntryBuilder.done());
+ oplogEntry.setFromMigrateIfTrue(fromMigrate);
+ logOperation(opCtx, &oplogEntry);
}
void OpObserverImpl::onCommitIndexBuild(OperationContext* opCtx,
@@ -405,18 +311,13 @@ void OpObserverImpl::onCommitIndexBuild(OperationContext* opCtx,
}
indexesArr.done();
- logOperation(opCtx,
- "c",
- nss.getCommandNS(),
- collUUID,
- oplogEntryBuilder.done(),
- nullptr,
- fromMigrate,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {},
- OplogSlot());
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(nss.getCommandNS());
+ oplogEntry.setUuid(collUUID);
+ oplogEntry.setObject(oplogEntryBuilder.done());
+ oplogEntry.setFromMigrateIfTrue(fromMigrate);
+ logOperation(opCtx, &oplogEntry);
}
void OpObserverImpl::onAbortIndexBuild(OperationContext* opCtx,
@@ -441,18 +342,13 @@ void OpObserverImpl::onAbortIndexBuild(OperationContext* opCtx,
}
indexesArr.done();
- logOperation(opCtx,
- "c",
- nss.getCommandNS(),
- collUUID,
- oplogEntryBuilder.done(),
- nullptr,
- fromMigrate,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {},
- OplogSlot());
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(nss.getCommandNS());
+ oplogEntry.setUuid(collUUID);
+ oplogEntry.setObject(oplogEntryBuilder.done());
+ oplogEntry.setFromMigrateIfTrue(fromMigrate);
+ logOperation(opCtx, &oplogEntry);
}
void OpObserverImpl::onInserts(OperationContext* opCtx,
@@ -478,12 +374,18 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
return;
}
for (auto iter = first; iter != last; iter++) {
- auto operation = OplogEntry::makeInsertOperation(nss, uuid, iter->doc);
+ auto operation = MutableOplogEntry::makeInsertOperation(nss, uuid, iter->doc);
txnParticipant.addTransactionOperation(opCtx, operation);
}
} else {
+ MutableOplogEntry oplogEntryTemplate;
+ oplogEntryTemplate.setNss(nss);
+ oplogEntryTemplate.setUuid(uuid);
+ oplogEntryTemplate.setFromMigrateIfTrue(fromMigrate);
lastWriteDate = getWallClockTimeForOpLog(opCtx);
- opTimeList = repl::logInsertOps(opCtx, nss, uuid, first, last, fromMigrate, lastWriteDate);
+ oplogEntryTemplate.setWallClockTime(lastWriteDate);
+
+ opTimeList = repl::logInsertOps(opCtx, &oplogEntryTemplate, first, last);
if (!opTimeList.empty())
lastOpTime = opTimeList.back();
@@ -553,7 +455,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
OpTimeBundle opTime;
if (inMultiDocumentTransaction) {
- auto operation = OplogEntry::makeUpdateOperation(
+ auto operation = MutableOplogEntry::makeUpdateOperation(
args.nss, args.uuid, args.updateArgs.update, args.updateArgs.criteria);
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
@@ -614,8 +516,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
OpTimeBundle opTime;
if (inMultiDocumentTransaction) {
- auto operation =
- OplogEntry::makeDeleteOperation(nss, uuid, deletedDoc ? deletedDoc.get() : documentKey);
+ auto operation = MutableOplogEntry::makeDeleteOperation(
+ nss, uuid, deletedDoc ? deletedDoc.get() : documentKey);
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
opTime = replLogDelete(opCtx, nss, uuid, stmtId, fromMigrate, deletedDoc);
@@ -656,19 +558,13 @@ void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx,
const boost::optional<UUID> uuid,
const BSONObj& msgObj,
const boost::optional<BSONObj> o2MsgObj) {
- const BSONObj* o2MsgPtr = o2MsgObj ? o2MsgObj.get_ptr() : nullptr;
- logOperation(opCtx,
- "n",
- nss,
- uuid,
- msgObj,
- o2MsgPtr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {},
- OplogSlot());
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setNss(nss);
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(msgObj);
+ oplogEntry.setObject2(o2MsgObj);
+ logOperation(opCtx, &oplogEntry);
}
void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
@@ -677,24 +573,15 @@ void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
const CollectionOptions& options,
const BSONObj& idIndex,
const OplogSlot& createOpTime) {
- const auto cmdNss = collectionName.getCommandNS();
-
- const auto cmdObj = makeCreateCollCmdObj(collectionName, options, idIndex);
-
if (!collectionName.isSystemDotProfile()) {
// do not replicate system.profile modifications
- logOperation(opCtx,
- "c",
- cmdNss,
- options.uuid,
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {},
- createOpTime);
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(collectionName.getCommandNS());
+ oplogEntry.setUuid(options.uuid);
+ oplogEntry.setObject(makeCreateCollCmdObj(collectionName, options, idIndex));
+ oplogEntry.setOpTime(createOpTime);
+ logOperation(opCtx, &oplogEntry);
}
}
@@ -704,35 +591,25 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx,
const BSONObj& collModCmd,
const CollectionOptions& oldCollOptions,
boost::optional<TTLCollModInfo> ttlInfo) {
- const auto cmdNss = nss.getCommandNS();
-
- // Create the 'o' field object.
- const auto cmdObj = makeCollModCmdObj(collModCmd, oldCollOptions, ttlInfo);
-
- // Create the 'o2' field object. We save the old collection metadata and TTL expiration.
- BSONObjBuilder o2Builder;
- o2Builder.append("collectionOptions_old", oldCollOptions.toBSON());
- if (ttlInfo) {
- auto oldExpireAfterSeconds = durationCount<Seconds>(ttlInfo->oldExpireAfterSeconds);
- o2Builder.append("expireAfterSeconds_old", oldExpireAfterSeconds);
- }
-
- const auto o2Obj = o2Builder.done();
if (!nss.isSystemDotProfile()) {
// do not replicate system.profile modifications
- logOperation(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- &o2Obj,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {},
- OplogSlot());
+
+ // Create the 'o2' field object. We save the old collection metadata and TTL expiration.
+ BSONObjBuilder o2Builder;
+ o2Builder.append("collectionOptions_old", oldCollOptions.toBSON());
+ if (ttlInfo) {
+ auto oldExpireAfterSeconds = durationCount<Seconds>(ttlInfo->oldExpireAfterSeconds);
+ o2Builder.append("expireAfterSeconds_old", oldExpireAfterSeconds);
+ }
+
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(nss.getCommandNS());
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(makeCollModCmdObj(collModCmd, oldCollOptions, ttlInfo));
+ oplogEntry.setObject2(o2Builder.done());
+ logOperation(opCtx, &oplogEntry);
}
// Make sure the UUID values in the Collection metadata, the Collection object, and the UUID
@@ -752,21 +629,11 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx,
}
void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string& dbName) {
- const NamespaceString cmdNss{dbName, "$cmd"};
- const auto cmdObj = BSON("dropDatabase" << 1);
-
- logOperation(opCtx,
- "c",
- cmdNss,
- {},
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {},
- OplogSlot());
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss({dbName, "$cmd"});
+ oplogEntry.setObject(BSON("dropDatabase" << 1));
+ logOperation(opCtx, &oplogEntry);
uassert(
50714, "dropping the admin database is not allowed.", dbName != NamespaceString::kAdminDb);
@@ -781,24 +648,15 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
OptionalCollectionUUID uuid,
std::uint64_t numRecords,
const CollectionDropType dropType) {
- const auto cmdNss = collectionName.getCommandNS();
- const auto cmdObj = BSON("drop" << collectionName.coll());
- const auto obj2 = makeObject2ForDropOrRename(numRecords);
-
if (!collectionName.isSystemDotProfile()) {
// Do not replicate system.profile modifications.
- logOperation(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- &obj2,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {},
- OplogSlot());
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(collectionName.getCommandNS());
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(BSON("drop" << collectionName.coll()));
+ oplogEntry.setObject2(makeObject2ForDropOrRename(numRecords));
+ logOperation(opCtx, &oplogEntry);
}
uassert(50715,
@@ -819,21 +677,13 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx,
OptionalCollectionUUID uuid,
const std::string& indexName,
const BSONObj& indexInfo) {
- const auto cmdNss = nss.getCommandNS();
- const auto cmdObj = BSON("dropIndexes" << nss.coll() << "index" << indexName);
-
- logOperation(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- &indexInfo,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {},
- OplogSlot());
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(nss.getCommandNS());
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(BSON("dropIndexes" << nss.coll() << "index" << indexName));
+ oplogEntry.setObject2(indexInfo);
+ logOperation(opCtx, &oplogEntry);
}
@@ -844,8 +694,6 @@ repl::OpTime OpObserverImpl::preRenameCollection(OperationContext* const opCtx,
OptionalCollectionUUID dropTargetUUID,
std::uint64_t numRecords,
bool stayTemp) {
- const auto cmdNss = fromCollection.getCommandNS();
-
BSONObjBuilder builder;
builder.append("renameCollection", fromCollection.ns());
builder.append("to", toCollection.ns());
@@ -854,26 +702,14 @@ repl::OpTime OpObserverImpl::preRenameCollection(OperationContext* const opCtx,
dropTargetUUID->appendToBuilder(&builder, "dropTarget");
}
- const auto cmdObj = builder.done();
- BSONObj obj2;
- BSONObj* obj2Ptr = nullptr;
- if (dropTargetUUID) {
- obj2 = makeObject2ForDropOrRename(numRecords);
- obj2Ptr = &obj2;
- }
-
- logOperation(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- obj2Ptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {},
- OplogSlot());
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(fromCollection.getCommandNS());
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(builder.done());
+ if (dropTargetUUID)
+ oplogEntry.setObject2(makeObject2ForDropOrRename(numRecords));
+ logOperation(opCtx, &oplogEntry);
return {};
}
@@ -905,31 +741,24 @@ void OpObserverImpl::onRenameCollection(OperationContext* const opCtx,
void OpObserverImpl::onApplyOps(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& applyOpCmd) {
- const NamespaceString cmdNss{dbName, "$cmd"};
-
- replLogApplyOps(opCtx, cmdNss, applyOpCmd, {}, kUninitializedStmtId, {}, OplogSlot());
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss({dbName, "$cmd"});
+ oplogEntry.setObject(applyOpCmd);
+ logOperation(opCtx, &oplogEntry);
}
void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
const NamespaceString& collectionName,
OptionalCollectionUUID uuid) {
- const auto cmdNss = collectionName.getCommandNS();
- const auto cmdObj = BSON("emptycapped" << collectionName.coll());
-
if (!collectionName.isSystemDotProfile()) {
// Do not replicate system.profile modifications
- logOperation(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {},
- OplogSlot());
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(collectionName.getCommandNS());
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(BSON("emptycapped" << collectionName.coll()));
+ logOperation(opCtx, &oplogEntry);
}
}
@@ -985,69 +814,28 @@ std::vector<repl::ReplOperation>::const_iterator packTransactionStatementsForApp
// builder object already has an 'applyOps' field appended pointing to the desired array of ops
// i.e. { "applyOps" : [op1, op2, ...] }
//
-// @param prepare determines whether a 'prepare' field will be attached to the written oplog entry.
-// @param isPartialTxn is this applyOps entry part of an in-progress multi oplog entry transaction.
-// Should be set for all non-terminal ops of an unprepared multi oplog entry transaction.
-// @param shouldWriteStateField determines whether a 'state' field will be included in the write to
-// the transactions table. Only meaningful if 'updateTxnTable' is true.
-// @param updateTxnTable determines whether the transactions table will updated after the oplog
-// entry is written.
+// @param txnState the 'state' field of the transaction table entry update.
// @param startOpTime the optime of the 'startOpTime' field of the transaction table entry update.
// If boost::none, no 'startOpTime' field will be included in the new transaction table entry. Only
// meaningful if 'updateTxnTable' is true.
+// @param updateTxnTable determines whether the transactions table will updated after the oplog
+// entry is written.
//
// Returns the optime of the written oplog entry.
OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
- BSONObjBuilder* applyOpsBuilder,
- const OplogSlot& oplogSlot,
- repl::OpTime prevWriteOpTime,
- boost::optional<StmtId> stmtId,
- const bool prepare,
- const bool isPartialTxn,
- const bool shouldWriteStateField,
- const bool updateTxnTable,
- boost::optional<long long> count,
- boost::optional<repl::OpTime> startOpTime) {
-
- // A 'prepare' oplog entry should never include a 'partialTxn' field.
- invariant(!(isPartialTxn && prepare));
-
- const NamespaceString cmdNss{"admin", "$cmd"};
-
- OperationSessionInfo sessionInfo;
- repl::OplogLink oplogLink;
- sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
- sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
-
- oplogLink.prevOpTime = prevWriteOpTime;
+ MutableOplogEntry* oplogEntry,
+ boost::optional<DurableTxnStateEnum> txnState,
+ boost::optional<repl::OpTime> startOpTime,
+ const bool updateTxnTable) {
+ oplogEntry->setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry->setNss({"admin", "$cmd"});
+ oplogEntry->setSessionId(opCtx->getLogicalSessionId());
+ oplogEntry->setTxnNumber(opCtx->getTxnNumber());
try {
- if (prepare) {
- applyOpsBuilder->append("prepare", true);
- }
- if (isPartialTxn) {
- applyOpsBuilder->append("partialTxn", true);
- }
- if (count) {
- applyOpsBuilder->append("count", *count);
- }
- auto applyOpCmd = applyOpsBuilder->done();
- auto times =
- replLogApplyOps(opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, oplogSlot);
-
- auto txnState = [&]() -> boost::optional<DurableTxnStateEnum> {
- if (!shouldWriteStateField) {
- invariant(!prepare);
- return boost::none;
- }
-
- if (isPartialTxn) {
- return DurableTxnStateEnum::kInProgress;
- }
-
- return prepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted;
- }();
-
+ OpTimeBundle times;
+ times.writeOpTime = logOperation(opCtx, oplogEntry);
+ times.wallClockTime = oplogEntry->getWallClockTime().get();
if (updateTxnTable) {
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setLastWriteOpTime(times.writeOpTime);
@@ -1067,37 +855,6 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
-// Log a single applyOps for transactions without any attempt to pack operations. If the given
-// statements would exceed the maximum BSON size limit of a single oplog entry, this will throw a
-// TransactionTooLarge error.
-// TODO(SERVER-41470): Remove this function once old transaction format is no longer needed.
-OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
- const std::vector<repl::ReplOperation>& statements,
- const OplogSlot& oplogSlot,
- repl::OpTime prevWriteOpTime,
- boost::optional<StmtId> stmtId,
- const bool prepare,
- const bool isPartialTxn,
- const bool shouldWriteStateField,
- const bool updateTxnTable,
- boost::optional<long long> count,
- boost::optional<repl::OpTime> startOpTime) {
- BSONObjBuilder applyOpsBuilder;
- packTransactionStatementsForApplyOps(
- &applyOpsBuilder, statements.begin(), statements.end(), false /* limitSize */);
- return logApplyOpsForTransaction(opCtx,
- &applyOpsBuilder,
- oplogSlot,
- prevWriteOpTime,
- stmtId,
- prepare,
- isPartialTxn,
- shouldWriteStateField,
- updateTxnTable,
- count,
- startOpTime);
-}
-
// Logs transaction oplog entries for preparing a transaction or committing an unprepared
// transaction. This includes the in-progress 'partialTxn' oplog entries followed by the implicit
// prepare or commit entry. If the 'prepare' argument is true, it will log entries for a prepared
@@ -1153,9 +910,24 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
// commit or implicit prepare, i.e. we omit the 'partialTxn' field.
auto firstOp = stmtsIter == stmts.begin();
auto lastOp = nextStmt == stmts.end();
- auto isPartialTxn = !lastOp;
+
auto implicitCommit = lastOp && !prepare;
auto implicitPrepare = lastOp && prepare;
+ auto isPartialTxn = !lastOp;
+ // A 'prepare' oplog entry should never include a 'partialTxn' field.
+ invariant(!(isPartialTxn && implicitPrepare));
+ if (implicitPrepare) {
+ applyOpsBuilder.append("prepare", true);
+ }
+ if (isPartialTxn) {
+ applyOpsBuilder.append("partialTxn", true);
+ }
+
+ // The 'count' field gives the total number of individual operations in the
+ // transaction, and is included on a non-initial implicit commit or prepare entry.
+ if (lastOp && !firstOp) {
+ applyOpsBuilder.append("count", static_cast<long long>(stmts.size()));
+ }
// For both prepared and unprepared transactions, update the transactions table on
// the first and last op.
@@ -1177,21 +949,16 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
// no longer be set.
auto startOpTime = boost::make_optional(!implicitCommit, firstOpTimeOfTxn);
- // The 'count' field gives the total number of individual operations in the
- // transaction, and is included on a non-initial implicit commit or prepare entry.
- auto count =
- (lastOp && !firstOp) ? boost::optional<long long>(stmts.size()) : boost::none;
- prevWriteOpTime = logApplyOpsForTransaction(opCtx,
- &applyOpsBuilder,
- oplogSlot,
- prevWriteOpTime.writeOpTime,
- boost::none /* stmtId */,
- implicitPrepare,
- isPartialTxn,
- true /* shouldWriteStateField */,
- updateTxnTable,
- count,
- startOpTime);
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpTime(oplogSlot);
+ oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime.writeOpTime);
+ oplogEntry.setObject(applyOpsBuilder.done());
+ auto txnState = isPartialTxn ? DurableTxnStateEnum::kInProgress
+ : (implicitPrepare ? DurableTxnStateEnum::kPrepared
+ : DurableTxnStateEnum::kCommitted);
+ prevWriteOpTime = logApplyOpsForTransaction(
+ opCtx, &oplogEntry, txnState, startOpTime, updateTxnTable);
+
// Advance the iterator to the beginning of the remaining unpacked statements.
stmtsIter = nextStmt;
numEntriesWritten++;
@@ -1202,20 +969,14 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
}
void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
- const OplogSlot& oplogSlot,
- const BSONObj& objectField,
+ MutableOplogEntry* oplogEntry,
DurableTxnStateEnum durableState) {
- const NamespaceString cmdNss{"admin", "$cmd"};
-
- OperationSessionInfo sessionInfo;
- repl::OplogLink oplogLink;
- sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
- sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
-
- auto txnParticipant = TransactionParticipant::get(opCtx);
- oplogLink.prevOpTime = txnParticipant.getLastWriteOpTime();
-
- const auto wallClockTime = getWallClockTimeForOpLog(opCtx);
+ oplogEntry->setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry->setNss({"admin", "$cmd"});
+ oplogEntry->setSessionId(opCtx->getLogicalSessionId());
+ oplogEntry->setTxnNumber(opCtx->getTxnNumber());
+ oplogEntry->setPrevWriteOpTimeInTransaction(
+ TransactionParticipant::get(opCtx).getLastWriteOpTime());
// There should not be a parent WUOW outside of this one. This guarantees the safety of the
// write conflict retry loop.
@@ -1234,23 +995,12 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
invariant(opCtx->lockState()->isWriteLocked());
WriteUnitOfWork wuow(opCtx);
- const auto oplogOpTime = logOperation(opCtx,
- "c",
- cmdNss,
- {} /* uuid */,
- objectField,
- nullptr /* o2 */,
- false /* fromMigrate */,
- wallClockTime,
- sessionInfo,
- boost::none /* stmtId */,
- oplogLink,
- oplogSlot);
- invariant(oplogSlot.isNull() || oplogSlot == oplogOpTime);
+ const auto oplogOpTime = logOperation(opCtx, oplogEntry);
+ invariant(oplogEntry->getOpTime().isNull() || oplogEntry->getOpTime() == oplogOpTime);
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setLastWriteOpTime(oplogOpTime);
- sessionTxnRecord.setLastWriteDate(wallClockTime);
+ sessionTxnRecord.setLastWriteDate(oplogEntry->getWallClockTime().get());
sessionTxnRecord.setState(durableState);
onWriteOpCompleted(opCtx, {}, sessionTxnRecord);
wuow.commit();
@@ -1281,18 +1031,22 @@ void OpObserverImpl::onUnpreparedTransactionCommit(
auto txnParticipant = TransactionParticipant::get(opCtx);
const auto lastWriteOpTime = txnParticipant.getLastWriteOpTime();
invariant(lastWriteOpTime.isNull());
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setPrevWriteOpTimeInTransaction(lastWriteOpTime);
+ oplogEntry.setStatementId(StmtId(0));
+
+ BSONObjBuilder applyOpsBuilder;
+ // TODO(SERVER-41470): Remove limitSize==false once old transaction format is no longer
+ // needed.
+ packTransactionStatementsForApplyOps(
+ &applyOpsBuilder, statements.begin(), statements.end(), false /* limitSize */);
+ oplogEntry.setObject(applyOpsBuilder.done());
+
+ auto txnState = boost::make_optional(
+ fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42,
+ DurableTxnStateEnum::kCommitted);
commitOpTime = logApplyOpsForTransaction(
- opCtx,
- statements,
- OplogSlot(),
- lastWriteOpTime,
- StmtId(0),
- false /* prepare */,
- false /* isPartialTxn */,
- fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42,
- true /* updateTxnTable */,
- boost::none,
- boost::none)
+ opCtx, &oplogEntry, txnState, boost::none, true /* updateTxnTable */)
.writeOpTime;
} else {
// Reserve all the optimes in advance, so we only need to get the optime mutex once. We
@@ -1321,10 +1075,14 @@ void OpObserverImpl::onPreparedTransactionCommit(
invariant(!commitTimestamp.isNull());
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpTime(commitOplogEntryOpTime);
+
CommitTransactionOplogObject cmdObj;
cmdObj.setCommitTimestamp(commitTimestamp);
- logCommitOrAbortForPreparedTransaction(
- opCtx, commitOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kCommitted);
+ oplogEntry.setObject(cmdObj.toBSON());
+
+ logCommitOrAbortForPreparedTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kCommitted);
}
void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
@@ -1361,18 +1119,24 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
auto txnParticipant = TransactionParticipant::get(opCtx);
const auto lastWriteOpTime = txnParticipant.getLastWriteOpTime();
invariant(lastWriteOpTime.isNull());
- logApplyOpsForTransaction(
- opCtx,
- statements,
- prepareOpTime,
- lastWriteOpTime,
- boost::none /* stmtId */,
- true /* prepare */,
- false /* isPartialTxn */,
+
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpTime(prepareOpTime);
+ oplogEntry.setPrevWriteOpTimeInTransaction(lastWriteOpTime);
+
+ BSONObjBuilder applyOpsBuilder;
+ // TODO(SERVER-41470): Remove limitSize==false once old transaction format is no
+ // longer needed.
+ packTransactionStatementsForApplyOps(
+ &applyOpsBuilder, statements.begin(), statements.end(), false /* limitSize */);
+ applyOpsBuilder.append("prepare", true);
+ oplogEntry.setObject(applyOpsBuilder.done());
+
+ auto txnState = boost::make_optional(
fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42,
- true /* updateTxnTable */,
- boost::none /* count */,
- prepareOpTime /* startOpTime */);
+ DurableTxnStateEnum::kPrepared);
+ logApplyOpsForTransaction(
+ opCtx, &oplogEntry, txnState, prepareOpTime, true /* updateTxnTable */);
wuow.commit();
});
} else {
@@ -1404,18 +1168,18 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
BSONObjBuilder applyOpsBuilder;
BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd));
opsArray.done();
+ applyOpsBuilder.append("prepare", true);
+
auto oplogSlot = reservedSlots.front();
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpTime(oplogSlot);
+ oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime());
+ oplogEntry.setObject(applyOpsBuilder.done());
logApplyOpsForTransaction(opCtx,
- &applyOpsBuilder,
+ &oplogEntry,
+ DurableTxnStateEnum::kPrepared,
oplogSlot,
- repl::OpTime() /* prevWriteOpTime */,
- boost::none /* stmtId */,
- true /* prepare */,
- false /* isPartialTxn */,
- true /* shouldWriteStateField */,
- true /* updateTxnTable */,
- boost::none /* count */,
- oplogSlot);
+ true /* updateTxnTable */);
}
wuow.commit();
});
@@ -1440,9 +1204,13 @@ void OpObserverImpl::onTransactionAbort(OperationContext* opCtx,
return;
}
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpTime(*abortOplogEntryOpTime);
+
AbortTransactionOplogObject cmdObj;
- logCommitOrAbortForPreparedTransaction(
- opCtx, *abortOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kAborted);
+ oplogEntry.setObject(cmdObj.toBSON());
+
+ logCommitOrAbortForPreparedTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kAborted);
}
void OpObserverImpl::onReplicationRollback(OperationContext* opCtx,
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index d627e6f1bbe..bd333d78ecc 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -517,9 +517,9 @@ TEST_F(OpObserverTest, MultipleAboutToDeleteAndOnDelete) {
AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
WriteUnitOfWork wunit(opCtx.get());
opObserver.aboutToDelete(opCtx.get(), nss, BSON("_id" << 1));
- opObserver.onDelete(opCtx.get(), nss, uuid, {}, false, {});
+ opObserver.onDelete(opCtx.get(), nss, uuid, kUninitializedStmtId, false, {});
opObserver.aboutToDelete(opCtx.get(), nss, BSON("_id" << 1));
- opObserver.onDelete(opCtx.get(), nss, uuid, {}, false, {});
+ opObserver.onDelete(opCtx.get(), nss, uuid, kUninitializedStmtId, false, {});
}
DEATH_TEST_F(OpObserverTest, AboutToDeleteMustPreceedOnDelete, "invariant") {
@@ -527,7 +527,7 @@ DEATH_TEST_F(OpObserverTest, AboutToDeleteMustPreceedOnDelete, "invariant") {
auto opCtx = cc().makeOperationContext();
opCtx->swapLockState(std::make_unique<LockerNoop>());
NamespaceString nss = {"test", "coll"};
- opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {});
+ opObserver.onDelete(opCtx.get(), nss, {}, kUninitializedStmtId, false, {});
}
DEATH_TEST_F(OpObserverTest, EachOnDeleteRequiresAboutToDelete, "invariant") {
@@ -536,8 +536,8 @@ DEATH_TEST_F(OpObserverTest, EachOnDeleteRequiresAboutToDelete, "invariant") {
opCtx->swapLockState(std::make_unique<LockerNoop>());
NamespaceString nss = {"test", "coll"};
opObserver.aboutToDelete(opCtx.get(), nss, {});
- opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {});
- opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {});
+ opObserver.onDelete(opCtx.get(), nss, {}, kUninitializedStmtId, false, {});
+ opObserver.onDelete(opCtx.get(), nss, {}, kUninitializedStmtId, false, {});
}
DEATH_TEST_F(OpObserverTest,
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 19176f7478b..490a2a6acf0 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -120,44 +120,6 @@ void checkOplogInsert(Status result) {
massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK());
}
-/**
- * This allows us to stream the oplog entry directly into data region
- * main goal is to avoid copying the o portion
- * which can be very large
- * TODO: can have this build the entire doc
- */
-class OplogDocWriter final : public DocWriter {
-public:
- OplogDocWriter(BSONObj frame, BSONObj oField)
- : _frame(std::move(frame)), _oField(std::move(oField)) {}
-
- void writeDocument(char* start) const {
- char* buf = start;
-
- memcpy(buf, _frame.objdata(), _frame.objsize() - 1); // don't copy final EOO
-
- DataView(buf).write<LittleEndian<int>>(documentSize());
-
- buf += (_frame.objsize() - 1);
- buf[0] = (char)Object;
- buf[1] = 'o';
- buf[2] = 0;
- memcpy(buf + 3, _oField.objdata(), _oField.objsize());
- buf += 3 + _oField.objsize();
- buf[0] = EOO;
-
- verify(static_cast<size_t>((buf + 1) - start) == documentSize()); // DEV?
- }
-
- size_t documentSize() const {
- return _frame.objsize() + _oField.objsize() + 1 /* type */ + 2 /* "o" */;
- }
-
-private:
- BSONObj _frame;
- BSONObj _oField;
-};
-
bool shouldBuildInForeground(OperationContext* opCtx,
const BSONObj& index,
const NamespaceString& indexNss,
@@ -328,88 +290,6 @@ void createIndexForApplyOps(OperationContext* opCtx,
}
}
-namespace {
-
-/**
- * Attaches the session information of a write to an oplog entry if it exists.
- */
-void appendSessionInfo(OperationContext* opCtx,
- BSONObjBuilder* builder,
- boost::optional<StmtId> statementId,
- const OperationSessionInfo& sessionInfo,
- const OplogLink& oplogLink) {
- if (!sessionInfo.getTxnNumber()) {
- return;
- }
-
- // Note: certain non-transaction operations, like implicit collection creation will have an
- // uninitialized statementId.
- if (statementId == kUninitializedStmtId) {
- return;
- }
-
- sessionInfo.serialize(builder);
-
- // Only non-transaction operations will have a statementId.
- if (statementId) {
- builder->append(OplogEntryBase::kStatementIdFieldName, *statementId);
- }
- oplogLink.prevOpTime.append(builder,
- OplogEntryBase::kPrevWriteOpTimeInTransactionFieldName.toString());
-
- if (!oplogLink.preImageOpTime.isNull()) {
- oplogLink.preImageOpTime.append(builder,
- OplogEntryBase::kPreImageOpTimeFieldName.toString());
- }
-
- if (!oplogLink.postImageOpTime.isNull()) {
- oplogLink.postImageOpTime.append(builder,
- OplogEntryBase::kPostImageOpTimeFieldName.toString());
- }
-}
-
-OplogDocWriter _logOpWriter(OperationContext* opCtx,
- const char* opstr,
- const NamespaceString& nss,
- OptionalCollectionUUID uuid,
- const BSONObj& obj,
- const BSONObj* o2,
- bool fromMigrate,
- OpTime optime,
- Date_t wallTime,
- const OperationSessionInfo& sessionInfo,
- boost::optional<StmtId> statementId,
- const OplogLink& oplogLink) {
- BSONObjBuilder b(256);
-
- b.append("ts", optime.getTimestamp());
- if (optime.getTerm() != -1)
- b.append("t", optime.getTerm());
-
- // Always write zero hash instead of using FCV to gate this for retryable writes
- // and change stream, who expect to be able to read oplog across FCV's.
- b.append("h", 0LL);
- b.append("v", OplogEntry::kOplogVersion);
- b.append("op", opstr);
- b.append("ns", nss.ns());
- if (uuid)
- uuid->appendToBuilder(&b, "ui");
-
- if (fromMigrate)
- b.appendBool("fromMigrate", true);
-
- if (o2)
- b.append("o2", *o2);
-
- invariant(wallTime != Date_t{});
- b.appendDate(OplogEntryBase::kWallClockTimeFieldName, wallTime);
-
- appendSessionInfo(opCtx, &b, statementId, sessionInfo, oplogLink);
-
- return OplogDocWriter(OplogDocWriter(b.obj(), obj));
-}
-} // end anon namespace
-
/* we write to local.oplog.rs:
{ ts : ..., h: ..., v: ..., op: ..., etc }
ts: an OpTime timestamp
@@ -425,17 +305,16 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx,
/*
- * writers - an array with size nDocs of DocWriter objects.
- * timestamps - an array with size nDocs of respective Timestamp objects for each DocWriter.
+ * records - a vector of oplog records to be written.
+ * timestamps - a vector of respective Timestamp objects for each oplog record.
* oplogCollection - collection to be written to.
- * finalOpTime - the OpTime of the last DocWriter object.
- * wallTime - the wall clock time of the corresponding oplog entry.
+ * finalOpTime - the OpTime of the last oplog record.
+ * wallTime - the wall clock time of the last oplog record.
*/
void _logOpsInner(OperationContext* opCtx,
const NamespaceString& nss,
- const DocWriter* const* writers,
- Timestamp* timestamps,
- size_t nDocs,
+ std::vector<Record>* records,
+ const std::vector<Timestamp>& timestamps,
Collection* oplogCollection,
OpTime finalOpTime,
Date_t wallTime) {
@@ -446,9 +325,7 @@ void _logOpsInner(OperationContext* opCtx,
str::stream() << "logOp() but can't accept write to collection " << nss.ns());
}
- // we jump through a bunch of hoops here to avoid copying the obj buffer twice --
- // instead we do a single copy to the destination in the record store.
- checkOplogInsert(oplogCollection->insertDocumentsForOplog(opCtx, writers, timestamps, nDocs));
+ checkOplogInsert(oplogCollection->insertDocumentsForOplog(opCtx, records, timestamps));
// Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
opCtx->recoveryUnit()->onCommit(
@@ -479,42 +356,26 @@ void _logOpsInner(OperationContext* opCtx,
});
}
-OpTime logOp(OperationContext* opCtx,
- const char* opstr,
- const NamespaceString& nss,
- OptionalCollectionUUID uuid,
- const BSONObj& obj,
- const BSONObj* o2,
- bool fromMigrate,
- Date_t wallClockTime,
- const OperationSessionInfo& sessionInfo,
- boost::optional<StmtId> statementId,
- const OplogLink& oplogLink,
- const OplogSlot& oplogSlot) {
+OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry) {
// All collections should have UUIDs now, so all insert, update, and delete oplog entries should
// also have uuids. Some no-op (n) and command (c) entries may still elide the uuid field.
- invariant(uuid || 'n' == *opstr || 'c' == *opstr,
- str::stream() << "Expected uuid for logOp with opstr: " << opstr << ", nss: "
- << nss.ns()
- << ", obj: "
- << obj
- << ", os: "
- << o2);
+ invariant(oplogEntry->getUuid() || oplogEntry->getOpType() == OpTypeEnum::kNoop ||
+ oplogEntry->getOpType() == OpTypeEnum::kCommand,
+ str::stream() << "Expected uuid for logOp with oplog entry: "
+ << redact(oplogEntry->toBSON()));
auto replCoord = ReplicationCoordinator::get(opCtx);
// For commands, the test below is on the command ns and therefore does not check for
// specific namespaces such as system.profile. This is the caller's responsibility.
- if (replCoord->isOplogDisabledFor(opCtx, nss)) {
- invariant(statementId);
+ if (replCoord->isOplogDisabledFor(opCtx, oplogEntry->getNss())) {
uassert(ErrorCodes::IllegalOperation,
str::stream() << "retryable writes is not supported for unreplicated ns: "
- << nss.ns(),
- *statementId == kUninitializedStmtId);
+ << oplogEntry->getNss().ns(),
+ !oplogEntry->getStatementId());
return {};
}
auto oplogInfo = LocalOplogInfo::get(opCtx);
-
// Obtain Collection exclusive intent write lock for non-document-locking storage engines.
boost::optional<Lock::DBLock> dbWriteLock;
boost::optional<Lock::CollectionLock> collWriteLock;
@@ -523,43 +384,47 @@ OpTime logOp(OperationContext* opCtx,
collWriteLock.emplace(opCtx, oplogInfo->getOplogCollectionName(), MODE_IX);
}
- OplogSlot slot;
+ // If an OpTime is not specified (i.e. isNull), a new OpTime will be assigned to the oplog entry
+ // within the WUOW. If a new OpTime is assigned, it needs to be reset back to a null OpTime
+ // before exiting this function so that the same oplog entry instance can be reused for logOp()
+ // again. For example, if the WUOW gets aborted within a writeConflictRetry loop, we need to
+ // reset the OpTime to null so a new OpTime will be assigned on retry.
+ OplogSlot slot = oplogEntry->getOpTime();
+ auto resetOpTimeGuard = makeGuard([&, resetOpTimeOnExit = bool(slot.isNull()) ] {
+ if (resetOpTimeOnExit)
+ oplogEntry->setOpTime(OplogSlot());
+ });
+
WriteUnitOfWork wuow(opCtx);
- if (oplogSlot.isNull()) {
+ if (slot.isNull()) {
slot = oplogInfo->getNextOpTimes(opCtx, 1U)[0];
- } else {
- slot = oplogSlot;
+ // TODO: make the oplogEntry a const reference instead of using the guard.
+ oplogEntry->setOpTime(slot);
}
auto oplog = oplogInfo->getCollection();
- auto writer = _logOpWriter(opCtx,
- opstr,
- nss,
- uuid,
- obj,
- o2,
- fromMigrate,
- slot,
- wallClockTime,
- sessionInfo,
- statementId,
- oplogLink);
- const DocWriter* basePtr = &writer;
- auto timestamp = slot.getTimestamp();
- _logOpsInner(opCtx, nss, &basePtr, &timestamp, 1, oplog, slot, wallClockTime);
+ auto wallClockTime = oplogEntry->getWallClockTime();
+ invariant(wallClockTime);
+
+ auto bsonOplogEntry = oplogEntry->toBSON();
+ // The storage engine will assign the RecordId based on the "ts" field of the oplog entry, see
+ // oploghack::extractKey.
+ std::vector<Record> records{
+ {RecordId(), RecordData(bsonOplogEntry.objdata(), bsonOplogEntry.objsize())}};
+ std::vector<Timestamp> timestamps{slot.getTimestamp()};
+ _logOpsInner(opCtx, oplogEntry->getNss(), &records, timestamps, oplog, slot, *wallClockTime);
wuow.commit();
return slot;
}
std::vector<OpTime> logInsertOps(OperationContext* opCtx,
- const NamespaceString& nss,
- OptionalCollectionUUID uuid,
+ MutableOplogEntry* oplogEntryTemplate,
std::vector<InsertStatement>::const_iterator begin,
- std::vector<InsertStatement>::const_iterator end,
- bool fromMigrate,
- Date_t wallClockTime) {
+ std::vector<InsertStatement>::const_iterator end) {
invariant(begin != end);
+ oplogEntryTemplate->setOpType(repl::OpTypeEnum::kInsert);
+ auto nss = oplogEntryTemplate->getNss();
auto replCoord = ReplicationCoordinator::get(opCtx);
if (replCoord->isOplogDisabledFor(opCtx, nss)) {
uassert(ErrorCodes::IllegalOperation,
@@ -570,8 +435,6 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
}
const size_t count = end - begin;
- std::vector<OplogDocWriter> writers;
- writers.reserve(count);
auto oplogInfo = LocalOplogInfo::get(opCtx);
// Obtain Collection exclusive intent write lock for non-document-locking storage engines.
@@ -584,40 +447,34 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
WriteUnitOfWork wuow(opCtx);
- OperationSessionInfo sessionInfo;
- OplogLink oplogLink;
-
- const auto txnParticipant = TransactionParticipant::get(opCtx);
- if (txnParticipant) {
- sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
- sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- oplogLink.prevOpTime = txnParticipant.getLastWriteOpTime();
- }
-
- auto timestamps = std::make_unique<Timestamp[]>(count);
- std::vector<OpTime> opTimes;
+ std::vector<OpTime> opTimes(count);
+ std::vector<Timestamp> timestamps(count);
+ std::vector<BSONObj> bsonOplogEntries(count);
+ std::vector<Record> records(count);
for (size_t i = 0; i < count; i++) {
+ // Make a copy from the template for each insert oplog entry.
+ MutableOplogEntry oplogEntry = *oplogEntryTemplate;
// Make a mutable copy.
auto insertStatementOplogSlot = begin[i].oplogSlot;
// Fetch optime now, if not already fetched.
if (insertStatementOplogSlot.isNull()) {
insertStatementOplogSlot = oplogInfo->getNextOpTimes(opCtx, 1U)[0];
}
- writers.emplace_back(_logOpWriter(opCtx,
- "i",
- nss,
- uuid,
- begin[i].doc,
- nullptr,
- fromMigrate,
- insertStatementOplogSlot,
- wallClockTime,
- sessionInfo,
- begin[i].stmtId,
- oplogLink));
- oplogLink.prevOpTime = insertStatementOplogSlot;
- timestamps[i] = oplogLink.prevOpTime.getTimestamp();
- opTimes.push_back(insertStatementOplogSlot);
+ oplogEntry.setObject(begin[i].doc);
+ oplogEntry.setOpTime(insertStatementOplogSlot);
+
+ OplogLink oplogLink;
+ if (i > 0)
+ oplogLink.prevOpTime = opTimes[i - 1];
+ appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, begin[i].stmtId);
+
+ opTimes[i] = insertStatementOplogSlot;
+ timestamps[i] = insertStatementOplogSlot.getTimestamp();
+ bsonOplogEntries[i] = oplogEntry.toBSON();
+ // The storage engine will assign the RecordId based on the "ts" field of the oplog entry,
+ // see oploghack::extractKey.
+ records[i] = Record{
+ RecordId(), RecordData(bsonOplogEntries[i].objdata(), bsonOplogEntries[i].objsize())};
}
MONGO_FAIL_POINT_BLOCK(sleepBetweenInsertOpTimeGenerationAndLogOp, customWait) {
@@ -628,21 +485,42 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
sleepmillis(numMillis);
}
- std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]);
- for (size_t i = 0; i < count; i++) {
- basePtrs[i] = &writers[i];
- }
-
invariant(!opTimes.empty());
auto lastOpTime = opTimes.back();
invariant(!lastOpTime.isNull());
auto oplog = oplogInfo->getCollection();
- _logOpsInner(
- opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime, wallClockTime);
+ auto wallClockTime = oplogEntryTemplate->getWallClockTime();
+ invariant(wallClockTime);
+ _logOpsInner(opCtx, nss, &records, timestamps, oplog, lastOpTime, *wallClockTime);
wuow.commit();
return opTimes;
}
+void appendRetryableWriteInfo(OperationContext* opCtx,
+ MutableOplogEntry* oplogEntry,
+ OplogLink* oplogLink,
+ StmtId stmtId) {
+ // Not a retryable write.
+ if (stmtId == kUninitializedStmtId)
+ return;
+
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
+ invariant(txnParticipant);
+ oplogEntry->setSessionId(opCtx->getLogicalSessionId());
+ oplogEntry->setTxnNumber(opCtx->getTxnNumber());
+ oplogEntry->setStatementId(stmtId);
+ if (oplogLink->prevOpTime.isNull()) {
+ oplogLink->prevOpTime = txnParticipant.getLastWriteOpTime();
+ }
+ oplogEntry->setPrevWriteOpTimeInTransaction(oplogLink->prevOpTime);
+ if (!oplogLink->preImageOpTime.isNull()) {
+ oplogEntry->setPreImageOpTime(oplogLink->preImageOpTime);
+ }
+ if (!oplogLink->postImageOpTime.isNull()) {
+ oplogEntry->setPostImageOpTime(oplogLink->postImageOpTime);
+ }
+}
+
namespace {
long long getNewOplogSizeBytes(OperationContext* opCtx, const ReplSettings& replSettings) {
if (replSettings.getOplogSizeBytes() != 0) {
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index eb898a973b3..7bf42539624 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -80,6 +80,22 @@ struct OplogLink {
};
/**
+ * Sets the "lsid", "txnNumber", "stmtId", "prevOpTime", "preImageOpTime" and "postImageOpTime"
+ * fields of the oplogEntry based on the given oplogLink for retryable writes (i.e. when stmtId !=
+ * kUninitializedStmtId).
+ *
+ * If the given oplogLink.prevOpTime is a null OpTime, both the oplogLink.prevOpTime and the
+ * "prevOpTime" field of the oplogEntry will be set to the TransactionParticipant's lastWriteOpTime.
+ * The "preImageOpTime" field will only be set if the given oplogLink.preImageOpTime is not null.
+ * Similarly, the "postImageOpTime" field will only be set if the given oplogLink.postImageOpTime is
+ * not null.
+ */
+void appendRetryableWriteInfo(OperationContext* opCtx,
+ MutableOplogEntry* oplogEntry,
+ OplogLink* oplogLink,
+ StmtId stmtId);
+
+/**
* Create a new capped collection for the oplog if it doesn't yet exist.
* If the collection already exists (and isReplSet is false),
* set the 'last' Timestamp from the last entry of the oplog collection (side effect!)
@@ -96,53 +112,26 @@ void createOplog(OperationContext* opCtx);
/**
* Log insert(s) to the local oplog.
* Returns the OpTime of every insert.
+ * @param oplogEntryTemplate: a template used to generate insert oplog entries. Callers must set the
+ * "ns", "ui", "fromMigrate" and "wall" fields before calling this function. This function will then
+ * augment the template with the "op" (which is set to kInsert), "lsid" and "txnNumber" fields if
+ * necessary.
+ * @param begin/end: iterators denoting the range of InsertStatements to insert ('end' points one
+ * past the last statement). This function generates insert oplog entries based on the augmented
+ * oplogEntryTemplate with the "ts", "t", "o", "prevOpTime" and "stmtId" fields replaced by the
+ * content of each InsertStatement in the begin-end range.
+ *
*/
std::vector<OpTime> logInsertOps(OperationContext* opCtx,
- const NamespaceString& nss,
- OptionalCollectionUUID uuid,
+ MutableOplogEntry* oplogEntryTemplate,
std::vector<InsertStatement>::const_iterator begin,
- std::vector<InsertStatement>::const_iterator end,
- bool fromMigrate,
- Date_t wallClockTime);
+ std::vector<InsertStatement>::const_iterator end);
/**
- * @param opstr
- * "i" insert
- * "u" update
- * "d" delete
- * "c" db cmd
- * "n" no-op
- * "db" declares presence of a database (ns is set to the db name + '.')
- *
- * For 'u' records, 'obj' captures the mutation made to the object but not
- * the object itself. 'o2' captures the the criteria for the object that will be modified.
- *
- * wallClockTime this specifies the wall-clock timestamp of then this oplog entry was generated. It
- * is purely informational, may not be monotonically increasing and is not interpreted in any way
- * by the replication subsystem.
- * stmtId specifies the statementId of an operation. For transaction operations, stmtId is always
- * boost::none.
- * oplogLink this contains the timestamp that points to the previous write that will be
- * linked via prevTs, and the timestamps of the oplog entry that contains the document
- * before/after update was applied. The timestamps are ignored if isNull() is true.
- * prepare this specifies if the oplog entry should be put into a 'prepare' state.
- * oplogSlot If non-null, use this reserved oplog slot instead of a new one.
- *
* Returns the optime of the oplog entry written to the oplog.
* Returns a null optime if oplog was not modified.
*/
-OpTime logOp(OperationContext* opCtx,
- const char* opstr,
- const NamespaceString& ns,
- OptionalCollectionUUID uuid,
- const BSONObj& obj,
- const BSONObj* o2,
- bool fromMigrate,
- Date_t wallClockTime,
- const OperationSessionInfo& sessionInfo,
- boost::optional<StmtId> stmtId,
- const OplogLink& oplogLink,
- const OplogSlot& oplogSlot);
+OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry);
// Flush out the cached pointer to the oplog.
void clearLocalOplogPtr();
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 711bce604eb..a9879471570 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -233,7 +233,7 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch(
while (_oplogBuffer->peek(opCtx, &op)) {
auto entry = OplogEntry(op);
- // Check for oplog version change. If it is absent, its value is one.
+ // Check for oplog version change.
if (entry.getVersion() != OplogEntry::kOplogVersion) {
std::string message = str::stream()
<< "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 6903a1a029b..57a8eb033ba 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -153,12 +153,12 @@ BSONObj makeOplogEntryDoc(OpTime opTime,
} // namespace
-const int OplogEntry::kOplogVersion = 2;
+const int MutableOplogEntry::kOplogVersion = 2;
// Static
-ReplOperation OplogEntry::makeInsertOperation(const NamespaceString& nss,
- boost::optional<UUID> uuid,
- const BSONObj& docToInsert) {
+ReplOperation MutableOplogEntry::makeInsertOperation(const NamespaceString& nss,
+ boost::optional<UUID> uuid,
+ const BSONObj& docToInsert) {
ReplOperation op;
op.setOpType(OpTypeEnum::kInsert);
op.setNss(nss);
@@ -167,10 +167,10 @@ ReplOperation OplogEntry::makeInsertOperation(const NamespaceString& nss,
return op;
}
-ReplOperation OplogEntry::makeUpdateOperation(const NamespaceString nss,
- boost::optional<UUID> uuid,
- const BSONObj& update,
- const BSONObj& criteria) {
+ReplOperation MutableOplogEntry::makeUpdateOperation(const NamespaceString nss,
+ boost::optional<UUID> uuid,
+ const BSONObj& update,
+ const BSONObj& criteria) {
ReplOperation op;
op.setOpType(OpTypeEnum::kUpdate);
op.setNss(nss);
@@ -180,9 +180,9 @@ ReplOperation OplogEntry::makeUpdateOperation(const NamespaceString nss,
return op;
}
-ReplOperation OplogEntry::makeDeleteOperation(const NamespaceString& nss,
- boost::optional<UUID> uuid,
- const BSONObj& docToDelete) {
+ReplOperation MutableOplogEntry::makeDeleteOperation(const NamespaceString& nss,
+ boost::optional<UUID> uuid,
+ const BSONObj& docToDelete) {
ReplOperation op;
op.setOpType(OpTypeEnum::kDelete);
op.setNss(nss);
@@ -191,6 +191,31 @@ ReplOperation OplogEntry::makeDeleteOperation(const NamespaceString& nss,
return op;
}
+StatusWith<MutableOplogEntry> MutableOplogEntry::parse(const BSONObj& object) {
+ try {
+ MutableOplogEntry oplogEntry;
+ oplogEntry.parseProtected(IDLParserErrorContext("OplogEntryBase"), object);
+ return oplogEntry;
+ } catch (...) {
+ return exceptionToStatus();
+ }
+ MONGO_UNREACHABLE;
+}
+
+void MutableOplogEntry::setOpTime(const OpTime& opTime)& {
+ setTimestamp(opTime.getTimestamp());
+ if (opTime.getTerm() != OpTime::kUninitializedTerm)
+ setTerm(opTime.getTerm());
+}
+
+OpTime MutableOplogEntry::getOpTime() const {
+ long long term = OpTime::kUninitializedTerm;
+ if (getTerm()) {
+ term = getTerm().get();
+ }
+ return OpTime(getTimestamp(), term);
+}
+
size_t OplogEntry::getDurableReplOperationSize(const DurableReplOperation& op) {
return sizeof(op) + op.getNss().size() + op.getObject().objsize() +
(op.getObject2() ? op.getObject2()->objsize() : 0);
@@ -310,14 +335,6 @@ int OplogEntry::getRawObjSizeBytes() const {
return _raw.objsize();
}
-OpTime OplogEntry::getOpTime() const {
- long long term = OpTime::kUninitializedTerm;
- if (getTerm()) {
- term = getTerm().get();
- }
- return OpTime(getTimestamp(), term);
-}
-
std::string OplogEntry::toString() const {
return _raw.toString();
}
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 8a07c462325..7d98aee00f0 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -66,57 +66,140 @@ private:
};
/**
- * A parsed oplog entry that privately inherits from the OplogEntryBase parsed by the IDL.
+ * Mutable class used on primary to build up oplog entries progressively.
+ */
+class MutableOplogEntry : public OplogEntryBase {
+public:
+ // Current oplog version, should be the value of the v field in all oplog entries.
+ static const int kOplogVersion;
+
+ // Helpers to generate ReplOperation.
+ static ReplOperation makeInsertOperation(const NamespaceString& nss,
+ boost::optional<UUID> uuid,
+ const BSONObj& docToInsert);
+ static ReplOperation makeUpdateOperation(const NamespaceString nss,
+ boost::optional<UUID> uuid,
+ const BSONObj& update,
+ const BSONObj& criteria);
+ static ReplOperation makeDeleteOperation(const NamespaceString& nss,
+ boost::optional<UUID> uuid,
+ const BSONObj& docToDelete);
+
+ static StatusWith<MutableOplogEntry> parse(const BSONObj& object);
+
+ MutableOplogEntry() : OplogEntryBase() {}
+
+ void setSessionId(boost::optional<LogicalSessionId> value) & {
+ getOperationSessionInfo().setSessionId(std::move(value));
+ }
+
+ void setTxnNumber(boost::optional<std::int64_t> value) & {
+ getOperationSessionInfo().setTxnNumber(std::move(value));
+ }
+
+ void setOpType(OpTypeEnum value) & {
+ getDurableReplOperation().setOpType(std::move(value));
+ }
+
+ void setNss(NamespaceString value) & {
+ getDurableReplOperation().setNss(std::move(value));
+ }
+
+ void setUuid(boost::optional<UUID> value) & {
+ getDurableReplOperation().setUuid(std::move(value));
+ }
+
+ void setObject(BSONObj value) & {
+ getDurableReplOperation().setObject(std::move(value));
+ }
+
+ void setObject2(boost::optional<BSONObj> value) & {
+ getDurableReplOperation().setObject2(std::move(value));
+ }
+
+ void setUpsert(boost::optional<bool> value) & {
+ getDurableReplOperation().setUpsert(std::move(value));
+ }
+
+ /**
+ * Sets the OpTime of the oplog entry.
+ */
+ void setOpTime(const OpTime& opTime) &;
+
+ /**
+ * Returns the OpTime of the oplog entry.
+ */
+ OpTime getOpTime() const;
+
+ /**
+ * Same as setFromMigrate but only set when it is true.
+ */
+ void setFromMigrateIfTrue(bool value) & {
+ if (value)
+ setFromMigrate(value);
+ }
+};
+
+/**
+ * A parsed oplog entry that privately inherits from the MutableOplogEntry.
* This class is immutable. All setters are hidden.
*/
-class OplogEntry : private OplogEntryBase {
+class OplogEntry : private MutableOplogEntry {
public:
// Make field names accessible.
- using OplogEntryBase::kDurableReplOperationFieldName;
- using OplogEntryBase::kOperationSessionInfoFieldName;
- using OplogEntryBase::k_idFieldName;
- using OplogEntryBase::kFromMigrateFieldName;
- using OplogEntryBase::kHashFieldName;
- using OplogEntryBase::kNssFieldName;
- using OplogEntryBase::kObjectFieldName;
- using OplogEntryBase::kObject2FieldName;
- using OplogEntryBase::kOpTypeFieldName;
- using OplogEntryBase::kPostImageOpTimeFieldName;
- using OplogEntryBase::kPreImageOpTimeFieldName;
- using OplogEntryBase::kPrevWriteOpTimeInTransactionFieldName;
- using OplogEntryBase::kSessionIdFieldName;
- using OplogEntryBase::kStatementIdFieldName;
- using OplogEntryBase::kTermFieldName;
- using OplogEntryBase::kTimestampFieldName;
- using OplogEntryBase::kTxnNumberFieldName;
- using OplogEntryBase::kUpsertFieldName;
- using OplogEntryBase::kUuidFieldName;
- using OplogEntryBase::kVersionFieldName;
- using OplogEntryBase::kWallClockTimeFieldName;
+ using MutableOplogEntry::kDurableReplOperationFieldName;
+ using MutableOplogEntry::kOperationSessionInfoFieldName;
+ using MutableOplogEntry::k_idFieldName;
+ using MutableOplogEntry::kFromMigrateFieldName;
+ using MutableOplogEntry::kHashFieldName;
+ using MutableOplogEntry::kNssFieldName;
+ using MutableOplogEntry::kObjectFieldName;
+ using MutableOplogEntry::kObject2FieldName;
+ using MutableOplogEntry::kOpTypeFieldName;
+ using MutableOplogEntry::kPostImageOpTimeFieldName;
+ using MutableOplogEntry::kPreImageOpTimeFieldName;
+ using MutableOplogEntry::kPrevWriteOpTimeInTransactionFieldName;
+ using MutableOplogEntry::kSessionIdFieldName;
+ using MutableOplogEntry::kStatementIdFieldName;
+ using MutableOplogEntry::kTermFieldName;
+ using MutableOplogEntry::kTimestampFieldName;
+ using MutableOplogEntry::kTxnNumberFieldName;
+ using MutableOplogEntry::kUpsertFieldName;
+ using MutableOplogEntry::kUuidFieldName;
+ using MutableOplogEntry::kVersionFieldName;
+ using MutableOplogEntry::kWallClockTimeFieldName;
+ using MutableOplogEntry::kOplogVersion;
+
// Make serialize(), toBSON() and getters accessible.
- using OplogEntryBase::serialize;
- using OplogEntryBase::toBSON;
- using OplogEntryBase::getOperationSessionInfo;
- using OplogEntryBase::getSessionId;
- using OplogEntryBase::getTxnNumber;
- using OplogEntryBase::getDurableReplOperation;
- using OplogEntryBase::getOpType;
- using OplogEntryBase::getNss;
- using OplogEntryBase::getUuid;
- using OplogEntryBase::getObject;
- using OplogEntryBase::getObject2;
- using OplogEntryBase::getUpsert;
- using OplogEntryBase::getTimestamp;
- using OplogEntryBase::getTerm;
- using OplogEntryBase::getHash;
- using OplogEntryBase::getVersion;
- using OplogEntryBase::getFromMigrate;
- using OplogEntryBase::get_id;
- using OplogEntryBase::getWallClockTime;
- using OplogEntryBase::getStatementId;
- using OplogEntryBase::getPrevWriteOpTimeInTransaction;
- using OplogEntryBase::getPreImageOpTime;
- using OplogEntryBase::getPostImageOpTime;
+ using MutableOplogEntry::serialize;
+ using MutableOplogEntry::toBSON;
+ using MutableOplogEntry::getOperationSessionInfo;
+ using MutableOplogEntry::getSessionId;
+ using MutableOplogEntry::getTxnNumber;
+ using MutableOplogEntry::getDurableReplOperation;
+ using MutableOplogEntry::getOpType;
+ using MutableOplogEntry::getNss;
+ using MutableOplogEntry::getUuid;
+ using MutableOplogEntry::getObject;
+ using MutableOplogEntry::getObject2;
+ using MutableOplogEntry::getUpsert;
+ using MutableOplogEntry::getTimestamp;
+ using MutableOplogEntry::getTerm;
+ using MutableOplogEntry::getHash;
+ using MutableOplogEntry::getVersion;
+ using MutableOplogEntry::getFromMigrate;
+ using MutableOplogEntry::get_id;
+ using MutableOplogEntry::getWallClockTime;
+ using MutableOplogEntry::getStatementId;
+ using MutableOplogEntry::getPrevWriteOpTimeInTransaction;
+ using MutableOplogEntry::getPreImageOpTime;
+ using MutableOplogEntry::getPostImageOpTime;
+
+ // Make helper functions accessible.
+ using MutableOplogEntry::getOpTime;
+ using MutableOplogEntry::makeInsertOperation;
+ using MutableOplogEntry::makeUpdateOperation;
+ using MutableOplogEntry::makeDeleteOperation;
enum class CommandType {
kNotCommand,
@@ -138,21 +221,6 @@ public:
kAbortTransaction,
};
- // Current oplog version, should be the value of the v field in all oplog entries.
- static const int kOplogVersion;
-
- // Helpers to generate ReplOperation.
- static ReplOperation makeInsertOperation(const NamespaceString& nss,
- boost::optional<UUID> uuid,
- const BSONObj& docToInsert);
- static ReplOperation makeUpdateOperation(const NamespaceString nss,
- boost::optional<UUID> uuid,
- const BSONObj& update,
- const BSONObj& criteria);
- static ReplOperation makeDeleteOperation(const NamespaceString& nss,
- boost::optional<UUID> uuid,
- const BSONObj& docToDelete);
-
// Get the in-memory size in bytes of a ReplOperation.
static size_t getDurableReplOperationSize(const DurableReplOperation& op);
@@ -251,11 +319,6 @@ public:
}
/**
- * Returns the OpTime of the oplog entry.
- */
- OpTime getOpTime() const;
-
- /**
* Serializes the oplog entry to a string.
*/
std::string toString() const;
diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl
index cfed4af41cf..cc9a75172cb 100644
--- a/src/mongo/db/repl/oplog_entry.idl
+++ b/src/mongo/db/repl/oplog_entry.idl
@@ -105,7 +105,7 @@ structs:
v:
cpp_name: version
type: safeInt64
- default: 1
+ default: 2
description: "The version of the oplog"
fromMigrate:
type: bool
diff --git a/src/mongo/db/repl/oplog_test.cpp b/src/mongo/db/repl/oplog_test.cpp
index dafae7e8784..870cd21980e 100644
--- a/src/mongo/db/repl/oplog_test.cpp
+++ b/src/mongo/db/repl/oplog_test.cpp
@@ -99,20 +99,14 @@ TEST_F(OplogTest, LogOpReturnsOpTimeOnSuccessfulInsertIntoOplogCollection) {
// Write to the oplog.
OpTime opTime;
{
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setNss(nss);
+ oplogEntry.setObject(msgObj);
+ oplogEntry.setWallClockTime(Date_t::now());
AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
WriteUnitOfWork wunit(opCtx.get());
- opTime = logOp(opCtx.get(),
- "n",
- nss,
- {},
- msgObj,
- nullptr,
- false,
- Date_t::now(),
- {},
- kUninitializedStmtId,
- {},
- OplogSlot());
+ opTime = logOp(opCtx.get(), &oplogEntry);
ASSERT_FALSE(opTime.isNull());
wunit.commit();
}
@@ -223,19 +217,12 @@ OpTime _logOpNoopWithMsg(OperationContext* opCtx,
// logOp() must be called while holding lock because ephemeralForTest storage engine does not
// support concurrent updates to its internal state.
- const auto msgObj = BSON("msg" << nss.ns());
- auto opTime = logOp(opCtx,
- "n",
- nss,
- {},
- msgObj,
- nullptr,
- false,
- Date_t::now(),
- {},
- kUninitializedStmtId,
- {},
- OplogSlot());
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setNss(nss);
+ oplogEntry.setObject(BSON("msg" << nss.ns()));
+ oplogEntry.setWallClockTime(Date_t::now());
+ auto opTime = logOp(opCtx, &oplogEntry);
ASSERT_FALSE(opTime.isNull());
ASSERT(opTimeNssMap->find(opTime) == opTimeNssMap->end())
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index eb60d0bb18a..1482462fce9 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -79,71 +79,60 @@ BSONObj buildMigrateSessionCmd(const MigrationSessionId& migrationSessionId) {
}
/**
- * Determines whether the oplog entry has a link to either preImage/postImage and return a new
- * oplogLink that contains the same link, but pointing to lastResult.oplogTime. For example, if
- * entry has link to preImageTs, this returns an oplogLink with preImageTs pointing to
+ * Determines whether the oplog entry has a link to either preImage/postImage and sets a new link
+ * to lastResult.oplogTime. For example, if entry has link to preImageTs, this sets preImageTs to
* lastResult.oplogTime.
*
* It is an error to have both preImage and postImage as well as not having them at all.
*/
-repl::OplogLink extractPrePostImageTs(const ProcessOplogResult& lastResult,
- const repl::OplogEntry& entry) {
- repl::OplogLink oplogLink;
-
+void setPrePostImageTs(const ProcessOplogResult& lastResult, repl::MutableOplogEntry* entry) {
if (!lastResult.isPrePostImage) {
uassert(40628,
- str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString()
+ str::stream() << "expected oplog with ts: " << entry->getTimestamp().toString()
<< " to not have "
<< repl::OplogEntryBase::kPreImageOpTimeFieldName
<< " or "
<< repl::OplogEntryBase::kPostImageOpTimeFieldName,
- !entry.getPreImageOpTime() && !entry.getPostImageOpTime());
-
- return oplogLink;
+ !entry->getPreImageOpTime() && !entry->getPostImageOpTime());
+ return;
}
invariant(!lastResult.oplogTime.isNull());
- const auto& sessionInfo = entry.getOperationSessionInfo();
- const auto sessionId = *sessionInfo.getSessionId();
- const auto txnNum = *sessionInfo.getTxnNumber();
-
uassert(40629,
- str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString() << ": "
- << redact(entry.toBSON())
+ str::stream() << "expected oplog with ts: " << entry->getTimestamp().toString() << ": "
+ << redact(entry->toBSON())
<< " to have session: "
<< lastResult.sessionId,
- lastResult.sessionId == sessionId);
+ lastResult.sessionId == entry->getSessionId());
uassert(40630,
- str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString() << ": "
- << redact(entry.toBSON())
+ str::stream() << "expected oplog with ts: " << entry->getTimestamp().toString() << ": "
+ << redact(entry->toBSON())
<< " to have txnNumber: "
<< lastResult.txnNum,
- lastResult.txnNum == txnNum);
+ lastResult.txnNum == entry->getTxnNumber());
- if (entry.getPreImageOpTime()) {
- oplogLink.preImageOpTime = lastResult.oplogTime;
- } else if (entry.getPostImageOpTime()) {
- oplogLink.postImageOpTime = lastResult.oplogTime;
+ if (entry->getPreImageOpTime()) {
+ entry->setPreImageOpTime(lastResult.oplogTime);
+ } else if (entry->getPostImageOpTime()) {
+ entry->setPostImageOpTime(lastResult.oplogTime);
} else {
uasserted(40631,
- str::stream() << "expected oplog with opTime: " << entry.getOpTime().toString()
+ str::stream() << "expected oplog with opTime: " << entry->getOpTime().toString()
<< ": "
- << redact(entry.toBSON())
+ << redact(entry->toBSON())
<< " to have either "
<< repl::OplogEntryBase::kPreImageOpTimeFieldName
<< " or "
<< repl::OplogEntryBase::kPostImageOpTimeFieldName);
}
-
- return oplogLink;
}
/**
* Parses the oplog into an oplog entry and makes sure that it contains the expected fields.
*/
-repl::OplogEntry parseOplog(const BSONObj& oplogBSON) {
- auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON));
+repl::MutableOplogEntry parseOplog(const BSONObj& oplogBSON) {
+ auto oplogEntry = uassertStatusOK(repl::MutableOplogEntry::parse(oplogBSON));
// Session oplog entries must always contain wall clock time, because we will not be
// transferring anything from a previous version of the server
@@ -208,13 +197,11 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
const ProcessOplogResult& lastResult) {
auto oplogEntry = parseOplog(oplogBSON);
- const auto& sessionInfo = oplogEntry.getOperationSessionInfo();
ProcessOplogResult result;
- result.sessionId = *sessionInfo.getSessionId();
- result.txnNum = *sessionInfo.getTxnNumber();
+ result.sessionId = *oplogEntry.getSessionId();
+ result.txnNum = *oplogEntry.getTxnNumber();
- BSONObj object2;
if (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) {
// Note: Oplog is already no-op type, no need to nest.
// There are two types of type 'n' oplog format expected here:
@@ -225,8 +212,11 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
// findAndModify operation. In this case, o field contains the relevant info
// and o2 will be empty.
+ BSONObj object2;
if (oplogEntry.getObject2()) {
object2 = *oplogEntry.getObject2();
+ } else {
+ oplogEntry.setObject2(object2);
}
if (object2.isEmpty()) {
@@ -242,7 +232,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
!lastResult.isPrePostImage);
}
} else {
- object2 = oplogBSON; // TODO: strip redundant info?
+ oplogEntry.setObject2(oplogBSON); // TODO: strip redundant info?
}
const auto stmtId = *oplogEntry.getStatementId();
@@ -275,11 +265,16 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
throw;
}
- BSONObj object(result.isPrePostImage
- ? oplogEntry.getObject()
- : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
- auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
- oplogLink.prevOpTime = txnParticipant.getLastWriteOpTime();
+ if (!result.isPrePostImage)
+ oplogEntry.setObject(
+ BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
+ setPrePostImageTs(lastResult, &oplogEntry);
+ oplogEntry.setPrevWriteOpTimeInTransaction(txnParticipant.getLastWriteOpTime());
+
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setFromMigrate(true);
+ // Reset OpTime so logOp() can assign a new one.
+ oplogEntry.setOpTime(OplogSlot());
writeConflictRetry(
opCtx,
@@ -295,18 +290,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX);
WriteUnitOfWork wunit(opCtx);
- result.oplogTime = repl::logOp(opCtx,
- "n",
- oplogEntry.getNss(),
- oplogEntry.getUuid(),
- object,
- &object2,
- true,
- *oplogEntry.getWallClockTime(),
- sessionInfo,
- stmtId,
- oplogLink,
- OplogSlot());
+ result.oplogTime = repl::logOp(opCtx, &oplogEntry);
const auto& oplogOpTime = result.oplogTime;
uassert(40633,
diff --git a/src/mongo/db/storage/biggie/biggie_record_store.cpp b/src/mongo/db/storage/biggie/biggie_record_store.cpp
index 60a6dcdfd04..6e656149170 100644
--- a/src/mongo/db/storage/biggie/biggie_record_store.cpp
+++ b/src/mongo/db/storage/biggie/biggie_record_store.cpp
@@ -187,53 +187,6 @@ Status RecordStore::insertRecords(OperationContext* opCtx,
return Status::OK();
}
-Status RecordStore::insertRecordsWithDocWriter(OperationContext* opCtx,
- const DocWriter* const* docs,
- const Timestamp*,
- size_t nDocs,
- RecordId* idsOut) {
- int64_t totalSize = 0;
- for (size_t i = 0; i < nDocs; i++)
- totalSize += docs[i]->documentSize();
-
- // Caller will retry one element at a time.
- if (_isCapped && totalSize > _cappedMaxSize)
- return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");
-
- auto ru = RecoveryUnit::get(opCtx);
- StringStore* workingCopy(ru->getHead());
- {
- SizeAdjuster adjuster(opCtx, this);
- for (size_t i = 0; i < nDocs; i++) {
- const size_t len = docs[i]->documentSize();
-
- std::string buf(len, '\0');
- docs[i]->writeDocument(&buf[0]);
-
- int64_t thisRecordId = 0;
- if (_isOplog) {
- StatusWith<RecordId> status = oploghack::extractKey(buf.data(), len);
- if (!status.isOK())
- return status.getStatus();
- thisRecordId = status.getValue().repr();
- _visibilityManager->addUncommittedRecord(opCtx, this, RecordId(thisRecordId));
- } else {
- thisRecordId = _nextRecordId();
- }
- std::string key = createKey(_ident, thisRecordId);
-
- StringStore::value_type vt{key, buf};
- workingCopy->insert(std::move(vt));
- if (idsOut)
- idsOut[i] = RecordId(thisRecordId);
- ru->makeDirty();
- }
- }
-
- _cappedDeleteAsNeeded(opCtx, workingCopy);
- return Status::OK();
-}
-
Status RecordStore::updateRecord(OperationContext* opCtx,
const RecordId& oldLocation,
const char* data,
diff --git a/src/mongo/db/storage/biggie/biggie_record_store.h b/src/mongo/db/storage/biggie/biggie_record_store.h
index 813c575ecf7..e8dee66da1c 100644
--- a/src/mongo/db/storage/biggie/biggie_record_store.h
+++ b/src/mongo/db/storage/biggie/biggie_record_store.h
@@ -75,12 +75,6 @@ public:
std::vector<Record>* inOutRecords,
const std::vector<Timestamp>& timestamps);
- virtual Status insertRecordsWithDocWriter(OperationContext* opCtx,
- const DocWriter* const* docs,
- const Timestamp*,
- size_t nDocs,
- RecordId* idsOut);
-
virtual Status updateRecord(OperationContext* opCtx,
const RecordId& oldLocation,
const char* data,
diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
index 8255616eba7..9b69642f9b0 100644
--- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
+++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
@@ -108,20 +108,6 @@ public:
return Status::OK();
}
- virtual Status insertRecordsWithDocWriter(OperationContext* opCtx,
- const DocWriter* const* docs,
- const Timestamp*,
- size_t nDocs,
- RecordId* idsOut) {
- _numInserts += nDocs;
- if (idsOut) {
- for (size_t i = 0; i < nDocs; i++) {
- idsOut[i] = RecordId(6, 4);
- }
- }
- return Status::OK();
- }
-
virtual Status updateRecord(OperationContext* opCtx,
const RecordId& oldLocation,
const char* data,
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
index e0dfd6fcc06..dc0d2fa1ed0 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
@@ -443,47 +443,6 @@ Status EphemeralForTestRecordStore::insertRecords(OperationContext* opCtx,
return Status::OK();
}
-Status EphemeralForTestRecordStore::insertRecordsWithDocWriter(OperationContext* opCtx,
- const DocWriter* const* docs,
- const Timestamp*,
- size_t nDocs,
- RecordId* idsOut) {
- stdx::lock_guard<stdx::recursive_mutex> lock(_data->recordsMutex);
-
- for (size_t i = 0; i < nDocs; i++) {
- const int len = docs[i]->documentSize();
- if (_isCapped && len > _cappedMaxSize) {
- // We use dataSize for capped rollover and we don't want to delete everything if we
- // know this won't fit.
- return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");
- }
-
- EphemeralForTestRecord rec(len);
- docs[i]->writeDocument(rec.data.get());
-
- RecordId loc;
- if (_data->isOplog) {
- StatusWith<RecordId> status = extractAndCheckLocForOplog(lock, rec.data.get(), len);
- if (!status.isOK())
- return status.getStatus();
- loc = status.getValue();
- } else {
- loc = allocateLoc(lock);
- }
-
- opCtx->recoveryUnit()->registerChange(new InsertChange(opCtx, _data, loc));
- _data->dataSize += len;
- _data->records[loc] = rec;
-
- cappedDeleteAsNeeded(lock, opCtx);
-
- if (idsOut)
- idsOut[i] = loc;
- }
-
- return Status::OK();
-}
-
Status EphemeralForTestRecordStore::updateRecord(OperationContext* opCtx,
const RecordId& loc,
const char* data,
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h
index 439cbe1cdf6..2fdbaaa579e 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h
@@ -71,12 +71,6 @@ public:
std::vector<Record>* inOutRecords,
const std::vector<Timestamp>& timestamps);
- virtual Status insertRecordsWithDocWriter(OperationContext* opCtx,
- const DocWriter* const* docs,
- const Timestamp*,
- size_t nDocs,
- RecordId* idsOut);
-
virtual Status updateRecord(OperationContext* opCtx,
const RecordId& oldLocation,
const char* data,
diff --git a/src/mongo/db/storage/mobile/mobile_record_store.cpp b/src/mongo/db/storage/mobile/mobile_record_store.cpp
index ba81032d609..1942214db32 100644
--- a/src/mongo/db/storage/mobile/mobile_record_store.cpp
+++ b/src/mongo/db/storage/mobile/mobile_record_store.cpp
@@ -325,30 +325,6 @@ Status MobileRecordStore::insertRecords(OperationContext* opCtx,
return Status::OK();
}
-Status MobileRecordStore::insertRecordsWithDocWriter(OperationContext* opCtx,
- const DocWriter* const* docs,
- const Timestamp* timestamps,
- size_t nDocs,
- RecordId* idsOut) {
- // Calculates the total size of the data buffer.
- size_t totalSize = 0;
- for (size_t i = 0; i < nDocs; i++) {
- totalSize += docs[i]->documentSize();
- }
-
- std::unique_ptr<char[]> buffer(new char[totalSize]);
- char* pos = buffer.get();
- for (size_t i = 0; i < nDocs; i++) {
- docs[i]->writeDocument(pos);
- size_t docLen = docs[i]->documentSize();
- StatusWith<RecordId> res = insertRecord(opCtx, pos, docLen, timestamps[i]);
- idsOut[i] = res.getValue();
- pos += docLen;
- }
-
- return Status::OK();
-}
-
Status MobileRecordStore::updateRecord(OperationContext* opCtx,
const RecordId& recId,
const char* data,
diff --git a/src/mongo/db/storage/mobile/mobile_record_store.h b/src/mongo/db/storage/mobile/mobile_record_store.h
index 9379a7760f7..d5c36e91a3e 100644
--- a/src/mongo/db/storage/mobile/mobile_record_store.h
+++ b/src/mongo/db/storage/mobile/mobile_record_store.h
@@ -63,12 +63,6 @@ public:
std::vector<Record>* inOutRecords,
const std::vector<Timestamp>& timestamps) override;
- Status insertRecordsWithDocWriter(OperationContext* opCtx,
- const DocWriter* const* docs,
- const Timestamp* timestamps,
- size_t nDocs,
- RecordId* idsOut) override;
-
Status updateRecord(OperationContext* opCtx,
const RecordId& oldLocation,
const char* data,
diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h
index d4a5151faff..3da5078559b 100644
--- a/src/mongo/db/storage/record_store.h
+++ b/src/mongo/db/storage/record_store.h
@@ -62,22 +62,6 @@ struct CompactOptions {
struct CompactStats {};
/**
- * Allows inserting a Record "in-place" without creating a copy ahead of time.
- */
-class DocWriter {
-public:
- virtual void writeDocument(char* buf) const = 0;
- virtual size_t documentSize() const = 0;
- virtual bool addPadding() const {
- return true;
- }
-
-protected:
- // Can't delete through base pointer.
- ~DocWriter() = default;
-};
-
-/**
* The data items stored in a RecordStore.
*/
struct Record {
@@ -359,34 +343,6 @@ public:
}
/**
- * Inserts nDocs documents into this RecordStore using the DocWriter interface.
- *
- * This allows the storage engine to reserve space for a record and have it built in-place
- * rather than building the record then copying it into its destination.
- *
- * On success, if idsOut is non-null the RecordIds of the inserted records will be written into
- * it. It must have space for nDocs RecordIds.
- */
- virtual Status insertRecordsWithDocWriter(OperationContext* opCtx,
- const DocWriter* const* docs,
- const Timestamp* timestamps,
- size_t nDocs,
- RecordId* idsOut = nullptr) = 0;
-
- /**
- * A thin wrapper around insertRecordsWithDocWriter() to simplify handling of single DocWriters.
- */
- StatusWith<RecordId> insertRecordWithDocWriter(OperationContext* opCtx,
- const DocWriter* doc,
- Timestamp timestamp) {
- RecordId out;
- Status status = insertRecordsWithDocWriter(opCtx, &doc, &timestamp, 1, &out);
- if (!status.isOK())
- return status;
- return out;
- }
-
- /**
* Updates the record with id 'recordId', replacing its contents with those described by
* 'data' and 'len'.
*/
diff --git a/src/mongo/db/storage/record_store_test_docwriter.h b/src/mongo/db/storage/record_store_test_docwriter.h
deleted file mode 100644
index af40b8de3d0..00000000000
--- a/src/mongo/db/storage/record_store_test_docwriter.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/storage/record_store.h"
-
-namespace mongo {
-namespace {
-
-class StringDocWriter final : public DocWriter {
-public:
- StringDocWriter(const std::string& data, bool padding) : _data(data), _padding(padding) {}
-
- ~StringDocWriter() {}
-
- void writeDocument(char* buf) const {
- memcpy(buf, _data.c_str(), documentSize());
- }
-
- size_t documentSize() const {
- return _data.size() + 1;
- }
-
- bool addPadding() const {
- return _padding;
- }
-
-private:
- std::string _data;
- bool _padding;
-};
-
-} // namespace
-} // namespace mongo
diff --git a/src/mongo/db/storage/record_store_test_harness.cpp b/src/mongo/db/storage/record_store_test_harness.cpp
index b93e8029d34..9f4214aef1f 100644
--- a/src/mongo/db/storage/record_store_test_harness.cpp
+++ b/src/mongo/db/storage/record_store_test_harness.cpp
@@ -98,49 +98,6 @@ TEST(RecordStoreTestHarness, Simple1) {
}
}
-namespace {
-class DummyDocWriter final : public DocWriter {
-public:
- virtual ~DummyDocWriter() {}
-
- virtual void writeDocument(char* buf) const {
- memcpy(buf, "eliot", 6);
- }
-
- virtual size_t documentSize() const {
- return 6;
- }
-
- virtual bool addPadding() const {
- return false;
- }
-};
-}
-
-
-TEST(RecordStoreTestHarness, Simple1InsertDocWroter) {
- const auto harnessHelper(newRecordStoreHarnessHelper());
- unique_ptr<RecordStore> rs(harnessHelper->newNonCappedRecordStore());
-
- RecordId loc1;
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
-
- {
- WriteUnitOfWork uow(opCtx.get());
- DummyDocWriter dw;
- StatusWith<RecordId> res =
- rs->insertRecordWithDocWriter(opCtx.get(), &dw, Timestamp(1));
- ASSERT_OK(res.getStatus());
- loc1 = res.getValue();
- uow.commit();
- }
-
- ASSERT_EQUALS(string("eliot"), rs->dataFor(opCtx.get(), loc1).data());
- }
-}
-
TEST(RecordStoreTestHarness, Delete1) {
const auto harnessHelper(newRecordStoreHarnessHelper());
unique_ptr<RecordStore> rs(harnessHelper->newNonCappedRecordStore());
diff --git a/src/mongo/db/storage/record_store_test_insertrecord.cpp b/src/mongo/db/storage/record_store_test_insertrecord.cpp
index 55e50355f31..b5b9afcc9bb 100644
--- a/src/mongo/db/storage/record_store_test_insertrecord.cpp
+++ b/src/mongo/db/storage/record_store_test_insertrecord.cpp
@@ -32,7 +32,6 @@
#include "mongo/db/record_id.h"
#include "mongo/db/storage/record_data.h"
#include "mongo/db/storage/record_store.h"
-#include "mongo/db/storage/record_store_test_docwriter.h"
#include "mongo/db/storage/record_store_test_harness.h"
#include "mongo/unittest/unittest.h"
@@ -108,72 +107,5 @@ TEST(RecordStoreTestHarness, InsertMultipleRecords) {
}
}
-// Insert a record using a DocWriter and verify the number of entries
-// in the collection is 1.
-TEST(RecordStoreTestHarness, InsertRecordUsingDocWriter) {
- const auto harnessHelper(newRecordStoreHarnessHelper());
- unique_ptr<RecordStore> rs(harnessHelper->newNonCappedRecordStore());
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- ASSERT_EQUALS(0, rs->numRecords(opCtx.get()));
- }
-
- RecordId loc;
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- {
- StringDocWriter docWriter("my record", false);
-
- WriteUnitOfWork uow(opCtx.get());
- StatusWith<RecordId> res =
- rs->insertRecordWithDocWriter(opCtx.get(), &docWriter, Timestamp(1));
- ASSERT_OK(res.getStatus());
- loc = res.getValue();
- uow.commit();
- }
- }
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- ASSERT_EQUALS(1, rs->numRecords(opCtx.get()));
- }
-}
-
-// Insert multiple records using a DocWriter and verify the number of entries
-// in the collection equals the number that were inserted.
-TEST(RecordStoreTestHarness, InsertMultipleRecordsUsingDocWriter) {
- const auto harnessHelper(newRecordStoreHarnessHelper());
- unique_ptr<RecordStore> rs(harnessHelper->newNonCappedRecordStore());
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- ASSERT_EQUALS(0, rs->numRecords(opCtx.get()));
- }
-
- const int nToInsert = 10;
- RecordId locs[nToInsert];
- for (int i = 0; i < nToInsert; i++) {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- {
- stringstream ss;
- ss << "record " << i;
- StringDocWriter docWriter(ss.str(), false);
-
- WriteUnitOfWork uow(opCtx.get());
- StatusWith<RecordId> res =
- rs->insertRecordWithDocWriter(opCtx.get(), &docWriter, Timestamp(1));
- ASSERT_OK(res.getStatus());
- locs[i] = res.getValue();
- uow.commit();
- }
- }
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- ASSERT_EQUALS(nToInsert, rs->numRecords(opCtx.get()));
- }
-}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index 97fd85aaaf5..d711686fc42 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -1338,48 +1338,6 @@ void WiredTigerRecordStore::notifyCappedWaitersIfNeeded() {
}
}
-Status WiredTigerRecordStore::insertRecordsWithDocWriter(OperationContext* opCtx,
- const DocWriter* const* docs,
- const Timestamp* timestamps,
- size_t nDocs,
- RecordId* idsOut) {
- dassert(opCtx->lockState()->isReadLocked());
-
- std::unique_ptr<Record[]> records(new Record[nDocs]);
-
- // First get all the sizes so we can allocate a single buffer for all documents. Eventually it
- // would be nice if we could either hand off the buffers to WT without copying or write them
- // in-place as we do with MMAPv1, but for now this is the best we can do.
- size_t totalSize = 0;
- for (size_t i = 0; i < nDocs; i++) {
- const size_t docSize = docs[i]->documentSize();
- records[i].data = RecordData(nullptr, docSize); // We fill in the real ptr in next loop.
- totalSize += docSize;
- }
-
- std::unique_ptr<char[]> buffer(new char[totalSize]);
- char* pos = buffer.get();
- for (size_t i = 0; i < nDocs; i++) {
- docs[i]->writeDocument(pos);
- const size_t size = records[i].data.size();
- records[i].data = RecordData(pos, size);
- pos += size;
- }
- invariant(pos == (buffer.get() + totalSize));
-
- Status s = _insertRecords(opCtx, records.get(), timestamps, nDocs);
- if (!s.isOK())
- return s;
-
- if (idsOut) {
- for (size_t i = 0; i < nDocs; i++) {
- idsOut[i] = records[i].id;
- }
- }
-
- return s;
-}
-
Status WiredTigerRecordStore::updateRecord(OperationContext* opCtx,
const RecordId& id,
const char* data,
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
index 546974eb4a3..33365c96b36 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -143,12 +143,6 @@ public:
std::vector<Record>* records,
const std::vector<Timestamp>& timestamps);
- virtual Status insertRecordsWithDocWriter(OperationContext* opCtx,
- const DocWriter* const* docs,
- const Timestamp* timestamps,
- size_t nDocs,
- RecordId* idsOut);
-
virtual Status updateRecord(OperationContext* opCtx,
const RecordId& recordId,
const char* data,
diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
index ddd0a8c3ddd..7203a1c1ddf 100644
--- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp
+++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
@@ -201,25 +201,19 @@ protected:
TxnNumber txnNumber,
StmtId stmtId,
repl::OpTime prevOpTime) {
- OperationSessionInfo osi;
- osi.setSessionId(lsid);
- osi.setTxnNumber(txnNumber);
-
- repl::OplogLink link;
- link.prevOpTime = prevOpTime;
-
- return repl::logOp(opCtx,
- "n",
- nss,
- uuid,
- BSON("TestValue" << 0),
- nullptr,
- false,
- Date_t::now(),
- osi,
- stmtId,
- link,
- OplogSlot());
+ repl::MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setNss(nss);
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(BSON("TestValue" << 0));
+ oplogEntry.setWallClockTime(Date_t::now());
+ if (stmtId != kUninitializedStmtId) {
+ oplogEntry.setSessionId(lsid);
+ oplogEntry.setTxnNumber(txnNumber);
+ oplogEntry.setStatementId(stmtId);
+ oplogEntry.setPrevWriteOpTimeInTransaction(prevOpTime);
+ }
+ return repl::logOp(opCtx, &oplogEntry);
}
repl::OpTime writeTxnRecord(TxnNumber txnNum,
@@ -590,34 +584,32 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke
txnParticipant.refreshFromStorageIfNeeded(opCtx());
txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
- OperationSessionInfo osi;
- osi.setSessionId(sessionId);
- osi.setTxnNumber(txnNum);
+ repl::MutableOplogEntry oplogEntry;
+ oplogEntry.setSessionId(sessionId);
+ oplogEntry.setTxnNumber(txnNum);
+ oplogEntry.setNss(kNss);
+ oplogEntry.setUuid(uuid);
auto firstOpTime = ([&]() {
+ oplogEntry.setOpType(repl::OpTypeEnum::kInsert);
+ oplogEntry.setObject(BSON("x" << 1));
+ oplogEntry.setObject2(TransactionParticipant::kDeadEndSentinel);
+ oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime());
+ oplogEntry.setStatementId(1);
+
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto wallClockTime = Date_t::now();
+ oplogEntry.setWallClockTime(wallClockTime);
- auto opTime = repl::logOp(opCtx(),
- "i",
- kNss,
- uuid,
- BSON("x" << 1),
- &TransactionParticipant::kDeadEndSentinel,
- false,
- wallClockTime,
- osi,
- 1,
- {},
- OplogSlot());
+ auto opTime = repl::logOp(opCtx(), &oplogEntry);
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setSessionId(sessionId);
sessionTxnRecord.setTxnNum(txnNum);
sessionTxnRecord.setLastWriteOpTime(opTime);
- sessionTxnRecord.setLastWriteDate(Date_t::now());
+ sessionTxnRecord.setLastWriteDate(wallClockTime);
txnParticipant.onWriteOpCompletedOnPrimary(opCtx(), {1}, sessionTxnRecord);
wuow.commit();
@@ -625,26 +617,19 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke
})();
{
- repl::OplogLink link;
- link.prevOpTime = firstOpTime;
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setObject({});
+ oplogEntry.setObject2(TransactionParticipant::kDeadEndSentinel);
+ oplogEntry.setPrevWriteOpTimeInTransaction(firstOpTime);
+ oplogEntry.setStatementId(kIncompleteHistoryStmtId);
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto wallClockTime = Date_t::now();
+ oplogEntry.setWallClockTime(wallClockTime);
- auto opTime = repl::logOp(opCtx(),
- "n",
- kNss,
- uuid,
- {},
- &TransactionParticipant::kDeadEndSentinel,
- false,
- wallClockTime,
- osi,
- kIncompleteHistoryStmtId,
- link,
- OplogSlot());
+ auto opTime = repl::logOp(opCtx(), &oplogEntry);
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setSessionId(sessionId);