/**
 *    Copyright (C) 2014 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or  modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

/**
 * This file contains tests for mongo/db/commands/mr.h
 */

#include "mongo/db/commands/mr.h"

#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/json.h"
#include "mongo/db/op_observer_noop.h"
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/scripting/dbdirectclient_factory.h"
#include "mongo/scripting/engine.h"
#include "mongo/unittest/unittest.h"

namespace mongo {
namespace {

/**
 * Tests for mr::Config
 */

/**
 * Helper function to verify a single field of mr::Config::OutputOptions.
 *
 * Does nothing when 'actual' equals 'expected'. Otherwise fails the current test with a message
 * that identifies the parseOutputOptions() arguments ('dbname', 'cmdObjStr'), the field being
 * checked ('fieldName') and both values.
 *
 * T must support operator== and streaming into str::stream.
 */
template <typename T>
void _compareOutputOptionField(const std::string& dbname,
                               const std::string& cmdObjStr,
                               const std::string& fieldName,
                               const T& actual,
                               const T& expected) {
    if (actual == expected)
        return;
    // The database name is quoted on both sides so the message reads like the original call.
    FAIL(str::stream() << "parseOutputOptions(\"" << dbname << "\", " << cmdObjStr << "): "
                       << fieldName
                       << ": Expected: "
                       << expected
                       << ". Actual: "
                       << actual);
}

/**
 * Returns string representation of mr::Config::OutputType.
 *
 * Used so that output type mismatches are reported by name rather than by enum value.
 */
std::string _getOutTypeString(mr::Config::OutputType outType) {
    switch (outType) {
        case mr::Config::REPLACE:
            return "REPLACE";
        case mr::Config::MERGE:
            return "MERGE";
        case mr::Config::REDUCE:
            return "REDUCE";
        case mr::Config::INMEMORY:
            return "INMEMORY";
    }
    MONGO_UNREACHABLE;
}

/**
 * Test helper function to check expected result of parseOutputOptions.
 *
 * Parses 'cmdObjStr' as JSON, passes it with 'dbname' to mr::Config::parseOutputOptions() and
 * compares every field of the returned OutputOptions (outDB, collectionName, finalNamespace,
 * outNonAtomic, outType) against the corresponding expected value.
 */
void _testConfigParseOutputOptions(const std::string& dbname,
                                   const std::string& cmdObjStr,
                                   const std::string& expectedOutDb,
                                   const std::string& expectedCollectionName,
                                   const std::string& expectedFinalNamespace,
                                   bool expectedOutNonAtomic,
                                   mr::Config::OutputType expectedOutType) {
    const BSONObj cmdObj = fromjson(cmdObjStr);
    mr::Config::OutputOptions outputOptions = mr::Config::parseOutputOptions(dbname, cmdObj);
    _compareOutputOptionField(dbname, cmdObjStr, "outDb", outputOptions.outDB, expectedOutDb);
    _compareOutputOptionField(
        dbname, cmdObjStr, "collectionName", outputOptions.collectionName, expectedCollectionName);
    _compareOutputOptionField(dbname,
                              cmdObjStr,
                              "finalNamespace",
                              outputOptions.finalNamespace.ns(),
                              expectedFinalNamespace);
    _compareOutputOptionField(
        dbname, cmdObjStr, "outNonAtomic", outputOptions.outNonAtomic, expectedOutNonAtomic);
    // Compare the enum by name so that a mismatch produces a readable failure message.
    _compareOutputOptionField(dbname,
                              cmdObjStr,
                              "outType",
                              _getOutTypeString(outputOptions.outType),
                              _getOutTypeString(expectedOutType));
}

/**
 * Tests for mr::Config::parseOutputOptions.
 */
TEST(ConfigOutputOptionsTest, parseOutputOptions) {
    // Missing 'out' field.
    ASSERT_THROWS(mr::Config::parseOutputOptions("mydb", fromjson("{}")), AssertionException);
    // 'out' must be either string or object.
    ASSERT_THROWS(mr::Config::parseOutputOptions("mydb", fromjson("{out: 99}")),
                  AssertionException);
    // 'out.nonAtomic' is not supported with normal, replace or inline.
    ASSERT_THROWS(mr::Config::parseOutputOptions(
                      "mydb", fromjson("{out: {normal: 'mycoll', nonAtomic: true}}")),
                  AssertionException);
    ASSERT_THROWS(mr::Config::parseOutputOptions(
                      "mydb", fromjson("{out: {replace: 'mycoll', nonAtomic: true}}")),
                  AssertionException);
    ASSERT_THROWS(mr::Config::parseOutputOptions(
                      "mydb", fromjson("{out: {inline: 'mycoll', nonAtomic: true}}")),
                  AssertionException);
    // Unknown output specifier.
    ASSERT_THROWS(
        mr::Config::parseOutputOptions("mydb", fromjson("{out: {no_such_out_type: 'mycoll'}}")),
        AssertionException);

    // 'out' is string.
    _testConfigParseOutputOptions(
        "mydb", "{out: 'mycoll'}", "", "mycoll", "mydb.mycoll", false, mr::Config::REPLACE);
    // 'out' is object.
    _testConfigParseOutputOptions("mydb",
                                  "{out: {normal: 'mycoll'}}",
                                  "",
                                  "mycoll",
                                  "mydb.mycoll",
                                  false,
                                  mr::Config::REPLACE);
    // 'out.db' overrides dbname parameter
    _testConfigParseOutputOptions("mydb1",
                                  "{out: {replace: 'mycoll', db: 'mydb2'}}",
                                  "mydb2",
                                  "mycoll",
                                  "mydb2.mycoll",
                                  false,
                                  mr::Config::REPLACE);
    // 'out.nonAtomic' is supported with merge and reduce.
    _testConfigParseOutputOptions("mydb",
                                  "{out: {merge: 'mycoll', nonAtomic: true}}",
                                  "",
                                  "mycoll",
                                  "mydb.mycoll",
                                  true,
                                  mr::Config::MERGE);
    _testConfigParseOutputOptions("mydb",
                                  "{out: {reduce: 'mycoll', nonAtomic: true}}",
                                  "",
                                  "mycoll",
                                  "mydb.mycoll",
                                  true,
                                  mr::Config::REDUCE);
    // inline
    _testConfigParseOutputOptions("mydb1",
                                  "{out: {inline: 'mycoll', db: 'mydb2'}}",
                                  "mydb2",
                                  "",
                                  "",
                                  false,
                                  mr::Config::INMEMORY);

    // Order should not matter in fields of 'out' object.
    _testConfigParseOutputOptions("mydb1",
                                  "{out: {db: 'mydb2', normal: 'mycoll'}}",
                                  "mydb2",
                                  "mycoll",
                                  "mydb2.mycoll",
                                  false,
                                  mr::Config::REPLACE);
    _testConfigParseOutputOptions("mydb1",
                                  "{out: {db: 'mydb2', replace: 'mycoll'}}",
                                  "mydb2",
                                  "mycoll",
                                  "mydb2.mycoll",
                                  false,
                                  mr::Config::REPLACE);
    _testConfigParseOutputOptions("mydb1",
                                  "{out: {nonAtomic: true, merge: 'mycoll'}}",
                                  "",
                                  "mycoll",
                                  "mydb1.mycoll",
                                  true,
                                  mr::Config::MERGE);
    _testConfigParseOutputOptions("mydb1",
                                  "{out: {nonAtomic: true, reduce: 'mycoll'}}",
                                  "",
                                  "mycoll",
                                  "mydb1.mycoll",
                                  true,
                                  mr::Config::REDUCE);
    _testConfigParseOutputOptions("mydb1",
                                  "{out: {db: 'mydb2', inline: 'mycoll'}}",
                                  "mydb2",
                                  "",
                                  "",
                                  false,
                                  mr::Config::INMEMORY);
}

/**
 * A 'collation' object in the command is exposed unchanged as mr::Config::collation.
 */
TEST(ConfigTest, ParseCollation) {
    std::string dbname = "myDB";
    BSONObj collation = BSON("locale"
                             << "en_US");
    BSONObjBuilder bob;
    bob.append("mapReduce", "myCollection");
    bob.appendCode("map", "function() { emit(0, 1); }");
    bob.appendCode("reduce", "function(k, v) { return {count: 0}; }");
    bob.append("out", "outCollection");
    bob.append("collation", collation);
    BSONObj cmdObj = bob.obj();
    mr::Config config(dbname, cmdObj);
    ASSERT_BSONOBJ_EQ(config.collation, collation);
}

/**
 * When the command has no 'collation' field, mr::Config::collation is an empty object.
 */
TEST(ConfigTest, ParseNoCollation) {
    std::string dbname = "myDB";
    BSONObjBuilder bob;
    bob.append("mapReduce", "myCollection");
    bob.appendCode("map", "function() { emit(0, 1); }");
    bob.appendCode("reduce", "function(k, v) { return {count: 0}; }");
    bob.append("out", "outCollection");
    BSONObj cmdObj = bob.obj();
    mr::Config config(dbname, cmdObj);
    ASSERT_BSONOBJ_EQ(config.collation, BSONObj());
}

/**
 * A 'collation' field that is a string rather than an object makes mr::Config construction throw.
 */
TEST(ConfigTest, CollationNotAnObjectFailsToParse) {
    std::string dbname = "myDB";
    BSONObjBuilder bob;
    bob.append("mapReduce", "myCollection");
    bob.appendCode("map", "function() { emit(0, 1); }");
    bob.appendCode("reduce", "function(k, v) { return {count: 0}; }");
    bob.append("out", "outCollection");
    bob.append("collation", "en_US");
    BSONObj cmdObj = bob.obj();
    ASSERT_THROWS(mr::Config(dbname, cmdObj), AssertionException);
}

/**
 * OpObserver for mapReduce test fixture.
 */
class MapReduceOpObserver : public OpObserverNoop {
public:
    /**
     * This function is called whenever mapReduce inserts documents into a temporary output
     * collection.
     *
     * All arguments are ignored; the call is forwarded to 'onInsertsFn'.
     */
    void onInserts(OperationContext* opCtx,
                   const NamespaceString& nss,
                   OptionalCollectionUUID uuid,
                   std::vector<InsertStatement>::const_iterator begin,
                   std::vector<InsertStatement>::const_iterator end,
                   bool fromMigrate) override;

