diff options
-rw-r--r-- | src/mongo/db/commands/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/commands/mr_test.cpp | 244 |
2 files changed, 245 insertions, 2 deletions
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index e6682b2ac71..54a4f51e7d6 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -324,8 +324,11 @@ env.CppUnitTest( "mr_test.cpp", ], LIBDEPS=[ + "$BUILD_DIR/mongo/db/repl/replmocks", + "$BUILD_DIR/mongo/db/repl/storage_interface_impl", "$BUILD_DIR/mongo/db/serveronly", "$BUILD_DIR/mongo/db/service_context_d", + "$BUILD_DIR/mongo/db/service_context_d_test_fixture", "dcommands", ], ) diff --git a/src/mongo/db/commands/mr_test.cpp b/src/mongo/db/commands/mr_test.cpp index a3f8d95bf54..068699d88b3 100644 --- a/src/mongo/db/commands/mr_test.cpp +++ b/src/mongo/db/commands/mr_test.cpp @@ -32,13 +32,27 @@ #include "mongo/db/commands/mr.h" +#include <functional> +#include <memory> #include <string> +#include <vector> +#include "mongo/db/catalog/collection_options.h" +#include "mongo/db/client.h" +#include "mongo/db/commands.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/json.h" +#include "mongo/db/op_observer_noop.h" +#include "mongo/db/op_observer_registry.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/scripting/dbdirectclient_factory.h" +#include "mongo/scripting/engine.h" #include "mongo/unittest/unittest.h" -using namespace mongo; - +namespace mongo { namespace { /** @@ -255,4 +269,230 @@ TEST(ConfigTest, CollationNotAnObjectFailsToParse) { ASSERT_THROWS(mr::Config(dbname, cmdObj), AssertionException); } +/** + * OpObserver for mapReduce test fixture. + */ +class MapReduceOpObserver : public OpObserverNoop { +public: + /** + * This function is called whenever mapReduce inserts documents into a temporary output + * collection. + */ + void onInserts(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + std::vector<InsertStatement>::const_iterator begin, + std::vector<InsertStatement>::const_iterator end, + bool fromMigrate) override; + + /** + * Tracks the temporary collections mapReduces creates. + */ + void onCreateCollection(OperationContext* opCtx, + Collection* coll, + const NamespaceString& collectionName, + const CollectionOptions& options, + const BSONObj& idIndex) override; + + // Hook for onInserts. Defaults to a no-op function but may be overridden to inject exceptions + // while mapReduce inserts its results into the temporary output collection. + std::function<void()> onInsertsFn = [] {}; + + // Holds namespaces of temporary collections created by mapReduce. + std::vector<NamespaceString> tempNamespaces; +}; + +void MapReduceOpObserver::onInserts(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + std::vector<InsertStatement>::const_iterator begin, + std::vector<InsertStatement>::const_iterator end, + bool fromMigrate) { + onInsertsFn(); +} + +void MapReduceOpObserver::onCreateCollection(OperationContext*, + Collection*, + const NamespaceString& collectionName, + const CollectionOptions& options, + const BSONObj&) { + if (!options.temp) { + return; + } + tempNamespaces.push_back(collectionName); +} + +/** + * Test fixture for MapReduceCommand. + */ +class MapReduceCommandTest : public ServiceContextMongoDTest { +public: + static const NamespaceString inputNss; + static const NamespaceString outputNss; + +private: + void setUp() override; + void tearDown() override; + +protected: + /** + * Looks up the current ReplicationCoordinator. + * The result is cast to a ReplicationCoordinatorMock to provide access to test features. + */ + repl::ReplicationCoordinatorMock* _getReplCoord() const; + + /** + * Creates a mapReduce command object that reads from 'inputNss' and writes results to + * 'outputNss'. + */ + BSONObj _makeCmdObj(StringData mapCode, StringData reduceCode); + + /** + * Runs a mapReduce command. + * Ensures that temporary collections created by mapReduce no longer exist on success. + */ + Status _runCommand(StringData mapCode, StringData reduceCode); + + /** + * Checks that temporary collections created during mapReduce have been dropped. + * This is made a separate test helper to handle cases where mapReduce is unable to remove + * its temporary collections. + */ + void _assertTemporaryCollectionsAreDropped(); + + ServiceContext::UniqueOperationContext _opCtx; + repl::StorageInterfaceImpl _storage; + MapReduceOpObserver* _opObserver = nullptr; +}; + +const NamespaceString MapReduceCommandTest::inputNss("myDB.myCollection"); +const NamespaceString MapReduceCommandTest::outputNss(inputNss.getSisterNS("outCollection")); + +void MapReduceCommandTest::setUp() { + ServiceContextMongoDTest::setUp(); + ScriptEngine::setup(); + auto service = getServiceContext(); + DBDirectClientFactory::get(service).registerImplementation( + [](OperationContext* opCtx) { return std::make_unique<DBDirectClient>(opCtx); }); + repl::ReplicationCoordinator::set(service, + std::make_unique<repl::ReplicationCoordinatorMock>(service)); + + // Set up an OpObserver to track the temporary collections mapReduce creates. + auto opObserver = std::make_unique<MapReduceOpObserver>(); + _opObserver = opObserver.get(); + auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(service->getOpObserver()); + opObserverRegistry->addObserver(std::move(opObserver)); + + _opCtx = cc().makeOperationContext(); + + // Transition to PRIMARY so that the server can accept writes. + ASSERT_OK(_getReplCoord()->setFollowerMode(repl::MemberState::RS_PRIMARY)); + + // Create collection with one document. + CollectionOptions collectionOptions; + collectionOptions.uuid = UUID::gen(); + ASSERT_OK(_storage.createCollection(_opCtx.get(), inputNss, collectionOptions)); +} + +void MapReduceCommandTest::tearDown() { + _opCtx = {}; + _opObserver = nullptr; + ScriptEngine::dropScopeCache(); + ServiceContextMongoDTest::tearDown(); +} + +repl::ReplicationCoordinatorMock* MapReduceCommandTest::_getReplCoord() const { + auto replCoord = repl::ReplicationCoordinator::get(_opCtx.get()); + ASSERT(replCoord) << "No ReplicationCoordinator installed"; + auto replCoordMock = dynamic_cast<repl::ReplicationCoordinatorMock*>(replCoord); + ASSERT(replCoordMock) << "Unexpected type for installed ReplicationCoordinator"; + return replCoordMock; +} + +BSONObj MapReduceCommandTest::_makeCmdObj(StringData mapCode, StringData reduceCode) { + BSONObjBuilder bob; + bob.append("mapReduce", inputNss.coll()); + bob.appendCode("map", mapCode); + bob.appendCode("reduce", reduceCode); + bob.append("out", outputNss.coll()); + return bob.obj(); +} + +Status MapReduceCommandTest::_runCommand(StringData mapCode, StringData reduceCode) { + auto command = CommandHelpers::findCommand("mapReduce"); + ASSERT(command) << "Unable to look up mapReduce command"; + + auto request = OpMsgRequest::fromDBAndBody(inputNss.db(), _makeCmdObj(mapCode, reduceCode)); + BSONObjBuilder result; + auto success = command->publicRun(_opCtx.get(), request, result); + if (!success) { + auto status = getStatusFromCommandResult(result.obj()); + ASSERT_NOT_OK(status); + return status.withContext(str::stream() << "mapReduce command failed: " << request.body); + } + + _assertTemporaryCollectionsAreDropped(); + return Status::OK(); +} + +void MapReduceCommandTest::_assertTemporaryCollectionsAreDropped() { + for (const auto& tempNss : _opObserver->tempNamespaces) { + ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, + _storage.getCollectionCount(_opCtx.get(), tempNss)) + << "mapReduce did not remove temporary collection on success: " << tempNss.ns(); + } +} + +TEST_F(MapReduceCommandTest, MapIdToValue) { + auto sourceDoc = BSON("_id" << 0); + ASSERT_OK(_storage.insertDocument(_opCtx.get(), inputNss, {sourceDoc, Timestamp(0)}, 1LL)); + + auto mapCode = "function() { emit(this._id, this._id); }"_sd; + auto reduceCode = "function(k, v) { return Array.sum(v); }"_sd; + ASSERT_OK(_runCommand(mapCode, reduceCode)); + + auto targetDoc = BSON("_id" << 0 << "value" << 0); + ASSERT_BSONOBJ_EQ(targetDoc, + unittest::assertGet(_storage.findSingleton(_opCtx.get(), outputNss))); +} + +TEST_F(MapReduceCommandTest, DropTemporaryCollectionsOnInsertError) { + auto sourceDoc = BSON("_id" << 0); + ASSERT_OK(_storage.insertDocument(_opCtx.get(), inputNss, {sourceDoc, Timestamp(0)}, 1LL)); + + _opObserver->onInsertsFn = [] { uasserted(ErrorCodes::OperationFailed, ""); }; + + auto mapCode = "function() { emit(this._id, this._id); }"_sd; + auto reduceCode = "function(k, v) { return Array.sum(v); }"_sd; + ASSERT_THROWS_CODE( + _runCommand(mapCode, reduceCode).ignore(), AssertionException, ErrorCodes::OperationFailed); + + // Temporary collections created by mapReduce will be removed on failure if the server is able + // to accept writes. + _assertTemporaryCollectionsAreDropped(); +} + +TEST_F(MapReduceCommandTest, PrimaryStepDownPreventsTemporaryCollectionDrops) { + auto sourceDoc = BSON("_id" << 0); + ASSERT_OK(_storage.insertDocument(_opCtx.get(), inputNss, {sourceDoc, Timestamp(0)}, 1LL)); + + _opObserver->onInsertsFn = [this] { + ASSERT_OK(_getReplCoord()->setFollowerMode(repl::MemberState::RS_SECONDARY)); + uasserted(ErrorCodes::OperationFailed, ""); + }; + + auto mapCode = "function() { emit(this._id, this._id); }"_sd; + auto reduceCode = "function(k, v) { return Array.sum(v); }"_sd; + ASSERT_THROWS_CODE( + _runCommand(mapCode, reduceCode).ignore(), AssertionException, ErrorCodes::OperationFailed); + + // Temporary collections should still be present because the server will not accept writes after + // stepping down. + for (const auto& tempNss : _opObserver->tempNamespaces) { + ASSERT_OK(_storage.getCollectionCount(_opCtx.get(), tempNss).getStatus()) + << "missing mapReduce temporary collection: " << tempNss; + } +} + } // namespace +} // namespace mongo |