/**
 *    Copyright (C) 2016 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#include "mongo/platform/basic.h"

#include "mongo/db/op_observer_impl.h"

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/namespace_uuid_cache.h"
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/commands/feature_compatibility_version_parser.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/server_options.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/views/durable_view_catalog.h"
#include "mongo/scripting/engine.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point_service.h"

namespace mongo {
using repl::OplogEntry;
namespace {

MONGO_FP_DECLARE(failCollectionUpdates);

using DeleteState = CollectionShardingState::DeleteState;

// Per-operation slot holding the delete state computed in aboutToDelete() and consumed by
// onDelete() / replLogDelete().
const OperationContext::Decoration<DeleteState> getDeleteState =
    OperationContext::declareDecoration<DeleteState>();

/**
 * Writes a single oplog entry through repl::logOp() and appends the resulting OpTime to the
 * operation's OpObserver::Times::reservedOpTimes list. Returns that OpTime.
 */
repl::OpTime logOperation(OperationContext* opCtx,
                          const char* opstr,
                          const NamespaceString& ns,
                          OptionalCollectionUUID uuid,
                          const BSONObj& obj,
                          const BSONObj* o2,
                          bool fromMigrate,
                          Date_t wallClockTime,
                          const OperationSessionInfo& sessionInfo,
                          StmtId stmtId,
                          const repl::OplogLink& oplogLink) {
    auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
    auto opTime = repl::logOp(opCtx,
                              opstr,
                              ns,
                              uuid,
                              obj,
                              o2,
                              fromMigrate,
                              wallClockTime,
                              sessionInfo,
                              stmtId,
                              oplogLink);

    times.push_back(opTime);
    return opTime;
}

/**
 * Updates the session state with the last write timestamp and transaction for that session.
 *
 * In the case of writes with transaction/statement id, this method will be recursively entered a
 * second time for the actual write to the transactions table. Since this write does not generate an
 * oplog entry, the recursion will stop at this point.
 */
void onWriteOpCompleted(OperationContext* opCtx,
                        const NamespaceString& nss,
                        Session* session,
                        std::vector<StmtId> stmtIdsWritten,
                        const repl::OpTime& lastStmtIdWriteOpTime,
                        Date_t lastStmtIdWriteDate) {
    // A null OpTime means no oplog entry was generated, so there is nothing to record.
    if (lastStmtIdWriteOpTime.isNull())
        return;

    if (session) {
        session->onWriteOpCompletedOnPrimary(opCtx,
                                             *opCtx->getTxnNumber(),
                                             std::move(stmtIdsWritten),
                                             lastStmtIdWriteOpTime,
                                             lastStmtIdWriteDate);
    }
}

/**
 * Given a raw collMod command object and associated collection metadata, create and return the
 * object for the 'o' field of a collMod oplog entry. For TTL index updates, we make sure the oplog
 * entry always stores the index name, instead of a key pattern.
 */
BSONObj makeCollModCmdObj(const BSONObj& collModCmd,
                          const CollectionOptions& oldCollOptions,
                          boost::optional<TTLCollModInfo> ttlInfo) {
    BSONObjBuilder cmdObjBuilder;
    std::string ttlIndexFieldName = "index";

    // Add all fields from the original collMod command.
    for (auto elem : collModCmd) {
        // We normalize all TTL collMod oplog entry objects to use the index name, even if the
        // command used an index key pattern.
        if (elem.fieldNameStringData() == ttlIndexFieldName && ttlInfo) {
            BSONObjBuilder ttlIndexObjBuilder;
            ttlIndexObjBuilder.append("name", ttlInfo->indexName);
            ttlIndexObjBuilder.append("expireAfterSeconds",
                                      durationCount<Seconds>(ttlInfo->expireAfterSeconds));

            cmdObjBuilder.append(ttlIndexFieldName, ttlIndexObjBuilder.obj());
        } else {
            cmdObjBuilder.append(elem);
        }
    }

    return cmdObjBuilder.obj();
}

/**
 * Returns the current time from the service context's fast clock source; used as the wall clock
 * time passed to the oplog-writing helpers in this file.
 */
Date_t getWallClockTimeForOpLog(OperationContext* opCtx) {
    auto const clockSource = opCtx->getServiceContext()->getFastClockSource();
    return clockSource->now();
}

/**
 * Result of the replLog*() helpers: the OpTime of the main write entry, the OpTime of the
 * optional pre-/post-image no-op entry (left default-constructed when none was written), and the
 * wall clock time passed to the logOperation() calls.
 */