    /**
     * Tracks the temporary collections mapReduce creates.
     *
     * Only collections created with 'options.temp' set are recorded in 'tempNamespaces'.
     */
    void onCreateCollection(OperationContext* opCtx,
                            Collection* coll,
                            const NamespaceString& collectionName,
                            const CollectionOptions& options,
                            const BSONObj& idIndex,
                            const OplogSlot& createOpTime) override;

    // Hook for onInserts. Defaults to a no-op function but may be overridden to inject exceptions
    // while mapReduce inserts its results into the temporary output collection.
    std::function<void()> onInsertsFn = [] {};

    // Holds namespaces of temporary collections created by mapReduce.
    std::vector<NamespaceString> tempNamespaces;
};

void MapReduceOpObserver::onInserts(OperationContext* opCtx,
                                    const NamespaceString& nss,
                                    OptionalCollectionUUID uuid,
                                    std::vector<InsertStatement>::const_iterator begin,
                                    std::vector<InsertStatement>::const_iterator end,
                                    bool fromMigrate) {
    // The hook is invoked for every insert notification, regardless of the target namespace.
    onInsertsFn();
}

void MapReduceOpObserver::onCreateCollection(OperationContext*,
                                             Collection*,
                                             const NamespaceString& collectionName,
                                             const CollectionOptions& options,
                                             const BSONObj&,
                                             const OplogSlot&) {
    // Non-temporary collections (e.g. the test's input collection) are not tracked.
    if (!options.temp) {
        return;
    }
    tempNamespaces.push_back(collectionName);
}

/**
 * Test fixture for MapReduceCommand.
 */
class MapReduceCommandTest : public ServiceContextMongoDTest {
public:
    // Namespace the mapReduce command reads from.
    static const NamespaceString inputNss;
    // Namespace the mapReduce command writes its results to (same database as 'inputNss').
    static const NamespaceString outputNss;

private:
    void setUp() override;
    void tearDown() override;

protected:
    /**
     * Looks up the current ReplicationCoordinator.
     * The result is cast to a ReplicationCoordinatorMock to provide access to test features.
     */
    repl::ReplicationCoordinatorMock* _getReplCoord() const;

