diff options
author | Randolph Tan <randolph@10gen.com> | 2017-07-27 10:32:09 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-08-17 16:58:40 -0400 |
commit | 1e11cda15ddae9972f9993a7d6b6cbf9d172bcb3 (patch) | |
tree | 07546cb1464f52398f92db85e8af063d9f3112e0 /src/mongo/db | |
parent | d7325950d72c6aacd25ecbd65888c050fe63482c (diff) | |
download | mongo-1e11cda15ddae9972f9993a7d6b6cbf9d172bcb3.tar.gz |
SERVER-30407 Store pre/post-image documents when running findAndModify with txnNumber
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/catalog/collection.h | 10 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_impl.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_impl.h | 15 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_mock.h | 3 | ||||
-rw-r--r-- | src/mongo/db/commands.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/commands/find_and_modify.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/exec/delete.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/exec/update.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/op_observer.h | 10 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 115 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.h | 3 | ||||
-rw-r--r-- | src/mongo/db/op_observer_noop.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/op_observer_noop.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.idl | 11 |
16 files changed, 221 insertions, 48 deletions
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index 467f985de64..4b257571860 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -185,6 +185,7 @@ class Collection final : CappedCallback, UpdateNotifier { public: enum ValidationAction { WARN, ERROR_V }; enum ValidationLevel { OFF, MODERATE, STRICT_V }; + enum class StoreDeletedDoc { Off, On }; class Impl : virtual CappedCallback, virtual UpdateNotifier { public: @@ -246,7 +247,8 @@ public: const RecordId& loc, OpDebug* opDebug, bool fromMigrate, - bool noWarn) = 0; + bool noWarn, + StoreDeletedDoc storeDeletedDoc) = 0; virtual Status insertDocuments(OperationContext* opCtx, std::vector<InsertStatement>::const_iterator begin, @@ -473,8 +475,10 @@ public: const RecordId& loc, OpDebug* const opDebug, const bool fromMigrate = false, - const bool noWarn = false) { - return this->_impl().deleteDocument(opCtx, stmtId, loc, opDebug, fromMigrate, noWarn); + const bool noWarn = false, + StoreDeletedDoc storeDeletedDoc = StoreDeletedDoc::Off) { + return this->_impl().deleteDocument( + opCtx, stmtId, loc, opDebug, fromMigrate, noWarn, storeDeletedDoc); } /* diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 3cfbbe3841f..7333330c4cc 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -546,7 +546,8 @@ void CollectionImpl::deleteDocument(OperationContext* opCtx, const RecordId& loc, OpDebug* opDebug, bool fromMigrate, - bool noWarn) { + bool noWarn, + Collection::StoreDeletedDoc storeDeletedDoc) { if (isCapped()) { log() << "failing remove on a capped ns " << _ns; uasserted(10089, "cannot remove from a capped collection"); @@ -558,6 +559,11 @@ void CollectionImpl::deleteDocument(OperationContext* opCtx, auto deleteState = getGlobalServiceContext()->getOpObserver()->aboutToDelete(opCtx, ns(), doc.value()); + boost::optional<BSONObj> deletedDoc; + if (storeDeletedDoc == Collection::StoreDeletedDoc::On) { + deletedDoc.emplace(doc.value().getOwned()); + } + /* check if any cursors point to us. if so, advance them. */ _cursorManager.invalidateDocument(opCtx, loc, INVALIDATION_DELETION); @@ -570,7 +576,7 @@ void CollectionImpl::deleteDocument(OperationContext* opCtx, _recordStore->deleteRecord(opCtx, loc); getGlobalServiceContext()->getOpObserver()->onDelete( - opCtx, ns(), uuid(), stmtId, std::move(deleteState), fromMigrate); + opCtx, ns(), uuid(), stmtId, std::move(deleteState), fromMigrate, deletedDoc); } Counter64 moveCounter; @@ -658,6 +664,8 @@ RecordId CollectionImpl::updateDocument(OperationContext* opCtx, } } + args->preImageDoc = oldDoc.value().getOwned(); + Status updateStatus = _recordStore->updateRecord( opCtx, oldLocation, newDoc.objdata(), newDoc.objsize(), _enforceQuota(enforceQuota), this); @@ -712,6 +720,8 @@ StatusWith<RecordId> CollectionImpl::_updateDocumentWithMove(OperationContext* o _cursorManager.invalidateDocument(opCtx, oldLocation, INVALIDATION_DELETION); + args->preImageDoc = oldDoc.value().getOwned(); + // Remove indexes for old record. int64_t keysDeleted; _indexCatalog.unindexRecord(opCtx, oldDoc.value(), oldLocation, true, &keysDeleted); diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h index b9132e26261..9e5ca69270c 100644 --- a/src/mongo/db/catalog/collection_impl.h +++ b/src/mongo/db/catalog/collection_impl.h @@ -146,13 +146,16 @@ public: * 'cappedOK' if true, allows deletes on capped collections (Cloner::copyDB uses this). * 'noWarn' if unindexing the record causes an error, if noWarn is true the error * will not be logged. + * 'storeDeletedDoc' whether to store the document deleted in the oplog. */ - void deleteDocument(OperationContext* opCtx, - StmtId stmtId, - const RecordId& loc, - OpDebug* opDebug, - bool fromMigrate = false, - bool noWarn = false) final; + void deleteDocument( + OperationContext* opCtx, + StmtId stmtId, + const RecordId& loc, + OpDebug* opDebug, + bool fromMigrate = false, + bool noWarn = false, + Collection::StoreDeletedDoc storeDeletedDoc = Collection::StoreDeletedDoc::Off) final; /* * Inserts all documents inside one WUOW. diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h index a5bb17349ac..ad836035f1b 100644 --- a/src/mongo/db/catalog/collection_mock.h +++ b/src/mongo/db/catalog/collection_mock.h @@ -133,7 +133,8 @@ public: const RecordId& loc, OpDebug* opDebug, bool fromMigrate, - bool noWarn) { + bool noWarn, + Collection::StoreDeletedDoc storeDeletedDoc) { std::abort(); } diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp index c8f99250182..5fd7bc58d1a 100644 --- a/src/mongo/db/commands.cpp +++ b/src/mongo/db/commands.cpp @@ -412,7 +412,8 @@ BSONObj Command::filterCommandRequestForPassthrough(const BSONObj& cmdObj) { name == "$queryOptions" || // name == "maxTimeMS" || // name == "readConcern" || // - name == "writeConcern") { + name == "writeConcern" || + name == "lsid" || name == "txnNumber") { // This is the whitelist of generic arguments that commands can be trusted to blindly // forward to the shards. bob.append(elem); diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index c11466cff22..168aa12b295 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -372,6 +372,10 @@ public: const bool isExplain = false; makeDeleteRequest(args, isExplain, &request); + if (opCtx->getTxnNumber()) { + request.setStmtId(0); + } + ParsedDelete parsedDelete(opCtx, &request); Status parsedDeleteStatus = parsedDelete.parseRequest(); if (!parsedDeleteStatus.isOK()) { @@ -453,6 +457,10 @@ public: const bool isExplain = false; makeUpdateRequest(args, isExplain, &updateLifecycle, &request); + if (opCtx->getTxnNumber()) { + request.setStmtId(0); + } + ParsedUpdate parsedUpdate(opCtx, &request); Status parsedUpdateStatus = parsedUpdate.parseRequest(); if (!parsedUpdateStatus.isOK()) { diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp index 6262279c054..497e4ed04ec 100644 --- a/src/mongo/db/exec/delete.cpp +++ b/src/mongo/db/exec/delete.cpp @@ -215,8 +215,14 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) { if (!_params.isExplain) { try { WriteUnitOfWork wunit(getOpCtx()); - _collection->deleteDocument( - getOpCtx(), _params.stmtId, recordId, _params.opDebug, _params.fromMigrate); + _collection->deleteDocument(getOpCtx(), + _params.stmtId, + recordId, + _params.opDebug, + _params.fromMigrate, + false, + _params.returnDeleted ? Collection::StoreDeletedDoc::On + : Collection::StoreDeletedDoc::Off); wunit.commit(); } catch (const WriteConflictException&) { memberFreer.Dismiss(); // Keep this member around so we can retry deleting it. diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp index 0788a621ee9..ea3c014c28c 100644 --- a/src/mongo/db/exec/update.cpp +++ b/src/mongo/db/exec/update.cpp @@ -143,6 +143,19 @@ const std::vector<std::unique_ptr<FieldRef>>* getImmutableFields(OperationContex return NULL; } +OplogUpdateEntryArgs::StoreDocOption getStoreDocMode(const UpdateRequest& updateRequest) { + if (updateRequest.shouldReturnNewDocs()) { + return OplogUpdateEntryArgs::StoreDocOption::PostImage; + } + + if (updateRequest.shouldReturnOldDocs()) { + return OplogUpdateEntryArgs::StoreDocOption::PreImage; + } + + invariant(!updateRequest.shouldReturnAnyDocs()); + return OplogUpdateEntryArgs::StoreDocOption::None; +} + } // namespace const char* UpdateStage::kStageType = "UPDATE"; @@ -289,6 +302,12 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco args.update = logObj; args.criteria = idQuery; args.fromMigrate = request->isFromMigration(); + args.storeDocOption = getStoreDocMode(*request); + + if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) { + args.preImageDoc = oldObj.value().getOwned(); + } + StatusWith<RecordData> newRecStatus = _collection->updateDocumentWithDamages( getOpCtx(), recordId, @@ -320,6 +339,12 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco args.update = logObj; args.criteria = idQuery; args.fromMigrate = request->isFromMigration(); + args.storeDocOption = getStoreDocMode(*request); + + if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) { + args.preImageDoc = oldObj.value().getOwned(); + } + newRecordId = _collection->updateDocument(getOpCtx(), recordId, oldObj, diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 9ccafadc298..c0f67897102 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -50,6 +50,8 @@ class OpTime; * Holds document update information used in logging. */ struct OplogUpdateEntryArgs { + enum class StoreDocOption { None, PreImage, PostImage }; + // Name of the collection in which document is being updated. NamespaceString nss; @@ -57,6 +59,9 @@ struct OplogUpdateEntryArgs { StmtId stmtId = kUninitializedStmtId; + // The document before modifiers were applied. + boost::optional<BSONObj> preImageDoc; + // Fully updated document with damages (update modifiers) applied. BSONObj updatedDoc; @@ -68,6 +73,8 @@ struct OplogUpdateEntryArgs { // True if this update comes from a chunk migration. bool fromMigrate; + + StoreDocOption storeDocOption = StoreDocOption::None; }; struct TTLCollModInfo { @@ -112,7 +119,8 @@ public: OptionalCollectionUUID uuid, StmtId stmtId, CollectionShardingState::DeleteState deleteState, - bool fromMigrate) = 0; + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) = 0; virtual void onOpMessage(OperationContext* opCtx, const BSONObj& msgObj) = 0; virtual void onCreateCollection(OperationContext* opCtx, Collection* coll, diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index e128a003cc8..8c103798fd6 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -74,6 +74,71 @@ void updateSessionProgress(OperationContext* opCtx, const repl::OpTime& lastTxnW } } +/** + * Write oplog entry(ies) for the update operation. + */ +repl::OpTime replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { + BSONObj storeObj; + if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) { + invariant(args.preImageDoc); + storeObj = args.preImageDoc.value(); + } else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) { + storeObj = args.updatedDoc; + } + + repl::PreAndPostImageTimestamps preAndPostTs; + + if (!storeObj.isEmpty() && opCtx->getTxnNumber().is_initialized()) { + auto noteUpdateOpTime = + repl::logOp(opCtx, "n", args.nss, args.uuid, storeObj, nullptr, false, args.stmtId, {}); + + if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) { + preAndPostTs.preImageTs = noteUpdateOpTime.getTimestamp(); + } else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) { + preAndPostTs.postImageTs = noteUpdateOpTime.getTimestamp(); + } + } + + return repl::logOp(opCtx, + "u", + args.nss, + args.uuid, + args.update, + &args.criteria, + args.fromMigrate, + args.stmtId, + preAndPostTs); +} + +/** + * Write oplog entry(ies) for the delete operation. + */ +repl::OpTime replLogDelete(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + StmtId stmtId, + CollectionShardingState::DeleteState deleteState, + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) { + repl::PreAndPostImageTimestamps preAndPostImageTs; + + if (deletedDoc.is_initialized() && opCtx->getTxnNumber().is_initialized()) { + auto noteOplog = + repl::logOp(opCtx, "n", nss, uuid, deletedDoc.get(), nullptr, false, stmtId, {}); + preAndPostImageTs.preImageTs = noteOplog.getTimestamp(); + } + + return repl::logOp(opCtx, + "d", + nss, + uuid, + deleteState.documentKey, + nullptr, + fromMigrate, + stmtId, + preAndPostImageTs); +} + } // namespace void OpObserverImpl::onCreateIndex(OperationContext* opCtx, @@ -97,10 +162,18 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, builder.done(), nullptr, fromMigrate, - kUninitializedStmtId); + kUninitializedStmtId, + {}); } else { - repl::logOp( - opCtx, "i", systemIndexes, {}, indexDoc, nullptr, fromMigrate, kUninitializedStmtId); + repl::logOp(opCtx, + "i", + systemIndexes, + {}, + indexDoc, + nullptr, + fromMigrate, + kUninitializedStmtId, + {}); } AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "i", systemIndexes, indexDoc, nullptr); @@ -151,14 +224,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg return; } - auto opTime = repl::logOp(opCtx, - "u", - args.nss, - args.uuid, - args.update, - &args.criteria, - args.fromMigrate, - args.stmtId); + auto opTime = replLogUpdate(opCtx, args); + AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "u", args.nss, args.update, &args.criteria); @@ -194,12 +261,13 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, OptionalCollectionUUID uuid, StmtId stmtId, CollectionShardingState::DeleteState deleteState, - bool fromMigrate) { + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) { if (deleteState.documentKey.isEmpty()) return; - auto opTime = - repl::logOp(opCtx, "d", nss, uuid, deleteState.documentKey, nullptr, fromMigrate, stmtId); + auto opTime = replLogDelete(opCtx, nss, uuid, stmtId, deleteState, fromMigrate, deletedDoc); + AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "d", nss, deleteState.documentKey, nullptr); @@ -222,7 +290,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, } void OpObserverImpl::onOpMessage(OperationContext* opCtx, const BSONObj& msgObj) { - repl::logOp(opCtx, "n", {}, {}, msgObj, nullptr, false, kUninitializedStmtId); + repl::logOp(opCtx, "n", {}, {}, msgObj, nullptr, false, kUninitializedStmtId, {}); } void OpObserverImpl::onCreateCollection(OperationContext* opCtx, @@ -254,7 +322,8 @@ void OpObserverImpl::onCreateCollection(OperationContext* opCtx, if (!collectionName.isSystemDotProfile()) { // do not replicate system.profile modifications - repl::logOp(opCtx, "c", dbName, options.uuid, cmdObj, nullptr, false, kUninitializedStmtId); + repl::logOp( + opCtx, "c", dbName, options.uuid, cmdObj, nullptr, false, kUninitializedStmtId, {}); } getGlobalAuthorizationManager()->logOp(opCtx, "c", dbName, cmdObj, nullptr); @@ -324,7 +393,7 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx, if (!nss.isSystemDotProfile()) { // do not replicate system.profile modifications - repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, &o2Obj, false, kUninitializedStmtId); + repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, &o2Obj, false, kUninitializedStmtId, {}); } getGlobalAuthorizationManager()->logOp(opCtx, "c", cmdNss, cmdObj, nullptr); @@ -354,7 +423,7 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string& BSONObj cmdObj = BSON("dropDatabase" << 1); const NamespaceString cmdNss{dbName, "$cmd"}; - repl::logOp(opCtx, "c", cmdNss, {}, cmdObj, nullptr, false, kUninitializedStmtId); + repl::logOp(opCtx, "c", cmdNss, {}, cmdObj, nullptr, false, kUninitializedStmtId, {}); if (dbName == FeatureCompatibilityVersion::kDatabase) { FeatureCompatibilityVersion::onDropCollection(opCtx); @@ -375,7 +444,7 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, if (!collectionName.isSystemDotProfile()) { // do not replicate system.profile modifications dropOpTime = - repl::logOp(opCtx, "c", dbName, uuid, cmdObj, nullptr, false, kUninitializedStmtId); + repl::logOp(opCtx, "c", dbName, uuid, cmdObj, nullptr, false, kUninitializedStmtId, {}); } if (collectionName.coll() == DurableViewCatalog::viewsCollectionName()) { @@ -410,7 +479,7 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx, const BSONObj& indexInfo) { BSONObj cmdObj = BSON("dropIndexes" << nss.coll() << "index" << indexName); auto commandNS = nss.getCommandNS(); - repl::logOp(opCtx, "c", commandNS, uuid, cmdObj, &indexInfo, false, kUninitializedStmtId); + repl::logOp(opCtx, "c", commandNS, uuid, cmdObj, &indexInfo, false, kUninitializedStmtId, {}); getGlobalAuthorizationManager()->logOp(opCtx, "c", commandNS, cmdObj, &indexInfo); } @@ -439,7 +508,7 @@ repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx, BSONObj cmdObj = builder.done(); auto renameOpTime = - repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, nullptr, false, kUninitializedStmtId); + repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, nullptr, false, kUninitializedStmtId, {}); if (fromCollection.isSystemDotViews()) DurableViewCatalog::onExternalChange(opCtx, fromCollection); @@ -472,7 +541,7 @@ void OpObserverImpl::onApplyOps(OperationContext* opCtx, const std::string& dbName, const BSONObj& applyOpCmd) { const NamespaceString cmdNss{dbName, "$cmd"}; - repl::logOp(opCtx, "c", cmdNss, {}, applyOpCmd, nullptr, false, kUninitializedStmtId); + repl::logOp(opCtx, "c", cmdNss, {}, applyOpCmd, nullptr, false, kUninitializedStmtId, {}); getGlobalAuthorizationManager()->logOp(opCtx, "c", cmdNss, applyOpCmd, nullptr); } @@ -485,7 +554,7 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, if (!collectionName.isSystemDotProfile()) { // do not replicate system.profile modifications - repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, nullptr, false, kUninitializedStmtId); + repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, nullptr, false, kUninitializedStmtId, {}); } getGlobalAuthorizationManager()->logOp(opCtx, "c", cmdNss, cmdObj, nullptr); diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index 25f1e28523f..7dff0515586 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -59,7 +59,8 @@ public: OptionalCollectionUUID uuid, StmtId stmtId, CollectionShardingState::DeleteState deleteState, - bool fromMigrate) override; + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) override; void onOpMessage(OperationContext* opCtx, const BSONObj& msgObj) override; void onCreateCollection(OperationContext* opCtx, Collection* coll, diff --git a/src/mongo/db/op_observer_noop.cpp b/src/mongo/db/op_observer_noop.cpp index 648f14b6a99..a0ad429a6c0 100644 --- a/src/mongo/db/op_observer_noop.cpp +++ b/src/mongo/db/op_observer_noop.cpp @@ -60,7 +60,8 @@ void OpObserverNoop::onDelete(OperationContext*, OptionalCollectionUUID, StmtId stmtId, CollectionShardingState::DeleteState, - bool) {} + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) {} void OpObserverNoop::onOpMessage(OperationContext*, const BSONObj&) {} diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index 7e51f18177b..a15e2b51dec 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -59,7 +59,8 @@ public: OptionalCollectionUUID uuid, StmtId stmtId, CollectionShardingState::DeleteState deleteState, - bool fromMigrate) override; + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) override; void onOpMessage(OperationContext* opCtx, const BSONObj& msgObj) override; void onCreateCollection(OperationContext* opCtx, Collection* coll, diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index d8877504c04..f22cc9a8ede 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -333,7 +333,8 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx, long long hashNew, Date_t wallTime, StmtId statementId, - const Timestamp& prevTs) { + const Timestamp& prevTs, + const PreAndPostImageTimestamps& preAndPostTs) { BSONObjBuilder b(256); b.append("ts", optime.getTimestamp()); @@ -360,6 +361,14 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx, appendSessionInfo(opCtx, &b, statementId, prevTs); + if (!preAndPostTs.preImageTs.isNull()) { + b.append(OplogEntryBase::kPreImageTsFieldName, preAndPostTs.preImageTs); + } + + if (!preAndPostTs.postImageTs.isNull()) { + b.append(OplogEntryBase::kPostImageTsFieldName, preAndPostTs.postImageTs); + } + return OplogDocWriter(OplogDocWriter(b.obj(), obj)); } } // end anon namespace @@ -414,7 +423,8 @@ OpTime logOp(OperationContext* opCtx, const BSONObj& obj, const BSONObj* o2, bool fromMigrate, - StmtId statementId) { + StmtId statementId, + const PreAndPostImageTimestamps& preAndPostTs) { auto replCoord = ReplicationCoordinator::get(opCtx); if (replCoord->isOplogDisabledFor(opCtx, nss)) { invariant(statementId == kUninitializedStmtId); @@ -444,7 +454,8 @@ OpTime logOp(OperationContext* opCtx, slot.hash, Date_t::now(), statementId, - prevTs); + prevTs, + preAndPostTs); const DocWriter* basePtr = &writer; _logOpsInner(opCtx, nss, &basePtr, 1, oplog, replMode, slot.opTime); return slot.opTime; @@ -493,7 +504,8 @@ repl::OpTime logInsertOps(OperationContext* opCtx, slots[i].hash, wallTime, insertStatement.stmtId, - prevTs)); + prevTs, + {})); prevTs = slots[i].opTime.getTimestamp(); } diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index f119534ad7a..241b5ca746a 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -49,6 +49,15 @@ class OperationContext; namespace repl { class ReplSettings; +struct PreAndPostImageTimestamps { + PreAndPostImageTimestamps() = default; + PreAndPostImageTimestamps(Timestamp _preImageTs, Timestamp _postImageTs) + : preImageTs(std::move(_preImageTs)), postImageTs(std::move(_postImageTs)) {} + + Timestamp preImageTs; + Timestamp postImageTs; +}; + /** * Create a new capped collection for the oplog if it doesn't yet exist. * If the collection already exists (and isReplSet is false), @@ -88,6 +97,10 @@ OpTime logInsertOps(OperationContext* opCtx, * * For 'u' records, 'obj' captures the mutation made to the object but not * the object itself. 'o2' captures the the criteria for the object that will be modified. + * + * preAndPostImageTs this contains the timestamp of the oplog entry that contains the document + * before/after update was applied. The timestamps are ignored if isNull() is true. + * * Returns the optime of the oplog entry written to the oplog. * Returns a null optime if oplog was not modified. */ @@ -98,7 +111,8 @@ OpTime logOp(OperationContext* opCtx, const BSONObj& obj, const BSONObj* o2, bool fromMigrate, - StmtId stmtId); + StmtId stmtId, + const PreAndPostImageTimestamps& preAndPostTs); // Flush out the cached pointers to the local database and oplog. // Used by the closeDatabase command to ensure we don't cache closed things. diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl index 0f5a5731021..a01b38f2ee0 100644 --- a/src/mongo/db/repl/oplog_entry.idl +++ b/src/mongo/db/repl/oplog_entry.idl @@ -115,4 +115,13 @@ structs: type: timestamp optional: true # Only for writes that are part of a transaction description: "The oplog timestamp of the previous write with the same transaction." - + preImageTs: + type: timestamp + optional: true + description: "The oplog timestamp of another oplog entry that contains the document + before an update/remove was applied." + postImageTs: + type: timestamp + optional: true + description: "The oplog timestamp of another oplog entry that contains the document + after an update was applied." |