summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--SConstruct2
-rw-r--r--db/commands/mr.cpp23
-rw-r--r--docs/errors.md77
-rw-r--r--s/commands_public.cpp187
-rw-r--r--s/mr_shard.cpp311
-rw-r--r--s/mr_shard.h230
-rw-r--r--s/server.cpp12
-rw-r--r--s/strategy.cpp1
-rw-r--r--s/strategy.h2
-rw-r--r--s/strategy_shard.cpp57
-rw-r--r--s/strategy_single.cpp4
11 files changed, 839 insertions, 67 deletions
diff --git a/SConstruct b/SConstruct
index 385a0ac1041..c91f91e23df 100644
--- a/SConstruct
+++ b/SConstruct
@@ -373,7 +373,7 @@ else:
coreServerFiles += scriptingFiles
coreShardFiles = [ "s/config.cpp" , "s/grid.cpp" , "s/chunk.cpp" , "s/shard.cpp" , "s/shardkey.cpp" ]
-shardServerFiles = coreShardFiles + Glob( "s/strategy*.cpp" ) + [ "s/commands_admin.cpp" , "s/commands_public.cpp" , "s/request.cpp" , "s/client.cpp" , "s/cursors.cpp" , "s/server.cpp" , "s/config_migrate.cpp" , "s/s_only.cpp" , "s/stats.cpp" , "s/balance.cpp" , "s/balancer_policy.cpp" , "db/cmdline.cpp" , "s/writeback_listener.cpp" , "s/shard_version.cpp" ]
+shardServerFiles = coreShardFiles + Glob( "s/strategy*.cpp" ) + [ "s/commands_admin.cpp" , "s/commands_public.cpp" , "s/request.cpp" , "s/client.cpp" , "s/cursors.cpp" , "s/server.cpp" , "s/config_migrate.cpp" , "s/s_only.cpp" , "s/stats.cpp" , "s/balance.cpp" , "s/balancer_policy.cpp" , "db/cmdline.cpp" , "s/writeback_listener.cpp" , "s/shard_version.cpp", "s/mr_shard.cpp" ]
serverOnlyFiles += coreShardFiles + [ "s/d_logic.cpp" , "s/d_writeback.cpp" , "s/d_migrate.cpp" , "s/d_state.cpp" , "s/d_split.cpp" , "client/distlock_test.cpp" , "s/d_chunk_manager.cpp" ]
serverOnlyFiles += [ "db/module.cpp" ] + Glob( "db/modules/*.cpp" )
diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp
index bf52f46176f..c55ef3d77cc 100644
--- a/db/commands/mr.cpp
+++ b/db/commands/mr.cpp
@@ -1113,10 +1113,9 @@ namespace mongo {
set<ServerAndQuery> servers;
- BSONObjBuilder shardCounts;
- map<string,long long> counts;
-
BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck();
+ BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck();
+ BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck();
vector< auto_ptr<DBClientCursor> > shardCursors;
{
@@ -1130,13 +1129,6 @@ namespace mongo {
uassert( 10078 , "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() );
servers.insert( shard );
- shardCounts.appendAs( res["counts"] , shard );
-
- BSONObjIterator j( res["counts"].embeddedObjectUserCheck() );
- while ( j.more() ) {
- BSONElement temp = j.next();
- counts[temp.fieldName()] += temp.numberLong();
- }
}
@@ -1203,15 +1195,8 @@ namespace mongo {
conn.done();
}
- result.append( "shardCounts" , shardCounts.obj() );
-
- {
- BSONObjBuilder c;
- for ( map<string,long long>::iterator i=counts.begin(); i!=counts.end(); i++ ) {
- c.append( i->first , i->second );
- }
- result.append( "counts" , c.obj() );
- }
+ result.append( "shardCounts" , shardCounts );
+ result.append( "counts" , counts );
return 1;
}
diff --git a/docs/errors.md b/docs/errors.md
index ac50c305319..4fdbaa4f7d5 100644
--- a/docs/errors.md
+++ b/docs/errors.md
@@ -289,7 +289,7 @@ db/commands/mr.cpp
* 10075 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L196) reduce -> multiple not supported yet
* 10076 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L421) rename failed:
* 10077 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L877) fast_emit takes 2 args
-* 10078 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L1131) something bad happened" , shardedOutputCollection == res["result
+* 10078 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L1130) something bad happened" , shardedOutputCollection == res["result
* 13069 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L878) an emit can't be more than half max bson size
* 13070 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L176) value too large to reduce
* 13522 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L258) unknown out specifier [" << t << "]
@@ -1062,26 +1062,27 @@ s/client.cpp
s/commands_public.cpp
----
-* 10418 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L264) how could chunk manager be null!
-* 10420 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L719) how could chunk manager be null!
-* 12594 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L487) how could chunk manager be null!
-* 13002 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L609) how could chunk manager be null!
-* 13091 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L784) how could chunk manager be null!
-* 13092 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L785) GridFS chunks collection can only be sharded on files_id", cm->getShardKey().key() == BSON("files_id
-* 13137 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L321) Source and destination collections must be on same shard
-* 13138 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L315) You can't rename a sharded collection
-* 13139 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L316) You can't rename to a sharded collection
-* 13140 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L314) Don't recognize source or target DB
-* 13343 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L612) query for sharded findAndModify must have shardkey
-* 13398 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L335) cant copy to sharded DB
-* 13399 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L343) need a fromdb argument
-* 13400 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L346) don't know where source DB is
-* 13401 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L347) cant copy from sharded DB
-* 13402 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L332) need a todb argument
-* 13407 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L641) how could chunk manager be null!
-* 13408 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L647) keyPattern must equal shard key
-* 13500 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L815) how could chunk manager be null!
-* 13512 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L267) drop collection attempted on non-sharded collection
+* 1 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L1094)
+* 10418 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L265) how could chunk manager be null!
+* 10420 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L720) how could chunk manager be null!
+* 12594 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L488) how could chunk manager be null!
+* 13002 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L610) how could chunk manager be null!
+* 13091 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L785) how could chunk manager be null!
+* 13092 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L786) GridFS chunks collection can only be sharded on files_id", cm->getShardKey().key() == BSON("files_id
+* 13137 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L322) Source and destination collections must be on same shard
+* 13138 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L316) You can't rename a sharded collection
+* 13139 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L317) You can't rename to a sharded collection
+* 13140 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L315) Don't recognize source or target DB
+* 13343 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L613) query for sharded findAndModify must have shardkey
+* 13398 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L336) cant copy to sharded DB
+* 13399 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L344) need a fromdb argument
+* 13400 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L347) don't know where source DB is
+* 13401 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L348) cant copy from sharded DB
+* 13402 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L333) need a todb argument
+* 13407 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L642) how could chunk manager be null!
+* 13408 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L648) keyPattern must equal shard key
+* 13500 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L816) how could chunk manager be null!
+* 13512 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L268) drop collection attempted on non-sharded collection
s/config.cpp
@@ -1155,6 +1156,16 @@ s/grid.cpp
* 10421 [code](http://github.com/mongodb/mongo/blob/master/s/grid.cpp#L445) getoptime failed" , conn->simpleCommand( "admin" , &result , "getoptime
+s/mr_shard.cpp
+----
+* 14814 [code](http://github.com/mongodb/mongo/blob/master/s/mr_shard.cpp#L44) couldn't compile code for:
+* 14815 [code](http://github.com/mongodb/mongo/blob/master/s/mr_shard.cpp#L148) value too large to reduce
+* 14816 [code](http://github.com/mongodb/mongo/blob/master/s/mr_shard.cpp#L168) reduce -> multiple not supported yet
+* 14817 [code](http://github.com/mongodb/mongo/blob/master/s/mr_shard.cpp#L230) unknown out specifier [" << t << "]
+* 14818 [code](http://github.com/mongodb/mongo/blob/master/s/mr_shard.cpp#L238) 'out' has to be a string or an object
+* 14819 [code](http://github.com/mongodb/mongo/blob/master/s/mr_shard.cpp#L202) outType is no longer a valid option" , cmdObj["outType
+
+
s/request.cpp
----
* 10192 [code](http://github.com/mongodb/mongo/blob/master/s/request.cpp#L65) db config reload failed!
@@ -1167,7 +1178,7 @@ s/request.cpp
s/server.cpp
----
-* 10197 [code](http://github.com/mongodb/mongo/blob/master/s/server.cpp#L187) createDirectClient not implemented for sharding yet
+* 10197 [code](http://github.com/mongodb/mongo/blob/master/s/server.cpp#L188) createDirectClient not implemented for sharding yet
s/shard.cpp
@@ -1208,6 +1219,7 @@ s/strategy.cpp
s/strategy_shard.cpp
----
-* 10201 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L199) invalid update
-* 10203 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L299) bad delete message
-* 12376 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L255)
@@ -1225,6 +1237,27 @@ s/strategy_shard.cpp
-* 8014 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L250)
-* 8015 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L330) can only delete with a non-shard key pattern if can delete as many as we find
-* 8016 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L358) can't do this write op on sharded collection
+* 10201 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L263) invalid update
+* 10203 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L363) bad delete message
+* 12376 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L319)
+* 13123 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L306)
+* 13465 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L278) shard key in upsert query must be an exact match
+* 13505 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L365) $atomic not supported sharded" , pattern["$atomic
+* 13506 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L262) $atomic not supported sharded" , query["$atomic
+* 14804 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L184) collection no longer sharded
+* 14805 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L389) collection no longer sharded
+* 14806 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L351) collection no longer sharded
+* 14812 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L216) tried to insert object without shard key
+* 14813 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L251) collection no longer sharded
+* 8010 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L41) something is wrong, shouldn't see a command here
+* 8011 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L149) tried to insert object without shard key
+* 8012 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L272) can't upsert something without shard key
+* 8013 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L287) can't do non-multi update with query that doesn't have the shard key
+* 8014 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L314)
+* 8015 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L394) can only delete with a non-shard key pattern if can delete as many as we find
+* 8016 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L422) can't do this write op on sharded collection
s/strategy_single.cpp
diff --git a/s/commands_public.cpp b/s/commands_public.cpp
index cc4221bf9e1..78eaf79e8dc 100644
--- a/s/commands_public.cpp
+++ b/s/commands_public.cpp
@@ -30,6 +30,7 @@
#include "chunk.h"
#include "strategy.h"
#include "grid.h"
+#include "mr_shard.h"
namespace mongo {
@@ -890,12 +891,13 @@ namespace mongo {
class MRCmd : public PublicGridCommand {
public:
+ AtomicUInt JOB_NUMBER;
+
MRCmd() : PublicGridCommand( "mapreduce" ) {}
string getTmpName( const string& coll ) {
- static int inc = 1;
stringstream ss;
- ss << "tmp.mrs." << coll << "_" << time(0) << "_" << inc++;
+ ss << "tmp.mrs." << coll << "_" << time(0) << "_" << JOB_NUMBER++;
return ss.str();
}
@@ -921,8 +923,8 @@ namespace mongo {
if (fn == "out" && e.type() == Object) {
// check if there is a custom output
BSONObj out = e.embeddedObject();
- if (out.hasField("db"))
- customOut = out;
+                        // capture the whole custom output object unconditionally;
+                        customOut = out;
}
}
else {
@@ -946,7 +948,7 @@ namespace mongo {
BSONObj customOut;
BSONObj shardedCommand = fixForShards( cmdObj , shardedOutputCollection, customOut , badShardedField );
- bool customOutDB = ! customOut.isEmpty() && customOut.hasField( "db" );
+ bool customOutDB = customOut.hasField( "db" );
DBConfigPtr conf = grid.getDBConfig( dbName , false );
@@ -981,11 +983,16 @@ namespace mongo {
finalCmd.append( "shardedOutputCollection" , shardedOutputCollection );
+ set<ServerAndQuery> servers;
+ BSONObj shardCounts;
+ BSONObj aggCounts;
+ map<string,long long> countsMap;
{
// we need to use our connections to the shard
// so filtering is done correctly for un-owned docs
// so we allocate them in our thread
// and hand off
+ // Note: why not use pooled connections? This has been reported to create too many connections
vector< shared_ptr<ShardConnection> > shardConns;
@@ -999,8 +1006,11 @@ namespace mongo {
}
bool failed = false;
-
- BSONObjBuilder shardresults;
+
+ // now wait for the result of all shards
+ BSONObjBuilder shardResultsB;
+ BSONObjBuilder shardCountsB;
+ BSONObjBuilder aggCountsB;
for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) {
shared_ptr<Future::CommandResult> res = *i;
if ( ! res->join() ) {
@@ -1011,7 +1021,19 @@ namespace mongo {
failed = true;
continue;
}
- shardresults.append( res->getServer() , res->result() );
+ BSONObj result = res->result();
+ shardResultsB.append( res->getServer() , result );
+ BSONObj counts = result["counts"].embeddedObjectUserCheck();
+ shardCountsB.append( res->getServer() , counts );
+ servers.insert(res->getServer());
+
+ // add up the counts for each shard
+ // some of them will be fixed later like output and reduce
+ BSONObjIterator j( counts );
+ while ( j.more() ) {
+ BSONElement temp = j.next();
+ countsMap[temp.fieldName()] += temp.numberLong();
+ }
}
for ( unsigned i=0; i<shardConns.size(); i++ )
@@ -1020,28 +1042,143 @@ namespace mongo {
if ( failed )
return 0;
- finalCmd.append( "shards" , shardresults.obj() );
+ finalCmd.append( "shards" , shardResultsB.obj() );
+ shardCounts = shardCountsB.obj();
+ finalCmd.append( "shardCounts" , shardCounts );
timingBuilder.append( "shards" , t.millis() );
+
+ for ( map<string,long long>::iterator i=countsMap.begin(); i!=countsMap.end(); i++ ) {
+ aggCountsB.append( i->first , i->second );
+ }
+ aggCounts = aggCountsB.obj();
+ finalCmd.append( "counts" , aggCounts );
}
Timer t2;
- // by default the target database is same as input
- Shard outServer = conf->getPrimary();
- string outns = fullns;
- if ( customOutDB ) {
- // have to figure out shard for the output DB
- BSONElement elmt = customOut.getField("db");
- string outdb = elmt.valuestrsafe();
- outns = outdb + "." + collection;
- DBConfigPtr conf2 = grid.getDBConfig( outdb , true );
- outServer = conf2->getPrimary();
- }
- log() << "customOut: " << customOut << " outServer: " << outServer << endl;
-
- ShardConnection conn( outServer , outns );
BSONObj finalResult;
- bool ok = conn->runCommand( dbName , finalCmd.obj() , finalResult );
- conn.done();
+ bool ok = false;
+ string outdb = dbName;
+ if (customOutDB) {
+ BSONElement elmt = customOut.getField("db");
+ outdb = elmt.valuestrsafe();
+ }
+
+ if (!customOut.getBoolField("sharded")) {
+ // non-sharded, use the MRFinish command on target server
+ // This will save some data transfer
+
+ // by default the target database is same as input
+ Shard outServer = conf->getPrimary();
+ string outns = fullns;
+ if ( customOutDB ) {
+ // have to figure out shard for the output DB
+ DBConfigPtr conf2 = grid.getDBConfig( outdb , true );
+ outServer = conf2->getPrimary();
+ outns = outdb + "." + collection;
+ }
+ log() << "customOut: " << customOut << " outServer: " << outServer << endl;
+
+ ShardConnection conn( outServer , outns );
+ ok = conn->runCommand( dbName , finalCmd.obj() , finalResult );
+ conn.done();
+ } else {
+ // grab records from each shard and insert back in correct shard in "temp" collection
+ // we do the final reduce in mongos since records are ordered and already reduced on each shard
+// string shardedIncLong = str::stream() << outdb << ".tmp.mr." << collection << "_" << "shardedTemp" << "_" << time(0) << "_" << JOB_NUMBER++;
+
+ mr_shard::Config config( dbName , cmdObj );
+ mr_shard::State state(config);
+ log(1) << "mr sharded output ns: " << config.ns << endl;
+
+ // for now we only support replace output collection in sharded mode
+ if (config.outType != mr_shard::Config::REPLACE) {
+ errmsg = "Only support REPLACE mode for sharded output M/R";
+ return false;
+ }
+
+ if (!config.outDB.empty()) {
+ BSONObjBuilder loc;
+                    // outDB is known non-empty here (checked by the enclosing if)
+                    loc.append( "db" , config.outDB );
+ if ( !config.finalShort.empty() )
+ loc.append( "collection" , config.finalShort );
+ result.append("result", loc.obj());
+ }
+ else {
+ if ( !config.finalShort.empty() )
+ result.append( "result" , config.finalShort );
+ }
+ string outns = config.finalLong;
+
+ if (config.outType == mr_shard::Config::REPLACE) {
+ // drop previous collection
+ BSONObj dropColCmd = BSON("drop" << config.finalShort);
+ BSONObjBuilder dropColResult(32);
+ string outdbCmd = outdb + ".$cmd";
+ bool res = Command::runAgainstRegistered(outdbCmd.c_str(), dropColCmd, dropColResult);
+ if (!res) {
+ errmsg = str::stream() << "Could not drop sharded output collection " << outns << ": " << dropColResult.obj().toString();
+ return false;
+ }
+ }
+
+ // create the sharded collection
+ BSONObj sortKey = BSON( "_id" << 1 );
+ BSONObj shardColCmd = BSON("shardCollection" << outns << "key" << sortKey);
+ BSONObjBuilder shardColResult(32);
+ bool res = Command::runAgainstRegistered("admin.$cmd", shardColCmd, shardColResult);
+ if (!res) {
+ errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString();
+ return false;
+ }
+
+ ParallelSortClusteredCursor cursor( servers , dbName + "." + shardedOutputCollection ,
+ Query().sort( sortKey ) );
+ cursor.init();
+ state.init();
+
+ mr_shard::BSONList values;
+ Strategy* s = SHARDED;
+ while ( cursor.more() ) {
+ BSONObj t = cursor.next().getOwned();
+                    // (per-document debug print removed: it floods mongos stdout on every output row)
+
+ if ( values.size() == 0 ) {
+ values.push_back( t );
+ continue;
+ }
+
+ if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) {
+ values.push_back( t );
+ continue;
+ }
+
+                    log(1) << "Doing sharded reduce on " << values.size() << " objects" << endl;
+ BSONObj final = config.reducer->finalReduce(values, config.finalizer.get());
+ s->insertSharded(conf, outns.c_str(), final, 0);
+ values.clear();
+ values.push_back( t );
+ }
+
+                if ( values.size() ) {
+                    log(1) << "Doing sharded reduce on " << values.size() << " objects" << endl;
+                    BSONObj final = config.reducer->finalReduce(values, config.finalizer.get());
+                    s->insertSharded(conf, outns.c_str(), final, 0);
+                }
+
+// state.dumpToInc();
+// state.postProcessCollection();
+// state.appendResults( result );
+
+// for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
+// ScopedDbConnection conn( i->_server );
+// conn->dropCollection( dbname + "." + shardedOutputCollection );
+// conn.done();
+// }
+ result.append("shardCounts", shardCounts);
+ result.append("counts", aggCounts);
+ ok = true;
+ }
if ( ! ok ) {
errmsg = "final reduce failed: ";
diff --git a/s/mr_shard.cpp b/s/mr_shard.cpp
new file mode 100644
index 00000000000..44a43a2c785
--- /dev/null
+++ b/s/mr_shard.cpp
@@ -0,0 +1,311 @@
+// mr_shard.cpp
+
+/**
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "pch.h"
+#include "../util/message.h"
+#include "../db/dbmessage.h"
+
+#include "mr_shard.h"
+
+namespace mongo {
+
+ namespace mr_shard {
+
+ AtomicUInt Config::JOB_NUMBER;
+
+ JSFunction::JSFunction( string type , const BSONElement& e ) {
+ _type = type;
+ _code = e._asCode();
+
+ if ( e.type() == CodeWScope )
+ _wantedScope = e.codeWScopeObject();
+ }
+
+ void JSFunction::init( State * state ) {
+ _scope = state->scope();
+ assert( _scope );
+ _scope->init( &_wantedScope );
+
+ _func = _scope->createFunction( _code.c_str() );
+ uassert( 14814 , str::stream() << "couldn't compile code for: " << _type , _func );
+
+ // install in JS scope so that it can be called in JS mode
+ _scope->setFunction(_type.c_str(), _code.c_str());
+ }
+
+ /**
+ * Applies the finalize function to a tuple obj (key, val)
+ * Returns tuple obj {_id: key, value: newval}
+ */
+ BSONObj JSFinalizer::finalize( const BSONObj& o ) {
+ Scope * s = _func.scope();
+
+ Scope::NoDBAccess no = s->disableDBAccess( "can't access db inside finalize" );
+ s->invokeSafe( _func.func() , &o, 0 );
+
+ // don't want to use o.objsize() to size b
+ // since there are many cases where the point of finalize
+ // is converting many fields to 1
+ BSONObjBuilder b;
+ b.append( o.firstElement() );
+ s->append( b , "value" , "return" );
+ return b.obj();
+ }
+
+ void JSReducer::init( State * state ) {
+ _func.init( state );
+ }
+
+ /**
+ * Reduces a list of tuple objects (key, value) to a single tuple {"0": key, "1": value}
+ */
+ BSONObj JSReducer::reduce( const BSONList& tuples ) {
+            if (tuples.size() <= 1)
+                return tuples.empty() ? BSONObj() : tuples[0];
+ BSONObj key;
+ int endSizeEstimate = 16;
+ _reduce( tuples , key , endSizeEstimate );
+
+ BSONObjBuilder b(endSizeEstimate);
+ b.appendAs( key.firstElement() , "0" );
+ _func.scope()->append( b , "1" , "return" );
+ return b.obj();
+ }
+
+ /**
+ * Reduces a list of tuple object (key, value) to a single tuple {_id: key, value: val}
+ * Also applies a finalizer method if present.
+ */
+ BSONObj JSReducer::finalReduce( const BSONList& tuples , Finalizer * finalizer ) {
+
+ BSONObj res;
+ BSONObj key;
+
+ if (tuples.size() == 1) {
+ // 1 obj, just use it
+ key = tuples[0];
+ BSONObjBuilder b(key.objsize());
+ BSONObjIterator it(key);
+ b.appendAs( it.next() , "_id" );
+ b.appendAs( it.next() , "value" );
+ res = b.obj();
+ }
+ else {
+ // need to reduce
+ int endSizeEstimate = 16;
+ _reduce( tuples , key , endSizeEstimate );
+ BSONObjBuilder b(endSizeEstimate);
+ b.appendAs( key.firstElement() , "_id" );
+ _func.scope()->append( b , "value" , "return" );
+ res = b.obj();
+ }
+
+ if ( finalizer ) {
+ res = finalizer->finalize( res );
+ }
+
+ return res;
+ }
+
+ /**
+ * actually applies a reduce, to a list of tuples (key, value).
+ * After the call, tuples will hold a single tuple {"0": key, "1": value}
+ */
+ void JSReducer::_reduce( const BSONList& tuples , BSONObj& key , int& endSizeEstimate ) {
+ int sizeEstimate = ( tuples.size() * tuples.begin()->getField( "value" ).size() ) + 128;
+
+ // need to build the reduce args: ( key, [values] )
+ BSONObjBuilder reduceArgs( sizeEstimate );
+ boost::scoped_ptr<BSONArrayBuilder> valueBuilder;
+ int sizeSoFar = 0;
+ unsigned n = 0;
+ for ( ; n<tuples.size(); n++ ) {
+ BSONObjIterator j(tuples[n]);
+ BSONElement keyE = j.next();
+ if ( n == 0 ) {
+ reduceArgs.append( keyE );
+ key = keyE.wrap();
+ sizeSoFar = 5 + keyE.size();
+ valueBuilder.reset(new BSONArrayBuilder( reduceArgs.subarrayStart( "tuples" ) ));
+ }
+
+ BSONElement ee = j.next();
+
+ uassert( 14815 , "value too large to reduce" , ee.size() < ( BSONObjMaxUserSize / 2 ) );
+
+ if ( sizeSoFar + ee.size() > BSONObjMaxUserSize ) {
+ assert( n > 1 ); // if not, inf. loop
+ break;
+ }
+
+ valueBuilder->append( ee );
+ sizeSoFar += ee.size();
+ }
+ assert(valueBuilder);
+ valueBuilder->done();
+ BSONObj args = reduceArgs.obj();
+
+ Scope * s = _func.scope();
+
+ s->invokeSafe( _func.func() , &args, 0 );
+ ++numReduces;
+
+ if ( s->type( "return" ) == Array ) {
+ uasserted( 14816 , "reduce -> multiple not supported yet");
+ return;
+ }
+
+ endSizeEstimate = key.objsize() + ( args.objsize() / tuples.size() );
+
+ if ( n == tuples.size() )
+ return;
+
+ // the input list was too large, add the rest of elmts to new tuples and reduce again
+ // note: would be better to use loop instead of recursion to avoid stack overflow
+ BSONList x;
+ for ( ; n < tuples.size(); n++ ) {
+ x.push_back( tuples[n] );
+ }
+ BSONObjBuilder temp( endSizeEstimate );
+ temp.append( key.firstElement() );
+ s->append( temp , "1" , "return" );
+ x.push_back( temp.obj() );
+ _reduce( x , key , endSizeEstimate );
+ }
+
+ Config::Config( const string& _dbname , const BSONObj& cmdObj ) {
+
+ dbname = _dbname;
+ ns = dbname + "." + cmdObj.firstElement().valuestr();
+
+ verbose = cmdObj["verbose"].trueValue();
+ jsMode = cmdObj["jsMode"].trueValue();
+
+ jsMaxKeys = 500000;
+ reduceTriggerRatio = 2.0;
+ maxInMemSize = 5 * 1024 * 1024;
+
+ uassert( 14819 , "outType is no longer a valid option" , cmdObj["outType"].eoo() );
+
+ if ( cmdObj["out"].type() == String ) {
+ finalShort = cmdObj["out"].String();
+ outType = REPLACE;
+ }
+ else if ( cmdObj["out"].type() == Object ) {
+ BSONObj o = cmdObj["out"].embeddedObject();
+
+ BSONElement e = o.firstElement();
+ string t = e.fieldName();
+
+ if ( t == "normal" || t == "replace" ) {
+ outType = REPLACE;
+ finalShort = e.String();
+ }
+ else if ( t == "merge" ) {
+ outType = MERGE;
+ finalShort = e.String();
+ }
+ else if ( t == "reduce" ) {
+ outType = REDUCE;
+ finalShort = e.String();
+ }
+ else if ( t == "inline" ) {
+ outType = INMEMORY;
+ }
+ else {
+ uasserted( 14817 , str::stream() << "unknown out specifier [" << t << "]" );
+ }
+
+ if (o.hasElement("db")) {
+ outDB = o["db"].String();
+ }
+ }
+ else {
+ uasserted( 14818 , "'out' has to be a string or an object" );
+ }
+
+ if ( outType != INMEMORY ) { // setup names
+ tempLong = str::stream() << (outDB.empty() ? dbname : outDB) << ".tmp.mr." << cmdObj.firstElement().String() << "_" << finalShort << "_" << JOB_NUMBER++;
+
+ incLong = tempLong + "_inc";
+
+ finalLong = str::stream() << (outDB.empty() ? dbname : outDB) << "." << finalShort;
+ }
+
+ {
+ // scope and code
+
+ if ( cmdObj["scope"].type() == Object )
+ scopeSetup = cmdObj["scope"].embeddedObjectUserCheck();
+
+ reducer.reset( new JSReducer( cmdObj["reduce"] ) );
+ if ( cmdObj["finalize"].type() && cmdObj["finalize"].trueValue() )
+ finalizer.reset( new JSFinalizer( cmdObj["finalize"] ) );
+
+ }
+
+ {
+ // query options
+ if ( cmdObj["limit"].isNumber() )
+ limit = cmdObj["limit"].numberLong();
+ else
+ limit = 0;
+ }
+ }
+
+ State::State( const Config& c ) : _config( c ) {
+ _onDisk = _config.outType != Config::INMEMORY;
+ }
+
+ State::~State() {
+ if ( _onDisk ) {
+ try {
+// _db.dropCollection( _config.tempLong );
+// _db.dropCollection( _config.incLong );
+ }
+ catch ( std::exception& e ) {
+ error() << "couldn't cleanup after map reduce: " << e.what() << endl;
+ }
+ }
+
+ if (_scope) {
+ // cleanup js objects
+ ScriptingFunction cleanup = _scope->createFunction("delete _emitCt; delete _keyCt; delete _mrMap;");
+ _scope->invoke(cleanup, 0, 0, 0, true);
+ }
+ }
+
+ /**
+ * Initialize the mapreduce operation, creating the inc collection
+ */
+ void State::init() {
+ // setup js
+ _scope.reset(globalScriptEngine->getPooledScope( _config.dbname ).release() );
+// _scope->localConnect( _config.dbname.c_str() );
+ _scope->externalSetup();
+
+ if ( ! _config.scopeSetup.isEmpty() )
+ _scope->init( &_config.scopeSetup );
+
+ _config.reducer->init( this );
+ if ( _config.finalizer )
+ _config.finalizer->init( this );
+ _scope->setBoolean("_doFinal", _config.finalizer);
+ }
+ }
+}
+
diff --git a/s/mr_shard.h b/s/mr_shard.h
new file mode 100644
index 00000000000..9afabface0d
--- /dev/null
+++ b/s/mr_shard.h
@@ -0,0 +1,230 @@
+// mr_shard.h
+
+/**
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "pch.h"
+
+namespace mongo {
+
+ namespace mr_shard {
+
+ typedef vector<BSONObj> BSONList;
+
+ class State;
+
+ // ------------ function interfaces -----------
+
+ class Finalizer : boost::noncopyable {
+ public:
+ virtual ~Finalizer() {}
+ virtual void init( State * state ) = 0;
+
+ /**
+ * this takes a tuple and returns a tuple
+ */
+ virtual BSONObj finalize( const BSONObj& tuple ) = 0;
+ };
+
+ class Reducer : boost::noncopyable {
+ public:
+ Reducer() : numReduces(0) {}
+ virtual ~Reducer() {}
+ virtual void init( State * state ) = 0;
+
+ virtual BSONObj reduce( const BSONList& tuples ) = 0;
+            /** this means it's a final reduce, even if there is no finalizer */
+ virtual BSONObj finalReduce( const BSONList& tuples , Finalizer * finalizer ) = 0;
+
+ long long numReduces;
+ };
+
+ // ------------ js function implementations -----------
+
+ /**
+ * used as a holder for Scope and ScriptingFunction
+ * visitor like pattern as Scope is gotten from first access
+ */
+ class JSFunction : boost::noncopyable {
+ public:
+ /**
+ * @param type (map|reduce|finalize)
+ */
+ JSFunction( string type , const BSONElement& e );
+ virtual ~JSFunction() {}
+
+ virtual void init( State * state );
+
+ Scope * scope() const { return _scope; }
+ ScriptingFunction func() const { return _func; }
+
+ private:
+ string _type;
+ string _code; // actual javascript code
+ BSONObj _wantedScope; // this is for CodeWScope
+
+ Scope * _scope; // this is not owned by us, and might be shared
+ ScriptingFunction _func;
+ };
+
+ class JSReducer : public Reducer {
+ public:
+ JSReducer( const BSONElement& code ) : _func( "_reduce" , code ) {}
+ virtual void init( State * state );
+
+ virtual BSONObj reduce( const BSONList& tuples );
+ virtual BSONObj finalReduce( const BSONList& tuples , Finalizer * finalizer );
+
+ private:
+
+ /**
+ * result in "return"
+ * @param key OUT
+ * @param endSizeEstimate OUT
+ */
+ void _reduce( const BSONList& values , BSONObj& key , int& endSizeEstimate );
+
+ JSFunction _func;
+ };
+
+ class JSFinalizer : public Finalizer {
+ public:
+ JSFinalizer( const BSONElement& code ) : _func( "_finalize" , code ) {}
+ virtual BSONObj finalize( const BSONObj& o );
+ virtual void init( State * state ) { _func.init( state ); }
+ private:
+ JSFunction _func;
+
+ };
+
+ // -----------------
+
+ /**
+ * holds map/reduce config information
+ */
+ class Config {
+ public:
+ Config( const string& _dbname , const BSONObj& cmdObj );
+
+ string dbname;
+ string ns;
+
+ // options
+ bool verbose;
+ bool jsMode;
+
+ // query options
+
+ BSONObj filter;
+ BSONObj sort;
+ long long limit;
+
+ // functions
+ scoped_ptr<Reducer> reducer;
+ scoped_ptr<Finalizer> finalizer;
+
+ BSONObj mapParams;
+ BSONObj scopeSetup;
+
+ // output tables
+ string incLong;
+ string tempLong;
+
+ string finalShort;
+ string finalLong;
+
+ string outDB;
+
+ // max number of keys allowed in JS map before switching mode
+ long jsMaxKeys;
+ // ratio of duplicates vs unique keys before reduce is triggered in js mode
+ float reduceTriggerRatio;
+ // maximum size of map before it gets dumped to disk
+ long maxInMemSize;
+
+ enum { REPLACE , // atomically replace the collection
+ MERGE , // merge keys, override dups
+ REDUCE , // merge keys, reduce dups
+ INMEMORY // only store in memory, limited in size
+ } outType;
+
+ static AtomicUInt JOB_NUMBER;
+        }; // end Config
+
+ /**
+ * stores information about intermediate map reduce state
+ * controls flow of data from map->reduce->finalize->output
+ */
+ class State {
+ public:
+ State( const Config& c );
+ ~State();
+
+ void init();
+
+ // ---- prep -----
+ bool sourceExists();
+
+ long long incomingDocuments();
+
+ // ---- map stage ----
+
+ /**
+             * stages data in in-memory storage
+ */
+ void emit( const BSONObj& a );
+
+ /**
+ * if size is big, run a reduce
+             * if it's still big, dump to temp collection
+ */
+ void checkSize();
+
+ /**
+ * run reduce on _temp
+ */
+ void reduceInMemory();
+
+ // ------ reduce stage -----------
+
+ void prepTempCollection();
+
+ void finalReduce( BSONList& values );
+
+ void finalReduce( CurOp * op , ProgressMeterHolder& pm );
+
+ // ------ simple accessors -----
+
+            /** State maintains ownership, do not use past State lifetime */
+ Scope* scope() { return _scope.get(); }
+
+ const Config& config() { return _config; }
+
+ const bool isOnDisk() { return _onDisk; }
+
+ const Config& _config;
+
+ protected:
+
+ scoped_ptr<Scope> _scope;
+ bool _onDisk; // if the end result of this map reduce is disk or not
+ };
+
+    } // end mr_shard namespace
+}
+
+
diff --git a/s/server.cpp b/s/server.cpp
index 5d0d2931c6e..48e284523af 100644
--- a/s/server.cpp
+++ b/s/server.cpp
@@ -44,6 +44,7 @@ namespace mongo {
Database *database = 0;
string mongosCommand;
bool dbexitCalled = false;
+ static bool scriptingEnabled = true;
bool inShutdown() {
return dbexitCalled;
@@ -220,6 +221,7 @@ int _main(int argc, char* argv[]) {
( "chunkSize" , po::value<int>(), "maximum amount of data per chunk" )
( "ipv6", "enable IPv6 support (disabled by default)" )
( "jsonp","allow JSONP access via http (has security implications)" )
+ ("noscripting", "disable scripting engine")
;
options.add(sharding_options);
@@ -261,6 +263,10 @@ int _main(int argc, char* argv[]) {
return 0;
}
+ if (params.count("noscripting")) {
+ scriptingEnabled = false;
+ }
+
if ( ! params.count( "configdb" ) ) {
out() << "error: no args for --configdb" << endl;
return 4;
@@ -348,6 +354,12 @@ int _main(int argc, char* argv[]) {
boost::thread web( boost::bind(&webServerThread, new NoAdminAccess() /* takes ownership */) );
+ if ( scriptingEnabled ) {
+ ScriptEngine::setup();
+// globalScriptEngine->setCheckInterruptCallback( jsInterruptCallback );
+// globalScriptEngine->setGetInterruptSpecCallback( jsGetInterruptSpecCallback );
+ }
+
MessageServer::Options opts;
opts.port = cmdLine.port;
opts.ipList = cmdLine.bind_ip;
diff --git a/s/strategy.cpp b/s/strategy.cpp
index 7502924f6bb..64adcadb933 100644
--- a/s/strategy.cpp
+++ b/s/strategy.cpp
@@ -76,4 +76,5 @@ namespace mongo {
dbcon->insert( ns , obj , flags);
dbcon.done();
}
+
}
diff --git a/s/strategy.h b/s/strategy.h
index 78c5af6f90b..9bbd2e4077b 100644
--- a/s/strategy.h
+++ b/s/strategy.h
@@ -32,6 +32,8 @@ namespace mongo {
virtual void getMore( Request& r ) = 0;
virtual void writeOp( int op , Request& r ) = 0;
+ virtual void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) = 0;
+
protected:
void doWrite( int op , Request& r , const Shard& shard , bool checkVersion = true );
void doQuery( Request& r , const Shard& shard );
diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp
index 62a5417880f..600ded54799 100644
--- a/s/strategy_shard.cpp
+++ b/s/strategy_shard.cpp
@@ -191,6 +191,63 @@ namespace mongo {
}
}
+ void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) {
+ ChunkManagerPtr manager = conf->getChunkManager(ns);
+ if ( ! manager->hasShardKey( o ) ) {
+
+ bool bad = true;
+
+ if ( manager->getShardKey().partOfShardKey( "_id" ) ) {
+ BSONObjBuilder b;
+ b.appendOID( "_id" , 0 , true );
+ b.appendElements( o );
+ o = b.obj();
+ bad = ! manager->hasShardKey( o );
+ }
+
+ if ( bad ) {
+ log() << "tried to insert object without shard key: " << ns << " " << o << endl;
+ uasserted( 14812 , "tried to insert object without shard key" );
+ }
+
+ }
+
+ // Many operations benefit from having the shard key early in the object
+ o = manager->getShardKey().moveToFront(o);
+
+ const int maxTries = 30;
+
+ bool gotThrough = false;
+ for ( int i=0; i<maxTries; i++ ) {
+ try {
+ ChunkPtr c = manager->findChunk( o );
+ log(4) << " server:" << c->getShard().toString() << " " << o << endl;
+ insert( c->getShard() , ns , o , flags);
+
+// r.gotInsert();
+// if ( r.getClientInfo()->autoSplitOk() )
+ c->splitIfShould( o.objsize() );
+ gotThrough = true;
+ break;
+ }
+ catch ( StaleConfigException& e ) {
+ int logLevel = i < ( maxTries / 2 );
+ LOG( logLevel ) << "retrying insert because of StaleConfigException: " << e << " object: " << o << endl;
+// r.reset();
+
+ unsigned long long old = manager->getSequenceNumber();
+ manager = conf->getChunkManager(ns);
+
+                    LOG( logLevel ) << " sequence number - old: " << old << " new: " << manager->getSequenceNumber() << endl;
+
+ if (!manager) {
+ uasserted(14813, "collection no longer sharded");
+ }
+ }
+ sleepmillis( i * 20 );
+ }
+ }
+
void _update( Request& r , DbMessage& d, ChunkManagerPtr manager ) {
int flags = d.pullInt();
diff --git a/s/strategy_single.cpp b/s/strategy_single.cpp
index b364880e5c6..34e3fe00e97 100644
--- a/s/strategy_single.cpp
+++ b/s/strategy_single.cpp
@@ -262,6 +262,10 @@ namespace mongo {
return true;
}
+ void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) {
+ // only useful for shards
+ }
+
set<string> _commandsSafeToPass;
};