diff options
-rw-r--r-- | jstests/sharding/moveshard1.js | 39 | ||||
-rw-r--r-- | s/d_logic.cpp | 163 |
2 files changed, 179 insertions, 23 deletions
diff --git a/jstests/sharding/moveshard1.js b/jstests/sharding/moveshard1.js new file mode 100644 index 00000000000..18c9c1c9edf --- /dev/null +++ b/jstests/sharding/moveshard1.js @@ -0,0 +1,39 @@ +// moveshard1.js + +s = new ShardingTest( "moveshard1" , 2 ); + +l = s._connections[0]; +r = s._connections[1]; + +ldb = l.getDB( "foo" ); +rdb = r.getDB( "foo" ); + +ldb.things.save( { a : 1 } ) +ldb.things.save( { a : 2 } ) +ldb.things.save( { a : 3 } ) + +assert.eq( ldb.things.count() , 3 ); +assert.eq( rdb.things.count() , 0 ); + +startResult = l.getDB( "admin" ).runCommand( { "moveshard.start" : "foo.things" , + "to" : s._serverNames[1] , + "from" : s._serverNames[0] , + filter : { a : { $gt : 2 } } + } ); +print( "moveshard.start: " + tojson( startResult ) ); +assert( startResult.ok == 1 , "start failed!" ); + +finishResult = l.getDB( "admin" ).runCommand( { "moveshard.finish" : "foo.things" , + finishToken : startResult.finishToken , + to : s._serverNames[1] , + newVersion : 1 } ); +print( "moveshard.finish: " + tojson( finishResult ) ); +assert( finishResult.ok == 1 , "finishResult failed!" ); + +assert.eq( rdb.things.count() , 1 , "right has wrong size after move" ); +assert.eq( ldb.things.count() , 2 , "left has wrong size after move" ); + + +s.stop(); + + diff --git a/s/d_logic.cpp b/s/d_logic.cpp index 8cf7900a08f..9fe57f17872 100644 --- a/s/d_logic.cpp +++ b/s/d_logic.cpp @@ -30,6 +30,8 @@ #include "../db/jsobj.h" #include "../db/dbmessage.h" +#include "../client/connpool.h" + using namespace std; namespace mongo { @@ -40,6 +42,22 @@ namespace mongo { boost::thread_specific_ptr<NSVersions> clientShardVersions; string shardConfigServer; + unsigned long long getVersion( BSONElement e , string& errmsg ){ + if ( e.eoo() ){ + errmsg = "no version"; + return 0; + } + + if ( e.isNumber() ) + return (unsigned long long)e.number(); + + if ( e.type() == Date || e.type() == Timestamp ) + return e.date(); + + + errmsg = "version is not a numberic type"; + return 0; + } class MongodShardCommand : public Command { public: @@ -86,23 +104,9 @@ namespace mongo { } } - unsigned long long version; - { - BSONElement e = cmdObj["version"]; - if ( e.eoo() ){ - errmsg = "no version"; - return false; - } - else if ( e.isNumber() ) - version = (unsigned long long)e.number(); - else if ( e.type() == Date || e.type() == Timestamp ) - version = e.date(); - else { - errmsg = "version is not a numberic type"; - return false; - } - - } + unsigned long long version = getVersion( cmdObj["version"] , errmsg ); + if ( ! version ) + return false; NSVersions * versions = clientShardVersions.get(); @@ -178,19 +182,69 @@ namespace mongo { } } getShardVersion; - + class MoveShardStartCommand : public MongodShardCommand { public: MoveShardStartCommand() : MongodShardCommand( "moveshard.start" ){} virtual void help( stringstream& help ) const { help << "should not be calling this directly" << endl; } - + bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - // i assume i'm already locked // so i have to start clone, tell caller its ok to make change + // at this point the caller locks me, and updates config db // then finish calls finish, and then deletes data when cursors are done - return false; + + string ns = cmdObj["moveshard.start"].valuestrsafe(); + string to = cmdObj["to"].valuestrsafe(); + string from = cmdObj["from"].valuestrsafe(); // my public address, a tad redundant, but safe + BSONObj filter = cmdObj.getObjectField( "filter" ); + + if ( ns.size() == 0 ){ + errmsg = "need to specify namespace in command"; + return false; + } + + if ( to.size() == 0 ){ + errmsg = "need to specify server to move shard to"; + return false; + } + if ( from.size() == 0 ){ + errmsg = "need to specify server to move shard from (redundat i know)"; + return false; + } + + if ( filter.isEmpty() ){ + errmsg = "need to specify a filter"; + return false; + } + + log() << "got moveshard.start: " << cmdObj << endl; + + + BSONObj res; + bool ok; + + { + dbtemprelease unlock; + + ScopedDbConnection conn( to ); + ok = conn->runCommand( "admin" , + BSON( "startCloneCollection" << ns << + "from" << from << + "query" << filter + ) , + res ); + conn.done(); + } + + log() << " moveshard.start res: " << res << endl; + + if ( ok ){ + result.append( res["finishToken"] ); + } + + return ok; } } moveShardStartCmd; @@ -201,10 +255,73 @@ namespace mongo { virtual void help( stringstream& help ) const { help << "should not be calling this directly" << endl; } - + bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ // see MoveShardStartCommand::run - return false; + + string ns = cmdObj["moveshard.finish"].valuestrsafe(); + if ( ns.size() == 0 ){ + errmsg = "need ns as cmd value"; + return false; + } + + string to = cmdObj["to"].valuestrsafe(); + if ( to.size() == 0 ){ + errmsg = "need to specify server to move shard to"; + return false; + } + + + unsigned long long newVersion = getVersion( cmdObj["newVersion"] , errmsg ); + if ( newVersion == 0 ){ + errmsg = "have to specify new version number"; + return false; + } + + BSONObj finishToken = cmdObj.getObjectField( "finishToken" ); + if ( finishToken.isEmpty() ){ + errmsg = "need finishToken"; + return false; + } + + if ( ns != finishToken["collection"].valuestrsafe() ){ + errmsg = "namespaced don't match"; + return false; + } + + // now we're locked + myVersions[ns] = newVersion; + + BSONObj res; + bool ok; + + { + dbtemprelease unlock; + + ScopedDbConnection conn( to ); + ok = conn->runCommand( "admin" , + BSON( "finishCloneCollection" << finishToken ) , + res ); + conn.done(); + } + + if ( ! ok ){ + // uh oh + errmsg = "finishCloneCollection failed!"; + result << "finishError" << res; + return false; + } + + // wait until cursors are clean + cerr << "WARNING: deleting data before ensuring no more cursors TODO" << endl; + + dbtemprelease unlock; + + DBDirectClient client; + BSONObj removeFilter = finishToken.getObjectField( "query" ); + client.remove( ns , removeFilter ); + + return true; } } moveShardFinishCmd; |