From 4ad4646e089410bee0926806d10f809baab9fbe9 Mon Sep 17 00:00:00 2001 From: Varun Ravichandran Date: Sun, 8 May 2022 22:58:12 +0000 Subject: SERVER-66082: Propagate cluster server parameters to new nodes during file-copy based initial sync (cherry picked from commit 33ced84ad3537acb5a70907b6febb34adbcfbbcc) --- src/mongo/db/mongod_main.cpp | 4 - src/mongo/idl/SConscript | 51 ++++++- .../idl/cluster_server_parameter_initializer.cpp | 159 +++++++++++++++++++ .../idl/cluster_server_parameter_initializer.h | 116 ++++++++++++++ .../cluster_server_parameter_initializer_test.cpp | 141 +++++++++++++++++ .../idl/cluster_server_parameter_op_observer.cpp | 143 ++---------------- .../idl/cluster_server_parameter_op_observer.h | 9 -- .../cluster_server_parameter_op_observer_test.cpp | 161 ++------------------ src/mongo/idl/cluster_server_parameter_test_util.h | 168 +++++++++++++++++++++ 9 files changed, 660 insertions(+), 292 deletions(-) create mode 100644 src/mongo/idl/cluster_server_parameter_initializer.cpp create mode 100644 src/mongo/idl/cluster_server_parameter_initializer.h create mode 100644 src/mongo/idl/cluster_server_parameter_initializer_test.cpp create mode 100644 src/mongo/idl/cluster_server_parameter_test_util.h diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index bce1bc1815c..a3a56f705f6 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -548,10 +548,6 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) { auto const globalAuthzManager = AuthorizationManager::get(serviceContext); uassertStatusOK(globalAuthzManager->initialize(startupOpCtx.get())); - if (gFeatureFlagClusterWideConfig.isEnabledAndIgnoreFCV()) { - ClusterServerParameterOpObserver::initializeAllParametersFromDisk(startupOpCtx.get()); - } - if (audit::initializeManager) { audit::initializeManager(startupOpCtx.get()); } diff --git a/src/mongo/idl/SConscript b/src/mongo/idl/SConscript index de25a54c950..2fcbf03c37a 100644 --- a/src/mongo/idl/SConscript +++ b/src/mongo/idl/SConscript @@ -63,6 +63,20 @@ env.Library( ], ) +env.Library( + target='cluster_server_parameter_initializer', + source=[ + 'cluster_server_parameter_initializer.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/dbdirectclient', + '$BUILD_DIR/mongo/db/repl/replica_set_aware_service', + ] +) + env.Library( target='cluster_server_parameter_op_observer', source=[ @@ -74,17 +88,27 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/dbdirectclient', '$BUILD_DIR/mongo/db/op_observer', + 'cluster_server_parameter_initializer', ], ) -env.CppUnitTest( - target='cluster_server_parameter_test', +env.Library( + target='cluster_server_parameter_test_parameter', source=[ 'cluster_server_parameter_test.idl', - 'cluster_server_parameter_op_observer_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + 'cluster_server_parameter', + ] +) + +env.CppUnitTest( + target='cluster_server_parameter_op_observer_test', + source=[ + 'cluster_server_parameter_op_observer_test.cpp', + ], + LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/change_stream_options_manager', '$BUILD_DIR/mongo/db/repl/oplog', @@ -93,8 +117,27 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/repl/storage_interface_impl', '$BUILD_DIR/mongo/db/service_context_d_test_fixture', '$BUILD_DIR/mongo/util/signal_handlers', - 'cluster_server_parameter', 'cluster_server_parameter_op_observer', + 'cluster_server_parameter_test_parameter', + ], +) + +env.CppUnitTest( + target='cluster_server_parameter_initializer_test', + source=[ + 'cluster_server_parameter_initializer_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/db/change_stream_options_manager', + '$BUILD_DIR/mongo/db/repl/oplog', + '$BUILD_DIR/mongo/db/repl/oplog_interface_local', + '$BUILD_DIR/mongo/db/repl/replmocks', + '$BUILD_DIR/mongo/db/repl/storage_interface_impl', + '$BUILD_DIR/mongo/db/service_context_d_test_fixture', + '$BUILD_DIR/mongo/util/signal_handlers', + 'cluster_server_parameter_initializer', + 'cluster_server_parameter_test_parameter', ], ) diff --git a/src/mongo/idl/cluster_server_parameter_initializer.cpp b/src/mongo/idl/cluster_server_parameter_initializer.cpp new file mode 100644 index 00000000000..3205f7a4fa7 --- /dev/null +++ b/src/mongo/idl/cluster_server_parameter_initializer.cpp @@ -0,0 +1,159 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kControl + +#include "mongo/idl/cluster_server_parameter_initializer.h" + +#include "mongo/base/string_data.h" +#include "mongo/db/repl/replica_set_aware_service.h" +#include "mongo/db/service_context.h" +#include "mongo/logv2/log.h" + +namespace mongo { + +namespace { +const auto getInstance = ServiceContext::declareDecoration(); +const ReplicaSetAwareServiceRegistry::Registerer _registerer( + "ClusterServerParameterInitializerRegistry"); + +constexpr auto kIdField = "_id"_sd; +constexpr auto kCPTField = "clusterParameterTime"_sd; +constexpr auto kOplog = "oplog"_sd; + +} // namespace + +ClusterServerParameterInitializer* ClusterServerParameterInitializer::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +ClusterServerParameterInitializer* ClusterServerParameterInitializer::get( + ServiceContext* serviceContext) { + return &getInstance(serviceContext); +} + +void ClusterServerParameterInitializer::updateParameter(BSONObj doc, StringData mode) { + auto nameElem = doc[kIdField]; + if (nameElem.type() != String) { + LOGV2_DEBUG(6226301, + 1, + "Update with invalid cluster server parameter name", + "mode"_attr = mode, + "_id"_attr = nameElem); + return; + } + + auto name = nameElem.valueStringData(); + auto* sp = ServerParameterSet::getClusterParameterSet()->getIfExists(name); + if (!sp) { + LOGV2_DEBUG(6226300, + 3, + "Update to unknown cluster server parameter", + "mode"_attr = mode, + "name"_attr = name); + return; + } + + auto cptElem = doc[kCPTField]; + if ((cptElem.type() != mongo::Date) && (cptElem.type() != bsonTimestamp)) { + LOGV2_DEBUG(6226302, + 1, + "Update to cluster server parameter has invalid clusterParameterTime", + "mode"_attr = mode, + "name"_attr = name, + "clusterParameterTime"_attr = cptElem); + return; + } + + uassertStatusOK(sp->set(doc)); +} + +void ClusterServerParameterInitializer::clearParameter(ServerParameter* sp) { + if (sp->getClusterParameterTime() == LogicalTime::kUninitialized) { + // Nothing to clear. + return; + } + + uassertStatusOK(sp->reset()); +} + +void ClusterServerParameterInitializer::clearParameter(StringData id) { + auto* sp = ServerParameterSet::getClusterParameterSet()->getIfExists(id); + if (!sp) { + LOGV2_DEBUG(6226303, + 5, + "oplog event deletion of unknown cluster server parameter", + "name"_attr = id); + return; + } + + clearParameter(sp); +} + +void ClusterServerParameterInitializer::clearAllParameters() { + const auto& params = ServerParameterSet::getClusterParameterSet()->getMap(); + for (const auto& it : params) { + clearParameter(it.second); + } +} + +void ClusterServerParameterInitializer::initializeAllParametersFromDisk(OperationContext* opCtx) { + doLoadAllParametersFromDisk(opCtx, "initializing"_sd, [this](BSONObj doc, StringData mode) { + updateParameter(doc, mode); + }); +} + +void ClusterServerParameterInitializer::resynchronizeAllParametersFromDisk( + OperationContext* opCtx) { + const auto& allParams = ServerParameterSet::getClusterParameterSet()->getMap(); + std::set unsetSettings; + for (const auto& it : allParams) { + unsetSettings.insert(it.second->name()); + } + + doLoadAllParametersFromDisk( + opCtx, "resynchronizing"_sd, [this, &unsetSettings](BSONObj doc, StringData mode) { + unsetSettings.erase(doc[kIdField].str()); + updateParameter(doc, mode); + }); + + // For all known settings which were not present in this resync, + // explicitly clear any value which may be present in-memory. + for (const auto& setting : unsetSettings) { + clearParameter(setting); + } +} + +void ClusterServerParameterInitializer::onInitialDataAvailable(OperationContext* opCtx, + bool isMajorityDataAvailable) { + LOGV2_INFO(6608200, "Initializing cluster server parameters from disk"); + initializeAllParametersFromDisk(opCtx); +} + +} // namespace mongo diff --git a/src/mongo/idl/cluster_server_parameter_initializer.h b/src/mongo/idl/cluster_server_parameter_initializer.h new file mode 100644 index 00000000000..0eb02bb058c --- /dev/null +++ b/src/mongo/idl/cluster_server_parameter_initializer.h @@ -0,0 +1,116 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/platform/basic.h" + +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/replica_set_aware_service.h" + +namespace mongo { + +/** + * An interface that provides methods to manipulate in-memory cluster server parameter values in + * response to on-disk changes, specifically in a replica set context. + */ +class ClusterServerParameterInitializer + : public ReplicaSetAwareService { + ClusterServerParameterInitializer(const ClusterServerParameterInitializer&) = delete; + ClusterServerParameterInitializer& operator=(const ClusterServerParameterInitializer&) = delete; + +public: + ClusterServerParameterInitializer() = default; + ~ClusterServerParameterInitializer() = default; + + static ClusterServerParameterInitializer* get(OperationContext* opCtx); + static ClusterServerParameterInitializer* get(ServiceContext* serviceContext); + + void updateParameter(BSONObj doc, StringData mode); + void clearParameter(ServerParameter* sp); + void clearParameter(StringData id); + void clearAllParameters(); + + /** + * Used to initialize in-memory cluster parameter state based on the on-disk contents after + * startup recovery or initial sync is complete. + */ + void initializeAllParametersFromDisk(OperationContext* opCtx); + + /** + * Used on rollback and rename with drop. + * Updates settings which are present and clears settings which are not. + */ + void resynchronizeAllParametersFromDisk(OperationContext* opCtx); + + // Virtual methods coming from the ReplicaSetAwareService + void onStartup(OperationContext* opCtx) override final {} + + /** + * Called after startup recovery or initial sync is complete. + */ + void onInitialDataAvailable(OperationContext* opCtx, + bool isMajorityDataAvailable) override final; + void onShutdown() override final {} + void onStepUpBegin(OperationContext* opCtx, long long term) override final {} + void onStepUpComplete(OperationContext* opCtx, long long term) override final {} + void onStepDown() override final {} + void onBecomeArbiter() override final {} + +private: + template + void doLoadAllParametersFromDisk(OperationContext* opCtx, + StringData mode, + OnEntry onEntry) try { + std::vector failures; + + DBDirectClient client(opCtx); + FindCommandRequest findRequest{NamespaceString::kClusterParametersNamespace}; + client.find(std::move(findRequest), ReadPreferenceSetting{}, [&](BSONObj doc) { + try { + onEntry(doc, mode); + } catch (const DBException& ex) { + failures.push_back(ex.toStatus()); + } + }); + if (!failures.empty()) { + StringBuilder msg; + for (const auto& failure : failures) { + msg << failure.toString() << ", "; + } + msg.reset(msg.len() - 2); + uasserted(ErrorCodes::OperationFailed, msg.str()); + } + } catch (const DBException& ex) { + uassertStatusOK(ex.toStatus().withContext( + str::stream() << "Failed " << mode << " cluster server parameters from disk")); + } +}; + +} // namespace mongo diff --git a/src/mongo/idl/cluster_server_parameter_initializer_test.cpp b/src/mongo/idl/cluster_server_parameter_initializer_test.cpp new file mode 100644 index 00000000000..e8a62b6a1c1 --- /dev/null +++ b/src/mongo/idl/cluster_server_parameter_initializer_test.cpp @@ -0,0 +1,141 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kControl + +#include "mongo/platform/basic.h" + +#include "mongo/idl/cluster_server_parameter_test_util.h" + +#include "mongo/db/change_stream_options_manager.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/idl/cluster_server_parameter_gen.h" +#include "mongo/idl/cluster_server_parameter_initializer.h" +#include "mongo/idl/cluster_server_parameter_test_gen.h" +#include "mongo/idl/server_parameter.h" +#include "mongo/logv2/log.h" +#include "mongo/s/write_ops/batched_command_response.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { +using namespace cluster_server_parameter_test_util; + +class ClusterServerParameterInitializerTest : public ClusterServerParameterTestBase { +public: + void setUp() final { + ClusterServerParameterTestBase::setUp(); + + // Insert a document on-disk for ClusterServerParameterTest. This should be loaded in-memory + // by the initializer during startup recovery and at the end of initial sync. + Timestamp now(time(nullptr)); + const auto doc = + makeClusterParametersDoc(LogicalTime(now), kInitialIntValue, kInitialStrValue); + + upsert(doc); + } + + void tearDown() final { + // Delete all cluster server parameter documents written and refresh in-memory state. + remove(); + auto opCtx = cc().makeOperationContext(); + _initializer.resynchronizeAllParametersFromDisk(opCtx.get()); + } + /** + * Simulates the call to the ClusterServerParameterInitializer at the end of initial sync, when + * data is available but is not guaranteed to be majority committed. + */ + void doInitialSync() { + auto opCtx = cc().makeOperationContext(); + _initializer.onInitialDataAvailable(opCtx.get(), false /* isMajorityDataAvailable */); + } + + /** + * Simulates the call to the ClusterServerParameterInitializer at the end of startup recovery, + * when we expect to see majority committed data on-disk. + */ + void doStartupRecovery() { + auto opCtx = cc().makeOperationContext(); + _initializer.onInitialDataAvailable(opCtx.get(), true /* isMajorityDataAvailable */); + } + +protected: + ClusterServerParameterInitializer _initializer; +}; + +TEST_F(ClusterServerParameterInitializerTest, OnInitialSync) { + // Retrieve the in-memory test cluster server parameter and ensure it's set to the default + // value. + auto* sp = ServerParameterSet::getClusterParameterSet() + ->get>(kCSPTest); + ASSERT(sp != nullptr); + ClusterServerParameterTest cspTest = sp->getValue(); + ASSERT_EQ(cspTest.getIntValue(), kDefaultIntValue); + ASSERT_EQ(cspTest.getStrValue(), kDefaultStrValue); + + // Indicate that data is available at the end of initial sync and check that the in-memory data + // is updated. + doInitialSync(); + sp = ServerParameterSet::getClusterParameterSet() + ->get>(kCSPTest); + ASSERT(sp != nullptr); + cspTest = sp->getValue(); + ASSERT_EQ(cspTest.getIntValue(), kInitialIntValue); + ASSERT_EQ(cspTest.getStrValue(), kInitialStrValue); +} + +TEST_F(ClusterServerParameterInitializerTest, OnStartupRecovery) { + // Retrieve the test cluster server parameter and ensure it's set to the default value. + auto* sp = ServerParameterSet::getClusterParameterSet() + ->get>(kCSPTest); + ASSERT(sp != nullptr); + ClusterServerParameterTest cspTest = sp->getValue(); + ASSERT_EQ(cspTest.getIntValue(), kDefaultIntValue); + ASSERT_EQ(cspTest.getStrValue(), kDefaultStrValue); + + // Indicate that data is available at the end of startup recovery and check that the in-memory + // data is updated. + doStartupRecovery(); + sp = ServerParameterSet::getClusterParameterSet() + ->get>(kCSPTest); + ASSERT(sp != nullptr); + cspTest = sp->getValue(); + ASSERT_EQ(cspTest.getIntValue(), kInitialIntValue); + ASSERT_EQ(cspTest.getStrValue(), kInitialStrValue); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/idl/cluster_server_parameter_op_observer.cpp b/src/mongo/idl/cluster_server_parameter_op_observer.cpp index de195a75ab1..b8787dcfa12 100644 --- a/src/mongo/idl/cluster_server_parameter_op_observer.cpp +++ b/src/mongo/idl/cluster_server_parameter_op_observer.cpp @@ -34,13 +34,14 @@ #include #include "mongo/db/dbdirectclient.h" +#include "mongo/idl/cluster_server_parameter_initializer.h" #include "mongo/logv2/log.h" namespace mongo { namespace { constexpr auto kIdField = "_id"_sd; -constexpr auto kCPTField = "clusterParameterTime"_sd; +constexpr auto kOplog = "oplog"_sd; /** * Per-operation scratch space indicating the document being deleted. @@ -53,127 +54,8 @@ bool isConfigNamespace(const NamespaceString& nss) { return nss == NamespaceString::kClusterParametersNamespace; } -constexpr auto kOplog = "oplog"_sd; -void updateParameter(BSONObj doc, StringData mode) { - auto nameElem = doc[kIdField]; - if (nameElem.type() != String) { - LOGV2_DEBUG(6226301, - 1, - "Update with invalid cluster server parameter name", - "mode"_attr = mode, - "_id"_attr = nameElem); - return; - } - - auto name = doc[kIdField].valueStringData(); - auto* sp = ServerParameterSet::getClusterParameterSet()->getIfExists(name); - if (!sp) { - LOGV2_DEBUG(6226300, - 3, - "Update to unknown cluster server parameter", - "mode"_attr = mode, - "name"_attr = name); - return; - } - - auto cptElem = doc[kCPTField]; - if ((cptElem.type() != mongo::Date) && (cptElem.type() != bsonTimestamp)) { - LOGV2_DEBUG(6226302, - 1, - "Update to cluster server parameter has invalid clusterParameterTime", - "mode"_attr = mode, - "name"_attr = name, - "clusterParameterTime"_attr = cptElem); - return; - } - - uassertStatusOK(sp->set(doc)); -} - -void clearParameter(ServerParameter* sp) { - if (sp->getClusterParameterTime() == LogicalTime::kUninitialized) { - // Nothing to clear. - return; - } - - uassertStatusOK(sp->reset()); -} - -void clearParameter(StringData id) { - auto* sp = ServerParameterSet::getClusterParameterSet()->getIfExists(id); - if (!sp) { - LOGV2_DEBUG(6226303, - 5, - "oplog event deletion of unknown cluster server parameter", - "name"_attr = id); - return; - } - - clearParameter(sp); -} - -void clearAllParameters() { - const auto& params = ServerParameterSet::getClusterParameterSet()->getMap(); - for (const auto& it : params) { - clearParameter(it.second); - } -} - -template -void doLoadAllParametersFromDisk(OperationContext* opCtx, StringData mode, OnEntry onEntry) try { - std::vector failures; - - DBDirectClient client(opCtx); - FindCommandRequest findRequest{NamespaceString::kClusterParametersNamespace}; - client.find(std::move(findRequest), ReadPreferenceSetting{}, [&](BSONObj doc) { - try { - onEntry(doc, mode); - } catch (const DBException& ex) { - failures.push_back(ex.toStatus()); - } - }); - if (!failures.empty()) { - StringBuilder msg; - for (const auto& failure : failures) { - msg << failure.toString() << ", "; - } - msg.reset(msg.len() - 2); - uasserted(ErrorCodes::OperationFailed, msg.str()); - } -} catch (const DBException& ex) { - uassertStatusOK(ex.toStatus().withContext( - str::stream() << "Failed " << mode << " cluster server parameters from disk")); -} - -/** - * Used on rollback and rename with drop. - * Updates settings which are present and clears settings which are not. - */ -void resynchronizeAllParametersFromDisk(OperationContext* opCtx) { - const auto& allParams = ServerParameterSet::getClusterParameterSet()->getMap(); - std::set unsetSettings; - for (const auto& it : allParams) { - unsetSettings.insert(it.second->name()); - } - - doLoadAllParametersFromDisk( - opCtx, "resynchronizing"_sd, [&unsetSettings](BSONObj doc, StringData mode) { - unsetSettings.erase(doc[kIdField].str()); - updateParameter(doc, mode); - }); - - // For all known settings which were not present in this resync, - // explicitly clear any value which may be present in-memory. - for (const auto& setting : unsetSettings) { - clearParameter(setting); - } -} } // namespace -void ClusterServerParameterOpObserver::initializeAllParametersFromDisk(OperationContext* opCtx) { - doLoadAllParametersFromDisk(opCtx, "initializing"_sd, updateParameter); -} - void ClusterServerParameterOpObserver::onInserts(OperationContext* opCtx, const NamespaceString& nss, const UUID& uuid, @@ -185,7 +67,7 @@ void ClusterServerParameterOpObserver::onInserts(OperationContext* opCtx, } for (auto it = first; it != last; ++it) { - updateParameter(it->doc, kOplog); + ClusterServerParameterInitializer::get(opCtx)->updateParameter(it->doc, kOplog); } } @@ -196,7 +78,7 @@ void ClusterServerParameterOpObserver::onUpdate(OperationContext* opCtx, return; } - updateParameter(updatedDoc, kOplog); + ClusterServerParameterInitializer::get(opCtx)->updateParameter(updatedDoc, kOplog); } void ClusterServerParameterOpObserver::aboutToDelete(OperationContext* opCtx, @@ -233,7 +115,7 @@ void ClusterServerParameterOpObserver::onDelete(OperationContext* opCtx, const OplogDeleteEntryArgs& args) { const auto& docName = aboutToDeleteDoc(opCtx); if (!docName.empty()) { - clearParameter(docName); + ClusterServerParameterInitializer::get(opCtx)->clearParameter(docName); } } @@ -241,7 +123,7 @@ void ClusterServerParameterOpObserver::onDropDatabase(OperationContext* opCtx, const std::string& dbName) { if (dbName == NamespaceString::kConfigDb) { // Entire config DB deleted, reset to default state. - clearAllParameters(); + ClusterServerParameterInitializer::get(opCtx)->clearAllParameters(); } } @@ -253,7 +135,7 @@ repl::OpTime ClusterServerParameterOpObserver::onDropCollection( CollectionDropType dropType) { if (isConfigNamespace(collectionName)) { // Entire collection deleted, reset to default state. - clearAllParameters(); + ClusterServerParameterInitializer::get(opCtx)->clearAllParameters(); } return {}; @@ -268,17 +150,18 @@ void ClusterServerParameterOpObserver::postRenameCollection( bool stayTemp) { if (isConfigNamespace(fromCollection)) { // Same as collection dropped from a config point of view. - clearAllParameters(); + ClusterServerParameterInitializer::get(opCtx)->clearAllParameters(); } if (isConfigNamespace(toCollection)) { // Potentially many documents now set, perform full scan. if (dropTargetUUID) { // Possibly lost configurations in overwrite. - resynchronizeAllParametersFromDisk(opCtx); + ClusterServerParameterInitializer::get(opCtx)->resynchronizeAllParametersFromDisk( + opCtx); } else { // Collection did not exist prior to rename. - initializeAllParametersFromDisk(opCtx); + ClusterServerParameterInitializer::get(opCtx)->initializeAllParametersFromDisk(opCtx); } } } @@ -294,7 +177,7 @@ void ClusterServerParameterOpObserver::onImportCollection(OperationContext* opCt if (!isDryRun && (numRecords > 0) && isConfigNamespace(nss)) { // Something was imported, do a full collection scan to sync up. // No need to apply rollback rules since nothing will have been deleted. - initializeAllParametersFromDisk(opCtx); + ClusterServerParameterInitializer::get(opCtx)->initializeAllParametersFromDisk(opCtx); } } @@ -303,7 +186,7 @@ void ClusterServerParameterOpObserver::_onReplicationRollback(OperationContext* if (rbInfo.rollbackNamespaces.count(NamespaceString::kClusterParametersNamespace)) { // Some kind of rollback happend in the settings collection. // Just reload from disk to be safe. - resynchronizeAllParametersFromDisk(opCtx); + ClusterServerParameterInitializer::get(opCtx)->resynchronizeAllParametersFromDisk(opCtx); } } diff --git a/src/mongo/idl/cluster_server_parameter_op_observer.h b/src/mongo/idl/cluster_server_parameter_op_observer.h index 530a5e63148..2ef05729e39 100644 --- a/src/mongo/idl/cluster_server_parameter_op_observer.h +++ b/src/mongo/idl/cluster_server_parameter_op_observer.h @@ -47,15 +47,6 @@ namespace mongo { */ class ClusterServerParameterOpObserver final : public OpObserver { public: - /** - * Used on start, import, and rename without drop. - * No need to clear any documents not in the current set, - * since the load is purely additive. - * - * Should only be invoked directly from mongod_main. - */ - static void initializeAllParametersFromDisk(OperationContext*); - // Interface methods. void onInserts(OperationContext* opCtx, diff --git a/src/mongo/idl/cluster_server_parameter_op_observer_test.cpp b/src/mongo/idl/cluster_server_parameter_op_observer_test.cpp index a55a88980c0..2ba82076bf2 100644 --- a/src/mongo/idl/cluster_server_parameter_op_observer_test.cpp +++ b/src/mongo/idl/cluster_server_parameter_op_observer_test.cpp @@ -31,138 +31,22 @@ #include "mongo/platform/basic.h" -#include "mongo/db/change_stream_options_manager.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/oplog_interface_local.h" -#include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/repl/replication_coordinator_mock.h" -#include "mongo/db/repl/storage_interface_mock.h" -#include "mongo/db/service_context_d_test_fixture.h" -#include "mongo/idl/cluster_server_parameter_gen.h" +#include "mongo/idl/cluster_server_parameter_test_util.h" + #include "mongo/idl/cluster_server_parameter_op_observer.h" -#include "mongo/idl/cluster_server_parameter_test_gen.h" -#include "mongo/idl/server_parameter.h" #include "mongo/logv2/log.h" -#include "mongo/s/write_ops/batched_command_response.h" -#include "mongo/unittest/unittest.h" namespace mongo { namespace { +using namespace cluster_server_parameter_test_util; const std::vector kIgnoredNamespaces = { NamespaceString("config"_sd, "settings"_sd), NamespaceString("local"_sd, "clusterParameters"_sd), NamespaceString("test"_sd, "foo"_sd)}; -constexpr auto kCSPTest = "cspTest"_sd; -const auto kNilCPT = LogicalTime::kUninitialized; - -constexpr auto kConfigDB = "config"_sd; -constexpr auto kClusterParametersColl = "clusterParameters"_sd; -const NamespaceString kClusterParametersNS(kConfigDB, kClusterParametersColl); - -void upsert(BSONObj doc) { - const auto kMajorityWriteConcern = BSON("writeConcern" << BSON("w" - << "majority")); - - auto uniqueOpCtx = cc().makeOperationContext(); - auto* opCtx = uniqueOpCtx.get(); - - BSONObj res; - DBDirectClient client(opCtx); - - client.runCommand(kConfigDB.toString(), - [&] { - write_ops::UpdateCommandRequest updateOp(kClusterParametersNS); - updateOp.setUpdates({[&] { - write_ops::UpdateOpEntry entry; - entry.setQ(BSON(ClusterServerParameter::k_idFieldName << kCSPTest)); - entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( - BSON("$set" << doc))); - entry.setMulti(false); - entry.setUpsert(true); - return entry; - }()}); - return updateOp.toBSON(kMajorityWriteConcern); - }(), - res); - - BatchedCommandResponse response; - std::string errmsg; - if (!response.parseBSON(res, &errmsg)) { - uasserted(ErrorCodes::FailedToParse, str::stream() << "Failure: " << errmsg); - } - - uassertStatusOK(response.toStatus()); - uassert(ErrorCodes::OperationFailed, "No documents upserted", response.getN()); -} - -void remove() { - auto uniqueOpCtx = cc().makeOperationContext(); - auto* opCtx = uniqueOpCtx.get(); - - BSONObj res; - DBDirectClient(opCtx).runCommand( - kConfigDB.toString(), - [] { - write_ops::DeleteCommandRequest deleteOp(kClusterParametersNS); - deleteOp.setDeletes({[] { - write_ops::DeleteOpEntry entry; - entry.setQ(BSON(ClusterServerParameter::k_idFieldName << kCSPTest)); - entry.setMulti(true); - return entry; - }()}); - return deleteOp.toBSON({}); - }(), - res); - - BatchedCommandResponse response; - std::string errmsg; - if (!response.parseBSON(res, &errmsg)) { - uasserted(ErrorCodes::FailedToParse, - str::stream() << "Failed to parse reply to delete command: " << errmsg); - } - uassertStatusOK(response.toStatus()); -} - -BSONObj makeClusterParametersDoc(const LogicalTime& cpTime, int intValue, StringData strValue) { - ClusterServerParameter csp; - csp.set_id(kCSPTest); - csp.setClusterParameterTime(cpTime); - - ClusterServerParameterTest cspt; - cspt.setClusterServerParameter(std::move(csp)); - cspt.setIntValue(intValue); - cspt.setStrValue(strValue); - - return cspt.toBSON(); -} - -class ClusterServerParameterOpObserverTest : public ServiceContextMongoDTest { +class ClusterServerParameterOpObserverTest : public ClusterServerParameterTestBase { public: - void setUp() final { - // Set up mongod. - ServiceContextMongoDTest::setUp(); - - auto service = getServiceContext(); - auto opCtx = cc().makeOperationContext(); - repl::StorageInterface::set(service, std::make_unique()); - - // Set up ReplicationCoordinator and create oplog. - repl::ReplicationCoordinator::set( - service, - std::make_unique(service, createReplSettings())); - repl::createOplog(opCtx.get()); - - // Set up the ChangeStreamOptionsManager so that it can be retrieved/set. - ChangeStreamOptionsManager::create(service); - - // Ensure that we are primary. - auto replCoord = repl::ReplicationCoordinator::get(opCtx.get()); - ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); - } - void doInserts(const NamespaceString& nss, std::initializer_list docs) { std::vector stmts; std::transform(docs.begin(), docs.end(), std::back_inserter(stmts), [](auto doc) { @@ -246,18 +130,13 @@ public: ASSERT_EQ(finalCspTest.getStrValue(), initialCspTest.getStrValue()); } - static constexpr auto kInitialIntValue = 123; - static constexpr auto kDefaultIntValue = 42; - static constexpr auto kInitialStrValue = "initialState"_sd; - static constexpr auto kDefaultStrValue = ""_sd; - BSONObj initializeState() { Timestamp now(time(nullptr)); const auto doc = makeClusterParametersDoc(LogicalTime(now), kInitialIntValue, kInitialStrValue); upsert(doc); - doInserts(kClusterParametersNS, {doc}); + doInserts(NamespaceString::kClusterParametersNamespace, {doc}); auto* sp = ServerParameterSet::getClusterParameterSet() ->get void assertIgnoredAlways(F fn) { assertIgnoredOtherNamespaces(fn); - assertIgnored(kClusterParametersNS, fn); - } - -private: - static repl::ReplSettings createReplSettings() { - repl::ReplSettings settings; - settings.setOplogSizeBytes(5 * 1024 * 1024); - settings.setReplSetString("mySet/node1:12345"); - return settings; + assertIgnored(NamespaceString::kClusterParametersNamespace, fn); } protected: @@ -311,7 +182,7 @@ TEST_F(ClusterServerParameterOpObserverTest, OnInsertRecord) { const auto singleStrValue = "OnInsertRecord.single"; ASSERT_LT(initialLogicalTime, singleLogicalTime); - doInserts(kClusterParametersNS, + doInserts(NamespaceString::kClusterParametersNamespace, {makeClusterParametersDoc(singleLogicalTime, singleIntValue, singleStrValue)}); ClusterServerParameterTest cspTest = sp->getValue(); @@ -325,7 +196,7 @@ TEST_F(ClusterServerParameterOpObserverTest, OnInsertRecord) { const auto multiStrValue = "OnInsertRecord.multi"; ASSERT_LT(singleLogicalTime, multiLogicalTime); - doInserts(kClusterParametersNS, + doInserts(NamespaceString::kClusterParametersNamespace, { BSON(ClusterServerParameter::k_idFieldName << "ignored"), makeClusterParametersDoc(multiLogicalTime, multiIntValue, multiStrValue), @@ -379,7 +250,7 @@ TEST_F(ClusterServerParameterOpObserverTest, OnUpdateRecord) { const auto singleStrValue = "OnUpdateRecord.single"; ASSERT_LT(initialLogicalTime, singleLogicalTime); - doUpdate(kClusterParametersNS, + doUpdate(NamespaceString::kClusterParametersNamespace, makeClusterParametersDoc(singleLogicalTime, singleIntValue, singleStrValue)); ClusterServerParameterTest cspTest = sp->getValue(); @@ -416,7 +287,7 @@ TEST_F(ClusterServerParameterOpObserverTest, onDeleteRecord) { }); // Reset configuration to defaults when we claim to have deleted the doc. - doDelete(kClusterParametersNS, initialDoc); + doDelete(NamespaceString::kClusterParametersNamespace, initialDoc); ClusterServerParameterTest cspTest = sp->getValue(); ASSERT_EQ(cspTest.getIntValue(), kDefaultIntValue); ASSERT_EQ(cspTest.getStrValue(), kDefaultStrValue); @@ -424,7 +295,7 @@ TEST_F(ClusterServerParameterOpObserverTest, onDeleteRecord) { // Restore configured state, and delete without including deleteDoc reference. initializeState(); - doDelete(kClusterParametersNS, initialDoc, false); + doDelete(NamespaceString::kClusterParametersNamespace, initialDoc, false); cspTest = sp->getValue(); ASSERT_EQ(cspTest.getIntValue(), kDefaultIntValue); ASSERT_EQ(cspTest.getStrValue(), kDefaultStrValue); @@ -471,13 +342,13 @@ TEST_F(ClusterServerParameterOpObserverTest, onRenameCollection) { // since the rename away doesn't require a rescan. // Rename away (and reset to default) - doRenameCollection(kClusterParametersNS, kTestFoo); + doRenameCollection(NamespaceString::kClusterParametersNamespace, kTestFoo); ClusterServerParameterTest cspTest = sp->getValue(); ASSERT_EQ(cspTest.getIntValue(), kDefaultIntValue); ASSERT_EQ(cspTest.getStrValue(), kDefaultStrValue); // Rename in (and restore to initialized state) - doRenameCollection(kTestFoo, kClusterParametersNS); + doRenameCollection(kTestFoo, NamespaceString::kClusterParametersNamespace); cspTest = sp->getValue(); ASSERT_EQ(cspTest.getIntValue(), kInitialIntValue); ASSERT_EQ(cspTest.getStrValue(), kInitialStrValue); @@ -499,7 +370,7 @@ TEST_F(ClusterServerParameterOpObserverTest, onImportCollection) { auto doc = makeClusterParametersDoc(LogicalTime(Timestamp(time(nullptr))), 333, "onImportCollection"); upsert(doc); - doImportCollection(kClusterParametersNS); + doImportCollection(NamespaceString::kClusterParametersNamespace); ClusterServerParameterTest cspTest = sp->getValue(); ASSERT_EQ(cspTest.getIntValue(), 333); ASSERT_EQ(cspTest.getStrValue(), "onImportCollection"); @@ -526,7 +397,7 @@ TEST_F(ClusterServerParameterOpObserverTest, onReplicationRollback) { // Trigger rollback of relevant namespace. remove(); - doReplicationRollback({kClusterParametersNS}); + doReplicationRollback({NamespaceString::kClusterParametersNamespace}); cspTest = sp->getValue(); ASSERT_EQ(cspTest.getIntValue(), kDefaultIntValue); ASSERT_EQ(cspTest.getStrValue(), kDefaultStrValue); @@ -536,7 +407,7 @@ TEST_F(ClusterServerParameterOpObserverTest, onReplicationRollback) { LogicalTime(Timestamp(time(nullptr))), 444, "onReplicationRollback"); upsert(doc); cspTest = sp->getValue(); - doReplicationRollback({kClusterParametersNS}); + doReplicationRollback({NamespaceString::kClusterParametersNamespace}); ASSERT_EQ(cspTest.getIntValue(), kDefaultIntValue); ASSERT_EQ(cspTest.getStrValue(), kDefaultStrValue); } diff --git a/src/mongo/idl/cluster_server_parameter_test_util.h b/src/mongo/idl/cluster_server_parameter_test_util.h new file mode 100644 index 00000000000..095dbf213b3 --- /dev/null +++ b/src/mongo/idl/cluster_server_parameter_test_util.h @@ -0,0 +1,168 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/platform/basic.h" + +#include "mongo/db/change_stream_options_manager.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/idl/cluster_server_parameter_gen.h" +#include "mongo/idl/cluster_server_parameter_test_gen.h" +#include "mongo/s/write_ops/batched_command_response.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace cluster_server_parameter_test_util { + +constexpr auto kCSPTest = "cspTest"_sd; +constexpr auto kConfigDB = "config"_sd; +const auto kNilCPT = LogicalTime::kUninitialized; + +void upsert(BSONObj doc) { + const auto kMajorityWriteConcern = BSON("writeConcern" << BSON("w" + << "majority")); + + auto uniqueOpCtx = cc().makeOperationContext(); + auto* opCtx = uniqueOpCtx.get(); + + BSONObj res; + DBDirectClient client(opCtx); + + client.runCommand( + kConfigDB.toString(), + [&] { + write_ops::UpdateCommandRequest updateOp(NamespaceString::kClusterParametersNamespace); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON(ClusterServerParameter::k_idFieldName << kCSPTest)); + entry.setU( + write_ops::UpdateModification::parseFromClassicUpdate(BSON("$set" << doc))); + entry.setMulti(false); + entry.setUpsert(true); + return entry; + }()}); + return updateOp.toBSON(kMajorityWriteConcern); + }(), + res); + + BatchedCommandResponse response; + std::string errmsg; + if (!response.parseBSON(res, &errmsg)) { + uasserted(ErrorCodes::FailedToParse, str::stream() << "Failure: " << errmsg); + } + + uassertStatusOK(response.toStatus()); + uassert(ErrorCodes::OperationFailed, "No documents upserted", response.getN()); +} + +void remove() { + auto uniqueOpCtx = cc().makeOperationContext(); + auto* opCtx = uniqueOpCtx.get(); + + BSONObj res; + DBDirectClient(opCtx).runCommand( + kConfigDB.toString(), + [] { + write_ops::DeleteCommandRequest deleteOp(NamespaceString::kClusterParametersNamespace); + deleteOp.setDeletes({[] { + write_ops::DeleteOpEntry entry; + entry.setQ(BSON(ClusterServerParameter::k_idFieldName << kCSPTest)); + entry.setMulti(true); + return entry; + }()}); + return deleteOp.toBSON({}); + }(), + res); + + BatchedCommandResponse response; + std::string errmsg; + if (!response.parseBSON(res, &errmsg)) { + uasserted(ErrorCodes::FailedToParse, + str::stream() << "Failed to parse reply to delete command: " << errmsg); + } + uassertStatusOK(response.toStatus()); +} + +BSONObj makeClusterParametersDoc(const LogicalTime& cpTime, int intValue, StringData strValue) { + ClusterServerParameter csp; + csp.set_id(kCSPTest); + csp.setClusterParameterTime(cpTime); + + ClusterServerParameterTest cspt; + cspt.setClusterServerParameter(std::move(csp)); + cspt.setIntValue(intValue); + cspt.setStrValue(strValue); + + return cspt.toBSON(); +} + +class ClusterServerParameterTestBase : public ServiceContextMongoDTest { +public: + virtual void setUp() override { + // Set up mongod. + ServiceContextMongoDTest::setUp(); + + auto service = getServiceContext(); + auto opCtx = cc().makeOperationContext(); + repl::StorageInterface::set(service, std::make_unique()); + + // Set up ReplicationCoordinator and create oplog. + repl::ReplicationCoordinator::set( + service, + std::make_unique(service, createReplSettings())); + repl::createOplog(opCtx.get()); + + // Set up the ChangeStreamOptionsManager so that it can be retrieved/set. + ChangeStreamOptionsManager::create(service); + + // Ensure that we are primary. + auto replCoord = repl::ReplicationCoordinator::get(opCtx.get()); + ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); + } + + static constexpr auto kInitialIntValue = 123; + static constexpr auto kDefaultIntValue = 42; + static constexpr auto kInitialStrValue = "initialState"_sd; + static constexpr auto kDefaultStrValue = ""_sd; + +private: + static repl::ReplSettings createReplSettings() { + repl::ReplSettings settings; + settings.setOplogSizeBytes(5 * 1024 * 1024); + settings.setReplSetString("mySet/node1:12345"); + return settings; + } +}; + +} // namespace cluster_server_parameter_test_util +} // namespace mongo -- cgit v1.2.1