/** * Copyright (C) 2012 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand #include "mongo/platform/basic.h" #include "mongo/db/commands/mr.h" #include "mongo/base/status_with.h" #include "mongo/bson/util/builder.h" #include "mongo/client/connpool.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/bson/dotted_path_support.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/collection_catalog_entry.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/catalog/index_key_validate.h" #include "mongo/db/client.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/matcher/extensions_callback_real.h" #include "mongo/db/op_observer.h" #include "mongo/db/ops/insert.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/parallel.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/s/stale_exception.h" #include "mongo/scripting/engine.h" #include "mongo/stdx/mutex.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/scopeguard.h" namespace mongo { using std::set; using std::shared_ptr; using std::string; using std::stringstream; using std::unique_ptr; using std::vector; using IndexVersion = IndexDescriptor::IndexVersion; namespace dps = ::mongo::dotted_path_support; namespace mr { AtomicUInt32 Config::JOB_NUMBER; JSFunction::JSFunction(const std::string& type, const BSONElement& e) { _type = type; _code = e._asCode(); if (e.type() == CodeWScope) _wantedScope = e.codeWScopeObject(); } void JSFunction::init(State* state) { _scope = state->scope(); verify(_scope); _scope->init(&_wantedScope); _func = _scope->createFunction(_code.c_str()); uassert(13598, str::stream() << "couldn't compile code for: " << _type, _func); // install in JS scope so that it can be called in JS mode _scope->setFunction(_type.c_str(), _code.c_str()); } void JSMapper::init(State* state) { _func.init(state); _params = state->config().mapParams; } /** * Applies the map function to an object, which should internally call emit() */ void JSMapper::map(const BSONObj& o) { Scope* s = _func.scope(); verify(s); if (s->invoke(_func.func(), &_params, &o, 0, true)) uasserted(9014, str::stream() << "map invoke failed: " << s->getError()); } /** * Applies the finalize function to a tuple obj (key, val) * Returns tuple obj {_id: key, value: newval} */ BSONObj JSFinalizer::finalize(const BSONObj& o) { Scope* s = _func.scope(); Scope::NoDBAccess no = s->disableDBAccess("can't access db inside finalize"); s->invokeSafe(_func.func(), &o, 0); // don't want to use o.objsize() to size b // since there are many cases where the point of finalize // is converting many fields to 1 BSONObjBuilder b; b.append(o.firstElement()); s->append(b, "value", "__returnValue"); return b.obj(); } void JSReducer::init(State* state) { _func.init(state); } /** * Reduces a list of tuple objects (key, value) to a single tuple {"0": key, "1": value} */ BSONObj JSReducer::reduce(const BSONList& tuples) { if (tuples.size() <= 1) return tuples[0]; BSONObj key; int endSizeEstimate = 16; _reduce(tuples, key, endSizeEstimate); BSONObjBuilder b(endSizeEstimate); b.appendAs(key.firstElement(), "0"); _func.scope()->append(b, "1", "__returnValue"); return b.obj(); } /** * Reduces a list of tuple object (key, value) to a single tuple {_id: key, value: val} * Also applies a finalizer method if present. */ BSONObj JSReducer::finalReduce(const BSONList& tuples, Finalizer* finalizer) { BSONObj res; BSONObj key; if (tuples.size() == 1) { // 1 obj, just use it key = tuples[0]; BSONObjBuilder b(key.objsize()); BSONObjIterator it(key); b.appendAs(it.next(), "_id"); b.appendAs(it.next(), "value"); res = b.obj(); } else { // need to reduce int endSizeEstimate = 16; _reduce(tuples, key, endSizeEstimate); BSONObjBuilder b(endSizeEstimate); b.appendAs(key.firstElement(), "_id"); _func.scope()->append(b, "value", "__returnValue"); res = b.obj(); } if (finalizer) { res = finalizer->finalize(res); } return res; } /** * actually applies a reduce, to a list of tuples (key, value). * After the call, tuples will hold a single tuple {"0": key, "1": value} */ void JSReducer::_reduce(const BSONList& tuples, BSONObj& key, int& endSizeEstimate) { uassert(10074, "need values", tuples.size()); int sizeEstimate = (tuples.size() * tuples.begin()->getField("value").size()) + 128; // need to build the reduce args: ( key, [values] ) BSONObjBuilder reduceArgs(sizeEstimate); std::unique_ptr valueBuilder; unsigned n = 0; for (; n < tuples.size(); n++) { BSONObjIterator j(tuples[n]); BSONElement keyE = j.next(); if (n == 0) { reduceArgs.append(keyE); key = keyE.wrap(); valueBuilder.reset(new BSONArrayBuilder(reduceArgs.subarrayStart("tuples"))); } BSONElement ee = j.next(); uassert(13070, "value too large to reduce", ee.size() < (BSONObjMaxUserSize / 2)); // If adding this element to the array would cause it to be too large, break. The // remainder of the tuples will be processed recursively at the end of this // function. if (valueBuilder->len() + ee.size() > BSONObjMaxUserSize) { verify(n > 1); // if not, inf. loop break; } valueBuilder->append(ee); } verify(valueBuilder); valueBuilder->done(); BSONObj args = reduceArgs.obj(); Scope* s = _func.scope(); s->invokeSafe(_func.func(), &args, 0); ++numReduces; if (s->type("__returnValue") == Array) { uasserted(10075, "reduce -> multiple not supported yet"); return; } endSizeEstimate = key.objsize() + (args.objsize() / tuples.size()); if (n == tuples.size()) return; // the input list was too large, add the rest of elmts to new tuples and reduce again // note: would be better to use loop instead of recursion to avoid stack overflow BSONList x; for (; n < tuples.size(); n++) { x.push_back(tuples[n]); } BSONObjBuilder temp(endSizeEstimate); temp.append(key.firstElement()); s->append(temp, "1", "__returnValue"); x.push_back(temp.obj()); _reduce(x, key, endSizeEstimate); } Config::Config(const string& _dbname, const BSONObj& cmdObj) { dbname = _dbname; uassert(ErrorCodes::TypeMismatch, str::stream() << "'mapReduce' must be of type String", cmdObj.firstElement().type() == BSONType::String); nss = NamespaceString(dbname, cmdObj.firstElement().valueStringData()); uassert(ErrorCodes::InvalidNamespace, str::stream() << "Invalid namespace: " << nss.ns(), nss.isValid()); verbose = cmdObj["verbose"].trueValue(); jsMode = cmdObj["jsMode"].trueValue(); splitInfo = 0; if (cmdObj.hasField("splitInfo")) { splitInfo = cmdObj["splitInfo"].Int(); } jsMaxKeys = 500000; reduceTriggerRatio = 10.0; maxInMemSize = 500 * 1024; uassert(13602, "outType is no longer a valid option", cmdObj["outType"].eoo()); outputOptions = parseOutputOptions(dbname, cmdObj); shardedFirstPass = false; if (cmdObj.hasField("shardedFirstPass") && cmdObj["shardedFirstPass"].trueValue()) { massert(16054, "shardedFirstPass should only use replace outType", outputOptions.outType == REPLACE); shardedFirstPass = true; } if (outputOptions.outType != INMEMORY) { // setup temp collection name tempNamespace = NamespaceString( outputOptions.outDB.empty() ? dbname : outputOptions.outDB, str::stream() << "tmp.mr." << cmdObj.firstElement().valueStringData() << "_" << JOB_NUMBER.fetchAndAdd(1)); incLong = NamespaceString(str::stream() << tempNamespace.ns() << "_inc"); } { // scope and code if (cmdObj["scope"].type() == Object) scopeSetup = cmdObj["scope"].embeddedObjectUserCheck().getOwned(); mapper.reset(new JSMapper(cmdObj["map"])); reducer.reset(new JSReducer(cmdObj["reduce"])); if (cmdObj["finalize"].type() && cmdObj["finalize"].trueValue()) finalizer.reset(new JSFinalizer(cmdObj["finalize"])); if (cmdObj["mapparams"].type() == Array) { mapParams = cmdObj["mapparams"].embeddedObjectUserCheck().getOwned(); } } { // query options BSONElement q = cmdObj["query"]; if (q.type() == Object) filter = q.embeddedObjectUserCheck(); else uassert(13608, "query has to be blank or an Object", !q.trueValue()); BSONElement s = cmdObj["sort"]; if (s.type() == Object) sort = s.embeddedObjectUserCheck(); else uassert(13609, "sort has to be blank or an Object", !s.trueValue()); BSONElement collationElt = cmdObj["collation"]; if (collationElt.type() == Object) collation = collationElt.embeddedObjectUserCheck(); else uassert(40082, str::stream() << "mapReduce 'collation' parameter must be of type Object but found type: " << typeName(collationElt.type()), collationElt.eoo()); if (cmdObj["limit"].isNumber()) limit = cmdObj["limit"].numberLong(); else limit = 0; } } /** * Clean up the temporary and incremental collections */ void State::dropTempCollections() { // The cleanup handler should not be interruptible. UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); if (!_config.tempNamespace.isEmpty()) { writeConflictRetry(_opCtx, "M/R dropTempCollections", _config.tempNamespace.ns(), [this] { AutoGetDb autoDb(_opCtx, _config.tempNamespace.db(), MODE_X); if (auto db = autoDb.getDb()) { WriteUnitOfWork wunit(_opCtx); uassert( ErrorCodes::PrimarySteppedDown, str::stream() << "no longer primary while dropping temporary collection for mapReduce: " << _config.tempNamespace.ns(), repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor( _opCtx, _config.tempNamespace)); uassertStatusOK(db->dropCollection(_opCtx, _config.tempNamespace.ns())); wunit.commit(); } }); // Always forget about temporary namespaces, so we don't cache lots of them ShardConnection::forgetNS(_config.tempNamespace.ns()); } if (_useIncremental && !_config.incLong.isEmpty()) { // We don't want to log the deletion of incLong as it isn't replicated. While // harmless, this would lead to a scary looking warning on the secondaries. repl::UnreplicatedWritesBlock uwb(_opCtx); writeConflictRetry(_opCtx, "M/R dropTempCollections", _config.incLong.ns(), [this] { Lock::DBLock lk(_opCtx, _config.incLong.db(), MODE_X); if (Database* db = DatabaseHolder::getDatabaseHolder().get(_opCtx, _config.incLong.ns())) { WriteUnitOfWork wunit(_opCtx); uassertStatusOK(db->dropCollection(_opCtx, _config.incLong.ns())); wunit.commit(); } }); ShardConnection::forgetNS(_config.incLong.ns()); } } /** * Create temporary collection, set up indexes */ void State::prepTempCollection() { if (!_onDisk) return; dropTempCollections(); if (_useIncremental) { // Create the inc collection and make sure we have index on "0" key. // Intentionally not replicating the inc collection to secondaries. repl::UnreplicatedWritesBlock uwb(_opCtx); writeConflictRetry(_opCtx, "M/R prepTempCollection", _config.incLong.ns(), [this] { OldClientWriteContext incCtx(_opCtx, _config.incLong.ns()); WriteUnitOfWork wuow(_opCtx); Collection* incColl = incCtx.getCollection(); invariant(!incColl); CollectionOptions options; options.setNoIdIndex(); options.temp = true; options.uuid.emplace(UUID::gen()); incColl = incCtx.db()->createCollection( _opCtx, _config.incLong.ns(), options, false /* force no _id index */); invariant(incColl); auto rawIndexSpec = BSON("key" << BSON("0" << 1) << "ns" << _config.incLong.ns() << "name" << "_temp_0"); auto indexSpec = uassertStatusOK(index_key_validate::validateIndexSpec( _opCtx, rawIndexSpec, _config.incLong, serverGlobalParams.featureCompatibility)); Status status = incColl->getIndexCatalog() ->createIndexOnEmptyCollection(_opCtx, indexSpec) .getStatus(); if (!status.isOK()) { uasserted(17305, str::stream() << "createIndex failed for mr incLong ns: " << _config.incLong.ns() << " err: " << status.code()); } wuow.commit(); }); } CollectionOptions finalOptions; vector indexesToInsert; { // copy indexes and collection options into temporary storage OldClientWriteContext finalCtx(_opCtx, _config.outputOptions.finalNamespace.ns()); Collection* const finalColl = finalCtx.getCollection(); if (finalColl) { finalOptions = finalColl->getCatalogEntry()->getCollectionOptions(_opCtx); if (_config.finalOutputCollUUID) { // The final output collection's UUID is passed from mongos if the final output // collection is sharded. If a UUID was sent, ensure it matches what's on this // shard. uassert(ErrorCodes::InternalError, str::stream() << "UUID sent by mongos for sharded final output collection " << _config.outputOptions.finalNamespace.ns() << " does not match UUID for the existing collection with that " "name on this shard", finalColl->uuid() == _config.finalOutputCollUUID); } IndexCatalog::IndexIterator ii = finalColl->getIndexCatalog()->getIndexIterator(_opCtx, true); // Iterate over finalColl's indexes. while (ii.more()) { IndexDescriptor* currIndex = ii.next(); BSONObjBuilder b; b.append("ns", _config.tempNamespace.ns()); // Copy over contents of the index descriptor's infoObj. BSONObjIterator j(currIndex->infoObj()); while (j.more()) { BSONElement e = j.next(); if (str::equals(e.fieldName(), "_id") || str::equals(e.fieldName(), "ns")) continue; b.append(e); } indexesToInsert.push_back(b.obj()); } } } writeConflictRetry(_opCtx, "M/R prepTempCollection", _config.tempNamespace.ns(), [&] { // create temp collection and insert the indexes from temporary storage OldClientWriteContext tempCtx(_opCtx, _config.tempNamespace.ns()); WriteUnitOfWork wuow(_opCtx); uassert( ErrorCodes::PrimarySteppedDown, str::stream() << "no longer primary while creating temporary collection for mapReduce: " << _config.tempNamespace.ns(), repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor(_opCtx, _config.tempNamespace)); Collection* tempColl = tempCtx.getCollection(); invariant(!tempColl); CollectionOptions options = finalOptions; options.temp = true; // If a UUID for the final output collection was sent by mongos (i.e., the final output // collection is sharded), use the UUID mongos sent when creating the temp collection. // When the temp collection is renamed to the final output collection, the UUID will be // preserved. options.uuid.emplace(_config.finalOutputCollUUID ? *_config.finalOutputCollUUID : UUID::gen()); // Override createCollection's prohibition on creating new replicated collections without an // _id index. bool buildIdIndex = (options.autoIndexId == CollectionOptions::YES || options.autoIndexId == CollectionOptions::DEFAULT); tempColl = tempCtx.db()->createCollection( _opCtx, _config.tempNamespace.ns(), options, buildIdIndex); for (vector::iterator it = indexesToInsert.begin(); it != indexesToInsert.end(); ++it) { Status status = tempColl->getIndexCatalog()->createIndexOnEmptyCollection(_opCtx, *it).getStatus(); if (!status.isOK()) { if (status.code() == ErrorCodes::IndexAlreadyExists) { continue; } uassertStatusOK(status); } // Log the createIndex operation. auto uuid = tempColl->uuid(); getGlobalServiceContext()->getOpObserver()->onCreateIndex( _opCtx, _config.tempNamespace, uuid, *it, false); } wuow.commit(); }); } /** * For inline mode, appends results to output object. * Makes sure (key, value) tuple is formatted as {_id: key, value: val} */ void State::appendResults(BSONObjBuilder& final) { if (_onDisk) { if (!_config.outputOptions.outDB.empty()) { BSONObjBuilder loc; if (!_config.outputOptions.outDB.empty()) loc.append("db", _config.outputOptions.outDB); if (!_config.outputOptions.collectionName.empty()) loc.append("collection", _config.outputOptions.collectionName); final.append("result", loc.obj()); } else { if (!_config.outputOptions.collectionName.empty()) final.append("result", _config.outputOptions.collectionName); } if (_config.splitInfo > 0) { // add split points, used for shard BSONObj res; BSONObj idKey = BSON("_id" << 1); if (!_db.runCommand("admin", BSON("splitVector" << _config.outputOptions.finalNamespace.ns() << "keyPattern" << idKey << "maxChunkSizeBytes" << _config.splitInfo), res)) { uasserted(15921, str::stream() << "splitVector failed: " << res); } if (res.hasField("splitKeys")) final.append(res.getField("splitKeys")); } return; } if (_jsMode) { ScriptingFunction getResult = _scope->createFunction( "var map = _mrMap;" "var result = [];" "for (key in map) {" " result.push({_id: key, value: map[key]});" "}" "return result;"); _scope->invoke(getResult, 0, 0, 0, false); BSONObj obj = _scope->getObject("__returnValue"); final.append("results", BSONArray(obj)); return; } uassert(13604, "too much data for in memory map/reduce", _size < BSONObjMaxUserSize); BSONArrayBuilder b((int)(_size * 1.2)); // _size is data size, doesn't count overhead and keys for (const auto& entry : *_temp) { const BSONObj& key = entry.first; const BSONList& all = entry.second; verify(all.size() == 1); BSONObjIterator vi(all[0]); vi.next(); BSONObjBuilder temp(b.subobjStart()); temp.appendAs(key.firstElement(), "_id"); temp.appendAs(vi.next(), "value"); temp.done(); } BSONArray res = b.arr(); final.append("results", res); } /** * Does post processing on output collection. * This may involve replacing, merging or reducing. */ long long State::postProcessCollection(OperationContext* opCtx, CurOp* curOp, ProgressMeterHolder& pm) { if (_onDisk == false || _config.outputOptions.outType == Config::INMEMORY) return numInMemKeys(); bool holdingGlobalLock = false; if (_config.outputOptions.outNonAtomic) return postProcessCollectionNonAtomic(opCtx, curOp, pm, holdingGlobalLock); invariant(!opCtx->lockState()->isLocked()); // This must be global because we may write across different databases. Lock::GlobalWrite lock(opCtx); holdingGlobalLock = true; return postProcessCollectionNonAtomic(opCtx, curOp, pm, holdingGlobalLock); } namespace { // Runs a count against the namespace specified by 'ns'. If the caller holds the global write lock, // then this function does not acquire any additional locks. unsigned long long _collectionCount(OperationContext* opCtx, const NamespaceString& nss, bool callerHoldsGlobalLock) { Collection* coll = nullptr; boost::optional ctx; // If the global write lock is held, we must avoid using AutoGetCollectionForReadCommand as it // may lead to deadlock when waiting for a majority snapshot to be committed. See SERVER-24596. if (callerHoldsGlobalLock) { Database* db = DatabaseHolder::getDatabaseHolder().get(opCtx, nss.ns()); if (db) { coll = db->getCollection(opCtx, nss); } } else { ctx.emplace(opCtx, nss); coll = ctx->getCollection(); } return coll ? coll->numRecords(opCtx) : 0; } } // namespace long long State::postProcessCollectionNonAtomic(OperationContext* opCtx, CurOp* curOp, ProgressMeterHolder& pm, bool callerHoldsGlobalLock) { if (_config.outputOptions.finalNamespace == _config.tempNamespace) return _collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock); if (_config.outputOptions.outType == Config::REPLACE || _collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock) == 0) { // This must be global because we may write across different databases. Lock::GlobalWrite lock(opCtx); // replace: just rename from temp to final collection name, dropping previous collection _db.dropCollection(_config.outputOptions.finalNamespace.ns()); BSONObj info; if (!_db.runCommand("admin", BSON("renameCollection" << _config.tempNamespace.ns() << "to" << _config.outputOptions.finalNamespace.ns() << "stayTemp" << _config.shardedFirstPass), info)) { uasserted(10076, str::stream() << "rename failed: " << info); } _db.dropCollection(_config.tempNamespace.ns()); } else if (_config.outputOptions.outType == Config::MERGE) { // merge: upsert new docs into old collection { const auto count = _collectionCount(opCtx, _config.tempNamespace, callerHoldsGlobalLock); stdx::lock_guard lk(*opCtx->getClient()); curOp->setMessage_inlock( "m/r: merge post processing", "M/R Merge Post Processing Progress", count); } unique_ptr cursor = _db.query(_config.tempNamespace.ns(), BSONObj()); while (cursor->more()) { Lock::DBLock lock(opCtx, _config.outputOptions.finalNamespace.db(), MODE_X); BSONObj o = cursor->nextSafe(); Helpers::upsert(opCtx, _config.outputOptions.finalNamespace.ns(), o); pm.hit(); } _db.dropCollection(_config.tempNamespace.ns()); pm.finished(); } else if (_config.outputOptions.outType == Config::REDUCE) { // reduce: apply reduce op on new result and existing one BSONList values; { const auto count = _collectionCount(opCtx, _config.tempNamespace, callerHoldsGlobalLock); stdx::lock_guard lk(*opCtx->getClient()); curOp->setMessage_inlock( "m/r: reduce post processing", "M/R Reduce Post Processing Progress", count); } unique_ptr cursor = _db.query(_config.tempNamespace.ns(), BSONObj()); while (cursor->more()) { // This must be global because we may write across different databases. Lock::GlobalWrite lock(opCtx); BSONObj temp = cursor->nextSafe(); BSONObj old; bool found; { OldClientContext tx(opCtx, _config.outputOptions.finalNamespace.ns()); Collection* coll = getCollectionOrUassert(opCtx, tx.db(), _config.outputOptions.finalNamespace); found = Helpers::findOne(opCtx, coll, temp["_id"].wrap(), old, true); } if (found) { // need to reduce values.clear(); values.push_back(temp); values.push_back(old); Helpers::upsert(opCtx, _config.outputOptions.finalNamespace.ns(), _config.reducer->finalReduce(values, _config.finalizer.get())); } else { Helpers::upsert(opCtx, _config.outputOptions.finalNamespace.ns(), temp); } pm.hit(); } pm.finished(); } return _collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock); } /** * Insert doc in collection. This should be replicated. */ void State::insert(const NamespaceString& nss, const BSONObj& o) { verify(_onDisk); writeConflictRetry(_opCtx, "M/R insert", nss.ns(), [this, &nss, &o] { OldClientWriteContext ctx(_opCtx, nss.ns()); WriteUnitOfWork wuow(_opCtx); uassert( ErrorCodes::PrimarySteppedDown, str::stream() << "no longer primary while inserting mapReduce result into collection: " << nss.ns() << ": " << redact(o), repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor(_opCtx, nss)); Collection* coll = getCollectionOrUassert(_opCtx, ctx.db(), nss); BSONObjBuilder b; if (!o.hasField("_id")) { b.appendOID("_id", NULL, true); } b.appendElements(o); BSONObj bo = b.obj(); StatusWith res = fixDocumentForInsert(_opCtx->getServiceContext(), bo); uassertStatusOK(res.getStatus()); if (!res.getValue().isEmpty()) { bo = res.getValue(); } // TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261. OpDebug* const nullOpDebug = nullptr; uassertStatusOK(coll->insertDocument(_opCtx, InsertStatement(bo), nullOpDebug, true)); wuow.commit(); }); } /** * Insert doc into the inc collection. This should not be replicated. */ void State::_insertToInc(BSONObj& o) { verify(_onDisk); writeConflictRetry(_opCtx, "M/R insertToInc", _config.incLong.ns(), [this, &o] { OldClientWriteContext ctx(_opCtx, _config.incLong.ns()); WriteUnitOfWork wuow(_opCtx); Collection* coll = getCollectionOrUassert(_opCtx, ctx.db(), _config.incLong); repl::UnreplicatedWritesBlock uwb(_opCtx); // The documents inserted into the incremental collection are of the form // {"0": , "1": }, so we cannot call fixDocumentForInsert(o) here because the // check that the document has an "_id" field would fail. Instead, we directly verify that // the size of the document to insert is smaller than 16MB. if (o.objsize() > BSONObjMaxUserSize) { uasserted(ErrorCodes::BadValue, str::stream() << "object to insert too large for incremental collection" << ". size in bytes: " << o.objsize() << ", max size: " << BSONObjMaxUserSize); } // TODO: Consider whether to pass OpDebug for stats tracking under SERVER-23261. OpDebug* const nullOpDebug = nullptr; uassertStatusOK(coll->insertDocument(_opCtx, InsertStatement(o), nullOpDebug, true, false)); wuow.commit(); }); } State::State(OperationContext* opCtx, const Config& c) : _config(c), _db(opCtx), _useIncremental(true), _opCtx(opCtx), _size(0), _dupCount(0), _numEmits(0) { _temp.reset(new InMemory()); _onDisk = _config.outputOptions.outType != Config::INMEMORY; } bool State::sourceExists() { return _db.exists(_config.nss.ns()); } State::~State() { if (_onDisk) { try { dropTempCollections(); } catch (...) { error() << "Unable to drop temporary collection created by mapReduce: " << _config.tempNamespace << ". This collection will be removed automatically " "the next time the server starts up. " << exceptionToStatus(); } } if (_scope && !_scope->isKillPending() && _scope->getError().empty()) { // cleanup js objects try { ScriptingFunction cleanup = _scope->createFunction("delete _emitCt; delete _keyCt; delete _mrMap;"); _scope->invoke(cleanup, 0, 0, 0, true); } catch (const DBException&) { // not important because properties will be reset if scope is reused LOG(1) << "MapReduce terminated during state destruction"; } } } /** * Initialize the mapreduce operation, creating the inc collection */ void State::init() { // setup js const string userToken = AuthorizationSession::get(Client::getCurrent())->getAuthenticatedUserNamesToken(); _scope.reset(getGlobalScriptEngine()->newScopeForCurrentThread()); _scope->requireOwnedObjects(); _scope->registerOperation(_opCtx); _scope->setLocalDB(_config.dbname); _scope->loadStored(_opCtx, true); if (!_config.scopeSetup.isEmpty()) _scope->init(&_config.scopeSetup); _config.mapper->init(this); _config.reducer->init(this); if (_config.finalizer) _config.finalizer->init(this); _scope->setBoolean("_doFinal", _config.finalizer.get() != 0); switchMode(_config.jsMode); // set up js-mode based on Config // global JS map/reduce hashmap // we use a standard JS object which means keys are only simple types // we could also add a real hashmap from a library and object comparison methods // for increased performance, we may want to look at v8 Harmony Map support // _scope->setObject("_mrMap", BSONObj(), false); ScriptingFunction init = _scope->createFunction( "_emitCt = 0;" "_keyCt = 0;" "_dupCt = 0;" "_redCt = 0;" "if (typeof(_mrMap) === 'undefined') {" " _mrMap = {};" "}"); _scope->invoke(init, 0, 0, 0, true); // js function to run reduce on all keys // redfunc = _scope->createFunction("for (var key in hashmap) { print('Key is ' + key); // list = hashmap[key]; ret = reduce(key, list); print('Value is ' + ret); };"); _reduceAll = _scope->createFunction( "var map = _mrMap;" "var list, ret;" "for (var key in map) {" " list = map[key];" " if (list.length != 1) {" " ret = _reduce(key, list);" " map[key] = [ret];" " ++_redCt;" " }" "}" "_dupCt = 0;"); massert(16717, "error initializing JavaScript reduceAll function", _reduceAll != 0); _reduceAndEmit = _scope->createFunction( "var map = _mrMap;" "var list, ret;" "for (var key in map) {" " list = map[key];" " if (list.length == 1)" " ret = list[0];" " else {" " ret = _reduce(key, list);" " ++_redCt;" " }" " emit(key, ret);" "}" "delete _mrMap;"); massert(16718, "error initializing JavaScript reduce/emit function", _reduceAndEmit != 0); _reduceAndFinalize = _scope->createFunction( "var map = _mrMap;" "var list, ret;" "for (var key in map) {" " list = map[key];" " if (list.length == 1) {" " if (!_doFinal) { continue; }" " ret = list[0];" " }" " else {" " ret = _reduce(key, list);" " ++_redCt;" " }" " if (_doFinal)" " ret = _finalize(key, ret);" " map[key] = ret;" "}"); massert(16719, "error creating JavaScript reduce/finalize function", _reduceAndFinalize != 0); _reduceAndFinalizeAndInsert = _scope->createFunction( "var map = _mrMap;" "var list, ret;" "for (var key in map) {" " list = map[key];" " if (list.length == 1)" " ret = list[0];" " else {" " ret = _reduce(key, list);" " ++_redCt;" " }" " if (_doFinal)" " ret = _finalize(key, ret);" " _nativeToTemp({_id: key, value: ret});" "}"); massert(16720, "error initializing JavaScript functions", _reduceAndFinalizeAndInsert != 0); } void State::switchMode(bool jsMode) { _jsMode = jsMode; if (jsMode) { // emit function that stays in JS _scope->setFunction("emit", "function(key, value) {" " if (typeof(key) === 'object') {" " _bailFromJS(key, value);" " return;" " }" " ++_emitCt;" " var map = _mrMap;" " var list = map[key];" " if (!list) {" " ++_keyCt;" " list = [];" " map[key] = list;" " }" " else" " ++_dupCt;" " list.push(value);" "}"); _scope->injectNative("_bailFromJS", _bailFromJS, this); } else { // emit now populates C++ map _scope->injectNative("emit", fast_emit, this); } } void State::bailFromJS() { LOG(1) << "M/R: Switching from JS mode to mixed mode"; // reduce and reemit into c++ switchMode(false); _scope->invoke(_reduceAndEmit, 0, 0, 0, true); // need to get the real number emitted so far _numEmits = _scope->getNumberInt("_emitCt"); _config.reducer->numReduces = _scope->getNumberInt("_redCt"); } Collection* State::getCollectionOrUassert(OperationContext* opCtx, Database* db, const NamespaceString& nss) { UninterruptibleLockGuard noInterrupt(opCtx->lockState()); Collection* out = db ? db->getCollection(opCtx, nss) : NULL; uassert(18697, "Collection unexpectedly disappeared: " + nss.ns(), out); return out; } /** * Applies last reduce and finalize on a list of tuples (key, val) * Inserts single result {_id: key, value: val} into temp collection */ void State::finalReduce(BSONList& values) { if (!_onDisk || values.size() == 0) return; BSONObj res = _config.reducer->finalReduce(values, _config.finalizer.get()); insert(_config.tempNamespace, res); } BSONObj _nativeToTemp(const BSONObj& args, void* data) { State* state = (State*)data; BSONObjIterator it(args); state->insert(state->_config.tempNamespace, it.next().Obj()); return BSONObj(); } // BSONObj _nativeToInc( const BSONObj& args, void* data ) { // State* state = (State*) data; // BSONObjIterator it(args); // const BSONObj& obj = it.next().Obj(); // state->_insertToInc(const_cast(obj)); // return BSONObj(); // } /** * Applies last reduce and finalize. * After calling this method, the temp collection will be completed. * If inline, the results will be in the in memory map */ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHolder& pm) { if (_jsMode) { // apply the reduce within JS if (_onDisk) { _scope->injectNative("_nativeToTemp", _nativeToTemp, this); _scope->invoke(_reduceAndFinalizeAndInsert, 0, 0, 0, true); return; } else { _scope->invoke(_reduceAndFinalize, 0, 0, 0, true); return; } } if (!_onDisk) { // all data has already been reduced, just finalize if (_config.finalizer) { long size = 0; for (InMemory::iterator i = _temp->begin(); i != _temp->end(); ++i) { BSONObj key = i->first; BSONList& all = i->second; verify(all.size() == 1); BSONObj res = _config.finalizer->finalize(all[0]); all.clear(); all.push_back(res); size += res.objsize(); } _size = size; } return; } // use index on "0" to pull sorted data verify(_temp->size() == 0); BSONObj sortKey = BSON("0" << 1); writeConflictRetry(_opCtx, "finalReduce", _config.incLong.ns(), [&] { OldClientWriteContext incCtx(_opCtx, _config.incLong.ns()); WriteUnitOfWork wuow(_opCtx); Collection* incColl = getCollectionOrUassert(_opCtx, incCtx.db(), _config.incLong); bool foundIndex = false; IndexCatalog::IndexIterator ii = incColl->getIndexCatalog()->getIndexIterator(_opCtx, true); // Iterate over incColl's indexes. while (ii.more()) { IndexDescriptor* currIndex = ii.next(); BSONObj x = currIndex->infoObj(); if (sortKey.woCompare(x["key"].embeddedObject()) == 0) { foundIndex = true; break; } } verify(foundIndex); wuow.commit(); }); unique_ptr ctx( new AutoGetCollectionForReadCommand(_opCtx, _config.incLong)); BSONObj prev; BSONList all; { const auto count = _db.count(_config.incLong.ns(), BSONObj(), QueryOption_SlaveOk); stdx::lock_guard lk(*_opCtx->getClient()); verify(pm == curOp->setMessage_inlock("m/r: (3/3) final reduce to collection", "M/R: (3/3) Final Reduce Progress", count)); } const ExtensionsCallbackReal extensionsCallback(_opCtx, &_config.incLong); auto qr = stdx::make_unique(_config.incLong); qr->setSort(sortKey); const boost::intrusive_ptr expCtx; auto statusWithCQ = CanonicalQuery::canonicalize(opCtx, std::move(qr), expCtx, extensionsCallback, MatchExpressionParser::kAllowAllSpecialFeatures); verify(statusWithCQ.isOK()); std::unique_ptr cq = std::move(statusWithCQ.getValue()); Collection* coll = getCollectionOrUassert(opCtx, ctx->getDb(), _config.incLong); invariant(coll); auto exec = uassertStatusOK(getExecutor( _opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, QueryPlannerParams::NO_TABLE_SCAN)); // Make sure the PlanExecutor is destroyed while holding a collection lock. ON_BLOCK_EXIT([&exec, &ctx, opCtx, this] { if (!ctx) { AutoGetCollection autoColl(opCtx, _config.incLong, MODE_IS); exec.reset(); } }); // iterate over all sorted objects BSONObj o; PlanExecutor::ExecState state; while (PlanExecutor::ADVANCED == (state = exec->getNext(&o, NULL))) { o = o.getOwned(); // we will be accessing outside of the lock pm.hit(); if (dps::compareObjectsAccordingToSort(o, prev, sortKey) == 0) { // object is same as previous, add to array all.push_back(o); if (pm->hits() % 100 == 0) { _opCtx->checkForInterrupt(); } continue; } exec->saveState(); ctx.reset(); // reduce a finalize array finalReduce(all); ctx.reset(new AutoGetCollectionForReadCommand(_opCtx, _config.incLong)); all.clear(); prev = o; all.push_back(o); _opCtx->checkForInterrupt(); uassertStatusOK(exec->restoreState()); } uassert(34428, "Plan executor error during mapReduce command: " + WorkingSetCommon::toStatusString(o), PlanExecutor::IS_EOF == state); ctx.reset(); // reduce and finalize last array finalReduce(all); ctx.reset(new AutoGetCollectionForReadCommand(_opCtx, _config.incLong)); pm.finished(); } /** * Attempts to reduce objects in the memory map. * A new memory map will be created to hold the results. * If applicable, objects with unique key may be dumped to inc collection. * Input and output objects are both {"0": key, "1": val} */ void State::reduceInMemory() { if (_jsMode) { // in js mode the reduce is applied when writing to collection return; } unique_ptr n(new InMemory()); // for new data long nSize = 0; _dupCount = 0; for (InMemory::iterator i = _temp->begin(); i != _temp->end(); ++i) { BSONList& all = i->second; if (all.size() == 1) { // only 1 value for this key if (_onDisk) { // this key has low cardinality, so just write to collection _insertToInc(*(all.begin())); } else { // add to new map nSize += _add(n.get(), all[0]); } } else if (all.size() > 1) { // several values, reduce and add to map BSONObj res = _config.reducer->reduce(all); nSize += _add(n.get(), res); } } // swap maps _temp.reset(n.release()); _size = nSize; } /** * Dumps the entire in memory map to the inc collection. */ void State::dumpToInc() { if (!_onDisk) return; for (InMemory::iterator i = _temp->begin(); i != _temp->end(); i++) { BSONList& all = i->second; if (all.size() < 1) continue; for (BSONList::iterator j = all.begin(); j != all.end(); j++) _insertToInc(*j); } _temp->clear(); _size = 0; } /** * Adds object to in memory map */ void State::emit(const BSONObj& a) { _numEmits++; _size += _add(_temp.get(), a); } int State::_add(InMemory* im, const BSONObj& a) { BSONList& all = (*im)[a]; all.push_back(a); if (all.size() > 1) { ++_dupCount; } return a.objsize() + 16; } void State::reduceAndSpillInMemoryStateIfNeeded() { // Make sure no DB locks are held, because this method manages its own locking and // write units of work. invariant(!_opCtx->lockState()->isLocked()); if (_jsMode) { // try to reduce if it is beneficial int dupCt = _scope->getNumberInt("_dupCt"); int keyCt = _scope->getNumberInt("_keyCt"); if (keyCt > _config.jsMaxKeys) { // too many keys for JS, switch to mixed _bailFromJS(BSONObj(), this); // then fall through to check map size } else if (dupCt > (keyCt * _config.reduceTriggerRatio)) { // reduce now to lower mem usage Timer t; _scope->invoke(_reduceAll, 0, 0, 0, true); LOG(3) << " MR - did reduceAll: keys=" << keyCt << " dups=" << dupCt << " newKeys=" << _scope->getNumberInt("_keyCt") << " time=" << t.millis() << "ms"; return; } } if (_jsMode) return; if (_size > _config.maxInMemSize || _dupCount > (_temp->size() * _config.reduceTriggerRatio)) { // attempt to reduce in memory map, if memory is too high or we have many duplicates long oldSize = _size; Timer t; reduceInMemory(); LOG(3) << " MR - did reduceInMemory: size=" << oldSize << " dups=" << _dupCount << " newSize=" << _size << " time=" << t.millis() << "ms"; // if size is still high, or values are not reducing well, dump if (_onDisk && (_size > _config.maxInMemSize || _size > oldSize / 2)) { dumpToInc(); LOG(3) << " MR - dumping to db"; } } } /** * emit that will be called by js function */ BSONObj fast_emit(const BSONObj& args, void* data) { uassert(10077, "fast_emit takes 2 args", args.nFields() == 2); uassert(13069, "an emit can't be more than half max bson size", args.objsize() < (BSONObjMaxUserSize / 2)); State* state = (State*)data; if (args.firstElement().type() == Undefined) { BSONObjBuilder b(args.objsize()); b.appendNull(""); BSONObjIterator i(args); i.next(); b.append(i.next()); state->emit(b.obj()); } else { state->emit(args); } return BSONObj(); } /** * function is called when we realize we cant use js mode for m/r on the 1st key */ BSONObj _bailFromJS(const BSONObj& args, void* data) { State* state = (State*)data; state->bailFromJS(); // emit this particular key if there is one if (!args.isEmpty()) { fast_emit(args, data); } return BSONObj(); } /** * This class represents a map/reduce command executed on a single server */ class MapReduceCommand : public ErrmsgCommandDeprecated { public: MapReduceCommand() : ErrmsgCommandDeprecated("mapReduce", "mapreduce") {} AllowedOnSecondary secondaryAllowed(ServiceContext* serviceContext) const override { if (repl::ReplicationCoordinator::get(serviceContext)->getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) { return AllowedOnSecondary::kAlways; } return AllowedOnSecondary::kOptIn; } std::size_t reserveBytesForReply() const override { return FindCommon::kInitReplyBufferSize; } std::string help() const override { return "Run a map/reduce operation on the server.\n" "Note this is used for aggregation, not querying, in MongoDB.\n" "http://dochub.mongodb.org/core/mapreduce"; } virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return mrSupportsWriteConcern(cmd); } virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) const { addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out); } bool errmsgRun(OperationContext* opCtx, const string& dbname, const BSONObj& cmd, string& errmsg, BSONObjBuilder& result) { Timer t; // Don't let a lock acquisition in map-reduce get interrupted. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); boost::optional maybeDisableValidation; if (shouldBypassDocumentValidationForCommand(cmd)) maybeDisableValidation.emplace(opCtx); auto client = opCtx->getClient(); if (client->isInDirectClient()) { return CommandHelpers::appendCommandStatus( result, Status(ErrorCodes::IllegalOperation, "Cannot run mapReduce command from eval()")); } auto curOp = CurOp::get(opCtx); const Config config(dbname, cmd); LOG(1) << "mr ns: " << config.nss; uassert(16149, "cannot run map reduce without the js engine", getGlobalScriptEngine()); // Prevent sharding state from changing during the MR. const auto collMetadata = [&] { // Get metadata before we check our version, to make sure it doesn't increment in the // meantime AutoGetCollectionForReadCommand autoColl(opCtx, config.nss); return CollectionShardingState::get(opCtx, config.nss)->getMetadata(opCtx); }(); bool shouldHaveData = false; BSONObjBuilder countsBuilder; BSONObjBuilder timingBuilder; try { State state(opCtx, config); if (!state.sourceExists()) { return CommandHelpers::appendCommandStatus( result, Status(ErrorCodes::NamespaceNotFound, str::stream() << "namespace does not exist: " << config.nss.ns())); } state.init(); state.prepTempCollection(); int64_t progressTotal = 0; bool showTotal = true; if (state.config().filter.isEmpty()) { const bool holdingGlobalLock = false; const auto count = _collectionCount(opCtx, config.nss, holdingGlobalLock); progressTotal = (config.limit && static_cast(config.limit) < count) ? config.limit : count; } else { showTotal = false; // Set an arbitrary total > 0 so the meter will be activated. progressTotal = 1; } stdx::unique_lock lk(*opCtx->getClient()); ProgressMeter& progress(curOp->setMessage_inlock( "m/r: (1/3) emit phase", "M/R: (1/3) Emit Progress", progressTotal)); lk.unlock(); progress.showTotal(showTotal); ProgressMeterHolder pm(progress); long long mapTime = 0; long long reduceTime = 0; long long numInputs = 0; { // We've got a cursor preventing migrations off, now re-establish our // useful cursor. // Need lock and context to use it unique_ptr scopedAutoDb(new AutoGetDb(opCtx, config.nss.db(), MODE_S)); if (state.isOnDisk()) { // this means that it will be doing a write operation, make sure it is safe to // do so. if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, config.nss)) { uasserted(ErrorCodes::NotMaster, "not master"); return false; } } auto qr = stdx::make_unique(config.nss); qr->setFilter(config.filter); qr->setSort(config.sort); qr->setCollation(config.collation); const ExtensionsCallbackReal extensionsCallback(opCtx, &config.nss); const boost::intrusive_ptr expCtx; auto statusWithCQ = CanonicalQuery::canonicalize(opCtx, std::move(qr), expCtx, extensionsCallback, MatchExpressionParser::kAllowAllSpecialFeatures); if (!statusWithCQ.isOK()) { uasserted(17238, "Can't canonicalize query " + config.filter.toString()); return 0; } std::unique_ptr cq = std::move(statusWithCQ.getValue()); unique_ptr exec; { Database* db = scopedAutoDb->getDb(); Collection* coll = State::getCollectionOrUassert(opCtx, db, config.nss); invariant(coll); exec = uassertStatusOK( getExecutor(opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, 0)); } // Make sure the PlanExecutor is destroyed while holding the necessary locks. ON_BLOCK_EXIT([&exec, &scopedAutoDb, opCtx, &config] { if (!scopedAutoDb) { scopedAutoDb = stdx::make_unique(opCtx, config.nss.db(), MODE_S); exec.reset(); } }); { stdx::lock_guard lk(*opCtx->getClient()); CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); } Timer mt; // go through each doc BSONObj o; PlanExecutor::ExecState execState; while (PlanExecutor::ADVANCED == (execState = exec->getNext(&o, NULL))) { o = o.getOwned(); // we will be accessing outside of the lock // check to see if this is a new object we don't own yet // because of a chunk migration if (collMetadata) { ShardKeyPattern kp(collMetadata->getKeyPattern()); if (!collMetadata->keyBelongsToMe(kp.extractShardKeyFromDoc(o))) { continue; } } // do map if (config.verbose) mt.reset(); config.mapper->map(o); if (config.verbose) mapTime += mt.micros(); // Check if the state accumulated so far needs to be written to a // collection. This may yield the DB lock temporarily and then // acquire it again. // numInputs++; if (numInputs % 100 == 0) { Timer t; // TODO: As an optimization, we might want to do the save/restore // state and yield inside the reduceAndSpillInMemoryState method, so // it only happens if necessary. exec->saveState(); scopedAutoDb.reset(); state.reduceAndSpillInMemoryStateIfNeeded(); scopedAutoDb.reset(new AutoGetDb(opCtx, config.nss.db(), MODE_S)); auto restoreStatus = exec->restoreState(); if (!restoreStatus.isOK()) { return CommandHelpers::appendCommandStatus(result, restoreStatus); } reduceTime += t.micros(); opCtx->checkForInterrupt(); } pm.hit(); if (config.limit && numInputs >= config.limit) break; } if (PlanExecutor::DEAD == execState || PlanExecutor::FAILURE == execState) { return CommandHelpers::appendCommandStatus( result, Status(ErrorCodes::OperationFailed, str::stream() << "Executor error during mapReduce command: " << WorkingSetCommon::toStatusString(o))); } // Record the indexes used by the PlanExecutor. PlanSummaryStats stats; Explain::getSummaryStats(*exec, &stats); // TODO SERVER-23261: Confirm whether this is the correct place to gather all // metrics. There is no harm adding here for the time being. curOp->debug().setPlanSummaryMetrics(stats); Collection* coll = scopedAutoDb->getDb()->getCollection(opCtx, config.nss); invariant(coll); // 'exec' hasn't been killed, so collection must be alive. coll->infoCache()->notifyOfQuery(opCtx, stats.indexesUsed); if (curOp->shouldDBProfile()) { BSONObjBuilder execStatsBob; Explain::getWinningPlanStats(exec.get(), &execStatsBob); curOp->debug().execStats = execStatsBob.obj(); } } pm.finished(); opCtx->checkForInterrupt(); // update counters countsBuilder.appendNumber("input", numInputs); countsBuilder.appendNumber("emit", state.numEmits()); if (state.numEmits()) shouldHaveData = true; timingBuilder.appendNumber("mapTime", mapTime / 1000); timingBuilder.append("emitLoop", t.millis()); { stdx::lock_guard lk(*opCtx->getClient()); curOp->setMessage_inlock("m/r: (2/3) final reduce in memory", "M/R: (2/3) Final In-Memory Reduce Progress"); } Timer rt; // do reduce in memory // this will be the last reduce needed for inline mode state.reduceInMemory(); // if not inline: dump the in memory map to inc collection, all data is on disk state.dumpToInc(); // final reduce state.finalReduce(opCtx, curOp, pm); reduceTime += rt.micros(); // Ensure the profile shows the source namespace. If the output was not inline, the // active namespace will be the temporary collection we inserted into. { stdx::lock_guard lk(*opCtx->getClient()); curOp->setNS_inlock(config.nss.ns()); } countsBuilder.appendNumber("reduce", state.numReduces()); timingBuilder.appendNumber("reduceTime", reduceTime / 1000); timingBuilder.append("mode", state.jsMode() ? "js" : "mixed"); long long finalCount = state.postProcessCollection(opCtx, curOp, pm); state.appendResults(result); timingBuilder.appendNumber("total", t.millis()); result.appendNumber("timeMillis", t.millis()); countsBuilder.appendNumber("output", finalCount); if (config.verbose) result.append("timing", timingBuilder.obj()); result.append("counts", countsBuilder.obj()); if (finalCount == 0 && shouldHaveData) { result.append("cmd", cmd); errmsg = "there were emits but no data!"; return false; } } catch (StaleConfigException& e) { log() << "mr detected stale config, should retry" << redact(e); throw; } // TODO: The error handling code for queries is v. fragile, // *requires* rethrow AssertionExceptions - should probably fix. catch (AssertionException& e) { log() << "mr failed, removing collection" << redact(e); throw; } catch (std::exception& e) { log() << "mr failed, removing collection" << causedBy(e); throw; } catch (...) { log() << "mr failed for unknown reason, removing collection"; throw; } return true; } } mapReduceCommand; /** * This class represents a map/reduce command executed on the output server of a sharded env */ class MapReduceFinishCommand : public BasicCommand { public: std::string help() const override { return "internal"; } MapReduceFinishCommand() : BasicCommand("mapreduce.shardedfinish") {} AllowedOnSecondary secondaryAllowed(ServiceContext* serviceContext) const override { if (repl::ReplicationCoordinator::get(serviceContext)->getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) { return AllowedOnSecondary::kAlways; } return AllowedOnSecondary::kOptIn; } virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) const { ActionSet actions; actions.addAction(ActionType::internal); out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } bool run(OperationContext* opCtx, const string& dbname, const BSONObj& cmdObj, BSONObjBuilder& result) { if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { return CommandHelpers::appendCommandStatus( result, Status(ErrorCodes::CommandNotSupported, str::stream() << "Can not execute mapReduce with output database " << dbname << " which lives on config servers")); } // Don't let any lock acquisitions get interrupted. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); boost::optional maybeDisableValidation; if (shouldBypassDocumentValidationForCommand(cmdObj)) maybeDisableValidation.emplace(opCtx); // legacy name const auto shardedOutputCollectionElt = cmdObj["shardedOutputCollection"]; uassert(ErrorCodes::InvalidNamespace, "'shardedOutputCollection' must be of type String", shardedOutputCollectionElt.type() == BSONType::String); const std::string shardedOutputCollection = shardedOutputCollectionElt.str(); verify(shardedOutputCollection.size() > 0); std::string inputNS; if (cmdObj["inputDB"].type() == String) { inputNS = NamespaceString(cmdObj["inputDB"].valueStringData(), shardedOutputCollection).ns(); } else { inputNS = NamespaceString(dbname, shardedOutputCollection).ns(); } CurOp* curOp = CurOp::get(opCtx); Config config(dbname, cmdObj.firstElement().embeddedObjectUserCheck()); if (cmdObj["finalOutputCollIsSharded"].trueValue()) { uassert(ErrorCodes::InvalidOptions, "This shard has feature compatibility version 3.6, so it expects mongos to " "send the UUID to use for the sharded output collection. Was the mapReduce " "request sent from a 3.4 mongos?", cmdObj.hasField("shardedOutputCollUUID")); config.finalOutputCollUUID = uassertStatusOK(UUID::parse(cmdObj["shardedOutputCollUUID"])); } State state(opCtx, config); state.init(); // no need for incremental collection because records are already sorted state._useIncremental = false; config.incLong = config.tempNamespace; BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck(); BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck(); stdx::unique_lock lk(*opCtx->getClient()); ProgressMeterHolder pm(curOp->setMessage_inlock("m/r: merge sort and reduce", "M/R Merge Sort and Reduce Progress")); lk.unlock(); set servers; { // Parse per shard results BSONObjIterator i(shardCounts); while (i.more()) { BSONElement e = i.next(); std::string server = e.fieldName(); servers.insert(server); uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, server)); } } state.prepTempCollection(); std::vector chunks; if (config.outputOptions.outType != Config::OutputType::INMEMORY) { auto outRoutingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( opCtx, config.outputOptions.finalNamespace); if (!outRoutingInfoStatus.isOK()) { return CommandHelpers::appendCommandStatus(result, outRoutingInfoStatus.getStatus()); } if (auto cm = outRoutingInfoStatus.getValue().cm()) { // Fetch result from other shards 1 chunk at a time. It would be better to do just // one big $or query, but then the sorting would not be efficient. const string shardName = ShardingState::get(opCtx)->getShardName(); for (const auto& chunk : cm->chunks()) { if (chunk.getShardId() == shardName) { chunks.push_back(chunk); } } } } long long inputCount = 0; unsigned int index = 0; BSONObj query; BSONArrayBuilder chunkSizes; BSONList values; while (true) { if (chunks.size() > 0) { const auto& chunk = chunks[index]; BSONObjBuilder b; b.appendAs(chunk.getMin().firstElement(), "$gte"); b.appendAs(chunk.getMax().firstElement(), "$lt"); query = BSON("_id" << b.obj()); // chunkSizes.append(min); } // reduce from each shard for a chunk BSONObj sortKey = BSON("_id" << 1); ParallelSortClusteredCursor cursor( servers, inputNS, Query(query).sort(sortKey), QueryOption_NoCursorTimeout); cursor.init(opCtx); int chunkSize = 0; while (cursor.more() || !values.empty()) { BSONObj t; if (cursor.more()) { t = cursor.next().getOwned(); ++inputCount; if (values.size() == 0) { values.push_back(t); continue; } if (dps::compareObjectsAccordingToSort(t, *(values.begin()), sortKey) == 0) { values.push_back(t); continue; } } BSONObj res = config.reducer->finalReduce(values, config.finalizer.get()); chunkSize += res.objsize(); if (state.isOnDisk()) state.insert(config.tempNamespace, res); else state.emit(res); values.clear(); if (!t.isEmpty()) values.push_back(t); } if (chunks.size() > 0) { const auto& chunk = chunks[index]; chunkSizes.append(chunk.getMin()); chunkSizes.append(chunkSize); } if (++index >= chunks.size()) break; } // Forget temporary input collection, if output is sharded collection ShardConnection::forgetNS(inputNS); result.append("chunkSizes", chunkSizes.arr()); long long outputCount = state.postProcessCollection(opCtx, curOp, pm); state.appendResults(result); BSONObjBuilder countsB(32); countsB.append("input", inputCount); countsB.append("reduce", state.numReduces()); countsB.append("output", outputCount); result.append("counts", countsB.obj()); return true; } } mapReduceFinishCommand; } // namespace } // namespace mongo