summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/parallel.cpp19
-rw-r--r--client/parallel.h4
-rw-r--r--db/mr.cpp88
-rw-r--r--jstests/sharding/features2.js4
-rw-r--r--s/commands_public.cpp42
5 files changed, 127 insertions, 30 deletions
diff --git a/client/parallel.cpp b/client/parallel.cpp
index e77acfccdb5..410e3e90499 100644
--- a/client/parallel.cpp
+++ b/client/parallel.cpp
@@ -113,16 +113,29 @@ namespace mongo {
// -------- ParallelSortClusteredCursor -----------
- ParallelSortClusteredCursor::ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey ) : ClusteredCursor( q ) , _servers( servers ){
- _numServers = servers.size();
+ ParallelSortClusteredCursor::ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q ,
+ const BSONObj& sortKey )
+ : ClusteredCursor( q ) , _servers( servers ){
_sortKey = sortKey.getOwned();
+ _init();
+ }
+
+ ParallelSortClusteredCursor::ParallelSortClusteredCursor( set<ServerAndQuery> servers , const string& ns ,
+ const Query& q ,
+ int options , const BSONObj& fields )
+ : ClusteredCursor( ns , q.obj , options , fields ) , _servers( servers ){
+ _sortKey = q.getSort().copy();
+ _init();
+ }
+ void ParallelSortClusteredCursor::_init(){
+ _numServers = _servers.size();
_cursors = new auto_ptr<DBClientCursor>[_numServers];
_nexts = new BSONObj[_numServers];
// TODO: parellize
int num = 0;
- for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); i++ ){
+ for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); i++ ){
const ServerAndQuery& sq = *i;
_cursors[num++] = query( sq._server , 0 , sq._extra );
}
diff --git a/client/parallel.h b/client/parallel.h
index e7d39a89ba9..a2189c6c745 100644
--- a/client/parallel.h
+++ b/client/parallel.h
@@ -87,10 +87,14 @@ namespace mongo {
class ParallelSortClusteredCursor : public ClusteredCursor {
public:
ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey );
+ ParallelSortClusteredCursor( set<ServerAndQuery> servers , const string& ns ,
+ const Query& q , int options=0, const BSONObj& fields=BSONObj() );
virtual ~ParallelSortClusteredCursor();
virtual bool more();
virtual BSONObj next();
private:
+ void _init();
+
void advance();
int _numServers;
diff --git a/db/mr.cpp b/db/mr.cpp
index a3aa178ddec..dcf994074fb 100644
--- a/db/mr.cpp
+++ b/db/mr.cpp
@@ -22,6 +22,7 @@
#include "../scripting/engine.h"
#include "../client/dbclient.h"
#include "../client/connpool.h"
+#include "../client/parallel.h"
namespace mongo {
@@ -211,7 +212,17 @@ namespace mongo {
_tlmr->resetNum();
return BSONObj();
}
-
+
+ string tempCollectionName( const BSONObj& cmd ){
+ static int inc = 1;
+ stringstream ss;
+ ss << cc().database()->name << ".";
+ if ( ! cmd["keeptemp"].trueValue() )
+ ss << "tmp.";
+ ss << "mr." << cmd.firstElement().fieldName() << "_" << time(0) << "_" << inc++;
+ return ss.str();
+ }
+
class MapReduceCommand : public Command {
public:
MapReduceCommand() : Command("mapreduce"){}
@@ -221,16 +232,6 @@ namespace mongo {
help << "see http://www.mongodb.org/display/DOCS/MapReduce";
}
- string tempCollectionName( string coll , bool tmp ){
- static int inc = 1;
- stringstream ss;
- ss << cc().database()->name << ".";
- if ( tmp )
- ss << "tmp.";
- ss << "mr." << coll << "_" << time(0) << "_" << inc++;
- return ss.str();
- }
-
void doReduce( const string& resultColl , list<BSONObj>& values , Scope * s , ScriptingFunction reduce ){
if ( values.size() == 0 )
return;
@@ -272,7 +273,7 @@ namespace mongo {
s->localConnect( cc().database()->name.c_str() );
bool istemp = ! cmdObj["keeptemp"].trueValue();
- string resultColl = tempCollectionName( cmdObj.firstElement().valuestr() , istemp );
+ string resultColl = tempCollectionName( cmdObj );
if ( istemp )
currentClient->addTempCollection( resultColl );
string finalOutput = resultColl;
@@ -436,12 +437,14 @@ namespace mongo {
virtual bool slaveOk() { return true; }
bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
-
+ dbtemprelease temprlease; // we don't touch the db directly
+
string dbname = cc().database()->name;
-
+ string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe();
+
BSONObj origCmd = cmdObj.firstElement().embeddedObjectUserCheck();
- cout << origCmd << endl;
- errmsg = "eliot was here";
+
+ set<ServerAndQuery> servers;
BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck();
vector< auto_ptr<DBClientCursor> > shardCursors;
@@ -449,18 +452,55 @@ namespace mongo {
while ( i.more() ){
BSONElement e = i.next();
string shard = e.fieldName();
- BSONObj res = e.embeddedObjectUserCheck();
- cout << "\t" << shard << "\t" << res << endl;
- ScopedDbConnection conn( shard );
- //shardCursors.push_back( conn->query( dbname + "." + res["result"].valuestrsafe() , Query().sort( BSON( "_id" << 1 ) ) ) );
+ BSONObj res = e.embeddedObjectUserCheck();
+
+ uassert( "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() );
+ servers.insert( shard );
}
+
+ BSONObj sortKey = BSON( "_id" << 1 );
+
+ ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection ,
+ Query().sort( sortKey ) );
- //while ( true ){
-
- //}
+
+ auto_ptr<Scope> s = globalScriptEngine->getPooledScope( ns );
+ ScriptingFunction reduceFunction = s->createFunction( origCmd["reduce"].ascode().c_str() );
+
+ list<BSONObj> values;
+
+ string output = origCmd["out"].valuestrsafe();
+ if ( output.size() == 0 )
+ output = tempCollectionName( origCmd );
+ string fulloutput = dbname + "." + output;
+ result.append( "result" , output );
- return 0;
+ DBDirectClient db;
+
+ while ( cursor.more() ){
+ BSONObj t = cursor.next();
+
+ if ( values.size() == 0 ){
+ values.push_back( t );
+ continue;
+ }
+
+ if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ){
+ values.push_back( t );
+ continue;
+ }
+
+ db.insert( fulloutput , reduceValues( values , s.get() , reduceFunction ) );
+ values.clear();
+ values.push_back( t );
+ }
+
+ if ( values.size() )
+ db.insert( fulloutput , reduceValues( values , s.get() , reduceFunction ) );
+
+
+ return 1;
}
} mapReduceFinishCommand;
diff --git a/jstests/sharding/features2.js b/jstests/sharding/features2.js
index 46a398e2596..a16f5887aef 100644
--- a/jstests/sharding/features2.js
+++ b/jstests/sharding/features2.js
@@ -91,8 +91,8 @@ doMR = function( n ){
doMR( "before" );
assert.eq( 1 , s.onNumShards( "mr" ) , "E1" );
-//s.shardGo( "mr" , { x : 1 } , { x : 2 } , { x : 3 } );
-//assert.eq( 2 , s.onNumShards( "mr" ) , "E1" );
+s.shardGo( "mr" , { x : 1 } , { x : 2 } , { x : 3 } );
+assert.eq( 2 , s.onNumShards( "mr" ) , "E1" );
doMR( "after" );
diff --git a/s/commands_public.cpp b/s/commands_public.cpp
index a9f512cf35d..aa4b6e458cd 100644
--- a/s/commands_public.cpp
+++ b/s/commands_public.cpp
@@ -204,6 +204,41 @@ namespace mongo {
class MRCmd : public PublicGridCommand {
public:
MRCmd() : PublicGridCommand( "mapreduce" ){}
+
+ string getTmpName( const string& coll ){
+ static int inc = 1;
+ stringstream ss;
+ ss << "tmp.mrs." << coll << "_" << time(0) << "_" << inc++;
+ return ss.str();
+ }
+
+ BSONObj fixForShards( const BSONObj& orig , const string& output ){
+ BSONObjBuilder b;
+ BSONObjIterator i( orig );
+ while ( i.more() ){
+ BSONElement e = i.next();
+ string fn = e.fieldName();
+ if ( fn == "map" ||
+ fn == "mapreduce" ||
+ fn == "reduce" ||
+ fn == "query" ||
+ fn == "sort" ||
+ fn == "verbose" ){
+ b.append( e );
+ }
+ else if ( fn == "keeptemp" ||
+ fn == "out" ||
+ fn == "finalize" ){
+ // we don't want to copy these
+ }
+ else {
+ uassert( (string)"don't know mr field: " + fn , 0 );
+ }
+ }
+ b.append( "out" , output );
+ return b.obj();
+ }
+
bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
string dbName = getDBName( ns );
@@ -226,8 +261,13 @@ namespace mongo {
vector<Chunk*> chunks;
cm->getChunksForQuery( chunks , q );
+ const string shardedOutputCollection = getTmpName( collection );
+
+ BSONObj shardedCommand = fixForShards( cmdObj , shardedOutputCollection );
+
BSONObjBuilder finalB;
finalB.append( "mapreduce.shardedfinish" , cmdObj );
+ finalB.append( "shardedOutputCollection" , shardedOutputCollection );
BSONObjBuilder shardresults;
for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){
@@ -235,7 +275,7 @@ namespace mongo {
ScopedDbConnection conn( c->getShard() );
BSONObj myres;
- if ( ! conn->runCommand( dbName , cmdObj , myres ) ){
+ if ( ! conn->runCommand( dbName , shardedCommand , myres ) ){
errmsg = "mongod mr failed: ";
errmsg += myres.toString();
return 0;