diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2019-12-18 20:50:41 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-12-18 20:50:41 +0000 |
commit | a8096dd2721ebbb2e2e3e8dc6ce48cd11ba2ef8b (patch) | |
tree | f7369d5b46d2d2d8401fbe73058616d7c1bd058c /src/mongo | |
parent | 19b8a234cea0bc9953c43d449d96359048140e50 (diff) | |
download | mongo-a8096dd2721ebbb2e2e3e8dc6ce48cd11ba2ef8b.tar.gz |
SERVER-44318 M/R Agg: Reject MapReduce commands which would create a new sharded collection
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/commands/map_reduce_agg_test.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/commands/mr_common.cpp | 66 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongo_process_common.h | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface_standalone.h | 5 |
4 files changed, 51 insertions, 40 deletions
diff --git a/src/mongo/db/commands/map_reduce_agg_test.cpp b/src/mongo/db/commands/map_reduce_agg_test.cpp index 9845fa05ff0..a5859d040ca 100644 --- a/src/mongo/db/commands/map_reduce_agg_test.cpp +++ b/src/mongo/db/commands/map_reduce_agg_test.cpp @@ -256,7 +256,7 @@ TEST(MapReduceAggTest, testSourceDestinationCollectionsNotEqualMergeDoesNotFail) ASSERT_DOES_NOT_THROW(map_reduce_common::translateFromMR(mr, expCtx)); } -TEST(MapReduceAggTest, testShardedTrueWithReplaceActionFailsOnMongos) { +TEST(MapReduceAggTest, testShardedTrueWithReplaceActionIsNotAllowed) { auto nss = NamespaceString{"db", "coll"}; auto mr = MapReduce{ nss, @@ -264,20 +264,8 @@ TEST(MapReduceAggTest, testShardedTrueWithReplaceActionFailsOnMongos) { MapReduceJavascriptCode{reduceJavascript.toString()}, MapReduceOutOptions{boost::make_optional("db"s), "coll2", OutputType::Replace, true}}; boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss)); - expCtx->inMongos = true; - ASSERT_THROWS_CODE(map_reduce_common::translateFromMR(mr, expCtx), DBException, 31327); -} - - -TEST(MapReduceAggTest, testShardedTrueWithReplaceActionDoesNotFailOnMongod) { - auto nss = NamespaceString{"db", "coll"}; - auto mr = MapReduce{ - nss, - MapReduceJavascriptCode{mapJavascript.toString()}, - MapReduceJavascriptCode{reduceJavascript.toString()}, - MapReduceOutOptions{boost::make_optional("db"s), "coll2", OutputType::Replace, true}}; - boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss)); - ASSERT_DOES_NOT_THROW(map_reduce_common::translateFromMR(mr, expCtx)); + ASSERT_THROWS_CODE( + map_reduce_common::translateFromMR(mr, expCtx), DBException, ErrorCodes::InvalidOptions); } TEST(MapReduceAggTest, testErrorMessagesTranslated) { diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp index c6b8750adbf..fcdf998e125 100644 --- a/src/mongo/db/commands/mr_common.cpp +++ b/src/mongo/db/commands/mr_common.cpp @@ -166,19 +166,22 @@ auto translateOutReplace(boost::intrusive_ptr<ExpressionContext> expCtx, return DocumentSourceOut::createAndAllowDifferentDB(std::move(targetNss), expCtx); } -auto translateOutMerge(boost::intrusive_ptr<ExpressionContext> expCtx, NamespaceString targetNss) { - return DocumentSourceMerge::create(targetNss, +auto translateOutMerge(boost::intrusive_ptr<ExpressionContext> expCtx, + NamespaceString targetNss, + boost::optional<ChunkVersion> targetCollectionVersion) { + return DocumentSourceMerge::create(std::move(targetNss), expCtx, MergeWhenMatchedModeEnum::kReplace, MergeWhenNotMatchedModeEnum::kInsert, boost::none, // Let variables boost::none, // pipeline std::set<FieldPath>{FieldPath("_id"s)}, - boost::none); // targetCollectionVersion + std::move(targetCollectionVersion)); } auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx, NamespaceString targetNss, + boost::optional<ChunkVersion> targetCollectionVersion, std::string reduceCode, boost::optional<MapReduceJavascriptCodeOrNull> finalizeCode) { // Because of communication for sharding, $merge must hold on to a serializable BSON object @@ -204,29 +207,51 @@ auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx, pipelineSpec->emplace_back(std::move(finalizeSpec)); } - return DocumentSourceMerge::create(targetNss, + return DocumentSourceMerge::create(std::move(targetNss), expCtx, MergeWhenMatchedModeEnum::kPipeline, MergeWhenNotMatchedModeEnum::kInsert, boost::none, // Let variables pipelineSpec, std::set<FieldPath>{FieldPath("_id"s)}, - boost::none); // targetCollectionVersion + std::move(targetCollectionVersion)); +} + +void rejectRequestsToCreateShardedCollections( + const MapReduceOutOptions& outOptions, + const boost::optional<ChunkVersion>& targetCollectionVersion) { + uassert(ErrorCodes::InvalidOptions, + "Combination of 'out.sharded' and 'replace' output mode is not supported. Cannot " + "replace an existing sharded collection or create a new sharded collection. Please " + "create the sharded collection first and use a different output mode or consider using " + "an unsharded collection.", + !(outOptions.getOutputType() == OutputType::Replace && outOptions.isSharded())); + uassert(ErrorCodes::InvalidOptions, + "Cannot use mapReduce to create a new sharded collection. Please create and shard the" + " target collection before proceeding.", + targetCollectionVersion || !outOptions.isSharded()); } auto translateOut(boost::intrusive_ptr<ExpressionContext> expCtx, - const OutputType outputType, + const MapReduceOutOptions& outOptions, NamespaceString targetNss, + boost::optional<ChunkVersion> targetCollectionVersion, std::string reduceCode, boost::optional<MapReduceJavascriptCodeOrNull> finalizeCode) { - switch (outputType) { + rejectRequestsToCreateShardedCollections(outOptions, targetCollectionVersion); + + switch (outOptions.getOutputType()) { case OutputType::Replace: - return boost::make_optional(translateOutReplace(expCtx, targetNss)); + return boost::make_optional(translateOutReplace(expCtx, std::move(targetNss))); case OutputType::Merge: - return boost::make_optional(translateOutMerge(expCtx, targetNss)); + return boost::make_optional(translateOutMerge( + expCtx, std::move(targetNss), std::move(targetCollectionVersion))); case OutputType::Reduce: - return boost::make_optional(translateOutReduce( - expCtx, targetNss, std::move(reduceCode), std::move(finalizeCode))); + return boost::make_optional(translateOutReduce(expCtx, + std::move(targetNss), + std::move(targetCollectionVersion), + std::move(reduceCode), + std::move(finalizeCode))); case OutputType::InMemory:; } return boost::optional<boost::intrusive_ptr<mongo::DocumentSource>>{}; @@ -354,12 +379,12 @@ std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR( : parsedMr.getNamespace().db(), parsedMr.getOutOptions().getCollectionName()}; - auto outType = parsedMr.getOutOptions().getOutputType(); - + std::set<FieldPath> shardKey; + boost::optional<ChunkVersion> targetCollectionVersion; // If non-inline output, verify that the target collection is *not* sharded by anything other // than _id. - if (outType != OutputType::InMemory) { - auto [shardKey, targetCollectionVersion] = + if (parsedMr.getOutOptions().getOutputType() != OutputType::InMemory) { + std::tie(shardKey, targetCollectionVersion) = expCtx->mongoProcessInterface->ensureFieldsUniqueOrResolveDocumentKey( expCtx, boost::none, boost::none, outNss); uassert(31313, @@ -368,14 +393,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR( shardKey == std::set<FieldPath>{FieldPath("_id"s)}); } - // If sharded option is set to true and the replace action is specified, verify that this isn't - // running on mongos. - if (outType == OutputType::Replace && parsedMr.getOutOptions().isSharded()) { - uassert(31327, - "Cannot replace output collection when specifying sharded: true", - !expCtx->inMongos); - } - try { auto pipeline = uassertStatusOK(Pipeline::create( makeFlattenedList<boost::intrusive_ptr<DocumentSource>>( @@ -390,8 +407,9 @@ std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR( parsedMr.getFinalize().flat_map( [&](auto&& finalize) { return translateFinalize(expCtx, finalize); }), translateOut(expCtx, - outType, + parsedMr.getOutOptions(), std::move(outNss), + std::move(targetCollectionVersion), parsedMr.getReduce().getCode(), parsedMr.getFinalize())), expCtx)); diff --git a/src/mongo/db/pipeline/mongo_process_common.h b/src/mongo/db/pipeline/mongo_process_common.h index 08b41addcfb..0b6cd75c764 100644 --- a/src/mongo/db/pipeline/mongo_process_common.h +++ b/src/mongo/db/pipeline/mongo_process_common.h @@ -65,7 +65,7 @@ public: boost::optional<ChunkVersion> refreshAndGetCollectionVersion( const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss) const final; + const NamespaceString& nss) const override; std::string getHostAndPort(OperationContext* opCtx) const override; diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index 55c75cd893d..4cad0d70591 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -155,6 +155,11 @@ public: const NamespaceString& nss, const std::set<FieldPath>& fieldPaths) const; + boost::optional<ChunkVersion> refreshAndGetCollectionVersion( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss) const final { + return boost::none; // Nothing is sharded here. + } virtual void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, ChunkVersion targetCollectionVersion) const override { |