struct OpTimeBundle {
    repl::OpTime writeOpTime;
    repl::OpTime prePostImageOpTime;
    Date_t wallClockTime;
};

/**
 * Write oplog entry(ies) for the update operation.
 *
 * When a pre- or post-image was requested, is non-empty, and the operation carries a transaction
 * number, a no-op ("n") entry holding that image is written first and linked from the update
 * ("u") entry.
 */
OpTimeBundle replLogUpdate(OperationContext* opCtx,
                           Session* session,
                           const OplogUpdateEntryArgs& args) {
    BSONObj storeObj;
    if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) {
        invariant(args.preImageDoc);
        storeObj = *args.preImageDoc;
    } else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) {
        storeObj = args.updatedDoc;
    }

    OperationSessionInfo sessionInfo;
    repl::OplogLink oplogLink;

    if (session) {
        sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
        sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
        oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber());
    }

    OpTimeBundle opTimes;
    opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx);

    if (!storeObj.isEmpty() && opCtx->getTxnNumber()) {
        auto noteUpdateOpTime = logOperation(opCtx,
                                             "n",
                                             args.nss,
                                             args.uuid,
                                             storeObj,
                                             nullptr,
                                             false,
                                             opTimes.wallClockTime,
                                             sessionInfo,
                                             args.stmtId,
                                             {});

        opTimes.prePostImageOpTime = noteUpdateOpTime;

        if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) {
            oplogLink.preImageOpTime = noteUpdateOpTime;
        } else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) {
            oplogLink.postImageOpTime = noteUpdateOpTime;
        }
    }

    opTimes.writeOpTime = logOperation(opCtx,
                                       "u",
                                       args.nss,
                                       args.uuid,
                                       args.update,
                                       &args.criteria,
                                       args.fromMigrate,
                                       opTimes.wallClockTime,
                                       sessionInfo,
                                       args.stmtId,
                                       oplogLink);

    return opTimes;
}

/**
 * Write oplog entry(ies) for the delete operation.
 *
 * When 'deletedDoc' is provided and the operation carries a transaction number, a no-op ("n")
 * entry holding the deleted document is written first and linked as the pre-image of the delete
 * ("d") entry. The delete entry itself stores the document key taken from the operation's
 * DeleteState decoration.
 */
OpTimeBundle replLogDelete(OperationContext* opCtx,
                           const NamespaceString& nss,
                           OptionalCollectionUUID uuid,
                           Session* session,
                           StmtId stmtId,
                           bool fromMigrate,
                           const boost::optional<BSONObj>& deletedDoc) {
    OperationSessionInfo sessionInfo;
    repl::OplogLink oplogLink;

    if (session) {
        sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
        sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
        oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber());
    }

    OpTimeBundle opTimes;
    opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx);

    if (deletedDoc && opCtx->getTxnNumber()) {
        auto noteOplog = logOperation(opCtx,
                                      "n",
                                      nss,
                                      uuid,
                                      deletedDoc.get(),
                                      nullptr,
                                      false,
                                      opTimes.wallClockTime,
                                      sessionInfo,
                                      stmtId,
                                      {});
        opTimes.prePostImageOpTime = noteOplog;
        oplogLink.preImageOpTime = noteOplog;
    }

    CollectionShardingState::DeleteState& deleteState = getDeleteState(opCtx);
    opTimes.writeOpTime = logOperation(opCtx,
                                       "d",
                                       nss,
                                       uuid,
                                       deleteState.documentKey,
                                       nullptr,
                                       fromMigrate,
                                       opTimes.wallClockTime,
                                       sessionInfo,
                                       stmtId,
                                       oplogLink);
    return opTimes;
}

/**
 * Write oplog entry for applyOps/atomic transaction operations.
 */
OpTimeBundle replLogApplyOps(OperationContext* opCtx,
                             const NamespaceString& cmdNss,
                             const BSONObj& applyOpCmd,
                             const OperationSessionInfo& sessionInfo,
                             StmtId stmtId,
                             const repl::OplogLink& oplogLink) {
    OpTimeBundle times;
    times.wallClockTime = getWallClockTimeForOpLog(opCtx);
    times.writeOpTime = logOperation(opCtx,
                                     "c",
                                     cmdNss,
                                     {},
                                     applyOpCmd,
                                     nullptr,
                                     false,
                                     times.wallClockTime,
                                     sessionInfo,
                                     stmtId,
                                     oplogLink);
    return times;
}

}  // namespace

