// mr_shard.cpp
/**
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pch.h"
#include "../util/net/message.h"
#include "../db/dbmessage.h"
#include "../scripting/engine.h"
#include "mr_shard.h"
namespace mongo {
namespace mr_shard {
// Definition of the static counter Config::JOB_NUMBER. Config's constructor
// post-increments it and appends the value to the temp collection name.
AtomicUInt Config::JOB_NUMBER;
/**
 * Captures a JS function from a BSON element.
 *
 * @param type  label for the function; used in the compile error message and as
 *              the name under which init() installs the function in the scope
 * @param e     element holding the code; when it is of type CodeWScope its
 *              scope object is kept as well and loaded by init()
 */
JSFunction::JSFunction( string type , const BSONElement& e ) {
    _type = type;
    _code = e._asCode();

    const bool carriesScope = ( CodeWScope == e.type() );
    if ( carriesScope ) {
        _wantedScope = e.codeWScopeObject();
    }
}
/**
 * Binds this function to the scope of the given State and compiles it.
 *
 * Fails with user assertion 14836 when the code cannot be compiled.
 */
void JSFunction::init( State * state ) {
    _scope = state->scope();
    assert( _scope );

    // make the function's own scope variables available before compiling
    _scope->init( &_wantedScope );

    const char * const source = _code.c_str();
    _func = _scope->createFunction( source );
    uassert( 14836 , str::stream() << "couldn't compile code for: " << _type , _func );

    // also register the code under its type name so that it can be called in JS mode
    const char * const installedName = _type.c_str();
    _scope->setFunction( installedName , source );
}
/**
 * Applies the finalize function to a tuple obj (key, val).
 * Returns a new object made of the first element of o, copied unchanged,
 * followed by the function's result under the name "value".
 * DB access from inside the finalize function is disabled for the duration of the call.
 */
BSONObj JSFinalizer::finalize( const BSONObj& o ) {
    Scope * scope = _func.scope();
    Scope::NoDBAccess dbGuard = scope->disableDBAccess( "can't access db inside finalize" );

    scope->invokeSafe( _func.func() , &o, 0 );

    // The builder is deliberately not pre-sized from o.objsize(): finalize is
    // often used to collapse many fields into one, so the input size is a poor
    // guide to the output size.
    BSONObjBuilder out;
    out.append( o.firstElement() );
    scope->append( out , "value" , "return" );
    return out.obj();
}
// Prepares the reducer for use by initializing its wrapped function (_func)
// against the given State.
void JSReducer::init( State * state ) {
_func.init( state );
}
/**
 * Reduces a list of tuple objects (key, value) to a single tuple {"0": key, "1": value}.
 *
 * A list holding exactly one tuple is returned as-is, without invoking the
 * reduce function (the tuple keeps whatever field names it came with).
 *
 * The list must not be empty: there is no key to report for an empty list, and
 * indexing tuples[0] would read past the end of the container. This is checked
 * with assert, like the other invariants in this file.
 */
BSONObj JSReducer::reduce( const BSONList& tuples ) {
    assert( tuples.size() > 0 );

    if ( tuples.size() == 1 )
        return tuples[0];

    BSONObj key;
    int endSizeEstimate = 16;
    _reduce( tuples , key , endSizeEstimate );

    // key comes back from _reduce; the reduced value is read from the scope's "return"
    BSONObjBuilder b(endSizeEstimate);
    b.appendAs( key.firstElement() , "0" );
    _func.scope()->append( b , "1" , "return" );
    return b.obj();
}
/**
 * Produces the final output object {_id: key, value: val} for a list of
 * tuples (key, value).
 *
 * A single tuple is only re-labelled: its first two elements are copied under
 * the names "_id" and "value" and the reduce function is not invoked. Any other
 * list goes through _reduce() first. When a finalizer is given it is applied to
 * the resulting object.
 */
