summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2019-12-18 20:50:41 +0000
committerevergreen <evergreen@mongodb.com>2019-12-18 20:50:41 +0000
commita8096dd2721ebbb2e2e3e8dc6ce48cd11ba2ef8b (patch)
treef7369d5b46d2d2d8401fbe73058616d7c1bd058c /src/mongo
parent19b8a234cea0bc9953c43d449d96359048140e50 (diff)
downloadmongo-a8096dd2721ebbb2e2e3e8dc6ce48cd11ba2ef8b.tar.gz
SERVER-44318 M/R Agg: Reject MapReduce commands which would create a new sharded collection
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/commands/map_reduce_agg_test.cpp18
-rw-r--r--src/mongo/db/commands/mr_common.cpp66
-rw-r--r--src/mongo/db/pipeline/mongo_process_common.h2
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.h5
4 files changed, 51 insertions, 40 deletions
diff --git a/src/mongo/db/commands/map_reduce_agg_test.cpp b/src/mongo/db/commands/map_reduce_agg_test.cpp
index 9845fa05ff0..a5859d040ca 100644
--- a/src/mongo/db/commands/map_reduce_agg_test.cpp
+++ b/src/mongo/db/commands/map_reduce_agg_test.cpp
@@ -256,7 +256,7 @@ TEST(MapReduceAggTest, testSourceDestinationCollectionsNotEqualMergeDoesNotFail)
ASSERT_DOES_NOT_THROW(map_reduce_common::translateFromMR(mr, expCtx));
}
-TEST(MapReduceAggTest, testShardedTrueWithReplaceActionFailsOnMongos) {
+TEST(MapReduceAggTest, testShardedTrueWithReplaceActionIsNotAllowed) {
auto nss = NamespaceString{"db", "coll"};
auto mr = MapReduce{
nss,
@@ -264,20 +264,8 @@ TEST(MapReduceAggTest, testShardedTrueWithReplaceActionFailsOnMongos) {
MapReduceJavascriptCode{reduceJavascript.toString()},
MapReduceOutOptions{boost::make_optional("db"s), "coll2", OutputType::Replace, true}};
boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
- expCtx->inMongos = true;
- ASSERT_THROWS_CODE(map_reduce_common::translateFromMR(mr, expCtx), DBException, 31327);
-}
-
-
-TEST(MapReduceAggTest, testShardedTrueWithReplaceActionDoesNotFailOnMongod) {
- auto nss = NamespaceString{"db", "coll"};
- auto mr = MapReduce{
- nss,
- MapReduceJavascriptCode{mapJavascript.toString()},
- MapReduceJavascriptCode{reduceJavascript.toString()},
- MapReduceOutOptions{boost::make_optional("db"s), "coll2", OutputType::Replace, true}};
- boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
- ASSERT_DOES_NOT_THROW(map_reduce_common::translateFromMR(mr, expCtx));
+ ASSERT_THROWS_CODE(
+ map_reduce_common::translateFromMR(mr, expCtx), DBException, ErrorCodes::InvalidOptions);
}
TEST(MapReduceAggTest, testErrorMessagesTranslated) {
diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp
index c6b8750adbf..fcdf998e125 100644
--- a/src/mongo/db/commands/mr_common.cpp
+++ b/src/mongo/db/commands/mr_common.cpp
@@ -166,19 +166,22 @@ auto translateOutReplace(boost::intrusive_ptr<ExpressionContext> expCtx,
return DocumentSourceOut::createAndAllowDifferentDB(std::move(targetNss), expCtx);
}
-auto translateOutMerge(boost::intrusive_ptr<ExpressionContext> expCtx, NamespaceString targetNss) {
- return DocumentSourceMerge::create(targetNss,
+auto translateOutMerge(boost::intrusive_ptr<ExpressionContext> expCtx,
+ NamespaceString targetNss,
+ boost::optional<ChunkVersion> targetCollectionVersion) {
+ return DocumentSourceMerge::create(std::move(targetNss),
expCtx,
MergeWhenMatchedModeEnum::kReplace,
MergeWhenNotMatchedModeEnum::kInsert,
boost::none, // Let variables
boost::none, // pipeline
std::set<FieldPath>{FieldPath("_id"s)},
- boost::none); // targetCollectionVersion
+ std::move(targetCollectionVersion));
}
auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx,
NamespaceString targetNss,
+ boost::optional<ChunkVersion> targetCollectionVersion,
std::string reduceCode,
boost::optional<MapReduceJavascriptCodeOrNull> finalizeCode) {
// Because of communication for sharding, $merge must hold on to a serializable BSON object
@@ -204,29 +207,51 @@ auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx,
pipelineSpec->emplace_back(std::move(finalizeSpec));
}
- return DocumentSourceMerge::create(targetNss,
+ return DocumentSourceMerge::create(std::move(targetNss),
expCtx,
MergeWhenMatchedModeEnum::kPipeline,
MergeWhenNotMatchedModeEnum::kInsert,
boost::none, // Let variables
pipelineSpec,
std::set<FieldPath>{FieldPath("_id"s)},
- boost::none); // targetCollectionVersion
+ std::move(targetCollectionVersion));
+}
+
+void rejectRequestsToCreateShardedCollections(
+ const MapReduceOutOptions& outOptions,
+ const boost::optional<ChunkVersion>& targetCollectionVersion) {
+ uassert(ErrorCodes::InvalidOptions,
+ "Combination of 'out.sharded' and 'replace' output mode is not supported. Cannot "
+ "replace an existing sharded collection or create a new sharded collection. Please "
+ "create the sharded collection first and use a different output mode or consider using "
+ "an unsharded collection.",
+ !(outOptions.getOutputType() == OutputType::Replace && outOptions.isSharded()));
+ uassert(ErrorCodes::InvalidOptions,
+ "Cannot use mapReduce to create a new sharded collection. Please create and shard the"
+ " target collection before proceeding.",
+ targetCollectionVersion || !outOptions.isSharded());
}
auto translateOut(boost::intrusive_ptr<ExpressionContext> expCtx,
- const OutputType outputType,
+ const MapReduceOutOptions& outOptions,
NamespaceString targetNss,
+ boost::optional<ChunkVersion> targetCollectionVersion,
std::string reduceCode,
boost::optional<MapReduceJavascriptCodeOrNull> finalizeCode) {
- switch (outputType) {
+ rejectRequestsToCreateShardedCollections(outOptions, targetCollectionVersion);
+
+ switch (outOptions.getOutputType()) {
case OutputType::Replace:
- return boost::make_optional(translateOutReplace(expCtx, targetNss));
+ return boost::make_optional(translateOutReplace(expCtx, std::move(targetNss)));
case OutputType::Merge:
- return boost::make_optional(translateOutMerge(expCtx, targetNss));
+ return boost::make_optional(translateOutMerge(
+ expCtx, std::move(targetNss), std::move(targetCollectionVersion)));
case OutputType::Reduce:
- return boost::make_optional(translateOutReduce(
- expCtx, targetNss, std::move(reduceCode), std::move(finalizeCode)));
+ return boost::make_optional(translateOutReduce(expCtx,
+ std::move(targetNss),
+ std::move(targetCollectionVersion),
+ std::move(reduceCode),
+ std::move(finalizeCode)));
case OutputType::InMemory:;
}
return boost::optional<boost::intrusive_ptr<mongo::DocumentSource>>{};
@@ -354,12 +379,12 @@ std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
: parsedMr.getNamespace().db(),
parsedMr.getOutOptions().getCollectionName()};
- auto outType = parsedMr.getOutOptions().getOutputType();
-
+ std::set<FieldPath> shardKey;
+ boost::optional<ChunkVersion> targetCollectionVersion;
// If non-inline output, verify that the target collection is *not* sharded by anything other
// than _id.
- if (outType != OutputType::InMemory) {
- auto [shardKey, targetCollectionVersion] =
+ if (parsedMr.getOutOptions().getOutputType() != OutputType::InMemory) {
+ std::tie(shardKey, targetCollectionVersion) =
expCtx->mongoProcessInterface->ensureFieldsUniqueOrResolveDocumentKey(
expCtx, boost::none, boost::none, outNss);
uassert(31313,
@@ -368,14 +393,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
shardKey == std::set<FieldPath>{FieldPath("_id"s)});
}
- // If sharded option is set to true and the replace action is specified, verify that this isn't
- // running on mongos.
- if (outType == OutputType::Replace && parsedMr.getOutOptions().isSharded()) {
- uassert(31327,
- "Cannot replace output collection when specifying sharded: true",
- !expCtx->inMongos);
- }
-
try {
auto pipeline = uassertStatusOK(Pipeline::create(
makeFlattenedList<boost::intrusive_ptr<DocumentSource>>(
@@ -390,8 +407,9 @@ std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
parsedMr.getFinalize().flat_map(
[&](auto&& finalize) { return translateFinalize(expCtx, finalize); }),
translateOut(expCtx,
- outType,
+ parsedMr.getOutOptions(),
std::move(outNss),
+ std::move(targetCollectionVersion),
parsedMr.getReduce().getCode(),
parsedMr.getFinalize())),
expCtx));
diff --git a/src/mongo/db/pipeline/mongo_process_common.h b/src/mongo/db/pipeline/mongo_process_common.h
index 08b41addcfb..0b6cd75c764 100644
--- a/src/mongo/db/pipeline/mongo_process_common.h
+++ b/src/mongo/db/pipeline/mongo_process_common.h
@@ -65,7 +65,7 @@ public:
boost::optional<ChunkVersion> refreshAndGetCollectionVersion(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss) const final;
+ const NamespaceString& nss) const override;
std::string getHostAndPort(OperationContext* opCtx) const override;
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index 55c75cd893d..4cad0d70591 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -155,6 +155,11 @@ public:
const NamespaceString& nss,
const std::set<FieldPath>& fieldPaths) const;
+ boost::optional<ChunkVersion> refreshAndGetCollectionVersion(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss) const final {
+ return boost::none; // Nothing is sharded here.
+ }
virtual void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
ChunkVersion targetCollectionVersion) const override {