    /**
     * Creates a mapReduce command object that reads from 'inputNss' and writes results to
     * 'outputNss'.
     */
    BSONObj _makeCmdObj(StringData mapCode, StringData reduceCode);

    /**
     * Runs a mapReduce command.
     * Ensures that temporary collections created by mapReduce no longer exist on success.
     * On failure, returns the command's error status with the command object added as context.
     */
    Status _runCommand(StringData mapCode, StringData reduceCode);

    /**
     * Checks that temporary collections created during mapReduce have been dropped.
     * This is made a separate test helper to handle cases where mapReduce is unable to remove
     * its temporary collections.
     */
    void _assertTemporaryCollectionsAreDropped();

    ServiceContext::UniqueOperationContext _opCtx;
    repl::StorageInterfaceImpl _storage;
    // Non-owning; the observer is owned by the service context's OpObserverRegistry.
    MapReduceOpObserver* _opObserver = nullptr;
};

const NamespaceString MapReduceCommandTest::inputNss("myDB.myCollection");
const NamespaceString MapReduceCommandTest::outputNss(inputNss.getSisterNS("outCollection"));

void MapReduceCommandTest::setUp() {
    ServiceContextMongoDTest::setUp();
    ScriptEngine::setup();
    auto service = getServiceContext();
    DBDirectClientFactory::get(service).registerImplementation(
        [](OperationContext* opCtx) { return std::make_unique<DBDirectClient>(opCtx); });
    repl::ReplicationCoordinator::set(service,
                                      std::make_unique<repl::ReplicationCoordinatorMock>(service));

    // Set up an OpObserver to track the temporary collections mapReduce creates.
    auto opObserver = std::make_unique<MapReduceOpObserver>();
    _opObserver = opObserver.get();
    auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(service->getOpObserver());
    // Fail with a clear message instead of dereferencing null if the cast does not succeed.
    ASSERT(opObserverRegistry) << "Unexpected type for installed OpObserver";
    opObserverRegistry->addObserver(std::move(opObserver));

    _opCtx = cc().makeOperationContext();

    // Transition to PRIMARY so that the server can accept writes.
    ASSERT_OK(_getReplCoord()->setFollowerMode(repl::MemberState::RS_PRIMARY));

    // Create the input collection. Each test inserts its own source documents.
    CollectionOptions collectionOptions;
    collectionOptions.uuid = UUID::gen();
    ASSERT_OK(_storage.createCollection(_opCtx.get(), inputNss, collectionOptions));
}

void MapReduceCommandTest::tearDown() {
    // Release the operation context and clear the observer pointer before the base fixture
    // tears down.
    _opCtx = {};
    _opObserver = nullptr;
    ScriptEngine::dropScopeCache();
    ServiceContextMongoDTest::tearDown();
}

repl::ReplicationCoordinatorMock* MapReduceCommandTest::_getReplCoord() const {
    auto replCoord = repl::ReplicationCoordinator::get(_opCtx.get());
    ASSERT(replCoord) << "No ReplicationCoordinator installed";
    auto replCoordMock = dynamic_cast<repl::ReplicationCoordinatorMock*>(replCoord);
    ASSERT(replCoordMock) << "Unexpected type for installed ReplicationCoordinator";
    return replCoordMock;
}

BSONObj MapReduceCommandTest::_makeCmdObj(StringData mapCode, StringData reduceCode) {
    BSONObjBuilder bob;
    bob.append("mapReduce", inputNss.coll());
    bob.appendCode("map", mapCode);
    bob.appendCode("reduce", reduceCode);
    bob.append("out", outputNss.coll());
    return bob.obj();
}

Status MapReduceCommandTest::_runCommand(StringData mapCode, StringData reduceCode) {
    auto command = CommandHelpers::findCommand("mapReduce");
    ASSERT(command) << "Unable to look up mapReduce command";

    auto request = OpMsgRequest::fromDBAndBody(inputNss.db(), _makeCmdObj(mapCode, reduceCode));
    auto result = CommandHelpers::runCommandDirectly(_opCtx.get(), request);
    auto status = getStatusFromCommandResult(result);
    if (!status.isOK()) {
        return status.withContext(str::stream() << "mapReduce command failed: " << request.body);
    }

    // Temporary collections are only checked on success; failure cases are checked by the caller.
    _assertTemporaryCollectionsAreDropped();
    return Status::OK();
}

void MapReduceCommandTest::_assertTemporaryCollectionsAreDropped() {
    for (const auto& tempNss : _opObserver->tempNamespaces) {
        ASSERT_EQUALS(ErrorCodes::NamespaceNotFound,
                      _storage.getCollectionCount(_opCtx.get(), tempNss))
            << "mapReduce did not remove temporary collection on success: " << tempNss.ns();
    }
}

/**
 * Maps a single input document's _id to itself and checks the document written to 'outputNss'.
 */
TEST_F(MapReduceCommandTest, MapIdToValue) {
    auto sourceDoc = BSON("_id" << 0);
    ASSERT_OK(_storage.insertDocument(_opCtx.get(), inputNss, {sourceDoc, Timestamp(0)}, 1LL));

    auto mapCode = "function() { emit(this._id, this._id); }"_sd;
    auto reduceCode = "function(k, v) { return Array.sum(v); }"_sd;
    ASSERT_OK(_runCommand(mapCode, reduceCode));

    auto targetDoc = BSON("_id" << 0 << "value" << 0);
    ASSERT_BSONOBJ_EQ(targetDoc,
                      unittest::assertGet(_storage.findSingleton(_opCtx.get(), outputNss)));
}

/**
 * Injects an insert failure and checks that no tracked temporary collection remains.
 */
TEST_F(MapReduceCommandTest, DropTemporaryCollectionsOnInsertError) {
    auto sourceDoc = BSON("_id" << 0);
    ASSERT_OK(_storage.insertDocument(_opCtx.get(), inputNss, {sourceDoc, Timestamp(0)}, 1LL));

    // The failure hook is installed only after the source document has been inserted.
    _opObserver->onInsertsFn = [] { uasserted(ErrorCodes::OperationFailed, ""); };

    auto mapCode = "function() { emit(this._id, this._id); }"_sd;
    auto reduceCode = "function(k, v) { return Array.sum(v); }"_sd;
    ASSERT_EQ(_runCommand(mapCode, reduceCode), ErrorCodes::OperationFailed);

    // Temporary collections created by mapReduce will be removed on failure if the server is able
    // to accept writes.
    _assertTemporaryCollectionsAreDropped();
}

/**
 * Steps down to SECONDARY inside the insert hook before failing, and checks that the tracked
 * temporary collections are still present afterwards.
 */
TEST_F(MapReduceCommandTest, PrimaryStepDownPreventsTemporaryCollectionDrops) {
    auto sourceDoc = BSON("_id" << 0);
    ASSERT_OK(_storage.insertDocument(_opCtx.get(), inputNss, {sourceDoc, Timestamp(0)}, 1LL));

    // The failure hook is installed only after the source document has been inserted.
    _opObserver->onInsertsFn = [this] {
        ASSERT_OK(_getReplCoord()->setFollowerMode(repl::MemberState::RS_SECONDARY));
        uasserted(ErrorCodes::OperationFailed, "");
    };

    auto mapCode = "function() { emit(this._id, this._id); }"_sd;
    auto reduceCode = "function(k, v) { return Array.sum(v); }"_sd;
    ASSERT_EQ(_runCommand(mapCode, reduceCode), ErrorCodes::OperationFailed);

    // Temporary collections should still be present because the server will not accept writes after
    // stepping down.
    for (const auto& tempNss : _opObserver->tempNamespaces) {
        ASSERT_OK(_storage.getCollectionCount(_opCtx.get(), tempNss).getStatus())
            << "missing mapReduce temporary collection: " << tempNss;
    }
}

}  // namespace
}  // namespace mongo