author     Randolph Tan <randolph@10gen.com>    2017-07-27 10:32:09 -0400
committer  Randolph Tan <randolph@10gen.com>    2017-08-17 16:58:40 -0400
commit     1e11cda15ddae9972f9993a7d6b6cbf9d172bcb3 (patch)
tree       07546cb1464f52398f92db85e8af063d9f3112e0 /src/mongo/db
parent     d7325950d72c6aacd25ecbd65888c050fe63482c (diff)
download   mongo-1e11cda15ddae9972f9993a7d6b6cbf9d172bcb3.tar.gz
SERVER-30407 Store pre/post-image documents when running findAndModify with txnNumber
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/catalog/collection.h          |  10
-rw-r--r--  src/mongo/db/catalog/collection_impl.cpp   |  14
-rw-r--r--  src/mongo/db/catalog/collection_impl.h     |  15
-rw-r--r--  src/mongo/db/catalog/collection_mock.h     |   3
-rw-r--r--  src/mongo/db/commands.cpp                  |   3
-rw-r--r--  src/mongo/db/commands/find_and_modify.cpp  |   8
-rw-r--r--  src/mongo/db/exec/delete.cpp               |  10
-rw-r--r--  src/mongo/db/exec/update.cpp               |  25
-rw-r--r--  src/mongo/db/op_observer.h                 |  10
-rw-r--r--  src/mongo/db/op_observer_impl.cpp          | 115
-rw-r--r--  src/mongo/db/op_observer_impl.h            |   3
-rw-r--r--  src/mongo/db/op_observer_noop.cpp          |   3
-rw-r--r--  src/mongo/db/op_observer_noop.h            |   3
-rw-r--r--  src/mongo/db/repl/oplog.cpp                |  20
-rw-r--r--  src/mongo/db/repl/oplog.h                  |  16
-rw-r--r--  src/mongo/db/repl/oplog_entry.idl          |  11
16 files changed, 221 insertions, 48 deletions
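
At a high level, when findAndModify runs with a transaction number and asks for the old or new document, the write path now logs an extra no-op ("n") oplog entry carrying that document image, and the real "u"/"d" entry records the no-op's timestamp in a new preImageTs/postImageTs field. A rough C++ sketch of the two resulting entries for an update returning the post-image, built with the BSON macro; the namespace test.user, the timestamps, and the document values are made up, and several standard oplog fields (h, t, wall, lsid, ...) are omitted:

    #include "mongo/bson/timestamp.h"
    #include "mongo/db/jsobj.h"

    namespace {
    // Illustration only: the shape of the oplog documents this change produces.
    mongo::BSONObj exampleOplogEntries() {
        const mongo::Timestamp noteTs(1502000000, 1);

        // 1) No-op entry storing the image that findAndModify returned to the client.
        mongo::BSONObj noteEntry = BSON("ts" << noteTs << "op"
                                             << "n"
                                             << "ns"
                                             << "test.user"
                                             << "o" << BSON("_id" << 1 << "x" << 42));

        // 2) The actual update entry, pointing back at the no-op via postImageTs.
        mongo::BSONObj updateEntry = BSON("ts" << mongo::Timestamp(1502000000, 2) << "op"
                                               << "u"
                                               << "ns"
                                               << "test.user"
                                               << "o" << BSON("$set" << BSON("x" << 42)) << "o2"
                                               << BSON("_id" << 1) << "postImageTs" << noteTs);

        return BSON("noteEntry" << noteEntry << "updateEntry" << updateEntry);
    }
    }  // namespace
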
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index 467f985de64..4b257571860 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -185,6 +185,7 @@ class Collection final : CappedCallback, UpdateNotifier {
public:
enum ValidationAction { WARN, ERROR_V };
enum ValidationLevel { OFF, MODERATE, STRICT_V };
+ enum class StoreDeletedDoc { Off, On };
class Impl : virtual CappedCallback, virtual UpdateNotifier {
public:
@@ -246,7 +247,8 @@ public:
const RecordId& loc,
OpDebug* opDebug,
bool fromMigrate,
- bool noWarn) = 0;
+ bool noWarn,
+ StoreDeletedDoc storeDeletedDoc) = 0;
virtual Status insertDocuments(OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator begin,
@@ -473,8 +475,10 @@ public:
const RecordId& loc,
OpDebug* const opDebug,
const bool fromMigrate = false,
- const bool noWarn = false) {
- return this->_impl().deleteDocument(opCtx, stmtId, loc, opDebug, fromMigrate, noWarn);
+ const bool noWarn = false,
+ StoreDeletedDoc storeDeletedDoc = StoreDeletedDoc::Off) {
+ return this->_impl().deleteDocument(
+ opCtx, stmtId, loc, opDebug, fromMigrate, noWarn, storeDeletedDoc);
}
/*
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index 3cfbbe3841f..7333330c4cc 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -546,7 +546,8 @@ void CollectionImpl::deleteDocument(OperationContext* opCtx,
const RecordId& loc,
OpDebug* opDebug,
bool fromMigrate,
- bool noWarn) {
+ bool noWarn,
+ Collection::StoreDeletedDoc storeDeletedDoc) {
if (isCapped()) {
log() << "failing remove on a capped ns " << _ns;
uasserted(10089, "cannot remove from a capped collection");
@@ -558,6 +559,11 @@ void CollectionImpl::deleteDocument(OperationContext* opCtx,
auto deleteState =
getGlobalServiceContext()->getOpObserver()->aboutToDelete(opCtx, ns(), doc.value());
+ boost::optional<BSONObj> deletedDoc;
+ if (storeDeletedDoc == Collection::StoreDeletedDoc::On) {
+ deletedDoc.emplace(doc.value().getOwned());
+ }
+
/* check if any cursors point to us. if so, advance them. */
_cursorManager.invalidateDocument(opCtx, loc, INVALIDATION_DELETION);
@@ -570,7 +576,7 @@ void CollectionImpl::deleteDocument(OperationContext* opCtx,
_recordStore->deleteRecord(opCtx, loc);
getGlobalServiceContext()->getOpObserver()->onDelete(
- opCtx, ns(), uuid(), stmtId, std::move(deleteState), fromMigrate);
+ opCtx, ns(), uuid(), stmtId, std::move(deleteState), fromMigrate, deletedDoc);
}
Counter64 moveCounter;
@@ -658,6 +664,8 @@ RecordId CollectionImpl::updateDocument(OperationContext* opCtx,
}
}
+ args->preImageDoc = oldDoc.value().getOwned();
+
Status updateStatus = _recordStore->updateRecord(
opCtx, oldLocation, newDoc.objdata(), newDoc.objsize(), _enforceQuota(enforceQuota), this);
@@ -712,6 +720,8 @@ StatusWith<RecordId> CollectionImpl::_updateDocumentWithMove(OperationContext* o
_cursorManager.invalidateDocument(opCtx, oldLocation, INVALIDATION_DELETION);
+ args->preImageDoc = oldDoc.value().getOwned();
+
// Remove indexes for old record.
int64_t keysDeleted;
_indexCatalog.unindexRecord(opCtx, oldDoc.value(), oldLocation, true, &keysDeleted);
diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h
index b9132e26261..9e5ca69270c 100644
--- a/src/mongo/db/catalog/collection_impl.h
+++ b/src/mongo/db/catalog/collection_impl.h
@@ -146,13 +146,16 @@ public:
* 'cappedOK' if true, allows deletes on capped collections (Cloner::copyDB uses this).
* 'noWarn' if unindexing the record causes an error, if noWarn is true the error
* will not be logged.
+ * 'storeDeletedDoc' whether to store the deleted document in the oplog.
*/
- void deleteDocument(OperationContext* opCtx,
- StmtId stmtId,
- const RecordId& loc,
- OpDebug* opDebug,
- bool fromMigrate = false,
- bool noWarn = false) final;
+ void deleteDocument(
+ OperationContext* opCtx,
+ StmtId stmtId,
+ const RecordId& loc,
+ OpDebug* opDebug,
+ bool fromMigrate = false,
+ bool noWarn = false,
+ Collection::StoreDeletedDoc storeDeletedDoc = Collection::StoreDeletedDoc::Off) final;
/*
* Inserts all documents inside one WUOW.
diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h
index a5bb17349ac..ad836035f1b 100644
--- a/src/mongo/db/catalog/collection_mock.h
+++ b/src/mongo/db/catalog/collection_mock.h
@@ -133,7 +133,8 @@ public:
const RecordId& loc,
OpDebug* opDebug,
bool fromMigrate,
- bool noWarn) {
+ bool noWarn,
+ Collection::StoreDeletedDoc storeDeletedDoc) {
std::abort();
}
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp
index c8f99250182..5fd7bc58d1a 100644
--- a/src/mongo/db/commands.cpp
+++ b/src/mongo/db/commands.cpp
@@ -412,7 +412,8 @@ BSONObj Command::filterCommandRequestForPassthrough(const BSONObj& cmdObj) {
name == "$queryOptions" || //
name == "maxTimeMS" || //
name == "readConcern" || //
- name == "writeConcern") {
+ name == "writeConcern" ||
+ name == "lsid" || name == "txnNumber") {
// This is the whitelist of generic arguments that commands can be trusted to blindly
// forward to the shards.
bob.append(elem);
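
The point of whitelisting lsid and txnNumber is that a mongos forwarding a retryable write through filterCommandRequestForPassthrough no longer strips the session fields. A minimal sketch; the command values are placeholders, and a real lsid carries a UUID rather than an integer:

    #include "mongo/db/commands.h"
    #include "mongo/db/jsobj.h"
    #include "mongo/util/assert_util.h"

    namespace {
    // Sketch: the generic session fields survive the passthrough filter.
    void passthroughKeepsSessionFields() {
        mongo::BSONObj cmd = BSON("findAndModify"
                                  << "user"
                                  << "query" << BSON("_id" << 1) << "remove" << true << "lsid"
                                  << BSON("id" << 1)  // placeholder; a real lsid holds a UUID
                                  << "txnNumber" << 5LL);
        mongo::BSONObj forwarded = mongo::Command::filterCommandRequestForPassthrough(cmd);
        invariant(forwarded.hasField("lsid") && forwarded.hasField("txnNumber"));
    }
    }  // namespace
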
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index c11466cff22..168aa12b295 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -372,6 +372,10 @@ public:
const bool isExplain = false;
makeDeleteRequest(args, isExplain, &request);
+ if (opCtx->getTxnNumber()) {
+ request.setStmtId(0);
+ }
+
ParsedDelete parsedDelete(opCtx, &request);
Status parsedDeleteStatus = parsedDelete.parseRequest();
if (!parsedDeleteStatus.isOK()) {
@@ -453,6 +457,10 @@ public:
const bool isExplain = false;
makeUpdateRequest(args, isExplain, &updateLifecycle, &request);
+ if (opCtx->getTxnNumber()) {
+ request.setStmtId(0);
+ }
+
ParsedUpdate parsedUpdate(opCtx, &request);
Status parsedUpdateStatus = parsedUpdate.parseRequest();
if (!parsedUpdateStatus.isOK()) {
diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp
index 6262279c054..497e4ed04ec 100644
--- a/src/mongo/db/exec/delete.cpp
+++ b/src/mongo/db/exec/delete.cpp
@@ -215,8 +215,14 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) {
if (!_params.isExplain) {
try {
WriteUnitOfWork wunit(getOpCtx());
- _collection->deleteDocument(
- getOpCtx(), _params.stmtId, recordId, _params.opDebug, _params.fromMigrate);
+ _collection->deleteDocument(getOpCtx(),
+ _params.stmtId,
+ recordId,
+ _params.opDebug,
+ _params.fromMigrate,
+ false,
+ _params.returnDeleted ? Collection::StoreDeletedDoc::On
+ : Collection::StoreDeletedDoc::Off);
wunit.commit();
} catch (const WriteConflictException&) {
memberFreer.Dismiss(); // Keep this member around so we can retry deleting it.
diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp
index 0788a621ee9..ea3c014c28c 100644
--- a/src/mongo/db/exec/update.cpp
+++ b/src/mongo/db/exec/update.cpp
@@ -143,6 +143,19 @@ const std::vector<std::unique_ptr<FieldRef>>* getImmutableFields(OperationContex
return NULL;
}
+OplogUpdateEntryArgs::StoreDocOption getStoreDocMode(const UpdateRequest& updateRequest) {
+ if (updateRequest.shouldReturnNewDocs()) {
+ return OplogUpdateEntryArgs::StoreDocOption::PostImage;
+ }
+
+ if (updateRequest.shouldReturnOldDocs()) {
+ return OplogUpdateEntryArgs::StoreDocOption::PreImage;
+ }
+
+ invariant(!updateRequest.shouldReturnAnyDocs());
+ return OplogUpdateEntryArgs::StoreDocOption::None;
+}
+
} // namespace
const char* UpdateStage::kStageType = "UPDATE";
@@ -289,6 +302,12 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
args.update = logObj;
args.criteria = idQuery;
args.fromMigrate = request->isFromMigration();
+ args.storeDocOption = getStoreDocMode(*request);
+
+ if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) {
+ args.preImageDoc = oldObj.value().getOwned();
+ }
+
StatusWith<RecordData> newRecStatus = _collection->updateDocumentWithDamages(
getOpCtx(),
recordId,
@@ -320,6 +339,12 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
args.update = logObj;
args.criteria = idQuery;
args.fromMigrate = request->isFromMigration();
+ args.storeDocOption = getStoreDocMode(*request);
+
+ if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) {
+ args.preImageDoc = oldObj.value().getOwned();
+ }
+
newRecordId = _collection->updateDocument(getOpCtx(),
recordId,
oldObj,
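
For a findAndModify update, at most one of shouldReturnOldDocs()/shouldReturnNewDocs() is set, so getStoreDocMode() reduces to the command's 'new' option. A conceptual restatement, not code from the tree:

    #include "mongo/db/op_observer.h"

    namespace {
    // Sketch: which image the update path asks the op observer to store, as a
    // function of findAndModify's 'new' option. Plain updates, which return no
    // document, map to StoreDocOption::None instead.
    mongo::OplogUpdateEntryArgs::StoreDocOption storeOptionForFindAndModify(bool returnNew) {
        return returnNew ? mongo::OplogUpdateEntryArgs::StoreDocOption::PostImage
                         : mongo::OplogUpdateEntryArgs::StoreDocOption::PreImage;
    }
    }  // namespace
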
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index 9ccafadc298..c0f67897102 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -50,6 +50,8 @@ class OpTime;
* Holds document update information used in logging.
*/
struct OplogUpdateEntryArgs {
+ enum class StoreDocOption { None, PreImage, PostImage };
+
// Name of the collection in which document is being updated.
NamespaceString nss;
@@ -57,6 +59,9 @@ struct OplogUpdateEntryArgs {
StmtId stmtId = kUninitializedStmtId;
+ // The document before modifiers were applied.
+ boost::optional<BSONObj> preImageDoc;
+
// Fully updated document with damages (update modifiers) applied.
BSONObj updatedDoc;
@@ -68,6 +73,8 @@ struct OplogUpdateEntryArgs {
// True if this update comes from a chunk migration.
bool fromMigrate;
+
+ StoreDocOption storeDocOption = StoreDocOption::None;
};
struct TTLCollModInfo {
@@ -112,7 +119,8 @@ public:
OptionalCollectionUUID uuid,
StmtId stmtId,
CollectionShardingState::DeleteState deleteState,
- bool fromMigrate) = 0;
+ bool fromMigrate,
+ const boost::optional<BSONObj>& deletedDoc) = 0;
virtual void onOpMessage(OperationContext* opCtx, const BSONObj& msgObj) = 0;
virtual void onCreateCollection(OperationContext* opCtx,
Collection* coll,
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index e128a003cc8..8c103798fd6 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -74,6 +74,71 @@ void updateSessionProgress(OperationContext* opCtx, const repl::OpTime& lastTxnW
}
}
+/**
+ * Write oplog entry(ies) for the update operation.
+ */
+repl::OpTime replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
+ BSONObj storeObj;
+ if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) {
+ invariant(args.preImageDoc);
+ storeObj = args.preImageDoc.value();
+ } else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) {
+ storeObj = args.updatedDoc;
+ }
+
+ repl::PreAndPostImageTimestamps preAndPostTs;
+
+ if (!storeObj.isEmpty() && opCtx->getTxnNumber().is_initialized()) {
+ auto noteUpdateOpTime =
+ repl::logOp(opCtx, "n", args.nss, args.uuid, storeObj, nullptr, false, args.stmtId, {});
+
+ if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) {
+ preAndPostTs.preImageTs = noteUpdateOpTime.getTimestamp();
+ } else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) {
+ preAndPostTs.postImageTs = noteUpdateOpTime.getTimestamp();
+ }
+ }
+
+ return repl::logOp(opCtx,
+ "u",
+ args.nss,
+ args.uuid,
+ args.update,
+ &args.criteria,
+ args.fromMigrate,
+ args.stmtId,
+ preAndPostTs);
+}
+
+/**
+ * Write oplog entry(ies) for the delete operation.
+ */
+repl::OpTime replLogDelete(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ StmtId stmtId,
+ CollectionShardingState::DeleteState deleteState,
+ bool fromMigrate,
+ const boost::optional<BSONObj>& deletedDoc) {
+ repl::PreAndPostImageTimestamps preAndPostImageTs;
+
+ if (deletedDoc.is_initialized() && opCtx->getTxnNumber().is_initialized()) {
+ auto noteOplog =
+ repl::logOp(opCtx, "n", nss, uuid, deletedDoc.get(), nullptr, false, stmtId, {});
+ preAndPostImageTs.preImageTs = noteOplog.getTimestamp();
+ }
+
+ return repl::logOp(opCtx,
+ "d",
+ nss,
+ uuid,
+ deleteState.documentKey,
+ nullptr,
+ fromMigrate,
+ stmtId,
+ preAndPostImageTs);
+}
+
} // namespace
void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
@@ -97,10 +162,18 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
builder.done(),
nullptr,
fromMigrate,
- kUninitializedStmtId);
+ kUninitializedStmtId,
+ {});
} else {
- repl::logOp(
- opCtx, "i", systemIndexes, {}, indexDoc, nullptr, fromMigrate, kUninitializedStmtId);
+ repl::logOp(opCtx,
+ "i",
+ systemIndexes,
+ {},
+ indexDoc,
+ nullptr,
+ fromMigrate,
+ kUninitializedStmtId,
+ {});
}
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "i", systemIndexes, indexDoc, nullptr);
@@ -151,14 +224,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
return;
}
- auto opTime = repl::logOp(opCtx,
- "u",
- args.nss,
- args.uuid,
- args.update,
- &args.criteria,
- args.fromMigrate,
- args.stmtId);
+ auto opTime = replLogUpdate(opCtx, args);
+
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "u", args.nss, args.update, &args.criteria);
@@ -194,12 +261,13 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
OptionalCollectionUUID uuid,
StmtId stmtId,
CollectionShardingState::DeleteState deleteState,
- bool fromMigrate) {
+ bool fromMigrate,
+ const boost::optional<BSONObj>& deletedDoc) {
if (deleteState.documentKey.isEmpty())
return;
- auto opTime =
- repl::logOp(opCtx, "d", nss, uuid, deleteState.documentKey, nullptr, fromMigrate, stmtId);
+ auto opTime = replLogDelete(opCtx, nss, uuid, stmtId, deleteState, fromMigrate, deletedDoc);
+
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "d", nss, deleteState.documentKey, nullptr);
@@ -222,7 +290,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
}
void OpObserverImpl::onOpMessage(OperationContext* opCtx, const BSONObj& msgObj) {
- repl::logOp(opCtx, "n", {}, {}, msgObj, nullptr, false, kUninitializedStmtId);
+ repl::logOp(opCtx, "n", {}, {}, msgObj, nullptr, false, kUninitializedStmtId, {});
}
void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
@@ -254,7 +322,8 @@ void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
if (!collectionName.isSystemDotProfile()) {
// do not replicate system.profile modifications
- repl::logOp(opCtx, "c", dbName, options.uuid, cmdObj, nullptr, false, kUninitializedStmtId);
+ repl::logOp(
+ opCtx, "c", dbName, options.uuid, cmdObj, nullptr, false, kUninitializedStmtId, {});
}
getGlobalAuthorizationManager()->logOp(opCtx, "c", dbName, cmdObj, nullptr);
@@ -324,7 +393,7 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx,
if (!nss.isSystemDotProfile()) {
// do not replicate system.profile modifications
- repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, &o2Obj, false, kUninitializedStmtId);
+ repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, &o2Obj, false, kUninitializedStmtId, {});
}
getGlobalAuthorizationManager()->logOp(opCtx, "c", cmdNss, cmdObj, nullptr);
@@ -354,7 +423,7 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string&
BSONObj cmdObj = BSON("dropDatabase" << 1);
const NamespaceString cmdNss{dbName, "$cmd"};
- repl::logOp(opCtx, "c", cmdNss, {}, cmdObj, nullptr, false, kUninitializedStmtId);
+ repl::logOp(opCtx, "c", cmdNss, {}, cmdObj, nullptr, false, kUninitializedStmtId, {});
if (dbName == FeatureCompatibilityVersion::kDatabase) {
FeatureCompatibilityVersion::onDropCollection(opCtx);
@@ -375,7 +444,7 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
if (!collectionName.isSystemDotProfile()) {
// do not replicate system.profile modifications
dropOpTime =
- repl::logOp(opCtx, "c", dbName, uuid, cmdObj, nullptr, false, kUninitializedStmtId);
+ repl::logOp(opCtx, "c", dbName, uuid, cmdObj, nullptr, false, kUninitializedStmtId, {});
}
if (collectionName.coll() == DurableViewCatalog::viewsCollectionName()) {
@@ -410,7 +479,7 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx,
const BSONObj& indexInfo) {
BSONObj cmdObj = BSON("dropIndexes" << nss.coll() << "index" << indexName);
auto commandNS = nss.getCommandNS();
- repl::logOp(opCtx, "c", commandNS, uuid, cmdObj, &indexInfo, false, kUninitializedStmtId);
+ repl::logOp(opCtx, "c", commandNS, uuid, cmdObj, &indexInfo, false, kUninitializedStmtId, {});
getGlobalAuthorizationManager()->logOp(opCtx, "c", commandNS, cmdObj, &indexInfo);
}
@@ -439,7 +508,7 @@ repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx,
BSONObj cmdObj = builder.done();
auto renameOpTime =
- repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, nullptr, false, kUninitializedStmtId);
+ repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, nullptr, false, kUninitializedStmtId, {});
if (fromCollection.isSystemDotViews())
DurableViewCatalog::onExternalChange(opCtx, fromCollection);
@@ -472,7 +541,7 @@ void OpObserverImpl::onApplyOps(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& applyOpCmd) {
const NamespaceString cmdNss{dbName, "$cmd"};
- repl::logOp(opCtx, "c", cmdNss, {}, applyOpCmd, nullptr, false, kUninitializedStmtId);
+ repl::logOp(opCtx, "c", cmdNss, {}, applyOpCmd, nullptr, false, kUninitializedStmtId, {});
getGlobalAuthorizationManager()->logOp(opCtx, "c", cmdNss, applyOpCmd, nullptr);
}
@@ -485,7 +554,7 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
if (!collectionName.isSystemDotProfile()) {
// do not replicate system.profile modifications
- repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, nullptr, false, kUninitializedStmtId);
+ repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, nullptr, false, kUninitializedStmtId, {});
}
getGlobalAuthorizationManager()->logOp(opCtx, "c", cmdNss, cmdObj, nullptr);
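
These fields exist so that a retried findAndModify (same lsid, txnNumber, and stmtId) can rebuild the exact document it originally returned instead of re-executing the write. A rough consumer-side sketch, not part of this commit: the oplog lookup is passed in as a callback because the actual mechanism the retry path will use is not defined here.

    #include "mongo/bson/timestamp.h"
    #include "mongo/db/jsobj.h"

    #include <boost/optional.hpp>
    #include <functional>

    namespace {
    // Given the oplog entry of an already-executed statement, recover the image
    // that findAndModify returned to the client the first time around.
    // 'findOplogEntryByTimestamp' is a hypothetical lookup: timestamp -> oplog doc.
    boost::optional<mongo::BSONObj> recoverReturnedImage(
        const mongo::BSONObj& oplogEntry,
        const std::function<mongo::BSONObj(mongo::Timestamp)>& findOplogEntryByTimestamp) {
        mongo::BSONElement pre = oplogEntry["preImageTs"];
        mongo::BSONElement post = oplogEntry["postImageTs"];

        mongo::Timestamp imageTs;
        if (!pre.eoo()) {
            imageTs = pre.timestamp();
        } else if (!post.eoo()) {
            imageTs = post.timestamp();
        } else {
            return boost::none;  // This statement did not store an image.
        }

        // The referenced entry is a no-op ("op": "n") whose 'o' field holds the image.
        mongo::BSONObj noteEntry = findOplogEntryByTimestamp(imageTs);
        return noteEntry["o"].Obj().getOwned();
    }
    }  // namespace
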
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index 25f1e28523f..7dff0515586 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -59,7 +59,8 @@ public:
OptionalCollectionUUID uuid,
StmtId stmtId,
CollectionShardingState::DeleteState deleteState,
- bool fromMigrate) override;
+ bool fromMigrate,
+ const boost::optional<BSONObj>& deletedDoc) override;
void onOpMessage(OperationContext* opCtx, const BSONObj& msgObj) override;
void onCreateCollection(OperationContext* opCtx,
Collection* coll,
diff --git a/src/mongo/db/op_observer_noop.cpp b/src/mongo/db/op_observer_noop.cpp
index 648f14b6a99..a0ad429a6c0 100644
--- a/src/mongo/db/op_observer_noop.cpp
+++ b/src/mongo/db/op_observer_noop.cpp
@@ -60,7 +60,8 @@ void OpObserverNoop::onDelete(OperationContext*,
OptionalCollectionUUID,
StmtId stmtId,
CollectionShardingState::DeleteState,
- bool) {}
+ bool fromMigrate,
+ const boost::optional<BSONObj>& deletedDoc) {}
void OpObserverNoop::onOpMessage(OperationContext*, const BSONObj&) {}
diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h
index 7e51f18177b..a15e2b51dec 100644
--- a/src/mongo/db/op_observer_noop.h
+++ b/src/mongo/db/op_observer_noop.h
@@ -59,7 +59,8 @@ public:
OptionalCollectionUUID uuid,
StmtId stmtId,
CollectionShardingState::DeleteState deleteState,
- bool fromMigrate) override;
+ bool fromMigrate,
+ const boost::optional<BSONObj>& deletedDoc) override;
void onOpMessage(OperationContext* opCtx, const BSONObj& msgObj) override;
void onCreateCollection(OperationContext* opCtx,
Collection* coll,
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index d8877504c04..f22cc9a8ede 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -333,7 +333,8 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx,
long long hashNew,
Date_t wallTime,
StmtId statementId,
- const Timestamp& prevTs) {
+ const Timestamp& prevTs,
+ const PreAndPostImageTimestamps& preAndPostTs) {
BSONObjBuilder b(256);
b.append("ts", optime.getTimestamp());
@@ -360,6 +361,14 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx,
appendSessionInfo(opCtx, &b, statementId, prevTs);
+ if (!preAndPostTs.preImageTs.isNull()) {
+ b.append(OplogEntryBase::kPreImageTsFieldName, preAndPostTs.preImageTs);
+ }
+
+ if (!preAndPostTs.postImageTs.isNull()) {
+ b.append(OplogEntryBase::kPostImageTsFieldName, preAndPostTs.postImageTs);
+ }
+
return OplogDocWriter(OplogDocWriter(b.obj(), obj));
}
} // end anon namespace
@@ -414,7 +423,8 @@ OpTime logOp(OperationContext* opCtx,
const BSONObj& obj,
const BSONObj* o2,
bool fromMigrate,
- StmtId statementId) {
+ StmtId statementId,
+ const PreAndPostImageTimestamps& preAndPostTs) {
auto replCoord = ReplicationCoordinator::get(opCtx);
if (replCoord->isOplogDisabledFor(opCtx, nss)) {
invariant(statementId == kUninitializedStmtId);
@@ -444,7 +454,8 @@ OpTime logOp(OperationContext* opCtx,
slot.hash,
Date_t::now(),
statementId,
- prevTs);
+ prevTs,
+ preAndPostTs);
const DocWriter* basePtr = &writer;
_logOpsInner(opCtx, nss, &basePtr, 1, oplog, replMode, slot.opTime);
return slot.opTime;
@@ -493,7 +504,8 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
slots[i].hash,
wallTime,
insertStatement.stmtId,
- prevTs));
+ prevTs,
+ {}));
prevTs = slots[i].opTime.getTimestamp();
}
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index f119534ad7a..241b5ca746a 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -49,6 +49,15 @@ class OperationContext;
namespace repl {
class ReplSettings;
+struct PreAndPostImageTimestamps {
+ PreAndPostImageTimestamps() = default;
+ PreAndPostImageTimestamps(Timestamp _preImageTs, Timestamp _postImageTs)
+ : preImageTs(std::move(_preImageTs)), postImageTs(std::move(_postImageTs)) {}
+
+ Timestamp preImageTs;
+ Timestamp postImageTs;
+};
+
/**
* Create a new capped collection for the oplog if it doesn't yet exist.
* If the collection already exists (and isReplSet is false),
@@ -88,6 +97,10 @@ OpTime logInsertOps(OperationContext* opCtx,
*
* For 'u' records, 'obj' captures the mutation made to the object but not
* the object itself. 'o2' captures the the criteria for the object that will be modified.
+ *
+ * 'preAndPostImageTs' contains the timestamps of the oplog entries that hold the document
+ * before/after the update was applied. A timestamp is ignored if isNull() is true.
+ *
* Returns the optime of the oplog entry written to the oplog.
* Returns a null optime if oplog was not modified.
*/
@@ -98,7 +111,8 @@ OpTime logOp(OperationContext* opCtx,
const BSONObj& obj,
const BSONObj* o2,
bool fromMigrate,
- StmtId stmtId);
+ StmtId stmtId,
+ const PreAndPostImageTimestamps& preAndPostTs);
// Flush out the cached pointers to the local database and oplog.
// Used by the closeDatabase command to ensure we don't cache closed things.
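
Because Timestamp default-constructs to the null timestamp, the many call sites above that pass a value-initialized PreAndPostImageTimestamps ({}) append neither field; only the update and delete paths that explicitly set a member change the oplog entry. A small sketch of that contract:

    #include "mongo/bson/timestamp.h"
    #include "mongo/db/repl/oplog.h"
    #include "mongo/util/assert_util.h"

    namespace {
    void preAndPostImageDefaults() {
        // Default-constructed: both members are null, so _logOpWriter appends
        // neither preImageTs nor postImageTs.
        mongo::repl::PreAndPostImageTimestamps none;
        invariant(none.preImageTs.isNull() && none.postImageTs.isNull());

        // Only the pre-image side set, as replLogDelete does for findAndModify removes.
        mongo::repl::PreAndPostImageTimestamps preOnly(mongo::Timestamp(1502000000, 1),
                                                       mongo::Timestamp());
        invariant(!preOnly.preImageTs.isNull() && preOnly.postImageTs.isNull());
    }
    }  // namespace
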
diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl
index 0f5a5731021..a01b38f2ee0 100644
--- a/src/mongo/db/repl/oplog_entry.idl
+++ b/src/mongo/db/repl/oplog_entry.idl
@@ -115,4 +115,13 @@ structs:
type: timestamp
optional: true # Only for writes that are part of a transaction
description: "The oplog timestamp of the previous write with the same transaction."
-
+ preImageTs:
+ type: timestamp
+ optional: true
+ description: "The oplog timestamp of another oplog entry that contains the document
+ before an update/remove was applied."
+ postImageTs:
+ type: timestamp
+ optional: true
+ description: "The oplog timestamp of another oplog entry that contains the document
+ after an update was applied."
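
The IDL additions generate the OplogEntryBase field-name constants that oplog.cpp uses above. A minimal sketch of reading one of the new optional fields back out of a raw oplog document; the name of the IDL-generated header is an assumption:

    #include "mongo/db/jsobj.h"
    #include "mongo/db/repl/oplog_entry_gen.h"  // assumed name of the generated header

    #include <boost/optional.hpp>

    namespace {
    // Returns the referenced no-op entry's timestamp, if this entry stored a pre-image.
    boost::optional<mongo::Timestamp> extractPreImageTs(const mongo::BSONObj& oplogEntry) {
        auto elem = oplogEntry[mongo::repl::OplogEntryBase::kPreImageTsFieldName];
        if (elem.eoo()) {
            return boost::none;  // Optional field, absent for ordinary writes.
        }
        return elem.timestamp();
    }
    }  // namespace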