/**
 * Logs an index creation. With a collection UUID the entry is a "createIndexes" command built
 * from 'indexDoc' minus its "ns" field; without one it is an insert of 'indexDoc' into the
 * database's system.indexes namespace. The AuthorizationManager is then notified and, unless
 * 'fromMigrate' is set, so is the sharding state of the system.indexes namespace.
 */
void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
                                   const NamespaceString& nss,
                                   OptionalCollectionUUID uuid,
                                   BSONObj indexDoc,
                                   bool fromMigrate) {
    const NamespaceString systemIndexes{nss.getSystemIndexesCollection()};

    if (uuid) {
        BSONObjBuilder builder;
        builder.append("createIndexes", nss.coll());

        for (const auto& e : indexDoc) {
            if (e.fieldNameStringData() != "ns"_sd)
                builder.append(e);
        }

        logOperation(opCtx,
                     "c",
                     nss.getCommandNS(),
                     uuid,
                     builder.done(),
                     nullptr,
                     fromMigrate,
                     getWallClockTimeForOpLog(opCtx),
                     {},
                     kUninitializedStmtId,
                     {});
    } else {
        logOperation(opCtx,
                     "i",
                     systemIndexes,
                     {},
                     indexDoc,
                     nullptr,
                     fromMigrate,
                     getWallClockTimeForOpLog(opCtx),
                     {},
                     kUninitializedStmtId,
                     {});
    }

    AuthorizationManager::get(opCtx->getServiceContext())
        ->logOp(opCtx, "i", systemIndexes, indexDoc, nullptr);

    auto css = CollectionShardingState::get(opCtx, systemIndexes);
    if (!fromMigrate) {
        css->onInsertOp(opCtx, indexDoc, {});
    }
}

/**
 * Handles a batch of inserts [first, last) into 'nss'.
 *
 * Inside a replicated multi-document transaction the inserts are only buffered on the session.
 * Otherwise they are written to the oplog, the auth and sharding subsystems are notified per
 * document, namespace-specific hooks run (system.js, system.views, server configuration,
 * session transactions table), and the session's write history is updated.
 */
void OpObserverImpl::onInserts(OperationContext* opCtx,
                               const NamespaceString& nss,
                               OptionalCollectionUUID uuid,
                               std::vector<InsertStatement>::const_iterator first,
                               std::vector<InsertStatement>::const_iterator last,
                               bool fromMigrate) {
    Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
    if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) {
        for (auto iter = first; iter != last; iter++) {
            auto operation = OplogEntry::makeInsertOperation(nss, uuid, iter->doc);
            session->addTransactionOperation(opCtx, operation);
        }
        return;
    }

    const auto lastWriteDate = getWallClockTimeForOpLog(opCtx);

    const auto opTimeList =
        repl::logInsertOps(opCtx, nss, uuid, session, first, last, fromMigrate, lastWriteDate);

    auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
    using std::begin;
    using std::end;
    times.insert(end(times), begin(opTimeList), end(opTimeList));

    // Sharding is not notified for writes to the session transactions table or for migrations.
    auto css = (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate)
        ? nullptr
        : CollectionShardingState::get(opCtx, nss);

    size_t index = 0;
    for (auto it = first; it != last; it++, index++) {
        AuthorizationManager::get(opCtx->getServiceContext())
            ->logOp(opCtx, "i", nss, it->doc, nullptr);
        if (css) {
            // An empty opTimeList means no oplog entries were written; report a null OpTime.
            auto opTime = opTimeList.empty() ? repl::OpTime() : opTimeList[index];
            css->onInsertOp(opCtx, it->doc, opTime);
        }
    }

    const auto lastOpTime = opTimeList.empty() ? repl::OpTime() : opTimeList.back();
    if (nss.coll() == "system.js") {
        Scope::storedFuncMod(opCtx);
    } else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) {
        DurableViewCatalog::onExternalChange(opCtx, nss);
    } else if (nss == NamespaceString::kServerConfigurationNamespace) {
        // We must check server configuration collection writes for featureCompatibilityVersion
        // document changes.
        for (auto it = first; it != last; it++) {
            FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, it->doc);
        }
    } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !lastOpTime.isNull()) {
        for (auto it = first; it != last; it++) {
            SessionCatalog::get(opCtx)->invalidateSessions(opCtx, it->doc);
        }
    }

    std::vector<StmtId> stmtIdsWritten;
    std::transform(first,
                   last,
                   std::back_inserter(stmtIdsWritten),
                   [](const InsertStatement& stmt) { return stmt.stmtId; });

    onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate);
}

