diff options
-rw-r--r-- | SConstruct | 2 | ||||
-rw-r--r-- | db/commands/mr.cpp | 23 | ||||
-rw-r--r-- | docs/errors.md | 77 | ||||
-rw-r--r-- | s/commands_public.cpp | 187 | ||||
-rw-r--r-- | s/mr_shard.cpp | 311 | ||||
-rw-r--r-- | s/mr_shard.h | 230 | ||||
-rw-r--r-- | s/server.cpp | 12 | ||||
-rw-r--r-- | s/strategy.cpp | 1 | ||||
-rw-r--r-- | s/strategy.h | 2 | ||||
-rw-r--r-- | s/strategy_shard.cpp | 57 | ||||
-rw-r--r-- | s/strategy_single.cpp | 4 |
11 files changed, 839 insertions, 67 deletions
diff --git a/SConstruct b/SConstruct index 385a0ac1041..c91f91e23df 100644 --- a/SConstruct +++ b/SConstruct @@ -373,7 +373,7 @@ else: coreServerFiles += scriptingFiles coreShardFiles = [ "s/config.cpp" , "s/grid.cpp" , "s/chunk.cpp" , "s/shard.cpp" , "s/shardkey.cpp" ] -shardServerFiles = coreShardFiles + Glob( "s/strategy*.cpp" ) + [ "s/commands_admin.cpp" , "s/commands_public.cpp" , "s/request.cpp" , "s/client.cpp" , "s/cursors.cpp" , "s/server.cpp" , "s/config_migrate.cpp" , "s/s_only.cpp" , "s/stats.cpp" , "s/balance.cpp" , "s/balancer_policy.cpp" , "db/cmdline.cpp" , "s/writeback_listener.cpp" , "s/shard_version.cpp" ] +shardServerFiles = coreShardFiles + Glob( "s/strategy*.cpp" ) + [ "s/commands_admin.cpp" , "s/commands_public.cpp" , "s/request.cpp" , "s/client.cpp" , "s/cursors.cpp" , "s/server.cpp" , "s/config_migrate.cpp" , "s/s_only.cpp" , "s/stats.cpp" , "s/balance.cpp" , "s/balancer_policy.cpp" , "db/cmdline.cpp" , "s/writeback_listener.cpp" , "s/shard_version.cpp", "s/mr_shard.cpp" ] serverOnlyFiles += coreShardFiles + [ "s/d_logic.cpp" , "s/d_writeback.cpp" , "s/d_migrate.cpp" , "s/d_state.cpp" , "s/d_split.cpp" , "client/distlock_test.cpp" , "s/d_chunk_manager.cpp" ] serverOnlyFiles += [ "db/module.cpp" ] + Glob( "db/modules/*.cpp" ) diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp index bf52f46176f..c55ef3d77cc 100644 --- a/db/commands/mr.cpp +++ b/db/commands/mr.cpp @@ -1113,10 +1113,9 @@ namespace mongo { set<ServerAndQuery> servers; - BSONObjBuilder shardCounts; - map<string,long long> counts; - BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck(); + BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck(); + BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck(); vector< auto_ptr<DBClientCursor> > shardCursors; { @@ -1130,13 +1129,6 @@ namespace mongo { uassert( 10078 , "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() ); servers.insert( shard ); - shardCounts.appendAs( res["counts"] , shard ); - - BSONObjIterator j( res["counts"].embeddedObjectUserCheck() ); - while ( j.more() ) { - BSONElement temp = j.next(); - counts[temp.fieldName()] += temp.numberLong(); - } } @@ -1203,15 +1195,8 @@ namespace mongo { conn.done(); } - result.append( "shardCounts" , shardCounts.obj() ); - - { - BSONObjBuilder c; - for ( map<string,long long>::iterator i=counts.begin(); i!=counts.end(); i++ ) { - c.append( i->first , i->second ); - } - result.append( "counts" , c.obj() ); - } + result.append( "shardCounts" , shardCounts ); + result.append( "counts" , counts ); return 1; } diff --git a/docs/errors.md b/docs/errors.md index ac50c305319..4fdbaa4f7d5 100644 --- a/docs/errors.md +++ b/docs/errors.md @@ -289,7 +289,7 @@ db/commands/mr.cpp * 10075 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L196) reduce -> multiple not supported yet * 10076 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L421) rename failed: * 10077 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L877) fast_emit takes 2 args -* 10078 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L1131) something bad happened" , shardedOutputCollection == res["result +* 10078 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L1130) something bad happened" , shardedOutputCollection == res["result * 13069 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L878) an emit can't be more than half max bson size * 13070 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L176) value too large to reduce * 13522 [code](http://github.com/mongodb/mongo/blob/master/db/commands/mr.cpp#L258) unknown out specifier [" << t << "] @@ -1062,26 +1062,27 @@ s/client.cpp s/commands_public.cpp ---- -* 10418 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L264) how could chunk manager be null! -* 10420 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L719) how could chunk manager be null! -* 12594 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L487) how could chunk manager be null! -* 13002 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L609) how could chunk manager be null! -* 13091 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L784) how could chunk manager be null! -* 13092 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L785) GridFS chunks collection can only be sharded on files_id", cm->getShardKey().key() == BSON("files_id -* 13137 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L321) Source and destination collections must be on same shard -* 13138 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L315) You can't rename a sharded collection -* 13139 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L316) You can't rename to a sharded collection -* 13140 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L314) Don't recognize source or target DB -* 13343 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L612) query for sharded findAndModify must have shardkey -* 13398 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L335) cant copy to sharded DB -* 13399 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L343) need a fromdb argument -* 13400 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L346) don't know where source DB is -* 13401 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L347) cant copy from sharded DB -* 13402 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L332) need a todb argument -* 13407 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L641) how could chunk manager be null! -* 13408 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L647) keyPattern must equal shard key -* 13500 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L815) how could chunk manager be null! -* 13512 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L267) drop collection attempted on non-sharded collection +* 1 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L1094) +* 10418 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L265) how could chunk manager be null! +* 10420 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L720) how could chunk manager be null! +* 12594 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L488) how could chunk manager be null! +* 13002 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L610) how could chunk manager be null! +* 13091 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L785) how could chunk manager be null! +* 13092 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L786) GridFS chunks collection can only be sharded on files_id", cm->getShardKey().key() == BSON("files_id +* 13137 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L322) Source and destination collections must be on same shard +* 13138 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L316) You can't rename a sharded collection +* 13139 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L317) You can't rename to a sharded collection +* 13140 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L315) Don't recognize source or target DB +* 13343 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L613) query for sharded findAndModify must have shardkey +* 13398 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L336) cant copy to sharded DB +* 13399 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L344) need a fromdb argument +* 13400 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L347) don't know where source DB is +* 13401 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L348) cant copy from sharded DB +* 13402 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L333) need a todb argument +* 13407 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L642) how could chunk manager be null! +* 13408 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L648) keyPattern must equal shard key +* 13500 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L816) how could chunk manager be null! +* 13512 [code](http://github.com/mongodb/mongo/blob/master/s/commands_public.cpp#L268) drop collection attempted on non-sharded collection s/config.cpp @@ -1155,6 +1156,16 @@ s/grid.cpp * 10421 [code](http://github.com/mongodb/mongo/blob/master/s/grid.cpp#L445) getoptime failed" , conn->simpleCommand( "admin" , &result , "getoptime +s/mr_shard.cpp +---- +* 14814 [code](http://github.com/mongodb/mongo/blob/master/s/mr_shard.cpp#L44) couldn't compile code for: +* 14815 [code](http://github.com/mongodb/mongo/blob/master/s/mr_shard.cpp#L148) value too large to reduce +* 14816 [code](http://github.com/mongodb/mongo/blob/master/s/mr_shard.cpp#L168) reduce -> multiple not supported yet +* 14817 [code](http://github.com/mongodb/mongo/blob/master/s/mr_shard.cpp#L230) unknown out specifier [" << t << "] +* 14818 [code](http://github.com/mongodb/mongo/blob/master/s/mr_shard.cpp#L238) 'out' has to be a string or an object +* 14819 [code](http://github.com/mongodb/mongo/blob/master/s/mr_shard.cpp#L202) outType is no longer a valid option" , cmdObj["outType + + s/request.cpp ---- * 10192 [code](http://github.com/mongodb/mongo/blob/master/s/request.cpp#L65) db config reload failed! @@ -1167,7 +1178,7 @@ s/request.cpp s/server.cpp ---- -* 10197 [code](http://github.com/mongodb/mongo/blob/master/s/server.cpp#L187) createDirectClient not implemented for sharding yet +* 10197 [code](http://github.com/mongodb/mongo/blob/master/s/server.cpp#L188) createDirectClient not implemented for sharding yet s/shard.cpp @@ -1208,6 +1219,7 @@ s/strategy.cpp s/strategy_shard.cpp ---- +<<<<<<< HEAD * 10201 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L199) invalid update * 10203 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L299) bad delete message * 12376 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L255) @@ -1225,6 +1237,27 @@ s/strategy_shard.cpp * 8014 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L250) * 8015 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L330) can only delete with a non-shard key pattern if can delete as many as we find * 8016 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L358) can't do this write op on sharded collection +======= +* 10201 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L263) invalid update +* 10203 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L363) bad delete message +* 12376 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L319) +* 13123 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L306) +* 13465 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L278) shard key in upsert query must be an exact match +* 13505 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L365) $atomic not supported sharded" , pattern["$atomic +* 13506 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L262) $atomic not supported sharded" , query["$atomic +* 14804 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L184) collection no longer sharded +* 14805 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L389) collection no longer sharded +* 14806 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L351) collection no longer sharded +* 14812 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L216) tried to insert object without shard key +* 14813 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L251) collection no longer sharded +* 8010 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L41) something is wrong, shouldn't see a command here +* 8011 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L149) tried to insert object without shard key +* 8012 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L272) can't upsert something without shard key +* 8013 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L287) can't do non-multi update with query that doesn't have the shard key +* 8014 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L314) +* 8015 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L394) can only delete with a non-shard key pattern if can delete as many as we find +* 8016 [code](http://github.com/mongodb/mongo/blob/master/s/strategy_shard.cpp#L422) can't do this write op on sharded collection +>>>>>>> SERVER-2531: added M/R output to shard collection for mode REPLACE s/strategy_single.cpp diff --git a/s/commands_public.cpp b/s/commands_public.cpp index cc4221bf9e1..78eaf79e8dc 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -30,6 +30,7 @@ #include "chunk.h" #include "strategy.h" #include "grid.h" +#include "mr_shard.h" namespace mongo { @@ -890,12 +891,13 @@ namespace mongo { class MRCmd : public PublicGridCommand { public: + AtomicUInt JOB_NUMBER; + MRCmd() : PublicGridCommand( "mapreduce" ) {} string getTmpName( const string& coll ) { - static int inc = 1; stringstream ss; - ss << "tmp.mrs." << coll << "_" << time(0) << "_" << inc++; + ss << "tmp.mrs." << coll << "_" << time(0) << "_" << JOB_NUMBER++; return ss.str(); } @@ -921,8 +923,8 @@ namespace mongo { if (fn == "out" && e.type() == Object) { // check if there is a custom output BSONObj out = e.embeddedObject(); - if (out.hasField("db")) - customOut = out; +// if (out.hasField("db")) + customOut = out; } } else { @@ -946,7 +948,7 @@ namespace mongo { BSONObj customOut; BSONObj shardedCommand = fixForShards( cmdObj , shardedOutputCollection, customOut , badShardedField ); - bool customOutDB = ! customOut.isEmpty() && customOut.hasField( "db" ); + bool customOutDB = customOut.hasField( "db" ); DBConfigPtr conf = grid.getDBConfig( dbName , false ); @@ -981,11 +983,16 @@ namespace mongo { finalCmd.append( "shardedOutputCollection" , shardedOutputCollection ); + set<ServerAndQuery> servers; + BSONObj shardCounts; + BSONObj aggCounts; + map<string,long long> countsMap; { // we need to use our connections to the shard // so filtering is done correctly for un-owned docs // so we allocate them in our thread // and hand off + // Note: why not use pooled connections? This has been reported to create too many connections vector< shared_ptr<ShardConnection> > shardConns; @@ -999,8 +1006,11 @@ namespace mongo { } bool failed = false; - - BSONObjBuilder shardresults; + + // now wait for the result of all shards + BSONObjBuilder shardResultsB; + BSONObjBuilder shardCountsB; + BSONObjBuilder aggCountsB; for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) { shared_ptr<Future::CommandResult> res = *i; if ( ! res->join() ) { @@ -1011,7 +1021,19 @@ namespace mongo { failed = true; continue; } - shardresults.append( res->getServer() , res->result() ); + BSONObj result = res->result(); + shardResultsB.append( res->getServer() , result ); + BSONObj counts = result["counts"].embeddedObjectUserCheck(); + shardCountsB.append( res->getServer() , counts ); + servers.insert(res->getServer()); + + // add up the counts for each shard + // some of them will be fixed later like output and reduce + BSONObjIterator j( counts ); + while ( j.more() ) { + BSONElement temp = j.next(); + countsMap[temp.fieldName()] += temp.numberLong(); + } } for ( unsigned i=0; i<shardConns.size(); i++ ) @@ -1020,28 +1042,143 @@ namespace mongo { if ( failed ) return 0; - finalCmd.append( "shards" , shardresults.obj() ); + finalCmd.append( "shards" , shardResultsB.obj() ); + shardCounts = shardCountsB.obj(); + finalCmd.append( "shardCounts" , shardCounts ); timingBuilder.append( "shards" , t.millis() ); + + for ( map<string,long long>::iterator i=countsMap.begin(); i!=countsMap.end(); i++ ) { + aggCountsB.append( i->first , i->second ); + } + aggCounts = aggCountsB.obj(); + finalCmd.append( "counts" , aggCounts ); } Timer t2; - // by default the target database is same as input - Shard outServer = conf->getPrimary(); - string outns = fullns; - if ( customOutDB ) { - // have to figure out shard for the output DB - BSONElement elmt = customOut.getField("db"); - string outdb = elmt.valuestrsafe(); - outns = outdb + "." + collection; - DBConfigPtr conf2 = grid.getDBConfig( outdb , true ); - outServer = conf2->getPrimary(); - } - log() << "customOut: " << customOut << " outServer: " << outServer << endl; - - ShardConnection conn( outServer , outns ); BSONObj finalResult; - bool ok = conn->runCommand( dbName , finalCmd.obj() , finalResult ); - conn.done(); + bool ok = false; + string outdb = dbName; + if (customOutDB) { + BSONElement elmt = customOut.getField("db"); + outdb = elmt.valuestrsafe(); + } + + if (!customOut.getBoolField("sharded")) { + // non-sharded, use the MRFinish command on target server + // This will save some data transfer + + // by default the target database is same as input + Shard outServer = conf->getPrimary(); + string outns = fullns; + if ( customOutDB ) { + // have to figure out shard for the output DB + DBConfigPtr conf2 = grid.getDBConfig( outdb , true ); + outServer = conf2->getPrimary(); + outns = outdb + "." + collection; + } + log() << "customOut: " << customOut << " outServer: " << outServer << endl; + + ShardConnection conn( outServer , outns ); + ok = conn->runCommand( dbName , finalCmd.obj() , finalResult ); + conn.done(); + } else { + // grab records from each shard and insert back in correct shard in "temp" collection + // we do the final reduce in mongos since records are ordered and already reduced on each shard +// string shardedIncLong = str::stream() << outdb << ".tmp.mr." << collection << "_" << "shardedTemp" << "_" << time(0) << "_" << JOB_NUMBER++; + + mr_shard::Config config( dbName , cmdObj ); + mr_shard::State state(config); + log(1) << "mr sharded output ns: " << config.ns << endl; + + // for now we only support replace output collection in sharded mode + if (config.outType != mr_shard::Config::REPLACE) { + errmsg = "Only support REPLACE mode for sharded output M/R"; + return false; + } + + if (!config.outDB.empty()) { + BSONObjBuilder loc; + if ( !config.outDB.empty()) + loc.append( "db" , config.outDB ); + if ( !config.finalShort.empty() ) + loc.append( "collection" , config.finalShort ); + result.append("result", loc.obj()); + } + else { + if ( !config.finalShort.empty() ) + result.append( "result" , config.finalShort ); + } + string outns = config.finalLong; + + if (config.outType == mr_shard::Config::REPLACE) { + // drop previous collection + BSONObj dropColCmd = BSON("drop" << config.finalShort); + BSONObjBuilder dropColResult(32); + string outdbCmd = outdb + ".$cmd"; + bool res = Command::runAgainstRegistered(outdbCmd.c_str(), dropColCmd, dropColResult); + if (!res) { + errmsg = str::stream() << "Could not drop sharded output collection " << outns << ": " << dropColResult.obj().toString(); + return false; + } + } + + // create the sharded collection + BSONObj sortKey = BSON( "_id" << 1 ); + BSONObj shardColCmd = BSON("shardCollection" << outns << "key" << sortKey); + BSONObjBuilder shardColResult(32); + bool res = Command::runAgainstRegistered("admin.$cmd", shardColCmd, shardColResult); + if (!res) { + errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString(); + return false; + } + + ParallelSortClusteredCursor cursor( servers , dbName + "." + shardedOutputCollection , + Query().sort( sortKey ) ); + cursor.init(); + state.init(); + + mr_shard::BSONList values; + Strategy* s = SHARDED; + while ( cursor.more() ) { + BSONObj t = cursor.next().getOwned(); + cout << t.toString() << endl; + + if ( values.size() == 0 ) { + values.push_back( t ); + continue; + } + + if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) { + values.push_back( t ); + continue; + } + + cout << "Doing sharded reduce on " << values.size() << " objects"; + BSONObj final = config.reducer->finalReduce(values, config.finalizer.get()); + s->insertSharded(conf, outns.c_str(), final, 0); + values.clear(); + values.push_back( t ); + } + + if ( values.size() ) { + cout << "Doing sharded reduce on " << values.size() << " objects"; + const BSONObj& final = config.reducer->finalReduce(values, config.finalizer.get()); + s->insertSharded(conf, outns.c_str(), (BSONObj&) final, 0); + } + +// state.dumpToInc(); +// state.postProcessCollection(); +// state.appendResults( result ); + +// for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) { +// ScopedDbConnection conn( i->_server ); +// conn->dropCollection( dbname + "." + shardedOutputCollection ); +// conn.done(); +// } + result.append("shardCounts", shardCounts); + result.append("counts", aggCounts); + ok = true; + } if ( ! ok ) { errmsg = "final reduce failed: "; diff --git a/s/mr_shard.cpp b/s/mr_shard.cpp new file mode 100644 index 00000000000..44a43a2c785 --- /dev/null +++ b/s/mr_shard.cpp @@ -0,0 +1,311 @@ +// mr_shard.cpp + +/** + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "pch.h" +#include "../util/message.h" +#include "../db/dbmessage.h" + +#include "mr_shard.h" + +namespace mongo { + + namespace mr_shard { + + AtomicUInt Config::JOB_NUMBER; + + JSFunction::JSFunction( string type , const BSONElement& e ) { + _type = type; + _code = e._asCode(); + + if ( e.type() == CodeWScope ) + _wantedScope = e.codeWScopeObject(); + } + + void JSFunction::init( State * state ) { + _scope = state->scope(); + assert( _scope ); + _scope->init( &_wantedScope ); + + _func = _scope->createFunction( _code.c_str() ); + uassert( 14814 , str::stream() << "couldn't compile code for: " << _type , _func ); + + // install in JS scope so that it can be called in JS mode + _scope->setFunction(_type.c_str(), _code.c_str()); + } + + /** + * Applies the finalize function to a tuple obj (key, val) + * Returns tuple obj {_id: key, value: newval} + */ + BSONObj JSFinalizer::finalize( const BSONObj& o ) { + Scope * s = _func.scope(); + + Scope::NoDBAccess no = s->disableDBAccess( "can't access db inside finalize" ); + s->invokeSafe( _func.func() , &o, 0 ); + + // don't want to use o.objsize() to size b + // since there are many cases where the point of finalize + // is converting many fields to 1 + BSONObjBuilder b; + b.append( o.firstElement() ); + s->append( b , "value" , "return" ); + return b.obj(); + } + + void JSReducer::init( State * state ) { + _func.init( state ); + } + + /** + * Reduces a list of tuple objects (key, value) to a single tuple {"0": key, "1": value} + */ + BSONObj JSReducer::reduce( const BSONList& tuples ) { + if (tuples.size() <= 1) + return tuples[0]; + BSONObj key; + int endSizeEstimate = 16; + _reduce( tuples , key , endSizeEstimate ); + + BSONObjBuilder b(endSizeEstimate); + b.appendAs( key.firstElement() , "0" ); + _func.scope()->append( b , "1" , "return" ); + return b.obj(); + } + + /** + * Reduces a list of tuple object (key, value) to a single tuple {_id: key, value: val} + * Also applies a finalizer method if present. + */ + BSONObj JSReducer::finalReduce( const BSONList& tuples , Finalizer * finalizer ) { + + BSONObj res; + BSONObj key; + + if (tuples.size() == 1) { + // 1 obj, just use it + key = tuples[0]; + BSONObjBuilder b(key.objsize()); + BSONObjIterator it(key); + b.appendAs( it.next() , "_id" ); + b.appendAs( it.next() , "value" ); + res = b.obj(); + } + else { + // need to reduce + int endSizeEstimate = 16; + _reduce( tuples , key , endSizeEstimate ); + BSONObjBuilder b(endSizeEstimate); + b.appendAs( key.firstElement() , "_id" ); + _func.scope()->append( b , "value" , "return" ); + res = b.obj(); + } + + if ( finalizer ) { + res = finalizer->finalize( res ); + } + + return res; + } + + /** + * actually applies a reduce, to a list of tuples (key, value). + * After the call, tuples will hold a single tuple {"0": key, "1": value} + */ + void JSReducer::_reduce( const BSONList& tuples , BSONObj& key , int& endSizeEstimate ) { + int sizeEstimate = ( tuples.size() * tuples.begin()->getField( "value" ).size() ) + 128; + + // need to build the reduce args: ( key, [values] ) + BSONObjBuilder reduceArgs( sizeEstimate ); + boost::scoped_ptr<BSONArrayBuilder> valueBuilder; + int sizeSoFar = 0; + unsigned n = 0; + for ( ; n<tuples.size(); n++ ) { + BSONObjIterator j(tuples[n]); + BSONElement keyE = j.next(); + if ( n == 0 ) { + reduceArgs.append( keyE ); + key = keyE.wrap(); + sizeSoFar = 5 + keyE.size(); + valueBuilder.reset(new BSONArrayBuilder( reduceArgs.subarrayStart( "tuples" ) )); + } + + BSONElement ee = j.next(); + + uassert( 14815 , "value too large to reduce" , ee.size() < ( BSONObjMaxUserSize / 2 ) ); + + if ( sizeSoFar + ee.size() > BSONObjMaxUserSize ) { + assert( n > 1 ); // if not, inf. loop + break; + } + + valueBuilder->append( ee ); + sizeSoFar += ee.size(); + } + assert(valueBuilder); + valueBuilder->done(); + BSONObj args = reduceArgs.obj(); + + Scope * s = _func.scope(); + + s->invokeSafe( _func.func() , &args, 0 ); + ++numReduces; + + if ( s->type( "return" ) == Array ) { + uasserted( 14816 , "reduce -> multiple not supported yet"); + return; + } + + endSizeEstimate = key.objsize() + ( args.objsize() / tuples.size() ); + + if ( n == tuples.size() ) + return; + + // the input list was too large, add the rest of elmts to new tuples and reduce again + // note: would be better to use loop instead of recursion to avoid stack overflow + BSONList x; + for ( ; n < tuples.size(); n++ ) { + x.push_back( tuples[n] ); + } + BSONObjBuilder temp( endSizeEstimate ); + temp.append( key.firstElement() ); + s->append( temp , "1" , "return" ); + x.push_back( temp.obj() ); + _reduce( x , key , endSizeEstimate ); + } + + Config::Config( const string& _dbname , const BSONObj& cmdObj ) { + + dbname = _dbname; + ns = dbname + "." + cmdObj.firstElement().valuestr(); + + verbose = cmdObj["verbose"].trueValue(); + jsMode = cmdObj["jsMode"].trueValue(); + + jsMaxKeys = 500000; + reduceTriggerRatio = 2.0; + maxInMemSize = 5 * 1024 * 1024; + + uassert( 14819 , "outType is no longer a valid option" , cmdObj["outType"].eoo() ); + + if ( cmdObj["out"].type() == String ) { + finalShort = cmdObj["out"].String(); + outType = REPLACE; + } + else if ( cmdObj["out"].type() == Object ) { + BSONObj o = cmdObj["out"].embeddedObject(); + + BSONElement e = o.firstElement(); + string t = e.fieldName(); + + if ( t == "normal" || t == "replace" ) { + outType = REPLACE; + finalShort = e.String(); + } + else if ( t == "merge" ) { + outType = MERGE; + finalShort = e.String(); + } + else if ( t == "reduce" ) { + outType = REDUCE; + finalShort = e.String(); + } + else if ( t == "inline" ) { + outType = INMEMORY; + } + else { + uasserted( 14817 , str::stream() << "unknown out specifier [" << t << "]" ); + } + + if (o.hasElement("db")) { + outDB = o["db"].String(); + } + } + else { + uasserted( 14818 , "'out' has to be a string or an object" ); + } + + if ( outType != INMEMORY ) { // setup names + tempLong = str::stream() << (outDB.empty() ? dbname : outDB) << ".tmp.mr." << cmdObj.firstElement().String() << "_" << finalShort << "_" << JOB_NUMBER++; + + incLong = tempLong + "_inc"; + + finalLong = str::stream() << (outDB.empty() ? dbname : outDB) << "." << finalShort; + } + + { + // scope and code + + if ( cmdObj["scope"].type() == Object ) + scopeSetup = cmdObj["scope"].embeddedObjectUserCheck(); + + reducer.reset( new JSReducer( cmdObj["reduce"] ) ); + if ( cmdObj["finalize"].type() && cmdObj["finalize"].trueValue() ) + finalizer.reset( new JSFinalizer( cmdObj["finalize"] ) ); + + } + + { + // query options + if ( cmdObj["limit"].isNumber() ) + limit = cmdObj["limit"].numberLong(); + else + limit = 0; + } + } + + State::State( const Config& c ) : _config( c ) { + _onDisk = _config.outType != Config::INMEMORY; + } + + State::~State() { + if ( _onDisk ) { + try { +// _db.dropCollection( _config.tempLong ); +// _db.dropCollection( _config.incLong ); + } + catch ( std::exception& e ) { + error() << "couldn't cleanup after map reduce: " << e.what() << endl; + } + } + + if (_scope) { + // cleanup js objects + ScriptingFunction cleanup = _scope->createFunction("delete _emitCt; delete _keyCt; delete _mrMap;"); + _scope->invoke(cleanup, 0, 0, 0, true); + } + } + + /** + * Initialize the mapreduce operation, creating the inc collection + */ + void State::init() { + // setup js + _scope.reset(globalScriptEngine->getPooledScope( _config.dbname ).release() ); +// _scope->localConnect( _config.dbname.c_str() ); + _scope->externalSetup(); + + if ( ! _config.scopeSetup.isEmpty() ) + _scope->init( &_config.scopeSetup ); + + _config.reducer->init( this ); + if ( _config.finalizer ) + _config.finalizer->init( this ); + _scope->setBoolean("_doFinal", _config.finalizer); + } + } +} + diff --git a/s/mr_shard.h b/s/mr_shard.h new file mode 100644 index 00000000000..9afabface0d --- /dev/null +++ b/s/mr_shard.h @@ -0,0 +1,230 @@ +// mr_shard.h + +/** + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "pch.h" + +namespace mongo { + + namespace mr_shard { + + typedef vector<BSONObj> BSONList; + + class State; + + // ------------ function interfaces ----------- + + class Finalizer : boost::noncopyable { + public: + virtual ~Finalizer() {} + virtual void init( State * state ) = 0; + + /** + * this takes a tuple and returns a tuple + */ + virtual BSONObj finalize( const BSONObj& tuple ) = 0; + }; + + class Reducer : boost::noncopyable { + public: + Reducer() : numReduces(0) {} + virtual ~Reducer() {} + virtual void init( State * state ) = 0; + + virtual BSONObj reduce( const BSONList& tuples ) = 0; + /** this means its a final reduce, even if there is no finalizer */ + virtual BSONObj finalReduce( const BSONList& tuples , Finalizer * finalizer ) = 0; + + long long numReduces; + }; + + // ------------ js function implementations ----------- + + /** + * used as a holder for Scope and ScriptingFunction + * visitor like pattern as Scope is gotten from first access + */ + class JSFunction : boost::noncopyable { + public: + /** + * @param type (map|reduce|finalize) + */ + JSFunction( string type , const BSONElement& e ); + virtual ~JSFunction() {} + + virtual void init( State * state ); + + Scope * scope() const { return _scope; } + ScriptingFunction func() const { return _func; } + + private: + string _type; + string _code; // actual javascript code + BSONObj _wantedScope; // this is for CodeWScope + + Scope * _scope; // this is not owned by us, and might be shared + ScriptingFunction _func; + }; + + class JSReducer : public Reducer { + public: + JSReducer( const BSONElement& code ) : _func( "_reduce" , code ) {} + virtual void init( State * state ); + + virtual BSONObj reduce( const BSONList& tuples ); + virtual BSONObj finalReduce( const BSONList& tuples , Finalizer * finalizer ); + + private: + + /** + * result in "return" + * @param key OUT + * @param endSizeEstimate OUT + */ + void _reduce( const BSONList& values , BSONObj& key , int& endSizeEstimate ); + + JSFunction _func; + }; + + class JSFinalizer : public Finalizer { + public: + JSFinalizer( const BSONElement& code ) : _func( "_finalize" , code ) {} + virtual BSONObj finalize( const BSONObj& o ); + virtual void init( State * state ) { _func.init( state ); } + private: + JSFunction _func; + + }; + + // ----------------- + + /** + * holds map/reduce config information + */ + class Config { + public: + Config( const string& _dbname , const BSONObj& cmdObj ); + + string dbname; + string ns; + + // options + bool verbose; + bool jsMode; + + // query options + + BSONObj filter; + BSONObj sort; + long long limit; + + // functions + scoped_ptr<Reducer> reducer; + scoped_ptr<Finalizer> finalizer; + + BSONObj mapParams; + BSONObj scopeSetup; + + // output tables + string incLong; + string tempLong; + + string finalShort; + string finalLong; + + string outDB; + + // max number of keys allowed in JS map before switching mode + long jsMaxKeys; + // ratio of duplicates vs unique keys before reduce is triggered in js mode + float reduceTriggerRatio; + // maximum size of map before it gets dumped to disk + long maxInMemSize; + + enum { REPLACE , // atomically replace the collection + MERGE , // merge keys, override dups + REDUCE , // merge keys, reduce dups + INMEMORY // only store in memory, limited in size + } outType; + + static AtomicUInt JOB_NUMBER; + }; // end MRsetup + + /** + * stores information about intermediate map reduce state + * controls flow of data from map->reduce->finalize->output + */ + class State { + public: + State( const Config& c ); + ~State(); + + void init(); + + // ---- prep ----- + bool sourceExists(); + + long long incomingDocuments(); + + // ---- map stage ---- + + /** + * stages on in in-memory storage + */ + void emit( const BSONObj& a ); + + /** + * if size is big, run a reduce + * if its still big, dump to temp collection + */ + void checkSize(); + + /** + * run reduce on _temp + */ + void reduceInMemory(); + + // ------ reduce stage ----------- + + void prepTempCollection(); + + void finalReduce( BSONList& values ); + + void finalReduce( CurOp * op , ProgressMeterHolder& pm ); + + // ------ simple accessors ----- + + /** State maintains ownership, do no use past State lifetime */ + Scope* scope() { return _scope.get(); } + + const Config& config() { return _config; } + + const bool isOnDisk() { return _onDisk; } + + const Config& _config; + + protected: + + scoped_ptr<Scope> _scope; + bool _onDisk; // if the end result of this map reduce is disk or not + }; + + } // end mr namespace +} + + diff --git a/s/server.cpp b/s/server.cpp index 5d0d2931c6e..48e284523af 100644 --- a/s/server.cpp +++ b/s/server.cpp @@ -44,6 +44,7 @@ namespace mongo { Database *database = 0; string mongosCommand; bool dbexitCalled = false; + static bool scriptingEnabled = true; bool inShutdown() { return dbexitCalled; @@ -220,6 +221,7 @@ int _main(int argc, char* argv[]) { ( "chunkSize" , po::value<int>(), "maximum amount of data per chunk" ) ( "ipv6", "enable IPv6 support (disabled by default)" ) ( "jsonp","allow JSONP access via http (has security implications)" ) + ("noscripting", "disable scripting engine") ; options.add(sharding_options); @@ -261,6 +263,10 @@ int _main(int argc, char* argv[]) { return 0; } + if (params.count("noscripting")) { + scriptingEnabled = false; + } + if ( ! params.count( "configdb" ) ) { out() << "error: no args for --configdb" << endl; return 4; @@ -348,6 +354,12 @@ int _main(int argc, char* argv[]) { boost::thread web( boost::bind(&webServerThread, new NoAdminAccess() /* takes ownership */) ); + if ( scriptingEnabled ) { + ScriptEngine::setup(); +// globalScriptEngine->setCheckInterruptCallback( jsInterruptCallback ); +// globalScriptEngine->setGetInterruptSpecCallback( jsGetInterruptSpecCallback ); + } + MessageServer::Options opts; opts.port = cmdLine.port; opts.ipList = cmdLine.bind_ip; diff --git a/s/strategy.cpp b/s/strategy.cpp index 7502924f6bb..64adcadb933 100644 --- a/s/strategy.cpp +++ b/s/strategy.cpp @@ -76,4 +76,5 @@ namespace mongo { dbcon->insert( ns , obj , flags); dbcon.done(); } + } diff --git a/s/strategy.h b/s/strategy.h index 78c5af6f90b..9bbd2e4077b 100644 --- a/s/strategy.h +++ b/s/strategy.h @@ -32,6 +32,8 @@ namespace mongo { virtual void getMore( Request& r ) = 0; virtual void writeOp( int op , Request& r ) = 0; + virtual void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) = 0; + protected: void doWrite( int op , Request& r , const Shard& shard , bool checkVersion = true ); void doQuery( Request& r , const Shard& shard ); diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp index 62a5417880f..600ded54799 100644 --- a/s/strategy_shard.cpp +++ b/s/strategy_shard.cpp @@ -191,6 +191,63 @@ namespace mongo { } } + void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) { + ChunkManagerPtr manager = conf->getChunkManager(ns); + if ( ! manager->hasShardKey( o ) ) { + + bool bad = true; + + if ( manager->getShardKey().partOfShardKey( "_id" ) ) { + BSONObjBuilder b; + b.appendOID( "_id" , 0 , true ); + b.appendElements( o ); + o = b.obj(); + bad = ! manager->hasShardKey( o ); + } + + if ( bad ) { + log() << "tried to insert object without shard key: " << ns << " " << o << endl; + uasserted( 14812 , "tried to insert object without shard key" ); + } + + } + + // Many operations benefit from having the shard key early in the object + o = manager->getShardKey().moveToFront(o); + + const int maxTries = 30; + + bool gotThrough = false; + for ( int i=0; i<maxTries; i++ ) { + try { + ChunkPtr c = manager->findChunk( o ); + log(4) << " server:" << c->getShard().toString() << " " << o << endl; + insert( c->getShard() , ns , o , flags); + +// r.gotInsert(); +// if ( r.getClientInfo()->autoSplitOk() ) + c->splitIfShould( o.objsize() ); + gotThrough = true; + break; + } + catch ( StaleConfigException& e ) { + int logLevel = i < ( maxTries / 2 ); + LOG( logLevel ) << "retrying insert because of StaleConfigException: " << e << " object: " << o << endl; +// r.reset(); + + unsigned long long old = manager->getSequenceNumber(); + manager = conf->getChunkManager(ns); + + LOG( logLevel ) << " sequenece number - old: " << old << " new: " << manager->getSequenceNumber() << endl; + + if (!manager) { + uasserted(14813, "collection no longer sharded"); + } + } + sleepmillis( i * 20 ); + } + } + void _update( Request& r , DbMessage& d, ChunkManagerPtr manager ) { int flags = d.pullInt(); diff --git a/s/strategy_single.cpp b/s/strategy_single.cpp index b364880e5c6..34e3fe00e97 100644 --- a/s/strategy_single.cpp +++ b/s/strategy_single.cpp @@ -262,6 +262,10 @@ namespace mongo { return true; } + void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) { + // only useful for shards + } + set<string> _commandsSafeToPass; }; |