summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2009-04-06 15:57:17 -0400
committerEliot Horowitz <eliot@10gen.com>2009-04-06 15:57:17 -0400
commit6b49e60fe5d272df63eafa44cc99886bf10d82af (patch)
tree5bd44ff2032bb3987dfefd673a5f95ba520186de
parent53e63fd6461d652a1eec98371950aeaa757d86c5 (diff)
downloadmongo-6b49e60fe5d272df63eafa44cc99886bf10d82af.tar.gz
implemented moveshard.(start|finish) in mongod
-rw-r--r--jstests/sharding/moveshard1.js39
-rw-r--r--s/d_logic.cpp163
2 files changed, 179 insertions, 23 deletions
diff --git a/jstests/sharding/moveshard1.js b/jstests/sharding/moveshard1.js
new file mode 100644
index 00000000000..18c9c1c9edf
--- /dev/null
+++ b/jstests/sharding/moveshard1.js
@@ -0,0 +1,39 @@
+// moveshard1.js
+
+s = new ShardingTest( "moveshard1" , 2 );
+
+l = s._connections[0];
+r = s._connections[1];
+
+ldb = l.getDB( "foo" );
+rdb = r.getDB( "foo" );
+
+ldb.things.save( { a : 1 } )
+ldb.things.save( { a : 2 } )
+ldb.things.save( { a : 3 } )
+
+assert.eq( ldb.things.count() , 3 );
+assert.eq( rdb.things.count() , 0 );
+
+startResult = l.getDB( "admin" ).runCommand( { "moveshard.start" : "foo.things" ,
+ "to" : s._serverNames[1] ,
+ "from" : s._serverNames[0] ,
+ filter : { a : { $gt : 2 } }
+ } );
+print( "moveshard.start: " + tojson( startResult ) );
+assert( startResult.ok == 1 , "start failed!" );
+
+finishResult = l.getDB( "admin" ).runCommand( { "moveshard.finish" : "foo.things" ,
+ finishToken : startResult.finishToken ,
+ to : s._serverNames[1] ,
+ newVersion : 1 } );
+print( "moveshard.finish: " + tojson( finishResult ) );
+assert( finishResult.ok == 1 , "finishResult failed!" );
+
+assert.eq( rdb.things.count() , 1 , "right has wrong size after move" );
+assert.eq( ldb.things.count() , 2 , "left has wrong size after move" );
+
+
+s.stop();
+
+
diff --git a/s/d_logic.cpp b/s/d_logic.cpp
index 8cf7900a08f..9fe57f17872 100644
--- a/s/d_logic.cpp
+++ b/s/d_logic.cpp
@@ -30,6 +30,8 @@
#include "../db/jsobj.h"
#include "../db/dbmessage.h"
+#include "../client/connpool.h"
+
using namespace std;
namespace mongo {
@@ -40,6 +42,22 @@ namespace mongo {
boost::thread_specific_ptr<NSVersions> clientShardVersions;
string shardConfigServer;
+ unsigned long long getVersion( BSONElement e , string& errmsg ){
+ if ( e.eoo() ){
+ errmsg = "no version";
+ return 0;
+ }
+
+ if ( e.isNumber() )
+ return (unsigned long long)e.number();
+
+ if ( e.type() == Date || e.type() == Timestamp )
+ return e.date();
+
+
+ errmsg = "version is not a numberic type";
+ return 0;
+ }
class MongodShardCommand : public Command {
public:
@@ -86,23 +104,9 @@ namespace mongo {
}
}
- unsigned long long version;
- {
- BSONElement e = cmdObj["version"];
- if ( e.eoo() ){
- errmsg = "no version";
- return false;
- }
- else if ( e.isNumber() )
- version = (unsigned long long)e.number();
- else if ( e.type() == Date || e.type() == Timestamp )
- version = e.date();
- else {
- errmsg = "version is not a numberic type";
- return false;
- }
-
- }
+ unsigned long long version = getVersion( cmdObj["version"] , errmsg );
+ if ( ! version )
+ return false;
NSVersions * versions = clientShardVersions.get();
@@ -178,19 +182,69 @@ namespace mongo {
}
} getShardVersion;
-
+
class MoveShardStartCommand : public MongodShardCommand {
public:
MoveShardStartCommand() : MongodShardCommand( "moveshard.start" ){}
virtual void help( stringstream& help ) const {
help << "should not be calling this directly" << endl;
}
-
+
bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
- // i assume i'm already locked
// so i have to start clone, tell caller its ok to make change
+ // at this point the caller locks me, and updates config db
// then finish calls finish, and then deletes data when cursors are done
- return false;
+
+ string ns = cmdObj["moveshard.start"].valuestrsafe();
+ string to = cmdObj["to"].valuestrsafe();
+ string from = cmdObj["from"].valuestrsafe(); // my public address, a tad redundant, but safe
+ BSONObj filter = cmdObj.getObjectField( "filter" );
+
+ if ( ns.size() == 0 ){
+ errmsg = "need to specify namespace in command";
+ return false;
+ }
+
+ if ( to.size() == 0 ){
+ errmsg = "need to specify server to move shard to";
+ return false;
+ }
+ if ( from.size() == 0 ){
+ errmsg = "need to specify server to move shard from (redundat i know)";
+ return false;
+ }
+
+ if ( filter.isEmpty() ){
+ errmsg = "need to specify a filter";
+ return false;
+ }
+
+ log() << "got moveshard.start: " << cmdObj << endl;
+
+
+ BSONObj res;
+ bool ok;
+
+ {
+ dbtemprelease unlock;
+
+ ScopedDbConnection conn( to );
+ ok = conn->runCommand( "admin" ,
+ BSON( "startCloneCollection" << ns <<
+ "from" << from <<
+ "query" << filter
+ ) ,
+ res );
+ conn.done();
+ }
+
+ log() << " moveshard.start res: " << res << endl;
+
+ if ( ok ){
+ result.append( res["finishToken"] );
+ }
+
+ return ok;
}
} moveShardStartCmd;
@@ -201,10 +255,73 @@ namespace mongo {
virtual void help( stringstream& help ) const {
help << "should not be calling this directly" << endl;
}
-
+
bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
// see MoveShardStartCommand::run
- return false;
+
+ string ns = cmdObj["moveshard.finish"].valuestrsafe();
+ if ( ns.size() == 0 ){
+ errmsg = "need ns as cmd value";
+ return false;
+ }
+
+ string to = cmdObj["to"].valuestrsafe();
+ if ( to.size() == 0 ){
+ errmsg = "need to specify server to move shard to";
+ return false;
+ }
+
+
+ unsigned long long newVersion = getVersion( cmdObj["newVersion"] , errmsg );
+ if ( newVersion == 0 ){
+ errmsg = "have to specify new version number";
+ return false;
+ }
+
+ BSONObj finishToken = cmdObj.getObjectField( "finishToken" );
+ if ( finishToken.isEmpty() ){
+ errmsg = "need finishToken";
+ return false;
+ }
+
+ if ( ns != finishToken["collection"].valuestrsafe() ){
+ errmsg = "namespaced don't match";
+ return false;
+ }
+
+ // now we're locked
+ myVersions[ns] = newVersion;
+
+ BSONObj res;
+ bool ok;
+
+ {
+ dbtemprelease unlock;
+
+ ScopedDbConnection conn( to );
+ ok = conn->runCommand( "admin" ,
+ BSON( "finishCloneCollection" << finishToken ) ,
+ res );
+ conn.done();
+ }
+
+ if ( ! ok ){
+ // uh oh
+ errmsg = "finishCloneCollection failed!";
+ result << "finishError" << res;
+ return false;
+ }
+
+ // wait until cursors are clean
+ cerr << "WARNING: deleting data before ensuring no more cursors TODO" << endl;
+
+ dbtemprelease unlock;
+
+ DBDirectClient client;
+ BSONObj removeFilter = finishToken.getObjectField( "query" );
+ client.remove( ns , removeFilter );
+
+ return true;
}
} moveShardFinishCmd;