// mr.h /** * Copyright (C) 2012 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include #include #include "mongo/db/auth/privilege.h" #include "mongo/db/curop.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/jsobj.h" #include "mongo/platform/atomic_word.h" #include "mongo/scripting/engine.h" namespace mongo { class Collection; class Database; class OperationContext; namespace mr { typedef std::vector BSONList; class State; // ------------ function interfaces ----------- class Mapper { MONGO_DISALLOW_COPYING(Mapper); public: virtual ~Mapper() {} virtual void init(State* state) = 0; virtual void map(const BSONObj& o) = 0; protected: Mapper() = default; }; class Finalizer { MONGO_DISALLOW_COPYING(Finalizer); public: virtual ~Finalizer() {} virtual void init(State* state) = 0; /** * this takes a tuple and returns a tuple */ virtual BSONObj finalize(const BSONObj& tuple) = 0; protected: Finalizer() = default; }; class Reducer { MONGO_DISALLOW_COPYING(Reducer); public: Reducer() : numReduces(0) {} virtual ~Reducer() {} virtual void init(State* state) = 0; virtual BSONObj reduce(const BSONList& tuples) = 0; /** this means its a final reduce, even if there is no finalizer */ virtual BSONObj finalReduce(const BSONList& tuples, Finalizer* finalizer) = 0; long long numReduces; }; // ------------ js function implementations ----------- /** * used as a holder for Scope and ScriptingFunction * visitor like pattern as Scope is gotten from first access */ class JSFunction { MONGO_DISALLOW_COPYING(JSFunction); public: /** * @param type (map|reduce|finalize) */ JSFunction(const std::string& type, const BSONElement& e); virtual ~JSFunction() {} virtual void init(State* state); Scope* scope() const { return _scope; } ScriptingFunction func() const { return _func; } private: std::string _type; std::string _code; // actual javascript code BSONObj _wantedScope; // this is for CodeWScope Scope* _scope; // this is not owned by us, and might be shared ScriptingFunction _func; }; class JSMapper : public Mapper { public: JSMapper(const BSONElement& code) : _func("_map", code) {} virtual void map(const BSONObj& o); virtual void init(State* state); private: JSFunction _func; BSONObj _params; }; class JSReducer : public Reducer { public: JSReducer(const BSONElement& code) : _func("_reduce", code) {} virtual void init(State* state); virtual BSONObj reduce(const BSONList& tuples); virtual BSONObj finalReduce(const BSONList& tuples, Finalizer* finalizer); private: /** * result in "__returnValue" * @param key OUT * @param endSizeEstimate OUT */ void _reduce(const BSONList& values, BSONObj& key, int& endSizeEstimate); JSFunction _func; }; class JSFinalizer : public Finalizer { public: JSFinalizer(const BSONElement& code) : _func("_finalize", code) {} virtual BSONObj finalize(const BSONObj& o); virtual void init(State* state) { _func.init(state); } private: JSFunction _func; }; // ----------------- class TupleKeyCmp { public: TupleKeyCmp() {} bool operator()(const BSONObj& l, const BSONObj& r) const { return l.firstElement().woCompare(r.firstElement()) < 0; } }; typedef std::map InMemory; // from key to list of tuples /** * holds map/reduce config information */ class Config { public: Config(const std::string& _dbname, const BSONObj& cmdObj); std::string dbname; std::string ns; // options bool verbose; bool jsMode; int splitInfo; // query options BSONObj filter; BSONObj sort; long long limit; // functions std::unique_ptr mapper; std::unique_ptr reducer; std::unique_ptr finalizer; BSONObj mapParams; BSONObj scopeSetup; // output tables std::string incLong; std::string tempNamespace; enum OutputType { REPLACE, // atomically replace the collection MERGE, // merge keys, override dups REDUCE, // merge keys, reduce dups INMEMORY // only store in memory, limited in size }; struct OutputOptions { std::string outDB; std::string collectionName; std::string finalNamespace; // if true, no lock during output operation bool outNonAtomic; OutputType outType; } outputOptions; static OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdObj); // max number of keys allowed in JS map before switching mode long jsMaxKeys; // ratio of duplicates vs unique keys before reduce is triggered in js mode float reduceTriggerRatio; // maximum size of map before it gets dumped to disk long maxInMemSize; // true when called from mongos to do phase-1 of M/R bool shardedFirstPass; static AtomicUInt32 JOB_NUMBER; }; // end MRsetup /** * stores information about intermediate map reduce state * controls flow of data from map->reduce->finalize->output */ class State { public: /** * txn must outlive this State. */ State(OperationContext* txn, const Config& c); ~State(); void init(); // ---- prep ----- bool sourceExists(); long long incomingDocuments(); // ---- map stage ---- /** * stages on in in-memory storage */ void emit(const BSONObj& a); /** * Checks the size of the transient in-memory results accumulated so far and potentially * runs reduce in order to compact them. If the data is still too large, it will be * spilled to the output collection. * * NOTE: Make sure that no DB locks are held, when calling this function, because it may * try to acquire write DB lock for the write to the output collection. */ void reduceAndSpillInMemoryStateIfNeeded(); /** * run reduce on _temp */ void reduceInMemory(); /** * transfers in memory storage to temp collection */ void dumpToInc(); void insertToInc(BSONObj& o); void _insertToInc(BSONObj& o); // ------ reduce stage ----------- void prepTempCollection(); void finalReduce(BSONList& values); void finalReduce(CurOp* op, ProgressMeterHolder& pm); // ------- cleanup/data positioning ---------- /** * Clean up the temporary and incremental collections */ void dropTempCollections(); /** @return number objects in collection */ long long postProcessCollection(OperationContext* txn, CurOp* op, ProgressMeterHolder& pm); long long postProcessCollectionNonAtomic(OperationContext* txn, CurOp* op, ProgressMeterHolder& pm); /** * if INMEMORY will append * may also append stats or anything else it likes */ void appendResults(BSONObjBuilder& b); // -------- util ------------ /** * inserts with correct replication semantics */ void insert(const std::string& ns, const BSONObj& o); // ------ simple accessors ----- /** State maintains ownership, do no use past State lifetime */ Scope* scope() { return _scope.get(); } const Config& config() { return _config; } bool isOnDisk() { return _onDisk; } long long numEmits() const { if (_jsMode) return _scope->getNumberLongLong("_emitCt"); return _numEmits; } long long numReduces() const { if (_jsMode) return _scope->getNumberLongLong("_redCt"); return _config.reducer->numReduces; } long long numInMemKeys() const { if (_jsMode) return _scope->getNumberLongLong("_keyCt"); return _temp->size(); } bool jsMode() { return _jsMode; } void switchMode(bool jsMode); void bailFromJS(); Collection* getCollectionOrUassert(Database* db, StringData ns); const Config& _config; DBDirectClient _db; bool _useIncremental; // use an incremental collection protected: /** * Appends a new document to the in-memory list of tuples, which are under that * document's key. * * @return estimated in-memory size occupied by the newly added document. */ int _add(InMemory* im, const BSONObj& a); OperationContext* _txn; std::unique_ptr _scope; bool _onDisk; // if the end result of this map reduce is disk or not std::unique_ptr _temp; long _size; // bytes in _temp long _dupCount; // number of duplicate key entries long long _numEmits; bool _jsMode; ScriptingFunction _reduceAll; ScriptingFunction _reduceAndEmit; ScriptingFunction _reduceAndFinalize; ScriptingFunction _reduceAndFinalizeAndInsert; }; BSONObj fast_emit(const BSONObj& args, void* data); BSONObj _bailFromJS(const BSONObj& args, void* data); void addPrivilegesRequiredForMapReduce(Command* commandTemplate, const std::string& dbname, const BSONObj& cmdObj, std::vector* out); } // end mr namespace }