summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: dwight <dwight@10gen.com> 2010-11-15 16:00:51 -0500
committer: dwight <dwight@10gen.com> 2010-11-15 16:00:51 -0500
commit: 5f0fdc432c32a692bf464168066462ef760d578f (patch)
tree: 1052ef1a4be7723653bcbfbabca8be7b6e958b73
parent: 56f4efb5dde5664071dd5016fe40f74ae4521ec2 (diff)
parent: c220a4a7679b9ffc6e920bd023742b41107cfcba (diff)
download: mongo-5f0fdc432c32a692bf464168066462ef760d578f.tar.gz
Merge branch 'master' of github.com:mongodb/mongo
-rw-r--r-- db/commands/mr.cpp 518
-rw-r--r-- db/commands/mr.h 153
-rw-r--r-- db/dbhelpers.cpp 40
-rw-r--r-- db/dbhelpers.h 16
-rw-r--r-- dbtests/querytests.cpp 18
-rw-r--r-- jstests/mr_merge.js 51
-rw-r--r-- jstests/mr_outreduce.js 41
-rw-r--r-- s/s_only.cpp 6
8 files changed, 499 insertions, 344 deletions
diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp
index a917aaa7cb6..6398ed7bc7f 100644
--- a/db/commands/mr.cpp
+++ b/db/commands/mr.cpp
@@ -27,25 +27,15 @@
#include "../matcher.h"
#include "../clientcursor.h"
+#include "mr.h"
+
namespace mongo {
namespace mr {
- typedef vector<BSONObj> BSONList;
-
- class MyCmp {
- public:
- MyCmp(){}
- bool operator()( const BSONObj &l, const BSONObj &r ) const {
- return l.firstElement().woCompare( r.firstElement() ) < 0;
- }
- };
+ AtomicUInt MRSetup::JOB_NUMBER;
- typedef pair<BSONObj,BSONObj> Data;
- //typedef list< Data > InMemory;
- typedef map< BSONObj,BSONList,MyCmp > InMemory;
-
- BSONObj reduceValues( BSONList& values , Scope * s , ScriptingFunction reduce , bool final , ScriptingFunction finalize ){
+ BSONObj reduceValues( BSONList& values , MRReduceState * state , bool final ){
uassert( 10074 , "need values" , values.size() );
int sizeEstimate = ( values.size() * values.begin()->getField( "value" ).size() ) + 128;
@@ -82,8 +72,8 @@ namespace mongo {
valueBuilder->done();
BSONObj args = reduceArgs.obj();
- s->invokeSafe( reduce , args );
- if ( s->type( "return" ) == Array ){
+ state->scope->invokeSafe( state->reduce , args );
+ if ( state->scope->type( "return" ) == Array ){
uassert( 10075 , "reduce -> multiple not supported yet",0);
return BSONObj();
}
@@ -97,305 +87,301 @@ namespace mongo {
}
BSONObjBuilder temp( endSizeEstimate );
temp.append( key.firstElement() );
- s->append( temp , "1" , "return" );
+ state->scope->append( temp , "1" , "return" );
x.push_back( temp.obj() );
- return reduceValues( x , s , reduce , final , finalize );
+ return reduceValues( x , state , final );
}
- if ( finalize ){
- Scope::NoDBAccess no = s->disableDBAccess( "can't access db inside finalize" );
+ if ( state->finalize ){
+ Scope::NoDBAccess no = state->scope->disableDBAccess( "can't access db inside finalize" );
BSONObjBuilder b(endSizeEstimate);
b.appendAs( key.firstElement() , "_id" );
- s->append( b , "value" , "return" );
- s->invokeSafe( finalize , b.obj() );
+ state->scope->append( b , "value" , "return" );
+ state->scope->invokeSafe( state->finalize , b.obj() );
}
BSONObjBuilder b(endSizeEstimate);
b.appendAs( key.firstElement() , final ? "_id" : "0" );
- s->append( b , final ? "value" : "1" , "return" );
+ state->scope->append( b , final ? "value" : "1" , "return" );
return b.obj();
}
- class MRSetup {
- public:
- MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp = true ){
- static int jobNumber = 1;
+ MRSetup::MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp ){
+
+ dbname = _dbname;
+ ns = dbname + "." + cmdObj.firstElement().valuestr();
+
+ verbose = cmdObj["verbose"].trueValue();
+ keeptemp = cmdObj["keeptemp"].trueValue();
+
+ { // setup names
+ stringstream ss;
+ if ( ! keeptemp )
+ ss << "tmp.";
+ ss << "mr." << cmdObj.firstElement().String() << "_" << time(0) << "_" << JOB_NUMBER++;
+ tempShort = ss.str();
+ tempLong = dbname + "." + tempShort;
+ incLong = tempLong + "_inc";
- dbname = _dbname;
- ns = dbname + "." + cmdObj.firstElement().valuestr();
+ if ( ! keeptemp && markAsTemp )
+ cc().addTempCollection( tempLong );
- verbose = cmdObj["verbose"].trueValue();
- keeptemp = cmdObj["keeptemp"].trueValue();
-
- { // setup names
- stringstream ss;
- if ( ! keeptemp )
- ss << "tmp.";
- ss << "mr." << cmdObj.firstElement().String() << "_" << time(0) << "_" << jobNumber++;
- tempShort = ss.str();
- tempLong = dbname + "." + tempShort;
- incLong = tempLong + "_inc";
-
- if ( ! keeptemp && markAsTemp )
- cc().addTempCollection( tempLong );
-
- replicate = keeptemp;
-
- if ( cmdObj["out"].type() == String ){
- finalShort = cmdObj["out"].valuestr();
- replicate = true;
- }
- else
- finalShort = tempShort;
+ replicate = keeptemp;
+
+ if ( cmdObj["out"].type() == String ){
+ finalShort = cmdObj["out"].valuestr();
+ replicate = true;
+ }
+ else
+ finalShort = tempShort;
- finalLong = dbname + "." + finalShort;
+ finalLong = dbname + "." + finalShort;
- }
-
- { // scope and code
- // NOTE: function scopes are merged with m/r scope, not nested like they should be
- BSONObjBuilder scopeBuilder;
+ }
+
+ if ( cmdObj["outType"].type() == String ){
+ uassert( 13521 , "need 'out' if using 'outType'" , cmdObj["out"].type() == String );
+ string t = cmdObj["outType"].String();
+ if ( t == "normal" )
+ outType = NORMAL;
+ else if ( t == "merge" )
+ outType = MERGE;
+ else if ( t == "reduce" )
+ outType = REDUCE;
+ else
+ uasserted( 13522 , str::stream() << "unknown outType [" << t << "]" );
+ }
+ else {
+ outType = NORMAL;
+ }
- if ( cmdObj["scope"].type() == Object ){
- scopeBuilder.appendElements( cmdObj["scope"].embeddedObjectUserCheck() );
- }
+ { // scope and code
+ // NOTE: function scopes are merged with m/r scope, not nested like they should be
+ BSONObjBuilder scopeBuilder;
- mapCode = scopeAndCode( scopeBuilder, cmdObj["map"] );
- reduceCode = scopeAndCode( scopeBuilder, cmdObj["reduce"] );
- if ( cmdObj["finalize"].type() ){
- finalizeCode = scopeAndCode( scopeBuilder, cmdObj["finalize"] );
- }
-
- scopeSetup = scopeBuilder.obj();
+ if ( cmdObj["scope"].type() == Object ){
+ scopeBuilder.appendElements( cmdObj["scope"].embeddedObjectUserCheck() );
+ }
- if ( cmdObj["mapparams"].type() == Array ){
- mapparams = cmdObj["mapparams"].embeddedObjectUserCheck();
- }
-
+ mapCode = scopeAndCode( scopeBuilder, cmdObj["map"] );
+ reduceCode = scopeAndCode( scopeBuilder, cmdObj["reduce"] );
+ if ( cmdObj["finalize"].type() ){
+ finalizeCode = scopeAndCode( scopeBuilder, cmdObj["finalize"] );
}
-
- { // query options
- if ( cmdObj["query"].type() == Object ){
- filter = cmdObj["query"].embeddedObjectUserCheck();
- }
- if ( cmdObj["sort"].type() == Object ){
- sort = cmdObj["sort"].embeddedObjectUserCheck();
- }
+ scopeSetup = scopeBuilder.obj();
- if ( cmdObj["limit"].isNumber() )
- limit = cmdObj["limit"].numberLong();
- else
- limit = 0;
+ if ( cmdObj["mapparams"].type() == Array ){
+ mapparams = cmdObj["mapparams"].embeddedObjectUserCheck();
}
+
}
+
+ { // query options
+ if ( cmdObj["query"].type() == Object ){
+ filter = cmdObj["query"].embeddedObjectUserCheck();
+ }
+
+ if ( cmdObj["sort"].type() == Object ){
+ sort = cmdObj["sort"].embeddedObjectUserCheck();
+ }
- /** Field expected to be a Code or CodeWScope.
- * Add its scope, if any, to scopeBuilder, and return its code.
- * Scopes added later will shadow those added earlier. */
- static string scopeAndCode (BSONObjBuilder& scopeBuilder, const BSONElement& field) {
- if ( field.type() == CodeWScope )
- scopeBuilder.appendElements( field.codeWScopeObject() );
- return field._asCode();
+ if ( cmdObj["limit"].isNumber() )
+ limit = cmdObj["limit"].numberLong();
+ else
+ limit = 0;
}
+ }
+
+ string MRSetup::scopeAndCode (BSONObjBuilder& scopeBuilder, const BSONElement& field) {
+ if ( field.type() == CodeWScope )
+ scopeBuilder.appendElements( field.codeWScopeObject() );
+ return field._asCode();
+ }
- /**
- @return number objects in collection
- */
- long long renameIfNeeded( DBDirectClient& db ){
- if ( finalLong != tempLong ){
+ long long MRSetup::renameIfNeeded( DBDirectClient& db , MRReduceState * state ){
+ assertInWriteLock();
+ if ( finalLong != tempLong ){
+
+ if ( outType == NORMAL ){
db.dropCollection( finalLong );
if ( db.count( tempLong ) ){
BSONObj info;
- uassert( 10076 , "rename failed" , db.runCommand( "admin" , BSON( "renameCollection" << tempLong << "to" << finalLong ) , info ) );
+ uassert( 10076 , "rename failed" ,
+ db.runCommand( "admin" , BSON( "renameCollection" << tempLong << "to" << finalLong ) , info ) );
}
}
- return db.count( finalLong );
- }
+ else if ( outType == MERGE ){
+ auto_ptr<DBClientCursor> cursor = db.query( tempLong , BSONObj() );
+ while ( cursor->more() ){
+ BSONObj o = cursor->next();
+ Helpers::upsert( finalLong , o );
+ }
+ db.dropCollection( tempLong );
+ }
+ else if ( outType == REDUCE ){
+ BSONList values;
+
+ auto_ptr<DBClientCursor> cursor = db.query( tempLong , BSONObj() );
+ while ( cursor->more() ){
+ BSONObj temp = cursor->next();
+ BSONObj old;
- void insert( const string& ns , BSONObj& o ){
- writelock l( ns );
- Client::Context ctx( ns );
+ bool found;
+ {
+ Client::Context tx( finalLong );
+ found = Helpers::findOne( finalLong.c_str() , temp["_id"].wrap() , old , true );
+ }
+
+ if ( found ){
+ // need to reduce
+ values.clear();
+ values.push_back( temp );
+ values.push_back( old );
+ Helpers::upsert( finalLong , reduceValues( values , state , true ) );
+ }
+ else {
+ Helpers::upsert( finalLong , temp );
+ }
+ }
+ db.dropCollection( tempLong );
+ }
+ else {
+ assert(0);
+ }
+ }
+ return db.count( finalLong );
+ }
+
+ void MRSetup::insert( const string& ns , BSONObj& o ){
+ writelock l( ns );
+ Client::Context ctx( ns );
- if ( replicate )
- theDataFileMgr.insertAndLog( ns.c_str() , o , false );
- else
- theDataFileMgr.insertWithObjMod( ns.c_str() , o , false );
+ if ( replicate )
+ theDataFileMgr.insertAndLog( ns.c_str() , o , false );
+ else
+ theDataFileMgr.insertWithObjMod( ns.c_str() , o , false );
- }
+ }
- string dbname;
- string ns;
-
- // options
- bool verbose;
- bool keeptemp;
- bool replicate;
- // query options
-
- BSONObj filter;
- BSONObj sort;
- long long limit;
+ MRState::MRState( MRSetup& s )
+ : setup(s){
+ }
- // functions
+ void MRState::init(){
+ scope.reset(globalScriptEngine->getPooledScope( setup.dbname ).release() );
+ scope->localConnect( setup.dbname.c_str() );
- string mapCode;
- string reduceCode;
- string finalizeCode;
+ map = scope->createFunction( setup.mapCode.c_str() );
+ if ( ! map )
+ throw UserException( 9012, (string)"map compile failed: " + scope->getError() );
- BSONObj mapparams;
- BSONObj scopeSetup;
+ reduce = scope->createFunction( setup.reduceCode.c_str() );
+ if ( ! reduce )
+ throw UserException( 9013, (string)"reduce compile failed: " + scope->getError() );
- // output tables
- string incLong;
+ if ( setup.finalizeCode.size() )
+ finalize = scope->createFunction( setup.finalizeCode.c_str() );
+ else
+ finalize = 0;
- string tempShort;
- string tempLong;
+ if ( ! setup.scopeSetup.isEmpty() )
+ scope->init( &setup.scopeSetup );
- string finalShort;
- string finalLong;
+ db.dropCollection( setup.tempLong );
+ db.dropCollection( setup.incLong );
- }; // end MRsetup
-
- class MRState {
- public:
- MRState( MRSetup& s ) : setup(s){
- scope = globalScriptEngine->getPooledScope( setup.dbname );
- scope->localConnect( setup.dbname.c_str() );
-
- map = scope->createFunction( setup.mapCode.c_str() );
- if ( ! map )
- throw UserException( 9012, (string)"map compile failed: " + scope->getError() );
-
- reduce = scope->createFunction( setup.reduceCode.c_str() );
- if ( ! reduce )
- throw UserException( 9013, (string)"reduce compile failed: " + scope->getError() );
-
- if ( setup.finalizeCode.size() )
- finalize = scope->createFunction( setup.finalizeCode.c_str() );
- else
- finalize = 0;
-
- if ( ! setup.scopeSetup.isEmpty() )
- scope->init( &setup.scopeSetup );
-
- db.dropCollection( setup.tempLong );
- db.dropCollection( setup.incLong );
-
- writelock l( setup.incLong );
- Client::Context ctx( setup.incLong );
- string err;
- assert( userCreateNS( setup.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) );
-
- }
-
- void finalReduce( BSONList& values ){
- if ( values.size() == 0 )
- return;
-
- BSONObj key = values.begin()->firstElement().wrap( "_id" );
- BSONObj res = reduceValues( values , scope.get() , reduce , 1 , finalize );
-
- setup.insert( setup.tempLong , res );
- }
-
+ writelock l( setup.incLong );
+ Client::Context ctx( setup.incLong );
+ string err;
+ assert( userCreateNS( setup.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) );
- MRSetup& setup;
- auto_ptr<Scope> scope;
- DBDirectClient db;
+ }
+
+ void MRState::finalReduce( BSONList& values ){
+ if ( values.size() == 0 )
+ return;
+
+ BSONObj key = values.begin()->firstElement().wrap( "_id" );
+ BSONObj res = reduceValues( values , this , true );
+
+ setup.insert( setup.tempLong , res );
+ }
- ScriptingFunction map;
- ScriptingFunction reduce;
- ScriptingFunction finalize;
+ MRTL::MRTL( MRState& state )
+ : _state( state )
+ , _temp(new InMemory())
+ {
+ _size = 0;
+ numEmits = 0;
+ }
- };
-
- class MRTL {
- public:
- MRTL( MRState& state )
- : _state( state )
- , _temp(new InMemory())
- {
- _size = 0;
- numEmits = 0;
- }
+ void MRTL::reduceInMemory(){
+ boost::shared_ptr<InMemory> old = _temp;
+ _temp.reset(new InMemory());
+ _size = 0;
- void reduceInMemory(){
- boost::shared_ptr<InMemory> old = _temp;
- _temp.reset(new InMemory());
- _size = 0;
+ for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){
+ BSONObj key = i->first;
+ BSONList& all = i->second;
- for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){
- BSONObj key = i->first;
- BSONList& all = i->second;
-
- if ( all.size() == 1 ){
- // this key has low cardinality, so just write to db
- writelock l(_state.setup.incLong);
- Client::Context ctx(_state.setup.incLong.c_str());
- write( *(all.begin()) );
- }
- else if ( all.size() > 1 ){
- BSONObj res = reduceValues( all , _state.scope.get() , _state.reduce , false , 0 );
- insert( res );
- }
+ if ( all.size() == 1 ){
+ // this key has low cardinality, so just write to db
+ writelock l(_state.setup.incLong);
+ Client::Context ctx(_state.setup.incLong.c_str());
+ write( *(all.begin()) );
+ }
+ else if ( all.size() > 1 ){
+ BSONObj res = reduceValues( all , &_state , false );
+ insert( res );
}
}
-
- void dump(){
- writelock l(_state.setup.incLong);
- Client::Context ctx(_state.setup.incLong);
+ }
+
+ void MRTL::dump(){
+ writelock l(_state.setup.incLong);
+ Client::Context ctx(_state.setup.incLong);
- for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){
- BSONList& all = i->second;
- if ( all.size() < 1 )
- continue;
+ for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){
+ BSONList& all = i->second;
+ if ( all.size() < 1 )
+ continue;
- for ( BSONList::iterator j=all.begin(); j!=all.end(); j++ )
- write( *j );
- }
- _temp->clear();
- _size = 0;
-
+ for ( BSONList::iterator j=all.begin(); j!=all.end(); j++ )
+ write( *j );
}
+ _temp->clear();
+ _size = 0;
+
+ }
- void insert( const BSONObj& a ){
- BSONList& all = (*_temp)[a];
- all.push_back( a );
- _size += a.objsize() + 16;
- }
+ void MRTL::insert( const BSONObj& a ){
+ BSONList& all = (*_temp)[a];
+ all.push_back( a );
+ _size += a.objsize() + 16;
+ }
- void checkSize(){
- if ( _size < 1024 * 5 )
- return;
+ void MRTL::checkSize(){
+ if ( _size < 1024 * 5 )
+ return;
- long before = _size;
- reduceInMemory();
- log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl;
+ long before = _size;
+ reduceInMemory();
+ log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl;
- if ( _size < 1024 * 15 )
- return;
+ if ( _size < 1024 * 15 )
+ return;
- dump();
- log(1) << " mr: dumping to db" << endl;
- }
+ dump();
+ log(1) << " mr: dumping to db" << endl;
+ }
- private:
- void write( BSONObj& o ){
- theDataFileMgr.insertWithObjMod( _state.setup.incLong.c_str() , o , true );
- }
-
- MRState& _state;
-
- boost::shared_ptr<InMemory> _temp;
- long _size;
-
- public:
- long long numEmits;
- };
+ void MRTL::write( BSONObj& o ){
+ theDataFileMgr.insertWithObjMod( _state.setup.incLong.c_str() , o , true );
+ }
boost::thread_specific_ptr<MRTL> _tlmr;
@@ -440,9 +426,11 @@ namespace mongo {
BSONObjBuilder countsBuilder;
BSONObjBuilder timingBuilder;
+ MRState state( mr );
+
try {
-
- MRState state( mr );
+ state.init();
+
state.scope->injectNative( "emit" , fast_emit );
MRTL * mrtl = new MRTL( state );
@@ -620,7 +608,7 @@ namespace mongo {
dblock lock;
db.dropCollection( mr.incLong );
- finalCount = mr.renameIfNeeded( db );
+ finalCount = mr.renameIfNeeded( db , &state );
}
timingBuilder.append( "total" , t.millis() );
@@ -686,6 +674,8 @@ namespace mongo {
}
+ MRReduceState state;
+
DBDirectClient db;
{ // reduce from each stream
@@ -696,12 +686,11 @@ namespace mongo {
Query().sort( sortKey ) );
cursor.init();
- auto_ptr<Scope> s = globalScriptEngine->getPooledScope( dbname );
- s->localConnect( dbname.c_str() );
- ScriptingFunction reduceFunction = s->createFunction( mr.reduceCode.c_str() );
- ScriptingFunction finalizeFunction = 0;
+ state.scope.reset( globalScriptEngine->getPooledScope( dbname ).release() );
+ state.scope->localConnect( dbname.c_str() );
+ state.reduce = state.scope->createFunction( mr.reduceCode.c_str() );
if ( mr.finalizeCode.size() )
- finalizeFunction = s->createFunction( mr.finalizeCode.c_str() );
+ state.finalize = state.scope->createFunction( mr.finalizeCode.c_str() );
BSONList values;
@@ -721,16 +710,17 @@ namespace mongo {
}
- db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) );
+ db.insert( mr.tempLong , reduceValues( values , &state , true ) );
values.clear();
values.push_back( t );
}
if ( values.size() )
- db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) );
+ db.insert( mr.tempLong , reduceValues( values , &state , true ) );
}
- long long finalCount = mr.renameIfNeeded( db );
+
+ long long finalCount = mr.renameIfNeeded( db , &state );
log(0) << " mapreducefinishcommand " << mr.finalLong << " " << finalCount << endl;
for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ){
diff --git a/db/commands/mr.h b/db/commands/mr.h
new file mode 100644
index 00000000000..9f8907b1de3
--- /dev/null
+++ b/db/commands/mr.h
@@ -0,0 +1,153 @@
+// mr.h
+
+/**
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "pch.h"
+
+namespace mongo {
+
+ namespace mr {
+
+ class MRReduceState {
+ public:
+
+ MRReduceState() : reduce(), finalize(){}
+
+ scoped_ptr<Scope> scope;
+
+ ScriptingFunction reduce;
+ ScriptingFunction finalize;
+ };
+
+
+ typedef vector<BSONObj> BSONList;
+
+ class MyCmp {
+ public:
+ MyCmp(){}
+ bool operator()( const BSONObj &l, const BSONObj &r ) const {
+ return l.firstElement().woCompare( r.firstElement() ) < 0;
+ }
+ };
+
+ typedef map< BSONObj,BSONList,MyCmp > InMemory;
+
+ /**
+ * holds map/reduce config information
+ */
+ class MRSetup {
+ public:
+ MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp = true );
+
+ /** Field expected to be a Code or CodeWScope.
+ * Add its scope, if any, to scopeBuilder, and return its code.
+ * Scopes added later will shadow those added earlier. */
+ static string scopeAndCode (BSONObjBuilder& scopeBuilder, const BSONElement& field);
+
+ /**
+ @return number of objects in collection
+ */
+ long long renameIfNeeded( DBDirectClient& db , MRReduceState * state );
+
+ void insert( const string& ns , BSONObj& o );
+
+ string dbname;
+ string ns;
+
+ // options
+ bool verbose;
+ bool keeptemp;
+ bool replicate;
+
+ // query options
+
+ BSONObj filter;
+ BSONObj sort;
+ long long limit;
+
+ // functions
+
+ string mapCode;
+ string reduceCode;
+ string finalizeCode;
+
+ BSONObj mapparams;
+ BSONObj scopeSetup;
+
+ // output tables
+ string incLong;
+
+ string tempShort;
+ string tempLong;
+
+ string finalShort;
+ string finalLong;
+
+ enum { NORMAL , MERGE , REDUCE } outType;
+
+ static AtomicUInt JOB_NUMBER;
+ }; // end MRsetup
+
+ /**
+ * container for all stack based map/reduce state
+ */
+ class MRState : public MRReduceState {
+ public:
+ MRState( MRSetup& s );
+ void init();
+
+ void finalReduce( BSONList& values );
+
+ MRSetup& setup;
+ DBDirectClient db;
+
+ ScriptingFunction map;
+ };
+
+ /**
+ * thread local map/reduce state
+ * used to access map/reduce state from inside JavaScript calls
+ */
+ class MRTL {
+ public:
+ MRTL( MRState& state );
+
+ void reduceInMemory();
+
+ void dump();
+
+ void insert( const BSONObj& a );
+
+ void checkSize();
+
+ private:
+ void write( BSONObj& o );
+
+ MRState& _state;
+
+ boost::shared_ptr<InMemory> _temp;
+ long _size;
+
+ public:
+ long long numEmits;
+ };
+
+ } // end mr namespace
+}
+
+
diff --git a/db/dbhelpers.cpp b/db/dbhelpers.cpp
index 5e04731f258..1fa90cb29b7 100644
--- a/db/dbhelpers.cpp
+++ b/db/dbhelpers.cpp
@@ -28,39 +28,6 @@
namespace mongo {
- CursorIterator::CursorIterator( shared_ptr<Cursor> c , BSONObj filter )
- : _cursor( c ){
- if ( ! filter.isEmpty() )
- _matcher.reset( new CoveredIndexMatcher( filter , BSONObj() ) );
- _advance();
- }
-
- BSONObj CursorIterator::next(){
- BSONObj o = _o;
- _advance();
- return o;
- }
-
- bool CursorIterator::hasNext(){
- return ! _o.isEmpty();
- }
-
- void CursorIterator::_advance(){
- if ( ! _cursor->ok() ){
- _o = BSONObj();
- return;
- }
-
- while ( _cursor->ok() ){
- _o = _cursor->current();
- _cursor->advance();
- if ( _matcher.get() == 0 || _matcher->matches( _o ) )
- return;
- }
-
- _o = BSONObj();
- }
-
void Helpers::ensureIndex(const char *ns, BSONObj keyPattern, bool unique, const char *name) {
NamespaceDetails *d = nsdetails(ns);
if( d == 0 )
@@ -157,13 +124,6 @@ namespace mongo {
return res->loc();
}
- auto_ptr<CursorIterator> Helpers::find( const char *ns , BSONObj query , bool requireIndex ){
- uassert( 10047 , "requireIndex not supported in Helpers::find yet" , ! requireIndex );
- auto_ptr<CursorIterator> i;
- i.reset( new CursorIterator( DataFileMgr::findAll( ns ) , query ) );
- return i;
- }
-
bool Helpers::findById(Client& c, const char *ns, BSONObj query, BSONObj& result ,
bool * nsFound , bool * indexFound ){
dbMutex.assertAtLeastReadLocked();
diff --git a/db/dbhelpers.h b/db/dbhelpers.h
index 981506a2966..d952613396e 100644
--- a/db/dbhelpers.h
+++ b/db/dbhelpers.h
@@ -33,20 +33,6 @@ namespace mongo {
class Cursor;
class CoveredIndexMatcher;
- class CursorIterator {
- public:
- CursorIterator( shared_ptr<Cursor> c , BSONObj filter = BSONObj() );
- BSONObj next();
- bool hasNext();
-
- private:
- void _advance();
-
- shared_ptr<Cursor> _cursor;
- auto_ptr<CoveredIndexMatcher> _matcher;
- BSONObj _o;
- };
-
/**
all helpers assume locking is handled above them
*/
@@ -90,8 +76,6 @@ namespace mongo {
@return null loc if not found */
static DiskLoc findById(NamespaceDetails *d, BSONObj query);
- static auto_ptr<CursorIterator> find( const char *ns , BSONObj query = BSONObj() , bool requireIndex = false );
-
/** Get/put the first (or last) object from a collection. Generally only useful if the collection
only ever has a single object -- which is a "singleton collection".
diff --git a/dbtests/querytests.cpp b/dbtests/querytests.cpp
index ddc94f4cf8d..c61a6268d56 100644
--- a/dbtests/querytests.cpp
+++ b/dbtests/querytests.cpp
@@ -926,24 +926,6 @@ namespace QueryTests {
cout << "HelperTest slow:" << slow << " fast:" << fast << endl;
- {
- auto_ptr<CursorIterator> i = Helpers::find( ns() );
- int n = 0;
- while ( i->hasNext() ){
- BSONObj o = i->next();
- n++;
- }
- ASSERT_EQUALS( 50 , n );
-
- i = Helpers::find( ns() , BSON( "_id" << 20 ) );
- n = 0;
- while ( i->hasNext() ){
- BSONObj o = i->next();
- n++;
- }
- ASSERT_EQUALS( 1 , n );
- }
-
}
};
diff --git a/jstests/mr_merge.js b/jstests/mr_merge.js
new file mode 100644
index 00000000000..6393f188bf7
--- /dev/null
+++ b/jstests/mr_merge.js
@@ -0,0 +1,51 @@
+
+t = db.mr_merge;
+t.drop();
+
+t.insert( { a : [ 1 , 2 ] } )
+t.insert( { a : [ 2 , 3 ] } )
+t.insert( { a : [ 3 , 4 ] } )
+
+outName = "mr_merge_out";
+out = db[outName];
+out.drop();
+
+m = function(){ for (i=0; i<this.a.length; i++ ) emit( this.a[i] , 1 ); }
+r = function(k,vs){ return Array.sum( vs ); }
+
+function tos( o ){
+ var s = "";
+ for ( var i=0; i<100; i++ ){
+ if ( o[i] )
+ s += i + "_" + o[i];
+ }
+ return s;
+}
+
+
+res = t.mapReduce( m , r , { out : outName } )
+
+
+expected = { "1" : 1 , "2" : 2 , "3" : 2 , "4" : 1 }
+assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "A" );
+
+t.insert( { a : [ 4 , 5 ] } )
+out.insert( { _id : 10 , value : "5" } )
+res = t.mapReduce( m , r , { out : outName } )
+
+expected["4"]++;
+expected["5"] = 1
+assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "B" );
+
+t.insert( { a : [ 5 , 6 ] } )
+out.insert( { _id : 10 , value : "5" } )
+res = t.mapReduce( m , r , { out : outName , outType : "merge" } )
+
+expected["5"]++;
+expected["10"] = 5
+expected["6"] = 1
+
+assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "C" );
+
+
+
diff --git a/jstests/mr_outreduce.js b/jstests/mr_outreduce.js
new file mode 100644
index 00000000000..e5878366fe4
--- /dev/null
+++ b/jstests/mr_outreduce.js
@@ -0,0 +1,41 @@
+
+t = db.mr_outreduce;
+t.drop();
+
+t.insert( { _id : 1 , a : [ 1 , 2 ] } )
+t.insert( { _id : 2 , a : [ 2 , 3 ] } )
+t.insert( { _id : 3 , a : [ 3 , 4 ] } )
+
+outName = "mr_outreduce_out";
+out = db[outName];
+out.drop();
+
+m = function(){ for (i=0; i<this.a.length; i++ ) emit( this.a[i] , 1 ); }
+r = function(k,vs){ return Array.sum( vs ); }
+
+function tos( o ){
+ var s = "";
+ for ( var i=0; i<100; i++ ){
+ if ( o[i] )
+ s += i + "_" + o[i] + "|"
+ }
+ return s;
+}
+
+
+res = t.mapReduce( m , r , { out : outName } )
+
+
+expected = { "1" : 1 , "2" : 2 , "3" : 2 , "4" : 1 }
+assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "A" );
+
+t.insert( { _id : 4 , a : [ 4 , 5 ] } )
+out.insert( { _id : 10 , value : "5" } ) // this is a sentinel to make sure it wasn't killed
+res = t.mapReduce( m , r , { out : outName , outType : "reduce" , query : { _id : { $gt : 3 } } } )
+
+expected["4"]++;
+expected["5"] = 1
+expected["10"] = 5
+assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "B" );
+
+
diff --git a/s/s_only.cpp b/s/s_only.cpp
index c91ba10bfac..45b2a5e0cee 100644
--- a/s/s_only.cpp
+++ b/s/s_only.cpp
@@ -27,12 +27,6 @@
*/
namespace mongo {
- auto_ptr<CursorIterator> Helpers::find( const char *ns , BSONObj query , bool requireIndex ){
- uassert( 10196 , "Helpers::find can't be used in mongos" , 0 );
- auto_ptr<CursorIterator> i;
- return i;
- }
-
boost::thread_specific_ptr<Client> currentClient;
Client::Client(const char *desc , MessagingPort *p) :