diff options
author | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2019-06-12 15:49:24 -0400 |
---|---|---|
committer | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2019-07-08 21:33:37 -0400 |
commit | e7abff841a2f5cf6c3bc71a344e347f6342822b0 (patch) | |
tree | 84e457cde984e2f150b1b2537a1de29a6c1c0b8a /src/mongo | |
parent | 740dbbec28c970befdfea5177957916882bffa2b (diff) | |
download | mongo-e7abff841a2f5cf6c3bc71a344e347f6342822b0.tar.gz |
SERVER-35353: Use MutableOplogEntry class to build up oplog entries progressively
Diffstat (limited to 'src/mongo')
29 files changed, 643 insertions, 1392 deletions
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index 7cbaa9d5ced..15057e3124d 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -277,9 +277,8 @@ public: * this method. */ virtual Status insertDocumentsForOplog(OperationContext* const opCtx, - const DocWriter* const* const docs, - Timestamp* timestamps, - const size_t nDocs) = 0; + std::vector<Record>* records, + const std::vector<Timestamp>& timestamps) = 0; /** * Inserts a document into the record store for a bulk loader that manages the index building diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 884f1ca87f3..fdde33beeb5 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -354,18 +354,15 @@ StatusWithMatchExpression CollectionImpl::parseValidator( } Status CollectionImpl::insertDocumentsForOplog(OperationContext* opCtx, - const DocWriter* const* docs, - Timestamp* timestamps, - size_t nDocs) { + std::vector<Record>* records, + const std::vector<Timestamp>& timestamps) { dassert(opCtx->lockState()->isWriteLocked()); // Since this is only for the OpLog, we can assume these for simplicity. - // This also means that we do not need to forward this object to the OpObserver, which is good - // because it would defeat the purpose of using DocWriter. invariant(!_validator); invariant(!_indexCatalog->haveAnyIndexes()); - Status status = _recordStore->insertRecordsWithDocWriter(opCtx, docs, timestamps, nDocs); + Status status = _recordStore->insertRecords(opCtx, records, timestamps); if (!status.isOK()) return status; diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h index 180becf62d8..a9a2c6fbfde 100644 --- a/src/mongo/db/catalog/collection_impl.h +++ b/src/mongo/db/catalog/collection_impl.h @@ -167,9 +167,8 @@ public: * this method. 
*/ Status insertDocumentsForOplog(OperationContext* opCtx, - const DocWriter* const* docs, - Timestamp* timestamps, - size_t nDocs) final; + std::vector<Record>* records, + const std::vector<Timestamp>& timestamps) final; /** * Inserts a document into the record store for a bulk loader that manages the index building diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h index a3efa28633c..cb78c52b72f 100644 --- a/src/mongo/db/catalog/collection_mock.h +++ b/src/mongo/db/catalog/collection_mock.h @@ -122,9 +122,8 @@ public: } Status insertDocumentsForOplog(OperationContext* opCtx, - const DocWriter* const* docs, - Timestamp* timestamps, - size_t nDocs) { + std::vector<Record>* records, + const std::vector<Timestamp>& timestamps) { std::abort(); } diff --git a/src/mongo/db/commands/dbcheck.cpp b/src/mongo/db/commands/dbcheck.cpp index 5f58079b200..6cbfc7b6bf9 100644 --- a/src/mongo/db/commands/dbcheck.cpp +++ b/src/mongo/db/commands/dbcheck.cpp @@ -467,24 +467,18 @@ private: const NamespaceString& nss, OptionalCollectionUUID uuid, const BSONObj& obj) { + repl::MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss(nss); + oplogEntry.setUuid(uuid); + oplogEntry.setObject(obj); return writeConflictRetry( opCtx, "dbCheck oplog entry", NamespaceString::kRsOplogNamespace.ns(), [&] { auto const clockSource = opCtx->getServiceContext()->getFastClockSource(); - const auto wallClockTime = clockSource->now(); + oplogEntry.setWallClockTime(clockSource->now()); WriteUnitOfWork uow(opCtx); - repl::OpTime result = repl::logOp(opCtx, - "c", - nss, - uuid, - obj, - nullptr, - false, - wallClockTime, - {}, - kUninitializedStmtId, - {}, - OplogSlot()); + repl::OpTime result = repl::logOp(opCtx, &oplogEntry); uow.commit(); return result; }); diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index b541cd20946..0e08c5f1065 100644 --- a/src/mongo/db/op_observer_impl.cpp 
+++ b/src/mongo/db/op_observer_impl.cpp @@ -65,6 +65,7 @@ namespace mongo { using repl::OplogEntry; +using repl::MutableOplogEntry; namespace { @@ -76,32 +77,15 @@ constexpr auto kNumRecordsFieldName = "numRecords"_sd; constexpr auto kMsgFieldName = "msg"_sd; constexpr long long kInvalidNumRecords = -1LL; -repl::OpTime logOperation(OperationContext* opCtx, - const char* opstr, - const NamespaceString& ns, - OptionalCollectionUUID uuid, - const BSONObj& obj, - const BSONObj* o2, - bool fromMigrate, - Date_t wallClockTime, - const OperationSessionInfo& sessionInfo, - boost::optional<StmtId> stmtId, - const repl::OplogLink& oplogLink, - const OplogSlot& oplogSlot) { - auto& times = OpObserver::Times::get(opCtx).reservedOpTimes; - auto opTime = repl::logOp(opCtx, - opstr, - ns, - uuid, - obj, - o2, - fromMigrate, - wallClockTime, - sessionInfo, - stmtId, - oplogLink, - oplogSlot); +Date_t getWallClockTimeForOpLog(OperationContext* opCtx) { + auto const clockSource = opCtx->getServiceContext()->getFastClockSource(); + return clockSource->now(); +} +repl::OpTime logOperation(OperationContext* opCtx, MutableOplogEntry* oplogEntry) { + oplogEntry->setWallClockTime(getWallClockTimeForOpLog(opCtx)); + auto& times = OpObserver::Times::get(opCtx).reservedOpTimes; + auto opTime = repl::logOp(opCtx, oplogEntry); times.push_back(opTime); return opTime; } @@ -152,11 +136,6 @@ BSONObj makeObject2ForDropOrRename(uint64_t numRecords) { return obj; } -Date_t getWallClockTimeForOpLog(OperationContext* opCtx) { - auto const clockSource = opCtx->getServiceContext()->getFastClockSource(); - return clockSource->now(); -} - struct OpTimeBundle { repl::OpTime writeOpTime; repl::OpTime prePostImageOpTime; @@ -175,32 +154,20 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& storeObj = args.updateArgs.updatedDoc; } - OperationSessionInfo sessionInfo; - repl::OplogLink oplogLink; + MutableOplogEntry oplogEntry; + oplogEntry.setNss(args.nss); + 
oplogEntry.setUuid(args.uuid); - const auto txnParticipant = TransactionParticipant::get(opCtx); - if (txnParticipant) { - sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); - sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - oplogLink.prevOpTime = txnParticipant.getLastWriteOpTime(); - } + repl::OplogLink oplogLink; + repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId); OpTimeBundle opTimes; - opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx); if (!storeObj.isEmpty() && opCtx->getTxnNumber()) { - auto noteUpdateOpTime = logOperation(opCtx, - "n", - args.nss, - args.uuid, - storeObj, - nullptr, - false, - opTimes.wallClockTime, - sessionInfo, - args.updateArgs.stmtId, - {}, - OplogSlot()); + MutableOplogEntry noopEntry = oplogEntry; + noopEntry.setOpType(repl::OpTypeEnum::kNoop); + noopEntry.setObject(std::move(storeObj)); + auto noteUpdateOpTime = logOperation(opCtx, &noopEntry); opTimes.prePostImageOpTime = noteUpdateOpTime; @@ -212,19 +179,14 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& } } - opTimes.writeOpTime = logOperation(opCtx, - "u", - args.nss, - args.uuid, - args.updateArgs.update, - &args.updateArgs.criteria, - args.updateArgs.fromMigrate, - opTimes.wallClockTime, - sessionInfo, - args.updateArgs.stmtId, - oplogLink, - OplogSlot()); - + oplogEntry.setOpType(repl::OpTypeEnum::kUpdate); + oplogEntry.setObject(args.updateArgs.update); + oplogEntry.setObject2(args.updateArgs.criteria); + oplogEntry.setFromMigrateIfTrue(args.updateArgs.fromMigrate); + // oplogLink could have been changed to include pre/postImageOpTime by the previous no-op write. 
+ repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId); + opTimes.writeOpTime = logOperation(opCtx, &oplogEntry); + opTimes.wallClockTime = oplogEntry.getWallClockTime().get(); return opTimes; } @@ -237,79 +199,34 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, StmtId stmtId, bool fromMigrate, const boost::optional<BSONObj>& deletedDoc) { - OperationSessionInfo sessionInfo; - repl::OplogLink oplogLink; + MutableOplogEntry oplogEntry; + oplogEntry.setNss(nss); + oplogEntry.setUuid(uuid); - const auto txnParticipant = TransactionParticipant::get(opCtx); - if (txnParticipant) { - sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); - sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - oplogLink.prevOpTime = txnParticipant.getLastWriteOpTime(); - } + repl::OplogLink oplogLink; + repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, stmtId); OpTimeBundle opTimes; - opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx); if (deletedDoc && opCtx->getTxnNumber()) { - auto noteOplog = logOperation(opCtx, - "n", - nss, - uuid, - deletedDoc.get(), - nullptr, - false, - opTimes.wallClockTime, - sessionInfo, - stmtId, - {}, - OplogSlot()); + MutableOplogEntry noopEntry = oplogEntry; + noopEntry.setOpType(repl::OpTypeEnum::kNoop); + noopEntry.setObject(deletedDoc.get()); + auto noteOplog = logOperation(opCtx, &noopEntry); opTimes.prePostImageOpTime = noteOplog; oplogLink.preImageOpTime = noteOplog; } - auto& documentKey = documentKeyDecoration(opCtx); - opTimes.writeOpTime = logOperation(opCtx, - "d", - nss, - uuid, - documentKey, - nullptr, - fromMigrate, - opTimes.wallClockTime, - sessionInfo, - stmtId, - oplogLink, - OplogSlot()); + oplogEntry.setOpType(repl::OpTypeEnum::kDelete); + oplogEntry.setObject(documentKeyDecoration(opCtx)); + oplogEntry.setFromMigrateIfTrue(fromMigrate); + // oplogLink could have been changed to include preImageOpTime by the previous no-op write. 
+ repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, stmtId); + opTimes.writeOpTime = logOperation(opCtx, &oplogEntry); + opTimes.wallClockTime = oplogEntry.getWallClockTime().get(); return opTimes; } -/** - * Write oplog entry for applyOps/atomic transaction operations. - */ -OpTimeBundle replLogApplyOps(OperationContext* opCtx, - const NamespaceString& cmdNss, - const BSONObj& applyOpCmd, - const OperationSessionInfo& sessionInfo, - boost::optional<StmtId> stmtId, - const repl::OplogLink& oplogLink, - const OplogSlot& oplogSlot) { - OpTimeBundle times; - times.wallClockTime = getWallClockTimeForOpLog(opCtx); - times.writeOpTime = logOperation(opCtx, - "c", - cmdNss, - {}, - applyOpCmd, - nullptr, - false, - times.wallClockTime, - sessionInfo, - stmtId, - oplogLink, - oplogSlot); - return times; -} - } // namespace BSONObj OpObserverImpl::getDocumentKey(OperationContext* opCtx, @@ -324,7 +241,6 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, CollectionUUID uuid, BSONObj indexDoc, bool fromMigrate) { - BSONObjBuilder builder; builder.append("createIndexes", nss.coll()); @@ -333,18 +249,13 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, builder.append(e); } - logOperation(opCtx, - "c", - nss.getCommandNS(), - uuid, - builder.done(), - nullptr, - fromMigrate, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}, - OplogSlot()); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss(nss.getCommandNS()); + oplogEntry.setUuid(uuid); + oplogEntry.setObject(builder.done()); + oplogEntry.setFromMigrateIfTrue(fromMigrate); + logOperation(opCtx, &oplogEntry); } void OpObserverImpl::onStartIndexBuild(OperationContext* opCtx, @@ -369,18 +280,13 @@ void OpObserverImpl::onStartIndexBuild(OperationContext* opCtx, } indexesArr.done(); - logOperation(opCtx, - "c", - nss.getCommandNS(), - collUUID, - oplogEntryBuilder.done(), - nullptr, - fromMigrate, - 
getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}, - OplogSlot()); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss(nss.getCommandNS()); + oplogEntry.setUuid(collUUID); + oplogEntry.setObject(oplogEntryBuilder.done()); + oplogEntry.setFromMigrateIfTrue(fromMigrate); + logOperation(opCtx, &oplogEntry); } void OpObserverImpl::onCommitIndexBuild(OperationContext* opCtx, @@ -405,18 +311,13 @@ void OpObserverImpl::onCommitIndexBuild(OperationContext* opCtx, } indexesArr.done(); - logOperation(opCtx, - "c", - nss.getCommandNS(), - collUUID, - oplogEntryBuilder.done(), - nullptr, - fromMigrate, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}, - OplogSlot()); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss(nss.getCommandNS()); + oplogEntry.setUuid(collUUID); + oplogEntry.setObject(oplogEntryBuilder.done()); + oplogEntry.setFromMigrateIfTrue(fromMigrate); + logOperation(opCtx, &oplogEntry); } void OpObserverImpl::onAbortIndexBuild(OperationContext* opCtx, @@ -441,18 +342,13 @@ void OpObserverImpl::onAbortIndexBuild(OperationContext* opCtx, } indexesArr.done(); - logOperation(opCtx, - "c", - nss.getCommandNS(), - collUUID, - oplogEntryBuilder.done(), - nullptr, - fromMigrate, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}, - OplogSlot()); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss(nss.getCommandNS()); + oplogEntry.setUuid(collUUID); + oplogEntry.setObject(oplogEntryBuilder.done()); + oplogEntry.setFromMigrateIfTrue(fromMigrate); + logOperation(opCtx, &oplogEntry); } void OpObserverImpl::onInserts(OperationContext* opCtx, @@ -478,12 +374,18 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, return; } for (auto iter = first; iter != last; iter++) { - auto operation = OplogEntry::makeInsertOperation(nss, uuid, iter->doc); + auto operation 
= MutableOplogEntry::makeInsertOperation(nss, uuid, iter->doc); txnParticipant.addTransactionOperation(opCtx, operation); } } else { + MutableOplogEntry oplogEntryTemplate; + oplogEntryTemplate.setNss(nss); + oplogEntryTemplate.setUuid(uuid); + oplogEntryTemplate.setFromMigrateIfTrue(fromMigrate); lastWriteDate = getWallClockTimeForOpLog(opCtx); - opTimeList = repl::logInsertOps(opCtx, nss, uuid, first, last, fromMigrate, lastWriteDate); + oplogEntryTemplate.setWallClockTime(lastWriteDate); + + opTimeList = repl::logInsertOps(opCtx, &oplogEntryTemplate, first, last); if (!opTimeList.empty()) lastOpTime = opTimeList.back(); @@ -553,7 +455,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg OpTimeBundle opTime; if (inMultiDocumentTransaction) { - auto operation = OplogEntry::makeUpdateOperation( + auto operation = MutableOplogEntry::makeUpdateOperation( args.nss, args.uuid, args.updateArgs.update, args.updateArgs.criteria); txnParticipant.addTransactionOperation(opCtx, operation); } else { @@ -614,8 +516,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, OpTimeBundle opTime; if (inMultiDocumentTransaction) { - auto operation = - OplogEntry::makeDeleteOperation(nss, uuid, deletedDoc ? deletedDoc.get() : documentKey); + auto operation = MutableOplogEntry::makeDeleteOperation( + nss, uuid, deletedDoc ? deletedDoc.get() : documentKey); txnParticipant.addTransactionOperation(opCtx, operation); } else { opTime = replLogDelete(opCtx, nss, uuid, stmtId, fromMigrate, deletedDoc); @@ -656,19 +558,13 @@ void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx, const boost::optional<UUID> uuid, const BSONObj& msgObj, const boost::optional<BSONObj> o2MsgObj) { - const BSONObj* o2MsgPtr = o2MsgObj ? 
o2MsgObj.get_ptr() : nullptr; - logOperation(opCtx, - "n", - nss, - uuid, - msgObj, - o2MsgPtr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}, - OplogSlot()); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setNss(nss); + oplogEntry.setUuid(uuid); + oplogEntry.setObject(msgObj); + oplogEntry.setObject2(o2MsgObj); + logOperation(opCtx, &oplogEntry); } void OpObserverImpl::onCreateCollection(OperationContext* opCtx, @@ -677,24 +573,15 @@ void OpObserverImpl::onCreateCollection(OperationContext* opCtx, const CollectionOptions& options, const BSONObj& idIndex, const OplogSlot& createOpTime) { - const auto cmdNss = collectionName.getCommandNS(); - - const auto cmdObj = makeCreateCollCmdObj(collectionName, options, idIndex); - if (!collectionName.isSystemDotProfile()) { // do not replicate system.profile modifications - logOperation(opCtx, - "c", - cmdNss, - options.uuid, - cmdObj, - nullptr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}, - createOpTime); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss(collectionName.getCommandNS()); + oplogEntry.setUuid(options.uuid); + oplogEntry.setObject(makeCreateCollCmdObj(collectionName, options, idIndex)); + oplogEntry.setOpTime(createOpTime); + logOperation(opCtx, &oplogEntry); } } @@ -704,35 +591,25 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx, const BSONObj& collModCmd, const CollectionOptions& oldCollOptions, boost::optional<TTLCollModInfo> ttlInfo) { - const auto cmdNss = nss.getCommandNS(); - - // Create the 'o' field object. - const auto cmdObj = makeCollModCmdObj(collModCmd, oldCollOptions, ttlInfo); - - // Create the 'o2' field object. We save the old collection metadata and TTL expiration. 
- BSONObjBuilder o2Builder; - o2Builder.append("collectionOptions_old", oldCollOptions.toBSON()); - if (ttlInfo) { - auto oldExpireAfterSeconds = durationCount<Seconds>(ttlInfo->oldExpireAfterSeconds); - o2Builder.append("expireAfterSeconds_old", oldExpireAfterSeconds); - } - - const auto o2Obj = o2Builder.done(); if (!nss.isSystemDotProfile()) { // do not replicate system.profile modifications - logOperation(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - &o2Obj, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}, - OplogSlot()); + + // Create the 'o2' field object. We save the old collection metadata and TTL expiration. + BSONObjBuilder o2Builder; + o2Builder.append("collectionOptions_old", oldCollOptions.toBSON()); + if (ttlInfo) { + auto oldExpireAfterSeconds = durationCount<Seconds>(ttlInfo->oldExpireAfterSeconds); + o2Builder.append("expireAfterSeconds_old", oldExpireAfterSeconds); + } + + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss(nss.getCommandNS()); + oplogEntry.setUuid(uuid); + oplogEntry.setObject(makeCollModCmdObj(collModCmd, oldCollOptions, ttlInfo)); + oplogEntry.setObject2(o2Builder.done()); + logOperation(opCtx, &oplogEntry); } // Make sure the UUID values in the Collection metadata, the Collection object, and the UUID @@ -752,21 +629,11 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx, } void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string& dbName) { - const NamespaceString cmdNss{dbName, "$cmd"}; - const auto cmdObj = BSON("dropDatabase" << 1); - - logOperation(opCtx, - "c", - cmdNss, - {}, - cmdObj, - nullptr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}, - OplogSlot()); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss({dbName, "$cmd"}); + oplogEntry.setObject(BSON("dropDatabase" << 1)); + logOperation(opCtx, &oplogEntry); uassert( 50714, 
"dropping the admin database is not allowed.", dbName != NamespaceString::kAdminDb); @@ -781,24 +648,15 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, OptionalCollectionUUID uuid, std::uint64_t numRecords, const CollectionDropType dropType) { - const auto cmdNss = collectionName.getCommandNS(); - const auto cmdObj = BSON("drop" << collectionName.coll()); - const auto obj2 = makeObject2ForDropOrRename(numRecords); - if (!collectionName.isSystemDotProfile()) { // Do not replicate system.profile modifications. - logOperation(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - &obj2, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}, - OplogSlot()); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss(collectionName.getCommandNS()); + oplogEntry.setUuid(uuid); + oplogEntry.setObject(BSON("drop" << collectionName.coll())); + oplogEntry.setObject2(makeObject2ForDropOrRename(numRecords)); + logOperation(opCtx, &oplogEntry); } uassert(50715, @@ -819,21 +677,13 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx, OptionalCollectionUUID uuid, const std::string& indexName, const BSONObj& indexInfo) { - const auto cmdNss = nss.getCommandNS(); - const auto cmdObj = BSON("dropIndexes" << nss.coll() << "index" << indexName); - - logOperation(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - &indexInfo, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}, - OplogSlot()); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss(nss.getCommandNS()); + oplogEntry.setUuid(uuid); + oplogEntry.setObject(BSON("dropIndexes" << nss.coll() << "index" << indexName)); + oplogEntry.setObject2(indexInfo); + logOperation(opCtx, &oplogEntry); } @@ -844,8 +694,6 @@ repl::OpTime OpObserverImpl::preRenameCollection(OperationContext* const opCtx, OptionalCollectionUUID dropTargetUUID, std::uint64_t numRecords, bool stayTemp) { - 
const auto cmdNss = fromCollection.getCommandNS(); - BSONObjBuilder builder; builder.append("renameCollection", fromCollection.ns()); builder.append("to", toCollection.ns()); @@ -854,26 +702,14 @@ repl::OpTime OpObserverImpl::preRenameCollection(OperationContext* const opCtx, dropTargetUUID->appendToBuilder(&builder, "dropTarget"); } - const auto cmdObj = builder.done(); - BSONObj obj2; - BSONObj* obj2Ptr = nullptr; - if (dropTargetUUID) { - obj2 = makeObject2ForDropOrRename(numRecords); - obj2Ptr = &obj2; - } - - logOperation(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - obj2Ptr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}, - OplogSlot()); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss(fromCollection.getCommandNS()); + oplogEntry.setUuid(uuid); + oplogEntry.setObject(builder.done()); + if (dropTargetUUID) + oplogEntry.setObject2(makeObject2ForDropOrRename(numRecords)); + logOperation(opCtx, &oplogEntry); return {}; } @@ -905,31 +741,24 @@ void OpObserverImpl::onRenameCollection(OperationContext* const opCtx, void OpObserverImpl::onApplyOps(OperationContext* opCtx, const std::string& dbName, const BSONObj& applyOpCmd) { - const NamespaceString cmdNss{dbName, "$cmd"}; - - replLogApplyOps(opCtx, cmdNss, applyOpCmd, {}, kUninitializedStmtId, {}, OplogSlot()); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss({dbName, "$cmd"}); + oplogEntry.setObject(applyOpCmd); + logOperation(opCtx, &oplogEntry); } void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, const NamespaceString& collectionName, OptionalCollectionUUID uuid) { - const auto cmdNss = collectionName.getCommandNS(); - const auto cmdObj = BSON("emptycapped" << collectionName.coll()); - if (!collectionName.isSystemDotProfile()) { // Do not replicate system.profile modifications - logOperation(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - nullptr, - false, - 
getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}, - OplogSlot()); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss(collectionName.getCommandNS()); + oplogEntry.setUuid(uuid); + oplogEntry.setObject(BSON("emptycapped" << collectionName.coll())); + logOperation(opCtx, &oplogEntry); } } @@ -985,69 +814,28 @@ std::vector<repl::ReplOperation>::const_iterator packTransactionStatementsForApp // builder object already has an 'applyOps' field appended pointing to the desired array of ops // i.e. { "applyOps" : [op1, op2, ...] } // -// @param prepare determines whether a 'prepare' field will be attached to the written oplog entry. -// @param isPartialTxn is this applyOps entry part of an in-progress multi oplog entry transaction. -// Should be set for all non-terminal ops of an unprepared multi oplog entry transaction. -// @param shouldWriteStateField determines whether a 'state' field will be included in the write to -// the transactions table. Only meaningful if 'updateTxnTable' is true. -// @param updateTxnTable determines whether the transactions table will updated after the oplog -// entry is written. +// @param txnState the 'state' field of the transaction table entry update. // @param startOpTime the optime of the 'startOpTime' field of the transaction table entry update. // If boost::none, no 'startOpTime' field will be included in the new transaction table entry. Only // meaningful if 'updateTxnTable' is true. +// @param updateTxnTable determines whether the transactions table will updated after the oplog +// entry is written. // // Returns the optime of the written oplog entry. 
OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, - BSONObjBuilder* applyOpsBuilder, - const OplogSlot& oplogSlot, - repl::OpTime prevWriteOpTime, - boost::optional<StmtId> stmtId, - const bool prepare, - const bool isPartialTxn, - const bool shouldWriteStateField, - const bool updateTxnTable, - boost::optional<long long> count, - boost::optional<repl::OpTime> startOpTime) { - - // A 'prepare' oplog entry should never include a 'partialTxn' field. - invariant(!(isPartialTxn && prepare)); - - const NamespaceString cmdNss{"admin", "$cmd"}; - - OperationSessionInfo sessionInfo; - repl::OplogLink oplogLink; - sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); - sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - - oplogLink.prevOpTime = prevWriteOpTime; + MutableOplogEntry* oplogEntry, + boost::optional<DurableTxnStateEnum> txnState, + boost::optional<repl::OpTime> startOpTime, + const bool updateTxnTable) { + oplogEntry->setOpType(repl::OpTypeEnum::kCommand); + oplogEntry->setNss({"admin", "$cmd"}); + oplogEntry->setSessionId(opCtx->getLogicalSessionId()); + oplogEntry->setTxnNumber(opCtx->getTxnNumber()); try { - if (prepare) { - applyOpsBuilder->append("prepare", true); - } - if (isPartialTxn) { - applyOpsBuilder->append("partialTxn", true); - } - if (count) { - applyOpsBuilder->append("count", *count); - } - auto applyOpCmd = applyOpsBuilder->done(); - auto times = - replLogApplyOps(opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, oplogSlot); - - auto txnState = [&]() -> boost::optional<DurableTxnStateEnum> { - if (!shouldWriteStateField) { - invariant(!prepare); - return boost::none; - } - - if (isPartialTxn) { - return DurableTxnStateEnum::kInProgress; - } - - return prepare ? 
DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted; - }(); - + OpTimeBundle times; + times.writeOpTime = logOperation(opCtx, oplogEntry); + times.wallClockTime = oplogEntry->getWallClockTime().get(); if (updateTxnTable) { SessionTxnRecord sessionTxnRecord; sessionTxnRecord.setLastWriteOpTime(times.writeOpTime); @@ -1067,37 +855,6 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, MONGO_UNREACHABLE; } -// Log a single applyOps for transactions without any attempt to pack operations. If the given -// statements would exceed the maximum BSON size limit of a single oplog entry, this will throw a -// TransactionTooLarge error. -// TODO(SERVER-41470): Remove this function once old transaction format is no longer needed. -OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, - const std::vector<repl::ReplOperation>& statements, - const OplogSlot& oplogSlot, - repl::OpTime prevWriteOpTime, - boost::optional<StmtId> stmtId, - const bool prepare, - const bool isPartialTxn, - const bool shouldWriteStateField, - const bool updateTxnTable, - boost::optional<long long> count, - boost::optional<repl::OpTime> startOpTime) { - BSONObjBuilder applyOpsBuilder; - packTransactionStatementsForApplyOps( - &applyOpsBuilder, statements.begin(), statements.end(), false /* limitSize */); - return logApplyOpsForTransaction(opCtx, - &applyOpsBuilder, - oplogSlot, - prevWriteOpTime, - stmtId, - prepare, - isPartialTxn, - shouldWriteStateField, - updateTxnTable, - count, - startOpTime); -} - // Logs transaction oplog entries for preparing a transaction or committing an unprepared // transaction. This includes the in-progress 'partialTxn' oplog entries followed by the implicit // prepare or commit entry. If the 'prepare' argument is true, it will log entries for a prepared @@ -1153,9 +910,24 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, // commit or implicit prepare, i.e. we omit the 'partialTxn' field. 
auto firstOp = stmtsIter == stmts.begin(); auto lastOp = nextStmt == stmts.end(); - auto isPartialTxn = !lastOp; + auto implicitCommit = lastOp && !prepare; auto implicitPrepare = lastOp && prepare; + auto isPartialTxn = !lastOp; + // A 'prepare' oplog entry should never include a 'partialTxn' field. + invariant(!(isPartialTxn && implicitPrepare)); + if (implicitPrepare) { + applyOpsBuilder.append("prepare", true); + } + if (isPartialTxn) { + applyOpsBuilder.append("partialTxn", true); + } + + // The 'count' field gives the total number of individual operations in the + // transaction, and is included on a non-initial implicit commit or prepare entry. + if (lastOp && !firstOp) { + applyOpsBuilder.append("count", static_cast<long long>(stmts.size())); + } // For both prepared and unprepared transactions, update the transactions table on // the first and last op. @@ -1177,21 +949,16 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, // no longer be set. auto startOpTime = boost::make_optional(!implicitCommit, firstOpTimeOfTxn); - // The 'count' field gives the total number of individual operations in the - // transaction, and is included on a non-initial implicit commit or prepare entry. - auto count = - (lastOp && !firstOp) ? boost::optional<long long>(stmts.size()) : boost::none; - prevWriteOpTime = logApplyOpsForTransaction(opCtx, - &applyOpsBuilder, - oplogSlot, - prevWriteOpTime.writeOpTime, - boost::none /* stmtId */, - implicitPrepare, - isPartialTxn, - true /* shouldWriteStateField */, - updateTxnTable, - count, - startOpTime); + MutableOplogEntry oplogEntry; + oplogEntry.setOpTime(oplogSlot); + oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime.writeOpTime); + oplogEntry.setObject(applyOpsBuilder.done()); + auto txnState = isPartialTxn ? DurableTxnStateEnum::kInProgress + : (implicitPrepare ? 
DurableTxnStateEnum::kPrepared + : DurableTxnStateEnum::kCommitted); + prevWriteOpTime = logApplyOpsForTransaction( + opCtx, &oplogEntry, txnState, startOpTime, updateTxnTable); + // Advance the iterator to the beginning of the remaining unpacked statements. stmtsIter = nextStmt; numEntriesWritten++; @@ -1202,20 +969,14 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, } void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, - const OplogSlot& oplogSlot, - const BSONObj& objectField, + MutableOplogEntry* oplogEntry, DurableTxnStateEnum durableState) { - const NamespaceString cmdNss{"admin", "$cmd"}; - - OperationSessionInfo sessionInfo; - repl::OplogLink oplogLink; - sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); - sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - - auto txnParticipant = TransactionParticipant::get(opCtx); - oplogLink.prevOpTime = txnParticipant.getLastWriteOpTime(); - - const auto wallClockTime = getWallClockTimeForOpLog(opCtx); + oplogEntry->setOpType(repl::OpTypeEnum::kCommand); + oplogEntry->setNss({"admin", "$cmd"}); + oplogEntry->setSessionId(opCtx->getLogicalSessionId()); + oplogEntry->setTxnNumber(opCtx->getTxnNumber()); + oplogEntry->setPrevWriteOpTimeInTransaction( + TransactionParticipant::get(opCtx).getLastWriteOpTime()); // There should not be a parent WUOW outside of this one. This guarantees the safety of the // write conflict retry loop. 
@@ -1234,23 +995,12 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, invariant(opCtx->lockState()->isWriteLocked()); WriteUnitOfWork wuow(opCtx); - const auto oplogOpTime = logOperation(opCtx, - "c", - cmdNss, - {} /* uuid */, - objectField, - nullptr /* o2 */, - false /* fromMigrate */, - wallClockTime, - sessionInfo, - boost::none /* stmtId */, - oplogLink, - oplogSlot); - invariant(oplogSlot.isNull() || oplogSlot == oplogOpTime); + const auto oplogOpTime = logOperation(opCtx, oplogEntry); + invariant(oplogEntry->getOpTime().isNull() || oplogEntry->getOpTime() == oplogOpTime); SessionTxnRecord sessionTxnRecord; sessionTxnRecord.setLastWriteOpTime(oplogOpTime); - sessionTxnRecord.setLastWriteDate(wallClockTime); + sessionTxnRecord.setLastWriteDate(oplogEntry->getWallClockTime().get()); sessionTxnRecord.setState(durableState); onWriteOpCompleted(opCtx, {}, sessionTxnRecord); wuow.commit(); @@ -1281,18 +1031,22 @@ void OpObserverImpl::onUnpreparedTransactionCommit( auto txnParticipant = TransactionParticipant::get(opCtx); const auto lastWriteOpTime = txnParticipant.getLastWriteOpTime(); invariant(lastWriteOpTime.isNull()); + MutableOplogEntry oplogEntry; + oplogEntry.setPrevWriteOpTimeInTransaction(lastWriteOpTime); + oplogEntry.setStatementId(StmtId(0)); + + BSONObjBuilder applyOpsBuilder; + // TODO(SERVER-41470): Remove limitSize==false once old transaction format is no longer + // needed. 
+ packTransactionStatementsForApplyOps( + &applyOpsBuilder, statements.begin(), statements.end(), false /* limitSize */); + oplogEntry.setObject(applyOpsBuilder.done()); + + auto txnState = boost::make_optional( + fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42, + DurableTxnStateEnum::kCommitted); commitOpTime = logApplyOpsForTransaction( - opCtx, - statements, - OplogSlot(), - lastWriteOpTime, - StmtId(0), - false /* prepare */, - false /* isPartialTxn */, - fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42, - true /* updateTxnTable */, - boost::none, - boost::none) + opCtx, &oplogEntry, txnState, boost::none, true /* updateTxnTable */) .writeOpTime; } else { // Reserve all the optimes in advance, so we only need to get the optime mutex once. We @@ -1321,10 +1075,14 @@ void OpObserverImpl::onPreparedTransactionCommit( invariant(!commitTimestamp.isNull()); + MutableOplogEntry oplogEntry; + oplogEntry.setOpTime(commitOplogEntryOpTime); + CommitTransactionOplogObject cmdObj; cmdObj.setCommitTimestamp(commitTimestamp); - logCommitOrAbortForPreparedTransaction( - opCtx, commitOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kCommitted); + oplogEntry.setObject(cmdObj.toBSON()); + + logCommitOrAbortForPreparedTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kCommitted); } void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, @@ -1361,18 +1119,24 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, auto txnParticipant = TransactionParticipant::get(opCtx); const auto lastWriteOpTime = txnParticipant.getLastWriteOpTime(); invariant(lastWriteOpTime.isNull()); - logApplyOpsForTransaction( - opCtx, - statements, - prepareOpTime, - lastWriteOpTime, - boost::none /* stmtId */, - true /* prepare */, - false /* isPartialTxn */, + + MutableOplogEntry oplogEntry; + oplogEntry.setOpTime(prepareOpTime); + oplogEntry.setPrevWriteOpTimeInTransaction(lastWriteOpTime); + + BSONObjBuilder 
applyOpsBuilder; + // TODO(SERVER-41470): Remove limitSize==false once old transaction format is no + // longer needed. + packTransactionStatementsForApplyOps( + &applyOpsBuilder, statements.begin(), statements.end(), false /* limitSize */); + applyOpsBuilder.append("prepare", true); + oplogEntry.setObject(applyOpsBuilder.done()); + + auto txnState = boost::make_optional( fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42, - true /* updateTxnTable */, - boost::none /* count */, - prepareOpTime /* startOpTime */); + DurableTxnStateEnum::kPrepared); + logApplyOpsForTransaction( + opCtx, &oplogEntry, txnState, prepareOpTime, true /* updateTxnTable */); wuow.commit(); }); } else { @@ -1404,18 +1168,18 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, BSONObjBuilder applyOpsBuilder; BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd)); opsArray.done(); + applyOpsBuilder.append("prepare", true); + auto oplogSlot = reservedSlots.front(); + MutableOplogEntry oplogEntry; + oplogEntry.setOpTime(oplogSlot); + oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime()); + oplogEntry.setObject(applyOpsBuilder.done()); logApplyOpsForTransaction(opCtx, - &applyOpsBuilder, + &oplogEntry, + DurableTxnStateEnum::kPrepared, oplogSlot, - repl::OpTime() /* prevWriteOpTime */, - boost::none /* stmtId */, - true /* prepare */, - false /* isPartialTxn */, - true /* shouldWriteStateField */, - true /* updateTxnTable */, - boost::none /* count */, - oplogSlot); + true /* updateTxnTable */); } wuow.commit(); }); @@ -1440,9 +1204,13 @@ void OpObserverImpl::onTransactionAbort(OperationContext* opCtx, return; } + MutableOplogEntry oplogEntry; + oplogEntry.setOpTime(*abortOplogEntryOpTime); + AbortTransactionOplogObject cmdObj; - logCommitOrAbortForPreparedTransaction( - opCtx, *abortOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kAborted); + oplogEntry.setObject(cmdObj.toBSON()); + + 
logCommitOrAbortForPreparedTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kAborted); } void OpObserverImpl::onReplicationRollback(OperationContext* opCtx, diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index d627e6f1bbe..bd333d78ecc 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -517,9 +517,9 @@ TEST_F(OpObserverTest, MultipleAboutToDeleteAndOnDelete) { AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X); WriteUnitOfWork wunit(opCtx.get()); opObserver.aboutToDelete(opCtx.get(), nss, BSON("_id" << 1)); - opObserver.onDelete(opCtx.get(), nss, uuid, {}, false, {}); + opObserver.onDelete(opCtx.get(), nss, uuid, kUninitializedStmtId, false, {}); opObserver.aboutToDelete(opCtx.get(), nss, BSON("_id" << 1)); - opObserver.onDelete(opCtx.get(), nss, uuid, {}, false, {}); + opObserver.onDelete(opCtx.get(), nss, uuid, kUninitializedStmtId, false, {}); } DEATH_TEST_F(OpObserverTest, AboutToDeleteMustPreceedOnDelete, "invariant") { @@ -527,7 +527,7 @@ DEATH_TEST_F(OpObserverTest, AboutToDeleteMustPreceedOnDelete, "invariant") { auto opCtx = cc().makeOperationContext(); opCtx->swapLockState(std::make_unique<LockerNoop>()); NamespaceString nss = {"test", "coll"}; - opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {}); + opObserver.onDelete(opCtx.get(), nss, {}, kUninitializedStmtId, false, {}); } DEATH_TEST_F(OpObserverTest, EachOnDeleteRequiresAboutToDelete, "invariant") { @@ -536,8 +536,8 @@ DEATH_TEST_F(OpObserverTest, EachOnDeleteRequiresAboutToDelete, "invariant") { opCtx->swapLockState(std::make_unique<LockerNoop>()); NamespaceString nss = {"test", "coll"}; opObserver.aboutToDelete(opCtx.get(), nss, {}); - opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {}); - opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {}); + opObserver.onDelete(opCtx.get(), nss, {}, kUninitializedStmtId, false, {}); + opObserver.onDelete(opCtx.get(), nss, {}, kUninitializedStmtId, false, 
{}); } DEATH_TEST_F(OpObserverTest, diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 19176f7478b..490a2a6acf0 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -120,44 +120,6 @@ void checkOplogInsert(Status result) { massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK()); } -/** - * This allows us to stream the oplog entry directly into data region - * main goal is to avoid copying the o portion - * which can be very large - * TODO: can have this build the entire doc - */ -class OplogDocWriter final : public DocWriter { -public: - OplogDocWriter(BSONObj frame, BSONObj oField) - : _frame(std::move(frame)), _oField(std::move(oField)) {} - - void writeDocument(char* start) const { - char* buf = start; - - memcpy(buf, _frame.objdata(), _frame.objsize() - 1); // don't copy final EOO - - DataView(buf).write<LittleEndian<int>>(documentSize()); - - buf += (_frame.objsize() - 1); - buf[0] = (char)Object; - buf[1] = 'o'; - buf[2] = 0; - memcpy(buf + 3, _oField.objdata(), _oField.objsize()); - buf += 3 + _oField.objsize(); - buf[0] = EOO; - - verify(static_cast<size_t>((buf + 1) - start) == documentSize()); // DEV? - } - - size_t documentSize() const { - return _frame.objsize() + _oField.objsize() + 1 /* type */ + 2 /* "o" */; - } - -private: - BSONObj _frame; - BSONObj _oField; -}; - bool shouldBuildInForeground(OperationContext* opCtx, const BSONObj& index, const NamespaceString& indexNss, @@ -328,88 +290,6 @@ void createIndexForApplyOps(OperationContext* opCtx, } } -namespace { - -/** - * Attaches the session information of a write to an oplog entry if it exists. 
- */ -void appendSessionInfo(OperationContext* opCtx, - BSONObjBuilder* builder, - boost::optional<StmtId> statementId, - const OperationSessionInfo& sessionInfo, - const OplogLink& oplogLink) { - if (!sessionInfo.getTxnNumber()) { - return; - } - - // Note: certain non-transaction operations, like implicit collection creation will have an - // uninitialized statementId. - if (statementId == kUninitializedStmtId) { - return; - } - - sessionInfo.serialize(builder); - - // Only non-transaction operations will have a statementId. - if (statementId) { - builder->append(OplogEntryBase::kStatementIdFieldName, *statementId); - } - oplogLink.prevOpTime.append(builder, - OplogEntryBase::kPrevWriteOpTimeInTransactionFieldName.toString()); - - if (!oplogLink.preImageOpTime.isNull()) { - oplogLink.preImageOpTime.append(builder, - OplogEntryBase::kPreImageOpTimeFieldName.toString()); - } - - if (!oplogLink.postImageOpTime.isNull()) { - oplogLink.postImageOpTime.append(builder, - OplogEntryBase::kPostImageOpTimeFieldName.toString()); - } -} - -OplogDocWriter _logOpWriter(OperationContext* opCtx, - const char* opstr, - const NamespaceString& nss, - OptionalCollectionUUID uuid, - const BSONObj& obj, - const BSONObj* o2, - bool fromMigrate, - OpTime optime, - Date_t wallTime, - const OperationSessionInfo& sessionInfo, - boost::optional<StmtId> statementId, - const OplogLink& oplogLink) { - BSONObjBuilder b(256); - - b.append("ts", optime.getTimestamp()); - if (optime.getTerm() != -1) - b.append("t", optime.getTerm()); - - // Always write zero hash instead of using FCV to gate this for retryable writes - // and change stream, who expect to be able to read oplog across FCV's. 
- b.append("h", 0LL); - b.append("v", OplogEntry::kOplogVersion); - b.append("op", opstr); - b.append("ns", nss.ns()); - if (uuid) - uuid->appendToBuilder(&b, "ui"); - - if (fromMigrate) - b.appendBool("fromMigrate", true); - - if (o2) - b.append("o2", *o2); - - invariant(wallTime != Date_t{}); - b.appendDate(OplogEntryBase::kWallClockTimeFieldName, wallTime); - - appendSessionInfo(opCtx, &b, statementId, sessionInfo, oplogLink); - - return OplogDocWriter(OplogDocWriter(b.obj(), obj)); -} -} // end anon namespace - /* we write to local.oplog.rs: { ts : ..., h: ..., v: ..., op: ..., etc } ts: an OpTime timestamp @@ -425,17 +305,16 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx, /* - * writers - an array with size nDocs of DocWriter objects. - * timestamps - an array with size nDocs of respective Timestamp objects for each DocWriter. + * records - a vector of oplog records to be written. + * timestamps - a vector of respective Timestamp objects for each oplog record. * oplogCollection - collection to be written to. - * finalOpTime - the OpTime of the last DocWriter object. - * wallTime - the wall clock time of the corresponding oplog entry. + * finalOpTime - the OpTime of the last oplog record. + * wallTime - the wall clock time of the last oplog record. */ void _logOpsInner(OperationContext* opCtx, const NamespaceString& nss, - const DocWriter* const* writers, - Timestamp* timestamps, - size_t nDocs, + std::vector<Record>* records, + const std::vector<Timestamp>& timestamps, Collection* oplogCollection, OpTime finalOpTime, Date_t wallTime) { @@ -446,9 +325,7 @@ void _logOpsInner(OperationContext* opCtx, str::stream() << "logOp() but can't accept write to collection " << nss.ns()); } - // we jump through a bunch of hoops here to avoid copying the obj buffer twice -- - // instead we do a single copy to the destination in the record store. 
- checkOplogInsert(oplogCollection->insertDocumentsForOplog(opCtx, writers, timestamps, nDocs)); + checkOplogInsert(oplogCollection->insertDocumentsForOplog(opCtx, records, timestamps)); // Set replCoord last optime only after we're sure the WUOW didn't abort and roll back. opCtx->recoveryUnit()->onCommit( @@ -479,42 +356,26 @@ void _logOpsInner(OperationContext* opCtx, }); } -OpTime logOp(OperationContext* opCtx, - const char* opstr, - const NamespaceString& nss, - OptionalCollectionUUID uuid, - const BSONObj& obj, - const BSONObj* o2, - bool fromMigrate, - Date_t wallClockTime, - const OperationSessionInfo& sessionInfo, - boost::optional<StmtId> statementId, - const OplogLink& oplogLink, - const OplogSlot& oplogSlot) { +OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry) { // All collections should have UUIDs now, so all insert, update, and delete oplog entries should // also have uuids. Some no-op (n) and command (c) entries may still elide the uuid field. - invariant(uuid || 'n' == *opstr || 'c' == *opstr, - str::stream() << "Expected uuid for logOp with opstr: " << opstr << ", nss: " - << nss.ns() - << ", obj: " - << obj - << ", os: " - << o2); + invariant(oplogEntry->getUuid() || oplogEntry->getOpType() == OpTypeEnum::kNoop || + oplogEntry->getOpType() == OpTypeEnum::kCommand, + str::stream() << "Expected uuid for logOp with oplog entry: " + << redact(oplogEntry->toBSON())); auto replCoord = ReplicationCoordinator::get(opCtx); // For commands, the test below is on the command ns and therefore does not check for // specific namespaces such as system.profile. This is the caller's responsibility. 
- if (replCoord->isOplogDisabledFor(opCtx, nss)) { - invariant(statementId); + if (replCoord->isOplogDisabledFor(opCtx, oplogEntry->getNss())) { uassert(ErrorCodes::IllegalOperation, str::stream() << "retryable writes is not supported for unreplicated ns: " - << nss.ns(), - *statementId == kUninitializedStmtId); + << oplogEntry->getNss().ns(), + !oplogEntry->getStatementId()); return {}; } auto oplogInfo = LocalOplogInfo::get(opCtx); - // Obtain Collection exclusive intent write lock for non-document-locking storage engines. boost::optional<Lock::DBLock> dbWriteLock; boost::optional<Lock::CollectionLock> collWriteLock; @@ -523,43 +384,47 @@ OpTime logOp(OperationContext* opCtx, collWriteLock.emplace(opCtx, oplogInfo->getOplogCollectionName(), MODE_IX); } - OplogSlot slot; + // If an OpTime is not specified (i.e. isNull), a new OpTime will be assigned to the oplog entry + // within the WUOW. If a new OpTime is assigned, it needs to be reset back to a null OpTime + // before exiting this function so that the same oplog entry instance can be reused for logOp() + // again. For example, if the WUOW gets aborted within a writeConflictRetry loop, we need to + // reset the OpTime to null so a new OpTime will be assigned on retry. + OplogSlot slot = oplogEntry->getOpTime(); + auto resetOpTimeGuard = makeGuard([&, resetOpTimeOnExit = bool(slot.isNull()) ] { + if (resetOpTimeOnExit) + oplogEntry->setOpTime(OplogSlot()); + }); + WriteUnitOfWork wuow(opCtx); - if (oplogSlot.isNull()) { + if (slot.isNull()) { slot = oplogInfo->getNextOpTimes(opCtx, 1U)[0]; - } else { - slot = oplogSlot; + // TODO: make the oplogEntry a const reference instead of using the guard. 
+ oplogEntry->setOpTime(slot); } auto oplog = oplogInfo->getCollection(); - auto writer = _logOpWriter(opCtx, - opstr, - nss, - uuid, - obj, - o2, - fromMigrate, - slot, - wallClockTime, - sessionInfo, - statementId, - oplogLink); - const DocWriter* basePtr = &writer; - auto timestamp = slot.getTimestamp(); - _logOpsInner(opCtx, nss, &basePtr, &timestamp, 1, oplog, slot, wallClockTime); + auto wallClockTime = oplogEntry->getWallClockTime(); + invariant(wallClockTime); + + auto bsonOplogEntry = oplogEntry->toBSON(); + // The storage engine will assign the RecordId based on the "ts" field of the oplog entry, see + // oploghack::extractKey. + std::vector<Record> records{ + {RecordId(), RecordData(bsonOplogEntry.objdata(), bsonOplogEntry.objsize())}}; + std::vector<Timestamp> timestamps{slot.getTimestamp()}; + _logOpsInner(opCtx, oplogEntry->getNss(), &records, timestamps, oplog, slot, *wallClockTime); wuow.commit(); return slot; } std::vector<OpTime> logInsertOps(OperationContext* opCtx, - const NamespaceString& nss, - OptionalCollectionUUID uuid, + MutableOplogEntry* oplogEntryTemplate, std::vector<InsertStatement>::const_iterator begin, - std::vector<InsertStatement>::const_iterator end, - bool fromMigrate, - Date_t wallClockTime) { + std::vector<InsertStatement>::const_iterator end) { invariant(begin != end); + oplogEntryTemplate->setOpType(repl::OpTypeEnum::kInsert); + auto nss = oplogEntryTemplate->getNss(); auto replCoord = ReplicationCoordinator::get(opCtx); if (replCoord->isOplogDisabledFor(opCtx, nss)) { uassert(ErrorCodes::IllegalOperation, @@ -570,8 +435,6 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, } const size_t count = end - begin; - std::vector<OplogDocWriter> writers; - writers.reserve(count); auto oplogInfo = LocalOplogInfo::get(opCtx); // Obtain Collection exclusive intent write lock for non-document-locking storage engines. 
@@ -584,40 +447,34 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, WriteUnitOfWork wuow(opCtx); - OperationSessionInfo sessionInfo; - OplogLink oplogLink; - - const auto txnParticipant = TransactionParticipant::get(opCtx); - if (txnParticipant) { - sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); - sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - oplogLink.prevOpTime = txnParticipant.getLastWriteOpTime(); - } - - auto timestamps = std::make_unique<Timestamp[]>(count); - std::vector<OpTime> opTimes; + std::vector<OpTime> opTimes(count); + std::vector<Timestamp> timestamps(count); + std::vector<BSONObj> bsonOplogEntries(count); + std::vector<Record> records(count); for (size_t i = 0; i < count; i++) { + // Make a copy from the template for each insert oplog entry. + MutableOplogEntry oplogEntry = *oplogEntryTemplate; // Make a mutable copy. auto insertStatementOplogSlot = begin[i].oplogSlot; // Fetch optime now, if not already fetched. if (insertStatementOplogSlot.isNull()) { insertStatementOplogSlot = oplogInfo->getNextOpTimes(opCtx, 1U)[0]; } - writers.emplace_back(_logOpWriter(opCtx, - "i", - nss, - uuid, - begin[i].doc, - nullptr, - fromMigrate, - insertStatementOplogSlot, - wallClockTime, - sessionInfo, - begin[i].stmtId, - oplogLink)); - oplogLink.prevOpTime = insertStatementOplogSlot; - timestamps[i] = oplogLink.prevOpTime.getTimestamp(); - opTimes.push_back(insertStatementOplogSlot); + oplogEntry.setObject(begin[i].doc); + oplogEntry.setOpTime(insertStatementOplogSlot); + + OplogLink oplogLink; + if (i > 0) + oplogLink.prevOpTime = opTimes[i - 1]; + appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, begin[i].stmtId); + + opTimes[i] = insertStatementOplogSlot; + timestamps[i] = insertStatementOplogSlot.getTimestamp(); + bsonOplogEntries[i] = oplogEntry.toBSON(); + // The storage engine will assign the RecordId based on the "ts" field of the oplog entry, + // see oploghack::extractKey. 
+ records[i] = Record{ + RecordId(), RecordData(bsonOplogEntries[i].objdata(), bsonOplogEntries[i].objsize())}; } MONGO_FAIL_POINT_BLOCK(sleepBetweenInsertOpTimeGenerationAndLogOp, customWait) { @@ -628,21 +485,42 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, sleepmillis(numMillis); } - std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]); - for (size_t i = 0; i < count; i++) { - basePtrs[i] = &writers[i]; - } - invariant(!opTimes.empty()); auto lastOpTime = opTimes.back(); invariant(!lastOpTime.isNull()); auto oplog = oplogInfo->getCollection(); - _logOpsInner( - opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime, wallClockTime); + auto wallClockTime = oplogEntryTemplate->getWallClockTime(); + invariant(wallClockTime); + _logOpsInner(opCtx, nss, &records, timestamps, oplog, lastOpTime, *wallClockTime); wuow.commit(); return opTimes; } +void appendRetryableWriteInfo(OperationContext* opCtx, + MutableOplogEntry* oplogEntry, + OplogLink* oplogLink, + StmtId stmtId) { + // Not a retryable write. 
+ if (stmtId == kUninitializedStmtId) + return; + + const auto txnParticipant = TransactionParticipant::get(opCtx); + invariant(txnParticipant); + oplogEntry->setSessionId(opCtx->getLogicalSessionId()); + oplogEntry->setTxnNumber(opCtx->getTxnNumber()); + oplogEntry->setStatementId(stmtId); + if (oplogLink->prevOpTime.isNull()) { + oplogLink->prevOpTime = txnParticipant.getLastWriteOpTime(); + } + oplogEntry->setPrevWriteOpTimeInTransaction(oplogLink->prevOpTime); + if (!oplogLink->preImageOpTime.isNull()) { + oplogEntry->setPreImageOpTime(oplogLink->preImageOpTime); + } + if (!oplogLink->postImageOpTime.isNull()) { + oplogEntry->setPostImageOpTime(oplogLink->postImageOpTime); + } +} + namespace { long long getNewOplogSizeBytes(OperationContext* opCtx, const ReplSettings& replSettings) { if (replSettings.getOplogSizeBytes() != 0) { diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index eb898a973b3..7bf42539624 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -80,6 +80,22 @@ struct OplogLink { }; /** + * Set the "lsid", "txnNumber", "stmtId", "prevOpTime", "preImageOpTime" and "postImageOpTime" + * fields of the oplogEntry based on the given oplogLink for retryable writes (i.e. when stmtId != + * kUninitializedStmtId). + * + * If the given oplogLink.prevOpTime is a null OpTime, both the oplogLink.prevOpTime and the + * "prevOpTime" field of the oplogEntry will be set to the TransactionParticipant's lastWriteOpTime. + * The "preImageOpTime" field will only be set if the given oplogLink.preImageOpTime is not null. + * Similarly, the "postImageOpTime" field will only be set if the given oplogLink.postImageOpTime is + * not null. + */ +void appendRetryableWriteInfo(OperationContext* opCtx, + MutableOplogEntry* oplogEntry, + OplogLink* oplogLink, + StmtId stmtId); + +/** * Create a new capped collection for the oplog if it doesn't yet exist. 
* If the collection already exists (and isReplSet is false), * set the 'last' Timestamp from the last entry of the oplog collection (side effect!) @@ -96,53 +112,26 @@ void createOplog(OperationContext* opCtx); /** * Log insert(s) to the local oplog. * Returns the OpTime of every insert. + * @param oplogEntryTemplate: a template used to generate insert oplog entries. Callers must set the + * "ns", "ui", "fromMigrate" and "wall" fields before calling this function. This function will then + * augment the template with the "op" (which is set to kInsert), "lsid" and "txnNumber" fields if + * necessary. + * @param begin/end: first/last InsertStatement to be inserted. This function iterates from begin to + * end and generates insert oplog entries based on the augmented oplogEntryTemplate with the "ts", + * "t", "o", "prevOpTime" and "stmtId" fields replaced by the content of each InsertStatement + * defined by the begin-end range. + * */ std::vector<OpTime> logInsertOps(OperationContext* opCtx, - const NamespaceString& nss, - OptionalCollectionUUID uuid, + MutableOplogEntry* oplogEntryTemplate, std::vector<InsertStatement>::const_iterator begin, - std::vector<InsertStatement>::const_iterator end, - bool fromMigrate, - Date_t wallClockTime); + std::vector<InsertStatement>::const_iterator end); /** - * @param opstr - * "i" insert - * "u" update - * "d" delete - * "c" db cmd - * "n" no-op - * "db" declares presence of a database (ns is set to the db name + '.') - * - * For 'u' records, 'obj' captures the mutation made to the object but not - * the object itself. 'o2' captures the the criteria for the object that will be modified. - * - * wallClockTime this specifies the wall-clock timestamp of then this oplog entry was generated. It - * is purely informational, may not be monotonically increasing and is not interpreted in any way - * by the replication subsystem. - * stmtId specifies the statementId of an operation. 
For transaction operations, stmtId is always - * boost::none. - * oplogLink this contains the timestamp that points to the previous write that will be - * linked via prevTs, and the timestamps of the oplog entry that contains the document - * before/after update was applied. The timestamps are ignored if isNull() is true. - * prepare this specifies if the oplog entry should be put into a 'prepare' state. - * oplogSlot If non-null, use this reserved oplog slot instead of a new one. - * * Returns the optime of the oplog entry written to the oplog. * Returns a null optime if oplog was not modified. */ -OpTime logOp(OperationContext* opCtx, - const char* opstr, - const NamespaceString& ns, - OptionalCollectionUUID uuid, - const BSONObj& obj, - const BSONObj* o2, - bool fromMigrate, - Date_t wallClockTime, - const OperationSessionInfo& sessionInfo, - boost::optional<StmtId> stmtId, - const OplogLink& oplogLink, - const OplogSlot& oplogSlot); +OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry); // Flush out the cached pointer to the oplog. void clearLocalOplogPtr(); diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 711bce604eb..a9879471570 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -233,7 +233,7 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch( while (_oplogBuffer->peek(opCtx, &op)) { auto entry = OplogEntry(op); - // Check for oplog version change. If it is absent, its value is one. + // Check for oplog version change. 
if (entry.getVersion() != OplogEntry::kOplogVersion) { std::string message = str::stream() << "expected oplog version " << OplogEntry::kOplogVersion << " but found version " diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index 6903a1a029b..57a8eb033ba 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -153,12 +153,12 @@ BSONObj makeOplogEntryDoc(OpTime opTime, } // namespace -const int OplogEntry::kOplogVersion = 2; +const int MutableOplogEntry::kOplogVersion = 2; // Static -ReplOperation OplogEntry::makeInsertOperation(const NamespaceString& nss, - boost::optional<UUID> uuid, - const BSONObj& docToInsert) { +ReplOperation MutableOplogEntry::makeInsertOperation(const NamespaceString& nss, + boost::optional<UUID> uuid, + const BSONObj& docToInsert) { ReplOperation op; op.setOpType(OpTypeEnum::kInsert); op.setNss(nss); @@ -167,10 +167,10 @@ ReplOperation OplogEntry::makeInsertOperation(const NamespaceString& nss, return op; } -ReplOperation OplogEntry::makeUpdateOperation(const NamespaceString nss, - boost::optional<UUID> uuid, - const BSONObj& update, - const BSONObj& criteria) { +ReplOperation MutableOplogEntry::makeUpdateOperation(const NamespaceString nss, + boost::optional<UUID> uuid, + const BSONObj& update, + const BSONObj& criteria) { ReplOperation op; op.setOpType(OpTypeEnum::kUpdate); op.setNss(nss); @@ -180,9 +180,9 @@ ReplOperation OplogEntry::makeUpdateOperation(const NamespaceString nss, return op; } -ReplOperation OplogEntry::makeDeleteOperation(const NamespaceString& nss, - boost::optional<UUID> uuid, - const BSONObj& docToDelete) { +ReplOperation MutableOplogEntry::makeDeleteOperation(const NamespaceString& nss, + boost::optional<UUID> uuid, + const BSONObj& docToDelete) { ReplOperation op; op.setOpType(OpTypeEnum::kDelete); op.setNss(nss); @@ -191,6 +191,31 @@ ReplOperation OplogEntry::makeDeleteOperation(const NamespaceString& nss, return op; } +StatusWith<MutableOplogEntry> 
MutableOplogEntry::parse(const BSONObj& object) { + try { + MutableOplogEntry oplogEntry; + oplogEntry.parseProtected(IDLParserErrorContext("OplogEntryBase"), object); + return oplogEntry; + } catch (...) { + return exceptionToStatus(); + } + MONGO_UNREACHABLE; +} + +void MutableOplogEntry::setOpTime(const OpTime& opTime)& { + setTimestamp(opTime.getTimestamp()); + if (opTime.getTerm() != OpTime::kUninitializedTerm) + setTerm(opTime.getTerm()); +} + +OpTime MutableOplogEntry::getOpTime() const { + long long term = OpTime::kUninitializedTerm; + if (getTerm()) { + term = getTerm().get(); + } + return OpTime(getTimestamp(), term); +} + size_t OplogEntry::getDurableReplOperationSize(const DurableReplOperation& op) { return sizeof(op) + op.getNss().size() + op.getObject().objsize() + (op.getObject2() ? op.getObject2()->objsize() : 0); @@ -310,14 +335,6 @@ int OplogEntry::getRawObjSizeBytes() const { return _raw.objsize(); } -OpTime OplogEntry::getOpTime() const { - long long term = OpTime::kUninitializedTerm; - if (getTerm()) { - term = getTerm().get(); - } - return OpTime(getTimestamp(), term); -} - std::string OplogEntry::toString() const { return _raw.toString(); } diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 8a07c462325..7d98aee00f0 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -66,57 +66,140 @@ private: }; /** - * A parsed oplog entry that privately inherits from the OplogEntryBase parsed by the IDL. + * Mutable class used on primary to build up oplog entries progressively. + */ +class MutableOplogEntry : public OplogEntryBase { +public: + // Current oplog version, should be the value of the v field in all oplog entries. + static const int kOplogVersion; + + // Helpers to generate ReplOperation. 
+ static ReplOperation makeInsertOperation(const NamespaceString& nss, + boost::optional<UUID> uuid, + const BSONObj& docToInsert); + static ReplOperation makeUpdateOperation(const NamespaceString nss, + boost::optional<UUID> uuid, + const BSONObj& update, + const BSONObj& criteria); + static ReplOperation makeDeleteOperation(const NamespaceString& nss, + boost::optional<UUID> uuid, + const BSONObj& docToDelete); + + static StatusWith<MutableOplogEntry> parse(const BSONObj& object); + + MutableOplogEntry() : OplogEntryBase() {} + + void setSessionId(boost::optional<LogicalSessionId> value) & { + getOperationSessionInfo().setSessionId(std::move(value)); + } + + void setTxnNumber(boost::optional<std::int64_t> value) & { + getOperationSessionInfo().setTxnNumber(std::move(value)); + } + + void setOpType(OpTypeEnum value) & { + getDurableReplOperation().setOpType(std::move(value)); + } + + void setNss(NamespaceString value) & { + getDurableReplOperation().setNss(std::move(value)); + } + + void setUuid(boost::optional<UUID> value) & { + getDurableReplOperation().setUuid(std::move(value)); + } + + void setObject(BSONObj value) & { + getDurableReplOperation().setObject(std::move(value)); + } + + void setObject2(boost::optional<BSONObj> value) & { + getDurableReplOperation().setObject2(std::move(value)); + } + + void setUpsert(boost::optional<bool> value) & { + getDurableReplOperation().setUpsert(std::move(value)); + } + + /** + * Sets the OpTime of the oplog entry. + */ + void setOpTime(const OpTime& opTime) &; + + /** + * Returns the OpTime of the oplog entry. + */ + OpTime getOpTime() const; + + /** + * Same as setFromMigrate but only set when it is true. + */ + void setFromMigrateIfTrue(bool value) & { + if (value) + setFromMigrate(value); + } +}; + +/** + * A parsed oplog entry that privately inherits from the MutableOplogEntry. * This class is immutable. All setters are hidden. 
*/ -class OplogEntry : private OplogEntryBase { +class OplogEntry : private MutableOplogEntry { public: // Make field names accessible. - using OplogEntryBase::kDurableReplOperationFieldName; - using OplogEntryBase::kOperationSessionInfoFieldName; - using OplogEntryBase::k_idFieldName; - using OplogEntryBase::kFromMigrateFieldName; - using OplogEntryBase::kHashFieldName; - using OplogEntryBase::kNssFieldName; - using OplogEntryBase::kObjectFieldName; - using OplogEntryBase::kObject2FieldName; - using OplogEntryBase::kOpTypeFieldName; - using OplogEntryBase::kPostImageOpTimeFieldName; - using OplogEntryBase::kPreImageOpTimeFieldName; - using OplogEntryBase::kPrevWriteOpTimeInTransactionFieldName; - using OplogEntryBase::kSessionIdFieldName; - using OplogEntryBase::kStatementIdFieldName; - using OplogEntryBase::kTermFieldName; - using OplogEntryBase::kTimestampFieldName; - using OplogEntryBase::kTxnNumberFieldName; - using OplogEntryBase::kUpsertFieldName; - using OplogEntryBase::kUuidFieldName; - using OplogEntryBase::kVersionFieldName; - using OplogEntryBase::kWallClockTimeFieldName; + using MutableOplogEntry::kDurableReplOperationFieldName; + using MutableOplogEntry::kOperationSessionInfoFieldName; + using MutableOplogEntry::k_idFieldName; + using MutableOplogEntry::kFromMigrateFieldName; + using MutableOplogEntry::kHashFieldName; + using MutableOplogEntry::kNssFieldName; + using MutableOplogEntry::kObjectFieldName; + using MutableOplogEntry::kObject2FieldName; + using MutableOplogEntry::kOpTypeFieldName; + using MutableOplogEntry::kPostImageOpTimeFieldName; + using MutableOplogEntry::kPreImageOpTimeFieldName; + using MutableOplogEntry::kPrevWriteOpTimeInTransactionFieldName; + using MutableOplogEntry::kSessionIdFieldName; + using MutableOplogEntry::kStatementIdFieldName; + using MutableOplogEntry::kTermFieldName; + using MutableOplogEntry::kTimestampFieldName; + using MutableOplogEntry::kTxnNumberFieldName; + using MutableOplogEntry::kUpsertFieldName; + using 
MutableOplogEntry::kUuidFieldName; + using MutableOplogEntry::kVersionFieldName; + using MutableOplogEntry::kWallClockTimeFieldName; + using MutableOplogEntry::kOplogVersion; + // Make serialize(), toBSON() and getters accessible. - using OplogEntryBase::serialize; - using OplogEntryBase::toBSON; - using OplogEntryBase::getOperationSessionInfo; - using OplogEntryBase::getSessionId; - using OplogEntryBase::getTxnNumber; - using OplogEntryBase::getDurableReplOperation; - using OplogEntryBase::getOpType; - using OplogEntryBase::getNss; - using OplogEntryBase::getUuid; - using OplogEntryBase::getObject; - using OplogEntryBase::getObject2; - using OplogEntryBase::getUpsert; - using OplogEntryBase::getTimestamp; - using OplogEntryBase::getTerm; - using OplogEntryBase::getHash; - using OplogEntryBase::getVersion; - using OplogEntryBase::getFromMigrate; - using OplogEntryBase::get_id; - using OplogEntryBase::getWallClockTime; - using OplogEntryBase::getStatementId; - using OplogEntryBase::getPrevWriteOpTimeInTransaction; - using OplogEntryBase::getPreImageOpTime; - using OplogEntryBase::getPostImageOpTime; + using MutableOplogEntry::serialize; + using MutableOplogEntry::toBSON; + using MutableOplogEntry::getOperationSessionInfo; + using MutableOplogEntry::getSessionId; + using MutableOplogEntry::getTxnNumber; + using MutableOplogEntry::getDurableReplOperation; + using MutableOplogEntry::getOpType; + using MutableOplogEntry::getNss; + using MutableOplogEntry::getUuid; + using MutableOplogEntry::getObject; + using MutableOplogEntry::getObject2; + using MutableOplogEntry::getUpsert; + using MutableOplogEntry::getTimestamp; + using MutableOplogEntry::getTerm; + using MutableOplogEntry::getHash; + using MutableOplogEntry::getVersion; + using MutableOplogEntry::getFromMigrate; + using MutableOplogEntry::get_id; + using MutableOplogEntry::getWallClockTime; + using MutableOplogEntry::getStatementId; + using MutableOplogEntry::getPrevWriteOpTimeInTransaction; + using 
MutableOplogEntry::getPreImageOpTime; + using MutableOplogEntry::getPostImageOpTime; + + // Make helper functions accessible. + using MutableOplogEntry::getOpTime; + using MutableOplogEntry::makeInsertOperation; + using MutableOplogEntry::makeUpdateOperation; + using MutableOplogEntry::makeDeleteOperation; enum class CommandType { kNotCommand, @@ -138,21 +221,6 @@ public: kAbortTransaction, }; - // Current oplog version, should be the value of the v field in all oplog entries. - static const int kOplogVersion; - - // Helpers to generate ReplOperation. - static ReplOperation makeInsertOperation(const NamespaceString& nss, - boost::optional<UUID> uuid, - const BSONObj& docToInsert); - static ReplOperation makeUpdateOperation(const NamespaceString nss, - boost::optional<UUID> uuid, - const BSONObj& update, - const BSONObj& criteria); - static ReplOperation makeDeleteOperation(const NamespaceString& nss, - boost::optional<UUID> uuid, - const BSONObj& docToDelete); - // Get the in-memory size in bytes of a ReplOperation. static size_t getDurableReplOperationSize(const DurableReplOperation& op); @@ -251,11 +319,6 @@ public: } /** - * Returns the OpTime of the oplog entry. - */ - OpTime getOpTime() const; - - /** * Serializes the oplog entry to a string. */ std::string toString() const; diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl index cfed4af41cf..cc9a75172cb 100644 --- a/src/mongo/db/repl/oplog_entry.idl +++ b/src/mongo/db/repl/oplog_entry.idl @@ -105,7 +105,7 @@ structs: v: cpp_name: version type: safeInt64 - default: 1 + default: 2 description: "The version of the oplog" fromMigrate: type: bool diff --git a/src/mongo/db/repl/oplog_test.cpp b/src/mongo/db/repl/oplog_test.cpp index dafae7e8784..870cd21980e 100644 --- a/src/mongo/db/repl/oplog_test.cpp +++ b/src/mongo/db/repl/oplog_test.cpp @@ -99,20 +99,14 @@ TEST_F(OplogTest, LogOpReturnsOpTimeOnSuccessfulInsertIntoOplogCollection) { // Write to the oplog. 
OpTime opTime; { + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setNss(nss); + oplogEntry.setObject(msgObj); + oplogEntry.setWallClockTime(Date_t::now()); AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X); WriteUnitOfWork wunit(opCtx.get()); - opTime = logOp(opCtx.get(), - "n", - nss, - {}, - msgObj, - nullptr, - false, - Date_t::now(), - {}, - kUninitializedStmtId, - {}, - OplogSlot()); + opTime = logOp(opCtx.get(), &oplogEntry); ASSERT_FALSE(opTime.isNull()); wunit.commit(); } @@ -223,19 +217,12 @@ OpTime _logOpNoopWithMsg(OperationContext* opCtx, // logOp() must be called while holding lock because ephemeralForTest storage engine does not // support concurrent updates to its internal state. - const auto msgObj = BSON("msg" << nss.ns()); - auto opTime = logOp(opCtx, - "n", - nss, - {}, - msgObj, - nullptr, - false, - Date_t::now(), - {}, - kUninitializedStmtId, - {}, - OplogSlot()); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setNss(nss); + oplogEntry.setObject(BSON("msg" << nss.ns())); + oplogEntry.setWallClockTime(Date_t::now()); + auto opTime = logOp(opCtx, &oplogEntry); ASSERT_FALSE(opTime.isNull()); ASSERT(opTimeNssMap->find(opTime) == opTimeNssMap->end()) diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index eb60d0bb18a..1482462fce9 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -79,71 +79,60 @@ BSONObj buildMigrateSessionCmd(const MigrationSessionId& migrationSessionId) { } /** - * Determines whether the oplog entry has a link to either preImage/postImage and return a new - * oplogLink that contains the same link, but pointing to lastResult.oplogTime. 
For example, if - * entry has link to preImageTs, this returns an oplogLink with preImageTs pointing to + * Determines whether the oplog entry has a link to either preImage/postImage and sets a new link + * to lastResult.oplogTime. For example, if entry has link to preImageTs, this sets preImageTs to * lastResult.oplogTime. * * It is an error to have both preImage and postImage as well as not having them at all. */ -repl::OplogLink extractPrePostImageTs(const ProcessOplogResult& lastResult, - const repl::OplogEntry& entry) { - repl::OplogLink oplogLink; - +void setPrePostImageTs(const ProcessOplogResult& lastResult, repl::MutableOplogEntry* entry) { if (!lastResult.isPrePostImage) { uassert(40628, - str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString() + str::stream() << "expected oplog with ts: " << entry->getTimestamp().toString() << " to not have " << repl::OplogEntryBase::kPreImageOpTimeFieldName << " or " << repl::OplogEntryBase::kPostImageOpTimeFieldName, - !entry.getPreImageOpTime() && !entry.getPostImageOpTime()); - - return oplogLink; + !entry->getPreImageOpTime() && !entry->getPostImageOpTime()); + return; } invariant(!lastResult.oplogTime.isNull()); - const auto& sessionInfo = entry.getOperationSessionInfo(); - const auto sessionId = *sessionInfo.getSessionId(); - const auto txnNum = *sessionInfo.getTxnNumber(); - uassert(40629, - str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString() << ": " - << redact(entry.toBSON()) + str::stream() << "expected oplog with ts: " << entry->getTimestamp().toString() << ": " + << redact(entry->toBSON()) << " to have session: " << lastResult.sessionId, - lastResult.sessionId == sessionId); + lastResult.sessionId == entry->getSessionId()); uassert(40630, - str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString() << ": " - << redact(entry.toBSON()) + str::stream() << "expected oplog with ts: " << entry->getTimestamp().toString() << ": " + << 
redact(entry->toBSON()) << " to have txnNumber: " << lastResult.txnNum, - lastResult.txnNum == txnNum); + lastResult.txnNum == entry->getTxnNumber()); - if (entry.getPreImageOpTime()) { - oplogLink.preImageOpTime = lastResult.oplogTime; - } else if (entry.getPostImageOpTime()) { - oplogLink.postImageOpTime = lastResult.oplogTime; + if (entry->getPreImageOpTime()) { + entry->setPreImageOpTime(lastResult.oplogTime); + } else if (entry->getPostImageOpTime()) { + entry->setPostImageOpTime(lastResult.oplogTime); } else { uasserted(40631, - str::stream() << "expected oplog with opTime: " << entry.getOpTime().toString() + str::stream() << "expected oplog with opTime: " << entry->getOpTime().toString() << ": " - << redact(entry.toBSON()) + << redact(entry->toBSON()) << " to have either " << repl::OplogEntryBase::kPreImageOpTimeFieldName << " or " << repl::OplogEntryBase::kPostImageOpTimeFieldName); } - - return oplogLink; } /** * Parses the oplog into an oplog entry and makes sure that it contains the expected fields. 
*/ -repl::OplogEntry parseOplog(const BSONObj& oplogBSON) { - auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON)); +repl::MutableOplogEntry parseOplog(const BSONObj& oplogBSON) { + auto oplogEntry = uassertStatusOK(repl::MutableOplogEntry::parse(oplogBSON)); // Session oplog entries must always contain wall clock time, because we will not be // transferring anything from a previous version of the server @@ -208,13 +197,11 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx, ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, const ProcessOplogResult& lastResult) { auto oplogEntry = parseOplog(oplogBSON); - const auto& sessionInfo = oplogEntry.getOperationSessionInfo(); ProcessOplogResult result; - result.sessionId = *sessionInfo.getSessionId(); - result.txnNum = *sessionInfo.getTxnNumber(); + result.sessionId = *oplogEntry.getSessionId(); + result.txnNum = *oplogEntry.getTxnNumber(); - BSONObj object2; if (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) { // Note: Oplog is already no-op type, no need to nest. // There are two types of type 'n' oplog format expected here: @@ -225,8 +212,11 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, // findAndModify operation. In this case, o field contains the relevant info // and o2 will be empty. + BSONObj object2; if (oplogEntry.getObject2()) { object2 = *oplogEntry.getObject2(); + } else { + oplogEntry.setObject2(object2); } if (object2.isEmpty()) { @@ -242,7 +232,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, !lastResult.isPrePostImage); } } else { - object2 = oplogBSON; // TODO: strip redundant info? + oplogEntry.setObject2(oplogBSON); // TODO: strip redundant info? } const auto stmtId = *oplogEntry.getStatementId(); @@ -275,11 +265,16 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, throw; } - BSONObj object(result.isPrePostImage - ? 
oplogEntry.getObject() - : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1)); - auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry); - oplogLink.prevOpTime = txnParticipant.getLastWriteOpTime(); + if (!result.isPrePostImage) + oplogEntry.setObject( + BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1)); + setPrePostImageTs(lastResult, &oplogEntry); + oplogEntry.setPrevWriteOpTimeInTransaction(txnParticipant.getLastWriteOpTime()); + + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setFromMigrate(true); + // Reset OpTime so logOp() can assign a new one. + oplogEntry.setOpTime(OplogSlot()); writeConflictRetry( opCtx, @@ -295,18 +290,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX); WriteUnitOfWork wunit(opCtx); - result.oplogTime = repl::logOp(opCtx, - "n", - oplogEntry.getNss(), - oplogEntry.getUuid(), - object, - &object2, - true, - *oplogEntry.getWallClockTime(), - sessionInfo, - stmtId, - oplogLink, - OplogSlot()); + result.oplogTime = repl::logOp(opCtx, &oplogEntry); const auto& oplogOpTime = result.oplogTime; uassert(40633, diff --git a/src/mongo/db/storage/biggie/biggie_record_store.cpp b/src/mongo/db/storage/biggie/biggie_record_store.cpp index 60a6dcdfd04..6e656149170 100644 --- a/src/mongo/db/storage/biggie/biggie_record_store.cpp +++ b/src/mongo/db/storage/biggie/biggie_record_store.cpp @@ -187,53 +187,6 @@ Status RecordStore::insertRecords(OperationContext* opCtx, return Status::OK(); } -Status RecordStore::insertRecordsWithDocWriter(OperationContext* opCtx, - const DocWriter* const* docs, - const Timestamp*, - size_t nDocs, - RecordId* idsOut) { - int64_t totalSize = 0; - for (size_t i = 0; i < nDocs; i++) - totalSize += docs[i]->documentSize(); - - // Caller will retry one element at a time. 
- if (_isCapped && totalSize > _cappedMaxSize) - return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize"); - - auto ru = RecoveryUnit::get(opCtx); - StringStore* workingCopy(ru->getHead()); - { - SizeAdjuster adjuster(opCtx, this); - for (size_t i = 0; i < nDocs; i++) { - const size_t len = docs[i]->documentSize(); - - std::string buf(len, '\0'); - docs[i]->writeDocument(&buf[0]); - - int64_t thisRecordId = 0; - if (_isOplog) { - StatusWith<RecordId> status = oploghack::extractKey(buf.data(), len); - if (!status.isOK()) - return status.getStatus(); - thisRecordId = status.getValue().repr(); - _visibilityManager->addUncommittedRecord(opCtx, this, RecordId(thisRecordId)); - } else { - thisRecordId = _nextRecordId(); - } - std::string key = createKey(_ident, thisRecordId); - - StringStore::value_type vt{key, buf}; - workingCopy->insert(std::move(vt)); - if (idsOut) - idsOut[i] = RecordId(thisRecordId); - ru->makeDirty(); - } - } - - _cappedDeleteAsNeeded(opCtx, workingCopy); - return Status::OK(); -} - Status RecordStore::updateRecord(OperationContext* opCtx, const RecordId& oldLocation, const char* data, diff --git a/src/mongo/db/storage/biggie/biggie_record_store.h b/src/mongo/db/storage/biggie/biggie_record_store.h index 813c575ecf7..e8dee66da1c 100644 --- a/src/mongo/db/storage/biggie/biggie_record_store.h +++ b/src/mongo/db/storage/biggie/biggie_record_store.h @@ -75,12 +75,6 @@ public: std::vector<Record>* inOutRecords, const std::vector<Timestamp>& timestamps); - virtual Status insertRecordsWithDocWriter(OperationContext* opCtx, - const DocWriter* const* docs, - const Timestamp*, - size_t nDocs, - RecordId* idsOut); - virtual Status updateRecord(OperationContext* opCtx, const RecordId& oldLocation, const char* data, diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp index 8255616eba7..9b69642f9b0 100644 --- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp +++ 
b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp @@ -108,20 +108,6 @@ public: return Status::OK(); } - virtual Status insertRecordsWithDocWriter(OperationContext* opCtx, - const DocWriter* const* docs, - const Timestamp*, - size_t nDocs, - RecordId* idsOut) { - _numInserts += nDocs; - if (idsOut) { - for (size_t i = 0; i < nDocs; i++) { - idsOut[i] = RecordId(6, 4); - } - } - return Status::OK(); - } - virtual Status updateRecord(OperationContext* opCtx, const RecordId& oldLocation, const char* data, diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp index e0dfd6fcc06..dc0d2fa1ed0 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp @@ -443,47 +443,6 @@ Status EphemeralForTestRecordStore::insertRecords(OperationContext* opCtx, return Status::OK(); } -Status EphemeralForTestRecordStore::insertRecordsWithDocWriter(OperationContext* opCtx, - const DocWriter* const* docs, - const Timestamp*, - size_t nDocs, - RecordId* idsOut) { - stdx::lock_guard<stdx::recursive_mutex> lock(_data->recordsMutex); - - for (size_t i = 0; i < nDocs; i++) { - const int len = docs[i]->documentSize(); - if (_isCapped && len > _cappedMaxSize) { - // We use dataSize for capped rollover and we don't want to delete everything if we - // know this won't fit. 
- return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize"); - } - - EphemeralForTestRecord rec(len); - docs[i]->writeDocument(rec.data.get()); - - RecordId loc; - if (_data->isOplog) { - StatusWith<RecordId> status = extractAndCheckLocForOplog(lock, rec.data.get(), len); - if (!status.isOK()) - return status.getStatus(); - loc = status.getValue(); - } else { - loc = allocateLoc(lock); - } - - opCtx->recoveryUnit()->registerChange(new InsertChange(opCtx, _data, loc)); - _data->dataSize += len; - _data->records[loc] = rec; - - cappedDeleteAsNeeded(lock, opCtx); - - if (idsOut) - idsOut[i] = loc; - } - - return Status::OK(); -} - Status EphemeralForTestRecordStore::updateRecord(OperationContext* opCtx, const RecordId& loc, const char* data, diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h index 439cbe1cdf6..2fdbaaa579e 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h @@ -71,12 +71,6 @@ public: std::vector<Record>* inOutRecords, const std::vector<Timestamp>& timestamps); - virtual Status insertRecordsWithDocWriter(OperationContext* opCtx, - const DocWriter* const* docs, - const Timestamp*, - size_t nDocs, - RecordId* idsOut); - virtual Status updateRecord(OperationContext* opCtx, const RecordId& oldLocation, const char* data, diff --git a/src/mongo/db/storage/mobile/mobile_record_store.cpp b/src/mongo/db/storage/mobile/mobile_record_store.cpp index ba81032d609..1942214db32 100644 --- a/src/mongo/db/storage/mobile/mobile_record_store.cpp +++ b/src/mongo/db/storage/mobile/mobile_record_store.cpp @@ -325,30 +325,6 @@ Status MobileRecordStore::insertRecords(OperationContext* opCtx, return Status::OK(); } -Status MobileRecordStore::insertRecordsWithDocWriter(OperationContext* opCtx, - const DocWriter* const* docs, - const 
Timestamp* timestamps, - size_t nDocs, - RecordId* idsOut) { - // Calculates the total size of the data buffer. - size_t totalSize = 0; - for (size_t i = 0; i < nDocs; i++) { - totalSize += docs[i]->documentSize(); - } - - std::unique_ptr<char[]> buffer(new char[totalSize]); - char* pos = buffer.get(); - for (size_t i = 0; i < nDocs; i++) { - docs[i]->writeDocument(pos); - size_t docLen = docs[i]->documentSize(); - StatusWith<RecordId> res = insertRecord(opCtx, pos, docLen, timestamps[i]); - idsOut[i] = res.getValue(); - pos += docLen; - } - - return Status::OK(); -} - Status MobileRecordStore::updateRecord(OperationContext* opCtx, const RecordId& recId, const char* data, diff --git a/src/mongo/db/storage/mobile/mobile_record_store.h b/src/mongo/db/storage/mobile/mobile_record_store.h index 9379a7760f7..d5c36e91a3e 100644 --- a/src/mongo/db/storage/mobile/mobile_record_store.h +++ b/src/mongo/db/storage/mobile/mobile_record_store.h @@ -63,12 +63,6 @@ public: std::vector<Record>* inOutRecords, const std::vector<Timestamp>& timestamps) override; - Status insertRecordsWithDocWriter(OperationContext* opCtx, - const DocWriter* const* docs, - const Timestamp* timestamps, - size_t nDocs, - RecordId* idsOut) override; - Status updateRecord(OperationContext* opCtx, const RecordId& oldLocation, const char* data, diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h index d4a5151faff..3da5078559b 100644 --- a/src/mongo/db/storage/record_store.h +++ b/src/mongo/db/storage/record_store.h @@ -62,22 +62,6 @@ struct CompactOptions { struct CompactStats {}; /** - * Allows inserting a Record "in-place" without creating a copy ahead of time. - */ -class DocWriter { -public: - virtual void writeDocument(char* buf) const = 0; - virtual size_t documentSize() const = 0; - virtual bool addPadding() const { - return true; - } - -protected: - // Can't delete through base pointer. - ~DocWriter() = default; -}; - -/** * The data items stored in a RecordStore. 
*/ struct Record { @@ -359,34 +343,6 @@ public: } /** - * Inserts nDocs documents into this RecordStore using the DocWriter interface. - * - * This allows the storage engine to reserve space for a record and have it built in-place - * rather than building the record then copying it into its destination. - * - * On success, if idsOut is non-null the RecordIds of the inserted records will be written into - * it. It must have space for nDocs RecordIds. - */ - virtual Status insertRecordsWithDocWriter(OperationContext* opCtx, - const DocWriter* const* docs, - const Timestamp* timestamps, - size_t nDocs, - RecordId* idsOut = nullptr) = 0; - - /** - * A thin wrapper around insertRecordsWithDocWriter() to simplify handling of single DocWriters. - */ - StatusWith<RecordId> insertRecordWithDocWriter(OperationContext* opCtx, - const DocWriter* doc, - Timestamp timestamp) { - RecordId out; - Status status = insertRecordsWithDocWriter(opCtx, &doc, &timestamp, 1, &out); - if (!status.isOK()) - return status; - return out; - } - - /** * Updates the record with id 'recordId', replacing its contents with those described by * 'data' and 'len'. */ diff --git a/src/mongo/db/storage/record_store_test_docwriter.h b/src/mongo/db/storage/record_store_test_docwriter.h deleted file mode 100644 index af40b8de3d0..00000000000 --- a/src/mongo/db/storage/record_store_test_docwriter.h +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program.
If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/storage/record_store.h" - -namespace mongo { -namespace { - -class StringDocWriter final : public DocWriter { -public: - StringDocWriter(const std::string& data, bool padding) : _data(data), _padding(padding) {} - - ~StringDocWriter() {} - - void writeDocument(char* buf) const { - memcpy(buf, _data.c_str(), documentSize()); - } - - size_t documentSize() const { - return _data.size() + 1; - } - - bool addPadding() const { - return _padding; - } - -private: - std::string _data; - bool _padding; -}; - -} // namespace -} // namespace mongo diff --git a/src/mongo/db/storage/record_store_test_harness.cpp b/src/mongo/db/storage/record_store_test_harness.cpp index b93e8029d34..9f4214aef1f 100644 --- a/src/mongo/db/storage/record_store_test_harness.cpp +++ b/src/mongo/db/storage/record_store_test_harness.cpp @@ -98,49 +98,6 @@ TEST(RecordStoreTestHarness, Simple1) { } } -namespace { -class DummyDocWriter final : public DocWriter { -public: - virtual ~DummyDocWriter() {} - - virtual void writeDocument(char* buf) const { - memcpy(buf, "eliot", 6); - } - - virtual size_t 
documentSize() const { - return 6; - } - - virtual bool addPadding() const { - return false; - } -}; -} - - -TEST(RecordStoreTestHarness, Simple1InsertDocWroter) { - const auto harnessHelper(newRecordStoreHarnessHelper()); - unique_ptr<RecordStore> rs(harnessHelper->newNonCappedRecordStore()); - - RecordId loc1; - - { - ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); - - { - WriteUnitOfWork uow(opCtx.get()); - DummyDocWriter dw; - StatusWith<RecordId> res = - rs->insertRecordWithDocWriter(opCtx.get(), &dw, Timestamp(1)); - ASSERT_OK(res.getStatus()); - loc1 = res.getValue(); - uow.commit(); - } - - ASSERT_EQUALS(string("eliot"), rs->dataFor(opCtx.get(), loc1).data()); - } -} - TEST(RecordStoreTestHarness, Delete1) { const auto harnessHelper(newRecordStoreHarnessHelper()); unique_ptr<RecordStore> rs(harnessHelper->newNonCappedRecordStore()); diff --git a/src/mongo/db/storage/record_store_test_insertrecord.cpp b/src/mongo/db/storage/record_store_test_insertrecord.cpp index 55e50355f31..b5b9afcc9bb 100644 --- a/src/mongo/db/storage/record_store_test_insertrecord.cpp +++ b/src/mongo/db/storage/record_store_test_insertrecord.cpp @@ -32,7 +32,6 @@ #include "mongo/db/record_id.h" #include "mongo/db/storage/record_data.h" #include "mongo/db/storage/record_store.h" -#include "mongo/db/storage/record_store_test_docwriter.h" #include "mongo/db/storage/record_store_test_harness.h" #include "mongo/unittest/unittest.h" @@ -108,72 +107,5 @@ TEST(RecordStoreTestHarness, InsertMultipleRecords) { } } -// Insert a record using a DocWriter and verify the number of entries -// in the collection is 1. 
-TEST(RecordStoreTestHarness, InsertRecordUsingDocWriter) { - const auto harnessHelper(newRecordStoreHarnessHelper()); - unique_ptr<RecordStore> rs(harnessHelper->newNonCappedRecordStore()); - - { - ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); - ASSERT_EQUALS(0, rs->numRecords(opCtx.get())); - } - - RecordId loc; - { - ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); - { - StringDocWriter docWriter("my record", false); - - WriteUnitOfWork uow(opCtx.get()); - StatusWith<RecordId> res = - rs->insertRecordWithDocWriter(opCtx.get(), &docWriter, Timestamp(1)); - ASSERT_OK(res.getStatus()); - loc = res.getValue(); - uow.commit(); - } - } - - { - ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); - ASSERT_EQUALS(1, rs->numRecords(opCtx.get())); - } -} - -// Insert multiple records using a DocWriter and verify the number of entries -// in the collection equals the number that were inserted. 
-TEST(RecordStoreTestHarness, InsertMultipleRecordsUsingDocWriter) { - const auto harnessHelper(newRecordStoreHarnessHelper()); - unique_ptr<RecordStore> rs(harnessHelper->newNonCappedRecordStore()); - - { - ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); - ASSERT_EQUALS(0, rs->numRecords(opCtx.get())); - } - - const int nToInsert = 10; - RecordId locs[nToInsert]; - for (int i = 0; i < nToInsert; i++) { - ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); - { - stringstream ss; - ss << "record " << i; - StringDocWriter docWriter(ss.str(), false); - - WriteUnitOfWork uow(opCtx.get()); - StatusWith<RecordId> res = - rs->insertRecordWithDocWriter(opCtx.get(), &docWriter, Timestamp(1)); - ASSERT_OK(res.getStatus()); - locs[i] = res.getValue(); - uow.commit(); - } - } - - { - ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); - ASSERT_EQUALS(nToInsert, rs->numRecords(opCtx.get())); - } -} - } // namespace } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 97fd85aaaf5..d711686fc42 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -1338,48 +1338,6 @@ void WiredTigerRecordStore::notifyCappedWaitersIfNeeded() { } } -Status WiredTigerRecordStore::insertRecordsWithDocWriter(OperationContext* opCtx, - const DocWriter* const* docs, - const Timestamp* timestamps, - size_t nDocs, - RecordId* idsOut) { - dassert(opCtx->lockState()->isReadLocked()); - - std::unique_ptr<Record[]> records(new Record[nDocs]); - - // First get all the sizes so we can allocate a single buffer for all documents. Eventually it - // would be nice if we could either hand off the buffers to WT without copying or write them - // in-place as we do with MMAPv1, but for now this is the best we can do. 
- size_t totalSize = 0; - for (size_t i = 0; i < nDocs; i++) { - const size_t docSize = docs[i]->documentSize(); - records[i].data = RecordData(nullptr, docSize); // We fill in the real ptr in next loop. - totalSize += docSize; - } - - std::unique_ptr<char[]> buffer(new char[totalSize]); - char* pos = buffer.get(); - for (size_t i = 0; i < nDocs; i++) { - docs[i]->writeDocument(pos); - const size_t size = records[i].data.size(); - records[i].data = RecordData(pos, size); - pos += size; - } - invariant(pos == (buffer.get() + totalSize)); - - Status s = _insertRecords(opCtx, records.get(), timestamps, nDocs); - if (!s.isOK()) - return s; - - if (idsOut) { - for (size_t i = 0; i < nDocs; i++) { - idsOut[i] = records[i].id; - } - } - - return s; -} - Status WiredTigerRecordStore::updateRecord(OperationContext* opCtx, const RecordId& id, const char* data, diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index 546974eb4a3..33365c96b36 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -143,12 +143,6 @@ public: std::vector<Record>* records, const std::vector<Timestamp>& timestamps); - virtual Status insertRecordsWithDocWriter(OperationContext* opCtx, - const DocWriter* const* docs, - const Timestamp* timestamps, - size_t nDocs, - RecordId* idsOut); - virtual Status updateRecord(OperationContext* opCtx, const RecordId& recordId, const char* data, diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index ddd0a8c3ddd..7203a1c1ddf 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -201,25 +201,19 @@ protected: TxnNumber txnNumber, StmtId stmtId, repl::OpTime prevOpTime) { - OperationSessionInfo osi; - osi.setSessionId(lsid); - 
osi.setTxnNumber(txnNumber); - - repl::OplogLink link; - link.prevOpTime = prevOpTime; - - return repl::logOp(opCtx, - "n", - nss, - uuid, - BSON("TestValue" << 0), - nullptr, - false, - Date_t::now(), - osi, - stmtId, - link, - OplogSlot()); + repl::MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setNss(nss); + oplogEntry.setUuid(uuid); + oplogEntry.setObject(BSON("TestValue" << 0)); + oplogEntry.setWallClockTime(Date_t::now()); + if (stmtId != kUninitializedStmtId) { + oplogEntry.setSessionId(lsid); + oplogEntry.setTxnNumber(txnNumber); + oplogEntry.setStatementId(stmtId); + oplogEntry.setPrevWriteOpTimeInTransaction(prevOpTime); + } + return repl::logOp(opCtx, &oplogEntry); } repl::OpTime writeTxnRecord(TxnNumber txnNum, @@ -590,34 +584,32 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke txnParticipant.refreshFromStorageIfNeeded(opCtx()); txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); - OperationSessionInfo osi; - osi.setSessionId(sessionId); - osi.setTxnNumber(txnNum); + repl::MutableOplogEntry oplogEntry; + oplogEntry.setSessionId(sessionId); + oplogEntry.setTxnNumber(txnNum); + oplogEntry.setNss(kNss); + oplogEntry.setUuid(uuid); auto firstOpTime = ([&]() { + oplogEntry.setOpType(repl::OpTypeEnum::kInsert); + oplogEntry.setObject(BSON("x" << 1)); + oplogEntry.setObject2(TransactionParticipant::kDeadEndSentinel); + oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime()); + oplogEntry.setStatementId(1); + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto wallClockTime = Date_t::now(); + oplogEntry.setWallClockTime(wallClockTime); - auto opTime = repl::logOp(opCtx(), - "i", - kNss, - uuid, - BSON("x" << 1), - &TransactionParticipant::kDeadEndSentinel, - false, - wallClockTime, - osi, - 1, - {}, - OplogSlot()); + auto opTime = repl::logOp(opCtx(), &oplogEntry); SessionTxnRecord sessionTxnRecord; 
sessionTxnRecord.setSessionId(sessionId); sessionTxnRecord.setTxnNum(txnNum); sessionTxnRecord.setLastWriteOpTime(opTime); - sessionTxnRecord.setLastWriteDate(Date_t::now()); + sessionTxnRecord.setLastWriteDate(wallClockTime); txnParticipant.onWriteOpCompletedOnPrimary(opCtx(), {1}, sessionTxnRecord); wuow.commit(); @@ -625,26 +617,19 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke })(); { - repl::OplogLink link; - link.prevOpTime = firstOpTime; + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setObject({}); + oplogEntry.setObject2(TransactionParticipant::kDeadEndSentinel); + oplogEntry.setPrevWriteOpTimeInTransaction(firstOpTime); + oplogEntry.setStatementId(kIncompleteHistoryStmtId); AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto wallClockTime = Date_t::now(); + oplogEntry.setWallClockTime(wallClockTime); - auto opTime = repl::logOp(opCtx(), - "n", - kNss, - uuid, - {}, - &TransactionParticipant::kDeadEndSentinel, - false, - wallClockTime, - osi, - kIncompleteHistoryStmtId, - link, - OplogSlot()); + auto opTime = repl::logOp(opCtx(), &oplogEntry); SessionTxnRecord sessionTxnRecord; sessionTxnRecord.setSessionId(sessionId); |