diff options
author | Grace Luong <grace.luong@mongodb.com> | 2020-08-03 22:28:46 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-19 17:04:39 +0000 |
commit | 5b5ba520338a8c0b8f36b60042bfdeb40579e1a1 (patch) | |
tree | d117390a4d07ab1fa3798402c7d1cc2fd3b33f20 /src/mongo/db | |
parent | 3e0366d2b1ff45a3ea91b63793743023479d9c4a (diff) | |
download | mongo-5b5ba520338a8c0b8f36b60042bfdeb40579e1a1.tar.gz |
SERVER-47667: Create FCVOpObserver
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/SConscript | 23 | ||||
-rw-r--r-- | src/mongo/db/commands/SConscript | 5 | ||||
-rw-r--r-- | src/mongo/db/commands/feature_compatibility_version.cpp | 97 | ||||
-rw-r--r-- | src/mongo/db/commands/feature_compatibility_version.h | 20 | ||||
-rw-r--r-- | src/mongo/db/fcv_op_observer.cpp | 185 | ||||
-rw-r--r-- | src/mongo/db/fcv_op_observer.h | 200 | ||||
-rw-r--r-- | src/mongo/db/mongod_main.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.h | 3 |
9 files changed, 418 insertions, 143 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index d1923f219d3..5ad1e052a01 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -806,12 +806,31 @@ env.Library( ], LIBDEPS_PRIVATE=[ 'transaction', - '$BUILD_DIR/mongo/db/commands/mongod_fcv', "$BUILD_DIR/mongo/db/catalog/commit_quorum_options", ], ) env.Library( + target="fcv_op_observer", + source=[ + "fcv_op_observer.cpp", + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/catalog/collection_catalog', + 'catalog/collection_options', + 'op_observer', + 'op_observer_util', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/commands/mongod_fcv', + '$BUILD_DIR/mongo/db/kill_sessions_local', + '$BUILD_DIR/mongo/executor/egress_tag_closer_manager', + 'op_observer_impl', + ], +) + +env.Library( target="service_entry_point_common", source=[ "service_entry_point_common.cpp", @@ -2054,6 +2073,7 @@ env.Library( 'commands/mongod', 'concurrency/flow_control_ticketholder', 'concurrency/lock_manager', + 'fcv_op_observer', 'free_mon/free_mon_mongod', 'ftdc/ftdc_mongod', 'index/index_access_method_factory', @@ -2220,6 +2240,7 @@ envWithAsio.CppUnitTest( 'curop', 'dbdirectclient', 'dbmessage', + 'fcv_op_observer', 'index_build_entry_helpers', 'index_builds_coordinator_mongod', 'keys_collection_client_direct', diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 9136f3e3270..6242303cee6 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -228,11 +228,12 @@ env.Library( 'feature_compatibility_parsers', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog_raii', '$BUILD_DIR/mongo/db/commands', '$BUILD_DIR/mongo/db/dbdirectclient', - '$BUILD_DIR/mongo/db/kill_sessions_local', + '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', + '$BUILD_DIR/mongo/db/repl/repl_settings', '$BUILD_DIR/mongo/idl/server_parameter', - '$BUILD_DIR/mongo/executor/egress_tag_closer_manager', ], ) diff --git a/src/mongo/db/commands/feature_compatibility_version.cpp b/src/mongo/db/commands/feature_compatibility_version.cpp index 04df3030c56..0a2d3810cb8 100644 --- a/src/mongo/db/commands/feature_compatibility_version.cpp +++ b/src/mongo/db/commands/feature_compatibility_version.cpp @@ -40,7 +40,6 @@ #include "mongo/db/commands/feature_compatibility_version_gen.h" #include "mongo/db/commands/feature_compatibility_version_parser.h" #include "mongo/db/dbdirectclient.h" -#include "mongo/db/kill_sessions_local.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/optime.h" @@ -53,7 +52,6 @@ #include "mongo/db/storage/storage_engine.h" #include "mongo/db/wire_version.h" #include "mongo/db/write_concern_options.h" -#include "mongo/executor/egress_tag_closer_manager.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog_cache.h" @@ -66,8 +64,6 @@ using repl::UnreplicatedWritesBlock; Lock::ResourceMutex FeatureCompatibilityVersion::fcvLock("featureCompatibilityVersionLock"); -MONGO_FAIL_POINT_DEFINE(hangBeforeAbortingRunningTransactionsOnFCVDowngrade); - namespace { bool isWriteableStorageEngine() { return !storageGlobalParams.readOnly && (storageGlobalParams.engine != "devnull"); @@ -203,33 +199,6 @@ bool FeatureCompatibilityVersion::isCleanStartUp() { return true; } -void FeatureCompatibilityVersion::onInsertOrUpdate(OperationContext* opCtx, const BSONObj& doc) { - auto idElement = doc["_id"]; - if (idElement.type() != BSONType::String || - idElement.String() != FeatureCompatibilityVersionParser::kParameterName) { - return; - } - auto newVersion = uassertStatusOK(FeatureCompatibilityVersionParser::parse(doc)); - - // To avoid extra log messages when the targetVersion is set/unset, only log when the version - // changes. - logv2::DynamicAttributes attrs; - bool isDifferent = true; - if (serverGlobalParams.featureCompatibility.isVersionInitialized()) { - const auto currentVersion = serverGlobalParams.featureCompatibility.getVersion(); - attrs.add("currentVersion", FeatureCompatibilityVersionParser::toString(currentVersion)); - isDifferent = currentVersion != newVersion; - } - - if (isDifferent) { - attrs.add("newVersion", FeatureCompatibilityVersionParser::toString(newVersion)); - LOGV2(20459, "Setting featureCompatibilityVersion", attrs); - } - - opCtx->recoveryUnit()->onCommit( - [opCtx, newVersion](boost::optional<Timestamp>) { _setVersion(opCtx, newVersion); }); -} - void FeatureCompatibilityVersion::updateMinWireVersion() { WireSpec& wireSpec = WireSpec::instance(); @@ -338,72 +307,6 @@ void FeatureCompatibilityVersion::fassertInitializedAfterStartup(OperationContex } } -void FeatureCompatibilityVersion::_setVersion( - OperationContext* opCtx, ServerGlobalParams::FeatureCompatibility::Version newVersion) { - serverGlobalParams.featureCompatibility.setVersion(newVersion); - updateMinWireVersion(); - - // (Generic FCV reference): This FCV check should exist across LTS binary versions. - if (newVersion != ServerGlobalParams::FeatureCompatibility::kLastLTS) { - // Close all incoming connections from internal clients with binary versions lower than - // ours. - opCtx->getServiceContext()->getServiceEntryPoint()->endAllSessions( - transport::Session::kLatestVersionInternalClientKeepOpen | - transport::Session::kExternalClientKeepOpen); - // Close all outgoing connections to servers with binary versions lower than ours. - executor::EgressTagCloserManager::get(opCtx->getServiceContext()) - .dropConnections(transport::Session::kKeepOpen); - } - - // (Generic FCV reference): This FCV check should exist across LTS binary versions. - if (newVersion != ServerGlobalParams::FeatureCompatibility::kLatest) { - if (MONGO_unlikely(hangBeforeAbortingRunningTransactionsOnFCVDowngrade.shouldFail())) { - LOGV2(20460, - "FeatureCompatibilityVersion - " - "hangBeforeAbortingRunningTransactionsOnFCVDowngrade fail point enabled, " - "blocking until fail point is disabled"); - hangBeforeAbortingRunningTransactionsOnFCVDowngrade.pauseWhileSet(); - } - // Abort all open transactions when downgrading the featureCompatibilityVersion. - SessionKiller::Matcher matcherAllSessions( - KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); - killSessionsAbortUnpreparedTransactions(opCtx, matcherAllSessions); - } - const auto replCoordinator = repl::ReplicationCoordinator::get(opCtx); - const bool isReplSet = - replCoordinator->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - // We only want to increment the server TopologyVersion when the minWireVersion has changed. - // This can only happen in two scenarios: - // 1. Setting featureCompatibilityVersion from downgrading to fullyDowngraded. - // 2. Setting featureCompatibilityVersion from fullyDowngraded to upgrading. - // (Generic FCV reference): This FCV check should exist across LTS binary versions. - const auto shouldIncrementTopologyVersion = - newVersion == ServerGlobalParams::FeatureCompatibility::kLastLTS || - newVersion == ServerGlobalParams::FeatureCompatibility::Version::kUpgradingFrom44To47; - if (isReplSet && shouldIncrementTopologyVersion) { - replCoordinator->incrementTopologyVersion(); - } -} - -void FeatureCompatibilityVersion::onReplicationRollback(OperationContext* opCtx) { - const auto query = BSON("_id" << FeatureCompatibilityVersionParser::kParameterName); - const auto swFcv = repl::StorageInterface::get(opCtx)->findById( - opCtx, NamespaceString::kServerConfigurationNamespace, query["_id"]); - if (swFcv.isOK()) { - const auto featureCompatibilityVersion = swFcv.getValue(); - auto swVersion = FeatureCompatibilityVersionParser::parse(featureCompatibilityVersion); - const auto memoryFcv = serverGlobalParams.featureCompatibility.getVersion(); - if (swVersion.isOK() && (swVersion.getValue() != memoryFcv)) { - auto diskFcv = swVersion.getValue(); - LOGV2(4675801, - "Setting featureCompatibilityVersion as part of rollback", - "newVersion"_attr = FeatureCompatibilityVersionParser::toString(diskFcv), - "oldVersion"_attr = FeatureCompatibilityVersionParser::toString(memoryFcv)); - _setVersion(opCtx, diskFcv); - } - } -} - /** * Read-only server parameter for featureCompatibilityVersion. */ diff --git a/src/mongo/db/commands/feature_compatibility_version.h b/src/mongo/db/commands/feature_compatibility_version.h index 9ac8f0073f5..3f458066614 100644 --- a/src/mongo/db/commands/feature_compatibility_version.h +++ b/src/mongo/db/commands/feature_compatibility_version.h @@ -106,30 +106,10 @@ public: static bool isCleanStartUp(); /** - * Examines a document inserted or updated in the server configuration collection - * (admin.system.version). If it is the featureCompatibilityVersion document, validates the - * document and on commit, updates the server parameter. - */ - static void onInsertOrUpdate(OperationContext* opCtx, const BSONObj& doc); - - /** * Sets the server's outgoing and incomingInternalClient minWireVersions according to the * current featureCompatibilityVersion value. */ static void updateMinWireVersion(); - - /** - * Ensures the in-memory and on-disk FCV states are consistent after a rollback. - */ - static void onReplicationRollback(OperationContext* opCtx); - -private: - /** - * Set the FCV to newVersion, making sure to close any outgoing connections with incompatible - * servers and closing open transactions if necessary. Increments the server TopologyVersion. - */ - static void _setVersion(OperationContext* opCtx, - ServerGlobalParams::FeatureCompatibility::Version newVersion); }; /** diff --git a/src/mongo/db/fcv_op_observer.cpp b/src/mongo/db/fcv_op_observer.cpp new file mode 100644 index 00000000000..723977bf917 --- /dev/null +++ b/src/mongo/db/fcv_op_observer.cpp @@ -0,0 +1,185 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/fcv_op_observer.h" +#include "mongo/db/op_observer_impl.h" + +#include "mongo/db/catalog/collection_options.h" +#include "mongo/db/commands/feature_compatibility_version.h" +#include "mongo/db/commands/feature_compatibility_version_parser.h" +#include "mongo/db/kill_sessions_local.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/executor/egress_tag_closer_manager.h" +#include "mongo/logv2/log.h" +#include "mongo/transport/service_entry_point.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + +void FcvOpObserver::_setVersion(OperationContext* opCtx, + ServerGlobalParams::FeatureCompatibility::Version newVersion) { + serverGlobalParams.featureCompatibility.setVersion(newVersion); + FeatureCompatibilityVersion::updateMinWireVersion(); + + // (Generic FCV reference): This FCV check should exist across LTS binary versions. + if (newVersion != ServerGlobalParams::FeatureCompatibility::kLastLTS) { + // Close all incoming connections from internal clients with binary versions lower than + // ours. + opCtx->getServiceContext()->getServiceEntryPoint()->endAllSessions( + transport::Session::kLatestVersionInternalClientKeepOpen | + transport::Session::kExternalClientKeepOpen); + // Close all outgoing connections to servers with binary versions lower than ours. + executor::EgressTagCloserManager::get(opCtx->getServiceContext()) + .dropConnections(transport::Session::kKeepOpen); + } + + // We make assumptions that transactions don't span an FCV change. And FCV changes also take + // the global lock in S mode to create a barrier for operations in IX/X mode, we abort all open + // transactions here to release the global IX locks held by the transactions more proactively + // rather than waiting for the transactions to complete. FCV changes take the global S lock when + // in the upgrading/downgrading state. + if (serverGlobalParams.featureCompatibility.isUpgradingOrDowngrading()) { + SessionKiller::Matcher matcherAllSessions( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); + killSessionsAbortUnpreparedTransactions(opCtx, matcherAllSessions); + } + + const auto replCoordinator = repl::ReplicationCoordinator::get(opCtx); + const bool isReplSet = + replCoordinator->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + // We only want to increment the server TopologyVersion when the minWireVersion has changed. + // This can only happen in two scenarios: + // 1. Setting featureCompatibilityVersion from downgrading to fullyDowngraded. + // 2. Setting featureCompatibilityVersion from fullyDowngraded to upgrading. + // (Generic FCV reference): This FCV check should exist across LTS binary versions. + const auto shouldIncrementTopologyVersion = + newVersion == ServerGlobalParams::FeatureCompatibility::kLastLTS || + newVersion == ServerGlobalParams::FeatureCompatibility::Version::kUpgradingFrom44To47; + if (isReplSet && shouldIncrementTopologyVersion) { + replCoordinator->incrementTopologyVersion(); + } +} + +void FcvOpObserver::_onInsertOrUpdate(OperationContext* opCtx, const BSONObj& doc) { + auto idElement = doc["_id"]; + if (idElement.type() != BSONType::String || + idElement.String() != FeatureCompatibilityVersionParser::kParameterName) { + return; + } + auto newVersion = uassertStatusOK(FeatureCompatibilityVersionParser::parse(doc)); + + // To avoid extra log messages when the targetVersion is set/unset, only log when the version + // changes. + logv2::DynamicAttributes attrs; + bool isDifferent = true; + if (serverGlobalParams.featureCompatibility.isVersionInitialized()) { + const auto currentVersion = serverGlobalParams.featureCompatibility.getVersion(); + attrs.add("currentVersion", FeatureCompatibilityVersionParser::toString(currentVersion)); + isDifferent = currentVersion != newVersion; + } + + if (isDifferent) { + attrs.add("newVersion", FeatureCompatibilityVersionParser::toString(newVersion)); + LOGV2(20459, "Setting featureCompatibilityVersion", attrs); + } + + opCtx->recoveryUnit()->onCommit( + [opCtx, newVersion](boost::optional<Timestamp>) { _setVersion(opCtx, newVersion); }); +} + +void FcvOpObserver::onInserts(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + std::vector<InsertStatement>::const_iterator first, + std::vector<InsertStatement>::const_iterator last, + bool fromMigrate) { + if (nss.isServerConfigurationCollection()) { + for (auto it = first; it != last; it++) { + _onInsertOrUpdate(opCtx, it->doc); + } + } +} + +void FcvOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { + if (args.updateArgs.update.isEmpty()) { + return; + } + if (args.nss.isServerConfigurationCollection()) { + _onInsertOrUpdate(opCtx, args.updateArgs.updatedDoc); + } +} + +void FcvOpObserver::onDelete(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + StmtId stmtId, + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) { + // documentKeyDecoration is set in OpObserverImpl::aboutToDelete. So the FcvOpObserver + // relies on the OpObserverImpl also being in the opObserverRegistry. + auto optDocKey = documentKeyDecoration(opCtx); + invariant(optDocKey, nss.ns()); + if (nss.isServerConfigurationCollection()) { + auto id = optDocKey.get().getId().firstElement(); + if (id.type() == BSONType::String && + id.String() == FeatureCompatibilityVersionParser::kParameterName) { + uasserted(40670, "removing FeatureCompatibilityVersion document is not allowed"); + } + } +} + +void FcvOpObserver::onReplicationRollback(OperationContext* opCtx, + const RollbackObserverInfo& rbInfo) { + // Ensures the in-memory and on-disk FCV states are consistent after a rollback. + const auto query = BSON("_id" << FeatureCompatibilityVersionParser::kParameterName); + const auto swFcv = repl::StorageInterface::get(opCtx)->findById( + opCtx, NamespaceString::kServerConfigurationNamespace, query["_id"]); + if (swFcv.isOK()) { + const auto featureCompatibilityVersion = swFcv.getValue(); + auto swVersion = FeatureCompatibilityVersionParser::parse(featureCompatibilityVersion); + const auto memoryFcv = serverGlobalParams.featureCompatibility.getVersion(); + if (swVersion.isOK() && (swVersion.getValue() != memoryFcv)) { + auto diskFcv = swVersion.getValue(); + LOGV2(4675801, + "Setting featureCompatibilityVersion as part of rollback", + "newVersion"_attr = FeatureCompatibilityVersionParser::toString(diskFcv), + "oldVersion"_attr = FeatureCompatibilityVersionParser::toString(memoryFcv)); + _setVersion(opCtx, diskFcv); + } + } +} + +} // namespace mongo diff --git a/src/mongo/db/fcv_op_observer.h b/src/mongo/db/fcv_op_observer.h new file mode 100644 index 00000000000..93c4d0b7bab --- /dev/null +++ b/src/mongo/db/fcv_op_observer.h @@ -0,0 +1,200 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/op_observer_noop.h" +#include "mongo/db/server_options.h" + +namespace mongo { + +/** + * OpObserver for Feature Compatibility Version (FCV). + * Observes all writes to the FCV document under admin.system.version and sets the in-memory FCV + * value. + */ +class FcvOpObserver final : public OpObserver { + FcvOpObserver(const FcvOpObserver&) = delete; + FcvOpObserver& operator=(const FcvOpObserver&) = delete; + +public: + FcvOpObserver() = default; + ~FcvOpObserver() = default; + + // FcvOpObserver overrides. + + void onInserts(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + std::vector<InsertStatement>::const_iterator first, + std::vector<InsertStatement>::const_iterator last, + bool fromMigrate) final; + + void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) final; + + void onDelete(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + StmtId stmtId, + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) final; + + void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final; + + // Noop overrides. + + void onCreateIndex(OperationContext* opCtx, + const NamespaceString& nss, + CollectionUUID uuid, + BSONObj indexDoc, + bool fromMigrate) final {} + + void onStartIndexBuild(OperationContext* opCtx, + const NamespaceString& nss, + CollectionUUID collUUID, + const UUID& indexBuildUUID, + const std::vector<BSONObj>& indexes, + bool fromMigrate) final {} + + void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final {} + + void onCommitIndexBuild(OperationContext* opCtx, + const NamespaceString& nss, + CollectionUUID collUUID, + const UUID& indexBuildUUID, + const std::vector<BSONObj>& indexes, + bool fromMigrate) final {} + + void onAbortIndexBuild(OperationContext* opCtx, + const NamespaceString& nss, + CollectionUUID collUUID, + const UUID& indexBuildUUID, + const std::vector<BSONObj>& indexes, + const Status& cause, + bool fromMigrate) final {} + + void aboutToDelete(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& doc) final {} + void onInternalOpMessage(OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<UUID> uuid, + const BSONObj& msgObj, + const boost::optional<BSONObj> o2MsgObj, + const boost::optional<repl::OpTime> preImageOpTime, + const boost::optional<repl::OpTime> postImageOpTime, + const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction, + const boost::optional<OplogSlot> slot) final {} + void onCreateCollection(OperationContext* opCtx, + Collection* coll, + const NamespaceString& collectionName, + const CollectionOptions& options, + const BSONObj& idIndex, + const OplogSlot& createOpTime) final {} + void onCollMod(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + const BSONObj& collModCmd, + const CollectionOptions& oldCollOptions, + boost::optional<IndexCollModInfo> indexInfo) final {} + void onDropDatabase(OperationContext* opCtx, const std::string& dbName) final {} + repl::OpTime onDropCollection(OperationContext* opCtx, + const NamespaceString& collectionName, + OptionalCollectionUUID uuid, + std::uint64_t numRecords, + const CollectionDropType dropType) final { + return {}; + } + void onDropIndex(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + const std::string& indexName, + const BSONObj& idxDescriptor) final {} + void onRenameCollection(OperationContext* opCtx, + const NamespaceString& fromCollection, + const NamespaceString& toCollection, + OptionalCollectionUUID uuid, + OptionalCollectionUUID dropTargetUUID, + std::uint64_t numRecords, + bool stayTemp) final {} + repl::OpTime preRenameCollection(OperationContext* opCtx, + const NamespaceString& fromCollection, + const NamespaceString& toCollection, + OptionalCollectionUUID uuid, + OptionalCollectionUUID dropTargetUUID, + std::uint64_t numRecords, + bool stayTemp) final { + return {}; + } + void postRenameCollection(OperationContext* opCtx, + const NamespaceString& fromCollection, + const NamespaceString& toCollection, + OptionalCollectionUUID uuid, + OptionalCollectionUUID dropTargetUUID, + bool stayTemp) final {} + void onApplyOps(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& applyOpCmd) final {} + void onEmptyCapped(OperationContext* opCtx, + const NamespaceString& collectionName, + OptionalCollectionUUID uuid) final {} + void onUnpreparedTransactionCommit(OperationContext* opCtx, + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) final {} + void onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept final{}; + void onTransactionPrepare(OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) final{}; + void onTransactionAbort(OperationContext* opCtx, + boost::optional<OplogSlot> abortOplogEntryOpTime) final{}; + void onMajorityCommitPointUpdate(ServiceContext* service, + const repl::OpTime& newCommitPoint) final {} + +private: + /** + * Set the FCV to newVersion, making sure to close any outgoing connections with incompatible + * servers and closing open transactions if necessary. Increments the server TopologyVersion. + */ + static void _setVersion(OperationContext* opCtx, + ServerGlobalParams::FeatureCompatibility::Version newVersion); + + /** + * Examines a document inserted or updated in the server configuration collection + * (admin.system.version). If it is the featureCompatibilityVersion document, validates the + * document and on commit, updates the server parameter. + */ + static void _onInsertOrUpdate(OperationContext* opCtx, const BSONObj& doc); +}; + +} // namespace mongo diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index d49f70d4440..1d5673b25bf 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -77,6 +77,7 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/dbmessage.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/fcv_op_observer.h" #include "mongo/db/free_mon/free_mon_mongod.h" #include "mongo/db/ftdc/ftdc_mongod.h" #include "mongo/db/global_settings.h" @@ -984,6 +985,7 @@ void setUpObservers(ServiceContext* serviceContext) { opObserverRegistry->addObserver(std::make_unique<AuthOpObserver>()); opObserverRegistry->addObserver( std::make_unique<repl::PrimaryOnlyServiceOpObserver>(serviceContext)); + opObserverRegistry->addObserver(std::make_unique<FcvOpObserver>()); setupFreeMonitoringOpObserver(opObserverRegistry.get()); diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 11c6402566e..6cf1e8f9440 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -40,8 +40,6 @@ #include "mongo/db/catalog/collection_options.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" -#include "mongo/db/commands/feature_compatibility_version.h" -#include "mongo/db/commands/feature_compatibility_version_parser.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" @@ -72,15 +70,15 @@ namespace mongo { using repl::MutableOplogEntry; using repl::OplogEntry; +const OperationContext::Decoration<boost::optional<OpObserverImpl::DocumentKey>> + documentKeyDecoration = + OperationContext::declareDecoration<boost::optional<OpObserverImpl::DocumentKey>>(); namespace { MONGO_FAIL_POINT_DEFINE(failCollectionUpdates); MONGO_FAIL_POINT_DEFINE(hangAndFailUnpreparedCommitAfterReservingOplogSlot); -const auto documentKeyDecoration = - OperationContext::declareDecoration<boost::optional<OpObserverImpl::DocumentKey>>(); - constexpr auto kNumRecordsFieldName = "numRecords"_sd; constexpr auto kMsgFieldName = "msg"_sd; constexpr long long kInvalidNumRecords = -1LL; @@ -490,12 +488,6 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, Scope::storedFuncMod(opCtx); } else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) { DurableViewCatalog::onExternalChange(opCtx, nss); - } else if (nss == NamespaceString::kServerConfigurationNamespace) { - // We must check server configuration collection writes for featureCompatibilityVersion - // document changes. - for (auto it = first; it != last; it++) { - FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, it->doc); - } } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !lastOpTime.isNull()) { for (auto it = first; it != last; it++) { MongoDSessionCatalog::observeDirectWriteToConfigTransactions(opCtx, it->doc); @@ -566,10 +558,6 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg Scope::storedFuncMod(opCtx); } else if (args.nss.coll() == DurableViewCatalog::viewsCollectionName()) { DurableViewCatalog::onExternalChange(opCtx, args.nss); - } else if (args.nss == NamespaceString::kServerConfigurationNamespace) { - // We must check server configuration collection writes for featureCompatibilityVersion - // document changes. - FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updateArgs.updatedDoc); } else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.writeOpTime.isNull()) { MongoDSessionCatalog::observeDirectWriteToConfigTransactions(opCtx, @@ -636,11 +624,6 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, Scope::storedFuncMod(opCtx); } else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) { DurableViewCatalog::onExternalChange(opCtx, nss); - } else if (nss.isServerConfigurationCollection()) { - auto _id = documentKey.getId().firstElement(); - if (_id.type() == BSONType::String && - _id.String() == FeatureCompatibilityVersionParser::kParameterName) - uasserted(40670, "removing FeatureCompatibilityVersion document is not allowed"); } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.writeOpTime.isNull()) { MongoDSessionCatalog::observeDirectWriteToConfigTransactions(opCtx, documentKey.getId()); @@ -1339,9 +1322,6 @@ void OpObserverImpl::onReplicationRollback(OperationContext* opCtx, // Force the default read/write concern cache to reload on next access in case the defaults // document was rolled back. ReadWriteConcernDefaults::get(opCtx).invalidate(); - - // Make sure the in-memory FCV matches the on-disk FCV. - FeatureCompatibilityVersion::onReplicationRollback(opCtx); } } // namespace mongo diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index 266cdb603a6..6e4a27110a4 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -215,4 +215,7 @@ private: const repl::OpTime& prepareOrCommitOptime) {} }; +extern const OperationContext::Decoration<boost::optional<OpObserverImpl::DocumentKey>> + documentKeyDecoration; + } // namespace mongo |