diff options
author | Henrik Edin <henrik.edin@mongodb.com> | 2017-12-29 14:24:02 -0500 |
---|---|---|
committer | Henrik Edin <henrik.edin@mongodb.com> | 2018-01-31 09:34:20 -0500 |
commit | e8596ec81cd0448310e40d9c2af8f8161dae3965 (patch) | |
tree | 9b62fa4ec5fa960bb57356a961a1a03aac341ecc /src | |
parent | f63ccea7c22204ddbacbb93c3aeecc8f002696cd (diff) | |
download | mongo-e8596ec81cd0448310e40d9c2af8f8161dae3965.tar.gz |
SERVER-32489 New main library for embedded to link and initialize the system with. Replaces mongodmain and allows us to start reducing dependencies. Embedded types for service_context and service_entry_point.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/SConscript | 17 | ||||
-rw-r--r-- | src/mongo/client/embedded/SConscript | 72 | ||||
-rw-r--r-- | src/mongo/client/embedded/embedded.cpp | 334 | ||||
-rw-r--r-- | src/mongo/client/embedded/embedded.h | 38 | ||||
-rw-r--r-- | src/mongo/client/embedded/libmongodbcapi.cpp | 17 | ||||
-rw-r--r-- | src/mongo/client/embedded/service_context_embedded.cpp | 303 | ||||
-rw-r--r-- | src/mongo/client/embedded/service_context_embedded.h | 90 | ||||
-rw-r--r-- | src/mongo/client/embedded/service_entry_point_embedded.cpp | 1134 | ||||
-rw-r--r-- | src/mongo/client/embedded/service_entry_point_embedded.h | 47 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 20 | ||||
-rw-r--r-- | src/mongo/db/commands/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/db.cpp | 440 | ||||
-rw-r--r-- | src/mongo/db/exec/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/query/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/repair_database_and_check_version.cpp | 525 | ||||
-rw-r--r-- | src/mongo/db/repair_database_and_check_version.h | 41 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/storage/mmap_v1/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/SConscript | 1 |
19 files changed, 2635 insertions, 454 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index f60ec342e3d..2a4fd413d46 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -274,10 +274,22 @@ generateConfigHeaderFile = env.Substfile( env.Alias('generated-sources', generateConfigHeaderFile) env.Library( + target="mongod_options_init", + source=[ + "db/mongod_options_init.cpp", + ], + LIBDEPS=[ + 'base', + ], + LIBDEPS_PRIVATE=[ + 'db/mongod_options', + ] +) + +env.Library( target="mongodmain", source=[ "db/db.cpp", - "db/mongod_options_init.cpp", ], LIBDEPS_PRIVATE=[ 'db/catalog/health_log', @@ -290,11 +302,11 @@ env.Library( 'db/kill_sessions_local', 'db/keys_collection_client_direct', 'db/logical_session_cache_factory_mongod', - 'db/mongod_options', 'db/mongodandmongos', 'db/op_observer_d', 'db/query_exec', 'db/repair_database', + 'db/repair_database_and_check_version', 'db/repl/repl_set_commands', 'db/repl/storage_interface_impl', 'db/repl/topology_coordinator', @@ -305,6 +317,7 @@ env.Library( 'db/system_index', 'db/ttl_d', 'executor/network_interface_factory', + 'mongod_options_init', 'rpc/rpc', 's/catalog/sharding_catalog_manager', 's/commands/shared_cluster_commands', diff --git a/src/mongo/client/embedded/SConscript b/src/mongo/client/embedded/SConscript index 904e1c826a2..57f9fd9cc48 100644 --- a/src/mongo/client/embedded/SConscript +++ b/src/mongo/client/embedded/SConscript @@ -4,17 +4,83 @@ Import("env") env = env.Clone() +env.Library( + target="service_entry_point_embedded", + source=[ + "service_entry_point_embedded.cpp", + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/transport/service_entry_point', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/auth/authcore', + '$BUILD_DIR/mongo/db/auth/authmongod', + '$BUILD_DIR/mongo/db/ops/write_ops_exec', + '$BUILD_DIR/mongo/db/rw_concern_d', + '$BUILD_DIR/mongo/db/s/sharding', + '$BUILD_DIR/mongo/db/storage/storage_engine_lock_file', + '$BUILD_DIR/mongo/db/storage/storage_engine_metadata', + ], +) + +env.Library( + 
target="service_context_embedded", + source=[ + "service_context_embedded.cpp", + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/transport/service_entry_point', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/concurrency/lock_manager', + '$BUILD_DIR/mongo/db/storage/storage_engine_lock_file', + '$BUILD_DIR/mongo/db/storage/storage_engine_metadata', + '$BUILD_DIR/mongo/db/storage/storage_options', + '$BUILD_DIR/mongo/util/options_parser/options_parser_init', + 'service_entry_point_embedded', + ], +) + +env.Library( + target='embedded', + source=[ + 'embedded.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/authz_manager_external_state_factory_d', + '$BUILD_DIR/mongo/db/catalog/catalog_impl', + '$BUILD_DIR/mongo/db/commands/dcommands', + '$BUILD_DIR/mongo/db/dbdirectclient', + '$BUILD_DIR/mongo/db/mongod_options', + '$BUILD_DIR/mongo/db/op_observer_d', + '$BUILD_DIR/mongo/db/repair_database_and_check_version', + '$BUILD_DIR/mongo/db/repl/storage_interface_impl', + '$BUILD_DIR/mongo/db/repl/repl_coordinator_impl', + '$BUILD_DIR/mongo/db/startup_warnings_mongod', + '$BUILD_DIR/mongo/db/storage/wiredtiger/storage_wiredtiger', + '$BUILD_DIR/mongo/mongod_options_init', + '$BUILD_DIR/mongo/util/net/network', + '$BUILD_DIR/mongo/util/version_impl', + 'service_context_embedded', + 'service_entry_point_embedded', + ] +) + capi = env.Library( target='mongo_embedded_capi', source=[ 'libmongodbcapi.cpp', ], LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/service_context', - '$BUILD_DIR/mongo/mongodmain', '$BUILD_DIR/mongo/transport/transport_layer_mock', - '$BUILD_DIR/mongo/util/net/network', + 'embedded', ] ) diff --git a/src/mongo/client/embedded/embedded.cpp b/src/mongo/client/embedded/embedded.cpp new file mode 100644 index 00000000000..cb1c5870189 --- /dev/null +++ b/src/mongo/client/embedded/embedded.cpp @@ -0,0 +1,334 @@ +/** +* Copyright (C) 2018 
MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects +* for all of the code used other than as permitted herein. If you modify +* file(s) with this exception, you may extend this exception to your +* version of the file(s), but you are not obligated to do so. If you do not +* wish to do so, delete this exception statement from your version. If you +* delete this exception statement from all source files in the program, +* then also delete it in the license file. 
+*/ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include "mongo/platform/basic.h" + +#include "embedded.h" + +#include "mongo/base/checked_cast.h" +#include "mongo/base/initializer.h" +#include "mongo/client/embedded/service_context_embedded.h" +#include "mongo/client/embedded/service_entry_point_embedded.h" +#include "mongo/config.h" +#include "mongo/db/catalog/health_log.h" +#include "mongo/db/catalog/uuid_catalog.h" +#include "mongo/db/client.h" +#include "mongo/db/commands/feature_compatibility_version.h" +#include "mongo/db/concurrency/lock_state.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/index_rebuilder.h" +#include "mongo/db/kill_sessions_local.h" +#include "mongo/db/log_process_details.h" +#include "mongo/db/mongod_options.h" +#include "mongo/db/op_observer_impl.h" +#include "mongo/db/op_observer_registry.h" +#include "mongo/db/repair_database_and_check_version.h" +#include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/session_catalog.h" +#include "mongo/db/session_killer.h" +#include "mongo/db/startup_warnings_mongod.h" +#include "mongo/db/storage/encryption_hooks.h" +#include "mongo/db/ttl.h" +#include "mongo/logger/log_component.h" +#include "mongo/scripting/dbdirectclient_factory.h" +#include "mongo/scripting/engine.h" +#include "mongo/util/background.h" +#include "mongo/util/exit.h" +#include "mongo/util/fast_clock_source_factory.h" +#include "mongo/util/log.h" +#include "mongo/util/periodic_runner_factory.h" +#include "mongo/util/quick_exit.h" +#include "mongo/util/time_support.h" + +#include <boost/filesystem.hpp> + + +namespace mongo { +namespace embedded { +namespace { +void initWireSpec() { + WireSpec& spec = WireSpec::instance(); + + // The featureCompatibilityVersion behavior defaults to the downgrade behavior while the + // in-memory version is unset. 
+ + spec.incomingInternalClient.minWireVersion = RELEASE_2_4_AND_BEFORE; + spec.incomingInternalClient.maxWireVersion = LATEST_WIRE_VERSION; + + spec.outgoing.minWireVersion = RELEASE_2_4_AND_BEFORE; + spec.outgoing.maxWireVersion = LATEST_WIRE_VERSION; + + spec.isInternalClient = true; +} + + +// Noop, to fulfull dependencies for other initializers +MONGO_INITIALIZER_GENERAL(ForkServer, ("EndStartupOptionHandling"), ("default")) +(InitializerContext* context) { + return Status::OK(); +} + +// Create a minimalistic replication coordinator to provide a limited interface for users. Not +// functional to provide any replication logic. +MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, + ("SetGlobalEnvironment", "SSLManager", "default")) +(InitializerContext* context) { + auto serviceContext = getGlobalServiceContext(); + repl::StorageInterface::set(serviceContext, stdx::make_unique<repl::StorageInterfaceImpl>()); + auto storageInterface = repl::StorageInterface::get(serviceContext); + + auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorImpl>( + serviceContext, + getGlobalReplSettings(), + nullptr, + nullptr, + nullptr, + nullptr, + storageInterface, + static_cast<int64_t>(curTimeMillis64())); + repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord)); + repl::setOplogCollectionName(serviceContext); + return Status::OK(); +} +} // namespace + +using logger::LogComponent; +using std::endl; + +void shutdown() { + Client::initThreadIfNotAlready(); + + auto const client = Client::getCurrent(); + auto const serviceContext = client->getServiceContext(); + + serviceContext->setKillAllOperations(); + + // Shut down the background periodic task runner + if (auto runner = serviceContext->getPeriodicRunner()) { + runner->shutdown(); + } + + // We should always be able to acquire the global lock at shutdown. 
+ // + // TODO: This call chain uses the locker directly, because we do not want to start an + // operation context, which also instantiates a recovery unit. Also, using the + // lockGlobalBegin/lockGlobalComplete sequence, we avoid taking the flush lock. + // + // For a Windows service, dbexit does not call exit(), so we must leak the lock outside + // of this function to prevent any operations from running that need a lock. + // + DefaultLockerImpl* globalLocker = new DefaultLockerImpl(); + LockResult result = globalLocker->lockGlobalBegin(MODE_X, Milliseconds::max()); + if (result == LOCK_WAITING) { + result = globalLocker->lockGlobalComplete(Milliseconds::max()); + } + + invariant(LOCK_OK == result); + + // Global storage engine may not be started in all cases before we exit + if (serviceContext->getGlobalStorageEngine()) { + serviceContext->shutdownGlobalStorageEngineCleanly(); + } + + // We drop the scope cache because leak sanitizer can't see across the + // thread we use for proxying MozJS requests. Dropping the cache cleans up + // the memory and makes leak sanitizer happy. 
+ ScriptEngine::dropScopeCache(); + + log(LogComponent::kControl) << "now exiting"; +} + + +int initialize(int argc, char* argv[], char** envp) { + registerShutdownTask(shutdown); + + srand(static_cast<unsigned>(curTimeMicros64())); + // + + Status status = mongo::runGlobalInitializers(argc, argv, envp); + if (!status.isOK()) { + severe(LogComponent::kControl) << "Failed global initializations: " << status; + return EXIT_FAILURE; + } + + Client::initThread("initandlisten"); + + initWireSpec(); + + auto serviceContext = checked_cast<ServiceContextMongoEmbedded*>(getGlobalServiceContext()); + + serviceContext->setFastClockSource(FastClockSourceFactory::create(Milliseconds(10))); + auto opObserverRegistry = stdx::make_unique<OpObserverRegistry>(); + opObserverRegistry->addObserver(stdx::make_unique<OpObserverImpl>()); + opObserverRegistry->addObserver(stdx::make_unique<UUIDCatalogObserver>()); + serviceContext->setOpObserver(std::move(opObserverRegistry)); + + DBDirectClientFactory::get(serviceContext).registerImplementation([](OperationContext* opCtx) { + return std::unique_ptr<DBClientBase>(new DBDirectClient(opCtx)); + }); + + { + ProcessId pid = ProcessId::getCurrent(); + LogstreamBuilder l = log(LogComponent::kControl); + l << "MongoDB starting : pid=" << pid << " port=" << serverGlobalParams.port + << " dbpath=" << storageGlobalParams.dbpath; + + const bool is32bit = sizeof(int*) == 4; + l << (is32bit ? 
" 32" : " 64") << "-bit" << endl; + } + + DEV log(LogComponent::kControl) << "DEBUG build (which is slower)" << endl; + + logProcessDetails(); + + serviceContext->createLockFile(); + + serviceContext->setServiceEntryPoint( + stdx::make_unique<ServiceEntryPointEmbedded>(serviceContext)); + + serviceContext->initializeGlobalStorageEngine(); + +#ifdef MONGO_CONFIG_WIREDTIGER_ENABLED + if (EncryptionHooks::get(serviceContext)->restartRequired()) { + quickExit(EXIT_CLEAN); + } +#endif + + // Warn if we detect configurations for multiple registered storage engines in the same + // configuration file/environment. + if (serverGlobalParams.parsedOpts.hasField("storage")) { + BSONElement storageElement = serverGlobalParams.parsedOpts.getField("storage"); + invariant(storageElement.isABSONObj()); + for (auto&& e : storageElement.Obj()) { + // Ignore if field name under "storage" matches current storage engine. + if (storageGlobalParams.engine == e.fieldName()) { + continue; + } + + // Warn if field name matches non-active registered storage engine. + if (serviceContext->isRegisteredStorageEngine(e.fieldName())) { + warning() << "Detected configuration for non-active storage engine " + << e.fieldName() << " when current storage engine is " + << storageGlobalParams.engine; + } + } + } + + logMongodStartupWarnings(storageGlobalParams, serverGlobalParams, serviceContext); + + { + std::stringstream ss; + ss << endl; + ss << "*********************************************************************" << endl; + ss << " ERROR: dbpath (" << storageGlobalParams.dbpath << ") does not exist." << endl; + ss << " Create this directory or give existing directory in --dbpath." 
<< endl; + ss << " See http://dochub.mongodb.org/core/startingandstoppingmongo" << endl; + ss << "*********************************************************************" << endl; + uassert(50677, ss.str().c_str(), boost::filesystem::exists(storageGlobalParams.dbpath)); + } + + { + std::stringstream ss; + ss << "repairpath (" << storageGlobalParams.repairpath << ") does not exist"; + uassert(50678, ss.str().c_str(), boost::filesystem::exists(storageGlobalParams.repairpath)); + } + + if (!storageGlobalParams.readOnly) { + boost::filesystem::remove_all(storageGlobalParams.dbpath + "/_tmp/"); + } + + if (mongodGlobalParams.scriptingEnabled) { + ScriptEngine::setup(); + } + + auto startupOpCtx = serviceContext->makeOperationContext(&cc()); + + bool canCallFCVSetIfCleanStartup = + !storageGlobalParams.readOnly && !(storageGlobalParams.engine == "devnull"); + if (canCallFCVSetIfCleanStartup) { + Lock::GlobalWrite lk(startupOpCtx.get()); + FeatureCompatibilityVersion::setIfCleanStartup(startupOpCtx.get(), + repl::StorageInterface::get(serviceContext)); + } + + auto swNonLocalDatabases = repairDatabasesAndCheckVersion(startupOpCtx.get()); + if (!swNonLocalDatabases.isOK()) { + // SERVER-31611 introduced a return value to `repairDatabasesAndCheckVersion`. Previously, + // a failing condition would fassert. SERVER-31611 covers a case where the binary (3.6) is + // refusing to start up because it refuses acknowledgement of FCV 3.2 and requires the + // user to start up with an older binary. Thus shutting down the server must leave the + // datafiles in a state that the older binary can start up. This requires going through a + // clean shutdown. + // + // The invariant is *not* a statement that `repairDatabasesAndCheckVersion` must return + // `MustDowngrade`. Instead, it is meant as a guardrail to protect future developers from + // accidentally buying into this behavior. 
New errors that are returned from the method + // may or may not want to go through a clean shutdown, and they likely won't want the + // program to return an exit code of `EXIT_NEED_DOWNGRADE`. + severe(LogComponent::kControl) << "** IMPORTANT: " + << swNonLocalDatabases.getStatus().reason(); + invariant(swNonLocalDatabases == ErrorCodes::MustDowngrade); + quickExit(EXIT_NEED_DOWNGRADE); + } + + // Assert that the in-memory featureCompatibilityVersion parameter has been explicitly set. If + // we are part of a replica set and are started up with no data files, we do not set the + // featureCompatibilityVersion until a primary is chosen. For this case, we expect the in-memory + // featureCompatibilityVersion parameter to still be uninitialized until after startup. + if (canCallFCVSetIfCleanStartup) { + invariant(serverGlobalParams.featureCompatibility.isVersionInitialized()); + } + + if (storageGlobalParams.upgrade) { + log() << "finished checking dbs"; + exitCleanly(EXIT_CLEAN); + } + + // This is for security on certain platforms (nonce generation) + srand((unsigned)(curTimeMicros64()) ^ (unsigned(uintptr_t(&startupOpCtx)))); + + if (!storageGlobalParams.readOnly) { + restartInProgressIndexesFromLastShutdown(startupOpCtx.get()); + } + + // MessageServer::run will return when exit code closes its socket and we don't need the + // operation context anymore + startupOpCtx.reset(); + + // Make sure current thread have no client set in thread_local + Client::releaseCurrent(); + + serviceContext->notifyStartupComplete(); + + return 0; +} +} // namespace embedded +} // namespace mongo diff --git a/src/mongo/client/embedded/embedded.h b/src/mongo/client/embedded/embedded.h new file mode 100644 index 00000000000..b3e91953e04 --- /dev/null +++ b/src/mongo/client/embedded/embedded.h @@ -0,0 +1,38 @@ +/** +* Copyright (C) 2018 MongoDB Inc. 
+* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects +* for all of the code used other than as permitted herein. If you modify +* file(s) with this exception, you may extend this exception to your +* version of the file(s), but you are not obligated to do so. If you do not +* wish to do so, delete this exception statement from your version. If you +* delete this exception statement from all source files in the program, +* then also delete it in the license file. 
+*/ + +#pragma once + +#include "mongo/platform/basic.h" + +namespace mongo { +namespace embedded { +int initialize(int argc, char* argv[], char** envp); +void shutdown(); +} // namespace embedded +} // namespace mongo diff --git a/src/mongo/client/embedded/libmongodbcapi.cpp b/src/mongo/client/embedded/libmongodbcapi.cpp index 045777e9e4a..247dba6cc59 100644 --- a/src/mongo/client/embedded/libmongodbcapi.cpp +++ b/src/mongo/client/embedded/libmongodbcapi.cpp @@ -33,6 +33,7 @@ #include <unordered_map> #include <vector> +#include "mongo/client/embedded/embedded.h" #include "mongo/db/client.h" #include "mongo/db/dbmain.h" #include "mongo/db/service_context.h" @@ -52,7 +53,6 @@ struct libmongodbcapi_db { libmongodbcapi_db& operator=(const libmongodbcapi_db&) = delete; mongo::ServiceContext* serviceContext = nullptr; - mongo::stdx::thread mongodThread; mongo::stdx::unordered_map<libmongodbcapi_client*, std::unique_ptr<libmongodbcapi_client>> open_clients; std::unique_ptr<mongo::transport::TransportLayerMock> transportLayer; @@ -111,11 +111,7 @@ libmongodbcapi_db* db_new(int argc, const char** argv, const char** envp) noexce } global_db->envpPointers.push_back(nullptr); - // call mongoDbMain() in a new thread because it currently does not terminate - global_db->mongodThread = stdx::thread([=] { - mongoDbMain(argc, global_db->argvPointers.data(), global_db->envpPointers.data()); - }); - global_db->mongodThread.detach(); + embedded::initialize(argc, global_db->argvPointers.data(), global_db->envpPointers.data()); // wait until the global service context is not null global_db->serviceContext = waitAndGetGlobalServiceContext(); @@ -128,7 +124,8 @@ libmongodbcapi_db* db_new(int argc, const char** argv, const char** envp) noexce // wait until the global service context is not null global_db->serviceContext = waitAndGetGlobalServiceContext(); } - // creating mock transport layer + + // creating mock transport layer to be able to create sessions global_db->transportLayer = 
stdx::make_unique<transport::TransportLayerMock>(); return global_db; @@ -138,6 +135,12 @@ libmongodbcapi_db* db_new(int argc, const char** argv, const char** envp) noexce } void db_destroy(libmongodbcapi_db* db) noexcept { + // todo, we can't teardown and re-initialize yet. + /*if (run_setup) { + embedded::shutdown(); + run_setup = false; + }*/ + delete db; invariant(!db || db == global_db); if (db) { diff --git a/src/mongo/client/embedded/service_context_embedded.cpp b/src/mongo/client/embedded/service_context_embedded.cpp new file mode 100644 index 00000000000..4ab14c35152 --- /dev/null +++ b/src/mongo/client/embedded/service_context_embedded.cpp @@ -0,0 +1,303 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. 
If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include "mongo/platform/basic.h" + +#include "mongo/base/init.h" +#include "mongo/base/initializer.h" +#include "mongo/client/embedded/service_context_embedded.h" +#include "mongo/client/embedded/service_entry_point_embedded.h" +#include "mongo/db/client.h" +#include "mongo/db/concurrency/lock_state.h" +#include "mongo/db/storage/storage_engine.h" +#include "mongo/db/storage/storage_engine_lock_file.h" +#include "mongo/db/storage/storage_engine_metadata.h" +#include "mongo/db/storage/storage_options.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/log.h" +#include "mongo/util/map_util.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/scopeguard.h" +#include "mongo/util/system_clock_source.h" +#include "mongo/util/system_tick_source.h" + +namespace mongo { +namespace { +auto makeMongoEmbeddedServiceContext() { + auto service = stdx::make_unique<ServiceContextMongoEmbedded>(); + service->setServiceEntryPoint(stdx::make_unique<ServiceEntryPointEmbedded>(service.get())); + service->setTickSource(stdx::make_unique<SystemTickSource>()); + service->setFastClockSource(stdx::make_unique<SystemClockSource>()); + service->setPreciseClockSource(stdx::make_unique<SystemClockSource>()); + return service; +} + +MONGO_INITIALIZER(SetGlobalEnvironment)(InitializerContext* context) { + setGlobalServiceContext(makeMongoEmbeddedServiceContext()); + return Status::OK(); +} +} // namespace + +extern bool _supportsDocLocking; + +ServiceContextMongoEmbedded::ServiceContextMongoEmbedded() = default; + +ServiceContextMongoEmbedded::~ServiceContextMongoEmbedded() = default; + +StorageEngine* ServiceContextMongoEmbedded::getGlobalStorageEngine() { + // We don't check that 
globalStorageEngine is not-NULL here intentionally. We can encounter + // an error before it's initialized and proceed to exitCleanly which is equipped to deal + // with a NULL storage engine. + return _storageEngine; +} + +void ServiceContextMongoEmbedded::createLockFile() { + try { + _lockFile = stdx::make_unique<StorageEngineLockFile>(storageGlobalParams.dbpath); + } catch (const std::exception& ex) { + uassert(50668, + str::stream() << "Unable to determine status of lock file in the data directory " + << storageGlobalParams.dbpath + << ": " + << ex.what(), + false); + } + bool wasUnclean = _lockFile->createdByUncleanShutdown(); + auto openStatus = _lockFile->open(); + if (storageGlobalParams.readOnly && openStatus == ErrorCodes::IllegalOperation) { + _lockFile.reset(); + } else { + uassertStatusOK(openStatus); + } + + if (wasUnclean) { + if (storageGlobalParams.readOnly) { + severe() << "Attempted to open dbpath in readOnly mode, but the server was " + "previously not shut down cleanly."; + fassertFailedNoTrace(50669); + } + warning() << "Detected unclean shutdown - " << _lockFile->getFilespec() << " is not empty."; + } +} + +void ServiceContextMongoEmbedded::initializeGlobalStorageEngine() { + // This should be set once. + invariant(!_storageEngine); + + // We should have a _lockFile or be in read-only mode. Confusingly, we can still have a lockFile + // if we are in read-only mode. This can happen if the server is started in read-only mode on a + // writable dbpath. 
+ invariant(_lockFile || storageGlobalParams.readOnly); + + const std::string dbpath = storageGlobalParams.dbpath; + if (auto existingStorageEngine = StorageEngineMetadata::getStorageEngineForPath(dbpath)) { + if (*existingStorageEngine == "mmapv1" || + (storageGlobalParams.engineSetByUser && storageGlobalParams.engine == "mmapv1")) { + log() << startupWarningsLog; + log() << "** WARNING: Support for MMAPV1 storage engine has been deprecated and will be" + << startupWarningsLog; + log() << "** removed in version 4.0. Please plan to migrate to the wiredTiger" + << startupWarningsLog; + log() << "** storage engine." << startupWarningsLog; + log() << "** See http://dochub.mongodb.org/core/deprecated-mmapv1"; + log() << startupWarningsLog; + } + + if (storageGlobalParams.engineSetByUser) { + // Verify that the name of the user-supplied storage engine matches the contents of + // the metadata file. + const StorageEngine::Factory* factory = + mapFindWithDefault(_storageFactories, + storageGlobalParams.engine, + static_cast<const StorageEngine::Factory*>(nullptr)); + + if (factory) { + uassert(50667, + str::stream() << "Cannot start server. Detected data files in " << dbpath + << " created by" + << " the '" + << *existingStorageEngine + << "' storage engine, but the" + << " specified storage engine was '" + << factory->getCanonicalName() + << "'.", + factory->getCanonicalName() == *existingStorageEngine); + } + } else { + // Otherwise set the active storage engine as the contents of the metadata file. + log() << "Detected data files in " << dbpath << " created by the '" + << *existingStorageEngine << "' storage engine, so setting the active" + << " storage engine to '" << *existingStorageEngine << "'."; + storageGlobalParams.engine = *existingStorageEngine; + } + } else if (!storageGlobalParams.engineSetByUser) { + // Ensure the default storage engine is available with this build of mongod. + uassert(50683, + str::stream() + << "Cannot start server. 
The default storage engine '" + << storageGlobalParams.engine + << "' is not available with this build of mongod. Please specify a different" + << " storage engine explicitly, e.g. --storageEngine=mmapv1.", + isRegisteredStorageEngine(storageGlobalParams.engine)); + } else if (storageGlobalParams.engineSetByUser && storageGlobalParams.engine == "mmapv1") { + log() << startupWarningsLog; + log() << "** WARNING: You have explicitly specified 'MMAPV1' storage engine in your" + << startupWarningsLog; + log() << "** config file or as a command line option. Support for the MMAPV1" + << startupWarningsLog; + log() << "** storage engine has been deprecated and will be removed in" + << startupWarningsLog; + log() << "** version 4.0. See http://dochub.mongodb.org/core/deprecated-mmapv1"; + log() << startupWarningsLog; + } + + const std::string repairpath = storageGlobalParams.repairpath; + uassert(50682, + str::stream() << "Cannot start server. The command line option '--repairpath'" + << " is only supported by the mmapv1 storage engine", + repairpath.empty() || repairpath == dbpath || storageGlobalParams.engine == "mmapv1"); + + const StorageEngine::Factory* factory = _storageFactories[storageGlobalParams.engine]; + + uassert(50681, + str::stream() << "Cannot start server with an unknown storage engine: " + << storageGlobalParams.engine, + factory); + + if (storageGlobalParams.readOnly) { + uassert(50679, + str::stream() + << "Server was started in read-only mode, but the configured storage engine, " + << storageGlobalParams.engine + << ", does not support read-only operation", + factory->supportsReadOnly()); + } + + std::unique_ptr<StorageEngineMetadata> metadata = StorageEngineMetadata::forPath(dbpath); + + if (storageGlobalParams.readOnly) { + uassert(50680, + "Server was started in read-only mode, but the storage metadata file was not" + " found.", + metadata.get()); + } + + // Validate options in metadata against current startup options. 
+ if (metadata.get()) { + uassertStatusOK(factory->validateMetadata(*metadata, storageGlobalParams)); + } + + ScopeGuard guard = MakeGuard([&] { + if (_lockFile) { + _lockFile->close(); + } + }); + + _storageEngine = factory->create(storageGlobalParams, _lockFile.get()); + _storageEngine->finishInit(); + + if (_lockFile) { + uassertStatusOK(_lockFile->writePid()); + } + + // Write a new metadata file if it is not present. + if (!metadata.get()) { + invariant(!storageGlobalParams.readOnly); + metadata.reset(new StorageEngineMetadata(storageGlobalParams.dbpath)); + metadata->setStorageEngine(factory->getCanonicalName().toString()); + metadata->setStorageEngineOptions(factory->createMetadataOptions(storageGlobalParams)); + uassertStatusOK(metadata->write()); + } + + guard.Dismiss(); + + _supportsDocLocking = _storageEngine->supportsDocLocking(); +} + +void ServiceContextMongoEmbedded::shutdownGlobalStorageEngineCleanly() { + invariant(_storageEngine); + _storageEngine->cleanShutdown(); + if (_lockFile) { + _lockFile->clearPidAndUnlock(); + } +} + +void ServiceContextMongoEmbedded::registerStorageEngine(const std::string& name, + const StorageEngine::Factory* factory) { + // No double-registering. + invariant(0 == _storageFactories.count(name)); + + // Some sanity checks: the factory must exist, + invariant(factory); + + // and all factories should be added before we pick a storage engine. 
+ invariant(NULL == _storageEngine); + + _storageFactories[name] = factory; +} + +bool ServiceContextMongoEmbedded::isRegisteredStorageEngine(const std::string& name) { + return _storageFactories.count(name); +} + +StorageFactoriesIterator* ServiceContextMongoEmbedded::makeStorageFactoriesIterator() { + return new StorageFactoriesIteratorMongoEmbedded(_storageFactories.begin(), + _storageFactories.end()); +} + +StorageFactoriesIteratorMongoEmbedded::StorageFactoriesIteratorMongoEmbedded( + const FactoryMapIterator& begin, const FactoryMapIterator& end) + : _curr(begin), _end(end) {} + +bool StorageFactoriesIteratorMongoEmbedded::more() const { + return _curr != _end; +} + +const StorageEngine::Factory* StorageFactoriesIteratorMongoEmbedded::next() { + return _curr++->second; +} + +std::unique_ptr<OperationContext> ServiceContextMongoEmbedded::_newOpCtx(Client* client, + unsigned opId) { + invariant(&cc() == client); + auto opCtx = stdx::make_unique<OperationContext>(client, opId); + + if (isMMAPV1()) { + opCtx->setLockState(stdx::make_unique<MMAPV1LockerImpl>()); + } else { + opCtx->setLockState(stdx::make_unique<DefaultLockerImpl>()); + } + + opCtx->setRecoveryUnit(getGlobalStorageEngine()->newRecoveryUnit(), + OperationContext::kNotInUnitOfWork); + return opCtx; +} + +} // namespace mongo diff --git a/src/mongo/client/embedded/service_context_embedded.h b/src/mongo/client/embedded/service_context_embedded.h new file mode 100644 index 00000000000..661c96d4677 --- /dev/null +++ b/src/mongo/client/embedded/service_context_embedded.h @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include <map> + +#include "mongo/db/service_context.h" + +namespace mongo { + +class Client; +class StorageEngineLockFile; + +class ServiceContextMongoEmbedded final : public ServiceContext { +public: + using FactoryMap = std::map<std::string, const StorageEngine::Factory*>; + + ServiceContextMongoEmbedded(); + + ~ServiceContextMongoEmbedded(); + + StorageEngine* getGlobalStorageEngine() override; + + void createLockFile(); + + void initializeGlobalStorageEngine() override; + + void shutdownGlobalStorageEngineCleanly() override; + + void registerStorageEngine(const std::string& name, + const StorageEngine::Factory* factory) override; + + bool isRegisteredStorageEngine(const std::string& name) override; + + StorageFactoriesIterator* makeStorageFactoriesIterator() override; + +private: + std::unique_ptr<OperationContext> _newOpCtx(Client* client, unsigned opId) override; + + std::unique_ptr<StorageEngineLockFile> _lockFile; + + // logically owned here, but never deleted by anyone. + StorageEngine* _storageEngine = nullptr; + + // All possible storage engines are registered here through MONGO_INIT. + FactoryMap _storageFactories; +}; + +class StorageFactoriesIteratorMongoEmbedded final : public StorageFactoriesIterator { +public: + typedef ServiceContextMongoEmbedded::FactoryMap::const_iterator FactoryMapIterator; + + StorageFactoriesIteratorMongoEmbedded(const FactoryMapIterator& begin, + const FactoryMapIterator& end); + + bool more() const override; + const StorageEngine::Factory* next() override; + +private: + FactoryMapIterator _curr; + FactoryMapIterator _end; +}; + +} // namespace mongo diff --git a/src/mongo/client/embedded/service_entry_point_embedded.cpp b/src/mongo/client/embedded/service_entry_point_embedded.cpp new file mode 100644 index 00000000000..003c5ff95d3 --- /dev/null +++ b/src/mongo/client/embedded/service_entry_point_embedded.cpp @@ -0,0 +1,1134 @@ +/** + * Copyright (C) 2018 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/base/checked_cast.h" +#include "mongo/client/embedded/service_entry_point_embedded.h" +#include "mongo/db/audit.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/auth/impersonation_session.h" +#include "mongo/db/client.h" +#include "mongo/db/commands.h" +#include "mongo/db/commands/fsync.h" +#include "mongo/db/concurrency/global_lock_acquisition_tracker.h" +#include "mongo/db/curop.h" +#include "mongo/db/curop_metrics.h" +#include "mongo/db/cursor_manager.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/initialize_operation_session_info.h" +#include "mongo/db/introspect.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/lasterror.h" +#include "mongo/db/logical_clock.h" +#include "mongo/db/logical_session_id.h" +#include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/logical_time_validator.h" +#include "mongo/db/ops/write_ops.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/query/find.h" +#include "mongo/db/read_concern.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/sharded_connection_info.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/session_catalog.h" +#include "mongo/db/stats/counters.h" +#include "mongo/db/stats/top.h" +#include "mongo/rpc/factory.h" +#include "mongo/rpc/metadata.h" +#include "mongo/rpc/metadata/config_server_metadata.h" +#include "mongo/rpc/metadata/logical_time_metadata.h" +#include "mongo/rpc/metadata/oplog_query_metadata.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/rpc/metadata/sharding_metadata.h" +#include "mongo/rpc/metadata/tracking_metadata.h" +#include 
"mongo/rpc/reply_builder_interface.h" +#include "mongo/s/grid.h" +#include "mongo/s/stale_exception.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" +#include "mongo/util/net/message.h" +#include "mongo/util/net/op_msg.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +MONGO_FP_DECLARE(rsStopGetMore); +MONGO_FP_DECLARE(respondWithNotPrimaryInCommandDispatch); + +namespace { +using logger::LogComponent; + +// The command names for which to check out a session. +// +// Note: Eval should check out a session because it defaults to running under a global write lock, +// so if it didn't, and the function it was given contains any of these whitelisted commands, they +// would try to check out a session under a lock, which is not allowed. Similarly, +// refreshLogicalSessionCacheNow triggers a bulk update under a lock on the sessions collection. +const StringMap<int> cmdWhitelist = {{"delete", 1}, + {"eval", 1}, + {"$eval", 1}, + {"findandmodify", 1}, + {"findAndModify", 1}, + {"insert", 1}, + {"refreshLogicalSessionCacheNow", 1}, + {"update", 1}}; + +void generateLegacyQueryErrorResponse(const AssertionException* exception, + const QueryMessage& queryMessage, + CurOp* curop, + Message* response) { + curop->debug().exceptionInfo = exception->toStatus(); + + log(LogComponent::kQuery) << "assertion " << exception->toString() << " ns:" << queryMessage.ns + << " query:" << (queryMessage.query.valid(BSONVersion::kLatest) + ? 
queryMessage.query.toString() + : "query object is corrupt"); + if (queryMessage.ntoskip || queryMessage.ntoreturn) { + log(LogComponent::kQuery) << " ntoskip:" << queryMessage.ntoskip + << " ntoreturn:" << queryMessage.ntoreturn; + } + + auto scex = exception->extraInfo<StaleConfigInfo>(); + + BSONObjBuilder err; + err.append("$err", exception->reason()); + err.append("code", exception->code()); + if (scex) { + err.append("ok", 0.0); + err.append("ns", scex->getns()); + scex->getVersionReceived().addToBSON(err, "vReceived"); + scex->getVersionWanted().addToBSON(err, "vWanted"); + } + BSONObj errObj = err.done(); + + if (scex) { + log(LogComponent::kQuery) << "stale version detected during query over " << queryMessage.ns + << " : " << errObj; + } + + BufBuilder bb; + bb.skip(sizeof(QueryResult::Value)); + bb.appendBuf((void*)errObj.objdata(), errObj.objsize()); + + // TODO: call replyToQuery() from here instead of this!!! see dbmessage.h + QueryResult::View msgdata = bb.buf(); + QueryResult::View qr = msgdata; + qr.setResultFlags(ResultFlag_ErrSet); + if (scex) + qr.setResultFlags(qr.getResultFlags() | ResultFlag_ShardConfigStale); + qr.msgdata().setLen(bb.len()); + qr.msgdata().setOperation(opReply); + qr.setCursorId(0); + qr.setStartingFrom(0); + qr.setNReturned(1); + response->setData(bb.release()); +} + +void registerError(OperationContext* opCtx, const DBException& exception) { + LastError::get(opCtx->getClient()).setLastError(exception.code(), exception.reason()); + CurOp::get(opCtx)->debug().exceptionInfo = exception.toStatus(); +} + +void generateErrorResponse(OperationContext* opCtx, + rpc::ReplyBuilderInterface* replyBuilder, + const DBException& exception, + const BSONObj& replyMetadata) { + registerError(opCtx, exception); + + // We could have thrown an exception after setting fields in the builder, + // so we need to reset it to a clean state just to be sure. 
+ replyBuilder->reset(); + replyBuilder->setCommandReply(exception.toStatus()); + replyBuilder->setMetadata(replyMetadata); +} + +void generateErrorResponse(OperationContext* opCtx, + rpc::ReplyBuilderInterface* replyBuilder, + const DBException& exception, + const BSONObj& replyMetadata, + LogicalTime operationTime) { + registerError(opCtx, exception); + + // We could have thrown an exception after setting fields in the builder, + // so we need to reset it to a clean state just to be sure. + replyBuilder->reset(); + replyBuilder->setCommandReply(exception.toStatus(), + BSON("operationTime" << operationTime.asTimestamp())); + replyBuilder->setMetadata(replyMetadata); +} + +/** + * Guard object for making a good-faith effort to enter maintenance mode and leave it when it + * goes out of scope. + * + * Sometimes we cannot set maintenance mode, in which case the call to setMaintenanceMode will + * return a non-OK status. This class does not treat that case as an error which means that + * anybody using it is assuming it is ok to continue execution without maintenance mode. + * + * TODO: This assumption needs to be audited and documented, or this behavior should be moved + * elsewhere. + */ +class MaintenanceModeSetter { + MONGO_DISALLOW_COPYING(MaintenanceModeSetter); + +public: + MaintenanceModeSetter(OperationContext* opCtx) + : _opCtx(opCtx), + _maintenanceModeSet( + repl::ReplicationCoordinator::get(_opCtx)->setMaintenanceMode(true).isOK()) {} + + ~MaintenanceModeSetter() { + if (_maintenanceModeSet) { + repl::ReplicationCoordinator::get(_opCtx) + ->setMaintenanceMode(false) + .transitional_ignore(); + } + } + +private: + OperationContext* const _opCtx; + const bool _maintenanceModeSet; +}; + +// Called from the error contexts where request may not be available. +// It only attaches clusterTime and operationTime. 
+void appendReplyMetadataOnError(OperationContext* opCtx, BSONObjBuilder* metadataBob) { + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + const bool isReplSet = + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + + if (isReplSet) { + if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) { + // No need to sign cluster times for internal clients. + SignedLogicalTime currentTime( + LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0); + rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); + logicalTimeMetadata.writeToMetadata(metadataBob); + } else if (auto validator = LogicalTimeValidator::get(opCtx)) { + auto currentTime = + validator->trySignLogicalTime(LogicalClock::get(opCtx)->getClusterTime()); + rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); + logicalTimeMetadata.writeToMetadata(metadataBob); + } + } +} + +void appendReplyMetadata(OperationContext* opCtx, + const OpMsgRequest& request, + BSONObjBuilder* metadataBob) { + const bool isShardingAware = ShardingState::get(opCtx)->enabled(); + const bool isConfig = serverGlobalParams.clusterRole == ClusterRole::ConfigServer; + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + const bool isReplSet = + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + + if (isReplSet) { + // Attach our own last opTime. + repl::OpTime lastOpTimeFromClient = + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + replCoord->prepareReplMetadata(opCtx, request.body, lastOpTimeFromClient, metadataBob); + // For commands from mongos, append some info to help getLastError(w) work. 
+ // TODO: refactor out of here as part of SERVER-18236 + if (isShardingAware || isConfig) { + rpc::ShardingMetadata(lastOpTimeFromClient, replCoord->getElectionId()) + .writeToMetadata(metadataBob) + .transitional_ignore(); + } + if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) { + // No need to sign cluster times for internal clients. + SignedLogicalTime currentTime( + LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0); + rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); + logicalTimeMetadata.writeToMetadata(metadataBob); + } else if (auto validator = LogicalTimeValidator::get(opCtx)) { + auto currentTime = + validator->trySignLogicalTime(LogicalClock::get(opCtx)->getClusterTime()); + rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); + logicalTimeMetadata.writeToMetadata(metadataBob); + } + } + + // If we're a shard other than the config shard, attach the last configOpTime we know about. + if (isShardingAware && !isConfig) { + auto opTime = grid.configOpTime(); + rpc::ConfigServerMetadata(opTime).writeToMetadata(metadataBob); + } +} + +/** + * Given the specified command, returns an effective read concern which should be used or an error + * if the read concern is not valid for the command. 
+ */ +StatusWith<repl::ReadConcernArgs> _extractReadConcern(const Command* command, + const std::string& dbName, + const BSONObj& cmdObj) { + repl::ReadConcernArgs readConcernArgs; + + auto readConcernParseStatus = readConcernArgs.initialize(cmdObj); + if (!readConcernParseStatus.isOK()) { + return readConcernParseStatus; + } + + if (!command->supportsReadConcern(dbName, cmdObj, readConcernArgs.getLevel())) { + return {ErrorCodes::InvalidOptions, + str::stream() << "Command does not support read concern " + << readConcernArgs.toString()}; + } + + return readConcernArgs; +} + +void _waitForWriteConcernAndAddToCommandResponse(OperationContext* opCtx, + const std::string& commandName, + const repl::OpTime& lastOpBeforeRun, + BSONObjBuilder* commandResponseBuilder) { + auto lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + + // Ensures that if we tried to do a write, we wait for write concern, even if that write was + // a noop. + if ((lastOpAfterRun == lastOpBeforeRun) && + GlobalLockAcquisitionTracker::get(opCtx).getGlobalExclusiveLockTaken()) { + repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); + lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + } + + WriteConcernResult res; + auto waitForWCStatus = + waitForWriteConcern(opCtx, lastOpAfterRun, opCtx->getWriteConcern(), &res); + CommandHelpers::appendCommandWCStatus(*commandResponseBuilder, waitForWCStatus, res); + + // SERVER-22421: This code is to ensure error response backwards compatibility with the + // user management commands. This can be removed in 3.6. 
+ if (!waitForWCStatus.isOK() && CommandHelpers::isUserManagementCommand(commandName)) { + BSONObj temp = commandResponseBuilder->asTempObj().copy(); + commandResponseBuilder->resetToEmpty(); + CommandHelpers::appendCommandStatus(*commandResponseBuilder, waitForWCStatus); + commandResponseBuilder->appendElementsUnique(temp); + } +} + +/** + * For replica set members it returns the last known op time from opCtx. Otherwise will return + * uninitialized cluster time. + */ +LogicalTime getClientOperationTime(OperationContext* opCtx) { + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + const bool isReplSet = + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + LogicalTime operationTime; + if (isReplSet) { + operationTime = LogicalTime( + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp().getTimestamp()); + } + return operationTime; +} + +/** + * Returns the proper operationTime for a command. To construct the operationTime for replica set + * members, it uses the last optime in the oplog for writes, last committed optime for majority + * reads, and the last applied optime for every other read. An uninitialized cluster time is + * returned for non replica set members. + */ +LogicalTime computeOperationTime(OperationContext* opCtx, + LogicalTime startOperationTime, + repl::ReadConcernLevel level) { + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + const bool isReplSet = + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + + if (!isReplSet) { + return LogicalTime(); + } + + auto operationTime = getClientOperationTime(opCtx); + invariant(operationTime >= startOperationTime); + + // If the last operationTime has not changed, consider this command a read, and, for replica set + // members, construct the operationTime with the proper optime for its read concern level. 
+ if (operationTime == startOperationTime) { + if (level == repl::ReadConcernLevel::kMajorityReadConcern) { + operationTime = LogicalTime(replCoord->getLastCommittedOpTime().getTimestamp()); + } else { + operationTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp()); + } + } + + return operationTime; +} + +bool runCommandImpl(OperationContext* opCtx, + Command* command, + const OpMsgRequest& request, + rpc::ReplyBuilderInterface* replyBuilder, + LogicalTime startOperationTime) { + auto bytesToReserve = command->reserveBytesForReply(); + +// SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the +// additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency +// suite to run extremely slowly. As a workaround we do not pre-allocate in Windows DEBUG builds. +#ifdef _WIN32 + if (kDebugBuild) + bytesToReserve = 0; +#endif + + // run expects non-const bsonobj + BSONObj cmd = request.body; + + // run expects const db std::string (can't bind to temporary) + const std::string db = request.getDatabase().toString(); + + BSONObjBuilder inPlaceReplyBob = replyBuilder->getInPlaceReplyBuilder(bytesToReserve); + + bool result; + if (!command->supportsWriteConcern(cmd)) { + result = command->publicRun(opCtx, request, inPlaceReplyBob); + } else { + auto wcResult = extractWriteConcern(opCtx, cmd, db); + if (!wcResult.isOK()) { + auto result = + CommandHelpers::appendCommandStatus(inPlaceReplyBob, wcResult.getStatus()); + inPlaceReplyBob.doneFast(); + BSONObjBuilder metadataBob; + appendReplyMetadataOnError(opCtx, &metadataBob); + replyBuilder->setMetadata(metadataBob.done()); + return result; + } + + auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + + // Change the write concern while running the command. 
+ const auto oldWC = opCtx->getWriteConcern(); + ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); }); + opCtx->setWriteConcern(wcResult.getValue()); + ON_BLOCK_EXIT([&] { + _waitForWriteConcernAndAddToCommandResponse( + opCtx, command->getName(), lastOpBeforeRun, &inPlaceReplyBob); + }); + + result = command->publicRun(opCtx, request, inPlaceReplyBob); + + // Nothing in run() should change the writeConcern. + dassert(SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() == + wcResult.getValue().toBSON())); + } + + CommandHelpers::appendCommandStatus(inPlaceReplyBob, result); + + auto operationTime = computeOperationTime( + opCtx, startOperationTime, repl::ReadConcernArgs::get(opCtx).getLevel()); + + // An uninitialized operation time means the cluster time is not propagated, so the operation + // time should not be attached to the response. + if (operationTime != LogicalTime::kUninitialized) { + operationTime.appendAsOperationTime(&inPlaceReplyBob); + } + + inPlaceReplyBob.doneFast(); + + BSONObjBuilder metadataBob; + appendReplyMetadata(opCtx, request, &metadataBob); + replyBuilder->setMetadata(metadataBob.done()); + + return result; +} + +// When active, we won't check if we are master in command dispatch. Activate this if you want to +// test failing during command execution. +MONGO_FP_DECLARE(skipCheckingForNotMasterInCommandDispatch); + +/** + * Executes a command after stripping metadata, performing authorization checks, + * handling audit impersonation, and (potentially) setting maintenance mode. This method + * also checks that the command is permissible to run on the node given its current + * replication state. All the logic here is independent of any particular command; any + * functionality relevant to a specific command should be confined to its run() method. 
+ */ +void execCommandDatabase(OperationContext* opCtx, + Command* command, + const OpMsgRequest& request, + rpc::ReplyBuilderInterface* replyBuilder) { + + auto startOperationTime = getClientOperationTime(opCtx); + try { + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + CurOp::get(opCtx)->setCommand_inlock(command); + } + + // TODO: move this back to runCommands when mongos supports OperationContext + // see SERVER-18515 for details. + rpc::readRequestMetadata(opCtx, request.body); + rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName()); + + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + initializeOperationSessionInfo( + opCtx, + request.body, + command->requiresAuth(), + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet, + opCtx->getServiceContext()->getGlobalStorageEngine()->supportsDocLocking()); + + const auto dbname = request.getDatabase().toString(); + uassert( + ErrorCodes::InvalidNamespace, + str::stream() << "Invalid database name: '" << dbname << "'", + NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow)); + + std::unique_ptr<MaintenanceModeSetter> mmSetter; + + BSONElement cmdOptionMaxTimeMSField; + BSONElement allowImplicitCollectionCreationField; + BSONElement helpField; + BSONElement shardVersionFieldIdx; + BSONElement queryOptionMaxTimeMSField; + + StringMap<int> topLevelFields; + for (auto&& element : request.body) { + StringData fieldName = element.fieldNameStringData(); + if (fieldName == QueryRequest::cmdOptionMaxTimeMS) { + cmdOptionMaxTimeMSField = element; + } else if (fieldName == "allowImplicitCollectionCreation") { + allowImplicitCollectionCreationField = element; + } else if (fieldName == CommandHelpers::kHelpFieldName) { + helpField = element; + } else if (fieldName == ChunkVersion::kShardVersionField) { + shardVersionFieldIdx = element; + } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) { + queryOptionMaxTimeMSField = 
element; + } + + uassert(ErrorCodes::FailedToParse, + str::stream() << "Parsed command object contains duplicate top level key: " + << fieldName, + topLevelFields[fieldName]++ == 0); + } + + if (CommandHelpers::isHelpRequest(helpField)) { + CurOp::get(opCtx)->ensureStarted(); + // We disable last-error for help requests due to SERVER-11492, because config servers + // use help requests to determine which commands are database writes, and so must be + // forwarded to all config servers. + LastError::get(opCtx->getClient()).disable(); + Command::generateHelpResponse(opCtx, replyBuilder, *command); + return; + } + + // Session ids are forwarded in requests, so commands that require roundtrips between + // servers may result in a deadlock when a server tries to check out a session it is already + // using to service an earlier operation in the command's chain. To avoid this, only check + // out sessions for commands that require them (i.e. write commands). + OperationContextSession sessionTxnState( + opCtx, cmdWhitelist.find(command->getName()) != cmdWhitelist.cend()); + + ImpersonationSessionGuard guard(opCtx); + uassertStatusOK(Command::checkAuthorization(command, opCtx, request)); + + const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname); + + if (!opCtx->getClient()->isInDirectClient() && + !MONGO_FAIL_POINT(skipCheckingForNotMasterInCommandDispatch)) { + + bool commandCanRunOnSecondary = command->slaveOk(); + + bool commandIsOverriddenToRunOnSecondary = + command->slaveOverrideOk() && ReadPreferenceSetting::get(opCtx).canRunOnSecondary(); + + bool iAmStandalone = !opCtx->writesAreReplicated(); + bool canRunHere = iAmPrimary || commandCanRunOnSecondary || + commandIsOverriddenToRunOnSecondary || iAmStandalone; + + // This logic is clearer if we don't have to invert it. 
+ if (!canRunHere && command->slaveOverrideOk()) { + uasserted(ErrorCodes::NotMasterNoSlaveOk, "not master and slaveOk=false"); + } + + if (MONGO_FAIL_POINT(respondWithNotPrimaryInCommandDispatch)) { + uassert(ErrorCodes::NotMaster, "not primary", canRunHere); + } else { + uassert(ErrorCodes::NotMaster, "not master", canRunHere); + } + + if (!command->maintenanceOk() && + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && + !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) && + !replCoord->getMemberState().secondary()) { + + uassert(ErrorCodes::NotMasterOrSecondary, + "node is recovering", + !replCoord->getMemberState().recovering()); + uassert(ErrorCodes::NotMasterOrSecondary, + "node is not in primary or recovering state", + replCoord->getMemberState().primary()); + // Check ticket SERVER-21432, slaveOk commands are allowed in drain mode + uassert(ErrorCodes::NotMasterOrSecondary, + "node is in drain mode", + commandIsOverriddenToRunOnSecondary || commandCanRunOnSecondary); + } + } + + if (command->adminOnly()) { + LOG(2) << "command: " << request.getCommandName(); + } + + if (command->maintenanceMode()) { + mmSetter.reset(new MaintenanceModeSetter(opCtx)); + } + + if (command->shouldAffectCommandCounter()) { + OpCounters* opCounters = &globalOpCounters; + opCounters->gotCommand(); + } + + // Handle command option maxTimeMS. 
+ int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField)); + + uassert(ErrorCodes::InvalidOptions, + "no such command option $maxTimeMs; use maxTimeMS instead", + queryOptionMaxTimeMSField.eoo()); + + if (maxTimeMS > 0) { + uassert(50673, + "Illegal attempt to set operation deadline within DBDirectClient", + !opCtx->getClient()->isInDirectClient()); + opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}); + } + + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + readConcernArgs = uassertStatusOK(_extractReadConcern(command, dbname, request.body)); + + auto& oss = OperationShardingState::get(opCtx); + + if (!opCtx->getClient()->isInDirectClient() && + readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern && + (iAmPrimary || + (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) { + oss.initializeShardVersion(NamespaceString(command->parseNs(dbname, request.body)), + shardVersionFieldIdx); + + auto const shardingState = ShardingState::get(opCtx); + if (oss.hasShardVersion()) { + uassertStatusOK(shardingState->canAcceptShardedCommands()); + } + + // Handle config optime information that may have been sent along with the command. + uassertStatusOK(shardingState->updateConfigServerOpTimeFromMetadata(opCtx)); + } + + oss.setAllowImplicitCollectionCreation(allowImplicitCollectionCreationField); + + // Can throw + opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. 
+ + bool retval = false; + + CurOp::get(opCtx)->ensureStarted(); + + command->incrementCommandsExecuted(); + + if (logger::globalLogDomain()->shouldLog(logger::LogComponent::kTracking, + logger::LogSeverity::Debug(1)) && + rpc::TrackingMetadata::get(opCtx).getParentOperId()) { + MONGO_LOG_COMPONENT(1, logger::LogComponent::kTracking) + << rpc::TrackingMetadata::get(opCtx).toString(); + rpc::TrackingMetadata::get(opCtx).setIsLogged(true); + } + + retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime); + + if (!retval) { + command->incrementCommandsFailed(); + } + } catch (const DBException& e) { + // If we got a stale config, wait in case the operation is stuck in a critical section + if (auto sce = e.extraInfo<StaleConfigInfo>()) { + if (!opCtx->getClient()->isInDirectClient()) { + ShardingState::get(opCtx) + ->onStaleShardVersion( + opCtx, NamespaceString(sce->getns()), sce->getVersionReceived()) + .transitional_ignore(); + } + } + + BSONObjBuilder metadataBob; + appendReplyMetadata(opCtx, request, &metadataBob); + + // Note: the read concern may not have been successfully or yet placed on the opCtx, so + // parsing it separately here. + const std::string db = request.getDatabase().toString(); + auto readConcernArgsStatus = _extractReadConcern(command, db, request.body); + auto operationTime = readConcernArgsStatus.isOK() + ? computeOperationTime( + opCtx, startOperationTime, readConcernArgsStatus.getValue().getLevel()) + : LogicalClock::get(opCtx)->getClusterTime(); + + // An uninitialized operation time means the cluster time is not propagated, so the + // operation time should not be attached to the error response. 
+ if (operationTime != LogicalTime::kUninitialized) { + LOG(1) << "assertion while executing command '" << request.getCommandName() << "' " + << "on database '" << request.getDatabase() << "' " + << "with arguments '" << command->getRedactedCopyForLogging(request.body) + << "' and operationTime '" << operationTime.toString() << "': " << e.toString(); + + generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj(), operationTime); + } else { + LOG(1) << "assertion while executing command '" << request.getCommandName() << "' " + << "on database '" << request.getDatabase() << "' " + << "with arguments '" << command->getRedactedCopyForLogging(request.body) + << "': " << e.toString(); + + generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj()); + } + } +} + +/** + * Fills out CurOp / OpDebug with basic command info. + */ +void curOpCommandSetup(OperationContext* opCtx, const OpMsgRequest& request) { + auto curop = CurOp::get(opCtx); + curop->debug().iscommand = true; + + // We construct a legacy $cmd namespace so we can fill in curOp using + // the existing logic that existed for OP_QUERY commands + NamespaceString nss(request.getDatabase(), "$cmd"); + + stdx::lock_guard<Client> lk(*opCtx->getClient()); + curop->setOpDescription_inlock(request.body); + curop->markCommand_inlock(); + curop->setNS_inlock(nss.ns()); +} + +DbResponse runCommands(OperationContext* opCtx, const Message& message) { + auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message)); + [&] { + OpMsgRequest request; + try { // Parse. + request = rpc::opMsgRequestFromAnyProtocol(message); + } catch (const DBException& ex) { + // If this error needs to fail the connection, propagate it out. + if (ErrorCodes::isConnectionFatalMessageParseError(ex.code())) + throw; + + auto operationTime = LogicalClock::get(opCtx)->getClusterTime(); + BSONObjBuilder metadataBob; + appendReplyMetadataOnError(opCtx, &metadataBob); + // Otherwise, reply with the parse error. 
This is useful for cases where parsing fails + // due to user-supplied input, such as the document too deep error. Since we failed + // during parsing, we can't log anything about the command. + LOG(1) << "assertion while parsing command: " << ex.toString(); + generateErrorResponse(opCtx, replyBuilder.get(), ex, metadataBob.obj(), operationTime); + + return; // From lambda. Don't try executing if parsing failed. + } + + try { // Execute. + curOpCommandSetup(opCtx, request); + + Command* c = nullptr; + // In the absence of a Command object, no redaction is possible. Therefore + // to avoid displaying potentially sensitive information in the logs, + // we restrict the log message to the name of the unrecognized command. + // However, the complete command object will still be echoed to the client. + if (!(c = CommandHelpers::findCommand(request.getCommandName()))) { + globalCommandRegistry()->incrementUnknownCommands(); + std::string msg = str::stream() << "no such command: '" << request.getCommandName() + << "'"; + LOG(2) << msg; + uasserted(ErrorCodes::CommandNotFound, + str::stream() << msg << ", bad cmd: '" << redact(request.body) << "'"); + } + + LOG(2) << "run command " << request.getDatabase() << ".$cmd" << ' ' + << c->getRedactedCopyForLogging(request.body); + + { + // Try to set this as early as possible, as soon as we have figured out the command. 
+ stdx::lock_guard<Client> lk(*opCtx->getClient()); + CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp()); + } + + execCommandDatabase(opCtx, c, request, replyBuilder.get()); + } catch (const DBException& ex) { + BSONObjBuilder metadataBob; + appendReplyMetadataOnError(opCtx, &metadataBob); + auto operationTime = LogicalClock::get(opCtx)->getClusterTime(); + LOG(1) << "assertion while executing command '" << request.getCommandName() << "' " + << "on database '" << request.getDatabase() << "': " << ex.toString(); + + generateErrorResponse(opCtx, replyBuilder.get(), ex, metadataBob.obj(), operationTime); + } + }(); + + if (OpMsg::isFlagSet(message, OpMsg::kMoreToCome)) { + // Close the connection to get client to go through server selection again. + uassert(ErrorCodes::NotMaster, + "Not-master error during fire-and-forget command processing", + !LastError::get(opCtx->getClient()).hadNotMasterError()); + + return {}; // Don't reply. + } + + auto response = replyBuilder->done(); + CurOp::get(opCtx)->debug().responseLength = response.header().dataLen(); + + // TODO exhaust + return DbResponse{std::move(response)}; +} + +DbResponse receivedQuery(OperationContext* opCtx, + const NamespaceString& nss, + Client& c, + const Message& m) { + invariant(!nss.isCommand()); + globalOpCounters.gotQuery(); + + DbMessage d(m); + QueryMessage q(d); + + CurOp& op = *CurOp::get(opCtx); + DbResponse dbResponse; + + try { + Client* client = opCtx->getClient(); + Status status = AuthorizationSession::get(client)->checkAuthForFind(nss, false); + audit::logQueryAuthzCheck(client, nss, q.query, status.code()); + uassertStatusOK(status); + + dbResponse.exhaustNS = runQuery(opCtx, q, nss, dbResponse.response); + } catch (const AssertionException& e) { + // If we got a stale config, wait in case the operation is stuck in a critical section + if (auto sce = e.extraInfo<StaleConfigInfo>()) { + if (!opCtx->getClient()->isInDirectClient()) { + ShardingState::get(opCtx) + ->onStaleShardVersion( 
+ opCtx, NamespaceString(sce->getns()), sce->getVersionReceived()) + .transitional_ignore(); + } + } + + dbResponse.response.reset(); + generateLegacyQueryErrorResponse(&e, q, &op, &dbResponse.response); + } + + op.debug().responseLength = dbResponse.response.header().dataLen(); + return dbResponse; +} + +void receivedKillCursors(OperationContext* opCtx, const Message& m) { + LastError::get(opCtx->getClient()).disable(); + DbMessage dbmessage(m); + int n = dbmessage.pullInt(); + + uassert(50675, "sent 0 cursors to kill", n != 0); + massert(50674, + str::stream() << "bad kill cursors size: " << m.dataSize(), + m.dataSize() == 8 + (8 * n)); + uassert(50671, str::stream() << "sent negative cursors to kill: " << n, n >= 1); + + if (n > 2000) { + (n < 30000 ? warning() : error()) << "_receivedKillCursors, n=" << n; + verify(n < 30000); + } + + const char* cursorArray = dbmessage.getArray(n); + + int found = CursorManager::killCursorGlobalIfAuthorized(opCtx, n, cursorArray); + + if (shouldLog(logger::LogSeverity::Debug(1)) || found != n) { + LOG(found == n ? 
1 : 0) << "killcursors: found " << found << " of " << n; + } +} + +void receivedInsert(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { + auto insertOp = InsertOp::parseLegacy(m); + invariant(insertOp.getNamespace() == nsString); + + for (const auto& obj : insertOp.getDocuments()) { + Status status = + AuthorizationSession::get(opCtx->getClient())->checkAuthForInsert(opCtx, nsString, obj); + audit::logInsertAuthzCheck(opCtx->getClient(), nsString, obj, status.code()); + uassertStatusOK(status); + } + performInserts(opCtx, insertOp); +} + +void receivedUpdate(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { + auto updateOp = UpdateOp::parseLegacy(m); + auto& singleUpdate = updateOp.getUpdates()[0]; + invariant(updateOp.getNamespace() == nsString); + + Status status = AuthorizationSession::get(opCtx->getClient()) + ->checkAuthForUpdate(opCtx, + nsString, + singleUpdate.getQ(), + singleUpdate.getU(), + singleUpdate.getUpsert()); + audit::logUpdateAuthzCheck(opCtx->getClient(), + nsString, + singleUpdate.getQ(), + singleUpdate.getU(), + singleUpdate.getUpsert(), + singleUpdate.getMulti(), + status.code()); + uassertStatusOK(status); + + performUpdates(opCtx, updateOp); +} + +void receivedDelete(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { + auto deleteOp = DeleteOp::parseLegacy(m); + auto& singleDelete = deleteOp.getDeletes()[0]; + invariant(deleteOp.getNamespace() == nsString); + + Status status = AuthorizationSession::get(opCtx->getClient()) + ->checkAuthForDelete(opCtx, nsString, singleDelete.getQ()); + audit::logDeleteAuthzCheck(opCtx->getClient(), nsString, singleDelete.getQ(), status.code()); + uassertStatusOK(status); + + performDeletes(opCtx, deleteOp); +} + +DbResponse receivedGetMore(OperationContext* opCtx, + const Message& m, + CurOp& curop, + bool* shouldLogOpDebug) { + globalOpCounters.gotGetMore(); + DbMessage d(m); + + const char* ns = d.getns(); + int ntoreturn = 
d.pullInt(); + uassert( + 50676, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0); + long long cursorid = d.pullInt64(); + + curop.debug().ntoreturn = ntoreturn; + curop.debug().cursorid = cursorid; + + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + CurOp::get(opCtx)->setNS_inlock(ns); + } + + bool exhaust = false; + bool isCursorAuthorized = false; + + DbResponse dbresponse; + try { + const NamespaceString nsString(ns); + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "Invalid ns [" << ns << "]", + nsString.isValid()); + + Status status = AuthorizationSession::get(opCtx->getClient()) + ->checkAuthForGetMore(nsString, cursorid, false); + audit::logGetMoreAuthzCheck(opCtx->getClient(), nsString, cursorid, status.code()); + uassertStatusOK(status); + + while (MONGO_FAIL_POINT(rsStopGetMore)) { + sleepmillis(0); + } + + dbresponse.response = + getMore(opCtx, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized); + } catch (AssertionException& e) { + if (isCursorAuthorized) { + // If a cursor with id 'cursorid' was authorized, it may have been advanced + // before an exception terminated processGetMore. Erase the ClientCursor + // because it may now be out of sync with the client's iteration state. + // SERVER-7952 + // TODO Temporary code, see SERVER-4563 for a cleanup overview. 
+ CursorManager::killCursorGlobal(opCtx, cursorid); + } + + BSONObjBuilder err; + err.append("$err", e.reason()); + err.append("code", e.code()); + BSONObj errObj = err.obj(); + + curop.debug().exceptionInfo = e.toStatus(); + + dbresponse = replyToQuery(errObj, ResultFlag_ErrSet); + curop.debug().responseLength = dbresponse.response.header().dataLen(); + curop.debug().nreturned = 1; + *shouldLogOpDebug = true; + return dbresponse; + } + + curop.debug().responseLength = dbresponse.response.header().dataLen(); + auto queryResult = QueryResult::ConstView(dbresponse.response.buf()); + curop.debug().nreturned = queryResult.getNReturned(); + + if (exhaust) { + curop.debug().exhaust = true; + dbresponse.exhaustNS = ns; + } + + return dbresponse; +} + +} // namespace + +DbResponse ServiceEntryPointEmbedded::handleRequest(OperationContext* opCtx, const Message& m) { + // before we lock... + NetworkOp op = m.operation(); + bool isCommand = false; + + DbMessage dbmsg(m); + + Client& c = *opCtx->getClient(); + if (c.isInDirectClient()) { + invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + } else { + LastError::get(c).startRequest(); + AuthorizationSession::get(c)->startRequest(opCtx); + + // We should not be holding any locks at this point + invariant(!opCtx->lockState()->isLocked()); + } + + const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL; + const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString(); + + if (op == dbQuery) { + if (nsString.isCommand()) { + isCommand = true; + } + } else if (op == dbCommand || op == dbMsg) { + isCommand = true; + } + + CurOp& currentOp = *CurOp::get(opCtx); + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + // Commands handling code will reset this if the operation is a command + // which is logically a basic CRUD operation like query, insert, etc. 
+ currentOp.setNetworkOp_inlock(op); + currentOp.setLogicalOp_inlock(networkOpToLogicalOp(op)); + } + + OpDebug& debug = currentOp.debug(); + + long long logThresholdMs = serverGlobalParams.slowMS; + bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1)); + + DbResponse dbresponse; + if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) { + dbresponse = runCommands(opCtx, m); + } else if (op == dbQuery) { + invariant(!isCommand); + dbresponse = receivedQuery(opCtx, nsString, c, m); + } else if (op == dbGetMore) { + dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug); + } else { + // The remaining operations do not return any response. They are fire-and-forget. + try { + if (op == dbKillCursors) { + currentOp.ensureStarted(); + logThresholdMs = 10; + receivedKillCursors(opCtx, m); + } else if (op != dbInsert && op != dbUpdate && op != dbDelete) { + log() << " operation isn't supported: " << static_cast<int>(op); + currentOp.done(); + shouldLogOpDebug = true; + } else { + if (!opCtx->getClient()->isInDirectClient()) { + uassert(50670, + str::stream() << "legacy writeOps not longer supported for " + << "versioned connections, ns: " + << nsString.ns() + << ", op: " + << networkOpToString(op), + !ShardedConnectionInfo::get(&c, false)); + } + + if (!nsString.isValid()) { + uassert(50672, str::stream() << "Invalid ns [" << ns << "]", false); + } else if (op == dbInsert) { + receivedInsert(opCtx, nsString, m); + } else if (op == dbUpdate) { + receivedUpdate(opCtx, nsString, m); + } else if (op == dbDelete) { + receivedDelete(opCtx, nsString, m); + } else { + invariant(false); + } + } + } catch (const AssertionException& ue) { + LastError::get(c).setLastError(ue.code(), ue.reason()); + LOG(3) << " Caught Assertion in " << networkOpToString(op) << ", continuing " + << redact(ue); + debug.exceptionInfo = ue.toStatus(); + } + } + currentOp.ensureStarted(); + currentOp.done(); + debug.executionTimeMicros = 
durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()); + + Top::get(opCtx->getServiceContext()) + .incrementGlobalLatencyStats( + opCtx, + durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()), + currentOp.getReadWriteType()); + + const bool shouldSample = serverGlobalParams.sampleRate == 1.0 + ? true + : c.getPrng().nextCanonicalDouble() < serverGlobalParams.sampleRate; + + if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) { + Locker::LockerInfo lockerInfo; + opCtx->lockState()->getLockerInfo(&lockerInfo); + log() << debug.report(&c, currentOp, lockerInfo.stats); + } + + if (currentOp.shouldDBProfile(shouldSample)) { + // Performance profiling is on + if (opCtx->lockState()->isReadLocked()) { + LOG(1) << "note: not profiling because recursive read lock"; + } /*else if (lockedForWriting()) { + // TODO SERVER-26825: Fix race condition where fsyncLock is acquired post + // lockedForWriting() call but prior to profile collection lock acquisition. + LOG(1) << "note: not profiling because doing fsync+lock"; + }*/ else if (storageGlobalParams.readOnly) { + LOG(1) << "note: not profiling because server is read-only"; + } else { + profile(opCtx, op); + } + } + + recordCurOpMetrics(opCtx); + return dbresponse; +} + +} // namespace mongo diff --git a/src/mongo/client/embedded/service_entry_point_embedded.h b/src/mongo/client/embedded/service_entry_point_embedded.h new file mode 100644 index 00000000000..d44847a5c2d --- /dev/null +++ b/src/mongo/client/embedded/service_entry_point_embedded.h @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/transport/service_entry_point_impl.h" + +namespace mongo { + +/** + * The entry point into mongod. Just a wrapper around assembleResponse. 
+ */ +class ServiceEntryPointEmbedded final : public ServiceEntryPointImpl { + MONGO_DISALLOW_COPYING(ServiceEntryPointEmbedded); + +public: + using ServiceEntryPointImpl::ServiceEntryPointImpl; + DbResponse handleRequest(OperationContext* opCtx, const Message& request) override; +}; + +} // namespace mongo diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 5b3288cf73f..85edb77edbc 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -930,7 +930,6 @@ env.Library( "s/collection_metadata", "s/commands_db_s", "s/sharding", - "service_context_d", "startup_warnings_mongod", "stats/counters", "stats/serveronly_stats", @@ -952,6 +951,20 @@ env.Library( ) env.Library( + target="repair_database_and_check_version", + source=[ + "repair_database_and_check_version.cpp", + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog/catalog_helpers', + 'dbdirectclient', + 'dbhelpers', + 'repair_database', + 'repl/drop_pending_collection_reaper', + ], +) + +env.Library( target='logical_session_id', source=[ 'logical_session_id.cpp', @@ -1666,8 +1679,9 @@ env.CppUnitTest( 'transaction_history_iterator_test.cpp', ], LIBDEPS=[ - 'query_exec', '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture', - + 'query_exec', + 'service_context_d', + 'write_ops', ], ) diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index d1efc864bd3..2bf312a71a5 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -297,6 +297,7 @@ env.CppUnitTest( "$BUILD_DIR/mongo/db/query/query_planner", "$BUILD_DIR/mongo/db/query/query_test_service_context", "$BUILD_DIR/mongo/db/serveronly", + "$BUILD_DIR/mongo/db/service_context_d", "dcommands", ], ) @@ -308,6 +309,7 @@ env.CppUnitTest( ], LIBDEPS=[ "$BUILD_DIR/mongo/db/serveronly", + "$BUILD_DIR/mongo/db/service_context_d", "dcommands", ], ) @@ -321,6 +323,7 @@ env.CppUnitTest( "$BUILD_DIR/mongo/db/query/query_planner", 
"$BUILD_DIR/mongo/db/query/query_test_service_context", "$BUILD_DIR/mongo/db/serveronly", + "$BUILD_DIR/mongo/db/service_context_d", "dcommands", ], ) diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 87409298366..66d14b7a71b 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -92,7 +92,7 @@ #include "mongo/db/op_observer_registry.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/internal_plans.h" -#include "mongo/db/repair_database.h" +#include "mongo/db/repair_database_and_check_version.h" #include "mongo/db/repl/drop_pending_collection_reaper.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_settings.h" @@ -183,95 +183,6 @@ constexpr StringData upgradeLink = "http://dochub.mongodb.org/core/3.6-upgrade-f constexpr StringData mustDowngradeErrorMsg = "UPGRADE PROBLEM: The data files need to be fully upgraded to version 3.4 before attempting an upgrade to 3.6; see http://dochub.mongodb.org/core/3.6-upgrade-fcv for more details."_sd; -Status restoreMissingFeatureCompatibilityVersionDocument(OperationContext* opCtx, - const std::vector<std::string>& dbNames) { - bool isMmapV1 = opCtx->getServiceContext()->getGlobalStorageEngine()->isMmapV1(); - // Check whether there are any collections with UUIDs. - bool collsHaveUuids = false; - bool allCollsHaveUuids = true; - for (const auto& dbName : dbNames) { - Database* db = dbHolder().openDb(opCtx, dbName); - invariant(db); - for (auto collectionIt = db->begin(); collectionIt != db->end(); ++collectionIt) { - Collection* coll = *collectionIt; - if (coll->uuid()) { - collsHaveUuids = true; - } else if (!coll->uuid() && (!isMmapV1 || - !(coll->ns().coll() == "system.indexes" || - coll->ns().coll() == "system.namespaces"))) { - // Exclude system.indexes and system.namespaces from the UUID check until - // SERVER-29926 and SERVER-30095 are addressed, respectively. 
- allCollsHaveUuids = false; - } - } - } - - if (!collsHaveUuids) { - return {ErrorCodes::MustDowngrade, mustDowngradeErrorMsg}; - } - - // Restore the featureCompatibilityVersion document if it is missing. - NamespaceString nss(FeatureCompatibilityVersion::kCollection); - - Database* db = dbHolder().get(opCtx, nss.db()); - - // If the admin database does not exist, create it. - if (!db) { - log() << "Re-creating admin database that was dropped."; - } - - db = dbHolder().openDb(opCtx, nss.db()); - invariant(db); - - // If admin.system.version does not exist, create it without a UUID. - if (!db->getCollection(opCtx, FeatureCompatibilityVersion::kCollection)) { - log() << "Re-creating admin.system.version collection that was dropped."; - allCollsHaveUuids = false; - BSONObjBuilder bob; - bob.append("create", nss.coll()); - BSONObj createObj = bob.done(); - uassertStatusOK(createCollection(opCtx, nss.db().toString(), createObj)); - } - - Collection* versionColl = db->getCollection(opCtx, FeatureCompatibilityVersion::kCollection); - invariant(versionColl); - BSONObj featureCompatibilityVersion; - if (!Helpers::findOne(opCtx, - versionColl, - BSON("_id" << FeatureCompatibilityVersion::kParameterName), - featureCompatibilityVersion)) { - log() << "Re-creating featureCompatibilityVersion document that was deleted."; - BSONObjBuilder bob; - bob.append("_id", FeatureCompatibilityVersion::kParameterName); - if (allCollsHaveUuids) { - // If all collections have UUIDs, create a featureCompatibilityVersion document with - // version equal to 3.6. - bob.append(FeatureCompatibilityVersion::kVersionField, - FeatureCompatibilityVersionCommandParser::kVersion36); - } else { - // If some collections do not have UUIDs, create a featureCompatibilityVersion document - // with version equal to 3.4 and targetVersion 3.6. 
- bob.append(FeatureCompatibilityVersion::kVersionField, - FeatureCompatibilityVersionCommandParser::kVersion34); - bob.append(FeatureCompatibilityVersion::kTargetVersionField, - FeatureCompatibilityVersionCommandParser::kVersion36); - } - auto fcvObj = bob.done(); - writeConflictRetry(opCtx, "insertFCVDocument", nss.ns(), [&] { - WriteUnitOfWork wunit(opCtx); - OpDebug* const nullOpDebug = nullptr; - uassertStatusOK( - versionColl->insertDocument(opCtx, InsertStatement(fcvObj), nullOpDebug, false)); - wunit.commit(); - }); - } - invariant(Helpers::findOne(opCtx, - versionColl, - BSON("_id" << FeatureCompatibilityVersion::kParameterName), - featureCompatibilityVersion)); - return Status::OK(); -} - const NamespaceString startupLogCollectionName("local.startup_log"); const NamespaceString kSystemReplSetCollection("local.system.replset"); @@ -320,49 +231,6 @@ void logStartup(OperationContext* opCtx) { } /** - * If we are in a replset, every replicated collection must have an _id index. - * As we scan each database, we also gather a list of drop-pending collection namespaces for - * the DropPendingCollectionReaper to clean up eventually. - */ -void checkForIdIndexesAndDropPendingCollections(OperationContext* opCtx, Database* db) { - if (db->name() == "local") { - // Collections in the local database are not replicated, so we do not need an _id index on - // any collection. For the same reason, it is not possible for the local database to contain - // any drop-pending collections (drops are effective immediately). 
- return; - } - - std::list<std::string> collectionNames; - db->getDatabaseCatalogEntry()->getCollectionNamespaces(&collectionNames); - - for (const auto& collectionName : collectionNames) { - const NamespaceString ns(collectionName); - - if (ns.isDropPendingNamespace()) { - auto dropOpTime = fassertStatusOK(40459, ns.getDropPendingNamespaceOpTime()); - log() << "Found drop-pending namespace " << ns << " with drop optime " << dropOpTime; - repl::DropPendingCollectionReaper::get(opCtx)->addDropPendingNamespace(dropOpTime, ns); - } - - if (ns.isSystem()) - continue; - - Collection* coll = db->getCollection(opCtx, collectionName); - if (!coll) - continue; - - if (coll->getIndexCatalog()->findIdIndex(opCtx)) - continue; - - log() << "WARNING: the collection '" << collectionName << "' lacks a unique index on _id." - << " This index is needed for replication to function properly" << startupWarningsLog; - log() << "\t To fix this, you need to create a unique index on _id." - << " See http://dochub.mongodb.org/core/build-replica-set-indexes" - << startupWarningsLog; - } -} - -/** * Checks if this server was started without --replset but has a config in local.system.replset * (meaning that this is probably a replica set member started in stand-alone mode). * @@ -379,312 +247,6 @@ unsigned long long checkIfReplMissingFromCommandLine(OperationContext* opCtx) { return 0; } -/** - * Check that the oplog is capped, and abort the process if it is not. - * Caller must lock DB before calling this function. 
- */ -void checkForCappedOplog(OperationContext* opCtx, Database* db) { - const NamespaceString oplogNss(NamespaceString::kRsOplogNamespace); - invariant(opCtx->lockState()->isDbLockedForMode(oplogNss.db(), MODE_IS)); - Collection* oplogCollection = db->getCollection(opCtx, oplogNss); - if (oplogCollection && !oplogCollection->isCapped()) { - severe() << "The oplog collection " << oplogNss - << " is not capped; a capped oplog is a requirement for replication to function."; - fassertFailedNoTrace(40115); - } -} - -/** - * Return an error status if the wrong mongod version was used for these datafiles. The boolean - * represents whether there are non-local databases. - */ -StatusWith<bool> repairDatabasesAndCheckVersion(OperationContext* opCtx) { - LOG(1) << "enter repairDatabases (to check pdfile version #)"; - - auto const storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine(); - - Lock::GlobalWrite lk(opCtx); - - std::vector<std::string> dbNames; - storageEngine->listDatabases(&dbNames); - - // Repair all databases first, so that we do not try to open them if they are in bad shape - if (storageGlobalParams.repair) { - invariant(!storageGlobalParams.readOnly); - for (const auto& dbName : dbNames) { - LOG(1) << " Repairing database: " << dbName; - fassert(18506, repairDatabase(opCtx, storageEngine, dbName)); - } - - // Attempt to restore the featureCompatibilityVersion document if it is missing. 
- NamespaceString nss(FeatureCompatibilityVersion::kCollection); - - Database* db = dbHolder().get(opCtx, nss.db()); - Collection* versionColl; - BSONObj featureCompatibilityVersion; - if (!db || !(versionColl = db->getCollection(opCtx, nss)) || - !Helpers::findOne(opCtx, - versionColl, - BSON("_id" << FeatureCompatibilityVersion::kParameterName), - featureCompatibilityVersion)) { - auto status = restoreMissingFeatureCompatibilityVersionDocument(opCtx, dbNames); - if (!status.isOK()) { - return status; - } - } - } - - const repl::ReplSettings& replSettings = - repl::ReplicationCoordinator::get(opCtx)->getSettings(); - - if (!storageGlobalParams.readOnly) { - StatusWith<std::vector<StorageEngine::CollectionIndexNamePair>> swIndexesToRebuild = - storageEngine->reconcileCatalogAndIdents(opCtx); - fassertStatusOK(40593, swIndexesToRebuild); - for (auto&& collIndexPair : swIndexesToRebuild.getValue()) { - const std::string& coll = collIndexPair.first; - const std::string& indexName = collIndexPair.second; - DatabaseCatalogEntry* dbce = - storageEngine->getDatabaseCatalogEntry(opCtx, NamespaceString(coll).db()); - invariant(dbce); - CollectionCatalogEntry* cce = dbce->getCollectionCatalogEntry(coll); - invariant(cce); - - StatusWith<IndexNameObjs> swIndexToRebuild( - getIndexNameObjs(opCtx, dbce, cce, [&indexName](const std::string& str) { - return str == indexName; - })); - if (!swIndexToRebuild.isOK() || swIndexToRebuild.getValue().first.empty()) { - severe() << "Unable to get indexes for collection. 
Collection: " << coll; - fassertFailedNoTrace(40590); - } - - invariant(swIndexToRebuild.getValue().first.size() == 1 && - swIndexToRebuild.getValue().second.size() == 1); - fassertStatusOK( - 40592, rebuildIndexesOnCollection(opCtx, dbce, cce, swIndexToRebuild.getValue())); - } - - // We open the "local" database before calling checkIfReplMissingFromCommandLine() to - // ensure the in-memory catalog entries for the 'kSystemReplSetCollection' collection have - // been populated if the collection exists. If the "local" database didn't exist at this - // point yet, then it will be created. If the mongod is running in a read-only mode, then - // it is fine to not open the "local" database and populate the catalog entries because we - // won't attempt to drop the temporary collections anyway. - Lock::DBLock dbLock(opCtx, kSystemReplSetCollection.db(), MODE_X); - dbHolder().openDb(opCtx, kSystemReplSetCollection.db()); - } - - // On replica set members we only clear temp collections on DBs other than "local" during - // promotion to primary. On pure slaves, they are only cleared when the oplog tells them - // to. The local DB is special because it is not replicated. See SERVER-10927 for more - // details. - const bool shouldClearNonLocalTmpCollections = - !(checkIfReplMissingFromCommandLine(opCtx) || replSettings.usingReplSets() || - replSettings.isSlave()); - - // To check whether we are upgrading to 3.6 or have already upgraded to 3.6. - bool collsHaveUuids = false; - - // To check whether a featureCompatibilityVersion document exists. - bool fcvDocumentExists = false; - - // To check whether we have databases other than local. - bool nonLocalDatabases = false; - - // Refresh list of database names to include newly-created admin, if it exists. 
- dbNames.clear(); - storageEngine->listDatabases(&dbNames); - for (const auto& dbName : dbNames) { - if (dbName != "local") { - nonLocalDatabases = true; - } - LOG(1) << " Recovering database: " << dbName; - - Database* db = dbHolder().openDb(opCtx, dbName); - invariant(db); - - // First thing after opening the database is to check for file compatibility, - // otherwise we might crash if this is a deprecated format. - auto status = db->getDatabaseCatalogEntry()->currentFilesCompatible(opCtx); - if (!status.isOK()) { - if (status.code() == ErrorCodes::CanRepairToDowngrade) { - // Convert CanRepairToDowngrade statuses to MustUpgrade statuses to avoid logging a - // potentially confusing and inaccurate message. - // - // TODO SERVER-24097: Log a message informing the user that they can start the - // current version of mongod with --repair and then proceed with normal startup. - status = {ErrorCodes::MustUpgrade, status.reason()}; - } - severe() << "Unable to start mongod due to an incompatibility with the data files and" - " this version of mongod: " - << redact(status); - severe() << "Please consult our documentation when trying to downgrade to a previous" - " major release"; - quickExit(EXIT_NEED_UPGRADE); - MONGO_UNREACHABLE; - } - - // Check if admin.system.version contains an invalid featureCompatibilityVersion. - // If a valid featureCompatibilityVersion is present, cache it as a server parameter. 
- if (dbName == "admin") { - if (Collection* versionColl = - db->getCollection(opCtx, FeatureCompatibilityVersion::kCollection)) { - BSONObj featureCompatibilityVersion; - if (Helpers::findOne(opCtx, - versionColl, - BSON("_id" << FeatureCompatibilityVersion::kParameterName), - featureCompatibilityVersion)) { - auto swVersion = - FeatureCompatibilityVersion::parse(featureCompatibilityVersion); - if (!swVersion.isOK()) { - severe() << swVersion.getStatus(); - // Note this error path captures all cases of an FCV document existing, - // but with any value other than "3.4" or "3.6". This includes unexpected - // cases with no path forward such as the FCV value not being a string. - return {ErrorCodes::MustDowngrade, - str::stream() - << "UPGRADE PROBLEM: Unable to parse the " - "featureCompatibilityVersion document. The data files need " - "to be fully upgraded to version 3.4 before attempting an " - "upgrade to 3.6. If you are upgrading to 3.6, see " - << upgradeLink - << "."}; - } - fcvDocumentExists = true; - auto version = swVersion.getValue(); - serverGlobalParams.featureCompatibility.setVersion(version); - FeatureCompatibilityVersion::updateMinWireVersion(); - - // On startup, if the version is in an upgrading or downrading state, print a - // warning. - if (version == - ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo36) { - log() << "** WARNING: A featureCompatibilityVersion upgrade did not " - << "complete." << startupWarningsLog; - log() << "** The current featureCompatibilityVersion is " - << FeatureCompatibilityVersion::toString(version) << "." - << startupWarningsLog; - log() << "** To fix this, use the setFeatureCompatibilityVersion " - << "command to resume upgrade to 3.6." << startupWarningsLog; - } else if (version == ServerGlobalParams::FeatureCompatibility::Version:: - kDowngradingTo34) { - log() << "** WARNING: A featureCompatibilityVersion downgrade did not " - << "complete. 
" << startupWarningsLog; - log() << "** The current featureCompatibilityVersion is " - << FeatureCompatibilityVersion::toString(version) << "." - << startupWarningsLog; - log() << "** To fix this, use the setFeatureCompatibilityVersion " - << "command to resume downgrade to 3.4." << startupWarningsLog; - } else if (version == - ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) { - log() << "** WARNING: A featureCompatibilityVersion upgrade did not " - << "complete. " << startupWarningsLog; - log() << "** The current featureCompatibilityVersion is " - << FeatureCompatibilityVersion::toString(version) << "." - << startupWarningsLog; - log() << "** To fix this, use the setFeatureCompatibilityVersion " - << "command to resume upgrade to 4.0." << startupWarningsLog; - } else if (version == ServerGlobalParams::FeatureCompatibility::Version:: - kDowngradingTo36) { - log() << "** WARNING: A featureCompatibilityVersion downgrade did not " - << "complete. " << startupWarningsLog; - log() << "** The current featureCompatibilityVersion is " - << FeatureCompatibilityVersion::toString(version) << "." - << startupWarningsLog; - log() << "** To fix this, use the setFeatureCompatibilityVersion " - << "command to resume downgrade to 3.6." << startupWarningsLog; - } - } - } - } - - // Iterate through collections and check for UUIDs. 
- for (auto collectionIt = db->begin(); !collsHaveUuids && collectionIt != db->end(); - ++collectionIt) { - Collection* coll = *collectionIt; - if (coll->uuid()) { - collsHaveUuids = true; - } - } - - // Major versions match, check indexes - const NamespaceString systemIndexes(db->name(), "system.indexes"); - - Collection* coll = db->getCollection(opCtx, systemIndexes); - auto exec = InternalPlanner::collectionScan( - opCtx, systemIndexes.ns(), coll, PlanExecutor::NO_YIELD); - - BSONObj index; - PlanExecutor::ExecState state; - while (PlanExecutor::ADVANCED == (state = exec->getNext(&index, NULL))) { - const BSONObj key = index.getObjectField("key"); - const auto plugin = IndexNames::findPluginName(key); - - if (db->getDatabaseCatalogEntry()->isOlderThan24(opCtx)) { - if (IndexNames::existedBefore24(plugin)) { - continue; - } - - log() << "Index " << index << " claims to be of type '" << plugin << "', " - << "which is either invalid or did not exist before v2.4. " - << "See the upgrade section: " - << "http://dochub.mongodb.org/core/upgrade-2.4" << startupWarningsLog; - } - - if (index["v"].isNumber() && index["v"].numberInt() == 0) { - log() << "WARNING: The index: " << index << " was created with the deprecated" - << " v:0 format. This format will not be supported in a future release." - << startupWarningsLog; - log() << "\t To fix this, you need to rebuild this index." - << " For instructions, see http://dochub.mongodb.org/core/rebuild-v0-indexes" - << startupWarningsLog; - } - } - - // Non-yielding collection scans from InternalPlanner will never error. - invariant(PlanExecutor::IS_EOF == state); - - if (replSettings.usingReplSets()) { - // We only care about _id indexes and drop-pending collections if we are in a replset. 
- checkForIdIndexesAndDropPendingCollections(opCtx, db); - // Ensure oplog is capped (mmap does not guarantee order of inserts on noncapped - // collections) - if (db->name() == "local") { - checkForCappedOplog(opCtx, db); - } - } - - if (!storageGlobalParams.readOnly && - (shouldClearNonLocalTmpCollections || dbName == "local")) { - db->clearTmpCollections(opCtx); - } - } - - // Fail to start up if there is no featureCompatibilityVersion document and there are non-local - // databases present. - if (!fcvDocumentExists && nonLocalDatabases) { - if (collsHaveUuids) { - severe() - << "Unable to start up mongod due to missing featureCompatibilityVersion document."; - if (opCtx->getServiceContext()->getGlobalStorageEngine()->isMmapV1()) { - severe() - << "Please run with --journalOptions " - << static_cast<int>(MMAPV1Options::JournalRecoverOnly) - << " to recover the journal. Then run with --repair to restore the document."; - } else { - severe() << "Please run with --repair to restore the document."; - } - fassertFailedNoTrace(40652); - } else { - return {ErrorCodes::MustDowngrade, mustDowngradeErrorMsg}; - } - } - - LOG(1) << "done repairDatabases"; - return nonLocalDatabases; -} - void initWireSpec() { WireSpec& spec = WireSpec::instance(); diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript index 892686c3d8e..55af8286f8a 100644 --- a/src/mongo/db/exec/SConscript +++ b/src/mongo/db/exec/SConscript @@ -56,6 +56,7 @@ env.CppUnitTest( LIBDEPS = [ "$BUILD_DIR/mongo/db/query_exec", "$BUILD_DIR/mongo/db/serveronly", + "$BUILD_DIR/mongo/db/service_context_d", "$BUILD_DIR/mongo/dbtests/mocklib", "$BUILD_DIR/mongo/util/clock_source_mock", ], @@ -69,6 +70,7 @@ env.CppUnitTest( LIBDEPS = [ "$BUILD_DIR/mongo/db/query_exec", "$BUILD_DIR/mongo/db/serveronly", + "$BUILD_DIR/mongo/db/service_context_d", "$BUILD_DIR/mongo/dbtests/mocklib", "$BUILD_DIR/mongo/db/query/collation/collator_factory_mock", "$BUILD_DIR/mongo/db/query/collation/collator_interface_mock", 
@@ -86,5 +88,6 @@ env.CppUnitTest( "$BUILD_DIR/mongo/db/query/collation/collator_interface_mock", "$BUILD_DIR/mongo/db/query/query_test_service_context", "$BUILD_DIR/mongo/db/serveronly", + "$BUILD_DIR/mongo/db/service_context_d", ], ) diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index d665ccd9e28..81c4ac22cf9 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -118,6 +118,7 @@ env.CppUnitTest( "query_test_service_context", "$BUILD_DIR/mongo/db/query_exec", "$BUILD_DIR/mongo/db/serveronly", + "$BUILD_DIR/mongo/db/service_context_d", "$BUILD_DIR/mongo/dbtests/mocklib", ], ) @@ -398,6 +399,7 @@ env.CppUnitTest( "query_planner_test_fixture", "$BUILD_DIR/mongo/db/query_exec", "$BUILD_DIR/mongo/db/serveronly", + "$BUILD_DIR/mongo/db/service_context_d", "$BUILD_DIR/mongo/dbtests/mocklib", ], ) diff --git a/src/mongo/db/repair_database_and_check_version.cpp b/src/mongo/db/repair_database_and_check_version.cpp new file mode 100644 index 00000000000..3c1a134e87a --- /dev/null +++ b/src/mongo/db/repair_database_and_check_version.cpp @@ -0,0 +1,525 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include "mongo/platform/basic.h" + +#include "mongo/db/catalog/create_collection.h" +#include "mongo/db/catalog/database.h" +#include "mongo/db/catalog/database_catalog_entry.h" +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/commands/feature_compatibility_version.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/query/internal_plans.h" +#include "mongo/db/repair_database.h" +#include "mongo/db/repl/drop_pending_collection_reaper.h" +#include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/storage/mmap_v1/mmap_v1_options.h" +#include "mongo/util/log.h" +#include "mongo/util/quick_exit.h" +#include "mongo/util/version.h" + +#if !defined(_WIN32) +#include <sys/file.h> +#endif + +namespace mongo { + +using logger::LogComponent; +using std::endl; + +namespace { + +constexpr StringData upgradeLink = "http://dochub.mongodb.org/core/3.6-upgrade-fcv"_sd; +constexpr 
StringData mustDowngradeErrorMsg = + "UPGRADE PROBLEM: The data files need to be fully upgraded to version 3.4 before attempting an upgrade to 3.6; see http://dochub.mongodb.org/core/3.6-upgrade-fcv for more details."_sd; + +Status restoreMissingFeatureCompatibilityVersionDocument(OperationContext* opCtx, + const std::vector<std::string>& dbNames) { + bool isMmapV1 = opCtx->getServiceContext()->getGlobalStorageEngine()->isMmapV1(); + // Check whether there are any collections with UUIDs. + bool collsHaveUuids = false; + bool allCollsHaveUuids = true; + for (const auto& dbName : dbNames) { + Database* db = dbHolder().openDb(opCtx, dbName); + invariant(db); + for (auto collectionIt = db->begin(); collectionIt != db->end(); ++collectionIt) { + Collection* coll = *collectionIt; + if (coll->uuid()) { + collsHaveUuids = true; + } else if (!coll->uuid() && (!isMmapV1 || + !(coll->ns().coll() == "system.indexes" || + coll->ns().coll() == "system.namespaces"))) { + // Exclude system.indexes and system.namespaces from the UUID check until + // SERVER-29926 and SERVER-30095 are addressed, respectively. + allCollsHaveUuids = false; + } + } + } + + if (!collsHaveUuids) { + return {ErrorCodes::MustDowngrade, mustDowngradeErrorMsg}; + } + + // Restore the featureCompatibilityVersion document if it is missing. + NamespaceString nss(FeatureCompatibilityVersion::kCollection); + + Database* db = dbHolder().get(opCtx, nss.db()); + + // If the admin database does not exist, create it. + if (!db) { + log() << "Re-creating admin database that was dropped."; + } + + db = dbHolder().openDb(opCtx, nss.db()); + invariant(db); + + // If admin.system.version does not exist, create it without a UUID. 
+ if (!db->getCollection(opCtx, FeatureCompatibilityVersion::kCollection)) { + log() << "Re-creating admin.system.version collection that was dropped."; + allCollsHaveUuids = false; + BSONObjBuilder bob; + bob.append("create", nss.coll()); + BSONObj createObj = bob.done(); + uassertStatusOK(createCollection(opCtx, nss.db().toString(), createObj)); + } + + Collection* versionColl = db->getCollection(opCtx, FeatureCompatibilityVersion::kCollection); + invariant(versionColl); + BSONObj featureCompatibilityVersion; + if (!Helpers::findOne(opCtx, + versionColl, + BSON("_id" << FeatureCompatibilityVersion::kParameterName), + featureCompatibilityVersion)) { + log() << "Re-creating featureCompatibilityVersion document that was deleted."; + BSONObjBuilder bob; + bob.append("_id", FeatureCompatibilityVersion::kParameterName); + if (allCollsHaveUuids) { + // If all collections have UUIDs, create a featureCompatibilityVersion document with + // version equal to 3.6. + bob.append(FeatureCompatibilityVersion::kVersionField, + FeatureCompatibilityVersionCommandParser::kVersion36); + } else { + // If some collections do not have UUIDs, create a featureCompatibilityVersion document + // with version equal to 3.4 and targetVersion 3.6. 
+ bob.append(FeatureCompatibilityVersion::kVersionField, + FeatureCompatibilityVersionCommandParser::kVersion34); + bob.append(FeatureCompatibilityVersion::kTargetVersionField, + FeatureCompatibilityVersionCommandParser::kVersion36); + } + auto fcvObj = bob.done(); + writeConflictRetry(opCtx, "insertFCVDocument", nss.ns(), [&] { + WriteUnitOfWork wunit(opCtx); + OpDebug* const nullOpDebug = nullptr; + uassertStatusOK( + versionColl->insertDocument(opCtx, InsertStatement(fcvObj), nullOpDebug, false)); + wunit.commit(); + }); + } + invariant(Helpers::findOne(opCtx, + versionColl, + BSON("_id" << FeatureCompatibilityVersion::kParameterName), + featureCompatibilityVersion)); + return Status::OK(); +} + +const NamespaceString startupLogCollectionName("local.startup_log"); +const NamespaceString kSystemReplSetCollection("local.system.replset"); + +/** + * If we are in a replset, every replicated collection must have an _id index. + * As we scan each database, we also gather a list of drop-pending collection namespaces for + * the DropPendingCollectionReaper to clean up eventually. + */ +void checkForIdIndexesAndDropPendingCollections(OperationContext* opCtx, Database* db) { + if (db->name() == "local") { + // Collections in the local database are not replicated, so we do not need an _id index on + // any collection. For the same reason, it is not possible for the local database to contain + // any drop-pending collections (drops are effective immediately). 
+ return; + } + + std::list<std::string> collectionNames; + db->getDatabaseCatalogEntry()->getCollectionNamespaces(&collectionNames); + + for (const auto& collectionName : collectionNames) { + const NamespaceString ns(collectionName); + + if (ns.isDropPendingNamespace()) { + auto dropOpTime = fassertStatusOK(40459, ns.getDropPendingNamespaceOpTime()); + log() << "Found drop-pending namespace " << ns << " with drop optime " << dropOpTime; + repl::DropPendingCollectionReaper::get(opCtx)->addDropPendingNamespace(dropOpTime, ns); + } + + if (ns.isSystem()) + continue; + + Collection* coll = db->getCollection(opCtx, collectionName); + if (!coll) + continue; + + if (coll->getIndexCatalog()->findIdIndex(opCtx)) + continue; + + log() << "WARNING: the collection '" << collectionName << "' lacks a unique index on _id." + << " This index is needed for replication to function properly" << startupWarningsLog; + log() << "\t To fix this, you need to create a unique index on _id." + << " See http://dochub.mongodb.org/core/build-replica-set-indexes" + << startupWarningsLog; + } +} + +/** + * Checks if this server was started without --replset but has a config in local.system.replset + * (meaning that this is probably a replica set member started in stand-alone mode). + * + * @returns the number of documents in local.system.replset or 0 if this was started with + * --replset. + */ +unsigned long long checkIfReplMissingFromCommandLine(OperationContext* opCtx) { + // This is helpful for the query below to work as you can't open files when readlocked + Lock::GlobalWrite lk(opCtx); + if (!repl::getGlobalReplicationCoordinator()->getSettings().usingReplSets()) { + DBDirectClient c(opCtx); + return c.count(kSystemReplSetCollection.ns()); + } + return 0; +} + +/** +* Check that the oplog is capped, and abort the process if it is not. +* Caller must lock DB before calling this function. 
+*/ +void checkForCappedOplog(OperationContext* opCtx, Database* db) { + const NamespaceString oplogNss(NamespaceString::kRsOplogNamespace); + invariant(opCtx->lockState()->isDbLockedForMode(oplogNss.db(), MODE_IS)); + Collection* oplogCollection = db->getCollection(opCtx, oplogNss); + if (oplogCollection && !oplogCollection->isCapped()) { + severe() << "The oplog collection " << oplogNss + << " is not capped; a capped oplog is a requirement for replication to function."; + fassertFailedNoTrace(40115); + } +} +} // namespace + +/** +* Return an error status if the wrong mongod version was used for these datafiles. The boolean +* represents whether there are non-local databases. +*/ +StatusWith<bool> repairDatabasesAndCheckVersion(OperationContext* opCtx) { + LOG(1) << "enter repairDatabases (to check pdfile version #)"; + + auto const storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine(); + + Lock::GlobalWrite lk(opCtx); + + std::vector<std::string> dbNames; + storageEngine->listDatabases(&dbNames); + + // Repair all databases first, so that we do not try to open them if they are in bad shape + if (storageGlobalParams.repair) { + invariant(!storageGlobalParams.readOnly); + for (const auto& dbName : dbNames) { + LOG(1) << " Repairing database: " << dbName; + fassert(18506, repairDatabase(opCtx, storageEngine, dbName)); + } + + // Attempt to restore the featureCompatibilityVersion document if it is missing. 
+ NamespaceString nss(FeatureCompatibilityVersion::kCollection); + + Database* db = dbHolder().get(opCtx, nss.db()); + Collection* versionColl; + BSONObj featureCompatibilityVersion; + if (!db || !(versionColl = db->getCollection(opCtx, nss)) || + !Helpers::findOne(opCtx, + versionColl, + BSON("_id" << FeatureCompatibilityVersion::kParameterName), + featureCompatibilityVersion)) { + auto status = restoreMissingFeatureCompatibilityVersionDocument(opCtx, dbNames); + if (!status.isOK()) { + return status; + } + } + } + + const repl::ReplSettings& replSettings = + repl::ReplicationCoordinator::get(opCtx)->getSettings(); + + if (!storageGlobalParams.readOnly) { + StatusWith<std::vector<StorageEngine::CollectionIndexNamePair>> swIndexesToRebuild = + storageEngine->reconcileCatalogAndIdents(opCtx); + fassertStatusOK(40593, swIndexesToRebuild); + for (auto&& collIndexPair : swIndexesToRebuild.getValue()) { + const std::string& coll = collIndexPair.first; + const std::string& indexName = collIndexPair.second; + DatabaseCatalogEntry* dbce = + storageEngine->getDatabaseCatalogEntry(opCtx, NamespaceString(coll).db()); + invariant(dbce); + CollectionCatalogEntry* cce = dbce->getCollectionCatalogEntry(coll); + invariant(cce); + + StatusWith<IndexNameObjs> swIndexToRebuild( + getIndexNameObjs(opCtx, dbce, cce, [&indexName](const std::string& str) { + return str == indexName; + })); + if (!swIndexToRebuild.isOK() || swIndexToRebuild.getValue().first.empty()) { + severe() << "Unable to get indexes for collection. 
Collection: " << coll; + fassertFailedNoTrace(40590); + } + + invariant(swIndexToRebuild.getValue().first.size() == 1 && + swIndexToRebuild.getValue().second.size() == 1); + fassertStatusOK( + 40592, rebuildIndexesOnCollection(opCtx, dbce, cce, swIndexToRebuild.getValue())); + } + + // We open the "local" database before calling checkIfReplMissingFromCommandLine() to + // ensure the in-memory catalog entries for the 'kSystemReplSetCollection' collection have + // been populated if the collection exists. If the "local" database didn't exist at this + // point yet, then it will be created. If the mongod is running in a read-only mode, then + // it is fine to not open the "local" database and populate the catalog entries because we + // won't attempt to drop the temporary collections anyway. + Lock::DBLock dbLock(opCtx, kSystemReplSetCollection.db(), MODE_X); + dbHolder().openDb(opCtx, kSystemReplSetCollection.db()); + } + + // On replica set members we only clear temp collections on DBs other than "local" during + // promotion to primary. On pure slaves, they are only cleared when the oplog tells them + // to. The local DB is special because it is not replicated. See SERVER-10927 for more + // details. + const bool shouldClearNonLocalTmpCollections = + !(checkIfReplMissingFromCommandLine(opCtx) || replSettings.usingReplSets() || + replSettings.isSlave()); + + // To check whether we are upgrading to 3.6 or have already upgraded to 3.6. + bool collsHaveUuids = false; + + // To check whether a featureCompatibilityVersion document exists. + bool fcvDocumentExists = false; + + // To check whether we have databases other than local. + bool nonLocalDatabases = false; + + // Refresh list of database names to include newly-created admin, if it exists. 
+ dbNames.clear(); + storageEngine->listDatabases(&dbNames); + for (const auto& dbName : dbNames) { + if (dbName != "local") { + nonLocalDatabases = true; + } + LOG(1) << " Recovering database: " << dbName; + + Database* db = dbHolder().openDb(opCtx, dbName); + invariant(db); + + // First thing after opening the database is to check for file compatibility, + // otherwise we might crash if this is a deprecated format. + auto status = db->getDatabaseCatalogEntry()->currentFilesCompatible(opCtx); + if (!status.isOK()) { + if (status.code() == ErrorCodes::CanRepairToDowngrade) { + // Convert CanRepairToDowngrade statuses to MustUpgrade statuses to avoid logging a + // potentially confusing and inaccurate message. + // + // TODO SERVER-24097: Log a message informing the user that they can start the + // current version of mongod with --repair and then proceed with normal startup. + status = {ErrorCodes::MustUpgrade, status.reason()}; + } + severe() << "Unable to start mongod due to an incompatibility with the data files and" + " this version of mongod: " + << redact(status); + severe() << "Please consult our documentation when trying to downgrade to a previous" + " major release"; + quickExit(EXIT_NEED_UPGRADE); + MONGO_UNREACHABLE; + } + + // Check if admin.system.version contains an invalid featureCompatibilityVersion. + // If a valid featureCompatibilityVersion is present, cache it as a server parameter. 
+ if (dbName == "admin") { + if (Collection* versionColl = + db->getCollection(opCtx, FeatureCompatibilityVersion::kCollection)) { + BSONObj featureCompatibilityVersion; + if (Helpers::findOne(opCtx, + versionColl, + BSON("_id" << FeatureCompatibilityVersion::kParameterName), + featureCompatibilityVersion)) { + auto swVersion = + FeatureCompatibilityVersion::parse(featureCompatibilityVersion); + if (!swVersion.isOK()) { + severe() << swVersion.getStatus(); + // Note this error path captures all cases of an FCV document existing, + // but with any value other than "3.4" or "3.6". This includes unexpected + // cases with no path forward such as the FCV value not being a string. + return {ErrorCodes::MustDowngrade, + str::stream() + << "UPGRADE PROBLEM: Unable to parse the " + "featureCompatibilityVersion document. The data files need " + "to be fully upgraded to version 3.4 before attempting an " + "upgrade to 3.6. If you are upgrading to 3.6, see " + << upgradeLink + << "."}; + } + fcvDocumentExists = true; + auto version = swVersion.getValue(); + serverGlobalParams.featureCompatibility.setVersion(version); + FeatureCompatibilityVersion::updateMinWireVersion(); + + // On startup, if the version is in an upgrading or downrading state, print a + // warning. + if (version == + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo36) { + log() << "** WARNING: A featureCompatibilityVersion upgrade did not " + << "complete." << startupWarningsLog; + log() << "** The current featureCompatibilityVersion is " + << FeatureCompatibilityVersion::toString(version) << "." + << startupWarningsLog; + log() << "** To fix this, use the setFeatureCompatibilityVersion " + << "command to resume upgrade to 3.6." << startupWarningsLog; + } else if (version == ServerGlobalParams::FeatureCompatibility::Version:: + kDowngradingTo34) { + log() << "** WARNING: A featureCompatibilityVersion downgrade did not " + << "complete. 
" << startupWarningsLog; + log() << "** The current featureCompatibilityVersion is " + << FeatureCompatibilityVersion::toString(version) << "." + << startupWarningsLog; + log() << "** To fix this, use the setFeatureCompatibilityVersion " + << "command to resume downgrade to 3.4." << startupWarningsLog; + } else if (version == + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) { + log() << "** WARNING: A featureCompatibilityVersion upgrade did not " + << "complete. " << startupWarningsLog; + log() << "** The current featureCompatibilityVersion is " + << FeatureCompatibilityVersion::toString(version) << "." + << startupWarningsLog; + log() << "** To fix this, use the setFeatureCompatibilityVersion " + << "command to resume upgrade to 4.0." << startupWarningsLog; + } else if (version == ServerGlobalParams::FeatureCompatibility::Version:: + kDowngradingTo36) { + log() << "** WARNING: A featureCompatibilityVersion downgrade did not " + << "complete. " << startupWarningsLog; + log() << "** The current featureCompatibilityVersion is " + << FeatureCompatibilityVersion::toString(version) << "." + << startupWarningsLog; + log() << "** To fix this, use the setFeatureCompatibilityVersion " + << "command to resume downgrade to 3.6." << startupWarningsLog; + } + } + } + } + + // Iterate through collections and check for UUIDs. 
+ for (auto collectionIt = db->begin(); !collsHaveUuids && collectionIt != db->end(); + ++collectionIt) { + Collection* coll = *collectionIt; + if (coll->uuid()) { + collsHaveUuids = true; + } + } + + // Major versions match, check indexes + const NamespaceString systemIndexes(db->name(), "system.indexes"); + + Collection* coll = db->getCollection(opCtx, systemIndexes); + auto exec = InternalPlanner::collectionScan( + opCtx, systemIndexes.ns(), coll, PlanExecutor::NO_YIELD); + + BSONObj index; + PlanExecutor::ExecState state; + while (PlanExecutor::ADVANCED == (state = exec->getNext(&index, NULL))) { + const BSONObj key = index.getObjectField("key"); + const auto plugin = IndexNames::findPluginName(key); + + if (db->getDatabaseCatalogEntry()->isOlderThan24(opCtx)) { + if (IndexNames::existedBefore24(plugin)) { + continue; + } + + log() << "Index " << index << " claims to be of type '" << plugin << "', " + << "which is either invalid or did not exist before v2.4. " + << "See the upgrade section: " + << "http://dochub.mongodb.org/core/upgrade-2.4" << startupWarningsLog; + } + + if (index["v"].isNumber() && index["v"].numberInt() == 0) { + log() << "WARNING: The index: " << index << " was created with the deprecated" + << " v:0 format. This format will not be supported in a future release." + << startupWarningsLog; + log() << "\t To fix this, you need to rebuild this index." + << " For instructions, see http://dochub.mongodb.org/core/rebuild-v0-indexes" + << startupWarningsLog; + } + } + + // Non-yielding collection scans from InternalPlanner will never error. + invariant(PlanExecutor::IS_EOF == state); + + if (replSettings.usingReplSets()) { + // We only care about _id indexes and drop-pending collections if we are in a replset. 
+ checkForIdIndexesAndDropPendingCollections(opCtx, db); + // Ensure oplog is capped (mmap does not guarantee order of inserts on noncapped + // collections) + if (db->name() == "local") { + checkForCappedOplog(opCtx, db); + } + } + + if (!storageGlobalParams.readOnly && + (shouldClearNonLocalTmpCollections || dbName == "local")) { + db->clearTmpCollections(opCtx); + } + } + + // Fail to start up if there is no featureCompatibilityVersion document and there are non-local + // databases present. + if (!fcvDocumentExists && nonLocalDatabases) { + if (collsHaveUuids) { + severe() + << "Unable to start up mongod due to missing featureCompatibilityVersion document."; + if (opCtx->getServiceContext()->getGlobalStorageEngine()->isMmapV1()) { + severe() + << "Please run with --journalOptions " + << static_cast<int>(MMAPV1Options::JournalRecoverOnly) + << " to recover the journal. Then run with --repair to restore the document."; + } else { + severe() << "Please run with --repair to restore the document."; + } + fassertFailedNoTrace(40652); + } else { + return {ErrorCodes::MustDowngrade, mustDowngradeErrorMsg}; + } + } + + LOG(1) << "done repairDatabases"; + return nonLocalDatabases; +} +} // namespace mongo diff --git a/src/mongo/db/repair_database_and_check_version.h b/src/mongo/db/repair_database_and_check_version.h new file mode 100644 index 00000000000..7ab45755254 --- /dev/null +++ b/src/mongo/db/repair_database_and_check_version.h @@ -0,0 +1,41 @@ +/** +* Copyright (C) 2018 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. 
+* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. +*/ + +#pragma once + +#include "mongo/base/status_with.h" + +namespace mongo { +class OperationContext; + +/** +* Return an error status if the wrong mongod version was used for these datafiles. The boolean +* represents whether there are non-local databases. 
+*/ +StatusWith<bool> repairDatabasesAndCheckVersion(OperationContext* opCtx); +} diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index e1adf4d86eb..23345293afe 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1044,6 +1044,7 @@ env.CppUnitTest('isself_test', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/serveronly', + "$BUILD_DIR/mongo/db/service_context_d", 'isself', ], ) diff --git a/src/mongo/db/storage/mmap_v1/SConscript b/src/mongo/db/storage/mmap_v1/SConscript index 50f32018f8d..c21a2006fbc 100644 --- a/src/mongo/db/storage/mmap_v1/SConscript +++ b/src/mongo/db/storage/mmap_v1/SConscript @@ -207,6 +207,7 @@ if mmapv1: LIBDEPS=[ '$BUILD_DIR/mongo/db/serveronly', '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/db/service_context_d', '$BUILD_DIR/mongo/db/storage/storage_engine_metadata', '$BUILD_DIR/mongo/db/storage/storage_options', ], diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript index 495ac863558..1d6b1461a24 100644 --- a/src/mongo/db/storage/wiredtiger/SConscript +++ b/src/mongo/db/storage/wiredtiger/SConscript @@ -119,6 +119,7 @@ if wiredtiger: LIBDEPS=[ '$BUILD_DIR/mongo/db/serveronly', '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/db/service_context_d', '$BUILD_DIR/mongo/db/storage/storage_engine_metadata', '$BUILD_DIR/mongo/db/storage/storage_options', '$BUILD_DIR/mongo/db/storage/wiredtiger/storage_wiredtiger_core', |