summaryrefslogtreecommitdiff
path: root/db/mr.cpp
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2009-11-03 11:40:00 -0500
committerEliot Horowitz <eliot@10gen.com>2009-11-03 11:40:00 -0500
commit6d5e43755d739b526d5b56c7a6c457e8f019a74b (patch)
treecd929539e73fb9f46129ad9bd1c4a9fa016a41cd /db/mr.cpp
parent3a1c9831c135fbef4bf02ac3458772196bd88038 (diff)
downloadmongo-6d5e43755d739b526d5b56c7a6c457e8f019a74b.tar.gz
sharded map-reduce basic test works SHARDING-37
Diffstat (limited to 'db/mr.cpp')
-rw-r--r--db/mr.cpp88
1 files changed, 64 insertions, 24 deletions
diff --git a/db/mr.cpp b/db/mr.cpp
index a3aa178ddec..dcf994074fb 100644
--- a/db/mr.cpp
+++ b/db/mr.cpp
@@ -22,6 +22,7 @@
#include "../scripting/engine.h"
#include "../client/dbclient.h"
#include "../client/connpool.h"
+#include "../client/parallel.h"
namespace mongo {
@@ -211,7 +212,17 @@ namespace mongo {
_tlmr->resetNum();
return BSONObj();
}
-
+
+ string tempCollectionName( const BSONObj& cmd ){
+ static int inc = 1;
+ stringstream ss;
+ ss << cc().database()->name << ".";
+ if ( ! cmd["keeptemp"].trueValue() )
+ ss << "tmp.";
+ ss << "mr." << cmd.firstElement().fieldName() << "_" << time(0) << "_" << inc++;
+ return ss.str();
+ }
+
class MapReduceCommand : public Command {
public:
MapReduceCommand() : Command("mapreduce"){}
@@ -221,16 +232,6 @@ namespace mongo {
help << "see http://www.mongodb.org/display/DOCS/MapReduce";
}
- string tempCollectionName( string coll , bool tmp ){
- static int inc = 1;
- stringstream ss;
- ss << cc().database()->name << ".";
- if ( tmp )
- ss << "tmp.";
- ss << "mr." << coll << "_" << time(0) << "_" << inc++;
- return ss.str();
- }
-
void doReduce( const string& resultColl , list<BSONObj>& values , Scope * s , ScriptingFunction reduce ){
if ( values.size() == 0 )
return;
@@ -272,7 +273,7 @@ namespace mongo {
s->localConnect( cc().database()->name.c_str() );
bool istemp = ! cmdObj["keeptemp"].trueValue();
- string resultColl = tempCollectionName( cmdObj.firstElement().valuestr() , istemp );
+ string resultColl = tempCollectionName( cmdObj );
if ( istemp )
currentClient->addTempCollection( resultColl );
string finalOutput = resultColl;
@@ -436,12 +437,14 @@ namespace mongo {
virtual bool slaveOk() { return true; }
bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
-
+ dbtemprelease temprlease; // we don't touch the db directly
+
string dbname = cc().database()->name;
-
+ string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe();
+
BSONObj origCmd = cmdObj.firstElement().embeddedObjectUserCheck();
- cout << origCmd << endl;
- errmsg = "eliot was here";
+
+ set<ServerAndQuery> servers;
BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck();
vector< auto_ptr<DBClientCursor> > shardCursors;
@@ -449,18 +452,55 @@ namespace mongo {
while ( i.more() ){
BSONElement e = i.next();
string shard = e.fieldName();
- BSONObj res = e.embeddedObjectUserCheck();
- cout << "\t" << shard << "\t" << res << endl;
- ScopedDbConnection conn( shard );
- //shardCursors.push_back( conn->query( dbname + "." + res["result"].valuestrsafe() , Query().sort( BSON( "_id" << 1 ) ) ) );
+ BSONObj res = e.embeddedObjectUserCheck();
+
+ uassert( "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() );
+ servers.insert( shard );
}
+
+ BSONObj sortKey = BSON( "_id" << 1 );
+
+ ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection ,
+ Query().sort( sortKey ) );
- //while ( true ){
-
- //}
+
+ auto_ptr<Scope> s = globalScriptEngine->getPooledScope( ns );
+ ScriptingFunction reduceFunction = s->createFunction( origCmd["reduce"].ascode().c_str() );
+
+ list<BSONObj> values;
+
+ string output = origCmd["out"].valuestrsafe();
+ if ( output.size() == 0 )
+ output = tempCollectionName( origCmd );
+ string fulloutput = dbname + "." + output;
+ result.append( "result" , output );
- return 0;
+ DBDirectClient db;
+
+ while ( cursor.more() ){
+ BSONObj t = cursor.next();
+
+ if ( values.size() == 0 ){
+ values.push_back( t );
+ continue;
+ }
+
+ if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ){
+ values.push_back( t );
+ continue;
+ }
+
+ db.insert( fulloutput , reduceValues( values , s.get() , reduceFunction ) );
+ values.clear();
+ values.push_back( t );
+ }
+
+ if ( values.size() )
+ db.insert( fulloutput , reduceValues( values , s.get() , reduceFunction ) );
+
+
+ return 1;
}
} mapReduceFinishCommand;