/**
 * Handles a single-document update described by 'args'.
 *
 * Throws (code 40654) when the failCollectionUpdates fail point is enabled for this namespace.
 * Empty updates are ignored. Inside a replicated multi-document transaction the update is only
 * buffered on the session; otherwise it is written to the oplog and the same notifications as
 * for inserts are performed.
 */
void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
    MONGO_FAIL_POINT_BLOCK(failCollectionUpdates, extraData) {
        auto collElem = extraData.getData()["collectionNS"];
        // If the failpoint specifies no collection or matches the existing one, fail.
        if (!collElem || args.nss.ns() == collElem.String()) {
            uasserted(40654,
                      str::stream() << "failCollectionUpdates failpoint enabled, namespace: "
                                    << args.nss.ns()
                                    << ", update: "
                                    << args.update
                                    << " on document with "
                                    << args.criteria);
        }
    }

    // Do not log a no-op operation; see SERVER-21738
    if (args.update.isEmpty()) {
        return;
    }

    Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
    if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) {
        auto operation =
            OplogEntry::makeUpdateOperation(args.nss, args.uuid, args.update, args.criteria);
        session->addTransactionOperation(opCtx, operation);
        return;
    }

    const auto opTime = replLogUpdate(opCtx, session, args);

    AuthorizationManager::get(opCtx->getServiceContext())
        ->logOp(opCtx, "u", args.nss, args.update, &args.criteria);

    if (args.nss != NamespaceString::kSessionTransactionsTableNamespace) {
        if (!args.fromMigrate) {
            auto css = CollectionShardingState::get(opCtx, args.nss);
            css->onUpdateOp(
                opCtx, args.updatedDoc, opTime.writeOpTime, opTime.prePostImageOpTime);
        }
    }

    if (args.nss.coll() == "system.js") {
        Scope::storedFuncMod(opCtx);
    } else if (args.nss.coll() == DurableViewCatalog::viewsCollectionName()) {
        DurableViewCatalog::onExternalChange(opCtx, args.nss);
    } else if (args.nss == NamespaceString::kServerConfigurationNamespace) {
        // We must check server configuration collection writes for featureCompatibilityVersion
        // document changes.
        FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updatedDoc);
    } else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace &&
               !opTime.writeOpTime.isNull()) {
        SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc);
    }

    onWriteOpCompleted(opCtx,
                       args.nss,
                       session,
                       std::vector<StmtId>{args.stmtId},
                       opTime.writeOpTime,
                       opTime.wallClockTime);
}

/**
 * Computes the sharding delete state for 'doc' and stores it in the operation's DeleteState
 * decoration, where onDelete() later reads it.
 */
void OpObserverImpl::aboutToDelete(OperationContext* opCtx,
                                   NamespaceString const& nss,
                                   BSONObj const& doc) {
    auto& deleteState = getDeleteState(opCtx);
    auto* css = CollectionShardingState::get(opCtx, nss);
    deleteState = css->makeDeleteState(opCtx, doc);
}

/**
 * Handles a single-document delete. Requires the operation's DeleteState decoration to hold a
 * non-empty document key (set by aboutToDelete()).
 *
 * Inside a replicated multi-document transaction the delete is only buffered on the session;
 * otherwise it is written to the oplog and the auth/sharding/namespace-specific hooks run.
 * Throws (code 40670) when the deleted document is the featureCompatibilityVersion document of
 * the server configuration collection.
 */
