diff options
author | Eliot Horowitz <eliot@10gen.com> | 2010-12-16 16:38:24 -0500 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2010-12-16 16:41:28 -0500 |
commit | c0fa67131864a2f2c5cf59f7cb83eb8612ab40b8 (patch) | |
tree | ce7abb68c8b2bc609b085597eb1d4747e5cbbb15 | |
parent | c2496691f11bab8bef8e9a1cbce0d356577d09cd (diff) | |
download | mongo-c0fa67131864a2f2c5cf59f7cb83eb8612ab40b8.tar.gz |
map/reduce no longer uses temp collections.
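You now have to specify the `out` option: either a collection name (string) or a single-field object such as { replace : name }, { merge : name }, { reduce : name } or { inline : 1 }. The separate `outType` option is no longer accepted.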
SERVER-1837
-rw-r--r-- | db/commands/mr.cpp | 217 | ||||
-rw-r--r-- | db/commands/mr.h | 44 | ||||
-rw-r--r-- | jstests/mr1.js | 22 | ||||
-rw-r--r-- | jstests/mr2.js | 26 | ||||
-rw-r--r-- | jstests/mr3.js | 10 | ||||
-rw-r--r-- | jstests/mr4.js | 4 | ||||
-rw-r--r-- | jstests/mr5.js | 4 | ||||
-rw-r--r-- | jstests/mr_bigobject.js | 6 | ||||
-rw-r--r-- | jstests/mr_errorhandling.js | 6 | ||||
-rw-r--r-- | jstests/mr_index2.js | 6 | ||||
-rw-r--r-- | jstests/mr_merge.js | 2 | ||||
-rw-r--r-- | jstests/mr_outreduce.js | 2 | ||||
-rw-r--r-- | jstests/mr_sort.js | 6 | ||||
-rw-r--r-- | jstests/sharding/features2.js | 31 | ||||
-rw-r--r-- | s/commands_public.cpp | 4 | ||||
-rw-r--r-- | shell/collection.js | 13 | ||||
-rw-r--r-- | shell/mongo_vstudio.cpp | 13 |
17 files changed, 269 insertions, 147 deletions
diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp index 4a6e4d07ae7..bc32180b565 100644 --- a/db/commands/mr.cpp +++ b/db/commands/mr.cpp @@ -75,7 +75,7 @@ namespace mongo { // since there are many cases where the point of finalize // is converting many fields to 1 BSONObjBuilder b; - b.append( o["_id"] ); + b.append( o.firstElement() ); s->append( b , "value" , "return" ); return b.obj(); } @@ -180,47 +180,53 @@ namespace mongo { ns = dbname + "." + cmdObj.firstElement().valuestr(); verbose = cmdObj["verbose"].trueValue(); - keeptemp = cmdObj["keeptemp"].trueValue(); - - { // setup names - stringstream ss; - if ( ! keeptemp ) - ss << "tmp."; - ss << "mr." << cmdObj.firstElement().String() << "_" << time(0) << "_" << JOB_NUMBER++; - tempShort = ss.str(); - tempLong = dbname + "." + tempShort; - incLong = tempLong + "_inc"; - - if ( ! keeptemp && markAsTemp ) - cc().addTempCollection( tempLong ); - replicate = keeptemp; + uassert( 13602 , "outType is no longer a valid option" , cmdObj["outType"].eoo() ); - if ( cmdObj["out"].type() == String ){ - finalShort = cmdObj["out"].valuestr(); - replicate = true; - } - else - finalShort = tempShort; - - finalLong = dbname + "." 
+ finalShort; - + if ( cmdObj["out"].type() == String ){ + finalShort = cmdObj["out"].String(); + outType = REPLACE; } + else if ( cmdObj["out"].type() == Object ){ + BSONObj o = cmdObj["out"].embeddedObject(); + uassert( 13607 , "'out' has to have a single field" , o.nFields() == 1 ); + + BSONElement e = o.firstElement(); + string t = e.fieldName(); - if ( cmdObj["outType"].type() == String ){ - uassert( 13521 , "need 'out' if using 'outType'" , cmdObj["out"].type() == String ); - string t = cmdObj["outType"].String(); - if ( t == "normal" || t == "replace" ) + if ( t == "normal" || t == "replace" ){ outType = REPLACE; - else if ( t == "merge" ) + finalShort = e.String(); + } + else if ( t == "merge" ){ outType = MERGE; - else if ( t == "reduce" ) + finalShort = e.String(); + } + else if ( t == "reduce" ){ outType = REDUCE; - else - uasserted( 13522 , str::stream() << "unknown outType [" << t << "]" ); + finalShort = e.String(); + } + else if ( t == "inline" ){ + outType = INMEMORY; + } + else{ + uasserted( 13522 , str::stream() << "unknown out specifier [" << t << "]" ); + } } else { - outType = REPLACE; + uasserted( 13606 , "'out' has to be a string or an object" ); + } + + if ( outType != INMEMORY ){ // setup names + tempShort = str::stream() << "tmp.mr." << cmdObj.firstElement().String() << "_" << finalShort << "_" << JOB_NUMBER++; + tempLong = dbname + "." + tempShort; + + incLong = tempLong + "_inc"; + + //if ( markAsTemp ) + cc().addTempCollection( tempLong ); + + finalLong = dbname + "." + finalShort; } { // scope and code @@ -256,13 +262,16 @@ namespace mongo { } void State::prepTempCollection(){ + if ( ! 
_onDisk ) + return; + _db.dropCollection( _config.tempLong ); { // create writelock lock( _config.tempLong.c_str() ); Client::Context ctx( _config.tempLong.c_str() ); string errmsg; - assert( userCreateNS( _config.tempLong.c_str() , BSONObj() , errmsg , _config.replicate ) ); + assert( userCreateNS( _config.tempLong.c_str() , BSONObj() , errmsg , true ) ); } @@ -291,8 +300,41 @@ namespace mongo { } + void State::appendResults( BSONObjBuilder& final ){ + if ( _onDisk ) + return; + + uassert( 13604 , "too much data for in memory map/reduce" , _size < ( BSONObjMaxUserSize / 2 ) ); + + BSONArrayBuilder b( (int)(_size * 1.2) ); // _size is data size, doesn't count overhead and keys + + for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); ++i ){ + BSONObj key = i->first; + BSONList& all = i->second; + + assert( all.size() == 1 ); + + BSONObjIterator vi( all[0] ); + vi.next(); + + BSONObjBuilder temp( b.subobjStart() ); + temp.appendAs( key.firstElement() , "_id" ); + temp.appendAs( vi.next() , "value" ); + temp.done(); + } + + BSONArray res = b.arr(); + uassert( 13605 , "too much data for in memory map/reduce" , res.objsize() < ( BSONObjMaxUserSize * 2 / 3 ) ); + + final.append( "results" , res ); + } + long long State::renameIfNeeded(){ - assertInWriteLock(); + if ( ! 
_onDisk ) + return _temp->size(); + + dblock lock; + if ( _config.finalLong == _config.tempLong ) return _db.count( _config.finalLong ); @@ -342,26 +384,31 @@ namespace mongo { _db.dropCollection( _config.tempLong ); break; } + case Config::INMEMORY: { + return _temp->size(); + } } + return _db.count( _config.finalLong ); } void State::insert( const string& ns , BSONObj& o ){ + assert( _onDisk ); + writelock l( ns ); Client::Context ctx( ns ); - if ( _config.replicate ) - theDataFileMgr.insertAndLog( ns.c_str() , o , false ); - else - theDataFileMgr.insertWithObjMod( ns.c_str() , o , false ); + theDataFileMgr.insertAndLog( ns.c_str() , o , false ); } void State::_insertToInc( BSONObj& o ){ + assert( _onDisk ); theDataFileMgr.insertWithObjMod( _config.incLong.c_str() , o , true ); } State::State( const Config& c ) : _config( c ), _size(0), _numEmits(0){ _temp.reset( new InMemory() ); + _onDisk = _config.outType != Config::INMEMORY; } bool State::sourceExists(){ @@ -372,9 +419,16 @@ namespace mongo { return _db.count( _config.ns , _config.filter , 0 , (unsigned) _config.limit ); } - void State::cleanup(){ - _db.dropCollection( _config.tempLong ); - _db.dropCollection( _config.incLong ); + State::~State(){ + if ( _onDisk ){ + try { + _db.dropCollection( _config.tempLong ); + _db.dropCollection( _config.incLong ); + } + catch ( std::exception& e ){ + error() << "couldn't cleanup after map reduce: " << e.what() << endl; + } + } } void State::init(){ @@ -391,16 +445,17 @@ namespace mongo { _config.finalizer->init( this ); _scope->injectNative( "emit" , fast_emit ); - - // clear temp collections - _db.dropCollection( _config.tempLong ); - _db.dropCollection( _config.incLong ); - - writelock l( _config.incLong ); - Client::Context ctx( _config.incLong ); - string err; - assert( userCreateNS( _config.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) ); + if ( _onDisk ){ + // clear temp collections + _db.dropCollection( _config.tempLong ); + _db.dropCollection( 
_config.incLong ); + + writelock l( _config.incLong ); + Client::Context ctx( _config.incLong ); + string err; + assert( userCreateNS( _config.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) ); + } } @@ -415,6 +470,26 @@ namespace mongo { } void State::finalReduce( CurOp * op , ProgressMeterHolder& pm ){ + if ( ! _onDisk ){ + if ( _config.finalizer ){ + long size = 0; + for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); ++i ){ + BSONObj key = i->first; + BSONList& all = i->second; + + assert( all.size() == 1 ); + + BSONObj res = _config.finalizer->finalize( all[0] ); + + all.clear(); + all.push_back( res ); + size += res.objsize(); + } + _size = size; + } + return; + } + assert( _temp->size() == 0 ); // TODO: this is a bit sketchy @@ -477,6 +552,7 @@ namespace mongo { } void State::reduceInMemory(){ + InMemory * n = new InMemory(); // for new data long nSize = 0; @@ -485,10 +561,15 @@ namespace mongo { BSONList& all = i->second; if ( all.size() == 1 ){ - // this key has low cardinality, so just write to db - writelock l(_config.incLong); - Client::Context ctx(_config.incLong.c_str()); - _insertToInc( *(all.begin()) ); + if ( _onDisk ){ + // this key has low cardinality, so just write to db + writelock l(_config.incLong); + Client::Context ctx(_config.incLong.c_str()); + _insertToInc( *(all.begin()) ); + } + else { + _add( n , all[0] , nSize ); + } } else if ( all.size() > 1 ){ BSONObj res = _config.reducer->reduce( all ); @@ -501,6 +582,9 @@ namespace mongo { } void State::dumpToInc(){ + if ( ! _onDisk ) + return; + writelock l(_config.incLong); Client::Context ctx(_config.incLong); @@ -529,6 +613,9 @@ namespace mongo { } void State::checkSize(){ + if ( ! _onDisk ) + return; + if ( _size < 1024 * 5 ) return; @@ -670,16 +757,12 @@ namespace mongo { } catch ( ... 
){ log() << "mr failed, removing collection" << endl; - state.cleanup(); throw; } - long long finalCount = 0; - { - dblock lock; - finalCount = state.renameIfNeeded(); - } - + long long finalCount = state.renameIfNeeded(); + state.appendResults( result ); + timingBuilder.append( "total" , t.millis() ); result.append( "result" , config.finalShort ); @@ -783,13 +866,9 @@ namespace mongo { state.dumpToInc(); - long long finalCount; - { - dblock lk; - finalCount = state.renameIfNeeded(); - } - log(0) << " mapreducefinishcommand " << config.finalLong << " " << finalCount << endl; - + state.renameIfNeeded(); + state.appendResults( result ); + for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ){ ScopedDbConnection conn( i->_server ); conn->dropCollection( dbname + "." + shardedOutputCollection ); diff --git a/db/commands/mr.h b/db/commands/mr.h index 6d29fee78bd..6149bb5fa2b 100644 --- a/db/commands/mr.h +++ b/db/commands/mr.h @@ -42,7 +42,10 @@ namespace mongo { virtual ~Finalizer(){} virtual void init( State * state ) = 0; - virtual BSONObj finalize( const BSONObj& o ) = 0; + /** + * this takes a tuple and returns a tuple + */ + virtual BSONObj finalize( const BSONObj& tuple ) = 0; }; class Reducer : boost::noncopyable { @@ -150,8 +153,6 @@ namespace mongo { // options bool verbose; - bool keeptemp; - bool replicate; // query options @@ -179,7 +180,8 @@ namespace mongo { enum { REPLACE , // atomically replace the collection MERGE , // merge keys, override dups - REDUCE // merge keys, reduce dups + REDUCE , // merge keys, reduce dups + INMEMORY // only store in memory, limited in size } outType; static AtomicUInt JOB_NUMBER; @@ -192,6 +194,8 @@ namespace mongo { class State { public: State( const Config& c ); + ~State(); + void init(); // ---- prep ----- @@ -231,15 +235,18 @@ namespace mongo { void finalReduce( CurOp * op , ProgressMeterHolder& pm ); // ------- cleanup/data positioning ---------- - + /** @return number objects in collection */ 
long long renameIfNeeded(); - /** removes temp collections */ - void cleanup(); - + /** + * if INMEMORY will append + * may also append stats or anything else it likes + */ + void appendResults( BSONObjBuilder& b ); + // -------- util ------------ /** @@ -263,33 +270,16 @@ namespace mongo { scoped_ptr<Scope> _scope; const Config& _config; + bool _onDisk; // if the end result of this map reduce is disk or not DBDirectClient _db; scoped_ptr<InMemory> _temp; - long _size; + long _size; // bytes in _temp long long _numEmits; }; - /** - * keeps all temporary state in memory - * if data is larger than can fit in a BSONObj to return - * will throw an exception - */ - class StateInMemory : public State { - - }; - - /** - * keeps some things in memory and pushes - * to disk when gets too big for ram - * intended for when output will end up on disk - */ - class StateOnDisk : public State { - - }; - BSONObj fast_emit( const BSONObj& args ); } // end mr namespace diff --git a/jstests/mr1.js b/jstests/mr1.js index aacd69b26fa..dc81534c36e 100644 --- a/jstests/mr1.js +++ b/jstests/mr1.js @@ -49,7 +49,7 @@ r2 = function( key , values ){ return total; }; -res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r } ); +res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , out : "mr1_out" } ); d( res ); if ( ks == "_id" ) assert( res.ok , "not ok" ); assert.eq( 4 , res.counts.input , "A" ); @@ -66,7 +66,7 @@ assert.eq( 3 , z.b , "E" ); assert.eq( 3 , z.c , "F" ); x.drop(); -res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , query : { x : { "$gt" : 2 } } } ); +res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , query : { x : { "$gt" : 2 } } , out : "mr1_out" } ); d( res ); assert.eq( 2 , res.counts.input , "B" ); x = db[res.result]; @@ -77,7 +77,7 @@ assert.eq( 1 , z.b , "C2" ); assert.eq( 2 , z.c , "C3" ); x.drop(); -res = db.runCommand( { mapreduce : "mr1" , map : m2 , reduce : r2 , query : { x : { "$gt" : 2 } } } ); +res = 
db.runCommand( { mapreduce : "mr1" , map : m2 , reduce : r2 , query : { x : { "$gt" : 2 } } , out : "mr1_out" } ); d( res ); assert.eq( 2 , res.counts.input , "B" ); x = db[res.result]; @@ -104,7 +104,7 @@ for ( i=5; i<1000; i++ ){ t.save( { x : i , tags : [ "b" , "d" ] } ); } -res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r } ); +res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , out : "mr1_out" } ); d( res ); assert.eq( 999 , res.counts.input , "Z1" ); x = db[res.result]; @@ -125,12 +125,12 @@ assert.eq( 995 , getk( "d" ).value.count , "ZD" ); x.drop(); if ( true ){ - printjson( db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , verbose : true } ) ); + printjson( db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , verbose : true , out : "mr1_out" } ) ); } print( "t1: " + Date.timeFunc( function(){ - var out = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r } ); + var out = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , out : "mr1_out" } ); if ( ks == "_id" ) assert( out.ok , "XXX : " + tojson( out ) ); db[out.result].drop(); } , 10 ) + " (~500 on 2.8ghz) - itcount: " + Date.timeFunc( function(){ db.mr1.find().itcount(); } , 10 ) ); @@ -138,7 +138,7 @@ print( "t1: " + Date.timeFunc( // test doesn't exist -res = db.runCommand( { mapreduce : "lasjdlasjdlasjdjasldjalsdj12e" , map : m , reduce : r } ); +res = db.runCommand( { mapreduce : "lasjdlasjdlasjdjasldjalsdj12e" , map : m , reduce : r , out : "mr1_out" } ); assert( ! 
res.ok , "should be not ok" ); if ( true ){ @@ -166,11 +166,15 @@ if ( true ){ } x.drop(); - res = db.runCommand( { mapreduce : "mr1" , out : "mr1_foo" , map : m2 , reduce : r2 } ); + res = db.runCommand( { mapreduce : "mr1" , out : "mr1_foo" , map : m2 , reduce : r2 , out : "mr1_out" } ); d(res); print( "t3: " + res.timeMillis + " (~3500 on 2.8ghz)" ); + + res = db.runCommand( { mapreduce : "mr1" , map : m2 , reduce : r2 , out : { inline : true } } ); + print( "t4: " + res.timeMillis ); + } -res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r } ); +res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , out : "mr1_out" } ); assert( res.ok , "should be ok" ); diff --git a/jstests/mr2.js b/jstests/mr2.js index 0a8e9d68442..d55906e31a9 100644 --- a/jstests/mr2.js +++ b/jstests/mr2.js @@ -29,7 +29,12 @@ function r( who , values ){ function reformat( r ){ var x = {}; - r.find().forEach( + var cursor; + if ( r.results ) + cursor = r.results; + else + cursor = r.find(); + cursor.forEach( function(z){ x[z._id] = z.value; } @@ -41,10 +46,21 @@ function f( who , res ){ res.avg = res.totalSize / res.num; return res; } -res = t.mapReduce( m , r , { finalize : f } ); + +res = t.mapReduce( m , r , { finalize : f , out : "mr2_out" } ); +printjson( res ) x = reformat( res ); -assert.eq( 9 , x.a.avg , "A" ); -assert.eq( 16 , x.b.avg , "B" ); -assert.eq( 18 , x.c.avg , "C" ); +assert.eq( 9 , x.a.avg , "A1" ); +assert.eq( 16 , x.b.avg , "A2" ); +assert.eq( 18 , x.c.avg , "A3" ); res.drop(); +res = t.mapReduce( m , r , { finalize : f , out : { inline : 1 } } ); +printjson( res ) +x = reformat( res ); +assert.eq( 9 , x.a.avg , "B1" ); +assert.eq( 16 , x.b.avg , "B2" ); +assert.eq( 18 , x.c.avg , "B3" ); +res.drop(); + + diff --git a/jstests/mr3.js b/jstests/mr3.js index e7d1f2c16d6..3b0a918a4f3 100644 --- a/jstests/mr3.js +++ b/jstests/mr3.js @@ -25,7 +25,7 @@ r = function( key , values ){ return { count : total }; }; -res = t.mapReduce( m , r ); +res = 
t.mapReduce( m , r , { out : "mr3_out" } ); z = res.convertToSingleObject() assert.eq( 3 , Object.keySet( z ).length , "A1" ); @@ -35,7 +35,7 @@ assert.eq( 3 , z.c.count , "A4" ); res.drop(); -res = t.mapReduce( m , r , { mapparams : [ 2 , 2 ] } ); +res = t.mapReduce( m , r , { out : "mr3_out" , mapparams : [ 2 , 2 ] } ); z = res.convertToSingleObject() assert.eq( 3 , Object.keySet( z ).length , "B1" ); @@ -52,7 +52,7 @@ realm = m; m = function(){ emit( this._id , 1 ); } -res = t.mapReduce( m , r ); +res = t.mapReduce( m , r , { out : "mr3_out" } ); res.drop(); m = function(){ @@ -60,7 +60,7 @@ m = function(){ } before = db.getCollectionNames().length; -assert.throws( function(){ t.mapReduce( m , r ); } ); +assert.throws( function(){ t.mapReduce( m , r , { out : "mr3_out" } ); } ); assert.eq( before , db.getCollectionNames().length , "after throw crap" ); @@ -69,5 +69,5 @@ r = function( k , v ){ return v.x.x.x; } before = db.getCollectionNames().length; -assert.throws( function(){ t.mapReduce( m , r ); } ); +assert.throws( function(){ t.mapReduce( m , r , "mr3_out" ) } ) assert.eq( before , db.getCollectionNames().length , "after throw crap" ); diff --git a/jstests/mr4.js b/jstests/mr4.js index b14cdfef88e..78c8bce8953 100644 --- a/jstests/mr4.js +++ b/jstests/mr4.js @@ -23,7 +23,7 @@ r = function( key , values ){ return { count : total }; }; -res = t.mapReduce( m , r , { scope : { xx : 1 } } ); +res = t.mapReduce( m , r , { out : "mr4_out" , scope : { xx : 1 } } ); z = res.convertToSingleObject() assert.eq( 3 , Object.keySet( z ).length , "A1" ); @@ -34,7 +34,7 @@ assert.eq( 3 , z.c.count , "A4" ); res.drop(); -res = t.mapReduce( m , r , { scope : { xx : 2 } } ); +res = t.mapReduce( m , r , { scope : { xx : 2 } , out : "mr4_out" } ); z = res.convertToSingleObject() assert.eq( 3 , Object.keySet( z ).length , "A1" ); diff --git a/jstests/mr5.js b/jstests/mr5.js index bbac3fec4f3..50a63d1d55b 100644 --- a/jstests/mr5.js +++ b/jstests/mr5.js @@ -25,7 +25,7 @@ r = 
function( k , v ){ return { stats : stats , total : total } } -res = t.mapReduce( m , r , { scope : { xx : 1 } } ); +res = t.mapReduce( m , r , { out : "mr5_out" , scope : { xx : 1 } } ); //res.find().forEach( printjson ) z = res.convertToSingleObject() @@ -44,7 +44,7 @@ m = function(){ -res = t.mapReduce( m , r , { scope : { xx : 1 } } ); +res = t.mapReduce( m , r , { out : "mr5_out" , scope : { xx : 1 } } ); //res.find().forEach( printjson ) z = res.convertToSingleObject() diff --git a/jstests/mr_bigobject.js b/jstests/mr_bigobject.js index 8224209379c..0f76e7b981e 100644 --- a/jstests/mr_bigobject.js +++ b/jstests/mr_bigobject.js @@ -18,13 +18,13 @@ r = function( k , v ){ return 1; } -assert.throws( function(){ t.mapReduce( m , r ); } , "emit should fail" ) +assert.throws( function(){ t.mapReduce( m , r , "mr_bigobject_out" ); } , "emit should fail" ) m = function(){ emit( 1 , this.s ); } -assert.eq( { 1 : 1 } , t.mapReduce( m , r ).convertToSingleObject() , "A1" ) +assert.eq( { 1 : 1 } , t.mapReduce( m , r , "mr_bigobject_out" ).convertToSingleObject() , "A1" ) r = function( k , v ){ total = 0; @@ -38,4 +38,4 @@ r = function( k , v ){ return total; } -assert.eq( { 1 : 10 * s.length } , t.mapReduce( m , r ).convertToSingleObject() , "A1" ) +assert.eq( { 1 : 10 * s.length } , t.mapReduce( m , r , "mr_bigobject_out" ).convertToSingleObject() , "A1" ) diff --git a/jstests/mr_errorhandling.js b/jstests/mr_errorhandling.js index 57724f14a88..df2d8e83151 100644 --- a/jstests/mr_errorhandling.js +++ b/jstests/mr_errorhandling.js @@ -24,7 +24,7 @@ r = function( k , v ){ return total; } -res = t.mapReduce( m_good , r ); +res = t.mapReduce( m_good , r , "mr_errorhandling_out" ); assert.eq( { 1 : 1 , 2 : 2 , 3 : 2 , 4 : 1 } , res.convertToSingleObject() , "A" ); res.drop() @@ -32,7 +32,7 @@ res = null; theerror = null; try { - res = t.mapReduce( m_bad , r ); + res = t.mapReduce( m_bad , r , "mr_errorhandling_out" ); } catch ( e ){ theerror = e.toString(); @@ -42,6 +42,6 @@ 
assert( theerror , "B2" ); assert( theerror.indexOf( "emit" ) >= 0 , "B3" ); // test things are still in an ok state -res = t.mapReduce( m_good , r ); +res = t.mapReduce( m_good , r , "mr_errorhandling_out" ); assert.eq( { 1 : 1 , 2 : 2 , 3 : 2 , 4 : 1 } , res.convertToSingleObject() , "A" ); res.drop() diff --git a/jstests/mr_index2.js b/jstests/mr_index2.js index 0a8b19bdc37..a8d845ed69d 100644 --- a/jstests/mr_index2.js +++ b/jstests/mr_index2.js @@ -7,16 +7,16 @@ t.save( { arr : [1, 2] } ) map = function() { emit(this._id, 1) } reduce = function(k,vals) { return Array.sum( vals ); } -res = t.mapReduce(map,reduce, { query : {} }) +res = t.mapReduce(map,reduce, { out : "mr_index2_out" , query : {} }) assert.eq( 1 ,res.counts.input , "A" ) res.drop() -res = t.mapReduce(map,reduce, { query : { arr: {$gte:0} } }) +res = t.mapReduce(map,reduce, { out : "mr_index2_out" , query : { arr: {$gte:0} } }) assert.eq( 1 ,res.counts.input , "B" ) res.drop() t.ensureIndex({arr:1}) -res = t.mapReduce(map,reduce, { query : { arr: {$gte:0} } }) +res = t.mapReduce(map,reduce, { out : "mr_index2_out" , query : { arr: {$gte:0} } }) assert.eq( 1 ,res.counts.input , "C" ) res.drop(); diff --git a/jstests/mr_merge.js b/jstests/mr_merge.js index 6393f188bf7..c008ebb7b89 100644 --- a/jstests/mr_merge.js +++ b/jstests/mr_merge.js @@ -39,7 +39,7 @@ assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "B" ); t.insert( { a : [ 5 , 6 ] } ) out.insert( { _id : 10 , value : "5" } ) -res = t.mapReduce( m , r , { out : outName , outType : "merge" } ) +res = t.mapReduce( m , r , { out : { merge : outName } } ) expected["5"]++; expected["10"] = 5 diff --git a/jstests/mr_outreduce.js b/jstests/mr_outreduce.js index e5878366fe4..87cba98d6d8 100644 --- a/jstests/mr_outreduce.js +++ b/jstests/mr_outreduce.js @@ -31,7 +31,7 @@ assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "A" ); t.insert( { _id : 4 , a : [ 4 , 5 ] } ) out.insert( { _id : 10 , value : "5" } ) // this is a 
sentinal to make sure it wasn't killed -res = t.mapReduce( m , r , { out : outName , outType : "reduce" , query : { _id : { $gt : 3 } } } ) +res = t.mapReduce( m , r , { out : { reduce : outName } , query : { _id : { $gt : 3 } } } ) expected["4"]++; expected["5"] = 1 diff --git a/jstests/mr_sort.js b/jstests/mr_sort.js index 76920625d96..cc8db18e174 100644 --- a/jstests/mr_sort.js +++ b/jstests/mr_sort.js @@ -24,17 +24,17 @@ r = function( k , v ){ } -res = t.mapReduce( m , r ); +res = t.mapReduce( m , r , "mr_sort_out " ); x = res.convertToSingleObject(); res.drop(); assert.eq( { "a" : 55 } , x , "A1" ) -res = t.mapReduce( m , r , { query : { x : { $lt : 3 } } } ) +res = t.mapReduce( m , r , { out : "mr_sort_out" , query : { x : { $lt : 3 } } } ) x = res.convertToSingleObject(); res.drop(); assert.eq( { "a" : 3 } , x , "A2" ) -res = t.mapReduce( m , r , { sort : { x : 1 } , limit : 2 } ); +res = t.mapReduce( m , r , { out : "mr_sort_out" , sort : { x : 1 } , limit : 2 } ); x = res.convertToSingleObject(); res.drop(); assert.eq( { "a" : 3 } , x , "A3" ) diff --git a/jstests/sharding/features2.js b/jstests/sharding/features2.js index dfb28834443..6cb8108ae52 100644 --- a/jstests/sharding/features2.js +++ b/jstests/sharding/features2.js @@ -92,8 +92,10 @@ r = function( key , values ){ doMR = function( n ){ print(n); - - var res = db.mr.mapReduce( m , r ); + + // on-disk + + var res = db.mr.mapReduce( m , r , "smr1_out" ); printjson( res ); assert.eq( new NumberLong(4) , res.counts.input , "MR T0 " + n ); @@ -103,11 +105,30 @@ doMR = function( n ){ var z = {}; x.find().forEach( function(a){ z[a._id] = a.value.count; } ); assert.eq( 3 , Object.keySet( z ).length , "MR T2 " + n ); - assert.eq( 2 , z.a , "MR T2 " + n ); - assert.eq( 3 , z.b , "MR T2 " + n ); - assert.eq( 3 , z.c , "MR T2 " + n ); + assert.eq( 2 , z.a , "MR T3 " + n ); + assert.eq( 3 , z.b , "MR T4 " + n ); + assert.eq( 3 , z.c , "MR T5 " + n ); x.drop(); + + // inline + + var res = db.mr.mapReduce( m , r 
, { out : { inline : 1 } } ); + printjson( res ); + assert.eq( new NumberLong(4) , res.counts.input , "MR T6 " + n ); + + var z = {}; + res.find().forEach( function(a){ z[a._id] = a.value.count; } ); + printjson( z ); + assert.eq( 3 , Object.keySet( z ).length , "MR T7 " + n ) ; + assert.eq( 2 , z.a , "MR T8 " + n ); + assert.eq( 3 , z.b , "MR T9 " + n ); + assert.eq( 3 , z.c , "MR TA " + n ); + + print( "sleeping for eliot" ); + sleep( 20000 ); + + } doMR( "before" ); diff --git a/s/commands_public.cpp b/s/commands_public.cpp index 4a4e433e785..de3f37c67d8 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -842,9 +842,7 @@ namespace mongo { fn == "verbose" ){ b.append( e ); } - else if ( fn == "keeptemp" || - fn == "out" || - fn == "outType" || + else if ( fn == "out" || fn == "finalize" ){ // we don't want to copy these } diff --git a/shell/collection.js b/shell/collection.js index 99a377d17a2..174b70ae322 100644 --- a/shell/collection.js +++ b/shell/collection.js @@ -541,6 +541,8 @@ MapReduceResult.prototype._simpleKeys = function(){ } MapReduceResult.prototype.find = function(){ + if ( this.results ) + return this.results; return DBCollection.prototype.find.apply( this._coll , arguments ); } @@ -560,10 +562,15 @@ MapReduceResult.prototype.convertToSingleObject = function(){ /** * @param optional object of optional fields; */ -DBCollection.prototype.mapReduce = function( map , reduce , optional ){ +DBCollection.prototype.mapReduce = function( map , reduce , optionsOrOutString ){ var c = { mapreduce : this._shortName , map : map , reduce : reduce }; - if ( optional ) - Object.extend( c , optional ); + assert( optionsOrOutString , "need to an optionsOrOutString" ) + + if ( typeof( optionsOrOutString ) == "string" ) + c["out"] = optionsOrOutString; + else + Object.extend( c , optionsOrOutString ); + var raw = this._db.runCommand( c ); if ( ! 
raw.ok ) throw "map reduce failed: " + tojson( raw ); diff --git a/shell/mongo_vstudio.cpp b/shell/mongo_vstudio.cpp index f70a8a7500b..449d2c2e4bc 100644 --- a/shell/mongo_vstudio.cpp +++ b/shell/mongo_vstudio.cpp @@ -3289,6 +3289,8 @@ const StringData _jscode_raw_collection = "}\n" "\n" "MapReduceResult.prototype.find = function(){\n" +"if ( this.results )\n" +"return this.results;\n" "return DBCollection.prototype.find.apply( this._coll , arguments );\n" "}\n" "\n" @@ -3308,10 +3310,15 @@ const StringData _jscode_raw_collection = "/**\n" "* @param optional object of optional fields;\n" "*/\n" -"DBCollection.prototype.mapReduce = function( map , reduce , optional ){\n" +"DBCollection.prototype.mapReduce = function( map , reduce , optionsOrOutString ){\n" "var c = { mapreduce : this._shortName , map : map , reduce : reduce };\n" -"if ( optional )\n" -"Object.extend( c , optional );\n" +"assert( optionsOrOutString , \"need to an optionsOrOutString\" )\n" +"\n" +"if ( typeof( optionsOrOutString ) == \"string\" )\n" +"c[\"out\"] = optionsOrOutString;\n" +"else\n" +"Object.extend( c , optionsOrOutString );\n" +"\n" "var raw = this._db.runCommand( c );\n" "if ( ! raw.ok )\n" "throw \"map reduce failed: \" + tojson( raw );\n" |