summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer_impl.cpp
diff options
context:
space:
mode:
authorADAM David Alan Martin <adam.martin@10gen.com>2018-03-01 16:40:51 -0500
committerADAM David Alan Martin <adam.martin@10gen.com>2018-03-01 16:40:51 -0500
commit296fde1259d29e081069fde1c69bb9ae083932b1 (patch)
tree0e0973b0e6e642e09822f59be220be3a8c036466 /src/mongo/db/op_observer_impl.cpp
parent22b2b828a922a7459b4e1c75860a11c7eb3db630 (diff)
downloadmongo-296fde1259d29e081069fde1c69bb9ae083932b1.tar.gz
SERVER-32843 Allow multiple times in OpObservers
The OpObserverRegistry was introduced as an abstraction to allow decoupling making data modifications from the side effects, which need to happen as a result of these modifications, such as op log writes, retryable writes, etc. Some of the OpObserver's methods currently return the OpTime which resulted from logging the operation to the replication oplog. In addition, in certain cases, the OpTime resulting from an earlier OpObserver might be needed by a later one, which is the case with retryable writes. In order to support these requirements, the OpObserver(s) chain should have access to some common per-operation structure, where runtime information could be persisted.
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r--src/mongo/db/op_observer_impl.cpp388
1 files changed, 210 insertions, 178 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index eea416f7db9..05d6ac0b91c 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -65,6 +65,33 @@ using DeleteState = CollectionShardingState::DeleteState;
const OperationContext::Decoration<DeleteState> getDeleteState =
OperationContext::declareDecoration<DeleteState>();
+repl::OpTime logOperation(OperationContext* opCtx,
+ const char* opstr,
+ const NamespaceString& ns,
+ OptionalCollectionUUID uuid,
+ const BSONObj& obj,
+ const BSONObj* o2,
+ bool fromMigrate,
+ Date_t wallClockTime,
+ const OperationSessionInfo& sessionInfo,
+ StmtId stmtId,
+ const repl::OplogLink& oplogLink) {
+ auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
+ auto opTime = repl::logOp(opCtx,
+ opstr,
+ ns,
+ uuid,
+ obj,
+ o2,
+ fromMigrate,
+ wallClockTime,
+ sessionInfo,
+ stmtId,
+ oplogLink);
+ times.push_back(opTime);
+ return opTime;
+}
+
/**
* Returns whether we're a master using master-slave replication.
*/
@@ -166,17 +193,17 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx);
if (!storeObj.isEmpty() && opCtx->getTxnNumber()) {
- auto noteUpdateOpTime = repl::logOp(opCtx,
- "n",
- args.nss,
- args.uuid,
- storeObj,
- nullptr,
- false,
- opTimes.wallClockTime,
- sessionInfo,
- args.stmtId,
- {});
+ auto noteUpdateOpTime = logOperation(opCtx,
+ "n",
+ args.nss,
+ args.uuid,
+ storeObj,
+ nullptr,
+ false,
+ opTimes.wallClockTime,
+ sessionInfo,
+ args.stmtId,
+ {});
opTimes.prePostImageOpTime = noteUpdateOpTime;
@@ -187,17 +214,17 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
}
}
- opTimes.writeOpTime = repl::logOp(opCtx,
- "u",
- args.nss,
- args.uuid,
- args.update,
- &args.criteria,
- args.fromMigrate,
- opTimes.wallClockTime,
- sessionInfo,
- args.stmtId,
- oplogLink);
+ opTimes.writeOpTime = logOperation(opCtx,
+ "u",
+ args.nss,
+ args.uuid,
+ args.update,
+ &args.criteria,
+ args.fromMigrate,
+ opTimes.wallClockTime,
+ sessionInfo,
+ args.stmtId,
+ oplogLink);
return opTimes;
}
@@ -225,33 +252,33 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx);
if (deletedDoc && opCtx->getTxnNumber()) {
- auto noteOplog = repl::logOp(opCtx,
- "n",
- nss,
- uuid,
- deletedDoc.get(),
- nullptr,
- false,
- opTimes.wallClockTime,
- sessionInfo,
- stmtId,
- {});
- opTimes.prePostImageOpTime = noteOplog;
- oplogLink.preImageOpTime = noteOplog;
- }
-
- CollectionShardingState::DeleteState& deleteState = getDeleteState(opCtx);
- opTimes.writeOpTime = repl::logOp(opCtx,
- "d",
+ auto noteOplog = logOperation(opCtx,
+ "n",
nss,
uuid,
- deleteState.documentKey,
+ deletedDoc.get(),
nullptr,
- fromMigrate,
+ false,
opTimes.wallClockTime,
sessionInfo,
stmtId,
- oplogLink);
+ {});
+ opTimes.prePostImageOpTime = noteOplog;
+ oplogLink.preImageOpTime = noteOplog;
+ }
+
+ CollectionShardingState::DeleteState& deleteState = getDeleteState(opCtx);
+ opTimes.writeOpTime = logOperation(opCtx,
+ "d",
+ nss,
+ uuid,
+ deleteState.documentKey,
+ nullptr,
+ fromMigrate,
+ opTimes.wallClockTime,
+ sessionInfo,
+ stmtId,
+ oplogLink);
return opTimes;
}
@@ -266,17 +293,17 @@ OpTimeBundle replLogApplyOps(OperationContext* opCtx,
const repl::OplogLink& oplogLink) {
OpTimeBundle times;
times.wallClockTime = getWallClockTimeForOpLog(opCtx);
- times.writeOpTime = repl::logOp(opCtx,
- "c",
- cmdNss,
- {},
- applyOpCmd,
- nullptr,
- false,
- times.wallClockTime,
- sessionInfo,
- stmtId,
- oplogLink);
+ times.writeOpTime = logOperation(opCtx,
+ "c",
+ cmdNss,
+ {},
+ applyOpCmd,
+ nullptr,
+ false,
+ times.wallClockTime,
+ sessionInfo,
+ stmtId,
+ oplogLink);
return times;
}
@@ -298,29 +325,29 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
builder.append(e);
}
- repl::logOp(opCtx,
- "c",
- nss.getCommandNS(),
- uuid,
- builder.done(),
- nullptr,
- fromMigrate,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ nss.getCommandNS(),
+ uuid,
+ builder.done(),
+ nullptr,
+ fromMigrate,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
} else {
- repl::logOp(opCtx,
- "i",
- systemIndexes,
- {},
- indexDoc,
- nullptr,
- fromMigrate,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "i",
+ systemIndexes,
+ {},
+ indexDoc,
+ nullptr,
+ fromMigrate,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -335,12 +362,12 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
void OpObserverImpl::onInserts(OperationContext* opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
- std::vector<InsertStatement>::const_iterator begin,
- std::vector<InsertStatement>::const_iterator end,
+ std::vector<InsertStatement>::const_iterator first,
+ std::vector<InsertStatement>::const_iterator last,
bool fromMigrate) {
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) {
- for (auto iter = begin; iter != end; iter++) {
+ for (auto iter = first; iter != last; iter++) {
auto operation = OplogEntry::makeInsertOperation(nss, uuid, iter->doc);
session->addTransactionOperation(opCtx, operation);
}
@@ -350,14 +377,19 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
const auto lastWriteDate = getWallClockTimeForOpLog(opCtx);
const auto opTimeList =
- repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate, lastWriteDate);
+ repl::logInsertOps(opCtx, nss, uuid, session, first, last, fromMigrate, lastWriteDate);
+
+ auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
+ using std::begin;
+ using std::end;
+ times.insert(end(times), begin(opTimeList), end(opTimeList));
auto css = (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate)
? nullptr
: CollectionShardingState::get(opCtx, nss);
size_t index = 0;
- for (auto it = begin; it != end; it++, index++) {
+ for (auto it = first; it != last; it++, index++) {
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "i", nss, it->doc, nullptr);
if (css) {
@@ -372,19 +404,20 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
} else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) {
DurableViewCatalog::onExternalChange(opCtx, nss);
} else if (nss.ns() == FeatureCompatibilityVersion::kCollection) {
- for (auto it = begin; it != end; it++) {
+ for (auto it = first; it != last; it++) {
FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, it->doc);
}
} else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !lastOpTime.isNull()) {
- for (auto it = begin; it != end; it++) {
+ for (auto it = first; it != last; it++) {
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, it->doc);
}
}
std::vector<StmtId> stmtIdsWritten;
- std::transform(begin, end, std::back_inserter(stmtIdsWritten), [](const InsertStatement& stmt) {
- return stmt.stmtId;
- });
+ std::transform(first,
+ last,
+ std::back_inserter(stmtIdsWritten),
+ [](const InsertStatement& stmt) { return stmt.stmtId; });
onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate);
}
@@ -511,17 +544,17 @@ void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx,
const BSONObj& msgObj,
const boost::optional<BSONObj> o2MsgObj) {
const BSONObj* o2MsgPtr = o2MsgObj ? o2MsgObj.get_ptr() : nullptr;
- repl::logOp(opCtx,
- "n",
- nss,
- uuid,
- msgObj,
- o2MsgPtr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "n",
+ nss,
+ uuid,
+ msgObj,
+ o2MsgPtr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
@@ -554,17 +587,17 @@ void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
if (!collectionName.isSystemDotProfile()) {
// do not replicate system.profile modifications
- repl::logOp(opCtx,
- "c",
- cmdNss,
- options.uuid,
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ options.uuid,
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -600,17 +633,17 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx,
if (!nss.isSystemDotProfile()) {
// do not replicate system.profile modifications
- repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- &o2Obj,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ &o2Obj,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -634,17 +667,17 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string&
const NamespaceString cmdNss{dbName, "$cmd"};
const auto cmdObj = BSON("dropDatabase" << 1);
- repl::logOp(opCtx,
- "c",
- cmdNss,
- {},
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ {},
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
uassert(50714,
"dropping the admin database is not allowed.",
@@ -666,20 +699,19 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
const auto cmdNss = collectionName.getCommandNS();
const auto cmdObj = BSON("drop" << collectionName.coll());
- repl::OpTime dropOpTime;
if (!collectionName.isSystemDotProfile()) {
// Do not replicate system.profile modifications.
- dropOpTime = repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
uassert(50715,
@@ -701,7 +733,7 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
// Evict namespace entry from the namespace/uuid cache if it exists.
NamespaceUUIDCache::get(opCtx).evictNamespace(collectionName);
- return dropOpTime;
+ return {};
}
void OpObserverImpl::onDropIndex(OperationContext* opCtx,
@@ -712,23 +744,23 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx,
const auto cmdNss = nss.getCommandNS();
const auto cmdObj = BSON("dropIndexes" << nss.coll() << "index" << indexName);
- repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- &indexInfo,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ &indexInfo,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "c", cmdNss, cmdObj, &indexInfo);
}
-repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx,
+repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* const opCtx,
const NamespaceString& fromCollection,
const NamespaceString& toCollection,
OptionalCollectionUUID uuid,
@@ -749,17 +781,17 @@ repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx,
const auto cmdObj = builder.done();
- const auto renameOpTime = repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
if (fromCollection.isSystemDotViews())
DurableViewCatalog::onExternalChange(opCtx, fromCollection);
@@ -776,7 +808,7 @@ repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx,
opCtx->recoveryUnit()->onRollback(
[&cache, toCollection]() { cache.evictNamespace(toCollection); });
- return renameOpTime;
+ return {};
}
void OpObserverImpl::onApplyOps(OperationContext* opCtx,
@@ -797,17 +829,17 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
if (!collectionName.isSystemDotProfile()) {
// Do not replicate system.profile modifications
- repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
AuthorizationManager::get(opCtx->getServiceContext())