void OpObserverImpl::onDelete(OperationContext* opCtx,
                              const NamespaceString& nss,
                              OptionalCollectionUUID uuid,
                              StmtId stmtId,
                              bool fromMigrate,
                              const boost::optional<BSONObj>& deletedDoc) {
    Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
    auto& deleteState = getDeleteState(opCtx);
    invariant(!deleteState.documentKey.isEmpty());
    if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) {
        auto operation = OplogEntry::makeDeleteOperation(
            nss, uuid, deletedDoc ? deletedDoc.get() : deleteState.documentKey);
        session->addTransactionOperation(opCtx, operation);
        return;
    }

    const auto opTime = replLogDelete(opCtx, nss, uuid, session, stmtId, fromMigrate, deletedDoc);

    AuthorizationManager::get(opCtx->getServiceContext())
        ->logOp(opCtx, "d", nss, deleteState.documentKey, nullptr);

    if (nss != NamespaceString::kSessionTransactionsTableNamespace) {
        if (!fromMigrate) {
            auto css = CollectionShardingState::get(opCtx, nss);
            css->onDeleteOp(opCtx, deleteState, opTime.writeOpTime, opTime.prePostImageOpTime);
        }
    }

    if (nss.coll() == "system.js") {
        Scope::storedFuncMod(opCtx);
    } else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) {
        DurableViewCatalog::onExternalChange(opCtx, nss);
    } else if (nss.isServerConfigurationCollection()) {
        auto _id = deleteState.documentKey["_id"];
        if (_id.type() == BSONType::String &&
            _id.String() == FeatureCompatibilityVersionParser::kParameterName)
            uasserted(40670, "removing FeatureCompatibilityVersion document is not allowed");
    } else if (nss == NamespaceString::kSessionTransactionsTableNamespace &&
               !opTime.writeOpTime.isNull()) {
        SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey);
    }

    onWriteOpCompleted(opCtx,
                       nss,
                       session,
                       std::vector<StmtId>{stmtId},
                       opTime.writeOpTime,
                       opTime.wallClockTime);
}

/**
 * Writes a no-op ("n") oplog entry with 'msgObj' as its object and, when provided, 'o2MsgObj'
 * as its o2 object.
 */
void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx,
                                         const NamespaceString& nss,
                                         const boost::optional<UUID> uuid,
                                         const BSONObj& msgObj,
                                         const boost::optional<BSONObj> o2MsgObj) {
    const BSONObj* o2MsgPtr = o2MsgObj ? o2MsgObj.get_ptr() : nullptr;
    logOperation(opCtx,
                 "n",
                 nss,
                 uuid,
                 msgObj,
                 o2MsgPtr,
                 false,
                 getWallClockTimeForOpLog(opCtx),
                 {},
                 kUninitializedStmtId,
                 {});
}

/**
 * Logs a "create" command for 'collectionName' (skipped for system.profile), notifies the
 * AuthorizationManager, and, when the collection has a UUID, registers a rollback handler that
 * evicts the namespace from the NamespaceUUIDCache.
 */
void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
                                        Collection* coll,
                                        const NamespaceString& collectionName,
                                        const CollectionOptions& options,
                                        const BSONObj& idIndex) {
    const auto cmdNss = collectionName.getCommandNS();

    BSONObjBuilder b;
    b.append("create", collectionName.coll().toString());
    {
        // Don't store the UUID as part of the options, but instead only at the top level
        CollectionOptions optionsToStore = options;
        optionsToStore.uuid.reset();
        b.appendElements(optionsToStore.toBSON());
    }

    // Include the full _id index spec in the oplog for index versions >= 2.
    if (!idIndex.isEmpty()) {
        auto versionElem = idIndex[IndexDescriptor::kIndexVersionFieldName];
        invariant(versionElem.isNumber());
        if (IndexDescriptor::IndexVersion::kV2 <=
            static_cast<IndexDescriptor::IndexVersion>(versionElem.numberInt())) {
            b.append("idIndex", idIndex);
        }
    }

    const auto cmdObj = b.done();

    if (!collectionName.isSystemDotProfile()) {
        // do not replicate system.profile modifications
        logOperation(opCtx,
                     "c",
                     cmdNss,
                     options.uuid,
                     cmdObj,
                     nullptr,
                     false,
                     getWallClockTimeForOpLog(opCtx),
                     {},
                     kUninitializedStmtId,
                     {});
    }

    AuthorizationManager::get(opCtx->getServiceContext())
        ->logOp(opCtx, "c", cmdNss, cmdObj, nullptr);

    if (options.uuid) {
        opCtx->recoveryUnit()->onRollback([opCtx, collectionName]() {
            NamespaceUUIDCache::get(opCtx).evictNamespace(collectionName);
        });
    }
}

/**
 * Logs a collMod command (skipped for system.profile) whose 'o2' field carries the previous
 * collection options and, for TTL changes, the previous expireAfterSeconds. Afterwards verifies
 * that the collection's UUID matches 'uuid' in both the Collection object and its catalog entry.
 */
