diff options
Diffstat (limited to 'src/mongo/db/commands/mr.cpp')
-rw-r--r-- | src/mongo/db/commands/mr.cpp | 1829 |
1 files changed, 0 insertions, 1829 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp deleted file mode 100644 index e338498b7e1..00000000000 --- a/src/mongo/db/commands/mr.cpp +++ /dev/null @@ -1,1829 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand - -#include "mongo/platform/basic.h" - -#include "mongo/db/commands/mr.h" - -#include "mongo/base/status_with.h" -#include "mongo/bson/util/builder.h" -#include "mongo/client/connpool.h" -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/bson/dotted_path_support.h" -#include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/database_holder.h" -#include "mongo/db/catalog/document_validation.h" -#include "mongo/db/catalog/index_catalog.h" -#include "mongo/db/catalog/index_key_validate.h" -#include "mongo/db/client.h" -#include "mongo/db/clientcursor.h" -#include "mongo/db/commands.h" -#include "mongo/db/commands/map_reduce_gen.h" -#include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/exec/working_set_common.h" -#include "mongo/db/index/index_descriptor.h" -#include "mongo/db/index_builds_coordinator.h" -#include "mongo/db/matcher/extensions_callback_real.h" -#include "mongo/db/op_observer.h" -#include "mongo/db/ops/insert.h" -#include "mongo/db/query/collection_query_info.h" -#include "mongo/db/query/get_executor.h" -#include "mongo/db/query/plan_summary_stats.h" -#include "mongo/db/query/query_planner.h" -#include "mongo/db/s/collection_sharding_runtime.h" -#include "mongo/db/s/sharding_state.h" -#include "mongo/db/server_options.h" -#include "mongo/db/service_context.h" -#include "mongo/db/storage/durable_catalog.h" -#include "mongo/logv2/log.h" -#include "mongo/platform/mutex.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/client/parallel.h" -#include "mongo/s/client/shard_connection.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/grid.h" -#include "mongo/s/shard_key_pattern.h" -#include "mongo/s/stale_exception.h" -#include "mongo/scripting/engine.h" -#include "mongo/util/debug_util.h" -#include "mongo/util/scopeguard.h" -#include "mongo/util/str.h" - -namespace mongo { - -using std::set; -using std::shared_ptr; -using std::string; -using std::stringstream; -using std::unique_ptr; -using std::vector; - -using IndexVersion = IndexDescriptor::IndexVersion; - -namespace dps = ::mongo::dotted_path_support; - -namespace mr { -namespace { - -Rarely mapParamsDeprecationSampler; // Used to occasionally log deprecation messages. - -/** - * Runs a count against the namespace specified by 'ns'. If the caller holds the global write lock, - * then this function does not acquire any additional locks. - */ -unsigned long long collectionCount(OperationContext* opCtx, - const NamespaceString& nss, - bool callerHoldsGlobalLock) { - boost::optional<AutoGetCollectionForReadCommand> ctx; - - Collection* coll = nullptr; - - // If the global write lock is held, we must avoid using AutoGetCollectionForReadCommand as it - // may lead to deadlock when waiting for a majority snapshot to be committed. See SERVER-24596. - if (callerHoldsGlobalLock) { - auto databaseHolder = DatabaseHolder::get(opCtx); - auto db = databaseHolder->getDb(opCtx, nss.ns()); - if (db) { - coll = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss); - } - } else { - ctx.emplace(opCtx, nss); - coll = ctx->getCollection(); - } - - return coll ? coll->numRecords(opCtx) : 0; -} - -/** - * Emit that will be called by a js function. - */ -BSONObj fastEmit(const BSONObj& args, void* data) { - uassert(10077, "emit takes 2 args", args.nFields() == 2); - uassert(13069, - "an emit can't be more than half max bson size", - args.objsize() < (BSONObjMaxUserSize / 2)); - - State* state = (State*)data; - if (args.firstElement().type() == Undefined) { - BSONObjBuilder b(args.objsize()); - b.appendNull(""); - BSONObjIterator i(args); - i.next(); - b.append(i.next()); - state->emit(b.obj()); - } else { - state->emit(args); - } - return BSONObj(); -} - -/** - * This function is called when we realize we cant use js mode for m/r on the 1st key. - */ -BSONObj _bailFromJS(const BSONObj& args, void* data) { - State* state = (State*)data; - state->bailFromJS(); - - // emit this particular key if there is one - if (!args.isEmpty()) { - fastEmit(args, data); - } - return BSONObj(); -} - -template <class AutoT> -void assertCollectionNotNull(const NamespaceString& nss, AutoT& autoT) { - uassert(18698, "Collection unexpectedly disappeared: " + nss.ns(), autoT.getCollection()); -} - -/** - * Clean up the temporary and incremental collections - */ -void dropTempCollections(OperationContext* cleanupOpCtx, - const NamespaceString& tempNamespace, - const NamespaceString& incLong) { - // Make sure we enforce prepare conflicts before writing. - EnforcePrepareConflictsBlock enforcePrepare(cleanupOpCtx); - - if (!tempNamespace.isEmpty()) { - writeConflictRetry( - cleanupOpCtx, - "M/R dropTempCollections", - tempNamespace.ns(), - [cleanupOpCtx, &tempNamespace] { - AutoGetDb autoDb(cleanupOpCtx, tempNamespace.db(), MODE_X); - if (auto db = autoDb.getDb()) { - if (auto collection = - CollectionCatalog::get(cleanupOpCtx) - .lookupCollectionByNamespace(cleanupOpCtx, tempNamespace)) { - uassert(ErrorCodes::PrimarySteppedDown, - str::stream() << "no longer primary while dropping temporary " - "collection for mapReduce: " - << tempNamespace.ns(), - repl::ReplicationCoordinator::get(cleanupOpCtx) - ->canAcceptWritesFor(cleanupOpCtx, tempNamespace)); - BackgroundOperation::assertNoBgOpInProgForNs(tempNamespace.ns()); - IndexBuildsCoordinator::get(cleanupOpCtx) - ->assertNoIndexBuildInProgForCollection(collection->uuid()); - WriteUnitOfWork wunit(cleanupOpCtx); - uassertStatusOK(db->dropCollection(cleanupOpCtx, tempNamespace)); - wunit.commit(); - } - } - }); - // Always forget about temporary namespaces, so we don't cache lots of them - ShardConnection::forgetNS(tempNamespace.ns()); - } - if (!incLong.isEmpty()) { - writeConflictRetry( - cleanupOpCtx, "M/R dropTempCollections", incLong.ns(), [cleanupOpCtx, &incLong] { - Lock::DBLock lk(cleanupOpCtx, incLong.db(), MODE_X); - auto databaseHolder = DatabaseHolder::get(cleanupOpCtx); - if (auto db = databaseHolder->getDb(cleanupOpCtx, incLong.ns())) { - if (auto collection = CollectionCatalog::get(cleanupOpCtx) - .lookupCollectionByNamespace(cleanupOpCtx, incLong)) { - BackgroundOperation::assertNoBgOpInProgForNs(incLong.ns()); - IndexBuildsCoordinator::get(cleanupOpCtx) - ->assertNoIndexBuildInProgForCollection(collection->uuid()); - WriteUnitOfWork wunit(cleanupOpCtx); - uassertStatusOK(db->dropCollection(cleanupOpCtx, incLong)); - wunit.commit(); - } - } - }); - - ShardConnection::forgetNS(incLong.ns()); - } -} - -} // namespace - -AtomicWord<unsigned> Config::jobNumber; - -JSFunction::JSFunction(const std::string& type, const BSONElement& e) { - _type = type; - _code = e._asCode(); - - if (e.type() == CodeWScope) - _wantedScope = e.codeWScopeObject(); -} - -void JSFunction::init(State* state) { - _scope = state->scope(); - verify(_scope); - _scope->init(&_wantedScope); - - _func = _scope->createFunction(_code.c_str()); - uassert(13598, str::stream() << "couldn't compile code for: " << _type, _func); - - // install in JS scope so that it can be called in JS mode - _scope->setFunction(_type.c_str(), _code.c_str()); -} - -void JSMapper::init(State* state) { - _func.init(state); - _params = state->config().mapParams; -} - -/** - * Applies the map function to an object, which should internally call emit() - */ -void JSMapper::map(const BSONObj& o) { - Scope* s = _func.scope(); - verify(s); - if (s->invoke(_func.func(), &_params, &o, 0, true)) - uasserted(9014, str::stream() << "map invoke failed: " << s->getError()); -} - -/** - * Applies the finalize function to a tuple obj (key, val) - * Returns tuple obj {_id: key, value: newval} - */ -BSONObj JSFinalizer::finalize(const BSONObj& o) { - Scope* s = _func.scope(); - - s->invokeSafe(_func.func(), &o, nullptr); - - // We don't want to use o.objsize() to size b since there are many cases where the point of - // finalize is converting many fields to 1 - BSONObjBuilder b; - b.append(o.firstElement()); - s->append(b, "value", "__returnValue"); - return b.obj(); -} - -void JSReducer::init(State* state) { - _func.init(state); -} - -/** - * Reduces a list of tuple objects (key, value) to a single tuple {"0": key, "1": value} - */ -BSONObj JSReducer::reduce(const BSONList& tuples) { - if (tuples.size() <= 1) - return tuples[0]; - BSONObj key; - int endSizeEstimate = 16; - _reduce(tuples, key, endSizeEstimate); - - BSONObjBuilder b(endSizeEstimate); - b.appendAs(key.firstElement(), "0"); - _func.scope()->append(b, "1", "__returnValue"); - return b.obj(); -} - -/** - * Reduces a list of tuple object (key, value) to a single tuple {_id: key, value: val} - * Also applies a finalizer method if present. - */ -BSONObj JSReducer::finalReduce(const BSONList& tuples, Finalizer* finalizer) { - BSONObj res; - BSONObj key; - - if (tuples.size() == 1) { - // 1 obj, just use it - key = tuples[0]; - BSONObjBuilder b(key.objsize()); - BSONObjIterator it(key); - b.appendAs(it.next(), "_id"); - b.appendAs(it.next(), "value"); - res = b.obj(); - } else { - // need to reduce - int endSizeEstimate = 16; - _reduce(tuples, key, endSizeEstimate); - BSONObjBuilder b(endSizeEstimate); - b.appendAs(key.firstElement(), "_id"); - _func.scope()->append(b, "value", "__returnValue"); - res = b.obj(); - } - - if (finalizer) { - res = finalizer->finalize(res); - } - - return res; -} - -/** - * actually applies a reduce, to a list of tuples (key, value). - * After the call, tuples will hold a single tuple {"0": key, "1": value} - */ -void JSReducer::_reduce(const BSONList& tuples, BSONObj& key, int& endSizeEstimate) { - uassert(10074, "need values", tuples.size()); - - int sizeEstimate = (tuples.size() * tuples.begin()->getField("value").size()) + 128; - - // need to build the reduce args: ( key, [values] ) - BSONObjBuilder reduceArgs(sizeEstimate); - std::unique_ptr<BSONArrayBuilder> valueBuilder; - unsigned n = 0; - for (; n < tuples.size(); n++) { - BSONObjIterator j(tuples[n]); - BSONElement keyE = j.next(); - if (n == 0) { - reduceArgs.append(keyE); - key = keyE.wrap(); - valueBuilder.reset(new BSONArrayBuilder(reduceArgs.subarrayStart("tuples"))); - } - - BSONElement ee = j.next(); - - uassert(13070, "value too large to reduce", ee.size() < (BSONObjMaxUserSize / 2)); - - // If adding this element to the array would cause it to be too large, break. The - // remainder of the tuples will be processed recursively at the end of this - // function. - if (valueBuilder->len() + ee.size() > BSONObjMaxUserSize) { - verify(n > 1); // if not, inf. loop - break; - } - - valueBuilder->append(ee); - } - verify(valueBuilder); - valueBuilder->done(); - BSONObj args = reduceArgs.obj(); - - Scope* s = _func.scope(); - - s->invokeSafe(_func.func(), &args, nullptr); - ++numReduces; - - if (s->type("__returnValue") == Array) { - uasserted(10075, "reduce -> multiple not supported yet"); - return; - } - - endSizeEstimate = key.objsize() + (args.objsize() / tuples.size()); - - if (n == tuples.size()) - return; - - // the input list was too large, add the rest of elmts to new tuples and reduce again - // note: would be better to use loop instead of recursion to avoid stack overflow - BSONList x; - for (; n < tuples.size(); n++) { - x.push_back(tuples[n]); - } - BSONObjBuilder temp(endSizeEstimate); - temp.append(key.firstElement()); - s->append(temp, "1", "__returnValue"); - x.push_back(temp.obj()); - _reduce(x, key, endSizeEstimate); -} - -Config::Config(const string& _dbname, const BSONObj& cmdObj) { - dbname = _dbname; - uassert(ErrorCodes::TypeMismatch, - str::stream() << "'mapReduce' must be of type String", - cmdObj.firstElement().type() == BSONType::String); - nss = NamespaceString(dbname, cmdObj.firstElement().valueStringData()); - uassert(ErrorCodes::InvalidNamespace, - str::stream() << "Invalid namespace: " << nss.ns(), - nss.isValid()); - - verbose = cmdObj["verbose"].trueValue(); - jsMode = cmdObj["jsMode"].trueValue(); - splitInfo = 0; - - if (cmdObj.hasField("splitInfo")) { - splitInfo = cmdObj["splitInfo"].Int(); - } - - jsMaxKeys = 500000; - reduceTriggerRatio = 10.0; - maxInMemSize = 500 * 1024; - - uassert(13602, "outType is no longer a valid option", cmdObj["outType"].eoo()); - - outputOptions = map_reduce_common::parseOutputOptions(dbname, cmdObj); - - shardedFirstPass = false; - if (cmdObj.hasField("shardedFirstPass") && cmdObj["shardedFirstPass"].trueValue()) { - massert(16054, - "shardedFirstPass should only use replace outType", - outputOptions.outType == OutputType::Replace); - shardedFirstPass = true; - } - - if (outputOptions.outType != OutputType::InMemory) { - // Create names for the temp collection and the incremental collection. The incremental - // collection goes in the "local" database, so that it doesn't get replicated. - const std::string& outDBName = outputOptions.outDB.empty() ? dbname : outputOptions.outDB; - const std::string tmpCollDesc = str::stream() - << "tmp.mr." << cmdObj.firstElement().valueStringData() << "_" - << jobNumber.fetchAndAdd(1); - tempNamespace = NamespaceString(outDBName, tmpCollDesc); - - // The name of the incremental collection includes the name of the database that we put - // temporary collection in, to make it easier to see which incremental database is paired - // with which temporary database when debugging. - incLong = - NamespaceString("local", str::stream() << tmpCollDesc << "_" << outDBName << "_inc"); - } - - { - // scope and code - - if (cmdObj["scope"].type() == Object) - scopeSetup = cmdObj["scope"].embeddedObjectUserCheck().getOwned(); - - mapper.reset(new JSMapper(cmdObj["map"])); - reducer.reset(new JSReducer(cmdObj["reduce"])); - if (cmdObj["finalize"].type() && cmdObj["finalize"].trueValue()) - finalizer.reset(new JSFinalizer(cmdObj["finalize"])); - - // DEPRECATED - if (auto mapParamsElem = cmdObj["mapparams"]) { - if (mapParamsDeprecationSampler.tick()) { - LOGV2_WARNING(20493, "The mapparams option to MapReduce is deprecated."); - } - if (mapParamsElem.type() == Array) { - mapParams = mapParamsElem.embeddedObjectUserCheck().getOwned(); - } - } - } - - { - // query options - BSONElement q = cmdObj["query"]; - if (q.type() == Object) - filter = q.embeddedObjectUserCheck(); - else - uassert(13608, "query has to be blank or an Object", !q.trueValue()); - - - BSONElement s = cmdObj["sort"]; - if (s.type() == Object) - sort = s.embeddedObjectUserCheck(); - else - uassert(13609, "sort has to be blank or an Object", !s.trueValue()); - - BSONElement collationElt = cmdObj["collation"]; - if (collationElt.type() == Object) - collation = collationElt.embeddedObjectUserCheck(); - else - uassert(40082, - str::stream() - << "mapReduce 'collation' parameter must be of type Object but found type: " - << typeName(collationElt.type()), - collationElt.eoo()); - - if (cmdObj["limit"].isNumber()) - limit = cmdObj["limit"].numberLong(); - else - limit = 0; - } -} - -/** - * Create temporary collection, set up indexes - */ -void State::prepTempCollection() { - if (!_onDisk) - return; - - // Make sure we enforce prepare conflicts before writing. - EnforcePrepareConflictsBlock enforcePrepare(_opCtx); - - dropTempCollections( - _opCtx, _config.tempNamespace, _useIncremental ? _config.incLong : NamespaceString()); - - if (_useIncremental) { - // Create the inc collection and make sure we have index on "0" key. The inc collection is - // in the "local" database, so it does not get replicated to secondaries. - writeConflictRetry(_opCtx, "M/R prepTempCollection", _config.incLong.ns(), [this] { - AutoGetOrCreateDb autoGetIncCollDb(_opCtx, _config.incLong.db(), MODE_X); - auto const db = autoGetIncCollDb.getDb(); - invariant(!CollectionCatalog::get(_opCtx).lookupCollectionByNamespace(_opCtx, - _config.incLong)); - - CollectionOptions options; - options.setNoIdIndex(); - options.temp = true; - options.uuid.emplace(UUID::gen()); - - WriteUnitOfWork wuow(_opCtx); - auto incColl = db->createCollection( - _opCtx, _config.incLong, options, false /* force no _id index */); - - auto rawIndexSpec = BSON("key" << BSON("0" << 1) << "name" - << "_temp_0"); - auto indexSpec = uassertStatusOK(index_key_validate::validateIndexSpec( - _opCtx, rawIndexSpec, serverGlobalParams.featureCompatibility)); - - uassertStatusOKWithContext( - incColl->getIndexCatalog()->createIndexOnEmptyCollection(_opCtx, indexSpec), - str::stream() << "createIndex failed for mr incLong ns " << _config.incLong.ns()); - wuow.commit(); - - CollectionShardingState::get(_opCtx, _config.incLong) - ->setFilteringMetadata(_opCtx, CollectionMetadata()); - }); - } - - CollectionOptions finalOptions; - vector<BSONObj> indexesToInsert; - - { - // Copy indexes and collection options into temporary storage - AutoGetCollection autoGetFinalColl(_opCtx, _config.outputOptions.finalNamespace, MODE_IS); - - auto const finalColl = autoGetFinalColl.getCollection(); - if (finalColl) { - finalOptions = DurableCatalog::get(_opCtx)->getCollectionOptions( - _opCtx, finalColl->getCatalogId()); - - std::unique_ptr<IndexCatalog::IndexIterator> ii = - finalColl->getIndexCatalog()->getIndexIterator(_opCtx, true); - // Iterate over finalColl's indexes. - while (ii->more()) { - const IndexDescriptor* currIndex = ii->next()->descriptor(); - BSONObjBuilder b; - - // Copy over contents of the index descriptor's infoObj. - BSONObjIterator j(currIndex->infoObj()); - while (j.more()) { - BSONElement e = j.next(); - if (e.fieldNameStringData() == "_id") - continue; - b.append(e); - } - indexesToInsert.push_back(b.obj()); - } - } - } - - writeConflictRetry(_opCtx, "M/R prepTempCollection", _config.tempNamespace.ns(), [&] { - // Create temp collection and insert the indexes from temporary storage - AutoGetOrCreateDb autoGetFinalDb(_opCtx, _config.tempNamespace.db(), MODE_X); - auto const db = autoGetFinalDb.getDb(); - invariant(!CollectionCatalog::get(_opCtx).lookupCollectionByNamespace( - _opCtx, _config.tempNamespace)); - - uassert( - ErrorCodes::PrimarySteppedDown, - str::stream() << "no longer primary while creating temporary collection for mapReduce: " - << _config.tempNamespace.ns(), - repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor(_opCtx, - _config.tempNamespace)); - - CollectionOptions options = finalOptions; - options.temp = true; - - // If a UUID for the final output collection was sent by mongos (i.e., the final output - // collection is sharded), use the UUID mongos sent when creating the temp collection. - // When the temp collection is renamed to the final output collection, the UUID will be - // preserved. - options.uuid.emplace(_config.finalOutputCollUUID ? *_config.finalOutputCollUUID - : UUID::gen()); - - // Override createCollection's prohibition on creating new replicated collections without an - // _id index. - const bool buildIdIndex = (options.autoIndexId == CollectionOptions::YES || - options.autoIndexId == CollectionOptions::DEFAULT); - - WriteUnitOfWork wuow(_opCtx); - auto const tempColl = - db->createCollection(_opCtx, _config.tempNamespace, options, buildIdIndex); - - // Secondary index builds do not filter existing indexes so we have to do this on the - // primary. - auto removeIndexBuildsToo = false; - auto filteredIndexes = tempColl->getIndexCatalog()->removeExistingIndexes( - _opCtx, indexesToInsert, removeIndexBuildsToo); - - if (!filteredIndexes.empty()) { - auto fromMigrate = false; - IndexBuildsCoordinator::get(_opCtx)->createIndexesOnEmptyCollection( - _opCtx, tempColl->uuid(), filteredIndexes, fromMigrate); - } - - wuow.commit(); - - CollectionShardingState::get(_opCtx, _config.tempNamespace) - ->setFilteringMetadata(_opCtx, CollectionMetadata()); - }); -} - -/** - * For inline mode, appends results to output object. - * Makes sure (key, value) tuple is formatted as {_id: key, value: val} - */ -void State::appendResults(BSONObjBuilder& final) { - if (_onDisk) { - if (!_config.outputOptions.outDB.empty()) { - BSONObjBuilder loc; - if (!_config.outputOptions.outDB.empty()) - loc.append("db", _config.outputOptions.outDB); - if (!_config.outputOptions.collectionName.empty()) - loc.append("collection", _config.outputOptions.collectionName); - final.append("result", loc.obj()); - } else { - if (!_config.outputOptions.collectionName.empty()) - final.append("result", _config.outputOptions.collectionName); - } - - if (_config.splitInfo > 0) { - // add split points, used for shard - BSONObj res; - BSONObj idKey = BSON("_id" << 1); - if (!_db.runCommand("admin", - BSON("splitVector" << _config.outputOptions.finalNamespace.ns() - << "keyPattern" << idKey << "maxChunkSizeBytes" - << _config.splitInfo), - res)) { - uasserted(15921, str::stream() << "splitVector failed: " << res); - } - if (res.hasField("splitKeys")) - final.append(res.getField("splitKeys")); - } - return; - } - - if (_jsMode) { - ScriptingFunction getResult = _scope->createFunction( - "var map = _mrMap;" - "var result = [];" - "for (key in map) {" - " result.push({_id: key, value: map[key]});" - "}" - "return result;"); - _scope->invoke(getResult, nullptr, nullptr, 0, false); - BSONObj obj = _scope->getObject("__returnValue"); - final.append("results", BSONArray(obj)); - return; - } - - uassert(13604, "too much data for in memory map/reduce", _size < BSONObjMaxUserSize); - - BSONArrayBuilder b((int)(_size * 1.2)); // _size is data size, doesn't count overhead and keys - - for (const auto& entry : *_temp) { - const BSONObj& key = entry.first; - const BSONList& all = entry.second; - - verify(all.size() == 1); - - BSONObjIterator vi(all[0]); - vi.next(); - - BSONObjBuilder temp(b.subobjStart()); - temp.appendAs(key.firstElement(), "_id"); - temp.appendAs(vi.next(), "value"); - temp.done(); - } - - BSONArray res = b.arr(); - final.append("results", res); -} - -/** - * Does post processing on output collection. - * This may involve replacing, merging or reducing. - */ -long long State::postProcessCollection(OperationContext* opCtx, CurOp* curOp) { - if (_onDisk == false || _config.outputOptions.outType == OutputType::InMemory) - return numInMemKeys(); - - bool holdingGlobalLock = false; - if (_config.outputOptions.outNonAtomic) - return postProcessCollectionNonAtomic(opCtx, curOp, holdingGlobalLock); - - invariant(!opCtx->lockState()->isLocked()); - - // This must be global because we may write across different databases. - Lock::GlobalWrite lock(opCtx); - holdingGlobalLock = true; - return postProcessCollectionNonAtomic(opCtx, curOp, holdingGlobalLock); -} - -long long State::postProcessCollectionNonAtomic(OperationContext* opCtx, - CurOp* curOp, - bool callerHoldsGlobalLock) { - // Make sure we enforce prepare conflicts before writing. - EnforcePrepareConflictsBlock enforcePrepare(opCtx); - - auto outputCount = - collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock); - - // Determine whether the temp collection should be renamed to the final output collection and - // thus preserve the UUID. This is possible in the following cases: - // * Output mode "replace" - // * If this mapReduce is creating a new sharded output collection, which can be determined by - // whether mongos sent the UUID that the final output collection should have (that is, whether - // _config.finalOutputCollUUID is set). - if (_config.outputOptions.outType == OutputType::Replace || _config.finalOutputCollUUID) { - // This must be global because we may write across different databases. - Lock::GlobalWrite lock(opCtx); - // replace: just rename from temp to final collection name, dropping previous collection - _db.dropCollection(_config.outputOptions.finalNamespace.ns()); - BSONObj info; - - if (!_db.runCommand("admin", - BSON("renameCollection" << _config.tempNamespace.ns() << "to" - << _config.outputOptions.finalNamespace.ns() - << "stayTemp" << _config.shardedFirstPass), - info)) { - uasserted(10076, str::stream() << "rename failed: " << info); - } - - _db.dropCollection(_config.tempNamespace.ns()); - } else if (_config.outputOptions.outType == OutputType::Merge) { - // merge: upsert new docs into old collection - ProgressMeterHolder pm; - { - stdx::unique_lock<Client> lk(*opCtx->getClient()); - pm.set(curOp->setProgress_inlock("M/R Merge Post Processing", outputCount)); - } - - unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace, BSONObj()); - while (cursor->more()) { - Lock::DBLock lock(opCtx, _config.outputOptions.finalNamespace.db(), MODE_X); - BSONObj o = cursor->nextSafe(); - Helpers::upsert(opCtx, _config.outputOptions.finalNamespace.ns(), o); - pm.hit(); - } - _db.dropCollection(_config.tempNamespace.ns()); - pm.finished(); - } else if (_config.outputOptions.outType == OutputType::Reduce) { - // reduce: apply reduce op on new result and existing one - BSONList values; - - ProgressMeterHolder pm; - { - stdx::unique_lock<Client> lk(*opCtx->getClient()); - pm.set(curOp->setProgress_inlock("M/R Reduce Post Processing", outputCount)); - } - - unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace, BSONObj()); - while (cursor->more()) { - // This must be global because we may write across different databases. - Lock::GlobalWrite lock(opCtx); - BSONObj temp = cursor->nextSafe(); - BSONObj old; - - const bool found = [&] { - AutoGetCollection autoColl(opCtx, _config.outputOptions.finalNamespace, MODE_IS); - return Helpers::findOne( - opCtx, autoColl.getCollection(), temp["_id"].wrap(), old, true); - }(); - - if (found) { - // need to reduce - values.clear(); - values.push_back(temp); - values.push_back(old); - Helpers::upsert(opCtx, - _config.outputOptions.finalNamespace.ns(), - _config.reducer->finalReduce(values, _config.finalizer.get())); - } else { - Helpers::upsert(opCtx, _config.outputOptions.finalNamespace.ns(), temp); - } - pm.hit(); - } - pm.finished(); - } - - return collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock); -} - -/** - * Insert doc in collection. This should be replicated. - */ -void State::insert(const NamespaceString& nss, const BSONObj& o) { - invariant(_onDisk); - - // Make sure we enforce prepare conflicts before writing. - EnforcePrepareConflictsBlock enforcePrepare(_opCtx); - - writeConflictRetry(_opCtx, "M/R insert", nss.ns(), [this, &nss, &o] { - AutoGetCollection autoColl(_opCtx, nss, MODE_IX); - uassert( - ErrorCodes::PrimarySteppedDown, - str::stream() << "no longer primary while inserting mapReduce result into collection: " - << nss << ": " << redact(o), - repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor(_opCtx, nss)); - assertCollectionNotNull(nss, autoColl); - - WriteUnitOfWork wuow(_opCtx); - BSONObjBuilder b; - if (!o.hasField("_id")) { - b.appendOID("_id", nullptr, true); - } - b.appendElements(o); - BSONObj bo = b.obj(); - - auto fixedDoc = uassertStatusOK(fixDocumentForInsert(_opCtx->getServiceContext(), bo)); - if (!fixedDoc.isEmpty()) { - bo = fixedDoc; - } - - // TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261. - OpDebug* const nullOpDebug = nullptr; - uassertStatusOK(autoColl.getCollection()->insertDocument( - _opCtx, InsertStatement(bo), nullOpDebug, true)); - wuow.commit(); - }); -} - -/** - * Insert doc into the inc collection. The inc collection is in the "local" database, so this insert - * will not be replicated. - */ -void State::_insertToInc(BSONObj& o) { - verify(_onDisk); - - // Make sure we enforce prepare conflicts before writing. - EnforcePrepareConflictsBlock enforcePrepare(_opCtx); - - writeConflictRetry(_opCtx, "M/R insertToInc", _config.incLong.ns(), [this, &o] { - AutoGetCollection autoColl(_opCtx, _config.incLong, MODE_IX); - assertCollectionNotNull(_config.incLong, autoColl); - - WriteUnitOfWork wuow(_opCtx); - // The documents inserted into the incremental collection are of the form - // {"0": <key>, "1": <value>}, so we cannot call fixDocumentForInsert(o) here because the - // check that the document has an "_id" field would fail. Instead, we directly verify that - // the size of the document to insert is smaller than 16MB. - if (o.objsize() > BSONObjMaxUserSize) { - uasserted(ErrorCodes::BadValue, - str::stream() << "object to insert too large for incremental collection" - << ". size in bytes: " << o.objsize() - << ", max size: " << BSONObjMaxUserSize); - } - - // TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261. - OpDebug* const nullOpDebug = nullptr; - uassertStatusOK(autoColl.getCollection()->insertDocument( - _opCtx, InsertStatement(o), nullOpDebug, false)); - wuow.commit(); - }); -} - -State::State(OperationContext* opCtx, const Config& c) - : _config(c), - _db(opCtx), - _useIncremental(true), - _opCtx(opCtx), - _size(0), - _dupCount(0), - _numEmits(0) { - _temp.reset(new InMemory()); - _onDisk = _config.outputOptions.outType != OutputType::InMemory; -} - -bool State::sourceExists() { - return _db.exists(_config.nss.ns()); -} - -State::~State() { - if (_onDisk) { - try { - // If we're here because the map-reduce got interrupted, any attempt to drop temporary - // collections within the same operation context is guaranteed to fail as soon as we try - // to take the X-lock for the database. (An UninterruptibleLockGuard would allow - // dropTempCollections() to take the locks it needs, but taking an X-lock in - // UninterruptibleLockGuard context is not allowed.) - // - // We don't want every single interrupted map-reduce to leak temporary collections that - // will stick around until a server restart, so we execute the cleanup as though it's a - // new operation, by constructing a new Client and OperationContext. It's possible that - // the new operation will also get interrupted, but dropTempCollections() is short - // lived, so the odds are acceptably low. - auto cleanupClient = _opCtx->getServiceContext()->makeClient("M/R cleanup"); - AlternativeClientRegion acr(cleanupClient); - auto cleanupOpCtx = cc().makeOperationContext(); - dropTempCollections(cleanupOpCtx.get(), - _config.tempNamespace, - _useIncremental ? _config.incLong : NamespaceString()); - } catch (...) { - LOGV2_ERROR(20494, - "Unable to drop temporary collection created by mapReduce: " - "{config_tempNamespace}. This collection will be removed automatically " - "the next time the server starts up. {exceptionToStatus}", - "config_tempNamespace"_attr = _config.tempNamespace, - "exceptionToStatus"_attr = exceptionToStatus()); - } - } - if (_scope && !_scope->isKillPending() && _scope->getError().empty()) { - // cleanup js objects - try { - ScriptingFunction cleanup = - _scope->createFunction("delete _emitCt; delete _keyCt; delete _mrMap;"); - _scope->invoke(cleanup, nullptr, nullptr, 0, true); - } catch (const DBException&) { - // not important because properties will be reset if scope is reused - LOGV2_DEBUG(20483, 1, "MapReduce terminated during state destruction"); - } - } -} - -/** - * Initialize the mapreduce operation, creating the inc collection - */ -void State::init() { - // setup js - _scope.reset(getGlobalScriptEngine()->newScopeForCurrentThread()); - _scope->requireOwnedObjects(); - _scope->registerOperation(_opCtx); - _scope->setLocalDB(_config.dbname); - _scope->loadStored(_opCtx, true); - - if (!_config.scopeSetup.isEmpty()) - _scope->init(&_config.scopeSetup); - - _config.mapper->init(this); - _config.reducer->init(this); - if (_config.finalizer) - _config.finalizer->init(this); - _scope->setBoolean("_doFinal", _config.finalizer.get() != nullptr); - - switchMode(_config.jsMode); // set up js-mode based on Config - - // global JS map/reduce hashmap - // we use a standard JS object which means keys are only simple types - // we could also add a real hashmap from a library and object comparison methods - // for increased performance, we may want to look at v8 Harmony Map support - // _scope->setObject("_mrMap", BSONObj(), false); - ScriptingFunction init = _scope->createFunction( - "_emitCt = 0;" - "_keyCt = 0;" - "_dupCt = 0;" - "_redCt = 0;" - "if (typeof(_mrMap) === 'undefined') {" - " _mrMap = {};" - "}"); - _scope->invoke(init, nullptr, nullptr, 0, true); - - // js function to run reduce on all keys - // redfunc = _scope->createFunction("for (var key in hashmap) { print('Key is ' + key); - // list = hashmap[key]; ret = reduce(key, list); print('Value is ' + ret); };"); - _reduceAll = _scope->createFunction( - "var map = _mrMap;" - "var list, ret;" - "for (var key in map) {" - " list = map[key];" - " if (list.length != 1) {" - " ret = _reduce(key, list);" - " map[key] = [ret];" - " ++_redCt;" - " }" - "}" - "_dupCt = 0;"); - massert(16717, "error initializing JavaScript reduceAll function", _reduceAll != 0); - - _reduceAndEmit = _scope->createFunction( - "var map = _mrMap;" - "var list, ret;" - "for (var key in map) {" - " list = map[key];" - " if (list.length == 1)" - " ret = list[0];" - " else {" - " ret = _reduce(key, list);" - " ++_redCt;" - " }" - " emit(key, ret);" - "}" - "delete _mrMap;"); - massert(16718, "error initializing JavaScript reduce/emit function", _reduceAndEmit != 0); - - _reduceAndFinalize = _scope->createFunction( - "var map = _mrMap;" - "var list, ret;" - "for (var key in map) {" - " list = map[key];" - " if (list.length == 1) {" - " if (!_doFinal) { continue; }" - " ret = list[0];" - " }" - " else {" - " ret = _reduce(key, list);" - " ++_redCt;" - " }" - " if (_doFinal)" - " ret = _finalize(key, ret);" - " map[key] = ret;" - "}"); - massert(16719, "error creating JavaScript reduce/finalize function", _reduceAndFinalize != 0); - - _reduceAndFinalizeAndInsert = _scope->createFunction( - "var map = _mrMap;" - "var list, ret;" - "for (var key in map) {" - " list = map[key];" - " if (list.length == 1)" - " ret = list[0];" - " else {" - " ret = _reduce(key, list);" - " ++_redCt;" - " }" - " if (_doFinal)" - " ret = _finalize(key, ret);" - " _nativeToTemp({_id: key, value: ret});" - "}"); - massert(16720, "error initializing JavaScript functions", _reduceAndFinalizeAndInsert != 0); -} - -void State::switchMode(bool jsMode) { - _jsMode = jsMode; - if (jsMode) { - // Emit function that stays in JS - _scope->setFunction("emit", - "function(key, value) {" - " if (typeof(key) === 'object') {" - " _bailFromJS(key, value);" - " return;" - " }" - " ++_emitCt;" - " var map = _mrMap;" - " var list = map[key];" - " if (!list) {" - " ++_keyCt;" - " list = [];" - " map[key] = list;" - " }" - " else" - " ++_dupCt;" - " list.push(value);" - "}"); - _scope->injectNative("_bailFromJS", _bailFromJS, this); - } else { - // Emit now populates C++ map - _scope->injectNative("emit", fastEmit, this); - } -} - -void State::bailFromJS() { - LOGV2_DEBUG(20484, 1, "M/R: Switching from JS mode to mixed mode"); - - // reduce and reemit into c++ - switchMode(false); - _scope->invoke(_reduceAndEmit, nullptr, nullptr, 0, true); - // need to get the real number emitted so far - _numEmits = _scope->getNumberInt("_emitCt"); - _config.reducer->numReduces = _scope->getNumberInt("_redCt"); -} - -/** - * Applies last reduce and finalize on a list of tuples (key, val) - * Inserts single result {_id: key, value: val} into temp collection - */ -void State::finalReduce(BSONList& values) { - if (!_onDisk || values.size() == 0) - return; - - BSONObj res = _config.reducer->finalReduce(values, _config.finalizer.get()); - insert(_config.tempNamespace, res); -} - -BSONObj _nativeToTemp(const BSONObj& args, void* data) { - State* state = (State*)data; - BSONObjIterator it(args); - state->insert(state->_config.tempNamespace, it.next().Obj()); - return BSONObj(); -} - -// BSONObj _nativeToInc( const BSONObj& args, void* data ) { -// State* state = (State*) data; -// BSONObjIterator it(args); -// const BSONObj& obj = it.next().Obj(); -// state->_insertToInc(const_cast<BSONObj&>(obj)); -// return BSONObj(); -// } - -/** - * Applies last reduce and finalize. - * After calling this method, the temp collection will be completed. - * If inline, the results will be in the in memory map - */ -void State::finalReduce(OperationContext* opCtx, CurOp* curOp) { - if (_jsMode) { - // apply the reduce within JS - if (_onDisk) { - _scope->injectNative("_nativeToTemp", _nativeToTemp, this); - _scope->invoke(_reduceAndFinalizeAndInsert, nullptr, nullptr, 0, true); - return; - } else { - _scope->invoke(_reduceAndFinalize, nullptr, nullptr, 0, true); - return; - } - } - - if (!_onDisk) { - // all data has already been reduced, just finalize - if (_config.finalizer) { - long size = 0; - for (InMemory::iterator i = _temp->begin(); i != _temp->end(); ++i) { - BSONObj key = i->first; - BSONList& all = i->second; - - verify(all.size() == 1); - - BSONObj res = _config.finalizer->finalize(all[0]); - - all.clear(); - all.push_back(res); - size += res.objsize(); - } - _size = size; - } - return; - } - - // use index on "0" to pull sorted data - verify(_temp->size() == 0); - BSONObj sortKey = BSON("0" << 1); - - { - AutoGetCollection autoIncColl(_opCtx, _config.incLong, MODE_IS); - assertCollectionNotNull(_config.incLong, autoIncColl); - - bool foundIndex = false; - std::unique_ptr<IndexCatalog::IndexIterator> ii = - autoIncColl.getCollection()->getIndexCatalog()->getIndexIterator(_opCtx, true); - // Iterate over incColl's indexes. - while (ii->more()) { - const IndexDescriptor* currIndex = ii->next()->descriptor(); - BSONObj x = currIndex->infoObj(); - if (sortKey.woCompare(x["key"].embeddedObject()) == 0) { - foundIndex = true; - break; - } - } - - invariant(foundIndex); - } - - boost::optional<AutoGetCollectionForReadCommand> ctx; - ctx.emplace(_opCtx, _config.incLong); - assertCollectionNotNull(_config.incLong, *ctx); - - BSONObj prev; - BSONList all; - - const auto count = _db.count(_config.incLong, BSONObj(), QueryOption_SlaveOk); - ProgressMeterHolder pm; - { - stdx::unique_lock<Client> lk(*opCtx->getClient()); - pm.set(curOp->setProgress_inlock("M/R: (3/3) Final Reduce", count)); - } - - const ExtensionsCallbackReal extensionsCallback(_opCtx, &_config.incLong); - - auto qr = std::make_unique<QueryRequest>(_config.incLong); - qr->setSort(sortKey); - - const boost::intrusive_ptr<ExpressionContext> expCtx; - auto statusWithCQ = - CanonicalQuery::canonicalize(opCtx, - std::move(qr), - expCtx, - extensionsCallback, - MatchExpressionParser::kAllowAllSpecialFeatures); - verify(statusWithCQ.isOK()); - std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - - { - auto exec = uassertStatusOK(getExecutor(_opCtx, - ctx->getCollection(), - std::move(cq), - PlanExecutor::YIELD_AUTO, - QueryPlannerParams::NO_TABLE_SCAN)); - - // iterate over all sorted objects - BSONObj o; - PlanExecutor::ExecState state; - while (PlanExecutor::ADVANCED == (state = exec->getNext(&o, nullptr))) { - o = o.getOwned(); // we will be accessing outside of the lock - pm.hit(); - - if (dps::compareObjectsAccordingToSort(o, prev, sortKey) == 0) { - // object is same as previous, add to array - all.push_back(o); - if (pm->hits() % 100 == 0) { - _opCtx->checkForInterrupt(); - } - continue; - } - - exec->saveState(); - - ctx.reset(); - - // reduce a finalize array - finalReduce(all); - ctx.emplace(_opCtx, _config.incLong); - - all.clear(); - prev = o; - all.push_back(o); - - _opCtx->checkForInterrupt(); - exec->restoreState(); - } - - uassert(34428, - "Plan executor error during mapReduce command: " + - WorkingSetCommon::toStatusString(o), - PlanExecutor::IS_EOF == state); - } - ctx.reset(); - - // reduce and finalize last array - finalReduce(all); - ctx.emplace(_opCtx, _config.incLong); - - pm.finished(); -} - -/** - * Attempts to reduce objects in the memory map. - * A new memory map will be created to hold the results. - * If applicable, objects with unique key may be dumped to inc collection. - * Input and output objects are both {"0": key, "1": val} - */ -void State::reduceInMemory() { - if (_jsMode) { - // in js mode the reduce is applied when writing to collection - return; - } - - unique_ptr<InMemory> n(new InMemory()); // for new data - long nSize = 0; - _dupCount = 0; - - for (InMemory::iterator i = _temp->begin(); i != _temp->end(); ++i) { - BSONList& all = i->second; - - if (all.size() == 1) { - // only 1 value for this key - if (_onDisk) { - // this key has low cardinality, so just write to collection - _insertToInc(*(all.begin())); - } else { - // add to new map - nSize += _add(n.get(), all[0]); - } - } else if (all.size() > 1) { - // several values, reduce and add to map - BSONObj res = _config.reducer->reduce(all); - nSize += _add(n.get(), res); - } - } - - // swap maps - _temp.reset(n.release()); - _size = nSize; -} - -/** - * Dumps the entire in memory map to the inc collection. - */ -void State::dumpToInc() { - if (!_onDisk) - return; - - for (InMemory::iterator i = _temp->begin(); i != _temp->end(); i++) { - BSONList& all = i->second; - if (all.size() < 1) - continue; - - for (BSONList::iterator j = all.begin(); j != all.end(); j++) - _insertToInc(*j); - } - _temp->clear(); - _size = 0; -} - -/** - * Adds object to in memory map - */ -void State::emit(const BSONObj& a) { - _numEmits++; - _size += _add(_temp.get(), a); -} - -int State::_add(InMemory* im, const BSONObj& a) { - BSONList& all = (*im)[a]; - all.push_back(a); - if (all.size() > 1) { - ++_dupCount; - } - - return a.objsize() + 16; -} - -void State::reduceAndSpillInMemoryStateIfNeeded() { - // Make sure no DB locks are held, because this method manages its own locking and - // write units of work. - invariant(!_opCtx->lockState()->isLocked()); - - if (_jsMode) { - // try to reduce if it is beneficial - int dupCt = _scope->getNumberInt("_dupCt"); - int keyCt = _scope->getNumberInt("_keyCt"); - - if (keyCt > _config.jsMaxKeys) { - // too many keys for JS, switch to mixed - _bailFromJS(BSONObj(), this); - // then fall through to check map size - } else if (dupCt > (keyCt * _config.reduceTriggerRatio)) { - // reduce now to lower mem usage - Timer t; - _scope->invoke(_reduceAll, nullptr, nullptr, 0, true); - LOGV2_DEBUG(20485, - 3, - " MR - did reduceAll: keys={keyCt} dups={dupCt} " - "newKeys={scope_getNumberInt_keyCt} time={t_millis}ms", - "keyCt"_attr = keyCt, - "dupCt"_attr = dupCt, - "scope_getNumberInt_keyCt"_attr = _scope->getNumberInt("_keyCt"), - "t_millis"_attr = t.millis()); - return; - } - } - - if (_jsMode) - return; - - if (_size > _config.maxInMemSize || _dupCount > (_temp->size() * _config.reduceTriggerRatio)) { - // attempt to reduce in memory map, if memory is too high or we have many duplicates - long oldSize = _size; - Timer t; - reduceInMemory(); - LOGV2_DEBUG(20486, - 3, - " MR - did reduceInMemory: size={oldSize} dups={dupCount} newSize={size} " - "time={t_millis}ms", - "oldSize"_attr = oldSize, - "dupCount"_attr = _dupCount, - "size"_attr = _size, - "t_millis"_attr = t.millis()); - - // if size is still high, or values are not reducing well, dump - if (_onDisk && (_size > _config.maxInMemSize || _size > oldSize / 2)) { - dumpToInc(); - LOGV2_DEBUG(20487, 3, " MR - dumping to db"); - } - } -} - - -bool runMapReduce(OperationContext* opCtx, - const string& dbname, - const BSONObj& cmd, - string& errmsg, - BSONObjBuilder& result) { - Timer t; - - boost::optional<DisableDocumentValidation> maybeDisableValidation; - if (shouldBypassDocumentValidationForCommand(cmd)) - maybeDisableValidation.emplace(opCtx); - - auto client = opCtx->getClient(); - - if (client->isInDirectClient()) { - uasserted(ErrorCodes::IllegalOperation, "Cannot run mapReduce command from eval()"); - } - - auto curOp = CurOp::get(opCtx); - - const Config config(dbname, cmd); - - LOGV2_DEBUG(20488, 1, "mr ns: {config_nss}", "config_nss"_attr = config.nss); - - uassert(16149, "cannot run map reduce without the js engine", getGlobalScriptEngine()); - - const auto collectionFilter = [&] { - AutoGetCollectionForReadCommand autoColl(opCtx, config.nss); - return CollectionShardingState::get(opCtx, config.nss)->getOwnershipFilter(opCtx); - }(); - - bool shouldHaveData = false; - - BSONObjBuilder countsBuilder; - BSONObjBuilder timingBuilder; - try { - State state(opCtx, config); - if (!state.sourceExists()) { - uasserted(ErrorCodes::NamespaceNotFound, - str::stream() << "namespace does not exist: " << config.nss); - } - - state.init(); - state.prepTempCollection(); - - int64_t progressTotal = 0; - bool showTotal = true; - if (state.config().filter.isEmpty()) { - const bool holdingGlobalLock = false; - const auto count = collectionCount(opCtx, config.nss, holdingGlobalLock); - progressTotal = (config.limit && static_cast<unsigned long long>(config.limit) < count) - ? config.limit - : count; - } else { - showTotal = false; - // Set an arbitrary total > 0 so the meter will be activated. - progressTotal = 1; - } - - ProgressMeterHolder pm; - { - stdx::unique_lock<Client> lk(*opCtx->getClient()); - pm.set(curOp->setProgress_inlock("M/R: (1/3) Emit Phase", progressTotal)); - } - pm->showTotal(showTotal); - - long long mapTime = 0; - long long reduceTime = 0; - long long numInputs = 0; - - { - // We've got a cursor preventing migrations off, now re-establish our useful cursor - - // Need lock and context to use it - boost::optional<AutoGetCollection> scopedAutoColl; - scopedAutoColl.emplace(opCtx, config.nss, MODE_S); - assertCollectionNotNull(config.nss, *scopedAutoColl); - - if (state.isOnDisk()) { - // This means that it will be doing a write operation, make sure it is safe to - // do so - uassert(ErrorCodes::NotMaster, - "not master", - repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, - config.nss)); - } - - auto qr = std::make_unique<QueryRequest>(config.nss); - qr->setFilter(config.filter); - qr->setSort(config.sort); - qr->setCollation(config.collation); - - const ExtensionsCallbackReal extensionsCallback(opCtx, &config.nss); - - const boost::intrusive_ptr<ExpressionContext> expCtx; - auto cq = uassertStatusOKWithContext( - CanonicalQuery::canonicalize(opCtx, - std::move(qr), - expCtx, - extensionsCallback, - MatchExpressionParser::kAllowAllSpecialFeatures), - str::stream() << "Can't canonicalize query " << config.filter); - - auto exec = uassertStatusOK(getExecutor(opCtx, - scopedAutoColl->getCollection(), - std::move(cq), - PlanExecutor::YIELD_AUTO, - 0)); - - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); - } - - Timer mt; - - BSONObj o; - PlanExecutor::ExecState execState; - while (PlanExecutor::ADVANCED == (execState = exec->getNext(&o, nullptr))) { - o = o.getOwned(); // The object will be accessed outside of collection lock - - // Check to see if this is a new object we don't own yet because of a chunk - // migration - if (collectionFilter.isSharded()) { - ShardKeyPattern kp(collectionFilter.getKeyPattern()); - if (!collectionFilter.keyBelongsToMe(kp.extractShardKeyFromDoc(o))) { - continue; - } - } - - if (config.verbose) - mt.reset(); - - config.mapper->map(o); - - if (config.verbose) - mapTime += mt.micros(); - - // Check if the state accumulated so far needs to be written to a collection. - // This may yield the DB lock temporarily and then acquire it again. - numInputs++; - if (numInputs % 100 == 0) { - Timer t; - - // TODO: As an optimization, we might want to do the save/restore state and - // yield inside the reduceAndSpillInMemoryState method, so it only happens - // if necessary. - exec->saveState(); - - scopedAutoColl.reset(); - - state.reduceAndSpillInMemoryStateIfNeeded(); - scopedAutoColl.emplace(opCtx, config.nss, MODE_S); - - exec->restoreState(); - - reduceTime += t.micros(); - - opCtx->checkForInterrupt(); - } - - pm.hit(); - - if (config.limit && numInputs >= config.limit) - break; - } - - if (PlanExecutor::FAILURE == execState) { - uasserted(ErrorCodes::OperationFailed, - str::stream() << "Executor error during mapReduce command: " - << WorkingSetCommon::toStatusString(o)); - } - - // Record the indexes used by the PlanExecutor. - PlanSummaryStats stats; - Explain::getSummaryStats(*exec, &stats); - - // TODO SERVER-23261: Confirm whether this is the correct place to gather all - // metrics. There is no harm adding here for the time being. - curOp->debug().setPlanSummaryMetrics(stats); - CollectionQueryInfo::get(scopedAutoColl->getCollection()).notifyOfQuery(opCtx, stats); - - if (curOp->shouldDBProfile()) { - BSONObjBuilder execStatsBob; - Explain::getWinningPlanStats(exec.get(), &execStatsBob); - curOp->debug().execStats = execStatsBob.obj(); - } - } - pm.finished(); - - opCtx->checkForInterrupt(); - - // update counters - countsBuilder.appendNumber("input", numInputs); - countsBuilder.appendNumber("emit", state.numEmits()); - if (state.numEmits()) - shouldHaveData = true; - - timingBuilder.appendNumber("mapTime", mapTime / 1000); - timingBuilder.append("emitLoop", t.millis()); - - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - curOp->setMessage_inlock("M/R: (2/3) Final In-Memory Reduce"); - } - Timer rt; - // do reduce in memory - // this will be the last reduce needed for inline mode - state.reduceInMemory(); - // if not inline: dump the in memory map to inc collection, all data is on disk - state.dumpToInc(); - // final reduce - state.finalReduce(opCtx, curOp); - reduceTime += rt.micros(); - - // Ensure the profile shows the source namespace. If the output was not inline, the - // active namespace will be the temporary collection we inserted into. - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - curOp->setNS_inlock(config.nss.ns()); - } - - countsBuilder.appendNumber("reduce", state.numReduces()); - timingBuilder.appendNumber("reduceTime", reduceTime / 1000); - timingBuilder.append("mode", state.jsMode() ? "js" : "mixed"); - - long long finalCount = state.postProcessCollection(opCtx, curOp); - state.appendResults(result); - - timingBuilder.appendNumber("total", t.millis()); - result.appendNumber("timeMillis", t.millis()); - countsBuilder.appendNumber("output", finalCount); - if (config.verbose) - result.append("timing", timingBuilder.obj()); - result.append("counts", countsBuilder.obj()); - - if (finalCount == 0 && shouldHaveData) { - result.append("cmd", cmd); - errmsg = "there were emits but no data!"; - return false; - } - } catch (StaleConfigException& e) { - if (serverGlobalParams.featureCompatibility.isVersionInitialized() && - serverGlobalParams.featureCompatibility.getVersion() == - ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44) { - invariant(e.extraInfo<StaleConfigInfo>()->getShardId()); - } - - LOGV2(20489, "mr detected stale config, should retry{e}", "e"_attr = redact(e)); - throw; - } - // TODO: The error handling code for queries is v. fragile, - // *requires* rethrow AssertionExceptions - should probably fix. - catch (AssertionException& e) { - LOGV2(20490, "mr failed, removing collection{e}", "e"_attr = redact(e)); - throw; - } catch (std::exception& e) { - LOGV2(20491, "mr failed, removing collection{causedBy_e}", "causedBy_e"_attr = causedBy(e)); - throw; - } catch (...) { - LOGV2(20492, "mr failed for unknown reason, removing collection"); - throw; - } - - return true; -} - - -bool runMapReduceShardedFinish(OperationContext* opCtx, - const string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder& result) { - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - uasserted(ErrorCodes::CommandNotSupported, - str::stream() << "Can not execute mapReduce with output database " << dbname - << " which lives on config servers"); - } - - boost::optional<DisableDocumentValidation> maybeDisableValidation; - if (shouldBypassDocumentValidationForCommand(cmdObj)) - maybeDisableValidation.emplace(opCtx); - - // legacy name - const auto shardedOutputCollectionElt = cmdObj["shardedOutputCollection"]; - uassert(ErrorCodes::InvalidNamespace, - "'shardedOutputCollection' must be of type String", - shardedOutputCollectionElt.type() == BSONType::String); - const std::string shardedOutputCollection = shardedOutputCollectionElt.str(); - verify(shardedOutputCollection.size() > 0); - - std::string inputNS; - if (cmdObj["inputDB"].type() == String) { - inputNS = - NamespaceString(cmdObj["inputDB"].valueStringData(), shardedOutputCollection).ns(); - } else { - inputNS = NamespaceString(dbname, shardedOutputCollection).ns(); - } - - CurOp* curOp = CurOp::get(opCtx); - - Config config(dbname, cmdObj.firstElement().embeddedObjectUserCheck()); - - if (cmdObj.hasField("shardedOutputCollUUID")) { - config.finalOutputCollUUID = uassertStatusOK(UUID::parse(cmdObj["shardedOutputCollUUID"])); - } - - State state(opCtx, config); - state.init(); - - // no need for incremental collection because records are already sorted - state._useIncremental = false; - config.incLong = config.tempNamespace; - - BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck(); - BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck(); - - { - stdx::unique_lock<Client> lk(*opCtx->getClient()); - curOp->setMessage_inlock("M/R Merge Sort and Reduce"); - } - set<string> servers; - - { - // Parse per shard results - BSONObjIterator i(shardCounts); - while (i.more()) { - BSONElement e = i.next(); - std::string server = e.fieldName(); - servers.insert(server); - - uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, server)); - } - } - - state.prepTempCollection(); - - std::vector<Chunk> chunks; - - if (config.outputOptions.outType != OutputType::InMemory) { - auto outRoutingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( - opCtx, config.outputOptions.finalNamespace); - uassertStatusOK(outRoutingInfoStatus.getStatus()); - - if (auto cm = outRoutingInfoStatus.getValue().cm()) { - // Fetch result from other shards 1 chunk at a time. It would be better to do just - // one big $or query, but then the sorting would not be efficient. - const auto shardId = ShardingState::get(opCtx)->shardId(); - - for (const auto& chunk : cm->chunks()) { - if (chunk.getShardId() == shardId) { - chunks.push_back(chunk); - } - } - } - } - - long long inputCount = 0; - unsigned int index = 0; - BSONObj query; - BSONList values; - - while (true) { - if (chunks.size() > 0) { - const auto& chunk = chunks[index]; - BSONObjBuilder b; - b.appendAs(chunk.getMin().firstElement(), "$gte"); - b.appendAs(chunk.getMax().firstElement(), "$lt"); - query = BSON("_id" << b.obj()); - } - - // reduce from each shard for a chunk - BSONObj sortKey = BSON("_id" << 1); - ParallelSortClusteredCursor cursor( - servers, inputNS, Query(query).sort(sortKey), QueryOption_NoCursorTimeout); - cursor.init(opCtx); - - while (cursor.more() || !values.empty()) { - BSONObj t; - if (cursor.more()) { - t = cursor.next().getOwned(); - ++inputCount; - - if (values.size() == 0) { - values.push_back(t); - continue; - } - - if (dps::compareObjectsAccordingToSort(t, *(values.begin()), sortKey) == 0) { - values.push_back(t); - continue; - } - } - - BSONObj res = config.reducer->finalReduce(values, config.finalizer.get()); - if (state.isOnDisk()) - state.insert(config.tempNamespace, res); - else - state.emit(res); - - values.clear(); - - if (!t.isEmpty()) - values.push_back(t); - } - - if (++index >= chunks.size()) - break; - } - - // Forget temporary input collection, if output is sharded collection - ShardConnection::forgetNS(inputNS); - - long long outputCount = state.postProcessCollection(opCtx, curOp); - state.appendResults(result); - - BSONObjBuilder countsB(32); - countsB.append("input", inputCount); - countsB.append("reduce", state.numReduces()); - countsB.append("output", outputCount); - result.append("counts", countsB.obj()); - - return true; -} - -} // namespace mr -} // namespace mongo |