summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r--src/mongo/db/op_observer_impl.cpp388
1 files changed, 210 insertions, 178 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index eea416f7db9..05d6ac0b91c 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -65,6 +65,33 @@ using DeleteState = CollectionShardingState::DeleteState;
const OperationContext::Decoration<DeleteState> getDeleteState =
OperationContext::declareDecoration<DeleteState>();
+repl::OpTime logOperation(OperationContext* opCtx,
+ const char* opstr,
+ const NamespaceString& ns,
+ OptionalCollectionUUID uuid,
+ const BSONObj& obj,
+ const BSONObj* o2,
+ bool fromMigrate,
+ Date_t wallClockTime,
+ const OperationSessionInfo& sessionInfo,
+ StmtId stmtId,
+ const repl::OplogLink& oplogLink) {
+ auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
+ auto opTime = repl::logOp(opCtx,
+ opstr,
+ ns,
+ uuid,
+ obj,
+ o2,
+ fromMigrate,
+ wallClockTime,
+ sessionInfo,
+ stmtId,
+ oplogLink);
+ times.push_back(opTime);
+ return opTime;
+}
+
/**
* Returns whether we're a master using master-slave replication.
*/
@@ -166,17 +193,17 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx);
if (!storeObj.isEmpty() && opCtx->getTxnNumber()) {
- auto noteUpdateOpTime = repl::logOp(opCtx,
- "n",
- args.nss,
- args.uuid,
- storeObj,
- nullptr,
- false,
- opTimes.wallClockTime,
- sessionInfo,
- args.stmtId,
- {});
+ auto noteUpdateOpTime = logOperation(opCtx,
+ "n",
+ args.nss,
+ args.uuid,
+ storeObj,
+ nullptr,
+ false,
+ opTimes.wallClockTime,
+ sessionInfo,
+ args.stmtId,
+ {});
opTimes.prePostImageOpTime = noteUpdateOpTime;
@@ -187,17 +214,17 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
}
}
- opTimes.writeOpTime = repl::logOp(opCtx,
- "u",
- args.nss,
- args.uuid,
- args.update,
- &args.criteria,
- args.fromMigrate,
- opTimes.wallClockTime,
- sessionInfo,
- args.stmtId,
- oplogLink);
+ opTimes.writeOpTime = logOperation(opCtx,
+ "u",
+ args.nss,
+ args.uuid,
+ args.update,
+ &args.criteria,
+ args.fromMigrate,
+ opTimes.wallClockTime,
+ sessionInfo,
+ args.stmtId,
+ oplogLink);
return opTimes;
}
@@ -225,33 +252,33 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx);
if (deletedDoc && opCtx->getTxnNumber()) {
- auto noteOplog = repl::logOp(opCtx,
- "n",
- nss,
- uuid,
- deletedDoc.get(),
- nullptr,
- false,
- opTimes.wallClockTime,
- sessionInfo,
- stmtId,
- {});
- opTimes.prePostImageOpTime = noteOplog;
- oplogLink.preImageOpTime = noteOplog;
- }
-
- CollectionShardingState::DeleteState& deleteState = getDeleteState(opCtx);
- opTimes.writeOpTime = repl::logOp(opCtx,
- "d",
+ auto noteOplog = logOperation(opCtx,
+ "n",
nss,
uuid,
- deleteState.documentKey,
+ deletedDoc.get(),
nullptr,
- fromMigrate,
+ false,
opTimes.wallClockTime,
sessionInfo,
stmtId,
- oplogLink);
+ {});
+ opTimes.prePostImageOpTime = noteOplog;
+ oplogLink.preImageOpTime = noteOplog;
+ }
+
+ CollectionShardingState::DeleteState& deleteState = getDeleteState(opCtx);
+ opTimes.writeOpTime = logOperation(opCtx,
+ "d",
+ nss,
+ uuid,
+ deleteState.documentKey,
+ nullptr,
+ fromMigrate,
+ opTimes.wallClockTime,
+ sessionInfo,
+ stmtId,
+ oplogLink);
return opTimes;
}
@@ -266,17 +293,17 @@ OpTimeBundle replLogApplyOps(OperationContext* opCtx,
const repl::OplogLink& oplogLink) {
OpTimeBundle times;
times.wallClockTime = getWallClockTimeForOpLog(opCtx);
- times.writeOpTime = repl::logOp(opCtx,
- "c",
- cmdNss,
- {},
- applyOpCmd,
- nullptr,
- false,
- times.wallClockTime,
- sessionInfo,
- stmtId,
- oplogLink);
+ times.writeOpTime = logOperation(opCtx,
+ "c",
+ cmdNss,
+ {},
+ applyOpCmd,
+ nullptr,
+ false,
+ times.wallClockTime,
+ sessionInfo,
+ stmtId,
+ oplogLink);
return times;
}
@@ -298,29 +325,29 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
builder.append(e);
}
- repl::logOp(opCtx,
- "c",
- nss.getCommandNS(),
- uuid,
- builder.done(),
- nullptr,
- fromMigrate,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ nss.getCommandNS(),
+ uuid,
+ builder.done(),
+ nullptr,
+ fromMigrate,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
} else {
- repl::logOp(opCtx,
- "i",
- systemIndexes,
- {},
- indexDoc,
- nullptr,
- fromMigrate,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "i",
+ systemIndexes,
+ {},
+ indexDoc,
+ nullptr,
+ fromMigrate,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -335,12 +362,12 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
void OpObserverImpl::onInserts(OperationContext* opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
- std::vector<InsertStatement>::const_iterator begin,
- std::vector<InsertStatement>::const_iterator end,
+ std::vector<InsertStatement>::const_iterator first,
+ std::vector<InsertStatement>::const_iterator last,
bool fromMigrate) {
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) {
- for (auto iter = begin; iter != end; iter++) {
+ for (auto iter = first; iter != last; iter++) {
auto operation = OplogEntry::makeInsertOperation(nss, uuid, iter->doc);
session->addTransactionOperation(opCtx, operation);
}
@@ -350,14 +377,19 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
const auto lastWriteDate = getWallClockTimeForOpLog(opCtx);
const auto opTimeList =
- repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate, lastWriteDate);
+ repl::logInsertOps(opCtx, nss, uuid, session, first, last, fromMigrate, lastWriteDate);
+
+ auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
+ using std::begin;
+ using std::end;
+ times.insert(end(times), begin(opTimeList), end(opTimeList));
auto css = (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate)
? nullptr
: CollectionShardingState::get(opCtx, nss);
size_t index = 0;
- for (auto it = begin; it != end; it++, index++) {
+ for (auto it = first; it != last; it++, index++) {
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "i", nss, it->doc, nullptr);
if (css) {
@@ -372,19 +404,20 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
} else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) {
DurableViewCatalog::onExternalChange(opCtx, nss);
} else if (nss.ns() == FeatureCompatibilityVersion::kCollection) {
- for (auto it = begin; it != end; it++) {
+ for (auto it = first; it != last; it++) {
FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, it->doc);
}
} else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !lastOpTime.isNull()) {
- for (auto it = begin; it != end; it++) {
+ for (auto it = first; it != last; it++) {
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, it->doc);
}
}
std::vector<StmtId> stmtIdsWritten;
- std::transform(begin, end, std::back_inserter(stmtIdsWritten), [](const InsertStatement& stmt) {
- return stmt.stmtId;
- });
+ std::transform(first,
+ last,
+ std::back_inserter(stmtIdsWritten),
+ [](const InsertStatement& stmt) { return stmt.stmtId; });
onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate);
}
@@ -511,17 +544,17 @@ void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx,
const BSONObj& msgObj,
const boost::optional<BSONObj> o2MsgObj) {
const BSONObj* o2MsgPtr = o2MsgObj ? o2MsgObj.get_ptr() : nullptr;
- repl::logOp(opCtx,
- "n",
- nss,
- uuid,
- msgObj,
- o2MsgPtr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "n",
+ nss,
+ uuid,
+ msgObj,
+ o2MsgPtr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
@@ -554,17 +587,17 @@ void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
if (!collectionName.isSystemDotProfile()) {
// do not replicate system.profile modifications
- repl::logOp(opCtx,
- "c",
- cmdNss,
- options.uuid,
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ options.uuid,
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -600,17 +633,17 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx,
if (!nss.isSystemDotProfile()) {
// do not replicate system.profile modifications
- repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- &o2Obj,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ &o2Obj,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -634,17 +667,17 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string&
const NamespaceString cmdNss{dbName, "$cmd"};
const auto cmdObj = BSON("dropDatabase" << 1);
- repl::logOp(opCtx,
- "c",
- cmdNss,
- {},
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ {},
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
uassert(50714,
"dropping the admin database is not allowed.",
@@ -666,20 +699,19 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
const auto cmdNss = collectionName.getCommandNS();
const auto cmdObj = BSON("drop" << collectionName.coll());
- repl::OpTime dropOpTime;
if (!collectionName.isSystemDotProfile()) {
// Do not replicate system.profile modifications.
- dropOpTime = repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
uassert(50715,
@@ -701,7 +733,7 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
// Evict namespace entry from the namespace/uuid cache if it exists.
NamespaceUUIDCache::get(opCtx).evictNamespace(collectionName);
- return dropOpTime;
+ return {};
}
void OpObserverImpl::onDropIndex(OperationContext* opCtx,
@@ -712,23 +744,23 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx,
const auto cmdNss = nss.getCommandNS();
const auto cmdObj = BSON("dropIndexes" << nss.coll() << "index" << indexName);
- repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- &indexInfo,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ &indexInfo,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "c", cmdNss, cmdObj, &indexInfo);
}
-repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx,
+repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* const opCtx,
const NamespaceString& fromCollection,
const NamespaceString& toCollection,
OptionalCollectionUUID uuid,
@@ -749,17 +781,17 @@ repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx,
const auto cmdObj = builder.done();
- const auto renameOpTime = repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
if (fromCollection.isSystemDotViews())
DurableViewCatalog::onExternalChange(opCtx, fromCollection);
@@ -776,7 +808,7 @@ repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx,
opCtx->recoveryUnit()->onRollback(
[&cache, toCollection]() { cache.evictNamespace(toCollection); });
- return renameOpTime;
+ return {};
}
void OpObserverImpl::onApplyOps(OperationContext* opCtx,
@@ -797,17 +829,17 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
if (!collectionName.isSystemDotProfile()) {
// Do not replicate system.profile modifications
- repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
AuthorizationManager::get(opCtx->getServiceContext())