From 6258369bc2d74e69a5e1fd8e025a291550aeb368 Mon Sep 17 00:00:00 2001 From: Ted Tuckman Date: Fri, 15 Nov 2019 19:50:52 +0000 Subject: SERVER-42693 Add renameAndPreserveOptions command and allow $out to output to different DB --- src/mongo/db/catalog/SConscript | 1 + src/mongo/db/catalog/list_indexes.cpp | 111 ++++++++++++++++ src/mongo/db/catalog/list_indexes.h | 54 ++++++++ src/mongo/db/catalog/rename_collection.cpp | 44 +++++++ src/mongo/db/catalog/rename_collection.h | 7 + src/mongo/db/commands/SConscript | 2 + src/mongo/db/commands/dbcommands.cpp | 4 +- ...nternal_rename_if_options_and_indexes_match.idl | 52 ++++++++ ...nal_rename_if_options_and_indexes_match_cmd.cpp | 109 +++++++++++++++ src/mongo/db/commands/list_indexes.cpp | 38 +----- src/mongo/db/commands/map_reduce_agg_test.cpp | 11 -- src/mongo/db/commands/map_reduce_stats.cpp | 4 +- src/mongo/db/commands/map_reduce_stats_test.cpp | 3 +- src/mongo/db/commands/mr_common.cpp | 16 +-- src/mongo/db/pipeline/SConscript | 1 + src/mongo/db/pipeline/document_source_out.cpp | 95 +++++++++----- src/mongo/db/pipeline/document_source_out.h | 26 +++- src/mongo/db/pipeline/document_source_out_test.cpp | 6 +- src/mongo/db/pipeline/mongo_process_interface.h | 36 +++-- src/mongo/db/pipeline/mongos_process_interface.h | 27 +++- .../db/pipeline/process_interface_shardsvr.cpp | 146 +++++++++++++++++++++ src/mongo/db/pipeline/process_interface_shardsvr.h | 16 +++ .../db/pipeline/process_interface_standalone.cpp | 95 +++++++------- .../db/pipeline/process_interface_standalone.h | 15 ++- .../db/pipeline/stub_mongo_process_interface.h | 27 +++- 25 files changed, 776 insertions(+), 170 deletions(-) create mode 100644 src/mongo/db/catalog/list_indexes.cpp create mode 100644 src/mongo/db/catalog/list_indexes.h create mode 100644 src/mongo/db/commands/internal_rename_if_options_and_indexes_match.idl create mode 100644 src/mongo/db/commands/internal_rename_if_options_and_indexes_match_cmd.cpp (limited to 'src/mongo/db') diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript index 70cb654df94..d2c7b9ec1e5 100644 --- a/src/mongo/db/catalog/SConscript +++ b/src/mongo/db/catalog/SConscript @@ -395,6 +395,7 @@ env.Library( 'drop_database.cpp', 'drop_indexes.cpp', 'rename_collection.cpp', + 'list_indexes.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', diff --git a/src/mongo/db/catalog/list_indexes.cpp b/src/mongo/db/catalog/list_indexes.cpp new file mode 100644 index 00000000000..510fd7e3aec --- /dev/null +++ b/src/mongo/db/catalog/list_indexes.cpp @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/catalog/list_indexes.h" + +#include + +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/curop_failpoint_helpers.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/storage/durable_catalog.h" +#include "mongo/db/storage/storage_engine.h" +#include "mongo/util/uuid.h" + +// Failpoint which causes to hang "listIndexes" cmd after acquiring the DB lock. +MONGO_FAIL_POINT_DEFINE(hangBeforeListIndexes); + +namespace mongo { + +StatusWith> listIndexes(OperationContext* opCtx, + const NamespaceStringOrUUID& ns, + bool includeBuildUUIDs) { + AutoGetCollectionForReadCommand ctx(opCtx, ns); + Collection* collection = ctx.getCollection(); + auto nss = ctx.getNss(); + if (!collection) { + return StatusWith>(ErrorCodes::NamespaceNotFound, + str::stream() + << "ns does not exist: " << ctx.getNss().ns()); + } + return StatusWith>( + listIndexesInLock(opCtx, collection, nss, includeBuildUUIDs)); +} + +std::list listIndexesInLock(OperationContext* opCtx, + Collection* collection, + const NamespaceString& nss, + bool includeBuildUUIDs) { + + auto durableCatalog = DurableCatalog::get(opCtx); + + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &hangBeforeListIndexes, opCtx, "hangBeforeListIndexes", []() {}, false, nss); + + std::vector indexNames; + writeConflictRetry(opCtx, "listIndexes", nss.ns(), [&] { + indexNames.clear(); + durableCatalog->getAllIndexes(opCtx, collection->getCatalogId(), &indexNames); + }); + + std::list indexSpecs; + + for (size_t i = 0; i < indexNames.size(); i++) { + auto indexSpec = writeConflictRetry(opCtx, "listIndexes", nss.ns(), [&] { + if (includeBuildUUIDs && + !durableCatalog->isIndexReady(opCtx, collection->getCatalogId(), indexNames[i])) { + BSONObjBuilder builder; + builder.append( + "spec"_sd, + durableCatalog->getIndexSpec(opCtx, collection->getCatalogId(), indexNames[i])); + + // TODO(SERVER-37980): Replace with index build UUID. + auto indexBuildUUID = UUID::gen(); + indexBuildUUID.appendToBuilder(&builder, "buildUUID"_sd); + return builder.obj(); + } + return durableCatalog->getIndexSpec(opCtx, collection->getCatalogId(), indexNames[i]); + }); + indexSpecs.push_back(indexSpec); + } + return indexSpecs; +} +std::list listIndexesEmptyListIfMissing(OperationContext* opCtx, + const NamespaceStringOrUUID& nss, + bool includeBuildUUIDs) { + auto listStatus = listIndexes(opCtx, nss, includeBuildUUIDs); + return listStatus.isOK() ? listStatus.getValue() : std::list(); +} +} // namespace mongo diff --git a/src/mongo/db/catalog/list_indexes.h b/src/mongo/db/catalog/list_indexes.h new file mode 100644 index 00000000000..62296826e9a --- /dev/null +++ b/src/mongo/db/catalog/list_indexes.h @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once +#include + +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" + +namespace mongo { + +/** + * Return a list of the indexes on the given collection. + */ +StatusWith> listIndexes(OperationContext* opCtx, + const NamespaceStringOrUUID& ns, + bool includeBuildUUIDs); +std::list listIndexesInLock(OperationContext* opCtx, + Collection* collection, + const NamespaceString& nss, + bool includeBuildUUIDs); +std::list listIndexesEmptyListIfMissing(OperationContext* opCtx, + const NamespaceStringOrUUID& nss, + bool includeBuildUUIDs); + +} // namespace mongo diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp index 0731fa184e4..dc1d2208914 100644 --- a/src/mongo/db/catalog/rename_collection.cpp +++ b/src/mongo/db/catalog/rename_collection.cpp @@ -39,6 +39,7 @@ #include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog/drop_collection.h" #include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/catalog/list_indexes.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" @@ -706,6 +707,49 @@ Status renameBetweenDBs(OperationContext* opCtx, } // namespace +void doLocalRenameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx, + const NamespaceString& sourceNs, + const NamespaceString& targetNs, + bool dropTarget, + bool stayTemp, + std::list originalIndexes, + BSONObj originalCollectionOptions) { + AutoGetDb dbLock(opCtx, targetNs.db(), MODE_X); + auto collection = dbLock.getDb() + ? CollectionCatalog::get(opCtx).lookupCollectionByNamespace(targetNs) + : nullptr; + BSONObj collectionOptions = {}; + if (collection) { + // We do not include the UUID field in the options comparison. It is ok if the target + // collection was dropped and recreated, as long as the new target collection has the same + // options and indexes as the original one did. This is mainly to support concurrent $out + // to the same collection. + collectionOptions = DurableCatalog::get(opCtx) + ->getCollectionOptions(opCtx, collection->getCatalogId()) + .toBSON() + .removeField("uuid"); + } + + uassert(ErrorCodes::CommandFailed, + str::stream() << "collection options of target collection " << targetNs.ns() + << " changed during processing. Original options: " + << originalCollectionOptions << ", new options: " << collectionOptions, + SimpleBSONObjComparator::kInstance.evaluate( + originalCollectionOptions.removeField("uuid") == collectionOptions)); + + auto currentIndexes = + listIndexesEmptyListIfMissing(opCtx, targetNs, false /* includeBuildUUIDs */); + uassert(ErrorCodes::CommandFailed, + str::stream() << "indexes of target collection " << targetNs.ns() + << " changed during processing.", + originalIndexes.size() == currentIndexes.size() && + std::equal(originalIndexes.begin(), + originalIndexes.end(), + currentIndexes.begin(), + SimpleBSONObjComparator::kInstance.makeEqualTo())); + + validateAndRunRenameCollection(opCtx, sourceNs, targetNs, dropTarget, stayTemp); +} void validateAndRunRenameCollection(OperationContext* opCtx, const NamespaceString& source, const NamespaceString& target, diff --git a/src/mongo/db/catalog/rename_collection.h b/src/mongo/db/catalog/rename_collection.h index 8b4af732173..7269fb93ac9 100644 --- a/src/mongo/db/catalog/rename_collection.h +++ b/src/mongo/db/catalog/rename_collection.h @@ -42,6 +42,13 @@ namespace repl { class OpTime; } // namespace repl +void doLocalRenameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx, + const NamespaceString& sourceNs, + const NamespaceString& targetNs, + bool dropTarget, + bool stayTemp, + std::list originalIndexes, + BSONObj collectionOptions); /** * Renames the collection from "source" to "target" and drops the existing collection iff * "dropTarget" is true. "stayTemp" indicates whether a collection should maintain its diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 143874e5836..7464bc5b2b0 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -356,6 +356,7 @@ env.Library( "dbhash.cpp", "driverHelpers.cpp", "haystack.cpp", + "internal_rename_if_options_and_indexes_match_cmd.cpp", "map_reduce_command.cpp", "map_reduce_finish_command.cpp", "mr.cpp", @@ -371,6 +372,7 @@ env.Library( "txn_cmds.cpp", "user_management_commands.cpp", "vote_commit_index_build_command.cpp", + env.Idlc('internal_rename_if_options_and_indexes_match.idl')[0], env.Idlc('vote_commit_index_build.idl')[0], ], LIBDEPS=[ diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index 3db0141f5a5..e21d689045a 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -343,7 +343,9 @@ public: if (cmd.getTemp()) { uassert(ErrorCodes::InvalidOptions, str::stream() << "the 'temp' field is an invalid option", - opCtx->getClient()->isInDirectClient()); + opCtx->getClient()->isInDirectClient() || + (opCtx->getClient()->session()->getTags() | + transport::Session::kInternalClient)); } // Validate _id index spec and fill in missing fields. diff --git a/src/mongo/db/commands/internal_rename_if_options_and_indexes_match.idl b/src/mongo/db/commands/internal_rename_if_options_and_indexes_match.idl new file mode 100644 index 00000000000..fd0ec5f6ecf --- /dev/null +++ b/src/mongo/db/commands/internal_rename_if_options_and_indexes_match.idl @@ -0,0 +1,52 @@ +# Copyright (C) 2019-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# . +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +commands: + internalRenameIfOptionsAndIndexesMatch: + description: "An internal command that does a rename, but first checks to make sure the + indexes and collection options on the destination match those given in the + command." + namespace: ignored + fields: + from: + type: namespacestring + to: + type: namespacestring + collectionOptions: + description: "An object representing the options on the from collection with the + same format as the options from the listCollections command." + type: object + indexes: + description: "An object with form {indexName: {spec}, indexName: {spec}, ...}" + type: array + diff --git a/src/mongo/db/commands/internal_rename_if_options_and_indexes_match_cmd.cpp b/src/mongo/db/commands/internal_rename_if_options_and_indexes_match_cmd.cpp new file mode 100644 index 00000000000..8e112043f5a --- /dev/null +++ b/src/mongo/db/commands/internal_rename_if_options_and_indexes_match_cmd.cpp @@ -0,0 +1,109 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/commands/internal_rename_if_options_and_indexes_match_gen.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/rename_collection.h" +#include "mongo/db/commands.h" +#include "mongo/db/db_raii.h" + +namespace mongo { + +/** + * Rename a collection while checking collection option and indexes. + */ +class InternalRenameIfOptionsAndIndexesMatchCmd final + : public TypedCommand { +public: + using Request = InternalRenameIfOptionsAndIndexesMatch; + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + auto thisRequest = request(); + auto originalIndexes = thisRequest.getIndexes(); + auto indexList = std::list(originalIndexes.begin(), originalIndexes.end()); + doLocalRenameIfOptionsAndIndexesHaveNotChanged(opCtx, + thisRequest.getFrom(), + thisRequest.getTo(), + true /* dropTarget */, + false /* stayTemp */, + std::move(indexList), + thisRequest.getCollectionOptions()); + } + + private: + NamespaceString ns() const override { + return request().getFrom(); + } + bool supportsWriteConcern() const override { + return true; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + auto thisRequest = request(); + auto from = thisRequest.getFrom(); + auto to = thisRequest.getTo(); + uassert(ErrorCodes::Unauthorized, + str::stream() << "Unauthorized to rename " << from, + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forExactNamespace(from), + ActionType::renameCollection)); + uassert(ErrorCodes::Unauthorized, + str::stream() << "Unauthorized to drop " << to, + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forExactNamespace(to), + ActionType::dropCollection)); + uassert(ErrorCodes::Unauthorized, + str::stream() << "Unauthorized to insert into " << to, + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forExactNamespace(to), + ActionType::insert)); + } + }; + + std::string help() const override { + return "Internal command to rename and check collection options"; + } + + bool adminOnly() const override { + return true; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + +} internalRenameIfOptionsAndIndexesMatchCmd; +} // namespace mongo diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp index 858b22daba0..55d3c6b0a65 100644 --- a/src/mongo/db/commands/list_indexes.cpp +++ b/src/mongo/db/commands/list_indexes.cpp @@ -33,6 +33,7 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/list_indexes.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" #include "mongo/db/concurrency/write_conflict_exception.h" @@ -60,9 +61,6 @@ using std::vector; namespace { -// Failpoint which causes to hang "listIndexes" cmd after acquiring the DB lock. -MONGO_FAIL_POINT_DEFINE(hangBeforeListIndexes); - /** * Lists the indexes for a given collection. * If 'includeBuildUUIDs' is true, then the index build uuid is also returned alongside the index @@ -153,42 +151,12 @@ public: uassert(ErrorCodes::NamespaceNotFound, str::stream() << "ns does not exist: " << ctx.getNss().ns(), collection); - - auto durableCatalog = DurableCatalog::get(opCtx); - nss = ctx.getNss(); - - CurOpFailpointHelpers::waitWhileFailPointEnabled( - &hangBeforeListIndexes, opCtx, "hangBeforeListIndexes", []() {}, false, nss); - - vector indexNames; - writeConflictRetry(opCtx, "listIndexes", nss.ns(), [&] { - indexNames.clear(); - durableCatalog->getAllIndexes(opCtx, collection->getCatalogId(), &indexNames); - }); - + auto indexList = listIndexesInLock(opCtx, collection, nss, includeBuildUUIDs); auto ws = std::make_unique(); auto root = std::make_unique(opCtx, ws.get()); - for (size_t i = 0; i < indexNames.size(); i++) { - auto indexSpec = writeConflictRetry(opCtx, "listIndexes", nss.ns(), [&] { - if (includeBuildUUIDs && - !durableCatalog->isIndexReady( - opCtx, collection->getCatalogId(), indexNames[i])) { - BSONObjBuilder builder; - builder.append("spec"_sd, - durableCatalog->getIndexSpec( - opCtx, collection->getCatalogId(), indexNames[i])); - - // TODO(SERVER-37980): Replace with index build UUID. - auto indexBuildUUID = UUID::gen(); - indexBuildUUID.appendToBuilder(&builder, "buildUUID"_sd); - return builder.obj(); - } - return durableCatalog->getIndexSpec( - opCtx, collection->getCatalogId(), indexNames[i]); - }); - + for (auto&& indexSpec : indexList) { WorkingSetID id = ws->allocate(); WorkingSetMember* member = ws->get(id); member->keyData.clear(); diff --git a/src/mongo/db/commands/map_reduce_agg_test.cpp b/src/mongo/db/commands/map_reduce_agg_test.cpp index f1e27e6e1fc..877a93b7fdd 100644 --- a/src/mongo/db/commands/map_reduce_agg_test.cpp +++ b/src/mongo/db/commands/map_reduce_agg_test.cpp @@ -215,17 +215,6 @@ TEST(MapReduceAggTest, testOutReduceTranslate) { ASSERT_EQ("$project"s, (*subpipeline)[0].firstElement().fieldName()); } -TEST(MapReduceAggTest, testOutDifferentDBFails) { - auto nss = NamespaceString{"db", "coll"}; - auto mr = MapReduce{ - nss, - MapReduceJavascriptCode{mapJavascript.toString()}, - MapReduceJavascriptCode{reduceJavascript.toString()}, - MapReduceOutOptions{boost::make_optional("db2"s), "coll2", OutputType::Replace, false}}; - boost::intrusive_ptr expCtx(new ExpressionContextForTest(nss)); - ASSERT_THROWS_CODE(map_reduce_common::translateFromMR(mr, expCtx), AssertionException, 31278); -} - TEST(MapReduceAggTest, testOutSameCollection) { auto nss = NamespaceString{"db", "coll"}; auto mr = MapReduce{ diff --git a/src/mongo/db/commands/map_reduce_stats.cpp b/src/mongo/db/commands/map_reduce_stats.cpp index 5440bb435df..bff4dfd8732 100644 --- a/src/mongo/db/commands/map_reduce_stats.cpp +++ b/src/mongo/db/commands/map_reduce_stats.cpp @@ -62,7 +62,9 @@ MapReduceStats::MapReduceStats(const std::vector& pipelineStats, _timing.reduce = stageStats.executionTimeMillis - prevTime; _counts.output = stageStats.advanced; } else { - invariant(stageName == "$out"_sd || stageName == "$merge"_sd, stageName); + invariant(stageName == "$out"_sd || stageName == "$internalOutToDifferentDB"_sd || + stageName == "$merge"_sd, + stageName); } prevTime = stageStats.executionTimeMillis; diff --git a/src/mongo/db/commands/map_reduce_stats_test.cpp b/src/mongo/db/commands/map_reduce_stats_test.cpp index 28784f58c1f..a769e47b564 100644 --- a/src/mongo/db/commands/map_reduce_stats_test.cpp +++ b/src/mongo/db/commands/map_reduce_stats_test.cpp @@ -190,7 +190,8 @@ TEST_F(MapReduceStatsTest, ConfirmStatsUnshardedWithFinalizeProjectStage) { DEATH_TEST_F(MapReduceStatsTest, DeathByUnknownStage, - "Invariant failure stageName == \"$out\"_sd || stageName == \"$merge\"_sd") { + "Invariant failure stageName == \"$out\"_sd || stageName == " + "\"$internalOutToDifferentDB\"_sd || stageName == \"$merge\"_sd") { addInvalidStageForMapReduce(); MapReduceStats mapReduceStats(buildMapReducePipelineStats(), MapReduceStats::ResponseType::kUnsharded, diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp index 10cb66b3543..bad14132cc1 100644 --- a/src/mongo/db/commands/mr_common.cpp +++ b/src/mongo/db/commands/mr_common.cpp @@ -159,13 +159,8 @@ auto translateFinalize(boost::intrusive_ptr expCtx, std::stri } auto translateOutReplace(boost::intrusive_ptr expCtx, - const StringData inputDatabase, NamespaceString targetNss) { - uassert(31278, - "MapReduce must output to the database belonging to its input collection - Input: "s + - inputDatabase + " Output: " + targetNss.db(), - inputDatabase == targetNss.db()); - return DocumentSourceOut::create(std::move(targetNss), expCtx); + return DocumentSourceOut::createAndAllowDifferentDB(std::move(targetNss), expCtx); } auto translateOutMerge(boost::intrusive_ptr expCtx, NamespaceString targetNss) { @@ -205,12 +200,11 @@ auto translateOutReduce(boost::intrusive_ptr expCtx, auto translateOut(boost::intrusive_ptr expCtx, const OutputType outputType, - const StringData inputDatabase, NamespaceString targetNss, std::string reduceCode) { switch (outputType) { case OutputType::Replace: - return boost::make_optional(translateOutReplace(expCtx, inputDatabase, targetNss)); + return boost::make_optional(translateOutReplace(expCtx, targetNss)); case OutputType::Merge: return boost::make_optional(translateOutMerge(expCtx, targetNss)); case OutputType::Reduce: @@ -388,11 +382,7 @@ std::unique_ptr translateFromMR( parsedMr.getFinalize().map([&](auto&& finalize) { return translateFinalize(expCtx, parsedMr.getFinalize()->getCode()); }), - translateOut(expCtx, - outType, - parsedMr.getNamespace().db(), - std::move(outNss), - parsedMr.getReduce().getCode())), + translateOut(expCtx, outType, std::move(outNss), parsedMr.getReduce().getCode())), expCtx)); pipeline->optimizePipeline(); return pipeline; diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 13da4edb4aa..088a7953325 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -205,6 +205,7 @@ env.Library( 'process_interface_standalone.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/catalog/catalog_helpers', '$BUILD_DIR/mongo/db/ops/write_ops_exec', '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/db/repl/speculative_majority_read_info', diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index 860d9028bec..fc390947621 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -41,16 +41,18 @@ #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/util/destructor_guard.h" #include "mongo/util/log.h" +#include "mongo/util/uuid.h" namespace mongo { using namespace fmt::literals; -static AtomicWord aggOutCounter; - MONGO_FAIL_POINT_DEFINE(hangWhileBuildingDocumentSourceOutBatch); REGISTER_DOCUMENT_SOURCE(out, DocumentSourceOut::LiteParsed::parse, DocumentSourceOut::createFromBson); +REGISTER_DOCUMENT_SOURCE(internalOutToDifferentDB, + DocumentSourceOut::LiteParsed::parseToDifferentDB, + DocumentSourceOut::createFromBsonToDifferentDB); DocumentSourceOut::~DocumentSourceOut() { DESTRUCTOR_GUARD( @@ -72,18 +74,42 @@ DocumentSourceOut::~DocumentSourceOut() { [this] { pExpCtx->mongoProcessInterface->setOperationContext(pExpCtx->opCtx); }); pExpCtx->mongoProcessInterface->setOperationContext(cleanupOpCtx.get()); - pExpCtx->mongoProcessInterface->directClient()->dropCollection(_tempNs.ns()); + pExpCtx->mongoProcessInterface->dropCollection(cleanupOpCtx.get(), _tempNs); }); } +std::unique_ptr DocumentSourceOut::LiteParsed::parseToDifferentDB( + const AggregationRequest& request, const BSONElement& spec) { + + auto specObj = spec.Obj(); + auto dbElem = specObj["db"]; + auto collElem = specObj["coll"]; + uassert(16994, + str::stream() << kStageName << " must have db and coll string arguments", + dbElem.type() == BSONType::String && collElem.type() == BSONType::String); + NamespaceString targetNss{dbElem.String(), collElem.String()}; + uassert(ErrorCodes::InvalidNamespace, + "Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()), + targetNss.isValid()); + + ActionSet actions{ActionType::insert, ActionType::remove}; + if (request.shouldBypassDocumentValidation()) { + actions.addAction(ActionType::bypassDocumentValidation); + } + + PrivilegeVector privileges{Privilege(ResourcePattern::forExactNamespace(targetNss), actions)}; + + return std::make_unique(std::move(targetNss), + std::move(privileges)); +} + std::unique_ptr DocumentSourceOut::LiteParsed::parse( const AggregationRequest& request, const BSONElement& spec) { - uassert(ErrorCodes::TypeMismatch, - "{} stage requires a string argument, but found {}"_format(kStageName, - typeName(spec.type())), + uassert(16990, + "{} only supports a string argument, but found {}"_format(kStageName, + typeName(spec.type())), spec.type() == BSONType::String); - NamespaceString targetNss{request.getNamespaceString().db(), spec.valueStringData()}; uassert(ErrorCodes::InvalidNamespace, "Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()), @@ -103,16 +129,19 @@ std::unique_ptr DocumentSourceOut::LiteParsed::pa void DocumentSourceOut::initialize() { DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); - DBClientBase* conn = pExpCtx->mongoProcessInterface->directClient(); - const auto& outputNs = getOutputNs(); - _tempNs = NamespaceString(str::stream() - << outputNs.db() << ".tmp.agg_out." << aggOutCounter.addAndFetch(1)); + // We will write all results into a temporary collection, then rename the temporary collection + // to be the target collection once we are done. + _tempNs = NamespaceString(str::stream() << outputNs.db() << ".tmp.agg_out." << UUID::gen()); // Save the original collection options and index specs so we can check they didn't change // during computation. - _originalOutOptions = pExpCtx->mongoProcessInterface->getCollectionOptions(outputNs); - _originalIndexes = conn->getIndexSpecs(outputNs); + _originalOutOptions = + // The uuid field is considered an option, but cannot be passed to createCollection. + pExpCtx->mongoProcessInterface->getCollectionOptions(pExpCtx->opCtx, outputNs) + .removeField("uuid"); + _originalIndexes = pExpCtx->mongoProcessInterface->getIndexSpecs( + pExpCtx->opCtx, outputNs, false /* includeBuildUUIDs */); // Check if it's capped to make sure we have a chance of succeeding before we do all the work. // If the collection becomes capped during processing, the collection options will have changed, @@ -121,11 +150,6 @@ void DocumentSourceOut::initialize() { "namespace '{}' is capped so it can't be used for {}"_format(outputNs.ns(), kStageName), _originalOutOptions["capped"].eoo()); - // We will write all results into a temporary collection, then rename the temporary - // collection to be the target collection once we are done. - _tempNs = NamespaceString(str::stream() - << outputNs.db() << ".tmp.agg_out." << aggOutCounter.addAndFetch(1)); - // Create temp collection, copying options from the existing output collection if any. { BSONObjBuilder cmd; @@ -133,11 +157,8 @@ void DocumentSourceOut::initialize() { cmd << "temp" << true; cmd.appendElementsUnique(_originalOutOptions); - BSONObj info; - uassert(16994, - "failed to create temporary {} collection '{}': {}"_format( - kStageName, _tempNs.ns(), getStatusFromCommandResult(info).reason()), - conn->runCommand(outputNs.db().toString(), cmd.done(), info)); + pExpCtx->mongoProcessInterface->createCollection( + pExpCtx->opCtx, _tempNs.db().toString(), cmd.done()); } if (_originalIndexes.empty()) { @@ -148,7 +169,7 @@ void DocumentSourceOut::initialize() { try { std::vector tempNsIndexes = {std::begin(_originalIndexes), std::end(_originalIndexes)}; - conn->createIndexes(_tempNs.ns(), tempNsIndexes); + pExpCtx->mongoProcessInterface->createIndexes(pExpCtx->opCtx, _tempNs, tempNsIndexes); } catch (DBException& ex) { ex.addContext("Copying indexes for $out failed"); throw; @@ -177,7 +198,11 @@ boost::intrusive_ptr DocumentSourceOut::create( "{} is not supported when the output collection is in a different " "database"_format(kStageName), outputNs.db() == expCtx->ns.db()); + return createAndAllowDifferentDB(outputNs, expCtx); +} +boost::intrusive_ptr DocumentSourceOut::createAndAllowDifferentDB( + NamespaceString outputNs, const boost::intrusive_ptr& expCtx) { uassert(ErrorCodes::OperationNotSupportedInTransaction, "{} cannot be used in a transaction"_format(kStageName), !expCtx->inMultiDocumentTransaction); @@ -191,10 +216,6 @@ boost::intrusive_ptr DocumentSourceOut::create( "Invalid {} target namespace, {}"_format(kStageName, outputNs.ns()), outputNs.isValid()); - uassert(17017, - "{} is not supported to an existing *sharded* output collection"_format(kStageName), - !expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, outputNs)); - uassert(17385, "Can't {} to special collection: {}"_format(kStageName, outputNs.coll()), !outputNs.isSystem()); @@ -208,20 +229,24 @@ boost::intrusive_ptr DocumentSourceOut::create( boost::intrusive_ptr DocumentSourceOut::createFromBson( BSONElement elem, const boost::intrusive_ptr& expCtx) { - uassert(16990, + uassert(31278, "{} only supports a string argument, but found {}"_format(kStageName, typeName(elem.type())), elem.type() == BSONType::String); - return create({expCtx->ns.db(), elem.str()}, expCtx); } -Value DocumentSourceOut::serialize(boost::optional explain) const { - massert(17000, - "{} shouldn't have different db than input"_format(kStageName), - _outputNs.db() == pExpCtx->ns.db()); +boost::intrusive_ptr DocumentSourceOut::createFromBsonToDifferentDB( + BSONElement elem, const boost::intrusive_ptr& expCtx) { - return Value(DOC(getSourceName() << _outputNs.coll())); + auto nsObj = elem.Obj(); + return createAndAllowDifferentDB(NamespaceString(nsObj["db"].String(), nsObj["coll"].String()), + expCtx); +} +Value DocumentSourceOut::serialize(boost::optional explain) const { + return _toDifferentDB + ? Value(DOC(getSourceName() << DOC("db" << _outputNs.db() << "coll" << _outputNs.coll()))) + : Value(DOC(getSourceName() << _outputNs.coll())); } void DocumentSourceOut::waitWhileFailPointEnabled() { diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 28241d82893..971f1652c47 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -38,6 +38,7 @@ namespace mongo { class DocumentSourceOut final : public DocumentSourceWriter { public: static constexpr StringData kStageName = "$out"_sd; + static constexpr StringData kInternalStageName = "$internalOutToDifferentDB"_sd; /** * A "lite parsed" $out stage is similar to other stages involving foreign collections except in @@ -51,6 +52,9 @@ public: static std::unique_ptr parse(const AggregationRequest& request, const BSONElement& spec); + static std::unique_ptr parseToDifferentDB(const AggregationRequest& request, + const BSONElement& spec); + bool allowShardedForeignCollection(NamespaceString nss) const final { return _foreignNssSet.find(nss) == _foreignNssSet.end(); } @@ -63,6 +67,9 @@ public: ~DocumentSourceOut() override; const char* getSourceName() const final override { + if (_toDifferentDB) { + return kInternalStageName.rawData(); + } return kStageName.rawData(); } @@ -80,21 +87,30 @@ public: boost::optional explain = boost::none) const final override; /** - * Creates a new $out stage from the given arguments. + * Creates a new $out or $internalOutToDifferentDB stage from the given arguments. */ static boost::intrusive_ptr create( NamespaceString outputNs, const boost::intrusive_ptr& expCtx); + static boost::intrusive_ptr createAndAllowDifferentDB( + NamespaceString outputNs, const boost::intrusive_ptr& expCtx); + /** - * Parses a $out stage from the user-supplied BSON. + * Parses a $out or $internalOutToDifferentDB stage from the user-supplied BSON. */ static boost::intrusive_ptr createFromBson( BSONElement elem, const boost::intrusive_ptr& pExpCtx); + static boost::intrusive_ptr createFromBsonToDifferentDB( + BSONElement elem, const boost::intrusive_ptr& pExpCtx); private: DocumentSourceOut(NamespaceString outputNs, const boost::intrusive_ptr& expCtx) - : DocumentSourceWriter(kStageName.rawData(), std::move(outputNs), expCtx) {} + : DocumentSourceWriter(outputNs.db() == expCtx->ns.db() ? kStageName.rawData() + : kInternalStageName.rawData(), + std::move(outputNs), + expCtx), + _toDifferentDB(getOutputNs().db() != expCtx->ns.db()) {} void initialize() override; @@ -122,6 +138,10 @@ private: // The temporary namespace for the $out writes. NamespaceString _tempNs; + + // Keep track of whether this document source is writing to a different DB for serialization + // purposes. + bool _toDifferentDB; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_out_test.cpp b/src/mongo/db/pipeline/document_source_out_test.cpp index e8decf18ad6..a1cf3722676 100644 --- a/src/mongo/db/pipeline/document_source_out_test.cpp +++ b/src/mongo/db/pipeline/document_source_out_test.cpp @@ -85,13 +85,13 @@ public: TEST_F(DocumentSourceOutTest, FailsToParseIncorrectType) { BSONObj spec = BSON("$out" << 1); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 16990); + ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 31278); spec = BSON("$out" << BSONArray()); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 16990); + ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 31278); spec = BSON("$out" << BSONObj()); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 16990); + ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 31278); } TEST_F(DocumentSourceOutTest, AcceptsStringArgument) { diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index cbe85447f19..86bc3f1c2e2 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -125,18 +125,12 @@ public: virtual ~MongoProcessInterface(){}; /** - * Sets the OperationContext of the DBDirectClient returned by directClient(). This method must - * be called after updating the 'opCtx' member of the ExpressionContext associated with the - * document source. + * Sets the OperationContext of the DBDirectClient used by mongo process interface functions. + * This method must be called after updating the 'opCtx' member of the ExpressionContext + * associated with the document source. */ virtual void setOperationContext(OperationContext* opCtx) = 0; - /** - * Always returns a DBDirectClient. The return type in the function signature is a DBClientBase* - * because DBDirectClient isn't linked into mongos. - */ - virtual DBClientBase* directClient() = 0; - /** * Creates a new TransactionHistoryIterator object. Only applicable in processes which support * locally traversing the oplog. @@ -179,6 +173,10 @@ public: virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) = 0; + virtual std::list getIndexSpecs(OperationContext* opCtx, + const NamespaceString& ns, + bool includeBuildUUIDs) = 0; + /** * Appends operation latency statistics for collection "nss" to "builder" */ @@ -213,7 +211,7 @@ public: * ErrorCodes::CommandNotSupportedOnView if 'nss' describes a view. Future callers may want to * parameterize this behavior. */ - virtual BSONObj getCollectionOptions(const NamespaceString& nss) = 0; + virtual BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) = 0; /** * Performs the given rename command if the collection given by 'targetNs' has the same options @@ -228,6 +226,24 @@ public: const BSONObj& originalCollectionOptions, const std::list& originalIndexes) = 0; + /** + * Creates a collection on the given database by running the given command. On shardsvr targets + * the primary shard of 'dbName'. + */ + virtual void createCollection(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj) = 0; + + /** + * Runs createIndexes on the given database for the given index specs. If running on a shardsvr + * this targets the primary shard of the database part of 'ns'. + */ + virtual void createIndexes(OperationContext* opCtx, + const NamespaceString& ns, + const std::vector& indexSpecs) = 0; + + virtual void dropCollection(OperationContext* opCtx, const NamespaceString& collection) = 0; + /** * Parses a Pipeline from a vector of BSONObjs representing DocumentSources. The state of the * returned pipeline will depend upon the supplied MakePipelineOptions: diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index f94e286d6c0..fcb3a1d1a5d 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -67,10 +67,6 @@ public: MONGO_UNREACHABLE; } - DBClientBase* directClient() final { - MONGO_UNREACHABLE; - } - bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; Status insert(const boost::intrusive_ptr& expCtx, @@ -95,6 +91,11 @@ public: const NamespaceString& ns) final { MONGO_UNREACHABLE; } + std::list getIndexSpecs(OperationContext* opCtx, + const NamespaceString& ns, + bool includeBuildUUIDs) final { + MONGO_UNREACHABLE; + } void appendLatencyStats(OperationContext* opCtx, const NamespaceString& nss, @@ -122,7 +123,7 @@ public: MONGO_UNREACHABLE; } - BSONObj getCollectionOptions(const NamespaceString& nss) final { + BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) final { MONGO_UNREACHABLE; } @@ -134,6 +135,22 @@ public: MONGO_UNREACHABLE; } + void createCollection(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj) final { + MONGO_UNREACHABLE; + } + + void createIndexes(OperationContext* opCtx, + const NamespaceString& ns, + const std::vector& indexSpecs) final { + MONGO_UNREACHABLE; + } + + void dropCollection(OperationContext* opCtx, const NamespaceString& collection) final { + MONGO_UNREACHABLE; + } + std::unique_ptr attachCursorSourceToPipeline( const boost::intrusive_ptr& expCtx, Pipeline* pipeline) final; diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp index b8b2a31c793..af3c68ee750 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp +++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp @@ -38,6 +38,7 @@ #include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/document_validation.h" +#include "mongo/db/commands.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" @@ -49,6 +50,7 @@ #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" @@ -210,4 +212,148 @@ std::unique_ptr MongoInterfaceShardServer::getShardFilterer( return std::make_unique(std::move(shardingMetadata)); } +void MongoInterfaceShardServer::renameIfOptionsAndIndexesHaveNotChanged( + OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& destinationNs, + const BSONObj& originalCollectionOptions, + const std::list& originalIndexes) { + BSONObjBuilder newCmd; + newCmd.append("internalRenameIfOptionsAndIndexesMatch", 1); + newCmd.append("from", renameCommandObj["renameCollection"].String()); + newCmd.append("to", renameCommandObj["to"].String()); + newCmd.append("collectionOptions", originalCollectionOptions); + if (!opCtx->getWriteConcern().usedDefault) { + newCmd.append(WriteConcernOptions::kWriteConcernField, opCtx->getWriteConcern().toBSON()); + } + BSONArrayBuilder indexArrayBuilder(newCmd.subarrayStart("indexes")); + for (auto&& index : originalIndexes) { + indexArrayBuilder.append(index); + } + indexArrayBuilder.done(); + auto cachedDbInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, destinationNs.db())); + auto newCmdObj = newCmd.obj(); + auto response = + executeCommandAgainstDatabasePrimary(opCtx, + // internalRenameIfOptionsAndIndexesMatch is adminOnly. + NamespaceString::kAdminDb, + std::move(cachedDbInfo), + newCmdObj, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kNoRetry); + uassertStatusOKWithContext(response.swResponse, + str::stream() << "failed while running command " << newCmdObj); + auto result = response.swResponse.getValue().data; + uassertStatusOKWithContext(getStatusFromCommandResult(result), + str::stream() << "failed while running command " << newCmdObj); + uassertStatusOKWithContext(getWriteConcernStatusFromCommandResult(result), + str::stream() << "failed while running command " << newCmdObj); +} + +std::list MongoInterfaceShardServer::getIndexSpecs(OperationContext* opCtx, + const NamespaceString& ns, + bool includeBuildUUIDs) { + auto cachedDbInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, ns.db())); + auto shard = uassertStatusOK( + Grid::get(opCtx)->shardRegistry()->getShard(opCtx, cachedDbInfo.primaryId())); + auto cmdObj = BSON("listIndexes" << ns.coll()); + Shard::QueryResponse indexes; + try { + indexes = uassertStatusOK( + shard->runExhaustiveCursorCommand(opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + ns.db().toString(), + appendDbVersionIfPresent(cmdObj, cachedDbInfo), + Milliseconds(-1))); + } catch (ExceptionFor&) { + return std::list(); + } + return std::list(indexes.docs.begin(), indexes.docs.end()); +} +void MongoInterfaceShardServer::createCollection(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj) { + auto cachedDbInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); + BSONObj finalCmdObj = cmdObj; + if (!opCtx->getWriteConcern().usedDefault) { + auto writeObj = + BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON()); + finalCmdObj = cmdObj.addField(writeObj.getField(WriteConcernOptions::kWriteConcernField)); + } + auto response = + executeCommandAgainstDatabasePrimary(opCtx, + dbName, + std::move(cachedDbInfo), + finalCmdObj, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kIdempotent); + uassertStatusOKWithContext(response.swResponse, + str::stream() << "failed while running command " << finalCmdObj); + auto result = response.swResponse.getValue().data; + uassertStatusOKWithContext(getStatusFromCommandResult(result), + str::stream() << "failed while running command " << finalCmdObj); + uassertStatusOKWithContext(getWriteConcernStatusFromCommandResult(result), + str::stream() + << "write concern failed while running command " << finalCmdObj); +} + +void MongoInterfaceShardServer::createIndexes(OperationContext* opCtx, + const NamespaceString& ns, + const std::vector& indexSpecs) { + auto cachedDbInfo = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, ns.db()); + BSONObjBuilder newCmdBuilder; + newCmdBuilder.append("createIndexes", ns.coll()); + newCmdBuilder.append("indexes", indexSpecs); + if (!opCtx->getWriteConcern().usedDefault) { + newCmdBuilder.append(WriteConcernOptions::kWriteConcernField, + opCtx->getWriteConcern().toBSON()); + } + auto cmdObj = newCmdBuilder.done(); + auto response = + executeCommandAgainstDatabasePrimary(opCtx, + ns.db(), + cachedDbInfo.getValue(), + cmdObj, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kIdempotent); + uassertStatusOKWithContext(response.swResponse, + str::stream() << "failed while running command " << cmdObj); + auto result = response.swResponse.getValue().data; + uassertStatusOKWithContext(getStatusFromCommandResult(result), + str::stream() << "failed while running command " << cmdObj); + uassertStatusOKWithContext(getWriteConcernStatusFromCommandResult(result), + str::stream() + << "write concern failed while running command " << cmdObj); +} +void MongoInterfaceShardServer::dropCollection(OperationContext* opCtx, const NamespaceString& ns) { + // Build and execute the dropCollection command against the primary shard of the given + // database. + auto cachedDbInfo = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, ns.db()); + BSONObjBuilder newCmdBuilder; + newCmdBuilder.append("drop", ns.coll()); + if (!opCtx->getWriteConcern().usedDefault) { + newCmdBuilder.append(WriteConcernOptions::kWriteConcernField, + opCtx->getWriteConcern().toBSON()); + } + auto cmdObj = newCmdBuilder.done(); + auto response = + executeCommandAgainstDatabasePrimary(opCtx, + ns.db(), + cachedDbInfo.getValue(), + cmdObj, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kIdempotent); + uassertStatusOKWithContext(response.swResponse, + str::stream() << "failed while running command " << cmdObj); + auto result = response.swResponse.getValue().data; + uassertStatusOKWithContext(getStatusFromCommandResult(result), + str::stream() << "failed while running command " << cmdObj); + uassertStatusOKWithContext(getWriteConcernStatusFromCommandResult(result), + str::stream() + << "write concern failed while running command " << cmdObj); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h index 8b4666c9b55..64af23bce19 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.h +++ b/src/mongo/db/pipeline/process_interface_shardsvr.h @@ -87,6 +87,22 @@ public: std::unique_ptr getShardFilterer( const boost::intrusive_ptr& expCtx) const override final; + + std::list getIndexSpecs(OperationContext* opCtx, + const NamespaceString& ns, + bool includeBuildUUIDs) final; + void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list& originalIndexes) final; + void createCollection(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj) final; + void createIndexes(OperationContext* opCtx, + const NamespaceString& ns, + const std::vector& indexSpecs) final; + void dropCollection(OperationContext* opCtx, const NamespaceString& collection) final; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index c26d1642b1d..aebdebc1ffb 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -36,10 +36,15 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/collection_catalog.h" +#include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/document_validation.h" +#include "mongo/db/catalog/drop_collection.h" #include "mongo/db/catalog/index_catalog_entry.h" +#include "mongo/db/catalog/list_indexes.h" +#include "mongo/db/catalog/rename_collection.h" #include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" @@ -58,6 +63,7 @@ #include "mongo/db/stats/fill_locker_info.h" #include "mongo/db/stats/storage_stats.h" #include "mongo/db/storage/backup_cursor_hooks.h" +#include "mongo/db/storage/durable_catalog.h" #include "mongo/db/transaction_history_iterator.h" #include "mongo/db/transaction_participant.h" #include "mongo/s/cluster_commands_helpers.h" @@ -151,10 +157,6 @@ void MongoInterfaceStandalone::setOperationContext(OperationContext* opCtx) { _client.setOpCtx(opCtx); } -DBClientBase* MongoInterfaceStandalone::directClient() { - return &_client; -} - std::unique_ptr MongoInterfaceStandalone::createTransactionHistoryIterator(repl::OpTime time) const { bool permitYield = true; @@ -272,6 +274,12 @@ CollectionIndexUsageMap MongoInterfaceStandalone::getIndexStats(OperationContext return CollectionQueryInfo::get(collection).getIndexUsageStats(); } +std::list MongoInterfaceStandalone::getIndexSpecs(OperationContext* opCtx, + const NamespaceString& ns, + bool includeBuildUUIDs) { + return listIndexesEmptyListIfMissing(opCtx, ns, includeBuildUUIDs); +} + void MongoInterfaceStandalone::appendLatencyStats(OperationContext* opCtx, const NamespaceString& nss, bool includeHistograms, @@ -325,24 +333,22 @@ Status MongoInterfaceStandalone::appendQueryExecStats(OperationContext* opCtx, return Status::OK(); } -BSONObj MongoInterfaceStandalone::getCollectionOptions(const NamespaceString& nss) { - std::list infos; - - try { - infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); - if (infos.empty()) { - return BSONObj(); - } - } catch (const DBException& e) { - uasserted(ErrorCodes::CommandFailed, e.reason()); +BSONObj MongoInterfaceStandalone::getCollectionOptions(OperationContext* opCtx, + const NamespaceString& nss) { + AutoGetCollectionForReadCommand autoColl(opCtx, nss); + BSONObj collectionOptions = {}; + if (!autoColl.getDb()) { + return collectionOptions; + } + Collection* collection = autoColl.getCollection(); + if (!collection) { + return collectionOptions; } - const auto& infoObj = infos.front(); - uassert(ErrorCodes::CommandNotSupportedOnView, - str::stream() << nss.toString() << " is a view, not a collection", - infoObj["type"].valueStringData() != "view"_sd); - - return infoObj.getObjectField("options").getOwned(); + collectionOptions = DurableCatalog::get(opCtx) + ->getCollectionOptions(opCtx, collection->getCatalogId()) + .toBSON(); + return collectionOptions; } void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged( @@ -351,30 +357,31 @@ void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged( const NamespaceString& targetNs, const BSONObj& originalCollectionOptions, const std::list& originalIndexes) { - Lock::DBLock lk(opCtx, targetNs.db(), MODE_X); - - uassert(ErrorCodes::CommandFailed, - str::stream() << "collection options of target collection " << targetNs.ns() - << " changed during processing. Original options: " - << originalCollectionOptions - << ", new options: " << getCollectionOptions(targetNs), - SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions == - getCollectionOptions(targetNs))); - - auto currentIndexes = _client.getIndexSpecs(targetNs); - uassert(ErrorCodes::CommandFailed, - str::stream() << "indexes of target collection " << targetNs.ns() - << " changed during processing.", - originalIndexes.size() == currentIndexes.size() && - std::equal(originalIndexes.begin(), - originalIndexes.end(), - currentIndexes.begin(), - SimpleBSONObjComparator::kInstance.makeEqualTo())); - - BSONObj info; - uassert(ErrorCodes::CommandFailed, - str::stream() << "renameCollection failed: " << info, - _client.runCommand("admin", renameCommandObj, info)); + NamespaceString sourceNs = NamespaceString(renameCommandObj["renameCollection"].String()); + doLocalRenameIfOptionsAndIndexesHaveNotChanged(opCtx, + sourceNs, + targetNs, + renameCommandObj["dropTarget"].trueValue(), + renameCommandObj["stayTemp"].trueValue(), + originalIndexes, + originalCollectionOptions); +} + +void MongoInterfaceStandalone::createCollection(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj) { + uassertStatusOK(mongo::createCollection(opCtx, dbName, cmdObj)); +} + +void MongoInterfaceStandalone::createIndexes(OperationContext* opCtx, + const NamespaceString& ns, + const std::vector& indexSpecs) { + _client.createIndexes(ns.ns(), indexSpecs); +} +void MongoInterfaceStandalone::dropCollection(OperationContext* opCtx, const NamespaceString& ns) { + BSONObjBuilder result; + uassertStatusOK(mongo::dropCollection( + opCtx, ns, result, {}, DropCollectionSystemCollectionMode::kDisallowSystemCollectionDrops)); } std::unique_ptr MongoInterfaceStandalone::makePipeline( diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index c045260e3e0..cb97e3267fb 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -55,7 +55,6 @@ public: virtual ~MongoInterfaceStandalone() = default; void setOperationContext(OperationContext* opCtx) final; - DBClientBase* directClient() final; std::unique_ptr createTransactionHistoryIterator( repl::OpTime time) const final; @@ -78,6 +77,9 @@ public: boost::optional targetEpoch) override; CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final; + std::list getIndexSpecs(OperationContext* opCtx, + const NamespaceString& ns, + bool includeBuildUUIDs); void appendLatencyStats(OperationContext* opCtx, const NamespaceString& nss, bool includeHistograms, @@ -92,12 +94,19 @@ public: Status appendQueryExecStats(OperationContext* opCtx, const NamespaceString& nss, BSONObjBuilder* builder) const final override; - BSONObj getCollectionOptions(const NamespaceString& nss) final; + BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) final; void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx, const BSONObj& renameCommandObj, const NamespaceString& targetNs, const BSONObj& originalCollectionOptions, - const std::list& originalIndexes) final; + const std::list& originalIndexes); + void createCollection(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj); + void createIndexes(OperationContext* opCtx, + const NamespaceString& ns, + const std::vector& indexSpecs); + void dropCollection(OperationContext* opCtx, const NamespaceString& collection); std::unique_ptr makePipeline( const std::vector& rawPipeline, const boost::intrusive_ptr& expCtx, diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 381da454c5d..b9b84086f6e 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -50,10 +50,6 @@ public: MONGO_UNREACHABLE; } - DBClientBase* directClient() override { - MONGO_UNREACHABLE; - } - std::unique_ptr createTransactionHistoryIterator( repl::OpTime time) const override { MONGO_UNREACHABLE; @@ -86,6 +82,12 @@ public: MONGO_UNREACHABLE; } + std::list getIndexSpecs(OperationContext* opCtx, + const NamespaceString& ns, + bool includeBuildUUIDs) override { + MONGO_UNREACHABLE; + } + void appendLatencyStats(OperationContext* opCtx, const NamespaceString& nss, bool includeHistograms, @@ -112,7 +114,7 @@ public: MONGO_UNREACHABLE; } - BSONObj getCollectionOptions(const NamespaceString& nss) override { + BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) override { MONGO_UNREACHABLE; } @@ -125,6 +127,21 @@ public: MONGO_UNREACHABLE; } + void createCollection(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj) override { + MONGO_UNREACHABLE; + } + + void createIndexes(OperationContext* opCtx, + const NamespaceString& ns, + const std::vector& indexSpecs) override { + MONGO_UNREACHABLE; + } + void dropCollection(OperationContext* opCtx, const NamespaceString& ns) override { + MONGO_UNREACHABLE; + } + std::unique_ptr makePipeline( const std::vector& rawPipeline, const boost::intrusive_ptr& expCtx, -- cgit v1.2.1