diff options
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 388 |
1 files changed, 210 insertions, 178 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index eea416f7db9..05d6ac0b91c 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -65,6 +65,33 @@ using DeleteState = CollectionShardingState::DeleteState; const OperationContext::Decoration<DeleteState> getDeleteState = OperationContext::declareDecoration<DeleteState>(); +repl::OpTime logOperation(OperationContext* opCtx, + const char* opstr, + const NamespaceString& ns, + OptionalCollectionUUID uuid, + const BSONObj& obj, + const BSONObj* o2, + bool fromMigrate, + Date_t wallClockTime, + const OperationSessionInfo& sessionInfo, + StmtId stmtId, + const repl::OplogLink& oplogLink) { + auto& times = OpObserver::Times::get(opCtx).reservedOpTimes; + auto opTime = repl::logOp(opCtx, + opstr, + ns, + uuid, + obj, + o2, + fromMigrate, + wallClockTime, + sessionInfo, + stmtId, + oplogLink); + times.push_back(opTime); + return opTime; +} + /** * Returns whether we're a master using master-slave replication. */ @@ -166,17 +193,17 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx); if (!storeObj.isEmpty() && opCtx->getTxnNumber()) { - auto noteUpdateOpTime = repl::logOp(opCtx, - "n", - args.nss, - args.uuid, - storeObj, - nullptr, - false, - opTimes.wallClockTime, - sessionInfo, - args.stmtId, - {}); + auto noteUpdateOpTime = logOperation(opCtx, + "n", + args.nss, + args.uuid, + storeObj, + nullptr, + false, + opTimes.wallClockTime, + sessionInfo, + args.stmtId, + {}); opTimes.prePostImageOpTime = noteUpdateOpTime; @@ -187,17 +214,17 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, } } - opTimes.writeOpTime = repl::logOp(opCtx, - "u", - args.nss, - args.uuid, - args.update, - &args.criteria, - args.fromMigrate, - opTimes.wallClockTime, - sessionInfo, - args.stmtId, - oplogLink); + opTimes.writeOpTime = logOperation(opCtx, + "u", + args.nss, + args.uuid, + args.update, + &args.criteria, + args.fromMigrate, + opTimes.wallClockTime, + sessionInfo, + args.stmtId, + oplogLink); return opTimes; } @@ -225,33 +252,33 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx); if (deletedDoc && opCtx->getTxnNumber()) { - auto noteOplog = repl::logOp(opCtx, - "n", - nss, - uuid, - deletedDoc.get(), - nullptr, - false, - opTimes.wallClockTime, - sessionInfo, - stmtId, - {}); - opTimes.prePostImageOpTime = noteOplog; - oplogLink.preImageOpTime = noteOplog; - } - - CollectionShardingState::DeleteState& deleteState = getDeleteState(opCtx); - opTimes.writeOpTime = repl::logOp(opCtx, - "d", + auto noteOplog = logOperation(opCtx, + "n", nss, uuid, - deleteState.documentKey, + deletedDoc.get(), nullptr, - fromMigrate, + false, opTimes.wallClockTime, sessionInfo, stmtId, - oplogLink); + {}); + opTimes.prePostImageOpTime = noteOplog; + oplogLink.preImageOpTime = noteOplog; + } + + CollectionShardingState::DeleteState& deleteState = getDeleteState(opCtx); + opTimes.writeOpTime = logOperation(opCtx, + "d", + nss, + uuid, + deleteState.documentKey, + nullptr, + fromMigrate, + opTimes.wallClockTime, + sessionInfo, + stmtId, + oplogLink); return opTimes; } @@ -266,17 +293,17 @@ OpTimeBundle replLogApplyOps(OperationContext* opCtx, const repl::OplogLink& oplogLink) { OpTimeBundle times; times.wallClockTime = getWallClockTimeForOpLog(opCtx); - times.writeOpTime = repl::logOp(opCtx, - "c", - cmdNss, - {}, - applyOpCmd, - nullptr, - false, - times.wallClockTime, - sessionInfo, - stmtId, - oplogLink); + times.writeOpTime = logOperation(opCtx, + "c", + cmdNss, + {}, + applyOpCmd, + nullptr, + false, + times.wallClockTime, + sessionInfo, + stmtId, + oplogLink); return times; } @@ -298,29 +325,29 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, builder.append(e); } - repl::logOp(opCtx, - "c", - nss.getCommandNS(), - uuid, - builder.done(), - nullptr, - fromMigrate, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + nss.getCommandNS(), + uuid, + builder.done(), + nullptr, + fromMigrate, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); } else { - repl::logOp(opCtx, - "i", - systemIndexes, - {}, - indexDoc, - nullptr, - fromMigrate, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "i", + systemIndexes, + {}, + indexDoc, + nullptr, + fromMigrate, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); } AuthorizationManager::get(opCtx->getServiceContext()) @@ -335,12 +362,12 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, void OpObserverImpl::onInserts(OperationContext* opCtx, const NamespaceString& nss, OptionalCollectionUUID uuid, - std::vector<InsertStatement>::const_iterator begin, - std::vector<InsertStatement>::const_iterator end, + std::vector<InsertStatement>::const_iterator first, + std::vector<InsertStatement>::const_iterator last, bool fromMigrate) { Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) { - for (auto iter = begin; iter != end; iter++) { + for (auto iter = first; iter != last; iter++) { auto operation = OplogEntry::makeInsertOperation(nss, uuid, iter->doc); session->addTransactionOperation(opCtx, operation); } @@ -350,14 +377,19 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, const auto lastWriteDate = getWallClockTimeForOpLog(opCtx); const auto opTimeList = - repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate, lastWriteDate); + repl::logInsertOps(opCtx, nss, uuid, session, first, last, fromMigrate, lastWriteDate); + + auto& times = OpObserver::Times::get(opCtx).reservedOpTimes; + using std::begin; + using std::end; + times.insert(end(times), begin(opTimeList), end(opTimeList)); auto css = (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate) ? nullptr : CollectionShardingState::get(opCtx, nss); size_t index = 0; - for (auto it = begin; it != end; it++, index++) { + for (auto it = first; it != last; it++, index++) { AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "i", nss, it->doc, nullptr); if (css) { @@ -372,19 +404,20 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, } else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) { DurableViewCatalog::onExternalChange(opCtx, nss); } else if (nss.ns() == FeatureCompatibilityVersion::kCollection) { - for (auto it = begin; it != end; it++) { + for (auto it = first; it != last; it++) { FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, it->doc); } } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !lastOpTime.isNull()) { - for (auto it = begin; it != end; it++) { + for (auto it = first; it != last; it++) { SessionCatalog::get(opCtx)->invalidateSessions(opCtx, it->doc); } } std::vector<StmtId> stmtIdsWritten; - std::transform(begin, end, std::back_inserter(stmtIdsWritten), [](const InsertStatement& stmt) { - return stmt.stmtId; - }); + std::transform(first, + last, + std::back_inserter(stmtIdsWritten), + [](const InsertStatement& stmt) { return stmt.stmtId; }); onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate); } @@ -511,17 +544,17 @@ void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx, const BSONObj& msgObj, const boost::optional<BSONObj> o2MsgObj) { const BSONObj* o2MsgPtr = o2MsgObj ? o2MsgObj.get_ptr() : nullptr; - repl::logOp(opCtx, - "n", - nss, - uuid, - msgObj, - o2MsgPtr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "n", + nss, + uuid, + msgObj, + o2MsgPtr, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); } void OpObserverImpl::onCreateCollection(OperationContext* opCtx, @@ -554,17 +587,17 @@ void OpObserverImpl::onCreateCollection(OperationContext* opCtx, if (!collectionName.isSystemDotProfile()) { // do not replicate system.profile modifications - repl::logOp(opCtx, - "c", - cmdNss, - options.uuid, - cmdObj, - nullptr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + cmdNss, + options.uuid, + cmdObj, + nullptr, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); } AuthorizationManager::get(opCtx->getServiceContext()) @@ -600,17 +633,17 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx, if (!nss.isSystemDotProfile()) { // do not replicate system.profile modifications - repl::logOp(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - &o2Obj, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + cmdNss, + uuid, + cmdObj, + &o2Obj, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); } AuthorizationManager::get(opCtx->getServiceContext()) @@ -634,17 +667,17 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string& const NamespaceString cmdNss{dbName, "$cmd"}; const auto cmdObj = BSON("dropDatabase" << 1); - repl::logOp(opCtx, - "c", - cmdNss, - {}, - cmdObj, - nullptr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + cmdNss, + {}, + cmdObj, + nullptr, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); uassert(50714, "dropping the admin database is not allowed.", @@ -666,20 +699,19 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, const auto cmdNss = collectionName.getCommandNS(); const auto cmdObj = BSON("drop" << collectionName.coll()); - repl::OpTime dropOpTime; if (!collectionName.isSystemDotProfile()) { // Do not replicate system.profile modifications. - dropOpTime = repl::logOp(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - nullptr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + cmdNss, + uuid, + cmdObj, + nullptr, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); } uassert(50715, @@ -701,7 +733,7 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, // Evict namespace entry from the namespace/uuid cache if it exists. NamespaceUUIDCache::get(opCtx).evictNamespace(collectionName); - return dropOpTime; + return {}; } void OpObserverImpl::onDropIndex(OperationContext* opCtx, @@ -712,23 +744,23 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx, const auto cmdNss = nss.getCommandNS(); const auto cmdObj = BSON("dropIndexes" << nss.coll() << "index" << indexName); - repl::logOp(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - &indexInfo, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + cmdNss, + uuid, + cmdObj, + &indexInfo, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "c", cmdNss, cmdObj, &indexInfo); } -repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx, +repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* const opCtx, const NamespaceString& fromCollection, const NamespaceString& toCollection, OptionalCollectionUUID uuid, @@ -749,17 +781,17 @@ repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx, const auto cmdObj = builder.done(); - const auto renameOpTime = repl::logOp(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - nullptr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + cmdNss, + uuid, + cmdObj, + nullptr, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); if (fromCollection.isSystemDotViews()) DurableViewCatalog::onExternalChange(opCtx, fromCollection); @@ -776,7 +808,7 @@ repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx, opCtx->recoveryUnit()->onRollback( [&cache, toCollection]() { cache.evictNamespace(toCollection); }); - return renameOpTime; + return {}; } void OpObserverImpl::onApplyOps(OperationContext* opCtx, @@ -797,17 +829,17 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, if (!collectionName.isSystemDotProfile()) { // Do not replicate system.profile modifications - repl::logOp(opCtx, - "c", - cmdNss, - uuid, - cmdObj, - nullptr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + logOperation(opCtx, + "c", + cmdNss, + uuid, + cmdObj, + nullptr, + false, + getWallClockTimeForOpLog(opCtx), + {}, + kUninitializedStmtId, + {}); } AuthorizationManager::get(opCtx->getServiceContext()) |