// mr.h
/**
* Copyright (C) 2012 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include
#include
#include "mongo/db/auth/privilege.h"
#include "mongo/db/curop.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/scripting/engine.h"
namespace mongo {
class Collection;
class Database;
class OperationContext;
namespace mr {
typedef std::vector BSONList;
class State;
// ------------ function interfaces -----------
class Mapper {
MONGO_DISALLOW_COPYING(Mapper);
public:
virtual ~Mapper() {}
virtual void init(State* state) = 0;
virtual void map(const BSONObj& o) = 0;
protected:
Mapper() = default;
};
class Finalizer {
MONGO_DISALLOW_COPYING(Finalizer);
public:
virtual ~Finalizer() {}
virtual void init(State* state) = 0;
/**
* this takes a tuple and returns a tuple
*/
virtual BSONObj finalize(const BSONObj& tuple) = 0;
protected:
Finalizer() = default;
};
class Reducer {
MONGO_DISALLOW_COPYING(Reducer);
public:
Reducer() : numReduces(0) {}
virtual ~Reducer() {}
virtual void init(State* state) = 0;
virtual BSONObj reduce(const BSONList& tuples) = 0;
/** this means its a final reduce, even if there is no finalizer */
virtual BSONObj finalReduce(const BSONList& tuples, Finalizer* finalizer) = 0;
long long numReduces;
};
// ------------ js function implementations -----------
/**
* used as a holder for Scope and ScriptingFunction
* visitor like pattern as Scope is gotten from first access
*/
class JSFunction {
MONGO_DISALLOW_COPYING(JSFunction);
public:
/**
* @param type (map|reduce|finalize)
*/
JSFunction(const std::string& type, const BSONElement& e);
virtual ~JSFunction() {}
virtual void init(State* state);
Scope* scope() const {
return _scope;
}
ScriptingFunction func() const {
return _func;
}
private:
std::string _type;
std::string _code; // actual javascript code
BSONObj _wantedScope; // this is for CodeWScope
Scope* _scope; // this is not owned by us, and might be shared
ScriptingFunction _func;
};
class JSMapper : public Mapper {
public:
JSMapper(const BSONElement& code) : _func("_map", code) {}
virtual void map(const BSONObj& o);
virtual void init(State* state);
private:
JSFunction _func;
BSONObj _params;
};
class JSReducer : public Reducer {
public:
JSReducer(const BSONElement& code) : _func("_reduce", code) {}
virtual void init(State* state);
virtual BSONObj reduce(const BSONList& tuples);
virtual BSONObj finalReduce(const BSONList& tuples, Finalizer* finalizer);
private:
/**
* result in "__returnValue"
* @param key OUT
* @param endSizeEstimate OUT
*/
void _reduce(const BSONList& values, BSONObj& key, int& endSizeEstimate);
JSFunction _func;
};
class JSFinalizer : public Finalizer {
public:
JSFinalizer(const BSONElement& code) : _func("_finalize", code) {}
virtual BSONObj finalize(const BSONObj& o);
virtual void init(State* state) {
_func.init(state);
}
private:
JSFunction _func;
};
// -----------------
class TupleKeyCmp {
public:
TupleKeyCmp() {}
bool operator()(const BSONObj& l, const BSONObj& r) const {
return l.firstElement().woCompare(r.firstElement()) < 0;
}
};
typedef std::map InMemory; // from key to list of tuples
/**
* holds map/reduce config information
*/
class Config {
public:
Config(const std::string& _dbname, const BSONObj& cmdObj);
std::string dbname;
NamespaceString nss;
// options
bool verbose;
bool jsMode;
int splitInfo;
// query options
BSONObj filter;
BSONObj sort;
BSONObj collation;
long long limit;
// functions
std::unique_ptr mapper;
std::unique_ptr reducer;
std::unique_ptr finalizer;
BSONObj mapParams;
BSONObj scopeSetup;
// output tables
NamespaceString incLong;
NamespaceString tempNamespace;
enum OutputType {
REPLACE, // atomically replace the collection
MERGE, // merge keys, override dups
REDUCE, // merge keys, reduce dups
INMEMORY // only store in memory, limited in size
};
struct OutputOptions {
std::string outDB;
std::string collectionName;
NamespaceString finalNamespace;
// if true, no lock during output operation
bool outNonAtomic;
OutputType outType;
} outputOptions;
static OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdObj);
// max number of keys allowed in JS map before switching mode
long jsMaxKeys;
// ratio of duplicates vs unique keys before reduce is triggered in js mode
float reduceTriggerRatio;
// maximum size of map before it gets dumped to disk
long maxInMemSize;
// true when called from mongos to do phase-1 of M/R
bool shardedFirstPass;
static AtomicUInt32 JOB_NUMBER;
}; // end MRsetup
/**
* stores information about intermediate map reduce state
* controls flow of data from map->reduce->finalize->output
*/
class State {
public:
/**
* opCtx must outlive this State.
*/
State(OperationContext* opCtx, const Config& c);
~State();
void init();
// ---- prep -----
bool sourceExists();
// ---- map stage ----
/**
* stages on in in-memory storage
*/
void emit(const BSONObj& a);
/**
* Checks the size of the transient in-memory results accumulated so far and potentially
* runs reduce in order to compact them. If the data is still too large, it will be
* spilled to the output collection.
*
* NOTE: Make sure that no DB locks are held, when calling this function, because it may
* try to acquire write DB lock for the write to the output collection.
*/
void reduceAndSpillInMemoryStateIfNeeded();
/**
* run reduce on _temp
*/
void reduceInMemory();
/**
* transfers in memory storage to temp collection
*/
void dumpToInc();
void insertToInc(BSONObj& o);
void _insertToInc(BSONObj& o);
// ------ reduce stage -----------
void prepTempCollection();
void finalReduce(BSONList& values);
void finalReduce(OperationContext* opCtx, CurOp* op, ProgressMeterHolder& pm);
// ------- cleanup/data positioning ----------
/**
* Clean up the temporary and incremental collections
*/
void dropTempCollections();
/**
@return number objects in collection
*/
long long postProcessCollection(OperationContext* opCtx, CurOp* op, ProgressMeterHolder& pm);
long long postProcessCollectionNonAtomic(OperationContext* opCtx,
CurOp* op,
ProgressMeterHolder& pm,
bool callerHoldsGlobalLock);
/**
* if INMEMORY will append
* may also append stats or anything else it likes
*/
void appendResults(BSONObjBuilder& b);
// -------- util ------------
/**
* inserts with correct replication semantics
*/
void insert(const NamespaceString& nss, const BSONObj& o);
// ------ simple accessors -----
/** State maintains ownership, do no use past State lifetime */
Scope* scope() {
return _scope.get();
}
const Config& config() {
return _config;
}
bool isOnDisk() {
return _onDisk;
}
long long numEmits() const {
if (_jsMode)
return _scope->getNumberLongLong("_emitCt");
return _numEmits;
}
long long numReduces() const {
if (_jsMode)
return _scope->getNumberLongLong("_redCt");
return _config.reducer->numReduces;
}
long long numInMemKeys() const {
if (_jsMode)
return _scope->getNumberLongLong("_keyCt");
return _temp->size();
}
bool jsMode() {
return _jsMode;
}
void switchMode(bool jsMode);
void bailFromJS();
static Collection* getCollectionOrUassert(OperationContext* opCtx,
Database* db,
const NamespaceString& nss);
const Config& _config;
DBDirectClient _db;
bool _useIncremental; // use an incremental collection
protected:
/**
* Appends a new document to the in-memory list of tuples, which are under that
* document's key.
*
* @return estimated in-memory size occupied by the newly added document.
*/
int _add(InMemory* im, const BSONObj& a);
OperationContext* _opCtx;
std::unique_ptr _scope;
bool _onDisk; // if the end result of this map reduce is disk or not
std::unique_ptr _temp;
long _size; // bytes in _temp
long _dupCount; // number of duplicate key entries
long long _numEmits;
bool _jsMode;
ScriptingFunction _reduceAll;
ScriptingFunction _reduceAndEmit;
ScriptingFunction _reduceAndFinalize;
ScriptingFunction _reduceAndFinalizeAndInsert;
};
BSONObj fast_emit(const BSONObj& args, void* data);
BSONObj _bailFromJS(const BSONObj& args, void* data);
void addPrivilegesRequiredForMapReduce(Command* commandTemplate,
const std::string& dbname,
const BSONObj& cmdObj,
std::vector* out);
/**
* Returns true if the provided mapReduce command has an 'out' parameter.
*/
bool mrSupportsWriteConcern(const BSONObj& cmd);
} // end mr namespace
}