void OpObserverImpl::onCollMod(OperationContext* opCtx,
                               const NamespaceString& nss,
                               OptionalCollectionUUID uuid,
                               const BSONObj& collModCmd,
                               const CollectionOptions& oldCollOptions,
                               boost::optional<TTLCollModInfo> ttlInfo) {
    const auto cmdNss = nss.getCommandNS();

    // Create the 'o' field object.
    const auto cmdObj = makeCollModCmdObj(collModCmd, oldCollOptions, ttlInfo);

    // Create the 'o2' field object. We save the old collection metadata and TTL expiration.
    BSONObjBuilder o2Builder;
    o2Builder.append("collectionOptions_old", oldCollOptions.toBSON());
    if (ttlInfo) {
        auto oldExpireAfterSeconds = durationCount<Seconds>(ttlInfo->oldExpireAfterSeconds);
        o2Builder.append("expireAfterSeconds_old", oldExpireAfterSeconds);
    }

    const auto o2Obj = o2Builder.done();

    if (!nss.isSystemDotProfile()) {
        // do not replicate system.profile modifications
        logOperation(opCtx,
                     "c",
                     cmdNss,
                     uuid,
                     cmdObj,
                     &o2Obj,
                     false,
                     getWallClockTimeForOpLog(opCtx),
                     {},
                     kUninitializedStmtId,
                     {});
    }

    AuthorizationManager::get(opCtx->getServiceContext())
        ->logOp(opCtx, "c", cmdNss, cmdObj, nullptr);

    // Make sure the UUID values in the Collection metadata, the Collection object, and the UUID
    // catalog are all present and equal, unless the collection is system.indexes or
    // system.namespaces (see SERVER-29926, SERVER-30095).
    invariant(opCtx->lockState()->isDbLockedForMode(nss.db(), MODE_X));
    Database* db = dbHolder().get(opCtx, nss.db());
    // Some unit tests call the op observer on an unregistered Database.
    if (!db) {
        return;
    }
    Collection* coll = db->getCollection(opCtx, nss.ns());
    invariant(coll->uuid() || nss.coll() == "system.indexes" ||
              nss.coll() == "system.namespaces");
    invariant(coll->uuid() == uuid);
    CollectionCatalogEntry* entry = coll->getCatalogEntry();
    invariant(entry->isEqualToMetadataUUID(opCtx, uuid));
}

/**
 * Logs a "dropDatabase" command for 'dbName'. Throws (code 50714) if 'dbName' is the admin
 * database; note that the check runs after the oplog entry has been written. Invalidates all
 * sessions when the database holds the session transactions table, evicts the database's
 * namespaces from the NamespaceUUIDCache, and notifies the AuthorizationManager.
 */
void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string& dbName) {
    const NamespaceString cmdNss{dbName, "$cmd"};
    const auto cmdObj = BSON("dropDatabase" << 1);

    logOperation(opCtx,
                 "c",
                 cmdNss,
                 {},
                 cmdObj,
                 nullptr,
                 false,
                 getWallClockTimeForOpLog(opCtx),
                 {},
                 kUninitializedStmtId,
                 {});

    uassert(
        50714, "dropping the admin database is not allowed.", dbName != NamespaceString::kAdminDb);

    if (dbName == NamespaceString::kSessionTransactionsTableNamespace.db()) {
        SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
    }

    NamespaceUUIDCache::get(opCtx).evictNamespacesInDatabase(dbName);

    AuthorizationManager::get(opCtx->getServiceContext())
        ->logOp(opCtx, "c", cmdNss, cmdObj, nullptr);
}

/**
 * Logs a "drop" command for 'collectionName' (skipped for system.profile). Throws (code 50715)
 * if the collection is the server configuration collection; the check runs after the oplog entry
 * has been written. Runs the views/sessions hooks, notifies the AuthorizationManager and evicts
 * the namespace from the NamespaceUUIDCache. Always returns a default-constructed OpTime.
 */
repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
                                              const NamespaceString& collectionName,
                                              OptionalCollectionUUID uuid) {
    const auto cmdNss = collectionName.getCommandNS();
    const auto cmdObj = BSON("drop" << collectionName.coll());

    if (!collectionName.isSystemDotProfile()) {
        // Do not replicate system.profile modifications.
        logOperation(opCtx,
                     "c",
                     cmdNss,
                     uuid,
                     cmdObj,
                     nullptr,
                     false,
                     getWallClockTimeForOpLog(opCtx),
                     {},
                     kUninitializedStmtId,
                     {});
    }

    uassert(50715,
            "dropping the server configuration collection (admin.system.version) is not allowed.",
            collectionName != NamespaceString::kServerConfigurationNamespace);

    if (collectionName.coll() == DurableViewCatalog::viewsCollectionName()) {
        DurableViewCatalog::onExternalChange(opCtx, collectionName);
    } else if (collectionName == NamespaceString::kSessionTransactionsTableNamespace) {
        SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
    }

    AuthorizationManager::get(opCtx->getServiceContext())
        ->logOp(opCtx, "c", cmdNss, cmdObj, nullptr);

    // Evict namespace entry from the namespace/uuid cache if it exists.
    NamespaceUUIDCache::get(opCtx).evictNamespace(collectionName);

    return {};
}

