diff options
author | dwight <dwight@10gen.com> | 2010-11-15 16:00:51 -0500 |
---|---|---|
committer | dwight <dwight@10gen.com> | 2010-11-15 16:00:51 -0500 |
commit | 5f0fdc432c32a692bf464168066462ef760d578f (patch) | |
tree | 1052ef1a4be7723653bcbfbabca8be7b6e958b73 | |
parent | 56f4efb5dde5664071dd5016fe40f74ae4521ec2 (diff) | |
parent | c220a4a7679b9ffc6e920bd023742b41107cfcba (diff) | |
download | mongo-5f0fdc432c32a692bf464168066462ef760d578f.tar.gz |
Merge branch 'master' of github.com:mongodb/mongo
-rw-r--r-- | db/commands/mr.cpp | 518 | ||||
-rw-r--r-- | db/commands/mr.h | 153 | ||||
-rw-r--r-- | db/dbhelpers.cpp | 40 | ||||
-rw-r--r-- | db/dbhelpers.h | 16 | ||||
-rw-r--r-- | dbtests/querytests.cpp | 18 | ||||
-rw-r--r-- | jstests/mr_merge.js | 51 | ||||
-rw-r--r-- | jstests/mr_outreduce.js | 41 | ||||
-rw-r--r-- | s/s_only.cpp | 6 |
8 files changed, 499 insertions, 344 deletions
diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp index a917aaa7cb6..6398ed7bc7f 100644 --- a/db/commands/mr.cpp +++ b/db/commands/mr.cpp @@ -27,25 +27,15 @@ #include "../matcher.h" #include "../clientcursor.h" +#include "mr.h" + namespace mongo { namespace mr { - typedef vector<BSONObj> BSONList; - - class MyCmp { - public: - MyCmp(){} - bool operator()( const BSONObj &l, const BSONObj &r ) const { - return l.firstElement().woCompare( r.firstElement() ) < 0; - } - }; + AtomicUInt MRSetup::JOB_NUMBER; - typedef pair<BSONObj,BSONObj> Data; - //typedef list< Data > InMemory; - typedef map< BSONObj,BSONList,MyCmp > InMemory; - - BSONObj reduceValues( BSONList& values , Scope * s , ScriptingFunction reduce , bool final , ScriptingFunction finalize ){ + BSONObj reduceValues( BSONList& values , MRReduceState * state , bool final ){ uassert( 10074 , "need values" , values.size() ); int sizeEstimate = ( values.size() * values.begin()->getField( "value" ).size() ) + 128; @@ -82,8 +72,8 @@ namespace mongo { valueBuilder->done(); BSONObj args = reduceArgs.obj(); - s->invokeSafe( reduce , args ); - if ( s->type( "return" ) == Array ){ + state->scope->invokeSafe( state->reduce , args ); + if ( state->scope->type( "return" ) == Array ){ uassert( 10075 , "reduce -> multiple not supported yet",0); return BSONObj(); } @@ -97,305 +87,301 @@ namespace mongo { } BSONObjBuilder temp( endSizeEstimate ); temp.append( key.firstElement() ); - s->append( temp , "1" , "return" ); + state->scope->append( temp , "1" , "return" ); x.push_back( temp.obj() ); - return reduceValues( x , s , reduce , final , finalize ); + return reduceValues( x , state , final ); } - if ( finalize ){ - Scope::NoDBAccess no = s->disableDBAccess( "can't access db inside finalize" ); + if ( state->finalize ){ + Scope::NoDBAccess no = state->scope->disableDBAccess( "can't access db inside finalize" ); BSONObjBuilder b(endSizeEstimate); b.appendAs( key.firstElement() , "_id" ); - s->append( b , "value" , "return" ); - s->invokeSafe( finalize , b.obj() ); + state->scope->append( b , "value" , "return" ); + state->scope->invokeSafe( state->finalize , b.obj() ); } BSONObjBuilder b(endSizeEstimate); b.appendAs( key.firstElement() , final ? "_id" : "0" ); - s->append( b , final ? "value" : "1" , "return" ); + state->scope->append( b , final ? "value" : "1" , "return" ); return b.obj(); } - class MRSetup { - public: - MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp = true ){ - static int jobNumber = 1; + MRSetup::MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp ){ + + dbname = _dbname; + ns = dbname + "." + cmdObj.firstElement().valuestr(); + + verbose = cmdObj["verbose"].trueValue(); + keeptemp = cmdObj["keeptemp"].trueValue(); + + { // setup names + stringstream ss; + if ( ! keeptemp ) + ss << "tmp."; + ss << "mr." << cmdObj.firstElement().String() << "_" << time(0) << "_" << JOB_NUMBER++; + tempShort = ss.str(); + tempLong = dbname + "." + tempShort; + incLong = tempLong + "_inc"; - dbname = _dbname; - ns = dbname + "." + cmdObj.firstElement().valuestr(); + if ( ! keeptemp && markAsTemp ) + cc().addTempCollection( tempLong ); - verbose = cmdObj["verbose"].trueValue(); - keeptemp = cmdObj["keeptemp"].trueValue(); - - { // setup names - stringstream ss; - if ( ! keeptemp ) - ss << "tmp."; - ss << "mr." << cmdObj.firstElement().String() << "_" << time(0) << "_" << jobNumber++; - tempShort = ss.str(); - tempLong = dbname + "." + tempShort; - incLong = tempLong + "_inc"; - - if ( ! keeptemp && markAsTemp ) - cc().addTempCollection( tempLong ); - - replicate = keeptemp; - - if ( cmdObj["out"].type() == String ){ - finalShort = cmdObj["out"].valuestr(); - replicate = true; - } - else - finalShort = tempShort; + replicate = keeptemp; + + if ( cmdObj["out"].type() == String ){ + finalShort = cmdObj["out"].valuestr(); + replicate = true; + } + else + finalShort = tempShort; - finalLong = dbname + "." + finalShort; + finalLong = dbname + "." + finalShort; - } - - { // scope and code - // NOTE: function scopes are merged with m/r scope, not nested like they should be - BSONObjBuilder scopeBuilder; + } + + if ( cmdObj["outType"].type() == String ){ + uassert( 13521 , "need 'out' if using 'outType'" , cmdObj["out"].type() == String ); + string t = cmdObj["outType"].String(); + if ( t == "normal" ) + outType = NORMAL; + else if ( t == "merge" ) + outType = MERGE; + else if ( t == "reduce" ) + outType = REDUCE; + else + uasserted( 13522 , str::stream() << "unknown outType [" << t << "]" ); + } + else { + outType = NORMAL; + } - if ( cmdObj["scope"].type() == Object ){ - scopeBuilder.appendElements( cmdObj["scope"].embeddedObjectUserCheck() ); - } + { // scope and code + // NOTE: function scopes are merged with m/r scope, not nested like they should be + BSONObjBuilder scopeBuilder; - mapCode = scopeAndCode( scopeBuilder, cmdObj["map"] ); - reduceCode = scopeAndCode( scopeBuilder, cmdObj["reduce"] ); - if ( cmdObj["finalize"].type() ){ - finalizeCode = scopeAndCode( scopeBuilder, cmdObj["finalize"] ); - } - - scopeSetup = scopeBuilder.obj(); + if ( cmdObj["scope"].type() == Object ){ + scopeBuilder.appendElements( cmdObj["scope"].embeddedObjectUserCheck() ); + } - if ( cmdObj["mapparams"].type() == Array ){ - mapparams = cmdObj["mapparams"].embeddedObjectUserCheck(); - } - + mapCode = scopeAndCode( scopeBuilder, cmdObj["map"] ); + reduceCode = scopeAndCode( scopeBuilder, cmdObj["reduce"] ); + if ( cmdObj["finalize"].type() ){ + finalizeCode = scopeAndCode( scopeBuilder, cmdObj["finalize"] ); } - - { // query options - if ( cmdObj["query"].type() == Object ){ - filter = cmdObj["query"].embeddedObjectUserCheck(); - } - if ( cmdObj["sort"].type() == Object ){ - sort = cmdObj["sort"].embeddedObjectUserCheck(); - } + scopeSetup = scopeBuilder.obj(); - if ( cmdObj["limit"].isNumber() ) - limit = cmdObj["limit"].numberLong(); - else - limit = 0; + if ( cmdObj["mapparams"].type() == Array ){ + mapparams = cmdObj["mapparams"].embeddedObjectUserCheck(); } + } + + { // query options + if ( cmdObj["query"].type() == Object ){ + filter = cmdObj["query"].embeddedObjectUserCheck(); + } + + if ( cmdObj["sort"].type() == Object ){ + sort = cmdObj["sort"].embeddedObjectUserCheck(); + } - /** Field expected to be a Code or CodeWScope. - * Add its scope, if any, to scopeBuilder, and return its code. - * Scopes added later will shadow those added earlier. */ - static string scopeAndCode (BSONObjBuilder& scopeBuilder, const BSONElement& field) { - if ( field.type() == CodeWScope ) - scopeBuilder.appendElements( field.codeWScopeObject() ); - return field._asCode(); + if ( cmdObj["limit"].isNumber() ) + limit = cmdObj["limit"].numberLong(); + else + limit = 0; } + } + + string MRSetup::scopeAndCode (BSONObjBuilder& scopeBuilder, const BSONElement& field) { + if ( field.type() == CodeWScope ) + scopeBuilder.appendElements( field.codeWScopeObject() ); + return field._asCode(); + } - /** - @return number objects in collection - */ - long long renameIfNeeded( DBDirectClient& db ){ - if ( finalLong != tempLong ){ + long long MRSetup::renameIfNeeded( DBDirectClient& db , MRReduceState * state ){ + assertInWriteLock(); + if ( finalLong != tempLong ){ + + if ( outType == NORMAL ){ db.dropCollection( finalLong ); if ( db.count( tempLong ) ){ BSONObj info; - uassert( 10076 , "rename failed" , db.runCommand( "admin" , BSON( "renameCollection" << tempLong << "to" << finalLong ) , info ) ); + uassert( 10076 , "rename failed" , + db.runCommand( "admin" , BSON( "renameCollection" << tempLong << "to" << finalLong ) , info ) ); } } - return db.count( finalLong ); - } + else if ( outType == MERGE ){ + auto_ptr<DBClientCursor> cursor = db.query( tempLong , BSONObj() ); + while ( cursor->more() ){ + BSONObj o = cursor->next(); + Helpers::upsert( finalLong , o ); + } + db.dropCollection( tempLong ); + } + else if ( outType == REDUCE ){ + BSONList values; + + auto_ptr<DBClientCursor> cursor = db.query( tempLong , BSONObj() ); + while ( cursor->more() ){ + BSONObj temp = cursor->next(); + BSONObj old; - void insert( const string& ns , BSONObj& o ){ - writelock l( ns ); - Client::Context ctx( ns ); + bool found; + { + Client::Context tx( finalLong ); + found = Helpers::findOne( finalLong.c_str() , temp["_id"].wrap() , old , true ); + } + + if ( found ){ + // need to reduce + values.clear(); + values.push_back( temp ); + values.push_back( old ); + Helpers::upsert( finalLong , reduceValues( values , state , true ) ); + } + else { + Helpers::upsert( finalLong , temp ); + } + } + db.dropCollection( tempLong ); + } + else { + assert(0); + } + } + return db.count( finalLong ); + } + + void MRSetup::insert( const string& ns , BSONObj& o ){ + writelock l( ns ); + Client::Context ctx( ns ); - if ( replicate ) - theDataFileMgr.insertAndLog( ns.c_str() , o , false ); - else - theDataFileMgr.insertWithObjMod( ns.c_str() , o , false ); + if ( replicate ) + theDataFileMgr.insertAndLog( ns.c_str() , o , false ); + else + theDataFileMgr.insertWithObjMod( ns.c_str() , o , false ); - } + } - string dbname; - string ns; - - // options - bool verbose; - bool keeptemp; - bool replicate; - // query options - - BSONObj filter; - BSONObj sort; - long long limit; + MRState::MRState( MRSetup& s ) + : setup(s){ + } - // functions + void MRState::init(){ + scope.reset(globalScriptEngine->getPooledScope( setup.dbname ).release() ); + scope->localConnect( setup.dbname.c_str() ); - string mapCode; - string reduceCode; - string finalizeCode; + map = scope->createFunction( setup.mapCode.c_str() ); + if ( ! map ) + throw UserException( 9012, (string)"map compile failed: " + scope->getError() ); - BSONObj mapparams; - BSONObj scopeSetup; + reduce = scope->createFunction( setup.reduceCode.c_str() ); + if ( ! reduce ) + throw UserException( 9013, (string)"reduce compile failed: " + scope->getError() ); - // output tables - string incLong; + if ( setup.finalizeCode.size() ) + finalize = scope->createFunction( setup.finalizeCode.c_str() ); + else + finalize = 0; - string tempShort; - string tempLong; + if ( ! setup.scopeSetup.isEmpty() ) + scope->init( &setup.scopeSetup ); - string finalShort; - string finalLong; + db.dropCollection( setup.tempLong ); + db.dropCollection( setup.incLong ); - }; // end MRsetup - - class MRState { - public: - MRState( MRSetup& s ) : setup(s){ - scope = globalScriptEngine->getPooledScope( setup.dbname ); - scope->localConnect( setup.dbname.c_str() ); - - map = scope->createFunction( setup.mapCode.c_str() ); - if ( ! map ) - throw UserException( 9012, (string)"map compile failed: " + scope->getError() ); - - reduce = scope->createFunction( setup.reduceCode.c_str() ); - if ( ! reduce ) - throw UserException( 9013, (string)"reduce compile failed: " + scope->getError() ); - - if ( setup.finalizeCode.size() ) - finalize = scope->createFunction( setup.finalizeCode.c_str() ); - else - finalize = 0; - - if ( ! setup.scopeSetup.isEmpty() ) - scope->init( &setup.scopeSetup ); - - db.dropCollection( setup.tempLong ); - db.dropCollection( setup.incLong ); - - writelock l( setup.incLong ); - Client::Context ctx( setup.incLong ); - string err; - assert( userCreateNS( setup.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) ); - - } - - void finalReduce( BSONList& values ){ - if ( values.size() == 0 ) - return; - - BSONObj key = values.begin()->firstElement().wrap( "_id" ); - BSONObj res = reduceValues( values , scope.get() , reduce , 1 , finalize ); - - setup.insert( setup.tempLong , res ); - } - + writelock l( setup.incLong ); + Client::Context ctx( setup.incLong ); + string err; + assert( userCreateNS( setup.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) ); - MRSetup& setup; - auto_ptr<Scope> scope; - DBDirectClient db; + } + + void MRState::finalReduce( BSONList& values ){ + if ( values.size() == 0 ) + return; + + BSONObj key = values.begin()->firstElement().wrap( "_id" ); + BSONObj res = reduceValues( values , this , true ); + + setup.insert( setup.tempLong , res ); + } - ScriptingFunction map; - ScriptingFunction reduce; - ScriptingFunction finalize; + MRTL::MRTL( MRState& state ) + : _state( state ) + , _temp(new InMemory()) + { + _size = 0; + numEmits = 0; + } - }; - - class MRTL { - public: - MRTL( MRState& state ) - : _state( state ) - , _temp(new InMemory()) - { - _size = 0; - numEmits = 0; - } + void MRTL::reduceInMemory(){ + boost::shared_ptr<InMemory> old = _temp; + _temp.reset(new InMemory()); + _size = 0; - void reduceInMemory(){ - boost::shared_ptr<InMemory> old = _temp; - _temp.reset(new InMemory()); - _size = 0; + for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){ + BSONObj key = i->first; + BSONList& all = i->second; - for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){ - BSONObj key = i->first; - BSONList& all = i->second; - - if ( all.size() == 1 ){ - // this key has low cardinality, so just write to db - writelock l(_state.setup.incLong); - Client::Context ctx(_state.setup.incLong.c_str()); - write( *(all.begin()) ); - } - else if ( all.size() > 1 ){ - BSONObj res = reduceValues( all , _state.scope.get() , _state.reduce , false , 0 ); - insert( res ); - } + if ( all.size() == 1 ){ + // this key has low cardinality, so just write to db + writelock l(_state.setup.incLong); + Client::Context ctx(_state.setup.incLong.c_str()); + write( *(all.begin()) ); + } + else if ( all.size() > 1 ){ + BSONObj res = reduceValues( all , &_state , false ); + insert( res ); } } - - void dump(){ - writelock l(_state.setup.incLong); - Client::Context ctx(_state.setup.incLong); + } + + void MRTL::dump(){ + writelock l(_state.setup.incLong); + Client::Context ctx(_state.setup.incLong); - for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){ - BSONList& all = i->second; - if ( all.size() < 1 ) - continue; + for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){ + BSONList& all = i->second; + if ( all.size() < 1 ) + continue; - for ( BSONList::iterator j=all.begin(); j!=all.end(); j++ ) - write( *j ); - } - _temp->clear(); - _size = 0; - + for ( BSONList::iterator j=all.begin(); j!=all.end(); j++ ) + write( *j ); } + _temp->clear(); + _size = 0; + + } - void insert( const BSONObj& a ){ - BSONList& all = (*_temp)[a]; - all.push_back( a ); - _size += a.objsize() + 16; - } + void MRTL::insert( const BSONObj& a ){ + BSONList& all = (*_temp)[a]; + all.push_back( a ); + _size += a.objsize() + 16; + } - void checkSize(){ - if ( _size < 1024 * 5 ) - return; + void MRTL::checkSize(){ + if ( _size < 1024 * 5 ) + return; - long before = _size; - reduceInMemory(); - log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl; + long before = _size; + reduceInMemory(); + log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl; - if ( _size < 1024 * 15 ) - return; + if ( _size < 1024 * 15 ) + return; - dump(); - log(1) << " mr: dumping to db" << endl; - } + dump(); + log(1) << " mr: dumping to db" << endl; + } - private: - void write( BSONObj& o ){ - theDataFileMgr.insertWithObjMod( _state.setup.incLong.c_str() , o , true ); - } - - MRState& _state; - - boost::shared_ptr<InMemory> _temp; - long _size; - - public: - long long numEmits; - }; + void MRTL::write( BSONObj& o ){ + theDataFileMgr.insertWithObjMod( _state.setup.incLong.c_str() , o , true ); + } boost::thread_specific_ptr<MRTL> _tlmr; @@ -440,9 +426,11 @@ namespace mongo { BSONObjBuilder countsBuilder; BSONObjBuilder timingBuilder; + MRState state( mr ); + try { - - MRState state( mr ); + state.init(); + state.scope->injectNative( "emit" , fast_emit ); MRTL * mrtl = new MRTL( state ); @@ -620,7 +608,7 @@ namespace mongo { dblock lock; db.dropCollection( mr.incLong ); - finalCount = mr.renameIfNeeded( db ); + finalCount = mr.renameIfNeeded( db , &state ); } timingBuilder.append( "total" , t.millis() ); @@ -686,6 +674,8 @@ namespace mongo { } + MRReduceState state; + DBDirectClient db; { // reduce from each stream @@ -696,12 +686,11 @@ namespace mongo { Query().sort( sortKey ) ); cursor.init(); - auto_ptr<Scope> s = globalScriptEngine->getPooledScope( dbname ); - s->localConnect( dbname.c_str() ); - ScriptingFunction reduceFunction = s->createFunction( mr.reduceCode.c_str() ); - ScriptingFunction finalizeFunction = 0; + state.scope.reset( globalScriptEngine->getPooledScope( dbname ).release() ); + state.scope->localConnect( dbname.c_str() ); + state.reduce = state.scope->createFunction( mr.reduceCode.c_str() ); if ( mr.finalizeCode.size() ) - finalizeFunction = s->createFunction( mr.finalizeCode.c_str() ); + state.finalize = state.scope->createFunction( mr.finalizeCode.c_str() ); BSONList values; @@ -721,16 +710,17 @@ namespace mongo { } - db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) ); + db.insert( mr.tempLong , reduceValues( values , &state , true ) ); values.clear(); values.push_back( t ); } if ( values.size() ) - db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) ); + db.insert( mr.tempLong , reduceValues( values , &state , true ) ); } - long long finalCount = mr.renameIfNeeded( db ); + + long long finalCount = mr.renameIfNeeded( db , &state ); log(0) << " mapreducefinishcommand " << mr.finalLong << " " << finalCount << endl; for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ){ diff --git a/db/commands/mr.h b/db/commands/mr.h new file mode 100644 index 00000000000..9f8907b1de3 --- /dev/null +++ b/db/commands/mr.h @@ -0,0 +1,153 @@ +// mr.h + +/** + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "pch.h" + +namespace mongo { + + namespace mr { + + class MRReduceState { + public: + + MRReduceState() : reduce(), finalize(){} + + scoped_ptr<Scope> scope; + + ScriptingFunction reduce; + ScriptingFunction finalize; + }; + + + typedef vector<BSONObj> BSONList; + + class MyCmp { + public: + MyCmp(){} + bool operator()( const BSONObj &l, const BSONObj &r ) const { + return l.firstElement().woCompare( r.firstElement() ) < 0; + } + }; + + typedef map< BSONObj,BSONList,MyCmp > InMemory; + + /** + * holds map/reduce config information + */ + class MRSetup { + public: + MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp = true ); + + /** Field expected to be a Code or CodeWScope. + * Add its scope, if any, to scopeBuilder, and return its code. + * Scopes added later will shadow those added earlier. */ + static string scopeAndCode (BSONObjBuilder& scopeBuilder, const BSONElement& field); + + /** + @return number objects in collection + */ + long long renameIfNeeded( DBDirectClient& db , MRReduceState * state ); + + void insert( const string& ns , BSONObj& o ); + + string dbname; + string ns; + + // options + bool verbose; + bool keeptemp; + bool replicate; + + // query options + + BSONObj filter; + BSONObj sort; + long long limit; + + // functions + + string mapCode; + string reduceCode; + string finalizeCode; + + BSONObj mapparams; + BSONObj scopeSetup; + + // output tables + string incLong; + + string tempShort; + string tempLong; + + string finalShort; + string finalLong; + + enum { NORMAL , MERGE , REDUCE } outType; + + static AtomicUInt JOB_NUMBER; + }; // end MRsetup + + /** + * container for all stack based map/reduce state + */ + class MRState : public MRReduceState { + public: + MRState( MRSetup& s ); + void init(); + + void finalReduce( BSONList& values ); + + MRSetup& setup; + DBDirectClient db; + + ScriptingFunction map; + }; + + /** + * thread local map/reduce state + * used for access map/reduce state from inside javascript calls + */ + class MRTL { + public: + MRTL( MRState& state ); + + void reduceInMemory(); + + void dump(); + + void insert( const BSONObj& a ); + + void checkSize(); + + private: + void write( BSONObj& o ); + + MRState& _state; + + boost::shared_ptr<InMemory> _temp; + long _size; + + public: + long long numEmits; + }; + + } // end mr namespace +} + + diff --git a/db/dbhelpers.cpp b/db/dbhelpers.cpp index 5e04731f258..1fa90cb29b7 100644 --- a/db/dbhelpers.cpp +++ b/db/dbhelpers.cpp @@ -28,39 +28,6 @@ namespace mongo { - CursorIterator::CursorIterator( shared_ptr<Cursor> c , BSONObj filter ) - : _cursor( c ){ - if ( ! filter.isEmpty() ) - _matcher.reset( new CoveredIndexMatcher( filter , BSONObj() ) ); - _advance(); - } - - BSONObj CursorIterator::next(){ - BSONObj o = _o; - _advance(); - return o; - } - - bool CursorIterator::hasNext(){ - return ! _o.isEmpty(); - } - - void CursorIterator::_advance(){ - if ( ! _cursor->ok() ){ - _o = BSONObj(); - return; - } - - while ( _cursor->ok() ){ - _o = _cursor->current(); - _cursor->advance(); - if ( _matcher.get() == 0 || _matcher->matches( _o ) ) - return; - } - - _o = BSONObj(); - } - void Helpers::ensureIndex(const char *ns, BSONObj keyPattern, bool unique, const char *name) { NamespaceDetails *d = nsdetails(ns); if( d == 0 ) @@ -157,13 +124,6 @@ namespace mongo { return res->loc(); } - auto_ptr<CursorIterator> Helpers::find( const char *ns , BSONObj query , bool requireIndex ){ - uassert( 10047 , "requireIndex not supported in Helpers::find yet" , ! requireIndex ); - auto_ptr<CursorIterator> i; - i.reset( new CursorIterator( DataFileMgr::findAll( ns ) , query ) ); - return i; - } - bool Helpers::findById(Client& c, const char *ns, BSONObj query, BSONObj& result , bool * nsFound , bool * indexFound ){ dbMutex.assertAtLeastReadLocked(); diff --git a/db/dbhelpers.h b/db/dbhelpers.h index 981506a2966..d952613396e 100644 --- a/db/dbhelpers.h +++ b/db/dbhelpers.h @@ -33,20 +33,6 @@ namespace mongo { class Cursor; class CoveredIndexMatcher; - class CursorIterator { - public: - CursorIterator( shared_ptr<Cursor> c , BSONObj filter = BSONObj() ); - BSONObj next(); - bool hasNext(); - - private: - void _advance(); - - shared_ptr<Cursor> _cursor; - auto_ptr<CoveredIndexMatcher> _matcher; - BSONObj _o; - }; - /** all helpers assume locking is handled above them */ @@ -90,8 +76,6 @@ namespace mongo { @return null loc if not found */ static DiskLoc findById(NamespaceDetails *d, BSONObj query); - static auto_ptr<CursorIterator> find( const char *ns , BSONObj query = BSONObj() , bool requireIndex = false ); - /** Get/put the first (or last) object from a collection. Generally only useful if the collection only ever has a single object -- which is a "singleton collection". diff --git a/dbtests/querytests.cpp b/dbtests/querytests.cpp index ddc94f4cf8d..c61a6268d56 100644 --- a/dbtests/querytests.cpp +++ b/dbtests/querytests.cpp @@ -926,24 +926,6 @@ namespace QueryTests { cout << "HelperTest slow:" << slow << " fast:" << fast << endl; - { - auto_ptr<CursorIterator> i = Helpers::find( ns() ); - int n = 0; - while ( i->hasNext() ){ - BSONObj o = i->next(); - n++; - } - ASSERT_EQUALS( 50 , n ); - - i = Helpers::find( ns() , BSON( "_id" << 20 ) ); - n = 0; - while ( i->hasNext() ){ - BSONObj o = i->next(); - n++; - } - ASSERT_EQUALS( 1 , n ); - } - } }; diff --git a/jstests/mr_merge.js b/jstests/mr_merge.js new file mode 100644 index 00000000000..6393f188bf7 --- /dev/null +++ b/jstests/mr_merge.js @@ -0,0 +1,51 @@ + +t = db.mr_merge; +t.drop(); + +t.insert( { a : [ 1 , 2 ] } ) +t.insert( { a : [ 2 , 3 ] } ) +t.insert( { a : [ 3 , 4 ] } ) + +outName = "mr_merge_out"; +out = db[outName]; +out.drop(); + +m = function(){ for (i=0; i<this.a.length; i++ ) emit( this.a[i] , 1 ); } +r = function(k,vs){ return Array.sum( vs ); } + +function tos( o ){ + var s = ""; + for ( var i=0; i<100; i++ ){ + if ( o[i] ) + s += i + "_" + o[i]; + } + return s; +} + + +res = t.mapReduce( m , r , { out : outName } ) + + +expected = { "1" : 1 , "2" : 2 , "3" : 2 , "4" : 1 } +assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "A" ); + +t.insert( { a : [ 4 , 5 ] } ) +out.insert( { _id : 10 , value : "5" } ) +res = t.mapReduce( m , r , { out : outName } ) + +expected["4"]++; +expected["5"] = 1 +assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "B" ); + +t.insert( { a : [ 5 , 6 ] } ) +out.insert( { _id : 10 , value : "5" } ) +res = t.mapReduce( m , r , { out : outName , outType : "merge" } ) + +expected["5"]++; +expected["10"] = 5 +expected["6"] = 1 + +assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "C" ); + + + diff --git a/jstests/mr_outreduce.js b/jstests/mr_outreduce.js new file mode 100644 index 00000000000..e5878366fe4 --- /dev/null +++ b/jstests/mr_outreduce.js @@ -0,0 +1,41 @@ + +t = db.mr_outreduce; +t.drop(); + +t.insert( { _id : 1 , a : [ 1 , 2 ] } ) +t.insert( { _id : 2 , a : [ 2 , 3 ] } ) +t.insert( { _id : 3 , a : [ 3 , 4 ] } ) + +outName = "mr_outreduce_out"; +out = db[outName]; +out.drop(); + +m = function(){ for (i=0; i<this.a.length; i++ ) emit( this.a[i] , 1 ); } +r = function(k,vs){ return Array.sum( vs ); } + +function tos( o ){ + var s = ""; + for ( var i=0; i<100; i++ ){ + if ( o[i] ) + s += i + "_" + o[i] + "|" + } + return s; +} + + +res = t.mapReduce( m , r , { out : outName } ) + + +expected = { "1" : 1 , "2" : 2 , "3" : 2 , "4" : 1 } +assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "A" ); + +t.insert( { _id : 4 , a : [ 4 , 5 ] } ) +out.insert( { _id : 10 , value : "5" } ) // this is a sentinal to make sure it wasn't killed +res = t.mapReduce( m , r , { out : outName , outType : "reduce" , query : { _id : { $gt : 3 } } } ) + +expected["4"]++; +expected["5"] = 1 +expected["10"] = 5 +assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "B" ); + + diff --git a/s/s_only.cpp b/s/s_only.cpp index c91ba10bfac..45b2a5e0cee 100644 --- a/s/s_only.cpp +++ b/s/s_only.cpp @@ -27,12 +27,6 @@ */ namespace mongo { - auto_ptr<CursorIterator> Helpers::find( const char *ns , BSONObj query , bool requireIndex ){ - uassert( 10196 , "Helpers::find can't be used in mongos" , 0 ); - auto_ptr<CursorIterator> i; - return i; - } - boost::thread_specific_ptr<Client> currentClient; Client::Client(const char *desc , MessagingPort *p) : |