diff options
author | Eliot Horowitz <eliot@10gen.com> | 2010-06-11 17:16:58 -0400 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2010-06-11 17:16:58 -0400 |
commit | cd005b0e587e96360eef620be80588109f0b5cc2 (patch) | |
tree | 310872c0371730c14aed8d2b7ff7c41a825bec5e | |
parent | 76b74e2223647f76b15847070eceeb79915b821c (diff) | |
download | mongo-cd005b0e587e96360eef620be80588109f0b5cc2.tar.gz |
more mongod sharding cleaning
-rw-r--r-- | SConstruct | 2 | ||||
-rw-r--r-- | s/d_logic.cpp | 172 | ||||
-rw-r--r-- | s/d_logic.h | 25 | ||||
-rw-r--r-- | s/d_migrate.cpp | 208 | ||||
-rw-r--r-- | s/d_writeback.cpp | 2 |
5 files changed, 240 insertions, 169 deletions
diff --git a/SConstruct b/SConstruct index 743ccc1b6e1..f6214398a82 100644 --- a/SConstruct +++ b/SConstruct @@ -434,7 +434,7 @@ coreServerFiles += scriptingFiles coreShardFiles = [] shardServerFiles = coreShardFiles + Glob( "s/strategy*.cpp" ) + [ "s/commands_admin.cpp" , "s/commands_public.cpp" , "s/request.cpp" , "s/cursors.cpp" , "s/server.cpp" , "s/chunk.cpp" , "s/shard.cpp" , "s/shardkey.cpp" , "s/config.cpp" , "s/config_migrate.cpp" , "s/s_only.cpp" , "s/stats.cpp" , "s/balance.cpp" , "s/balancer_policy.cpp" , "db/cmdline.cpp" ] -serverOnlyFiles += coreShardFiles + [ "s/d_logic.cpp" , "s/d_writeback.cpp" ] +serverOnlyFiles += coreShardFiles + [ "s/d_logic.cpp" , "s/d_writeback.cpp" , "s/d_migrate.cpp" ] serverOnlyFiles += [ "db/module.cpp" ] + Glob( "db/modules/*.cpp" ) diff --git a/s/d_logic.cpp b/s/d_logic.cpp index ca4fd098d7b..2d49ad59317 100644 --- a/s/d_logic.cpp +++ b/s/d_logic.cpp @@ -42,10 +42,8 @@ using namespace std; namespace mongo { - typedef map<string,unsigned long long> NSVersions; - - NSVersions globalVersions; - boost::thread_specific_ptr<NSVersions> clientShardVersions; + NSVersionMap globalVersions; + boost::thread_specific_ptr<NSVersionMap> clientShardVersions; string shardConfigServer; @@ -146,11 +144,11 @@ namespace mongo { return false; } - NSVersions * versions = clientShardVersions.get(); + NSVersionMap * versions = clientShardVersions.get(); if ( ! versions ){ log(1) << "entering shard mode for connection" << endl; - versions = new NSVersions(); + versions = new NSVersionMap(); clientShardVersions.reset( versions ); } @@ -246,163 +244,7 @@ namespace mongo { } getShardVersion; - class MoveShardStartCommand : public MongodShardCommand { - public: - MoveShardStartCommand() : MongodShardCommand( "movechunk.start" ){} - virtual void help( stringstream& help ) const { - help << "should not be calling this directly" << endl; - } - - virtual LockType locktype() const { return WRITE; } - - bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - // so i have to start clone, tell caller its ok to make change - // at this point the caller locks me, and updates config db - // then finish calls finish, and then deletes data when cursors are done - - string ns = cmdObj["movechunk.start"].valuestrsafe(); - string to = cmdObj["to"].valuestrsafe(); - string from = cmdObj["from"].valuestrsafe(); // my public address, a tad redundant, but safe - BSONObj filter = cmdObj.getObjectField( "filter" ); - - if ( ns.size() == 0 ){ - errmsg = "need to specify namespace in command"; - return false; - } - - if ( to.size() == 0 ){ - errmsg = "need to specify server to move shard to"; - return false; - } - if ( from.size() == 0 ){ - errmsg = "need to specify server to move shard from (redundat i know)"; - return false; - } - - if ( filter.isEmpty() ){ - errmsg = "need to specify a filter"; - return false; - } - - log() << "got movechunk.start: " << cmdObj << endl; - - - BSONObj res; - bool ok; - - { - dbtemprelease unlock; - - ScopedDbConnection conn( to ); - ok = conn->runCommand( "admin" , - BSON( "startCloneCollection" << ns << - "from" << from << - "query" << filter - ) , - res ); - conn.done(); - } - - log() << " movechunk.start res: " << res << endl; - - if ( ok ){ - result.append( res["finishToken"] ); - } - else { - errmsg = "startCloneCollection failed: "; - errmsg += res["errmsg"].valuestrsafe(); - } - return ok; - } - - } moveShardStartCmd; - class MoveShardFinishCommand : public MongodShardCommand { - public: - MoveShardFinishCommand() : MongodShardCommand( "movechunk.finish" ){} - virtual void help( stringstream& help ) const { - help << "should not be calling this directly" << endl; - } - - virtual LockType locktype() const { return WRITE; } - - bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - // see MoveShardStartCommand::run - - string ns = cmdObj["movechunk.finish"].valuestrsafe(); - if ( ns.size() == 0 ){ - errmsg = "need ns as cmd value"; - return false; - } - - string to = cmdObj["to"].valuestrsafe(); - if ( to.size() == 0 ){ - errmsg = "need to specify server to move shard to"; - return false; - } - - - unsigned long long newVersion = extractVersion( cmdObj["newVersion"] , errmsg ); - if ( newVersion == 0 ){ - errmsg = "have to specify new version number"; - return false; - } - - BSONObj finishToken = cmdObj.getObjectField( "finishToken" ); - if ( finishToken.isEmpty() ){ - errmsg = "need finishToken"; - return false; - } - - if ( ns != finishToken["collection"].valuestrsafe() ){ - errmsg = "namespaced don't match"; - return false; - } - - // now we're locked - globalVersions[ns] = newVersion; - NSVersions * versions = clientShardVersions.get(); - if ( ! versions ){ - versions = new NSVersions(); - clientShardVersions.reset( versions ); - } - (*versions)[ns] = newVersion; - - BSONObj res; - bool ok; - - { - dbtemprelease unlock; - - ScopedDbConnection conn( to ); - ok = conn->runCommand( "admin" , - BSON( "finishCloneCollection" << finishToken ) , - res ); - conn.done(); - } - - if ( ! ok ){ - // uh oh - errmsg = "finishCloneCollection failed!"; - result << "finishError" << res; - return false; - } - - // wait until cursors are clean - cout << "WARNING: deleting data before ensuring no more cursors TODO" << endl; - - { - BSONObj removeFilter = finishToken.getObjectField( "query" ); - Client::Context ctx(ns); - long long num = deleteObjects( ns.c_str() , removeFilter , false , true ); - log() << "movechunk.finish deleted: " << num << endl; - result.appendNumber( "numDeleted" , num ); - } - - return true; - } - - } moveShardFinishCmd; bool haveLocalShardingInfo( const string& ns ){ if ( shardConfigServer.empty() ) @@ -413,7 +255,7 @@ namespace mongo { if ( version == 0 ) return false; - NSVersions * versions = clientShardVersions.get(); + NSVersionMap * versions = clientShardVersions.get(); if ( ! versions ) return false; @@ -429,11 +271,11 @@ namespace mongo { return true; } - NSVersions::iterator i = globalVersions.find( ns ); + NSVersionMap::iterator i = globalVersions.find( ns ); if ( i == globalVersions.end() ) return true; - NSVersions * versions = clientShardVersions.get(); + NSVersionMap * versions = clientShardVersions.get(); if ( ! versions ){ // this means the client has nothing sharded // so this allows direct connections to do whatever they want diff --git a/s/d_logic.h b/s/d_logic.h index f3174b8bd31..998725ebd1d 100644 --- a/s/d_logic.h +++ b/s/d_logic.h @@ -22,9 +22,13 @@ namespace mongo { - /* queue a write back on a remote server for a failed write */ - void queueWriteBack( const string& remote , const BSONObj& o ); + // ----------------- + // --- core --- + // ----------------- + unsigned long long extractVersion( BSONElement e , string& errmsg ); + + /** * @return true if we have any shard info for the ns */ @@ -39,4 +43,21 @@ namespace mongo { * @return true if we took care of the message and nothing else should be done */ bool handlePossibleShardedMessage( Message &m, DbResponse &dbresponse ); + + // ----------------- + // --- writeback --- + // ----------------- + + /* queue a write back on a remote server for a failed write */ + void queueWriteBack( const string& remote , const BSONObj& o ); + + + + // TEMP TEMP TEMP + + typedef map<string,unsigned long long> NSVersionMap; + + extern NSVersionMap globalVersions; + extern boost::thread_specific_ptr<NSVersionMap> clientShardVersions; + } diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp new file mode 100644 index 00000000000..15c4bc1a2bb --- /dev/null +++ b/s/d_migrate.cpp @@ -0,0 +1,208 @@ +// d_migrate.cpp + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + + +/** + these are commands that live in mongod + mostly around shard management and checking + */ + +#include "pch.h" +#include <map> +#include <string> + +#include "../db/commands.h" +#include "../db/jsobj.h" +#include "../db/dbmessage.h" +#include "../db/query.h" + +#include "../client/connpool.h" + +#include "../util/queue.h" + +#include "shard.h" +#include "d_logic.h" + +using namespace std; + +namespace mongo { + + + class MoveShardStartCommand : public Command { + public: + MoveShardStartCommand() : Command( "movechunk.start" ){} + virtual void help( stringstream& help ) const { + help << "should not be calling this directly" << endl; + } + + virtual bool slaveOk() const { return false; } + virtual bool adminOnly() const { return true; } + virtual LockType locktype() const { return WRITE; } + + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + // so i have to start clone, tell caller its ok to make change + // at this point the caller locks me, and updates config db + // then finish calls finish, and then deletes data when cursors are done + + string ns = cmdObj["movechunk.start"].valuestrsafe(); + string to = cmdObj["to"].valuestrsafe(); + string from = cmdObj["from"].valuestrsafe(); // my public address, a tad redundant, but safe + BSONObj filter = cmdObj.getObjectField( "filter" ); + + if ( ns.size() == 0 ){ + errmsg = "need to specify namespace in command"; + return false; + } + + if ( to.size() == 0 ){ + errmsg = "need to specify server to move shard to"; + return false; + } + if ( from.size() == 0 ){ + errmsg = "need to specify server to move shard from (redundat i know)"; + return false; + } + + if ( filter.isEmpty() ){ + errmsg = "need to specify a filter"; + return false; + } + + log() << "got movechunk.start: " << cmdObj << endl; + + + BSONObj res; + bool ok; + + { + dbtemprelease unlock; + + ScopedDbConnection conn( to ); + ok = conn->runCommand( "admin" , + BSON( "startCloneCollection" << ns << + "from" << from << + "query" << filter + ) , + res ); + conn.done(); + } + + log() << " movechunk.start res: " << res << endl; + + if ( ok ){ + result.append( res["finishToken"] ); + } + else { + errmsg = "startCloneCollection failed: "; + errmsg += res["errmsg"].valuestrsafe(); + } + return ok; + } + + } moveShardStartCmd; + + class MoveShardFinishCommand : public Command { + public: + MoveShardFinishCommand() : Command( "movechunk.finish" ){} + virtual void help( stringstream& help ) const { + help << "should not be calling this directly" << endl; + } + + virtual bool slaveOk() const { return false; } + virtual bool adminOnly() const { return true; } + virtual LockType locktype() const { return WRITE; } + + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + // see MoveShardStartCommand::run + + string ns = cmdObj["movechunk.finish"].valuestrsafe(); + if ( ns.size() == 0 ){ + errmsg = "need ns as cmd value"; + return false; + } + + string to = cmdObj["to"].valuestrsafe(); + if ( to.size() == 0 ){ + errmsg = "need to specify server to move shard to"; + return false; + } + + + unsigned long long newVersion = extractVersion( cmdObj["newVersion"] , errmsg ); + if ( newVersion == 0 ){ + errmsg = "have to specify new version number"; + return false; + } + + BSONObj finishToken = cmdObj.getObjectField( "finishToken" ); + if ( finishToken.isEmpty() ){ + errmsg = "need finishToken"; + return false; + } + + if ( ns != finishToken["collection"].valuestrsafe() ){ + errmsg = "namespaced don't match"; + return false; + } + + // now we're locked + globalVersions[ns] = newVersion; + NSVersionMap * versions = clientShardVersions.get(); + if ( ! versions ){ + versions = new NSVersionMap(); + clientShardVersions.reset( versions ); + } + (*versions)[ns] = newVersion; + + BSONObj res; + bool ok; + + { + dbtemprelease unlock; + + ScopedDbConnection conn( to ); + ok = conn->runCommand( "admin" , + BSON( "finishCloneCollection" << finishToken ) , + res ); + conn.done(); + } + + if ( ! ok ){ + // uh oh + errmsg = "finishCloneCollection failed!"; + result << "finishError" << res; + return false; + } + + // wait until cursors are clean + cout << "WARNING: deleting data before ensuring no more cursors TODO" << endl; + + { + BSONObj removeFilter = finishToken.getObjectField( "query" ); + Client::Context ctx(ns); + long long num = deleteObjects( ns.c_str() , removeFilter , false , true ); + log() << "movechunk.finish deleted: " << num << endl; + result.appendNumber( "numDeleted" , num ); + } + + return true; + } + + } moveShardFinishCmd; + +} diff --git a/s/d_writeback.cpp b/s/d_writeback.cpp index 4c0e134b23d..d49d025b63e 100644 --- a/s/d_writeback.cpp +++ b/s/d_writeback.cpp @@ -1,4 +1,4 @@ -// d_logic.cpp +// d_writeback.cpp /** * Copyright (C) 2008 10gen Inc. |