diff options
author | Matt Kneiser <matt.kneiser@mongodb.com> | 2023-05-02 23:27:51 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-03 00:54:03 +0000 |
commit | 8037d49bdffb50394fbcb5d6af624d02022eb656 (patch) | |
tree | 36e8e6dc0ceaedc5ab75c8f67820931cb629b254 /src/mongo/db/op_observer | |
parent | adc2eea4112bab6f94d52cd22c2d7cf908ec3071 (diff) | |
download | mongo-8037d49bdffb50394fbcb5d6af624d02022eb656.tar.gz |
SERVER-76268 Create FallbackOpObserver
Diffstat (limited to 'src/mongo/db/op_observer')
-rw-r--r-- | src/mongo/db/op_observer/SConscript | 24 | ||||
-rw-r--r-- | src/mongo/db/op_observer/batched_write_context.h | 1 | ||||
-rw-r--r-- | src/mongo/db/op_observer/fallback_op_observer.cpp | 209 | ||||
-rw-r--r-- | src/mongo/db/op_observer/fallback_op_observer.h | 76 | ||||
-rw-r--r-- | src/mongo/db/op_observer/op_observer_impl.cpp | 113 |
5 files changed, 307 insertions, 116 deletions
diff --git a/src/mongo/db/op_observer/SConscript b/src/mongo/db/op_observer/SConscript index c941ccc1610..95502c35710 100644 --- a/src/mongo/db/op_observer/SConscript +++ b/src/mongo/db/op_observer/SConscript @@ -18,12 +18,14 @@ env.Library( target='op_observer_util', source=[ 'op_observer_util.cpp', + 'batched_write_context.cpp', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/bson/dotted_path_support', '$BUILD_DIR/mongo/db/catalog/collection_options', '$BUILD_DIR/mongo/db/shard_role_api', + '$BUILD_DIR/mongo/db/transaction/transaction_operations', ], ) @@ -52,7 +54,6 @@ env.Library( target='op_observer_impl', source=[ 'op_observer_impl.cpp', - 'batched_write_context.cpp', ], LIBDEPS=[ 'op_observer', @@ -77,11 +78,9 @@ env.Library( '$BUILD_DIR/mongo/db/repl/repl_server_parameters', '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', '$BUILD_DIR/mongo/db/server_feature_flags', - '$BUILD_DIR/mongo/db/session/session_catalog', '$BUILD_DIR/mongo/db/session/session_catalog_mongod', '$BUILD_DIR/mongo/db/transaction/transaction', '$BUILD_DIR/mongo/db/transaction/transaction_operations', - '$BUILD_DIR/mongo/db/views/view_catalog_helpers', '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/grid', 'op_observer_util', @@ -161,3 +160,22 @@ env.CppUnitTest( 'user_write_block_mode_op_observer', ], ) + +env.Library( + target='fallback_op_observer', + source=[ + 'fallback_op_observer.cpp', + ], + LIBDEPS=[ + 'op_observer', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog/collection_catalog', + '$BUILD_DIR/mongo/db/read_write_concern_defaults', + '$BUILD_DIR/mongo/db/session/session_catalog', + '$BUILD_DIR/mongo/db/session/session_catalog_mongod', + '$BUILD_DIR/mongo/db/transaction/transaction', + '$BUILD_DIR/mongo/db/views/view_catalog_helpers', + 'op_observer_util', + ], +) diff --git a/src/mongo/db/op_observer/batched_write_context.h b/src/mongo/db/op_observer/batched_write_context.h index d1972e69880..def884044e6 100644 --- a/src/mongo/db/op_observer/batched_write_context.h +++ b/src/mongo/db/op_observer/batched_write_context.h @@ -30,7 +30,6 @@ #pragma once #include "mongo/db/operation_context.h" -#include "mongo/db/repl/oplog_entry.h" #include "mongo/db/transaction/transaction_operations.h" namespace mongo { diff --git a/src/mongo/db/op_observer/fallback_op_observer.cpp b/src/mongo/db/op_observer/fallback_op_observer.cpp new file mode 100644 index 00000000000..d163e89ce8c --- /dev/null +++ b/src/mongo/db/op_observer/fallback_op_observer.cpp @@ -0,0 +1,209 @@ +/** + * Copyright (C) 2023-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/op_observer/fallback_op_observer.h" + +#include "mongo/db/catalog/collection_catalog.h" +#include "mongo/db/keys_collection_document_gen.h" +#include "mongo/db/logical_time_validator.h" +#include "mongo/db/op_observer/batched_write_context.h" +#include "mongo/db/op_observer/op_observer_util.h" +#include "mongo/db/read_write_concern_defaults.h" +#include "mongo/db/session/session_catalog.h" +#include "mongo/db/session/session_catalog_mongod.h" +#include "mongo/db/session/session_killer.h" +#include "mongo/db/views/util.h" +#include "mongo/db/views/view_catalog_helpers.h" +#include "mongo/util/namespace_string_util.h" + +namespace mongo { + +void FallbackOpObserver::onInserts(OperationContext* opCtx, + const CollectionPtr& coll, + std::vector<InsertStatement>::const_iterator first, + std::vector<InsertStatement>::const_iterator last, + std::vector<bool> fromMigrate, + bool defaultFromMigrate, + InsertsOpStateAccumulator* opAccumulator) { + auto txnParticipant = TransactionParticipant::get(opCtx); + const bool inMultiDocumentTransaction = + txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen(); + if (inMultiDocumentTransaction && !opCtx->getWriteUnitOfWork()) { + return; + } + + const auto& nss = coll->ns(); + + if (nss.isSystemDotJavascript()) { + Scope::storedFuncMod(opCtx); + } else if (nss.isSystemDotViews()) { + try { + for (auto it = first; it != last; it++) { + view_util::validateViewDefinitionBSON(opCtx, it->doc, nss.dbName()); + + uassertStatusOK(CollectionCatalog::get(opCtx)->createView( + opCtx, + NamespaceStringUtil::deserialize(nss.dbName().tenantId(), + it->doc.getStringField("_id")), + NamespaceStringUtil::parseNamespaceFromDoc(nss.dbName(), + it->doc.getStringField("viewOn")), + BSONArray{it->doc.getObjectField("pipeline")}, + view_catalog_helpers::validatePipeline, + it->doc.getObjectField("collation"), + ViewsForDatabase::Durability::kAlreadyDurable)); + } + } catch (const DBException&) { + // If a previous operation left the view catalog in an invalid state, our inserts can + // fail even if all the definitions are valid. Reloading may help us reset the state. + CollectionCatalog::get(opCtx)->reloadViews(opCtx, nss.dbName()); + } + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace) { + if (opAccumulator) { + auto& opTimeList = opAccumulator->opTimes; + if (!opTimeList.empty() && !opTimeList.back().isNull()) { + for (auto it = first; it != last; it++) { + auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx); + mongoDSessionCatalog->observeDirectWriteToConfigTransactions(opCtx, it->doc); + } + } + } + } else if (nss == NamespaceString::kConfigSettingsNamespace) { + for (auto it = first; it != last; it++) { + ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings( + opCtx, it->doc["_id"], it->doc); + } + } else if (nss == NamespaceString::kExternalKeysCollectionNamespace) { + for (auto it = first; it != last; it++) { + auto externalKey = + ExternalKeysCollectionDocument::parse(IDLParserContext("externalKey"), it->doc); + opCtx->recoveryUnit()->onCommit( + [this, externalKey = std::move(externalKey)](OperationContext* opCtx, + boost::optional<Timestamp>) mutable { + auto validator = LogicalTimeValidator::get(opCtx); + if (validator) { + validator->cacheExternalKey(externalKey); + } + }); + } + } +} + +void FallbackOpObserver::onUpdate(OperationContext* opCtx, + const OplogUpdateEntryArgs& args, + OpStateAccumulator* opAccumulator) { + if (args.updateArgs->update.isEmpty()) { + return; + } + + const auto& nss = args.coll->ns(); + + if (nss.isSystemDotJavascript()) { + Scope::storedFuncMod(opCtx); + } else if (nss.isSystemDotViews()) { + CollectionCatalog::get(opCtx)->reloadViews(opCtx, nss.dbName()); + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && + !opAccumulator->opTime.writeOpTime.isNull()) { + auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx); + mongoDSessionCatalog->observeDirectWriteToConfigTransactions(opCtx, + args.updateArgs->updatedDoc); + } else if (nss == NamespaceString::kConfigSettingsNamespace) { + ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings( + opCtx, args.updateArgs->updatedDoc["_id"], args.updateArgs->updatedDoc); + } +} + +void FallbackOpObserver::onDelete(OperationContext* opCtx, + const CollectionPtr& coll, + StmtId stmtId, + const OplogDeleteEntryArgs& args, + OpStateAccumulator* opAccumulator) { + const auto& nss = coll->ns(); + const bool inBatchedWrite = BatchedWriteContext::get(opCtx).writesAreBatched(); + + auto optDocKey = repl::documentKeyDecoration(opCtx); + invariant(optDocKey, nss.toStringForErrorMsg()); + auto& documentKey = optDocKey.value(); + + if (nss.isSystemDotJavascript()) { + Scope::storedFuncMod(opCtx); + } else if (nss.isSystemDotViews()) { + CollectionCatalog::get(opCtx)->reloadViews(opCtx, nss.dbName()); + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && + (inBatchedWrite || !opAccumulator->opTime.writeOpTime.isNull())) { + auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx); + mongoDSessionCatalog->observeDirectWriteToConfigTransactions(opCtx, documentKey.getId()); + } else if (nss == NamespaceString::kConfigSettingsNamespace) { + ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings( + opCtx, documentKey.getId().firstElement(), boost::none); + } +} + +void FallbackOpObserver::onDropDatabase(OperationContext* opCtx, const DatabaseName& dbName) { + if (dbName.db() == NamespaceString::kSessionTransactionsTableNamespace.db()) { + auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx); + mongoDSessionCatalog->invalidateAllSessions(opCtx); + } +} + +repl::OpTime FallbackOpObserver::onDropCollection(OperationContext* opCtx, + const NamespaceString& collectionName, + const UUID& uuid, + std::uint64_t numRecords, + CollectionDropType dropType) { + if (collectionName.isSystemDotJavascript()) { + Scope::storedFuncMod(opCtx); + } else if (collectionName.isSystemDotViews()) { + CollectionCatalog::get(opCtx)->clearViews(opCtx, collectionName.dbName()); + } else if (collectionName == NamespaceString::kSessionTransactionsTableNamespace) { + // Disallow this drop if there are currently prepared transactions. + const auto sessionCatalog = SessionCatalog::get(opCtx); + SessionKiller::Matcher matcherAllSessions( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); + bool noPreparedTxns = true; + sessionCatalog->scanSessions(matcherAllSessions, [&](const ObservableSession& session) { + auto txnParticipant = TransactionParticipant::get(session); + if (txnParticipant.transactionIsPrepared()) { + noPreparedTxns = false; + } + }); + uassert(4852500, + "Unable to drop transactions table (config.transactions) while prepared " + "transactions are present.", + noPreparedTxns); + + auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx); + mongoDSessionCatalog->invalidateAllSessions(opCtx); + } else if (collectionName == NamespaceString::kConfigSettingsNamespace) { + ReadWriteConcernDefaults::get(opCtx).invalidate(); + } + + return {}; +} + +} // namespace mongo diff --git a/src/mongo/db/op_observer/fallback_op_observer.h b/src/mongo/db/op_observer/fallback_op_observer.h new file mode 100644 index 00000000000..5c5d86f04da --- /dev/null +++ b/src/mongo/db/op_observer/fallback_op_observer.h @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2023-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/op_observer/op_observer_noop.h" + +namespace mongo { + +/** + * This OpObserver contains notifications to miscellaneous entities that were sitting in + * OpObserverImpl. + */ +class FallbackOpObserver final : public OpObserverNoop { + FallbackOpObserver(const FallbackOpObserver&) = delete; + FallbackOpObserver& operator=(const FallbackOpObserver&) = delete; + +public: + FallbackOpObserver() = default; + ~FallbackOpObserver() = default; + + void onInserts(OperationContext* opCtx, + const CollectionPtr& coll, + std::vector<InsertStatement>::const_iterator first, + std::vector<InsertStatement>::const_iterator last, + std::vector<bool> fromMigrate, + bool defaultFromMigrate, + InsertsOpStateAccumulator* opAccumulator = nullptr) final; + + void onUpdate(OperationContext* opCtx, + const OplogUpdateEntryArgs& args, + OpStateAccumulator* opAccumulator = nullptr) final; + + void onDelete(OperationContext* opCtx, + const CollectionPtr& coll, + StmtId stmtId, + const OplogDeleteEntryArgs& args, + OpStateAccumulator* opAccumulator = nullptr) final; + + void onDropDatabase(OperationContext* opCtx, const DatabaseName& dbName) final; + + using OpObserver::onDropCollection; + repl::OpTime onDropCollection(OperationContext* opCtx, + const NamespaceString& collectionName, + const UUID& uuid, + std::uint64_t numRecords, + CollectionDropType dropType) final; +}; + +} // namespace mongo diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp index 15aa42df6f6..0b2d446358f 100644 --- a/src/mongo/db/op_observer/op_observer_impl.cpp +++ b/src/mongo/db/op_observer/op_observer_impl.cpp @@ -50,7 +50,6 @@ #include "mongo/db/create_indexes_gen.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/index/index_descriptor.h" -#include "mongo/db/keys_collection_document_gen.h" #include "mongo/db/logical_time_validator.h" #include "mongo/db/multitenancy_gen.h" #include "mongo/db/namespace_string.h" @@ -69,12 +68,10 @@ #include "mongo/db/s/sharding_write_router.h" #include "mongo/db/server_feature_flags_gen.h" #include "mongo/db/server_options.h" -#include "mongo/db/session/session_catalog_mongod.h" +#include "mongo/db/session/session_txn_record_gen.h" #include "mongo/db/storage/storage_parameters_gen.h" #include "mongo/db/transaction/transaction_participant.h" #include "mongo/db/transaction/transaction_participant_gen.h" -#include "mongo/db/views/util.h" -#include "mongo/db/views/view_catalog_helpers.h" #include "mongo/logv2/log.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" @@ -831,54 +828,6 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, shardingWriteRouter, defaultFromMigrate, inMultiDocumentTransaction); - - if (nss.coll() == "system.js") { - Scope::storedFuncMod(opCtx); - } else if (nss.isSystemDotViews()) { - try { - for (auto it = first; it != last; it++) { - view_util::validateViewDefinitionBSON(opCtx, it->doc, nss.dbName()); - - uassertStatusOK(CollectionCatalog::get(opCtx)->createView( - opCtx, - NamespaceStringUtil::deserialize(nss.dbName().tenantId(), - it->doc.getStringField("_id")), - NamespaceStringUtil::parseNamespaceFromDoc(nss.dbName(), - it->doc.getStringField("viewOn")), - BSONArray{it->doc.getObjectField("pipeline")}, - view_catalog_helpers::validatePipeline, - it->doc.getObjectField("collation"), - ViewsForDatabase::Durability::kAlreadyDurable)); - } - } catch (const DBException&) { - // If a previous operation left the view catalog in an invalid state, our inserts can - // fail even if all the definitions are valid. Reloading may help us reset the state. - CollectionCatalog::get(opCtx)->reloadViews(opCtx, nss.dbName()); - } - } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !lastOpTime.isNull()) { - for (auto it = first; it != last; it++) { - auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx); - mongoDSessionCatalog->observeDirectWriteToConfigTransactions(opCtx, it->doc); - } - } else if (nss == NamespaceString::kConfigSettingsNamespace) { - for (auto it = first; it != last; it++) { - ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings( - opCtx, it->doc["_id"], it->doc); - } - } else if (nss == NamespaceString::kExternalKeysCollectionNamespace) { - for (auto it = first; it != last; it++) { - auto externalKey = - ExternalKeysCollectionDocument::parse(IDLParserContext("externalKey"), it->doc); - opCtx->recoveryUnit()->onCommit( - [this, externalKey = std::move(externalKey)](OperationContext* opCtx, - boost::optional<Timestamp>) mutable { - auto validator = LogicalTimeValidator::get(opCtx); - if (validator) { - validator->cacheExternalKey(externalKey); - } - }); - } - } } void OpObserverImpl::onInsertGlobalIndexKey(OperationContext* opCtx, @@ -1103,20 +1052,6 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, inMultiDocumentTransaction); } } - - if (args.coll->ns().coll() == "system.js") { - Scope::storedFuncMod(opCtx); - } else if (args.coll->ns().isSystemDotViews()) { - CollectionCatalog::get(opCtx)->reloadViews(opCtx, args.coll->ns().dbName()); - } else if (args.coll->ns() == NamespaceString::kSessionTransactionsTableNamespace && - !opTime.writeOpTime.isNull()) { - auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx); - mongoDSessionCatalog->observeDirectWriteToConfigTransactions(opCtx, - args.updateArgs->updatedDoc); - } else if (args.coll->ns() == NamespaceString::kConfigSettingsNamespace) { - ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings( - opCtx, args.updateArgs->updatedDoc["_id"], args.updateArgs->updatedDoc); - } } void OpObserverImpl::aboutToDelete(OperationContext* opCtx, @@ -1263,19 +1198,6 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, inMultiDocumentTransaction); } } - - if (nss.coll() == "system.js") { - Scope::storedFuncMod(opCtx); - } else if (nss.isSystemDotViews()) { - CollectionCatalog::get(opCtx)->reloadViews(opCtx, nss.dbName()); - } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && - (inBatchedWrite || !opTime.writeOpTime.isNull())) { - auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx); - mongoDSessionCatalog->observeDirectWriteToConfigTransactions(opCtx, documentKey.getId()); - } else if (nss == NamespaceString::kConfigSettingsNamespace) { - ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings( - opCtx, documentKey.getId().firstElement(), boost::none); - } } void OpObserverImpl::onInternalOpMessage( @@ -1432,11 +1354,6 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const DatabaseName& uassert(50714, "dropping the admin database is not allowed.", dbName.db() != DatabaseName::kAdmin.db()); - - if (dbName.db() == NamespaceString::kSessionTransactionsTableNamespace.db()) { - auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx); - mongoDSessionCatalog->invalidateAllSessions(opCtx); - } } repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, @@ -1479,34 +1396,6 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, "dropping the server configuration collection (admin.system.version) is not allowed.", collectionName != NamespaceString::kServerConfigurationNamespace); - if (collectionName.isSystemDotViews()) { - CollectionCatalog::get(opCtx)->clearViews(opCtx, collectionName.dbName()); - } else if (collectionName == NamespaceString::kSessionTransactionsTableNamespace) { - // Disallow this drop if there are currently prepared transactions. - const auto sessionCatalog = SessionCatalog::get(opCtx); - SessionKiller::Matcher matcherAllSessions( - KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); - bool noPreparedTxns = true; - sessionCatalog->scanSessions(matcherAllSessions, [&](const ObservableSession& session) { - auto txnParticipant = TransactionParticipant::get(session); - if (txnParticipant.transactionIsPrepared()) { - noPreparedTxns = false; - } - }); - uassert(4852500, - "Unable to drop transactions table (config.transactions) while prepared " - "transactions are present.", - noPreparedTxns); - - auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx); - mongoDSessionCatalog->invalidateAllSessions(opCtx); - } else if (collectionName == NamespaceString::kConfigSettingsNamespace) { - ReadWriteConcernDefaults::get(opCtx).invalidate(); - } else if (collectionName.isSystemDotJavascript()) { - // Inform the JavaScript engine of the change to system.js. - Scope::storedFuncMod(opCtx); - } - return {}; } |