// mr_shard.h /** * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #pragma once #include "pch.h" namespace mongo { namespace mr_shard { typedef vector BSONList; class State; // ------------ function interfaces ----------- class Finalizer : boost::noncopyable { public: virtual ~Finalizer() {} virtual void init( State * state ) = 0; /** * this takes a tuple and returns a tuple */ virtual BSONObj finalize( const BSONObj& tuple ) = 0; }; class Reducer : boost::noncopyable { public: Reducer() : numReduces(0) {} virtual ~Reducer() {} virtual void init( State * state ) = 0; virtual BSONObj reduce( const BSONList& tuples ) = 0; /** this means its a final reduce, even if there is no finalizer */ virtual BSONObj finalReduce( const BSONList& tuples , Finalizer * finalizer ) = 0; long long numReduces; }; // ------------ js function implementations ----------- /** * used as a holder for Scope and ScriptingFunction * visitor like pattern as Scope is gotten from first access */ class JSFunction : boost::noncopyable { public: /** * @param type (map|reduce|finalize) */ JSFunction( string type , const BSONElement& e ); virtual ~JSFunction() {} virtual void init( State * state ); Scope * scope() const { return _scope; } ScriptingFunction func() const { return _func; } private: string _type; string _code; // actual javascript code BSONObj _wantedScope; // this is for CodeWScope Scope * _scope; // this is not owned by us, and might be shared ScriptingFunction _func; }; class JSReducer : public Reducer { public: JSReducer( const BSONElement& code ) : _func( "_reduce" , code ) {} virtual void init( State * state ); virtual BSONObj reduce( const BSONList& tuples ); virtual BSONObj finalReduce( const BSONList& tuples , Finalizer * finalizer ); private: /** * result in "return" * @param key OUT * @param endSizeEstimate OUT */ void _reduce( const BSONList& values , BSONObj& key , int& endSizeEstimate ); JSFunction _func; }; class JSFinalizer : public Finalizer { public: JSFinalizer( const BSONElement& code ) : _func( "_finalize" , code ) {} virtual BSONObj finalize( const BSONObj& o ); virtual void init( State * state ) { _func.init( state ); } private: JSFunction _func; }; // ----------------- /** * holds map/reduce config information */ class Config { public: Config( const string& _dbname , const BSONObj& cmdObj ); string dbname; string ns; // options bool verbose; bool jsMode; // query options BSONObj filter; BSONObj sort; long long limit; // functions scoped_ptr reducer; scoped_ptr finalizer; BSONObj mapParams; BSONObj scopeSetup; // output tables string incLong; string tempLong; string finalShort; string finalLong; string outDB; // max number of keys allowed in JS map before switching mode long jsMaxKeys; // ratio of duplicates vs unique keys before reduce is triggered in js mode float reduceTriggerRatio; // maximum size of map before it gets dumped to disk long maxInMemSize; enum { REPLACE , // atomically replace the collection MERGE , // merge keys, override dups REDUCE , // merge keys, reduce dups INMEMORY // only store in memory, limited in size } outType; static AtomicUInt JOB_NUMBER; }; // end MRsetup /** * stores information about intermediate map reduce state * controls flow of data from map->reduce->finalize->output */ class State { public: State( const Config& c ); ~State(); void init(); // ---- prep ----- bool sourceExists(); long long incomingDocuments(); // ---- map stage ---- /** * stages on in in-memory storage */ void emit( const BSONObj& a ); /** * if size is big, run a reduce * if its still big, dump to temp collection */ void checkSize(); /** * run reduce on _temp */ void reduceInMemory(); // ------ reduce stage ----------- void prepTempCollection(); void finalReduce( BSONList& values ); void finalReduce( CurOp * op , ProgressMeterHolder& pm ); // ------ simple accessors ----- /** State maintains ownership, do no use past State lifetime */ Scope* scope() { return _scope.get(); } const Config& config() { return _config; } const bool isOnDisk() { return _onDisk; } long long numReduces() const { return _config.reducer->numReduces; } const Config& _config; protected: scoped_ptr _scope; bool _onDisk; // if the end result of this map reduce is disk or not }; } // end mr namespace }