BSONObj JSReducer::finalReduce( const BSONList& tuples , Finalizer * finalizer ) {
    BSONObj result;

    if ( tuples.size() != 1 ) {
        // several tuples: run the reduce function over them
        BSONObj reducedKey;
        int sizeHint = 16;
        _reduce( tuples , reducedKey , sizeHint );

        BSONObjBuilder out( sizeHint );
        out.appendAs( reducedKey.firstElement() , "_id" );
        _func.scope()->append( out , "value" , "return" );
        result = out.obj();
    }
    else {
        // exactly one tuple: nothing to reduce, just rename its two fields
        BSONObj only = tuples[0];
        BSONObjBuilder out( only.objsize() );
        BSONObjIterator fields( only );
        out.appendAs( fields.next() , "_id" );
        out.appendAs( fields.next() , "value" );
        result = out.obj();
    }

    if ( finalizer )
        result = finalizer->finalize( result );

    return result;
}
/**
* Applies the reduce function to a list of tuples (key, value).
*
* tuples is const and is not modified here. Results are handed back through:
* - key: out-parameter; its first element is reused below as the key of the
*   follow-up tuple, and callers read key.firstElement() after the call
* - endSizeEstimate: set to key.objsize() + ( args.objsize() / tuples.size() )
* - the scope value named "return", which is read after invoking the function
*
* When not every tuple was consumed by one invocation, the remaining tuples
* plus the intermediate result are reduced again by a recursive call.
*
* NOTE(review): this function does not parse as shown. The declaration of
* valueBuilder has no template argument, and the for-loop header is truncated:
* the loop condition, the declaration of ee, the code that creates valueBuilder
* and whatever assigns key are not visible. Presumably text between '<' and '>'
* was lost; restore it from version control before editing this function.
*/
void JSReducer::_reduce( const BSONList& tuples , BSONObj& key , int& endSizeEstimate ) {
// initial buffer size guess: tuple count times the size of the first tuple's "value" field, plus 128
// NOTE(review): tuples.begin() is dereferenced without a visible check for an empty list
int sizeEstimate = ( tuples.size() * tuples.begin()->getField( "value" ).size() ) + 128;
// need to build the reduce args: ( key, [values] )
BSONObjBuilder reduceArgs( sizeEstimate );
// NOTE(review): template argument missing (see header comment)
boost::scoped_ptr valueBuilder;
int sizeSoFar = 0;
unsigned n = 0;
// NOTE(review): loop header truncated (see header comment); what remains stops
// consuming tuples once a size check against BSONObjMaxUserSize trips
for ( ; n BSONObjMaxUserSize ) {
assert( n > 1 ); // if not, inf. loop
break;
}
valueBuilder->append( ee );
sizeSoFar += ee.size();
}
// the builder must have been created inside the loop
assert(valueBuilder);
valueBuilder->done();
BSONObj args = reduceArgs.obj();
Scope * s = _func.scope();
// run the reduce function on the argument object built above
s->invokeSafe( _func.func() , &args, 0 );
++numReduces;
// a reduce function that returns an array is rejected
if ( s->type( "return" ) == Array ) {
uasserted( 14838 , "reduce -> multiple not supported yet");
// NOTE(review): presumably unreachable - confirm that uasserted never returns
return;
}
endSizeEstimate = key.objsize() + ( args.objsize() / tuples.size() );
// n is the number of tuples consumed so far; done when all of them were used
if ( n == tuples.size() )
return;
// the input list was too large, add the rest of elmts to new tuples and reduce again
// note: would be better to use loop instead of recursion to avoid stack overflow
BSONList x;
for ( ; n < tuples.size(); n++ ) {
x.push_back( tuples[n] );
}
// append the intermediate result as one more tuple: key first, then the scope's "return" under "1"
BSONObjBuilder temp( endSizeEstimate );
temp.append( key.firstElement() );
s->append( temp , "1" , "return" );
x.push_back( temp.obj() );
_reduce( x , key , endSizeEstimate );
}
/**
 * Builds the configuration of a map/reduce job from its command object.
 *
 * Fields read from cmdObj:
 *  - first element: name of the source collection (combined with _dbname into ns)
 *  - "verbose", "jsMode": flags
 *  - "out": either a collection name (string, output type REPLACE) or an object
 *    whose first field is one of normal / replace / merge / reduce / inline;
 *    the object may also carry a "db" field naming the output database
 *  - "scope": object with variables for the JS scope
 *  - "reduce": the reduce function
 *  - "finalize": optional finalize function
 *  - "limit": numeric limit, 0 when absent or not a number
 *
 * Raises user assertions 14841 (legacy "outType" given), 14839 (unknown out
 * specifier) and 14840 ("out" is neither a string nor an object).
 */
