summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/commands/SConscript3
-rw-r--r--src/mongo/db/commands/mr_test.cpp244
2 files changed, 245 insertions, 2 deletions
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index e6682b2ac71..54a4f51e7d6 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -324,8 +324,11 @@ env.CppUnitTest(
"mr_test.cpp",
],
LIBDEPS=[
+ "$BUILD_DIR/mongo/db/repl/replmocks",
+ "$BUILD_DIR/mongo/db/repl/storage_interface_impl",
"$BUILD_DIR/mongo/db/serveronly",
"$BUILD_DIR/mongo/db/service_context_d",
+ "$BUILD_DIR/mongo/db/service_context_d_test_fixture",
"dcommands",
],
)
diff --git a/src/mongo/db/commands/mr_test.cpp b/src/mongo/db/commands/mr_test.cpp
index a3f8d95bf54..068699d88b3 100644
--- a/src/mongo/db/commands/mr_test.cpp
+++ b/src/mongo/db/commands/mr_test.cpp
@@ -32,13 +32,27 @@
#include "mongo/db/commands/mr.h"
+#include <functional>
+#include <memory>
#include <string>
+#include <vector>
+#include "mongo/db/catalog/collection_options.h"
+#include "mongo/db/client.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/json.h"
+#include "mongo/db/op_observer_noop.h"
+#include "mongo/db/op_observer_registry.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/scripting/dbdirectclient_factory.h"
+#include "mongo/scripting/engine.h"
#include "mongo/unittest/unittest.h"
-using namespace mongo;
-
+namespace mongo {
namespace {
/**
@@ -255,4 +269,230 @@ TEST(ConfigTest, CollationNotAnObjectFailsToParse) {
ASSERT_THROWS(mr::Config(dbname, cmdObj), AssertionException);
}
+/**
+ * OpObserver for mapReduce test fixture.
+ */
+class MapReduceOpObserver : public OpObserverNoop {
+public:
+ /**
+ * This function is called whenever mapReduce inserts documents into a temporary output
+ * collection.
+ */
+ void onInserts(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ bool fromMigrate) override;
+
+ /**
+ * Tracks the temporary collections mapReduces creates.
+ */
+ void onCreateCollection(OperationContext* opCtx,
+ Collection* coll,
+ const NamespaceString& collectionName,
+ const CollectionOptions& options,
+ const BSONObj& idIndex) override;
+
+ // Hook for onInserts. Defaults to a no-op function but may be overridden to inject exceptions
+ // while mapReduce inserts its results into the temporary output collection.
+ std::function<void()> onInsertsFn = [] {};
+
+ // Holds namespaces of temporary collections created by mapReduce.
+ std::vector<NamespaceString> tempNamespaces;
+};
+
+void MapReduceOpObserver::onInserts(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ bool fromMigrate) {
+ onInsertsFn();
+}
+
+void MapReduceOpObserver::onCreateCollection(OperationContext*,
+ Collection*,
+ const NamespaceString& collectionName,
+ const CollectionOptions& options,
+ const BSONObj&) {
+ if (!options.temp) {
+ return;
+ }
+ tempNamespaces.push_back(collectionName);
+}
+
+/**
+ * Test fixture for MapReduceCommand.
+ */
+class MapReduceCommandTest : public ServiceContextMongoDTest {
+public:
+ static const NamespaceString inputNss;
+ static const NamespaceString outputNss;
+
+private:
+ void setUp() override;
+ void tearDown() override;
+
+protected:
+ /**
+ * Looks up the current ReplicationCoordinator.
+ * The result is cast to a ReplicationCoordinatorMock to provide access to test features.
+ */
+ repl::ReplicationCoordinatorMock* _getReplCoord() const;
+
+ /**
+ * Creates a mapReduce command object that reads from 'inputNss' and writes results to
+ * 'outputNss'.
+ */
+ BSONObj _makeCmdObj(StringData mapCode, StringData reduceCode);
+
+ /**
+ * Runs a mapReduce command.
+ * Ensures that temporary collections created by mapReduce no longer exist on success.
+ */
+ Status _runCommand(StringData mapCode, StringData reduceCode);
+
+ /**
+ * Checks that temporary collections created during mapReduce have been dropped.
+ * This is made a separate test helper to handle cases where mapReduce is unable to remove
+ * its temporary collections.
+ */
+ void _assertTemporaryCollectionsAreDropped();
+
+ ServiceContext::UniqueOperationContext _opCtx;
+ repl::StorageInterfaceImpl _storage;
+ MapReduceOpObserver* _opObserver = nullptr;
+};
+
+const NamespaceString MapReduceCommandTest::inputNss("myDB.myCollection");
+const NamespaceString MapReduceCommandTest::outputNss(inputNss.getSisterNS("outCollection"));
+
+void MapReduceCommandTest::setUp() {
+ ServiceContextMongoDTest::setUp();
+ ScriptEngine::setup();
+ auto service = getServiceContext();
+ DBDirectClientFactory::get(service).registerImplementation(
+ [](OperationContext* opCtx) { return std::make_unique<DBDirectClient>(opCtx); });
+ repl::ReplicationCoordinator::set(service,
+ std::make_unique<repl::ReplicationCoordinatorMock>(service));
+
+ // Set up an OpObserver to track the temporary collections mapReduce creates.
+ auto opObserver = std::make_unique<MapReduceOpObserver>();
+ _opObserver = opObserver.get();
+ auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(service->getOpObserver());
+ opObserverRegistry->addObserver(std::move(opObserver));
+
+ _opCtx = cc().makeOperationContext();
+
+ // Transition to PRIMARY so that the server can accept writes.
+ ASSERT_OK(_getReplCoord()->setFollowerMode(repl::MemberState::RS_PRIMARY));
+
+ // Create collection with one document.
+ CollectionOptions collectionOptions;
+ collectionOptions.uuid = UUID::gen();
+ ASSERT_OK(_storage.createCollection(_opCtx.get(), inputNss, collectionOptions));
+}
+
+void MapReduceCommandTest::tearDown() {
+ _opCtx = {};
+ _opObserver = nullptr;
+ ScriptEngine::dropScopeCache();
+ ServiceContextMongoDTest::tearDown();
+}
+
+repl::ReplicationCoordinatorMock* MapReduceCommandTest::_getReplCoord() const {
+ auto replCoord = repl::ReplicationCoordinator::get(_opCtx.get());
+ ASSERT(replCoord) << "No ReplicationCoordinator installed";
+ auto replCoordMock = dynamic_cast<repl::ReplicationCoordinatorMock*>(replCoord);
+ ASSERT(replCoordMock) << "Unexpected type for installed ReplicationCoordinator";
+ return replCoordMock;
+}
+
+BSONObj MapReduceCommandTest::_makeCmdObj(StringData mapCode, StringData reduceCode) {
+ BSONObjBuilder bob;
+ bob.append("mapReduce", inputNss.coll());
+ bob.appendCode("map", mapCode);
+ bob.appendCode("reduce", reduceCode);
+ bob.append("out", outputNss.coll());
+ return bob.obj();
+}
+
+Status MapReduceCommandTest::_runCommand(StringData mapCode, StringData reduceCode) {
+ auto command = CommandHelpers::findCommand("mapReduce");
+ ASSERT(command) << "Unable to look up mapReduce command";
+
+ auto request = OpMsgRequest::fromDBAndBody(inputNss.db(), _makeCmdObj(mapCode, reduceCode));
+ BSONObjBuilder result;
+ auto success = command->publicRun(_opCtx.get(), request, result);
+ if (!success) {
+ auto status = getStatusFromCommandResult(result.obj());
+ ASSERT_NOT_OK(status);
+ return status.withContext(str::stream() << "mapReduce command failed: " << request.body);
+ }
+
+ _assertTemporaryCollectionsAreDropped();
+ return Status::OK();
+}
+
+void MapReduceCommandTest::_assertTemporaryCollectionsAreDropped() {
+ for (const auto& tempNss : _opObserver->tempNamespaces) {
+ ASSERT_EQUALS(ErrorCodes::NamespaceNotFound,
+ _storage.getCollectionCount(_opCtx.get(), tempNss))
+ << "mapReduce did not remove temporary collection on success: " << tempNss.ns();
+ }
+}
+
+TEST_F(MapReduceCommandTest, MapIdToValue) {
+ auto sourceDoc = BSON("_id" << 0);
+ ASSERT_OK(_storage.insertDocument(_opCtx.get(), inputNss, {sourceDoc, Timestamp(0)}, 1LL));
+
+ auto mapCode = "function() { emit(this._id, this._id); }"_sd;
+ auto reduceCode = "function(k, v) { return Array.sum(v); }"_sd;
+ ASSERT_OK(_runCommand(mapCode, reduceCode));
+
+ auto targetDoc = BSON("_id" << 0 << "value" << 0);
+ ASSERT_BSONOBJ_EQ(targetDoc,
+ unittest::assertGet(_storage.findSingleton(_opCtx.get(), outputNss)));
+}
+
+TEST_F(MapReduceCommandTest, DropTemporaryCollectionsOnInsertError) {
+ auto sourceDoc = BSON("_id" << 0);
+ ASSERT_OK(_storage.insertDocument(_opCtx.get(), inputNss, {sourceDoc, Timestamp(0)}, 1LL));
+
+ _opObserver->onInsertsFn = [] { uasserted(ErrorCodes::OperationFailed, ""); };
+
+ auto mapCode = "function() { emit(this._id, this._id); }"_sd;
+ auto reduceCode = "function(k, v) { return Array.sum(v); }"_sd;
+ ASSERT_THROWS_CODE(
+ _runCommand(mapCode, reduceCode).ignore(), AssertionException, ErrorCodes::OperationFailed);
+
+ // Temporary collections created by mapReduce will be removed on failure if the server is able
+ // to accept writes.
+ _assertTemporaryCollectionsAreDropped();
+}
+
+TEST_F(MapReduceCommandTest, PrimaryStepDownPreventsTemporaryCollectionDrops) {
+ auto sourceDoc = BSON("_id" << 0);
+ ASSERT_OK(_storage.insertDocument(_opCtx.get(), inputNss, {sourceDoc, Timestamp(0)}, 1LL));
+
+ _opObserver->onInsertsFn = [this] {
+ ASSERT_OK(_getReplCoord()->setFollowerMode(repl::MemberState::RS_SECONDARY));
+ uasserted(ErrorCodes::OperationFailed, "");
+ };
+
+ auto mapCode = "function() { emit(this._id, this._id); }"_sd;
+ auto reduceCode = "function(k, v) { return Array.sum(v); }"_sd;
+ ASSERT_THROWS_CODE(
+ _runCommand(mapCode, reduceCode).ignore(), AssertionException, ErrorCodes::OperationFailed);
+
+ // Temporary collections should still be present because the server will not accept writes after
+ // stepping down.
+ for (const auto& tempNss : _opObserver->tempNamespaces) {
+ ASSERT_OK(_storage.getCollectionCount(_opCtx.get(), tempNss).getStatus())
+ << "missing mapReduce temporary collection: " << tempNss;
+ }
+}
+
} // namespace
+} // namespace mongo