// mr.h /** * Copyright (C) 2012 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #pragma once #include #include #include #include "mongo/db/auth/privilege.h" #include "mongo/db/curop.h" #include "mongo/db/instance.h" #include "mongo/db/jsobj.h" #include "mongo/scripting/engine.h" namespace mongo { namespace mr { typedef vector BSONList; class State; // ------------ function interfaces ----------- class Mapper : boost::noncopyable { public: virtual ~Mapper() {} virtual void init( State * state ) = 0; virtual void map( const BSONObj& o ) = 0; }; class Finalizer : boost::noncopyable { public: virtual ~Finalizer() {} virtual void init( State * state ) = 0; /** * this takes a tuple and returns a tuple */ virtual BSONObj finalize( const BSONObj& tuple ) = 0; }; class Reducer : boost::noncopyable { public: Reducer() : numReduces(0) {} virtual ~Reducer() {} virtual void init( State * state ) = 0; virtual BSONObj reduce( const BSONList& tuples ) = 0; /** this means its a final reduce, even if there is no finalizer */ virtual BSONObj finalReduce( const BSONList& tuples , Finalizer * finalizer ) = 0; long long numReduces; }; // ------------ js function implementations ----------- /** * used as a holder for Scope and ScriptingFunction * visitor like pattern as Scope is gotten from first access */ class JSFunction : boost::noncopyable { public: /** * @param type (map|reduce|finalize) */ JSFunction( const std::string& type , const BSONElement& e ); virtual ~JSFunction() {} virtual void init( State * state ); Scope * scope() const { return _scope; } ScriptingFunction func() const { return _func; } private: string _type; string _code; // actual javascript code BSONObj _wantedScope; // this is for CodeWScope Scope * _scope; // this is not owned by us, and might be shared ScriptingFunction _func; }; class JSMapper : public Mapper { public: JSMapper( const BSONElement & code ) : _func( "_map" , code ) {} virtual void map( const BSONObj& o ); virtual void init( State * state ); private: JSFunction _func; BSONObj _params; }; class JSReducer : public Reducer { public: JSReducer( const BSONElement& code ) : _func( "_reduce" , code ) {} virtual void init( State * state ); virtual BSONObj reduce( const BSONList& tuples ); virtual BSONObj finalReduce( const BSONList& tuples , Finalizer * finalizer ); private: /** * result in "__returnValue" * @param key OUT * @param endSizeEstimate OUT */ void _reduce( const BSONList& values , BSONObj& key , int& endSizeEstimate ); JSFunction _func; }; class JSFinalizer : public Finalizer { public: JSFinalizer( const BSONElement& code ) : _func( "_finalize" , code ) {} virtual BSONObj finalize( const BSONObj& o ); virtual void init( State * state ) { _func.init( state ); } private: JSFunction _func; }; // ----------------- class TupleKeyCmp { public: TupleKeyCmp() {} bool operator()( const BSONObj &l, const BSONObj &r ) const { return l.firstElement().woCompare( r.firstElement() ) < 0; } }; typedef map< BSONObj,BSONList,TupleKeyCmp > InMemory; // from key to list of tuples /** * holds map/reduce config information */ class Config { public: Config( const string& _dbname , const BSONObj& cmdObj ); string dbname; string ns; // options bool verbose; bool jsMode; int splitInfo; // query options BSONObj filter; BSONObj sort; long long limit; // functions scoped_ptr mapper; scoped_ptr reducer; scoped_ptr finalizer; BSONObj mapParams; BSONObj scopeSetup; // output tables string incLong; string tempNamespace; enum OutputType { REPLACE , // atomically replace the collection MERGE , // merge keys, override dups REDUCE , // merge keys, reduce dups INMEMORY // only store in memory, limited in size }; struct OutputOptions { string outDB; string collectionName; string finalNamespace; // if true, no lock during output operation bool outNonAtomic; OutputType outType; } outputOptions; static OutputOptions parseOutputOptions(const string& dbname, const BSONObj& cmdObj); // max number of keys allowed in JS map before switching mode long jsMaxKeys; // ratio of duplicates vs unique keys before reduce is triggered in js mode float reduceTriggerRatio; // maximum size of map before it gets dumped to disk long maxInMemSize; // true when called from mongos to do phase-1 of M/R bool shardedFirstPass; static AtomicUInt JOB_NUMBER; }; // end MRsetup /** * stores information about intermediate map reduce state * controls flow of data from map->reduce->finalize->output */ class State { public: State( const Config& c ); ~State(); void init(); // ---- prep ----- bool sourceExists(); long long incomingDocuments(); // ---- map stage ---- /** * stages on in in-memory storage */ void emit( const BSONObj& a ); /** * if size is big, run a reduce * if its still big, dump to temp collection */ void checkSize(); /** * run reduce on _temp */ void reduceInMemory(); /** * transfers in memory storage to temp collection */ void dumpToInc(); void insertToInc( BSONObj& o ); void _insertToInc( BSONObj& o ); // ------ reduce stage ----------- void prepTempCollection(); void finalReduce( BSONList& values ); void finalReduce( CurOp * op , ProgressMeterHolder& pm ); // ------- cleanup/data positioning ---------- /** * Clean up the temporary and incremental collections */ void dropTempCollections(); /** @return number objects in collection */ long long postProcessCollection( CurOp* op , ProgressMeterHolder& pm ); long long postProcessCollectionNonAtomic( CurOp* op , ProgressMeterHolder& pm ); /** * if INMEMORY will append * may also append stats or anything else it likes */ void appendResults( BSONObjBuilder& b ); // -------- util ------------ /** * inserts with correct replication semantics */ void insert( const string& ns , const BSONObj& o ); // ------ simple accessors ----- /** State maintains ownership, do no use past State lifetime */ Scope* scope() { return _scope.get(); } const Config& config() { return _config; } const bool isOnDisk() { return _onDisk; } long long numEmits() const { if (_jsMode) return _scope->getNumberLongLong("_emitCt"); return _numEmits; } long long numReduces() const { if (_jsMode) return _scope->getNumberLongLong("_redCt"); return _config.reducer->numReduces; } long long numInMemKeys() const { if (_jsMode) return _scope->getNumberLongLong("_keyCt"); return _temp->size(); } bool jsMode() {return _jsMode;} void switchMode(bool jsMode); void bailFromJS(); const Config& _config; DBDirectClient _db; bool _useIncremental; // use an incremental collection protected: void _add( InMemory* im , const BSONObj& a , long& size ); scoped_ptr _scope; bool _onDisk; // if the end result of this map reduce is disk or not scoped_ptr _temp; long _size; // bytes in _temp long _dupCount; // number of duplicate key entries long long _numEmits; bool _jsMode; ScriptingFunction _reduceAll; ScriptingFunction _reduceAndEmit; ScriptingFunction _reduceAndFinalize; ScriptingFunction _reduceAndFinalizeAndInsert; }; BSONObj fast_emit( const BSONObj& args, void* data ); BSONObj _bailFromJS( const BSONObj& args, void* data ); void addPrivilegesRequiredForMapReduce(const std::string& dbname, const BSONObj& cmdObj, std::vector* out); } // end mr namespace }