diff options
author | ADAM David Alan Martin <adam.martin@10gen.com> | 2018-03-01 16:40:51 -0500 |
---|---|---|
committer | ADAM David Alan Martin <adam.martin@10gen.com> | 2018-03-01 16:40:51 -0500 |
commit | 296fde1259d29e081069fde1c69bb9ae083932b1 (patch) | |
tree | 0e0973b0e6e642e09822f59be220be3a8c036466 /src/mongo/db/op_observer_impl.cpp | |
parent | 22b2b828a922a7459b4e1c75860a11c7eb3db630 (diff) | |
download | mongo-296fde1259d29e081069fde1c69bb9ae083932b1.tar.gz |
SERVER-32843 Allow multiple times in OpObservers
The OpObserverRegistry was introduced as an abstraction to allow
decoupling making data modifications from the side effects, which
need to happen as a result of these modifications, such as op log
writes, retryable writes, etc.
Some of the OpObserver's methods currently return the OpTime which
resulted from logging the operation to the replication oplog. In
addition, in certain cases, the OpTime resulting from an earlier
OpObserver might be needed by a later one, which is the case with
retryable writes.
In order to support these requirements, the OpObserver(s) chain
should have access to some common per-operation structure, where
runtime information could be persisted.
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 388 |
1 files changed, 210 insertions, 178 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index eea416f7db9..05d6ac0b91c 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -65,6 +65,33 @@ using DeleteState = CollectionShardingState::DeleteState; const OperationContext::Decoration<DeleteState> getDeleteState = OperationContext::declareDecoration<DeleteState>(); +repl::OpTime logOperation(OperationContext* opCtx, + const char* opstr, + const NamespaceString& ns, + OptionalCollectionUUID uuid, + const BSONObj& obj, + const BSONObj* o2, + bool fromMigrate, + Date_t wallClockTime, + const OperationSessionInfo& sessionInfo, + StmtId stmtId, + const repl::OplogLink& oplogLink) { + auto& times = OpObserver::Times::get(opCtx).reservedOpTimes; + auto opTime = repl::logOp(opCtx, + opstr, + ns, + uuid, + obj, + o2, + fromMigrate, + wallClockTime, + sessionInfo, + stmtId, + oplogLink); + times.push_back(opTime); + return opTime; +} + /** * Returns whether we're a master using master-slave replication. */ @@ -166,17 +193,17 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx); if (!storeObj.isEmpty() && opCtx->getTxnNumber()) { - auto noteUpdateOpTime = repl::logOp(opCtx, - "n", - args.nss, - args.uuid, - storeObj, - nullptr, - false, - opTimes.wallClockTime, - sessionInfo, - args.stmtId, - {}); + auto noteUpdateOpTime = logOperation(opCtx, + "n", + args.nss, + args.uuid, + storeObj, + nullptr, + false, + opTimes.wallClockTime, + sessionInfo, + args.stmtId, + {}); opTimes.prePostImageOpTime = noteUpdateOpTime; @@ -187,17 +214,17 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, } } - opTimes.writeOpTime = repl::logOp(opCtx, - "u", - args.nss, - args.uuid, - args.update, - &args.criteria, - args.fromMigrate, - opTimes.wallClockTime, - sessionInfo, - args.stmtId, - oplogLink); + opTimes.writeOpTime = logOperation(opCtx, + "u", + args.nss, + args.uuid, + args.update, + &args.criteria, + args.fromMigrate, + opTimes.wallClockTime, + sessionInfo, + args.stmtId, + oplogLink); return opTimes; } @@ -225,33 +252,33 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx); if (deletedDoc && opCtx->getTxnNumber()) { - auto noteOplog = repl::logOp(opCtx, - "n", - nss, - uuid, - deletedDoc.get(), - nullptr, - false, - opTimes.wallClockTime, - sessionInfo, - stmtId, - {}); - opTimes.prePostImageOpTime = noteOplog; - oplogLink.preImageOpTime = noteOplog; - } - - CollectionShardingState::DeleteState& deleteState = getDeleteState(opCtx); - opTimes.writeOpTime = repl::logOp(opCtx, - "d", + auto noteOplog = logOperation(opCtx, + "n", nss, uuid, - deleteState.documentKey, + deletedDoc.get(), nullptr, - fromMigrate, + false, opTimes.wallClockTime, sessionInfo, stmtId, - oplogLink); + {}); + opTimes.prePostImageOpTime = noteOplog; + oplogLink.preImageOpTime = noteOplog; + } + + CollectionShardingState::DeleteState& deleteState = getDeleteState(opCtx); + opTimes.writeOpTime = logOperation(opCtx, + "d", + nss, + uuid, + deleteState.documentKey, + nullptr, + fromMigrate, + opTimes.wallClockTime, + sessionInfo, + stmtId, + oplogLink); return opTimes; } @@ -266,17 +293,17 @@ OpTimeBundle replLogApplyOps(OperationContext* opCtx, const repl::OplogLink& oplogLink) { OpTimeBundle times; times.wallClockTime = getWallClockTimeForOpLog(opCtx); - times.writeOpTime = repl::logOp(opCtx, - "c", - cmdNss, - {}, - applyOpCmd, - nullptr, - false, - times.wallClockTime, - sessionInfo, - stmtId, - oplogLink); + times.writeOpTime = logOperation(opCtx, + "c", + cmdNss, + {}, + applyOpCmd, + nullptr, + false, + times.wallClockTime, + sessionInfo, + stmtId, + oplogLink); return times; } @@ -298,29 +325,29 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, builder.append(e); } - repl::logOp(opCtx, - "c", - nss.getCommandNS(), - uuid, - builder.done(), - nullptr, - fromMigrate, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + nss.getCommandNS(), + uuid, + builder.done(), + nullptr, + fromMigrate, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); } else { - repl::logOp(opCtx, - "i", - systemIndexes, - {}, - indexDoc, - nullptr, - fromMigrate, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "i", + systemIndexes, + {}, + indexDoc, + nullptr, + fromMigrate, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); } AuthorizationManager::get(opCtx->getServiceContext()) @@ -335,12 +362,12 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, void OpObserverImpl::onInserts(OperationContext* opCtx, const NamespaceString& nss, OptionalCollectionUUID uuid, - std::vector<InsertStatement>::const_iterator begin, - std::vector<InsertStatement>::const_iterator end, + std::vector<InsertStatement>::const_iterator first, + std::vector<InsertStatement>::const_iterator last, bool fromMigrate) { Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) { - for (auto iter = begin; iter != end; iter++) { + for (auto iter = first; iter != last; iter++) { auto operation = OplogEntry::makeInsertOperation(nss, uuid, iter->doc); session->addTransactionOperation(opCtx, operation); } @@ -350,14 +377,19 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, const auto lastWriteDate = getWallClockTimeForOpLog(opCtx); const auto opTimeList = - repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate, lastWriteDate); + repl::logInsertOps(opCtx, nss, uuid, session, first, last, fromMigrate, lastWriteDate); + + auto& times = OpObserver::Times::get(opCtx).reservedOpTimes; + using std::begin; + using std::end; + times.insert(end(times), begin(opTimeList), end(opTimeList)); auto css = (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate) ? nullptr : CollectionShardingState::get(opCtx, nss); size_t index = 0; - for (auto it = begin; it != end; it++, index++) { + for (auto it = first; it != last; it++, index++) { AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "i", nss, it->doc, nullptr); if (css) { @@ -372,19 +404,20 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, } else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) { DurableViewCatalog::onExternalChange(opCtx, nss); } else if (nss.ns() == FeatureCompatibilityVersion::kCollection) { - for (auto it = begin; it != end; it++) { + for (auto it = first; it != last; it++) { FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, it->doc); } } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !lastOpTime.isNull()) { - for (auto it = begin; it != end; it++) { + for (auto it = first; it != last; it++) { SessionCatalog::get(opCtx)->invalidateSessions(opCtx, it->doc); } } std::vector<StmtId> stmtIdsWritten; - std::transform(begin, end, std::back_inserter(stmtIdsWritten), [](const InsertStatement& stmt) { - return stmt.stmtId; - }); + std::transform(first, + last, + std::back_inserter(stmtIdsWritten), + [](const InsertStatement& stmt) { return stmt.stmtId; }); onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate); } @@ -511,17 +544,17 @@ void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx, const BSONObj& msgObj, const boost::optional<BSONObj> o2MsgObj) { const BSONObj* o2MsgPtr = o2MsgObj ? o2MsgObj.get_ptr() : nullptr; - repl::logOp(opCtx, - "n", - nss, - uuid, - msgObj, - o2MsgPtr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "n", + nss, + uuid, + msgObj, + o2MsgPtr, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); } void OpObserverImpl::onCreateCollection(OperationContext* opCtx, @@ -554,17 +587,17 @@ void OpObserverImpl::onCreateCollection(OperationContext* opCtx, if (!collectionName.isSystemDotProfile()) { // do not replicate system.profile modifications - repl::logOp(opCtx, - "c", - cmdNss, - options.uuid, - cmdObj, - nullptr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + cmdNss, + options.uuid, + cmdObj, + nullptr, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); } AuthorizationManager::get(opCtx->getServiceContext()) @@ -600,17 +633,17 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx, if (!nss.isSystemDotProfile()) { // do not replicate system.profile modifications - repl::logOp(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - &o2Obj, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + cmdNss, + uuid, + cmdObj, + &o2Obj, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); } AuthorizationManager::get(opCtx->getServiceContext()) @@ -634,17 +667,17 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string& const NamespaceString cmdNss{dbName, "$cmd"}; const auto cmdObj = BSON("dropDatabase" << 1); - repl::logOp(opCtx, - "c", - cmdNss, - {}, - cmdObj, - nullptr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + cmdNss, + {}, + cmdObj, + nullptr, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); uassert(50714, "dropping the admin database is not allowed.", @@ -666,20 +699,19 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, const auto cmdNss = collectionName.getCommandNS(); const auto cmdObj = BSON("drop" << collectionName.coll()); - repl::OpTime dropOpTime; if (!collectionName.isSystemDotProfile()) { // Do not replicate system.profile modifications. - dropOpTime = repl::logOp(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - nullptr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + cmdNss, + uuid, + cmdObj, + nullptr, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); } uassert(50715, @@ -701,7 +733,7 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, // Evict namespace entry from the namespace/uuid cache if it exists. NamespaceUUIDCache::get(opCtx).evictNamespace(collectionName); - return dropOpTime; + return {}; } void OpObserverImpl::onDropIndex(OperationContext* opCtx, @@ -712,23 +744,23 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx, const auto cmdNss = nss.getCommandNS(); const auto cmdObj = BSON("dropIndexes" << nss.coll() << "index" << indexName); - repl::logOp(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - &indexInfo, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + cmdNss, + uuid, + cmdObj, + &indexInfo, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "c", cmdNss, cmdObj, &indexInfo); } -repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx, +repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* const opCtx, const NamespaceString& fromCollection, const NamespaceString& toCollection, OptionalCollectionUUID uuid, @@ -749,17 +781,17 @@ repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx, const auto cmdObj = builder.done(); - const auto renameOpTime = repl::logOp(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - nullptr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + cmdNss, + uuid, + cmdObj, + nullptr, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); if (fromCollection.isSystemDotViews()) DurableViewCatalog::onExternalChange(opCtx, fromCollection); @@ -776,7 +808,7 @@ repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx, opCtx->recoveryUnit()->onRollback( [&cache, toCollection]() { cache.evictNamespace(toCollection); }); - return renameOpTime; + return {}; } void OpObserverImpl::onApplyOps(OperationContext* opCtx, @@ -797,17 +829,17 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, if (!collectionName.isSystemDotProfile()) { // Do not replicate system.profile modifications - repl::logOp(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - nullptr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + cmdNss, + uuid, + cmdObj, + nullptr, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); } AuthorizationManager::get(opCtx->getServiceContext()) |