Config::Config( const string& _dbname , const BSONObj& cmdObj ) {
    dbname = _dbname;
    ns = dbname + "." + cmdObj.firstElement().valuestr();

    verbose = cmdObj["verbose"].trueValue();
    jsMode = cmdObj["jsMode"].trueValue();

    // fixed tuning values
    jsMaxKeys = 500000;
    reduceTriggerRatio = 2.0;
    maxInMemSize = 5 * 1024 * 1024;

    uassert( 14841 , "outType is no longer a valid option" , cmdObj["outType"].eoo() );

    // where does the output go?
    BSONElement outElem = cmdObj["out"];
    switch ( outElem.type() ) {
    case String: {
        finalShort = outElem.String();
        outType = REPLACE;
        break;
    }
    case Object: {
        BSONObj outSpec = outElem.embeddedObject();
        BSONElement first = outSpec.firstElement();
        string mode = first.fieldName();

        if ( mode == "inline" ) {
            outType = INMEMORY;
        }
        else if ( mode == "merge" ) {
            outType = MERGE;
            finalShort = first.String();
        }
        else if ( mode == "reduce" ) {
            outType = REDUCE;
            finalShort = first.String();
        }
        else if ( mode == "normal" || mode == "replace" ) {
            outType = REPLACE;
            finalShort = first.String();
        }
        else {
            uasserted( 14839 , str::stream() << "unknown out specifier [" << mode << "]" );
        }

        if ( outSpec.hasElement( "db" ) ) {
            outDB = outSpec["db"].String();
        }
        break;
    }
    default: {
        uasserted( 14840 , "'out' has to be a string or an object" );
    }
    }

    // collection names are only needed when the output is not inline
    if ( outType != INMEMORY ) {
        const string targetDb = outDB.empty() ? dbname : outDB;
        tempLong = str::stream() << targetDb << ".tmp.mr." << cmdObj.firstElement().String() << "_" << finalShort << "_" << JOB_NUMBER++;
        incLong = tempLong + "_inc";
        finalLong = str::stream() << targetDb << "." << finalShort;
    }

    // scope and code
    BSONElement scopeElem = cmdObj["scope"];
    if ( scopeElem.type() == Object ) {
        scopeSetup = scopeElem.embeddedObjectUserCheck();
    }

    reducer.reset( new JSReducer( cmdObj["reduce"] ) );

    BSONElement finalizeElem = cmdObj["finalize"];
    if ( finalizeElem.type() && finalizeElem.trueValue() ) {
        finalizer.reset( new JSFinalizer( finalizeElem ) );
    }

    // query options
    BSONElement limitElem = cmdObj["limit"];
    if ( limitElem.isNumber() ) {
        limit = limitElem.numberLong();
    }
    else {
        limit = 0;
    }
}
// Binds this State to its Config. _onDisk is true for every output type
// other than Config::INMEMORY.
State::State( const Config& c ) : _config( c ) {
_onDisk = _config.outType != Config::INMEMORY;
}
/**
 * Cleans up after a map/reduce job.
 *
 * Dropping the temp and inc collections is currently commented out, so the
 * on-disk branch does nothing. If a scope is held, the JS variables _emitCt,
 * _keyCt and _mrMap are deleted from it.
 */
State::~State() {
    if ( _onDisk ) {
        try {
            // _db.dropCollection( _config.tempLong );
            // _db.dropCollection( _config.incLong );
        }
        catch ( std::exception& ex ) {
            error() << "couldn't cleanup after map reduce: " << ex.what() << endl;
        }
    }

    if ( ! _scope )
        return;

    // cleanup js objects
    ScriptingFunction dropJsState = _scope->createFunction("delete _emitCt; delete _keyCt; delete _mrMap;");
    _scope->invoke( dropJsState , 0 , 0 , 0 , true );
}
/**
* Initialize the mapreduce operation: take a pooled JS scope for the configured
* db, load the configured scope variables into it (if any), and initialize the
* reducer and the optional finalizer against this State.
*
* NOTE(review): the earlier comment said this creates the inc collection, but
* nothing in this function touches a collection.
*/
void State::init() {
// setup js
// take over the scope handed out by the script engine's pool
_scope.reset(globalScriptEngine->getPooledScope( _config.dbname ).release() );
// _scope->localConnect( _config.dbname.c_str() );
_scope->externalSetup();
// load the scope variables from the config, when there are any
if ( ! _config.scopeSetup.isEmpty() )
_scope->init( &_config.scopeSetup );
_config.reducer->init( this );
if ( _config.finalizer )
_config.finalizer->init( this );
// record in the scope, as _doFinal, whether a finalizer is configured
_scope->setBoolean("_doFinal", _config.finalizer);
}
}
}