/**
 * Logs a "dropIndexes" command naming 'indexName', with 'indexInfo' as the entry's o2 object,
 * and notifies the AuthorizationManager.
 */
void OpObserverImpl::onDropIndex(OperationContext* opCtx,
                                 const NamespaceString& nss,
                                 OptionalCollectionUUID uuid,
                                 const std::string& indexName,
                                 const BSONObj& indexInfo) {
    const auto cmdNss = nss.getCommandNS();
    const auto cmdObj = BSON("dropIndexes" << nss.coll() << "index" << indexName);

    logOperation(opCtx,
                 "c",
                 cmdNss,
                 uuid,
                 cmdObj,
                 &indexInfo,
                 false,
                 getWallClockTimeForOpLog(opCtx),
                 {},
                 kUninitializedStmtId,
                 {});

    AuthorizationManager::get(opCtx->getServiceContext())
        ->logOp(opCtx, "c", cmdNss, cmdObj, &indexInfo);
}

/**
 * Logs a "renameCollection" command (including "dropTarget" when 'dropTargetUUID' is set),
 * refreshes the view catalog when either namespace is system.views, notifies the
 * AuthorizationManager, and evicts both namespaces from the NamespaceUUIDCache (the target is
 * evicted again on rollback). Always returns a default-constructed OpTime.
 */
repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* const opCtx,
                                                const NamespaceString& fromCollection,
                                                const NamespaceString& toCollection,
                                                OptionalCollectionUUID uuid,
                                                OptionalCollectionUUID dropTargetUUID,
                                                bool stayTemp) {
    const auto cmdNss = fromCollection.getCommandNS();

    BSONObjBuilder builder;
    builder.append("renameCollection", fromCollection.ns());
    builder.append("to", toCollection.ns());
    builder.append("stayTemp", stayTemp);
    if (dropTargetUUID) {
        dropTargetUUID->appendToBuilder(&builder, "dropTarget");
    }

    const auto cmdObj = builder.done();

    logOperation(opCtx,
                 "c",
                 cmdNss,
                 uuid,
                 cmdObj,
                 nullptr,
                 false,
                 getWallClockTimeForOpLog(opCtx),
                 {},
                 kUninitializedStmtId,
                 {});

    if (fromCollection.isSystemDotViews())
        DurableViewCatalog::onExternalChange(opCtx, fromCollection);
    if (toCollection.isSystemDotViews())
        DurableViewCatalog::onExternalChange(opCtx, toCollection);

    AuthorizationManager::get(opCtx->getServiceContext())
        ->logOp(opCtx, "c", cmdNss, cmdObj, nullptr);

    // Evict namespace entry from the namespace/uuid cache if it exists.
    NamespaceUUIDCache& cache = NamespaceUUIDCache::get(opCtx);
    cache.evictNamespace(fromCollection);
    cache.evictNamespace(toCollection);
    opCtx->recoveryUnit()->onRollback(
        [&cache, toCollection]() { cache.evictNamespace(toCollection); });

    return {};
}

/**
 * Logs 'applyOpCmd' as a command entry on 'dbName'.$cmd, without session information, and
 * notifies the AuthorizationManager.
 */
void OpObserverImpl::onApplyOps(OperationContext* opCtx,
                                const std::string& dbName,
                                const BSONObj& applyOpCmd) {
    const NamespaceString cmdNss{dbName, "$cmd"};
    replLogApplyOps(opCtx, cmdNss, applyOpCmd, {}, kUninitializedStmtId, {});

    AuthorizationManager::get(opCtx->getServiceContext())
        ->logOp(opCtx, "c", cmdNss, applyOpCmd, nullptr);
}

/**
 * Logs an "emptycapped" command for 'collectionName' (skipped for system.profile) and notifies
 * the AuthorizationManager.
 */
