summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2010-11-15 15:10:31 -0500
committerEliot Horowitz <eliot@10gen.com>2010-11-15 15:10:31 -0500
commit94ba794b852279ac73c40b4c924f8b4487b2da8b (patch)
tree6f5bc7dad12436a85a42557d6a3c5519ce1b8fcc
parent1cafbccecade5457bb9a637619a34055c2936fbf (diff)
downloadmongo-94ba794b852279ac73c40b4c924f8b4487b2da8b.tar.gz
map/reduce into old collections merge or reduce SERVER-647
-rw-r--r--db/commands/mr.cpp88
-rw-r--r--db/commands/mr.h24
-rw-r--r--jstests/mr_merge.js51
-rw-r--r--jstests/mr_outreduce.js41
4 files changed, 168 insertions, 36 deletions
diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp
index 722ee513e3f..6398ed7bc7f 100644
--- a/db/commands/mr.cpp
+++ b/db/commands/mr.cpp
@@ -35,7 +35,7 @@ namespace mongo {
AtomicUInt MRSetup::JOB_NUMBER;
- BSONObj reduceValues( BSONList& values , Scope * s , ScriptingFunction reduce , bool final , ScriptingFunction finalize ){
+ BSONObj reduceValues( BSONList& values , MRReduceState * state , bool final ){
uassert( 10074 , "need values" , values.size() );
int sizeEstimate = ( values.size() * values.begin()->getField( "value" ).size() ) + 128;
@@ -72,8 +72,8 @@ namespace mongo {
valueBuilder->done();
BSONObj args = reduceArgs.obj();
- s->invokeSafe( reduce , args );
- if ( s->type( "return" ) == Array ){
+ state->scope->invokeSafe( state->reduce , args );
+ if ( state->scope->type( "return" ) == Array ){
uassert( 10075 , "reduce -> multiple not supported yet",0);
return BSONObj();
}
@@ -87,24 +87,24 @@ namespace mongo {
}
BSONObjBuilder temp( endSizeEstimate );
temp.append( key.firstElement() );
- s->append( temp , "1" , "return" );
+ state->scope->append( temp , "1" , "return" );
x.push_back( temp.obj() );
- return reduceValues( x , s , reduce , final , finalize );
+ return reduceValues( x , state , final );
}
- if ( finalize ){
- Scope::NoDBAccess no = s->disableDBAccess( "can't access db inside finalize" );
+ if ( state->finalize ){
+ Scope::NoDBAccess no = state->scope->disableDBAccess( "can't access db inside finalize" );
BSONObjBuilder b(endSizeEstimate);
b.appendAs( key.firstElement() , "_id" );
- s->append( b , "value" , "return" );
- s->invokeSafe( finalize , b.obj() );
+ state->scope->append( b , "value" , "return" );
+ state->scope->invokeSafe( state->finalize , b.obj() );
}
BSONObjBuilder b(endSizeEstimate);
b.appendAs( key.firstElement() , final ? "_id" : "0" );
- s->append( b , final ? "value" : "1" , "return" );
+ state->scope->append( b , final ? "value" : "1" , "return" );
return b.obj();
}
@@ -201,7 +201,7 @@ namespace mongo {
return field._asCode();
}
- long long MRSetup::renameIfNeeded( DBDirectClient& db ){
+ long long MRSetup::renameIfNeeded( DBDirectClient& db , MRReduceState * state ){
assertInWriteLock();
if ( finalLong != tempLong ){
@@ -222,7 +222,31 @@ namespace mongo {
db.dropCollection( tempLong );
}
else if ( outType == REDUCE ){
- assert(0);
+ BSONList values;
+
+ auto_ptr<DBClientCursor> cursor = db.query( tempLong , BSONObj() );
+ while ( cursor->more() ){
+ BSONObj temp = cursor->next();
+ BSONObj old;
+
+ bool found;
+ {
+ Client::Context tx( finalLong );
+ found = Helpers::findOne( finalLong.c_str() , temp["_id"].wrap() , old , true );
+ }
+
+ if ( found ){
+ // need to reduce
+ values.clear();
+ values.push_back( temp );
+ values.push_back( old );
+ Helpers::upsert( finalLong , reduceValues( values , state , true ) );
+ }
+ else {
+ Helpers::upsert( finalLong , temp );
+ }
+ }
+ db.dropCollection( tempLong );
}
else {
assert(0);
@@ -230,7 +254,7 @@ namespace mongo {
}
return db.count( finalLong );
}
-
+
void MRSetup::insert( const string& ns , BSONObj& o ){
writelock l( ns );
Client::Context ctx( ns );
@@ -243,8 +267,12 @@ namespace mongo {
}
- MRState::MRState( MRSetup& s ) : setup(s){
- scope = globalScriptEngine->getPooledScope( setup.dbname );
+ MRState::MRState( MRSetup& s )
+ : setup(s){
+ }
+
+ void MRState::init(){
+ scope.reset(globalScriptEngine->getPooledScope( setup.dbname ).release() );
scope->localConnect( setup.dbname.c_str() );
map = scope->createFunction( setup.mapCode.c_str() );
@@ -278,7 +306,7 @@ namespace mongo {
return;
BSONObj key = values.begin()->firstElement().wrap( "_id" );
- BSONObj res = reduceValues( values , scope.get() , reduce , 1 , finalize );
+ BSONObj res = reduceValues( values , this , true );
setup.insert( setup.tempLong , res );
}
@@ -307,7 +335,7 @@ namespace mongo {
write( *(all.begin()) );
}
else if ( all.size() > 1 ){
- BSONObj res = reduceValues( all , _state.scope.get() , _state.reduce , false , 0 );
+ BSONObj res = reduceValues( all , &_state , false );
insert( res );
}
}
@@ -398,9 +426,11 @@ namespace mongo {
BSONObjBuilder countsBuilder;
BSONObjBuilder timingBuilder;
+ MRState state( mr );
+
try {
-
- MRState state( mr );
+ state.init();
+
state.scope->injectNative( "emit" , fast_emit );
MRTL * mrtl = new MRTL( state );
@@ -578,7 +608,7 @@ namespace mongo {
dblock lock;
db.dropCollection( mr.incLong );
- finalCount = mr.renameIfNeeded( db );
+ finalCount = mr.renameIfNeeded( db , &state );
}
timingBuilder.append( "total" , t.millis() );
@@ -644,6 +674,8 @@ namespace mongo {
}
+ MRReduceState state;
+
DBDirectClient db;
{ // reduce from each stream
@@ -654,12 +686,11 @@ namespace mongo {
Query().sort( sortKey ) );
cursor.init();
- auto_ptr<Scope> s = globalScriptEngine->getPooledScope( dbname );
- s->localConnect( dbname.c_str() );
- ScriptingFunction reduceFunction = s->createFunction( mr.reduceCode.c_str() );
- ScriptingFunction finalizeFunction = 0;
+ state.scope.reset( globalScriptEngine->getPooledScope( dbname ).release() );
+ state.scope->localConnect( dbname.c_str() );
+ state.reduce = state.scope->createFunction( mr.reduceCode.c_str() );
if ( mr.finalizeCode.size() )
- finalizeFunction = s->createFunction( mr.finalizeCode.c_str() );
+ state.finalize = state.scope->createFunction( mr.finalizeCode.c_str() );
BSONList values;
@@ -679,16 +710,17 @@ namespace mongo {
}
- db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) );
+ db.insert( mr.tempLong , reduceValues( values , &state , true ) );
values.clear();
values.push_back( t );
}
if ( values.size() )
- db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) );
+ db.insert( mr.tempLong , reduceValues( values , &state , true ) );
}
- long long finalCount = mr.renameIfNeeded( db );
+
+ long long finalCount = mr.renameIfNeeded( db , &state );
log(0) << " mapreducefinishcommand " << mr.finalLong << " " << finalCount << endl;
for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ){
diff --git a/db/commands/mr.h b/db/commands/mr.h
index 7eec40078bc..9f8907b1de3 100644
--- a/db/commands/mr.h
+++ b/db/commands/mr.h
@@ -23,6 +23,18 @@ namespace mongo {
namespace mr {
+ class MRReduceState {
+ public:
+
+ MRReduceState() : reduce(), finalize(){}
+
+ scoped_ptr<Scope> scope;
+
+ ScriptingFunction reduce;
+ ScriptingFunction finalize;
+ };
+
+
typedef vector<BSONObj> BSONList;
class MyCmp {
@@ -50,7 +62,7 @@ namespace mongo {
/**
@return number objects in collection
*/
- long long renameIfNeeded( DBDirectClient& db );
+ long long renameIfNeeded( DBDirectClient& db , MRReduceState * state );
void insert( const string& ns , BSONObj& o );
@@ -91,24 +103,20 @@ namespace mongo {
static AtomicUInt JOB_NUMBER;
}; // end MRsetup
-
/**
* container for all stack based map/reduce state
*/
- class MRState {
+ class MRState : public MRReduceState {
public:
MRState( MRSetup& s );
-
+ void init();
+
void finalReduce( BSONList& values );
MRSetup& setup;
- auto_ptr<Scope> scope;
DBDirectClient db;
ScriptingFunction map;
- ScriptingFunction reduce;
- ScriptingFunction finalize;
-
};
/**
diff --git a/jstests/mr_merge.js b/jstests/mr_merge.js
new file mode 100644
index 00000000000..6393f188bf7
--- /dev/null
+++ b/jstests/mr_merge.js
@@ -0,0 +1,51 @@
+
+t = db.mr_merge;
+t.drop();
+
+t.insert( { a : [ 1 , 2 ] } )
+t.insert( { a : [ 2 , 3 ] } )
+t.insert( { a : [ 3 , 4 ] } )
+
+outName = "mr_merge_out";
+out = db[outName];
+out.drop();
+
+m = function(){ for (i=0; i<this.a.length; i++ ) emit( this.a[i] , 1 ); }
+r = function(k,vs){ return Array.sum( vs ); }
+
+function tos( o ){
+ var s = "";
+ for ( var i=0; i<100; i++ ){
+ if ( o[i] )
+ s += i + "_" + o[i];
+ }
+ return s;
+}
+
+
+res = t.mapReduce( m , r , { out : outName } )
+
+
+expected = { "1" : 1 , "2" : 2 , "3" : 2 , "4" : 1 }
+assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "A" );
+
+t.insert( { a : [ 4 , 5 ] } )
+out.insert( { _id : 10 , value : "5" } )
+res = t.mapReduce( m , r , { out : outName } )
+
+expected["4"]++;
+expected["5"] = 1
+assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "B" );
+
+t.insert( { a : [ 5 , 6 ] } )
+out.insert( { _id : 10 , value : "5" } )
+res = t.mapReduce( m , r , { out : outName , outType : "merge" } )
+
+expected["5"]++;
+expected["10"] = 5
+expected["6"] = 1
+
+assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "C" );
+
+
+
diff --git a/jstests/mr_outreduce.js b/jstests/mr_outreduce.js
new file mode 100644
index 00000000000..e5878366fe4
--- /dev/null
+++ b/jstests/mr_outreduce.js
@@ -0,0 +1,41 @@
+
+t = db.mr_outreduce;
+t.drop();
+
+t.insert( { _id : 1 , a : [ 1 , 2 ] } )
+t.insert( { _id : 2 , a : [ 2 , 3 ] } )
+t.insert( { _id : 3 , a : [ 3 , 4 ] } )
+
+outName = "mr_outreduce_out";
+out = db[outName];
+out.drop();
+
+m = function(){ for (i=0; i<this.a.length; i++ ) emit( this.a[i] , 1 ); }
+r = function(k,vs){ return Array.sum( vs ); }
+
+function tos( o ){
+ var s = "";
+ for ( var i=0; i<100; i++ ){
+ if ( o[i] )
+ s += i + "_" + o[i] + "|"
+ }
+ return s;
+}
+
+
+res = t.mapReduce( m , r , { out : outName } )
+
+
+expected = { "1" : 1 , "2" : 2 , "3" : 2 , "4" : 1 }
+assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "A" );
+
+t.insert( { _id : 4 , a : [ 4 , 5 ] } )
+out.insert( { _id : 10 , value : "5" } ) // this is a sentinal to make sure it wasn't killed
+res = t.mapReduce( m , r , { out : outName , outType : "reduce" , query : { _id : { $gt : 3 } } } )
+
+expected["4"]++;
+expected["5"] = 1
+expected["10"] = 5
+assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "B" );
+
+