diff options
Diffstat (limited to 'src/mongo/db/commands/mr.h')
-rw-r--r-- | src/mongo/db/commands/mr.h | 689 |
1 files changed, 358 insertions, 331 deletions
diff --git a/src/mongo/db/commands/mr.h b/src/mongo/db/commands/mr.h index 083165ebe27..8bc4264794e 100644 --- a/src/mongo/db/commands/mr.h +++ b/src/mongo/db/commands/mr.h @@ -42,347 +42,374 @@ namespace mongo { - class Collection; - class Database; - class OperationContext; - - namespace mr { - - typedef std::vector<BSONObj> BSONList; - - class State; - - // ------------ function interfaces ----------- - - class Mapper { - MONGO_DISALLOW_COPYING(Mapper); - public: - virtual ~Mapper() {} - virtual void init( State * state ) = 0; - - virtual void map( const BSONObj& o ) = 0; - protected: - Mapper() = default; - }; - - class Finalizer { - MONGO_DISALLOW_COPYING(Finalizer); - public: - virtual ~Finalizer() {} - virtual void init( State * state ) = 0; - - /** - * this takes a tuple and returns a tuple - */ - virtual BSONObj finalize( const BSONObj& tuple ) = 0; - - protected: - Finalizer() = default; - }; - - class Reducer { - MONGO_DISALLOW_COPYING(Reducer); - public: - Reducer() : numReduces(0) {} - virtual ~Reducer() {} - virtual void init( State * state ) = 0; - - virtual BSONObj reduce( const BSONList& tuples ) = 0; - /** this means its a final reduce, even if there is no finalizer */ - virtual BSONObj finalReduce( const BSONList& tuples , Finalizer * finalizer ) = 0; - - long long numReduces; - }; - - // ------------ js function implementations ----------- - - /** - * used as a holder for Scope and ScriptingFunction - * visitor like pattern as Scope is gotten from first access - */ - class JSFunction { - MONGO_DISALLOW_COPYING(JSFunction); - public: - /** - * @param type (map|reduce|finalize) - */ - JSFunction( const std::string& type , const BSONElement& e ); - virtual ~JSFunction() {} - - virtual void init( State * state ); - - Scope * scope() const { return _scope; } - ScriptingFunction func() const { return _func; } - - private: - std::string _type; - std::string _code; // actual javascript code - BSONObj _wantedScope; // this is for CodeWScope - - Scope * _scope; // this is not owned by us, and might be shared - ScriptingFunction _func; - }; - - class JSMapper : public Mapper { - public: - JSMapper( const BSONElement & code ) : _func( "_map" , code ) {} - virtual void map( const BSONObj& o ); - virtual void init( State * state ); - - private: - JSFunction _func; - BSONObj _params; - }; - - class JSReducer : public Reducer { - public: - JSReducer( const BSONElement& code ) : _func( "_reduce" , code ) {} - virtual void init( State * state ); - - virtual BSONObj reduce( const BSONList& tuples ); - virtual BSONObj finalReduce( const BSONList& tuples , Finalizer * finalizer ); - - private: - - /** - * result in "__returnValue" - * @param key OUT - * @param endSizeEstimate OUT - */ - void _reduce( const BSONList& values , BSONObj& key , int& endSizeEstimate ); - - JSFunction _func; - }; - - class JSFinalizer : public Finalizer { - public: - JSFinalizer( const BSONElement& code ) : _func( "_finalize" , code ) {} - virtual BSONObj finalize( const BSONObj& o ); - virtual void init( State * state ) { _func.init( state ); } - private: - JSFunction _func; - - }; - - // ----------------- - - - class TupleKeyCmp { - public: - TupleKeyCmp() {} - bool operator()( const BSONObj &l, const BSONObj &r ) const { - return l.firstElement().woCompare( r.firstElement() ) < 0; - } - }; - - typedef std::map< BSONObj,BSONList,TupleKeyCmp > InMemory; // from key to list of tuples - - /** - * holds map/reduce config information - */ - class Config { - public: - Config( const std::string& _dbname , const BSONObj& cmdObj ); - - std::string dbname; - std::string ns; - - // options - bool verbose; - bool jsMode; - int splitInfo; - - // query options - - BSONObj filter; - BSONObj sort; - long long limit; - - // functions - - std::unique_ptr<Mapper> mapper; - std::unique_ptr<Reducer> reducer; - std::unique_ptr<Finalizer> finalizer; - - BSONObj mapParams; - BSONObj scopeSetup; - - // output tables - std::string incLong; - std::string tempNamespace; - - enum OutputType { - REPLACE , // atomically replace the collection - MERGE , // merge keys, override dups - REDUCE , // merge keys, reduce dups - INMEMORY // only store in memory, limited in size - }; - struct OutputOptions { - std::string outDB; - std::string collectionName; - std::string finalNamespace; - // if true, no lock during output operation - bool outNonAtomic; - OutputType outType; - } outputOptions; - - static OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdObj); - - // max number of keys allowed in JS map before switching mode - long jsMaxKeys; - // ratio of duplicates vs unique keys before reduce is triggered in js mode - float reduceTriggerRatio; - // maximum size of map before it gets dumped to disk - long maxInMemSize; - - // true when called from mongos to do phase-1 of M/R - bool shardedFirstPass; - - static AtomicUInt32 JOB_NUMBER; - }; // end MRsetup - - /** - * stores information about intermediate map reduce state - * controls flow of data from map->reduce->finalize->output - */ - class State { - public: - /** - * txn must outlive this State. - */ - State( OperationContext* txn, const Config& c ); - ~State(); - - void init(); - - // ---- prep ----- - bool sourceExists(); - - long long incomingDocuments(); - - // ---- map stage ---- +class Collection; +class Database; +class OperationContext; - /** - * stages on in in-memory storage - */ - void emit( const BSONObj& a ); - - /** - * Checks the size of the transient in-memory results accumulated so far and potentially - * runs reduce in order to compact them. If the data is still too large, it will be - * spilled to the output collection. - * - * NOTE: Make sure that no DB locks are held, when calling this function, because it may - * try to acquire write DB lock for the write to the output collection. - */ - void reduceAndSpillInMemoryStateIfNeeded(); - - /** - * run reduce on _temp - */ - void reduceInMemory(); - - /** - * transfers in memory storage to temp collection - */ - void dumpToInc(); - void insertToInc( BSONObj& o ); - void _insertToInc( BSONObj& o ); - - // ------ reduce stage ----------- - - void prepTempCollection(); - - void finalReduce( BSONList& values ); - - void finalReduce( CurOp * op , ProgressMeterHolder& pm ); - - // ------- cleanup/data positioning ---------- - - /** - * Clean up the temporary and incremental collections - */ - void dropTempCollections(); - - /** - @return number objects in collection - */ - long long postProcessCollection( - OperationContext* txn, CurOp* op, ProgressMeterHolder& pm); - long long postProcessCollectionNonAtomic( - OperationContext* txn, CurOp* op, ProgressMeterHolder& pm); - - /** - * if INMEMORY will append - * may also append stats or anything else it likes - */ - void appendResults( BSONObjBuilder& b ); - - // -------- util ------------ - - /** - * inserts with correct replication semantics - */ - void insert( const std::string& ns , const BSONObj& o ); +namespace mr { - // ------ simple accessors ----- - - /** State maintains ownership, do no use past State lifetime */ - Scope* scope() { return _scope.get(); } - - const Config& config() { return _config; } - - bool isOnDisk() { return _onDisk; } +typedef std::vector<BSONObj> BSONList; - long long numEmits() const { if (_jsMode) return _scope->getNumberLongLong("_emitCt"); return _numEmits; } - long long numReduces() const { if (_jsMode) return _scope->getNumberLongLong("_redCt"); return _config.reducer->numReduces; } - long long numInMemKeys() const { if (_jsMode) return _scope->getNumberLongLong("_keyCt"); return _temp->size(); } +class State; - bool jsMode() {return _jsMode;} - void switchMode(bool jsMode); - void bailFromJS(); - - Collection* getCollectionOrUassert(Database* db, StringData ns); - - const Config& _config; - DBDirectClient _db; - bool _useIncremental; // use an incremental collection +// ------------ function interfaces ----------- - protected: +class Mapper { + MONGO_DISALLOW_COPYING(Mapper); - /** - * Appends a new document to the in-memory list of tuples, which are under that - * document's key. - * - * @return estimated in-memory size occupied by the newly added document. - */ - int _add(InMemory* im , const BSONObj& a); +public: + virtual ~Mapper() {} + virtual void init(State* state) = 0; - OperationContext* _txn; - std::unique_ptr<Scope> _scope; - bool _onDisk; // if the end result of this map reduce is disk or not + virtual void map(const BSONObj& o) = 0; - std::unique_ptr<InMemory> _temp; - long _size; // bytes in _temp - long _dupCount; // number of duplicate key entries +protected: + Mapper() = default; +}; - long long _numEmits; - - bool _jsMode; - ScriptingFunction _reduceAll; - ScriptingFunction _reduceAndEmit; - ScriptingFunction _reduceAndFinalize; - ScriptingFunction _reduceAndFinalizeAndInsert; - }; +class Finalizer { + MONGO_DISALLOW_COPYING(Finalizer); - BSONObj fast_emit( const BSONObj& args, void* data ); - BSONObj _bailFromJS( const BSONObj& args, void* data ); +public: + virtual ~Finalizer() {} + virtual void init(State* state) = 0; - void addPrivilegesRequiredForMapReduce(Command* commandTemplate, - const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out); - } // end mr namespace -} + /** + * this takes a tuple and returns a tuple + */ + virtual BSONObj finalize(const BSONObj& tuple) = 0; + +protected: + Finalizer() = default; +}; + +class Reducer { + MONGO_DISALLOW_COPYING(Reducer); + +public: + Reducer() : numReduces(0) {} + virtual ~Reducer() {} + virtual void init(State* state) = 0; + virtual BSONObj reduce(const BSONList& tuples) = 0; + /** this means its a final reduce, even if there is no finalizer */ + virtual BSONObj finalReduce(const BSONList& tuples, Finalizer* finalizer) = 0; + long long numReduces; +}; + +// ------------ js function implementations ----------- + +/** + * used as a holder for Scope and ScriptingFunction + * visitor like pattern as Scope is gotten from first access + */ +class JSFunction { + MONGO_DISALLOW_COPYING(JSFunction); + +public: + /** + * @param type (map|reduce|finalize) + */ + JSFunction(const std::string& type, const BSONElement& e); + virtual ~JSFunction() {} + + virtual void init(State* state); + + Scope* scope() const { + return _scope; + } + ScriptingFunction func() const { + return _func; + } + +private: + std::string _type; + std::string _code; // actual javascript code + BSONObj _wantedScope; // this is for CodeWScope + + Scope* _scope; // this is not owned by us, and might be shared + ScriptingFunction _func; +}; + +class JSMapper : public Mapper { +public: + JSMapper(const BSONElement& code) : _func("_map", code) {} + virtual void map(const BSONObj& o); + virtual void init(State* state); + +private: + JSFunction _func; + BSONObj _params; +}; + +class JSReducer : public Reducer { +public: + JSReducer(const BSONElement& code) : _func("_reduce", code) {} + virtual void init(State* state); + + virtual BSONObj reduce(const BSONList& tuples); + virtual BSONObj finalReduce(const BSONList& tuples, Finalizer* finalizer); + +private: + /** + * result in "__returnValue" + * @param key OUT + * @param endSizeEstimate OUT + */ + void _reduce(const BSONList& values, BSONObj& key, int& endSizeEstimate); + + JSFunction _func; +}; + +class JSFinalizer : public Finalizer { +public: + JSFinalizer(const BSONElement& code) : _func("_finalize", code) {} + virtual BSONObj finalize(const BSONObj& o); + virtual void init(State* state) { + _func.init(state); + } + +private: + JSFunction _func; +}; + +// ----------------- + + +class TupleKeyCmp { +public: + TupleKeyCmp() {} + bool operator()(const BSONObj& l, const BSONObj& r) const { + return l.firstElement().woCompare(r.firstElement()) < 0; + } +}; + +typedef std::map<BSONObj, BSONList, TupleKeyCmp> InMemory; // from key to list of tuples + +/** + * holds map/reduce config information + */ +class Config { +public: + Config(const std::string& _dbname, const BSONObj& cmdObj); + + std::string dbname; + std::string ns; + + // options + bool verbose; + bool jsMode; + int splitInfo; + + // query options + + BSONObj filter; + BSONObj sort; + long long limit; + + // functions + + std::unique_ptr<Mapper> mapper; + std::unique_ptr<Reducer> reducer; + std::unique_ptr<Finalizer> finalizer; + + BSONObj mapParams; + BSONObj scopeSetup; + + // output tables + std::string incLong; + std::string tempNamespace; + + enum OutputType { + REPLACE, // atomically replace the collection + MERGE, // merge keys, override dups + REDUCE, // merge keys, reduce dups + INMEMORY // only store in memory, limited in size + }; + struct OutputOptions { + std::string outDB; + std::string collectionName; + std::string finalNamespace; + // if true, no lock during output operation + bool outNonAtomic; + OutputType outType; + } outputOptions; + + static OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdObj); + + // max number of keys allowed in JS map before switching mode + long jsMaxKeys; + // ratio of duplicates vs unique keys before reduce is triggered in js mode + float reduceTriggerRatio; + // maximum size of map before it gets dumped to disk + long maxInMemSize; + + // true when called from mongos to do phase-1 of M/R + bool shardedFirstPass; + + static AtomicUInt32 JOB_NUMBER; +}; // end MRsetup + +/** + * stores information about intermediate map reduce state + * controls flow of data from map->reduce->finalize->output + */ +class State { +public: + /** + * txn must outlive this State. + */ + State(OperationContext* txn, const Config& c); + ~State(); + + void init(); + + // ---- prep ----- + bool sourceExists(); + + long long incomingDocuments(); + + // ---- map stage ---- + + /** + * stages on in in-memory storage + */ + void emit(const BSONObj& a); + + /** + * Checks the size of the transient in-memory results accumulated so far and potentially + * runs reduce in order to compact them. If the data is still too large, it will be + * spilled to the output collection. + * + * NOTE: Make sure that no DB locks are held, when calling this function, because it may + * try to acquire write DB lock for the write to the output collection. + */ + void reduceAndSpillInMemoryStateIfNeeded(); + + /** + * run reduce on _temp + */ + void reduceInMemory(); + + /** + * transfers in memory storage to temp collection + */ + void dumpToInc(); + void insertToInc(BSONObj& o); + void _insertToInc(BSONObj& o); + + // ------ reduce stage ----------- + + void prepTempCollection(); + + void finalReduce(BSONList& values); + + void finalReduce(CurOp* op, ProgressMeterHolder& pm); + + // ------- cleanup/data positioning ---------- + + /** + * Clean up the temporary and incremental collections + */ + void dropTempCollections(); + + /** + @return number objects in collection + */ + long long postProcessCollection(OperationContext* txn, CurOp* op, ProgressMeterHolder& pm); + long long postProcessCollectionNonAtomic(OperationContext* txn, + CurOp* op, + ProgressMeterHolder& pm); + + /** + * if INMEMORY will append + * may also append stats or anything else it likes + */ + void appendResults(BSONObjBuilder& b); + + // -------- util ------------ + + /** + * inserts with correct replication semantics + */ + void insert(const std::string& ns, const BSONObj& o); + + // ------ simple accessors ----- + + /** State maintains ownership, do no use past State lifetime */ + Scope* scope() { + return _scope.get(); + } + + const Config& config() { + return _config; + } + + bool isOnDisk() { + return _onDisk; + } + + long long numEmits() const { + if (_jsMode) + return _scope->getNumberLongLong("_emitCt"); + return _numEmits; + } + long long numReduces() const { + if (_jsMode) + return _scope->getNumberLongLong("_redCt"); + return _config.reducer->numReduces; + } + long long numInMemKeys() const { + if (_jsMode) + return _scope->getNumberLongLong("_keyCt"); + return _temp->size(); + } + + bool jsMode() { + return _jsMode; + } + void switchMode(bool jsMode); + void bailFromJS(); + + Collection* getCollectionOrUassert(Database* db, StringData ns); + + const Config& _config; + DBDirectClient _db; + bool _useIncremental; // use an incremental collection + +protected: + /** + * Appends a new document to the in-memory list of tuples, which are under that + * document's key. + * + * @return estimated in-memory size occupied by the newly added document. + */ + int _add(InMemory* im, const BSONObj& a); + + OperationContext* _txn; + std::unique_ptr<Scope> _scope; + bool _onDisk; // if the end result of this map reduce is disk or not + + std::unique_ptr<InMemory> _temp; + long _size; // bytes in _temp + long _dupCount; // number of duplicate key entries + + long long _numEmits; + + bool _jsMode; + ScriptingFunction _reduceAll; + ScriptingFunction _reduceAndEmit; + ScriptingFunction _reduceAndFinalize; + ScriptingFunction _reduceAndFinalizeAndInsert; +}; + +BSONObj fast_emit(const BSONObj& args, void* data); +BSONObj _bailFromJS(const BSONObj& args, void* data); + +void addPrivilegesRequiredForMapReduce(Command* commandTemplate, + const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out); +} // end mr namespace +} |