author     Ted Tuckman <ted.tuckman@mongodb.com>    2019-11-15 19:50:52 +0000
committer  evergreen <evergreen@mongodb.com>        2019-11-15 19:50:52 +0000
commit     6258369bc2d74e69a5e1fd8e025a291550aeb368 (patch)
tree       be895955944d025eb4dd94f1771ef883cb3515cc /src/mongo/db
parent     ab0e1e6875faa56155eb4777be66b575f6b48395 (diff)
download   mongo-6258369bc2d74e69a5e1fd8e025a291550aeb368.tar.gz
SERVER-42693 Add renameAndPreserveOptions command and allow $out to output to different DB
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/catalog/SConscript                                                1
-rw-r--r--  src/mongo/db/catalog/list_indexes.cpp                                        111
-rw-r--r--  src/mongo/db/catalog/list_indexes.h                                           54
-rw-r--r--  src/mongo/db/catalog/rename_collection.cpp                                    44
-rw-r--r--  src/mongo/db/catalog/rename_collection.h                                       7
-rw-r--r--  src/mongo/db/commands/SConscript                                               2
-rw-r--r--  src/mongo/db/commands/dbcommands.cpp                                           4
-rw-r--r--  src/mongo/db/commands/internal_rename_if_options_and_indexes_match.idl       52
-rw-r--r--  src/mongo/db/commands/internal_rename_if_options_and_indexes_match_cmd.cpp  109
-rw-r--r--  src/mongo/db/commands/list_indexes.cpp                                        38
-rw-r--r--  src/mongo/db/commands/map_reduce_agg_test.cpp                                 11
-rw-r--r--  src/mongo/db/commands/map_reduce_stats.cpp                                     4
-rw-r--r--  src/mongo/db/commands/map_reduce_stats_test.cpp                                3
-rw-r--r--  src/mongo/db/commands/mr_common.cpp                                           16
-rw-r--r--  src/mongo/db/pipeline/SConscript                                               1
-rw-r--r--  src/mongo/db/pipeline/document_source_out.cpp                                 95
-rw-r--r--  src/mongo/db/pipeline/document_source_out.h                                   26
-rw-r--r--  src/mongo/db/pipeline/document_source_out_test.cpp                             6
-rw-r--r--  src/mongo/db/pipeline/mongo_process_interface.h                               36
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.h                              27
-rw-r--r--  src/mongo/db/pipeline/process_interface_shardsvr.cpp                         146
-rw-r--r--  src/mongo/db/pipeline/process_interface_shardsvr.h                            16
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.cpp                        95
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.h                          15
-rw-r--r--  src/mongo/db/pipeline/stub_mongo_process_interface.h                          27
25 files changed, 776 insertions, 170 deletions
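
For orientation, here is a minimal sketch of the two spellings of $out after this change. It is not taken from the patch: the stage and field names come from document_source_out.cpp below, while the database and collection values are invented. The user-facing $out keeps its same-database string form; cross-database output, as emitted by the mapReduce translation in mr_common.cpp, goes through the new internal stage.

// Illustrative only; assumes the BSON builder headers from this tree.
#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobjbuilder.h"

namespace example {
// $out to a collection in the same database keeps the existing string syntax.
const mongo::BSONObj kSameDbOut = BSON("$out"
                                       << "target");
// Writing to another database uses the new internal stage, which
// DocumentSourceOut::LiteParsed::parseToDifferentDB expects as {db: <string>, coll: <string>}.
const mongo::BSONObj kCrossDbOut = BSON("$internalOutToDifferentDB"
                                        << BSON("db"
                                                << "db2"
                                                << "coll"
                                                << "target"));
}  // namespace example
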
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript
index 70cb654df94..d2c7b9ec1e5 100644
--- a/src/mongo/db/catalog/SConscript
+++ b/src/mongo/db/catalog/SConscript
@@ -395,6 +395,7 @@ env.Library(
'drop_database.cpp',
'drop_indexes.cpp',
'rename_collection.cpp',
+ 'list_indexes.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/db/catalog/list_indexes.cpp b/src/mongo/db/catalog/list_indexes.cpp
new file mode 100644
index 00000000000..510fd7e3aec
--- /dev/null
+++ b/src/mongo/db/catalog/list_indexes.cpp
@@ -0,0 +1,111 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/catalog/list_indexes.h"
+
+#include <list>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/curop_failpoint_helpers.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/durable_catalog.h"
+#include "mongo/db/storage/storage_engine.h"
+#include "mongo/util/uuid.h"
+
+// Failpoint which causes the "listIndexes" cmd to hang after acquiring the DB lock.
+MONGO_FAIL_POINT_DEFINE(hangBeforeListIndexes);
+
+namespace mongo {
+
+StatusWith<std::list<BSONObj>> listIndexes(OperationContext* opCtx,
+ const NamespaceStringOrUUID& ns,
+ bool includeBuildUUIDs) {
+ AutoGetCollectionForReadCommand ctx(opCtx, ns);
+ Collection* collection = ctx.getCollection();
+ auto nss = ctx.getNss();
+ if (!collection) {
+ return StatusWith<std::list<BSONObj>>(ErrorCodes::NamespaceNotFound,
+ str::stream()
+ << "ns does not exist: " << ctx.getNss().ns());
+ }
+ return StatusWith<std::list<BSONObj>>(
+ listIndexesInLock(opCtx, collection, nss, includeBuildUUIDs));
+}
+
+std::list<BSONObj> listIndexesInLock(OperationContext* opCtx,
+ Collection* collection,
+ const NamespaceString& nss,
+ bool includeBuildUUIDs) {
+
+ auto durableCatalog = DurableCatalog::get(opCtx);
+
+ CurOpFailpointHelpers::waitWhileFailPointEnabled(
+ &hangBeforeListIndexes, opCtx, "hangBeforeListIndexes", []() {}, false, nss);
+
+ std::vector<std::string> indexNames;
+ writeConflictRetry(opCtx, "listIndexes", nss.ns(), [&] {
+ indexNames.clear();
+ durableCatalog->getAllIndexes(opCtx, collection->getCatalogId(), &indexNames);
+ });
+
+ std::list<BSONObj> indexSpecs;
+
+ for (size_t i = 0; i < indexNames.size(); i++) {
+ auto indexSpec = writeConflictRetry(opCtx, "listIndexes", nss.ns(), [&] {
+ if (includeBuildUUIDs &&
+ !durableCatalog->isIndexReady(opCtx, collection->getCatalogId(), indexNames[i])) {
+ BSONObjBuilder builder;
+ builder.append(
+ "spec"_sd,
+ durableCatalog->getIndexSpec(opCtx, collection->getCatalogId(), indexNames[i]));
+
+ // TODO(SERVER-37980): Replace with index build UUID.
+ auto indexBuildUUID = UUID::gen();
+ indexBuildUUID.appendToBuilder(&builder, "buildUUID"_sd);
+ return builder.obj();
+ }
+ return durableCatalog->getIndexSpec(opCtx, collection->getCatalogId(), indexNames[i]);
+ });
+ indexSpecs.push_back(indexSpec);
+ }
+ return indexSpecs;
+}
+std::list<BSONObj> listIndexesEmptyListIfMissing(OperationContext* opCtx,
+ const NamespaceStringOrUUID& nss,
+ bool includeBuildUUIDs) {
+ auto listStatus = listIndexes(opCtx, nss, includeBuildUUIDs);
+ return listStatus.isOK() ? listStatus.getValue() : std::list<BSONObj>();
+}
+} // namespace mongo
diff --git a/src/mongo/db/catalog/list_indexes.h b/src/mongo/db/catalog/list_indexes.h
new file mode 100644
index 00000000000..62296826e9a
--- /dev/null
+++ b/src/mongo/db/catalog/list_indexes.h
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+#include <list>
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+
+namespace mongo {
+
+/**
+ * Return a list of the indexes on the given collection.
+ */
+StatusWith<std::list<BSONObj>> listIndexes(OperationContext* opCtx,
+ const NamespaceStringOrUUID& ns,
+ bool includeBuildUUIDs);
+std::list<BSONObj> listIndexesInLock(OperationContext* opCtx,
+ Collection* collection,
+ const NamespaceString& nss,
+ bool includeBuildUUIDs);
+std::list<BSONObj> listIndexesEmptyListIfMissing(OperationContext* opCtx,
+ const NamespaceStringOrUUID& nss,
+ bool includeBuildUUIDs);
+
+} // namespace mongo
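
A minimal usage sketch of the helpers declared above (not part of the patch; it assumes a valid OperationContext and a hypothetical 'test.coll' namespace):

// Sketch only: shows the error-handling difference between the two entry points.
#include <list>

#include "mongo/db/catalog/list_indexes.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"

namespace example {
std::list<mongo::BSONObj> indexSpecsFor(mongo::OperationContext* opCtx) {
    const mongo::NamespaceString nss("test.coll");
    // listIndexes() reports NamespaceNotFound if the collection does not exist...
    auto swSpecs = mongo::listIndexes(opCtx, nss, false /* includeBuildUUIDs */);
    if (swSpecs.isOK()) {
        return swSpecs.getValue();
    }
    // ...while the EmptyListIfMissing variant maps that case to an empty list, which is
    // the behavior $out relies on when capturing the original index specs.
    return mongo::listIndexesEmptyListIfMissing(opCtx, nss, false /* includeBuildUUIDs */);
}
}  // namespace example
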
diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp
index 0731fa184e4..dc1d2208914 100644
--- a/src/mongo/db/catalog/rename_collection.cpp
+++ b/src/mongo/db/catalog/rename_collection.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/drop_collection.h"
#include "mongo/db/catalog/index_catalog.h"
+#include "mongo/db/catalog/list_indexes.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
@@ -706,6 +707,49 @@ Status renameBetweenDBs(OperationContext* opCtx,
} // namespace
+void doLocalRenameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx,
+ const NamespaceString& sourceNs,
+ const NamespaceString& targetNs,
+ bool dropTarget,
+ bool stayTemp,
+ std::list<BSONObj> originalIndexes,
+ BSONObj originalCollectionOptions) {
+ AutoGetDb dbLock(opCtx, targetNs.db(), MODE_X);
+ auto collection = dbLock.getDb()
+ ? CollectionCatalog::get(opCtx).lookupCollectionByNamespace(targetNs)
+ : nullptr;
+ BSONObj collectionOptions = {};
+ if (collection) {
+ // We do not include the UUID field in the options comparison. It is ok if the target
+ // collection was dropped and recreated, as long as the new target collection has the same
+ // options and indexes as the original one did. This is mainly to support concurrent $out
+ // to the same collection.
+ collectionOptions = DurableCatalog::get(opCtx)
+ ->getCollectionOptions(opCtx, collection->getCatalogId())
+ .toBSON()
+ .removeField("uuid");
+ }
+
+ uassert(ErrorCodes::CommandFailed,
+ str::stream() << "collection options of target collection " << targetNs.ns()
+ << " changed during processing. Original options: "
+ << originalCollectionOptions << ", new options: " << collectionOptions,
+ SimpleBSONObjComparator::kInstance.evaluate(
+ originalCollectionOptions.removeField("uuid") == collectionOptions));
+
+ auto currentIndexes =
+ listIndexesEmptyListIfMissing(opCtx, targetNs, false /* includeBuildUUIDs */);
+ uassert(ErrorCodes::CommandFailed,
+ str::stream() << "indexes of target collection " << targetNs.ns()
+ << " changed during processing.",
+ originalIndexes.size() == currentIndexes.size() &&
+ std::equal(originalIndexes.begin(),
+ originalIndexes.end(),
+ currentIndexes.begin(),
+ SimpleBSONObjComparator::kInstance.makeEqualTo()));
+
+ validateAndRunRenameCollection(opCtx, sourceNs, targetNs, dropTarget, stayTemp);
+}
void validateAndRunRenameCollection(OperationContext* opCtx,
const NamespaceString& source,
const NamespaceString& target,
diff --git a/src/mongo/db/catalog/rename_collection.h b/src/mongo/db/catalog/rename_collection.h
index 8b4af732173..7269fb93ac9 100644
--- a/src/mongo/db/catalog/rename_collection.h
+++ b/src/mongo/db/catalog/rename_collection.h
@@ -42,6 +42,13 @@ namespace repl {
class OpTime;
} // namespace repl
+void doLocalRenameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx,
+ const NamespaceString& sourceNs,
+ const NamespaceString& targetNs,
+ bool dropTarget,
+ bool stayTemp,
+ std::list<BSONObj> originalIndexes,
+ BSONObj collectionOptions);
/**
* Renames the collection from "source" to "target" and drops the existing collection iff
* "dropTarget" is true. "stayTemp" indicates whether a collection should maintain its
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 143874e5836..7464bc5b2b0 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -356,6 +356,7 @@ env.Library(
"dbhash.cpp",
"driverHelpers.cpp",
"haystack.cpp",
+ "internal_rename_if_options_and_indexes_match_cmd.cpp",
"map_reduce_command.cpp",
"map_reduce_finish_command.cpp",
"mr.cpp",
@@ -371,6 +372,7 @@ env.Library(
"txn_cmds.cpp",
"user_management_commands.cpp",
"vote_commit_index_build_command.cpp",
+ env.Idlc('internal_rename_if_options_and_indexes_match.idl')[0],
env.Idlc('vote_commit_index_build.idl')[0],
],
LIBDEPS=[
diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp
index 3db0141f5a5..e21d689045a 100644
--- a/src/mongo/db/commands/dbcommands.cpp
+++ b/src/mongo/db/commands/dbcommands.cpp
@@ -343,7 +343,9 @@ public:
if (cmd.getTemp()) {
uassert(ErrorCodes::InvalidOptions,
str::stream() << "the 'temp' field is an invalid option",
- opCtx->getClient()->isInDirectClient());
+ opCtx->getClient()->isInDirectClient() ||
+ (opCtx->getClient()->session()->getTags() &
+ transport::Session::kInternalClient));
}
// Validate _id index spec and fill in missing fields.
diff --git a/src/mongo/db/commands/internal_rename_if_options_and_indexes_match.idl b/src/mongo/db/commands/internal_rename_if_options_and_indexes_match.idl
new file mode 100644
index 00000000000..fd0ec5f6ecf
--- /dev/null
+++ b/src/mongo/db/commands/internal_rename_if_options_and_indexes_match.idl
@@ -0,0 +1,52 @@
+# Copyright (C) 2019-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+commands:
+ internalRenameIfOptionsAndIndexesMatch:
+ description: "An internal command that does a rename, but first checks to make sure the
+ indexes and collection options on the destination match those given in the
+ command."
+ namespace: ignored
+ fields:
+ from:
+ type: namespacestring
+ to:
+ type: namespacestring
+ collectionOptions:
+ description: "An object representing the options on the from collection with the
+ same format as the options from the listCollections command."
+ type: object
+ indexes:
+ description: "An array of index spec objects, in the same format as the specs returned by the listIndexes command."
+ type: array<object>
+
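
A hypothetical example of the command document this IDL describes, roughly as the shard-server process interface later in this patch assembles it (the namespaces, options, and index spec are invented, and <uuid> stands in for a generated temp-collection suffix):

#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobjbuilder.h"

namespace example {
const mongo::BSONObj kRenameCmd =
    BSON("internalRenameIfOptionsAndIndexesMatch"
         << 1 << "from"
         << "test.tmp.agg_out.<uuid>"
         << "to"
         << "db2.target"
         << "collectionOptions" << BSON("validationLevel"
                                        << "strict")
         << "indexes"
         << BSON_ARRAY(BSON("v" << 2 << "key" << BSON("_id" << 1) << "name"
                                << "_id_")));
}  // namespace example
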
diff --git a/src/mongo/db/commands/internal_rename_if_options_and_indexes_match_cmd.cpp b/src/mongo/db/commands/internal_rename_if_options_and_indexes_match_cmd.cpp
new file mode 100644
index 00000000000..8e112043f5a
--- /dev/null
+++ b/src/mongo/db/commands/internal_rename_if_options_and_indexes_match_cmd.cpp
@@ -0,0 +1,109 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/commands/internal_rename_if_options_and_indexes_match_gen.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog/rename_collection.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/db_raii.h"
+
+namespace mongo {
+
+/**
+ * Rename a collection while checking collection option and indexes.
+ */
+class InternalRenameIfOptionsAndIndexesMatchCmd final
+ : public TypedCommand<InternalRenameIfOptionsAndIndexesMatchCmd> {
+public:
+ using Request = InternalRenameIfOptionsAndIndexesMatch;
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ void typedRun(OperationContext* opCtx) {
+ auto thisRequest = request();
+ auto originalIndexes = thisRequest.getIndexes();
+ auto indexList = std::list<BSONObj>(originalIndexes.begin(), originalIndexes.end());
+ doLocalRenameIfOptionsAndIndexesHaveNotChanged(opCtx,
+ thisRequest.getFrom(),
+ thisRequest.getTo(),
+ true /* dropTarget */,
+ false /* stayTemp */,
+ std::move(indexList),
+ thisRequest.getCollectionOptions());
+ }
+
+ private:
+ NamespaceString ns() const override {
+ return request().getFrom();
+ }
+ bool supportsWriteConcern() const override {
+ return true;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ auto thisRequest = request();
+ auto from = thisRequest.getFrom();
+ auto to = thisRequest.getTo();
+ uassert(ErrorCodes::Unauthorized,
+ str::stream() << "Unauthorized to rename " << from,
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forExactNamespace(from),
+ ActionType::renameCollection));
+ uassert(ErrorCodes::Unauthorized,
+ str::stream() << "Unauthorized to drop " << to,
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forExactNamespace(to),
+ ActionType::dropCollection));
+ uassert(ErrorCodes::Unauthorized,
+ str::stream() << "Unauthorized to insert into " << to,
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forExactNamespace(to),
+ ActionType::insert));
+ }
+ };
+
+ std::string help() const override {
+ return "Internal command to rename and check collection options";
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kNever;
+ }
+
+} internalRenameIfOptionsAndIndexesMatchCmd;
+} // namespace mongo
diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp
index 858b22daba0..55d3c6b0a65 100644
--- a/src/mongo/db/commands/list_indexes.cpp
+++ b/src/mongo/db/commands/list_indexes.cpp
@@ -33,6 +33,7 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog/list_indexes.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
@@ -60,9 +61,6 @@ using std::vector;
namespace {
-// Failpoint which causes to hang "listIndexes" cmd after acquiring the DB lock.
-MONGO_FAIL_POINT_DEFINE(hangBeforeListIndexes);
-
/**
* Lists the indexes for a given collection.
* If 'includeBuildUUIDs' is true, then the index build uuid is also returned alongside the index
@@ -153,42 +151,12 @@ public:
uassert(ErrorCodes::NamespaceNotFound,
str::stream() << "ns does not exist: " << ctx.getNss().ns(),
collection);
-
- auto durableCatalog = DurableCatalog::get(opCtx);
-
nss = ctx.getNss();
-
- CurOpFailpointHelpers::waitWhileFailPointEnabled(
- &hangBeforeListIndexes, opCtx, "hangBeforeListIndexes", []() {}, false, nss);
-
- vector<string> indexNames;
- writeConflictRetry(opCtx, "listIndexes", nss.ns(), [&] {
- indexNames.clear();
- durableCatalog->getAllIndexes(opCtx, collection->getCatalogId(), &indexNames);
- });
-
+ auto indexList = listIndexesInLock(opCtx, collection, nss, includeBuildUUIDs);
auto ws = std::make_unique<WorkingSet>();
auto root = std::make_unique<QueuedDataStage>(opCtx, ws.get());
- for (size_t i = 0; i < indexNames.size(); i++) {
- auto indexSpec = writeConflictRetry(opCtx, "listIndexes", nss.ns(), [&] {
- if (includeBuildUUIDs &&
- !durableCatalog->isIndexReady(
- opCtx, collection->getCatalogId(), indexNames[i])) {
- BSONObjBuilder builder;
- builder.append("spec"_sd,
- durableCatalog->getIndexSpec(
- opCtx, collection->getCatalogId(), indexNames[i]));
-
- // TODO(SERVER-37980): Replace with index build UUID.
- auto indexBuildUUID = UUID::gen();
- indexBuildUUID.appendToBuilder(&builder, "buildUUID"_sd);
- return builder.obj();
- }
- return durableCatalog->getIndexSpec(
- opCtx, collection->getCatalogId(), indexNames[i]);
- });
-
+ for (auto&& indexSpec : indexList) {
WorkingSetID id = ws->allocate();
WorkingSetMember* member = ws->get(id);
member->keyData.clear();
diff --git a/src/mongo/db/commands/map_reduce_agg_test.cpp b/src/mongo/db/commands/map_reduce_agg_test.cpp
index f1e27e6e1fc..877a93b7fdd 100644
--- a/src/mongo/db/commands/map_reduce_agg_test.cpp
+++ b/src/mongo/db/commands/map_reduce_agg_test.cpp
@@ -215,17 +215,6 @@ TEST(MapReduceAggTest, testOutReduceTranslate) {
ASSERT_EQ("$project"s, (*subpipeline)[0].firstElement().fieldName());
}
-TEST(MapReduceAggTest, testOutDifferentDBFails) {
- auto nss = NamespaceString{"db", "coll"};
- auto mr = MapReduce{
- nss,
- MapReduceJavascriptCode{mapJavascript.toString()},
- MapReduceJavascriptCode{reduceJavascript.toString()},
- MapReduceOutOptions{boost::make_optional("db2"s), "coll2", OutputType::Replace, false}};
- boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
- ASSERT_THROWS_CODE(map_reduce_common::translateFromMR(mr, expCtx), AssertionException, 31278);
-}
-
TEST(MapReduceAggTest, testOutSameCollection) {
auto nss = NamespaceString{"db", "coll"};
auto mr = MapReduce{
diff --git a/src/mongo/db/commands/map_reduce_stats.cpp b/src/mongo/db/commands/map_reduce_stats.cpp
index 5440bb435df..bff4dfd8732 100644
--- a/src/mongo/db/commands/map_reduce_stats.cpp
+++ b/src/mongo/db/commands/map_reduce_stats.cpp
@@ -62,7 +62,9 @@ MapReduceStats::MapReduceStats(const std::vector<CommonStats>& pipelineStats,
_timing.reduce = stageStats.executionTimeMillis - prevTime;
_counts.output = stageStats.advanced;
} else {
- invariant(stageName == "$out"_sd || stageName == "$merge"_sd, stageName);
+ invariant(stageName == "$out"_sd || stageName == "$internalOutToDifferentDB"_sd ||
+ stageName == "$merge"_sd,
+ stageName);
}
prevTime = stageStats.executionTimeMillis;
diff --git a/src/mongo/db/commands/map_reduce_stats_test.cpp b/src/mongo/db/commands/map_reduce_stats_test.cpp
index 28784f58c1f..a769e47b564 100644
--- a/src/mongo/db/commands/map_reduce_stats_test.cpp
+++ b/src/mongo/db/commands/map_reduce_stats_test.cpp
@@ -190,7 +190,8 @@ TEST_F(MapReduceStatsTest, ConfirmStatsUnshardedWithFinalizeProjectStage) {
DEATH_TEST_F(MapReduceStatsTest,
DeathByUnknownStage,
- "Invariant failure stageName == \"$out\"_sd || stageName == \"$merge\"_sd") {
+ "Invariant failure stageName == \"$out\"_sd || stageName == "
+ "\"$internalOutToDifferentDB\"_sd || stageName == \"$merge\"_sd") {
addInvalidStageForMapReduce();
MapReduceStats mapReduceStats(buildMapReducePipelineStats(),
MapReduceStats::ResponseType::kUnsharded,
diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp
index 10cb66b3543..bad14132cc1 100644
--- a/src/mongo/db/commands/mr_common.cpp
+++ b/src/mongo/db/commands/mr_common.cpp
@@ -159,13 +159,8 @@ auto translateFinalize(boost::intrusive_ptr<ExpressionContext> expCtx, std::stri
}
auto translateOutReplace(boost::intrusive_ptr<ExpressionContext> expCtx,
- const StringData inputDatabase,
NamespaceString targetNss) {
- uassert(31278,
- "MapReduce must output to the database belonging to its input collection - Input: "s +
- inputDatabase + " Output: " + targetNss.db(),
- inputDatabase == targetNss.db());
- return DocumentSourceOut::create(std::move(targetNss), expCtx);
+ return DocumentSourceOut::createAndAllowDifferentDB(std::move(targetNss), expCtx);
}
auto translateOutMerge(boost::intrusive_ptr<ExpressionContext> expCtx, NamespaceString targetNss) {
@@ -205,12 +200,11 @@ auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx,
auto translateOut(boost::intrusive_ptr<ExpressionContext> expCtx,
const OutputType outputType,
- const StringData inputDatabase,
NamespaceString targetNss,
std::string reduceCode) {
switch (outputType) {
case OutputType::Replace:
- return boost::make_optional(translateOutReplace(expCtx, inputDatabase, targetNss));
+ return boost::make_optional(translateOutReplace(expCtx, targetNss));
case OutputType::Merge:
return boost::make_optional(translateOutMerge(expCtx, targetNss));
case OutputType::Reduce:
@@ -388,11 +382,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
parsedMr.getFinalize().map([&](auto&& finalize) {
return translateFinalize(expCtx, parsedMr.getFinalize()->getCode());
}),
- translateOut(expCtx,
- outType,
- parsedMr.getNamespace().db(),
- std::move(outNss),
- parsedMr.getReduce().getCode())),
+ translateOut(expCtx, outType, std::move(outNss), parsedMr.getReduce().getCode())),
expCtx));
pipeline->optimizePipeline();
return pipeline;
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 13da4edb4aa..088a7953325 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -205,6 +205,7 @@ env.Library(
'process_interface_standalone.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/db/repl/speculative_majority_read_info',
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index 860d9028bec..fc390947621 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -41,16 +41,18 @@
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
+#include "mongo/util/uuid.h"
namespace mongo {
using namespace fmt::literals;
-static AtomicWord<unsigned> aggOutCounter;
-
MONGO_FAIL_POINT_DEFINE(hangWhileBuildingDocumentSourceOutBatch);
REGISTER_DOCUMENT_SOURCE(out,
DocumentSourceOut::LiteParsed::parse,
DocumentSourceOut::createFromBson);
+REGISTER_DOCUMENT_SOURCE(internalOutToDifferentDB,
+ DocumentSourceOut::LiteParsed::parseToDifferentDB,
+ DocumentSourceOut::createFromBsonToDifferentDB);
DocumentSourceOut::~DocumentSourceOut() {
DESTRUCTOR_GUARD(
@@ -72,18 +74,42 @@ DocumentSourceOut::~DocumentSourceOut() {
[this] { pExpCtx->mongoProcessInterface->setOperationContext(pExpCtx->opCtx); });
pExpCtx->mongoProcessInterface->setOperationContext(cleanupOpCtx.get());
- pExpCtx->mongoProcessInterface->directClient()->dropCollection(_tempNs.ns());
+ pExpCtx->mongoProcessInterface->dropCollection(cleanupOpCtx.get(), _tempNs);
});
}
+std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::parseToDifferentDB(
+ const AggregationRequest& request, const BSONElement& spec) {
+
+ auto specObj = spec.Obj();
+ auto dbElem = specObj["db"];
+ auto collElem = specObj["coll"];
+ uassert(16994,
+ str::stream() << kStageName << " must have db and coll string arguments",
+ dbElem.type() == BSONType::String && collElem.type() == BSONType::String);
+ NamespaceString targetNss{dbElem.String(), collElem.String()};
+ uassert(ErrorCodes::InvalidNamespace,
+ "Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()),
+ targetNss.isValid());
+
+ ActionSet actions{ActionType::insert, ActionType::remove};
+ if (request.shouldBypassDocumentValidation()) {
+ actions.addAction(ActionType::bypassDocumentValidation);
+ }
+
+ PrivilegeVector privileges{Privilege(ResourcePattern::forExactNamespace(targetNss), actions)};
+
+ return std::make_unique<DocumentSourceOut::LiteParsed>(std::move(targetNss),
+ std::move(privileges));
+}
+
std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::parse(
const AggregationRequest& request, const BSONElement& spec) {
- uassert(ErrorCodes::TypeMismatch,
- "{} stage requires a string argument, but found {}"_format(kStageName,
- typeName(spec.type())),
+ uassert(16990,
+ "{} only supports a string argument, but found {}"_format(kStageName,
+ typeName(spec.type())),
spec.type() == BSONType::String);
-
NamespaceString targetNss{request.getNamespaceString().db(), spec.valueStringData()};
uassert(ErrorCodes::InvalidNamespace,
"Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()),
@@ -103,16 +129,19 @@ std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::pa
void DocumentSourceOut::initialize() {
DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx);
- DBClientBase* conn = pExpCtx->mongoProcessInterface->directClient();
-
const auto& outputNs = getOutputNs();
- _tempNs = NamespaceString(str::stream()
- << outputNs.db() << ".tmp.agg_out." << aggOutCounter.addAndFetch(1));
+ // We will write all results into a temporary collection, then rename the temporary collection
+ // to be the target collection once we are done.
+ _tempNs = NamespaceString(str::stream() << outputNs.db() << ".tmp.agg_out." << UUID::gen());
// Save the original collection options and index specs so we can check they didn't change
// during computation.
- _originalOutOptions = pExpCtx->mongoProcessInterface->getCollectionOptions(outputNs);
- _originalIndexes = conn->getIndexSpecs(outputNs);
+ _originalOutOptions =
+ // The uuid field is considered an option, but cannot be passed to createCollection.
+ pExpCtx->mongoProcessInterface->getCollectionOptions(pExpCtx->opCtx, outputNs)
+ .removeField("uuid");
+ _originalIndexes = pExpCtx->mongoProcessInterface->getIndexSpecs(
+ pExpCtx->opCtx, outputNs, false /* includeBuildUUIDs */);
// Check if it's capped to make sure we have a chance of succeeding before we do all the work.
// If the collection becomes capped during processing, the collection options will have changed,
@@ -121,11 +150,6 @@ void DocumentSourceOut::initialize() {
"namespace '{}' is capped so it can't be used for {}"_format(outputNs.ns(), kStageName),
_originalOutOptions["capped"].eoo());
- // We will write all results into a temporary collection, then rename the temporary
- // collection to be the target collection once we are done.
- _tempNs = NamespaceString(str::stream()
- << outputNs.db() << ".tmp.agg_out." << aggOutCounter.addAndFetch(1));
-
// Create temp collection, copying options from the existing output collection if any.
{
BSONObjBuilder cmd;
@@ -133,11 +157,8 @@ void DocumentSourceOut::initialize() {
cmd << "temp" << true;
cmd.appendElementsUnique(_originalOutOptions);
- BSONObj info;
- uassert(16994,
- "failed to create temporary {} collection '{}': {}"_format(
- kStageName, _tempNs.ns(), getStatusFromCommandResult(info).reason()),
- conn->runCommand(outputNs.db().toString(), cmd.done(), info));
+ pExpCtx->mongoProcessInterface->createCollection(
+ pExpCtx->opCtx, _tempNs.db().toString(), cmd.done());
}
if (_originalIndexes.empty()) {
@@ -148,7 +169,7 @@ void DocumentSourceOut::initialize() {
try {
std::vector<BSONObj> tempNsIndexes = {std::begin(_originalIndexes),
std::end(_originalIndexes)};
- conn->createIndexes(_tempNs.ns(), tempNsIndexes);
+ pExpCtx->mongoProcessInterface->createIndexes(pExpCtx->opCtx, _tempNs, tempNsIndexes);
} catch (DBException& ex) {
ex.addContext("Copying indexes for $out failed");
throw;
@@ -177,7 +198,11 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create(
"{} is not supported when the output collection is in a different "
"database"_format(kStageName),
outputNs.db() == expCtx->ns.db());
+ return createAndAllowDifferentDB(outputNs, expCtx);
+}
+boost::intrusive_ptr<DocumentSource> DocumentSourceOut::createAndAllowDifferentDB(
+ NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
uassert(ErrorCodes::OperationNotSupportedInTransaction,
"{} cannot be used in a transaction"_format(kStageName),
!expCtx->inMultiDocumentTransaction);
@@ -191,10 +216,6 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create(
"Invalid {} target namespace, {}"_format(kStageName, outputNs.ns()),
outputNs.isValid());
- uassert(17017,
- "{} is not supported to an existing *sharded* output collection"_format(kStageName),
- !expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, outputNs));
-
uassert(17385,
"Can't {} to special collection: {}"_format(kStageName, outputNs.coll()),
!outputNs.isSystem());
@@ -208,20 +229,24 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create(
boost::intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- uassert(16990,
+ uassert(31278,
"{} only supports a string argument, but found {}"_format(kStageName,
typeName(elem.type())),
elem.type() == BSONType::String);
-
return create({expCtx->ns.db(), elem.str()}, expCtx);
}
-Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
- massert(17000,
- "{} shouldn't have different db than input"_format(kStageName),
- _outputNs.db() == pExpCtx->ns.db());
+boost::intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBsonToDifferentDB(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return Value(DOC(getSourceName() << _outputNs.coll()));
+ auto nsObj = elem.Obj();
+ return createAndAllowDifferentDB(NamespaceString(nsObj["db"].String(), nsObj["coll"].String()),
+ expCtx);
+}
+Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
+ return _toDifferentDB
+ ? Value(DOC(getSourceName() << DOC("db" << _outputNs.db() << "coll" << _outputNs.coll())))
+ : Value(DOC(getSourceName() << _outputNs.coll()));
}
void DocumentSourceOut::waitWhileFailPointEnabled() {
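
To tie this file's changes together, here is a condensed sketch of how $out now drives the new MongoProcessInterface methods. It is not code from the patch: batching, the capped-collection check, and error handling are omitted, and the rename command shape is assumed from the standalone implementation later in this diff.

#include <vector>

#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/util/str.h"
#include "mongo/util/uuid.h"

namespace example {
void outFlowSketch(const boost::intrusive_ptr<mongo::ExpressionContext>& expCtx,
                   const mongo::NamespaceString& outputNs) {
    using namespace mongo;
    auto mpi = expCtx->mongoProcessInterface;
    // Stage results in a uniquely named temp collection in the *target* database.
    NamespaceString tempNs(str::stream() << outputNs.db() << ".tmp.agg_out." << UUID::gen());
    // Capture the original options (minus uuid) and index specs up front.
    auto options = mpi->getCollectionOptions(expCtx->opCtx, outputNs).removeField("uuid");
    auto indexes = mpi->getIndexSpecs(expCtx->opCtx, outputNs, false /* includeBuildUUIDs */);
    mpi->createCollection(expCtx->opCtx,
                          tempNs.db().toString(),
                          BSON("create" << tempNs.coll() << "temp" << true));
    mpi->createIndexes(
        expCtx->opCtx, tempNs, std::vector<BSONObj>(indexes.begin(), indexes.end()));
    // ... batched writes into tempNs happen here ...
    // Finally rename over the target, but only if its options and indexes still match.
    mpi->renameIfOptionsAndIndexesHaveNotChanged(
        expCtx->opCtx,
        BSON("renameCollection" << tempNs.ns() << "to" << outputNs.ns() << "dropTarget" << true),
        outputNs,
        options,
        indexes);
}
}  // namespace example
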
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 28241d82893..971f1652c47 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -38,6 +38,7 @@ namespace mongo {
class DocumentSourceOut final : public DocumentSourceWriter<BSONObj> {
public:
static constexpr StringData kStageName = "$out"_sd;
+ static constexpr StringData kInternalStageName = "$internalOutToDifferentDB"_sd;
/**
* A "lite parsed" $out stage is similar to other stages involving foreign collections except in
@@ -51,6 +52,9 @@ public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec);
+ static std::unique_ptr<LiteParsed> parseToDifferentDB(const AggregationRequest& request,
+ const BSONElement& spec);
+
bool allowShardedForeignCollection(NamespaceString nss) const final {
return _foreignNssSet.find(nss) == _foreignNssSet.end();
}
@@ -63,6 +67,9 @@ public:
~DocumentSourceOut() override;
const char* getSourceName() const final override {
+ if (_toDifferentDB) {
+ return kInternalStageName.rawData();
+ }
return kStageName.rawData();
}
@@ -80,21 +87,30 @@ public:
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final override;
/**
- * Creates a new $out stage from the given arguments.
+ * Creates a new $out or $internalOutToDifferentDB stage from the given arguments.
*/
static boost::intrusive_ptr<DocumentSource> create(
NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ static boost::intrusive_ptr<DocumentSource> createAndAllowDifferentDB(
+ NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
/**
- * Parses a $out stage from the user-supplied BSON.
+ * Parses a $out or $internalOutToDifferentDB stage from the user-supplied BSON.
*/
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ static boost::intrusive_ptr<DocumentSource> createFromBsonToDifferentDB(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
private:
DocumentSourceOut(NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSourceWriter(kStageName.rawData(), std::move(outputNs), expCtx) {}
+ : DocumentSourceWriter(outputNs.db() == expCtx->ns.db() ? kStageName.rawData()
+ : kInternalStageName.rawData(),
+ std::move(outputNs),
+ expCtx),
+ _toDifferentDB(getOutputNs().db() != expCtx->ns.db()) {}
void initialize() override;
@@ -122,6 +138,10 @@ private:
// The temporary namespace for the $out writes.
NamespaceString _tempNs;
+
+ // Keep track of whether this document source is writing to a different DB for serialization
+ // purposes.
+ bool _toDifferentDB;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_out_test.cpp b/src/mongo/db/pipeline/document_source_out_test.cpp
index e8decf18ad6..a1cf3722676 100644
--- a/src/mongo/db/pipeline/document_source_out_test.cpp
+++ b/src/mongo/db/pipeline/document_source_out_test.cpp
@@ -85,13 +85,13 @@ public:
TEST_F(DocumentSourceOutTest, FailsToParseIncorrectType) {
BSONObj spec = BSON("$out" << 1);
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 16990);
+ ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 31278);
spec = BSON("$out" << BSONArray());
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 16990);
+ ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 31278);
spec = BSON("$out" << BSONObj());
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 16990);
+ ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 31278);
}
TEST_F(DocumentSourceOutTest, AcceptsStringArgument) {
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index cbe85447f19..86bc3f1c2e2 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -125,19 +125,13 @@ public:
virtual ~MongoProcessInterface(){};
/**
- * Sets the OperationContext of the DBDirectClient returned by directClient(). This method must
- * be called after updating the 'opCtx' member of the ExpressionContext associated with the
- * document source.
+ * Sets the OperationContext of the DBDirectClient used by mongo process interface functions.
+ * This method must be called after updating the 'opCtx' member of the ExpressionContext
+ * associated with the document source.
*/
virtual void setOperationContext(OperationContext* opCtx) = 0;
/**
- * Always returns a DBDirectClient. The return type in the function signature is a DBClientBase*
- * because DBDirectClient isn't linked into mongos.
- */
- virtual DBClientBase* directClient() = 0;
-
- /**
* Creates a new TransactionHistoryIterator object. Only applicable in processes which support
* locally traversing the oplog.
*/
@@ -179,6 +173,10 @@ public:
virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
const NamespaceString& ns) = 0;
+ virtual std::list<BSONObj> getIndexSpecs(OperationContext* opCtx,
+ const NamespaceString& ns,
+ bool includeBuildUUIDs) = 0;
+
/**
* Appends operation latency statistics for collection "nss" to "builder"
*/
@@ -213,7 +211,7 @@ public:
* ErrorCodes::CommandNotSupportedOnView if 'nss' describes a view. Future callers may want to
* parameterize this behavior.
*/
- virtual BSONObj getCollectionOptions(const NamespaceString& nss) = 0;
+ virtual BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) = 0;
/**
* Performs the given rename command if the collection given by 'targetNs' has the same options
@@ -229,6 +227,24 @@ public:
const std::list<BSONObj>& originalIndexes) = 0;
/**
+ * Creates a collection on the given database by running the given command. On a shardsvr,
+ * this targets the primary shard of 'dbName'.
+ */
+ virtual void createCollection(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj) = 0;
+
+ /**
+ * Runs createIndexes on the given database for the given index specs. If running on a shardsvr
+ * this targets the primary shard of the database part of 'ns'.
+ */
+ virtual void createIndexes(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& indexSpecs) = 0;
+
+ virtual void dropCollection(OperationContext* opCtx, const NamespaceString& collection) = 0;
+
+ /**
* Parses a Pipeline from a vector of BSONObjs representing DocumentSources. The state of the
* returned pipeline will depend upon the supplied MakePipelineOptions:
* - The boolean opts.optimize determines whether the pipeline will be optimized.
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index f94e286d6c0..fcb3a1d1a5d 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -67,10 +67,6 @@ public:
MONGO_UNREACHABLE;
}
- DBClientBase* directClient() final {
- MONGO_UNREACHABLE;
- }
-
bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -95,6 +91,11 @@ public:
const NamespaceString& ns) final {
MONGO_UNREACHABLE;
}
+ std::list<BSONObj> getIndexSpecs(OperationContext* opCtx,
+ const NamespaceString& ns,
+ bool includeBuildUUIDs) final {
+ MONGO_UNREACHABLE;
+ }
void appendLatencyStats(OperationContext* opCtx,
const NamespaceString& nss,
@@ -122,7 +123,7 @@ public:
MONGO_UNREACHABLE;
}
- BSONObj getCollectionOptions(const NamespaceString& nss) final {
+ BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) final {
MONGO_UNREACHABLE;
}
@@ -134,6 +135,22 @@ public:
MONGO_UNREACHABLE;
}
+ void createCollection(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj) final {
+ MONGO_UNREACHABLE;
+ }
+
+ void createIndexes(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& indexSpecs) final {
+ MONGO_UNREACHABLE;
+ }
+
+ void dropCollection(OperationContext* opCtx, const NamespaceString& collection) final {
+ MONGO_UNREACHABLE;
+ }
+
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index b8b2a31c793..af3c68ee750 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/commands.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
@@ -49,6 +50,7 @@
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
@@ -210,4 +212,148 @@ std::unique_ptr<ShardFilterer> MongoInterfaceShardServer::getShardFilterer(
return std::make_unique<ShardFiltererImpl>(std::move(shardingMetadata));
}
+void MongoInterfaceShardServer::renameIfOptionsAndIndexesHaveNotChanged(
+ OperationContext* opCtx,
+ const BSONObj& renameCommandObj,
+ const NamespaceString& destinationNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) {
+ BSONObjBuilder newCmd;
+ newCmd.append("internalRenameIfOptionsAndIndexesMatch", 1);
+ newCmd.append("from", renameCommandObj["renameCollection"].String());
+ newCmd.append("to", renameCommandObj["to"].String());
+ newCmd.append("collectionOptions", originalCollectionOptions);
+ if (!opCtx->getWriteConcern().usedDefault) {
+ newCmd.append(WriteConcernOptions::kWriteConcernField, opCtx->getWriteConcern().toBSON());
+ }
+ BSONArrayBuilder indexArrayBuilder(newCmd.subarrayStart("indexes"));
+ for (auto&& index : originalIndexes) {
+ indexArrayBuilder.append(index);
+ }
+ indexArrayBuilder.done();
+ auto cachedDbInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, destinationNs.db()));
+ auto newCmdObj = newCmd.obj();
+ auto response =
+ executeCommandAgainstDatabasePrimary(opCtx,
+ // internalRenameIfOptionsAndIndexesMatch is adminOnly.
+ NamespaceString::kAdminDb,
+ std::move(cachedDbInfo),
+ newCmdObj,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kNoRetry);
+ uassertStatusOKWithContext(response.swResponse,
+ str::stream() << "failed while running command " << newCmdObj);
+ auto result = response.swResponse.getValue().data;
+ uassertStatusOKWithContext(getStatusFromCommandResult(result),
+ str::stream() << "failed while running command " << newCmdObj);
+ uassertStatusOKWithContext(getWriteConcernStatusFromCommandResult(result),
+ str::stream() << "failed while running command " << newCmdObj);
+}
+
+std::list<BSONObj> MongoInterfaceShardServer::getIndexSpecs(OperationContext* opCtx,
+ const NamespaceString& ns,
+ bool includeBuildUUIDs) {
+ auto cachedDbInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, ns.db()));
+ auto shard = uassertStatusOK(
+ Grid::get(opCtx)->shardRegistry()->getShard(opCtx, cachedDbInfo.primaryId()));
+ auto cmdObj = BSON("listIndexes" << ns.coll());
+ Shard::QueryResponse indexes;
+ try {
+ indexes = uassertStatusOK(
+ shard->runExhaustiveCursorCommand(opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ ns.db().toString(),
+ appendDbVersionIfPresent(cmdObj, cachedDbInfo),
+ Milliseconds(-1)));
+ } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
+ return std::list<BSONObj>();
+ }
+ return std::list<BSONObj>(indexes.docs.begin(), indexes.docs.end());
+}
+void MongoInterfaceShardServer::createCollection(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj) {
+ auto cachedDbInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName));
+ BSONObj finalCmdObj = cmdObj;
+ if (!opCtx->getWriteConcern().usedDefault) {
+ auto writeObj =
+ BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON());
+ finalCmdObj = cmdObj.addField(writeObj.getField(WriteConcernOptions::kWriteConcernField));
+ }
+ auto response =
+ executeCommandAgainstDatabasePrimary(opCtx,
+ dbName,
+ std::move(cachedDbInfo),
+ finalCmdObj,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kIdempotent);
+ uassertStatusOKWithContext(response.swResponse,
+ str::stream() << "failed while running command " << finalCmdObj);
+ auto result = response.swResponse.getValue().data;
+ uassertStatusOKWithContext(getStatusFromCommandResult(result),
+ str::stream() << "failed while running command " << finalCmdObj);
+ uassertStatusOKWithContext(getWriteConcernStatusFromCommandResult(result),
+ str::stream()
+ << "write concern failed while running command " << finalCmdObj);
+}
+
+void MongoInterfaceShardServer::createIndexes(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& indexSpecs) {
+ auto cachedDbInfo = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, ns.db());
+ BSONObjBuilder newCmdBuilder;
+ newCmdBuilder.append("createIndexes", ns.coll());
+ newCmdBuilder.append("indexes", indexSpecs);
+ if (!opCtx->getWriteConcern().usedDefault) {
+ newCmdBuilder.append(WriteConcernOptions::kWriteConcernField,
+ opCtx->getWriteConcern().toBSON());
+ }
+ auto cmdObj = newCmdBuilder.done();
+ auto response =
+ executeCommandAgainstDatabasePrimary(opCtx,
+ ns.db(),
+ cachedDbInfo.getValue(),
+ cmdObj,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kIdempotent);
+ uassertStatusOKWithContext(response.swResponse,
+ str::stream() << "failed while running command " << cmdObj);
+ auto result = response.swResponse.getValue().data;
+ uassertStatusOKWithContext(getStatusFromCommandResult(result),
+ str::stream() << "failed while running command " << cmdObj);
+ uassertStatusOKWithContext(getWriteConcernStatusFromCommandResult(result),
+ str::stream()
+ << "write concern failed while running command " << cmdObj);
+}
+void MongoInterfaceShardServer::dropCollection(OperationContext* opCtx, const NamespaceString& ns) {
+ // Build and execute the dropCollection command against the primary shard of the given
+ // database.
+ auto cachedDbInfo = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, ns.db());
+ BSONObjBuilder newCmdBuilder;
+ newCmdBuilder.append("drop", ns.coll());
+ if (!opCtx->getWriteConcern().usedDefault) {
+ newCmdBuilder.append(WriteConcernOptions::kWriteConcernField,
+ opCtx->getWriteConcern().toBSON());
+ }
+ auto cmdObj = newCmdBuilder.done();
+ auto response =
+ executeCommandAgainstDatabasePrimary(opCtx,
+ ns.db(),
+ cachedDbInfo.getValue(),
+ cmdObj,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kIdempotent);
+ uassertStatusOKWithContext(response.swResponse,
+ str::stream() << "failed while running command " << cmdObj);
+ auto result = response.swResponse.getValue().data;
+ uassertStatusOKWithContext(getStatusFromCommandResult(result),
+ str::stream() << "failed while running command " << cmdObj);
+ uassertStatusOKWithContext(getWriteConcernStatusFromCommandResult(result),
+ str::stream()
+ << "write concern failed while running command " << cmdObj);
+}
+
} // namespace mongo
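
For illustration (collection name, index spec, and write concern invented), this is roughly the shape of the command that createIndexes() above forwards to the primary shard of the target database, with the caller's write concern attached when it is not the default; createCollection() and dropCollection() follow the same pattern with their respective commands.

#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobjbuilder.h"

namespace example {
const mongo::BSONObj kForwardedCreateIndexes =
    BSON("createIndexes"
         << "target"
         << "indexes"
         << BSON_ARRAY(BSON("v" << 2 << "key" << BSON("a" << 1) << "name"
                                << "a_1"))
         << "writeConcern" << BSON("w"
                                   << "majority"));
}  // namespace example
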
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
index 8b4666c9b55..64af23bce19 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.h
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -87,6 +87,22 @@ public:
std::unique_ptr<ShardFilterer> getShardFilterer(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const override final;
+
+ std::list<BSONObj> getIndexSpecs(OperationContext* opCtx,
+ const NamespaceString& ns,
+ bool includeBuildUUIDs) final;
+ void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx,
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) final;
+ void createCollection(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj) final;
+ void createIndexes(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& indexSpecs) final;
+ void dropCollection(OperationContext* opCtx, const NamespaceString& collection) final;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index c26d1642b1d..aebdebc1ffb 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -36,10 +36,15 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_catalog.h"
+#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/catalog/drop_collection.h"
#include "mongo/db/catalog/index_catalog_entry.h"
+#include "mongo/db/catalog/list_indexes.h"
+#include "mongo/db/catalog/rename_collection.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/cursor_manager.h"
#include "mongo/db/db_raii.h"
@@ -58,6 +63,7 @@
#include "mongo/db/stats/fill_locker_info.h"
#include "mongo/db/stats/storage_stats.h"
#include "mongo/db/storage/backup_cursor_hooks.h"
+#include "mongo/db/storage/durable_catalog.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/s/cluster_commands_helpers.h"
@@ -151,10 +157,6 @@ void MongoInterfaceStandalone::setOperationContext(OperationContext* opCtx) {
_client.setOpCtx(opCtx);
}
-DBClientBase* MongoInterfaceStandalone::directClient() {
- return &_client;
-}
-
std::unique_ptr<TransactionHistoryIteratorBase>
MongoInterfaceStandalone::createTransactionHistoryIterator(repl::OpTime time) const {
bool permitYield = true;
@@ -272,6 +274,12 @@ CollectionIndexUsageMap MongoInterfaceStandalone::getIndexStats(OperationContext
return CollectionQueryInfo::get(collection).getIndexUsageStats();
}
+std::list<BSONObj> MongoInterfaceStandalone::getIndexSpecs(OperationContext* opCtx,
+ const NamespaceString& ns,
+ bool includeBuildUUIDs) {
+ return listIndexesEmptyListIfMissing(opCtx, ns, includeBuildUUIDs);
+}
+
void MongoInterfaceStandalone::appendLatencyStats(OperationContext* opCtx,
const NamespaceString& nss,
bool includeHistograms,
@@ -325,24 +333,22 @@ Status MongoInterfaceStandalone::appendQueryExecStats(OperationContext* opCtx,
return Status::OK();
}
-BSONObj MongoInterfaceStandalone::getCollectionOptions(const NamespaceString& nss) {
- std::list<BSONObj> infos;
-
- try {
- infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
- if (infos.empty()) {
- return BSONObj();
- }
- } catch (const DBException& e) {
- uasserted(ErrorCodes::CommandFailed, e.reason());
+BSONObj MongoInterfaceStandalone::getCollectionOptions(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ AutoGetCollectionForReadCommand autoColl(opCtx, nss);
+ BSONObj collectionOptions = {};
+ if (!autoColl.getDb()) {
+ return collectionOptions;
+ }
+ Collection* collection = autoColl.getCollection();
+ if (!collection) {
+ return collectionOptions;
}
- const auto& infoObj = infos.front();
- uassert(ErrorCodes::CommandNotSupportedOnView,
- str::stream() << nss.toString() << " is a view, not a collection",
- infoObj["type"].valueStringData() != "view"_sd);
-
- return infoObj.getObjectField("options").getOwned();
+ collectionOptions = DurableCatalog::get(opCtx)
+ ->getCollectionOptions(opCtx, collection->getCatalogId())
+ .toBSON();
+ return collectionOptions;
}
void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged(
@@ -351,30 +357,31 @@ void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged(
const NamespaceString& targetNs,
const BSONObj& originalCollectionOptions,
const std::list<BSONObj>& originalIndexes) {
- Lock::DBLock lk(opCtx, targetNs.db(), MODE_X);
-
- uassert(ErrorCodes::CommandFailed,
- str::stream() << "collection options of target collection " << targetNs.ns()
- << " changed during processing. Original options: "
- << originalCollectionOptions
- << ", new options: " << getCollectionOptions(targetNs),
- SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions ==
- getCollectionOptions(targetNs)));
-
- auto currentIndexes = _client.getIndexSpecs(targetNs);
- uassert(ErrorCodes::CommandFailed,
- str::stream() << "indexes of target collection " << targetNs.ns()
- << " changed during processing.",
- originalIndexes.size() == currentIndexes.size() &&
- std::equal(originalIndexes.begin(),
- originalIndexes.end(),
- currentIndexes.begin(),
- SimpleBSONObjComparator::kInstance.makeEqualTo()));
-
- BSONObj info;
- uassert(ErrorCodes::CommandFailed,
- str::stream() << "renameCollection failed: " << info,
- _client.runCommand("admin", renameCommandObj, info));
+ NamespaceString sourceNs = NamespaceString(renameCommandObj["renameCollection"].String());
+ doLocalRenameIfOptionsAndIndexesHaveNotChanged(opCtx,
+ sourceNs,
+ targetNs,
+ renameCommandObj["dropTarget"].trueValue(),
+ renameCommandObj["stayTemp"].trueValue(),
+ originalIndexes,
+ originalCollectionOptions);
+}
+
+void MongoInterfaceStandalone::createCollection(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj) {
+ uassertStatusOK(mongo::createCollection(opCtx, dbName, cmdObj));
+}
+
+void MongoInterfaceStandalone::createIndexes(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& indexSpecs) {
+ _client.createIndexes(ns.ns(), indexSpecs);
+}
+void MongoInterfaceStandalone::dropCollection(OperationContext* opCtx, const NamespaceString& ns) {
+ BSONObjBuilder result;
+ uassertStatusOK(mongo::dropCollection(
+ opCtx, ns, result, {}, DropCollectionSystemCollectionMode::kDisallowSystemCollectionDrops));
}
std::unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::makePipeline(
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index c045260e3e0..cb97e3267fb 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -55,7 +55,6 @@ public:
virtual ~MongoInterfaceStandalone() = default;
void setOperationContext(OperationContext* opCtx) final;
- DBClientBase* directClient() final;
std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator(
repl::OpTime time) const final;
@@ -78,6 +77,9 @@ public:
boost::optional<OID> targetEpoch) override;
CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final;
+ std::list<BSONObj> getIndexSpecs(OperationContext* opCtx,
+ const NamespaceString& ns,
+ bool includeBuildUUIDs);
void appendLatencyStats(OperationContext* opCtx,
const NamespaceString& nss,
bool includeHistograms,
@@ -92,12 +94,19 @@ public:
Status appendQueryExecStats(OperationContext* opCtx,
const NamespaceString& nss,
BSONObjBuilder* builder) const final override;
- BSONObj getCollectionOptions(const NamespaceString& nss) final;
+ BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) final;
void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx,
const BSONObj& renameCommandObj,
const NamespaceString& targetNs,
const BSONObj& originalCollectionOptions,
- const std::list<BSONObj>& originalIndexes) final;
+ const std::list<BSONObj>& originalIndexes);
+ void createCollection(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj);
+ void createIndexes(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& indexSpecs);
+ void dropCollection(OperationContext* opCtx, const NamespaceString& collection);
std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 381da454c5d..b9b84086f6e 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -50,10 +50,6 @@ public:
MONGO_UNREACHABLE;
}
- DBClientBase* directClient() override {
- MONGO_UNREACHABLE;
- }
-
std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator(
repl::OpTime time) const override {
MONGO_UNREACHABLE;
@@ -86,6 +82,12 @@ public:
MONGO_UNREACHABLE;
}
+ std::list<BSONObj> getIndexSpecs(OperationContext* opCtx,
+ const NamespaceString& ns,
+ bool includeBuildUUIDs) override {
+ MONGO_UNREACHABLE;
+ }
+
void appendLatencyStats(OperationContext* opCtx,
const NamespaceString& nss,
bool includeHistograms,
@@ -112,7 +114,7 @@ public:
MONGO_UNREACHABLE;
}
- BSONObj getCollectionOptions(const NamespaceString& nss) override {
+ BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) override {
MONGO_UNREACHABLE;
}
@@ -125,6 +127,21 @@ public:
MONGO_UNREACHABLE;
}
+ void createCollection(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void createIndexes(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& indexSpecs) override {
+ MONGO_UNREACHABLE;
+ }
+ void dropCollection(OperationContext* opCtx, const NamespaceString& ns) override {
+ MONGO_UNREACHABLE;
+ }
+
std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,