void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
                                   const NamespaceString& collectionName,
                                   OptionalCollectionUUID uuid) {
    const auto cmdNss = collectionName.getCommandNS();
    const auto cmdObj = BSON("emptycapped" << collectionName.coll());

    if (!collectionName.isSystemDotProfile()) {
        // Do not replicate system.profile modifications
        logOperation(opCtx,
                     "c",
                     cmdNss,
                     uuid,
                     cmdObj,
                     nullptr,
                     false,
                     getWallClockTimeForOpLog(opCtx),
                     {},
                     kUninitializedStmtId,
                     {});
    }

    AuthorizationManager::get(opCtx->getServiceContext())
        ->logOp(opCtx, "c", cmdNss, cmdObj, nullptr);
}

/**
 * Ends the session's transaction and, if it buffered any operations, writes them as a single
 * applyOps oplog entry (statement id 0) on the database of the first operation, then records the
 * write on the session.
 */
void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) {
    invariant(opCtx->getTxnNumber());
    Session* const session = OperationContextSession::get(opCtx);
    invariant(session);
    auto stmts = session->endTransactionAndRetrieveOperations(opCtx);

    // It is possible that the transaction resulted in no changes.  In that case, we should
    // not write an empty applyOps entry.
    if (stmts.empty())
        return;

    BSONObjBuilder applyOpsBuilder;
    BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd));
    for (auto& stmt : stmts) {
        opsArray.append(stmt.toBSON());
    }
    opsArray.done();
    const auto dbName = stmts[0].getNamespace().db().toString();
    const NamespaceString cmdNss{dbName, "$cmd"};

    OperationSessionInfo sessionInfo;
    repl::OplogLink oplogLink;
    sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
    sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
    StmtId stmtId(0);
    oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber());
    // Until we support multiple oplog entries per transaction, prevOpTime should always be null.
    invariant(oplogLink.prevOpTime.isNull());

    auto applyOpCmd = applyOpsBuilder.done();
    auto times = replLogApplyOps(opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink);

    onWriteOpCompleted(opCtx, cmdNss, session, {stmtId}, times.writeOpTime, times.wallClockTime);
}

/**
 * Obtains the next OpTime via repl::getNextOpTimeNoPersistForTesting() and sets its timestamp as
 * the recovery unit's prepare timestamp. Requires a transaction number and a session that is in
 * a multi-document transaction.
 */
void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx) {
    invariant(opCtx->getTxnNumber());
    Session* const session = OperationContextSession::get(opCtx);
    // Fail with a diagnosable invariant instead of a null dereference, matching the check in
    // onTransactionCommit().
    invariant(session);
    invariant(session->inMultiDocumentTransaction());
    auto opTime = repl::getNextOpTimeNoPersistForTesting(opCtx).opTime;
    opCtx->recoveryUnit()->setPrepareTimestamp(opTime.getTimestamp());
}

/**
 * Only checks that the operation carries a transaction number; nothing is logged on abort.
 */
void OpObserverImpl::onTransactionAbort(OperationContext* opCtx) {
    invariant(opCtx->getTxnNumber());
}

/**
 * Reacts to a completed replication rollback described by 'rbInfo': invalidates cached auth
 * data and sessions where affected, resets the key manager cache, and terminates the process
 * (fassert 50712) if the shard identity document was rolled back.
 */
void OpObserverImpl::onReplicationRollback(OperationContext* opCtx,
                                           const RollbackObserverInfo& rbInfo) {
    // Invalidate any in-memory auth data if necessary.
    const auto& rollbackNamespaces = rbInfo.rollbackNamespaces;
    if (rollbackNamespaces.count(AuthorizationManager::versionCollectionNamespace) == 1 ||
        rollbackNamespaces.count(AuthorizationManager::usersCollectionNamespace) == 1 ||
        rollbackNamespaces.count(AuthorizationManager::rolesCollectionNamespace) == 1) {
        AuthorizationManager::get(opCtx->getServiceContext())->invalidateUserCache();
    }

    // If there were ops rolled back that were part of operations on a session, then invalidate the
    // session cache.
    if (rbInfo.rollbackSessionIds.size() > 0) {
        SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
    }

    // Reset the key manager cache.
    auto validator = LogicalTimeValidator::get(opCtx);
    if (validator) {
        validator->resetKeyManagerCache();
    }

    // Check if the shard identity document rolled back.
    if (rbInfo.shardIdentityRolledBack) {
        fassertFailedNoTrace(50712);
    }
}

}  // namespace mongo