summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2010-06-11 17:16:58 -0400
committerEliot Horowitz <eliot@10gen.com>2010-06-11 17:16:58 -0400
commitcd005b0e587e96360eef620be80588109f0b5cc2 (patch)
tree310872c0371730c14aed8d2b7ff7c41a825bec5e
parent76b74e2223647f76b15847070eceeb79915b821c (diff)
downloadmongo-cd005b0e587e96360eef620be80588109f0b5cc2.tar.gz
more mongod sharding cleaning
-rw-r--r--SConstruct2
-rw-r--r--s/d_logic.cpp172
-rw-r--r--s/d_logic.h25
-rw-r--r--s/d_migrate.cpp208
-rw-r--r--s/d_writeback.cpp2
5 files changed, 240 insertions, 169 deletions
diff --git a/SConstruct b/SConstruct
index 743ccc1b6e1..f6214398a82 100644
--- a/SConstruct
+++ b/SConstruct
@@ -434,7 +434,7 @@ coreServerFiles += scriptingFiles
coreShardFiles = []
shardServerFiles = coreShardFiles + Glob( "s/strategy*.cpp" ) + [ "s/commands_admin.cpp" , "s/commands_public.cpp" , "s/request.cpp" , "s/cursors.cpp" , "s/server.cpp" , "s/chunk.cpp" , "s/shard.cpp" , "s/shardkey.cpp" , "s/config.cpp" , "s/config_migrate.cpp" , "s/s_only.cpp" , "s/stats.cpp" , "s/balance.cpp" , "s/balancer_policy.cpp" , "db/cmdline.cpp" ]
-serverOnlyFiles += coreShardFiles + [ "s/d_logic.cpp" , "s/d_writeback.cpp" ]
+serverOnlyFiles += coreShardFiles + [ "s/d_logic.cpp" , "s/d_writeback.cpp" , "s/d_migrate.cpp" ]
serverOnlyFiles += [ "db/module.cpp" ] + Glob( "db/modules/*.cpp" )
diff --git a/s/d_logic.cpp b/s/d_logic.cpp
index ca4fd098d7b..2d49ad59317 100644
--- a/s/d_logic.cpp
+++ b/s/d_logic.cpp
@@ -42,10 +42,8 @@ using namespace std;
namespace mongo {
- typedef map<string,unsigned long long> NSVersions;
-
- NSVersions globalVersions;
- boost::thread_specific_ptr<NSVersions> clientShardVersions;
+ NSVersionMap globalVersions;
+ boost::thread_specific_ptr<NSVersionMap> clientShardVersions;
string shardConfigServer;
@@ -146,11 +144,11 @@ namespace mongo {
return false;
}
- NSVersions * versions = clientShardVersions.get();
+ NSVersionMap * versions = clientShardVersions.get();
if ( ! versions ){
log(1) << "entering shard mode for connection" << endl;
- versions = new NSVersions();
+ versions = new NSVersionMap();
clientShardVersions.reset( versions );
}
@@ -246,163 +244,7 @@ namespace mongo {
} getShardVersion;
- class MoveShardStartCommand : public MongodShardCommand {
- public:
- MoveShardStartCommand() : MongodShardCommand( "movechunk.start" ){}
- virtual void help( stringstream& help ) const {
- help << "should not be calling this directly" << endl;
- }
-
- virtual LockType locktype() const { return WRITE; }
-
- bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
- // so i have to start clone, tell caller its ok to make change
- // at this point the caller locks me, and updates config db
- // then finish calls finish, and then deletes data when cursors are done
-
- string ns = cmdObj["movechunk.start"].valuestrsafe();
- string to = cmdObj["to"].valuestrsafe();
- string from = cmdObj["from"].valuestrsafe(); // my public address, a tad redundant, but safe
- BSONObj filter = cmdObj.getObjectField( "filter" );
-
- if ( ns.size() == 0 ){
- errmsg = "need to specify namespace in command";
- return false;
- }
-
- if ( to.size() == 0 ){
- errmsg = "need to specify server to move shard to";
- return false;
- }
- if ( from.size() == 0 ){
- errmsg = "need to specify server to move shard from (redundat i know)";
- return false;
- }
-
- if ( filter.isEmpty() ){
- errmsg = "need to specify a filter";
- return false;
- }
-
- log() << "got movechunk.start: " << cmdObj << endl;
-
-
- BSONObj res;
- bool ok;
-
- {
- dbtemprelease unlock;
-
- ScopedDbConnection conn( to );
- ok = conn->runCommand( "admin" ,
- BSON( "startCloneCollection" << ns <<
- "from" << from <<
- "query" << filter
- ) ,
- res );
- conn.done();
- }
-
- log() << " movechunk.start res: " << res << endl;
-
- if ( ok ){
- result.append( res["finishToken"] );
- }
- else {
- errmsg = "startCloneCollection failed: ";
- errmsg += res["errmsg"].valuestrsafe();
- }
- return ok;
- }
-
- } moveShardStartCmd;
- class MoveShardFinishCommand : public MongodShardCommand {
- public:
- MoveShardFinishCommand() : MongodShardCommand( "movechunk.finish" ){}
- virtual void help( stringstream& help ) const {
- help << "should not be calling this directly" << endl;
- }
-
- virtual LockType locktype() const { return WRITE; }
-
- bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
- // see MoveShardStartCommand::run
-
- string ns = cmdObj["movechunk.finish"].valuestrsafe();
- if ( ns.size() == 0 ){
- errmsg = "need ns as cmd value";
- return false;
- }
-
- string to = cmdObj["to"].valuestrsafe();
- if ( to.size() == 0 ){
- errmsg = "need to specify server to move shard to";
- return false;
- }
-
-
- unsigned long long newVersion = extractVersion( cmdObj["newVersion"] , errmsg );
- if ( newVersion == 0 ){
- errmsg = "have to specify new version number";
- return false;
- }
-
- BSONObj finishToken = cmdObj.getObjectField( "finishToken" );
- if ( finishToken.isEmpty() ){
- errmsg = "need finishToken";
- return false;
- }
-
- if ( ns != finishToken["collection"].valuestrsafe() ){
- errmsg = "namespaced don't match";
- return false;
- }
-
- // now we're locked
- globalVersions[ns] = newVersion;
- NSVersions * versions = clientShardVersions.get();
- if ( ! versions ){
- versions = new NSVersions();
- clientShardVersions.reset( versions );
- }
- (*versions)[ns] = newVersion;
-
- BSONObj res;
- bool ok;
-
- {
- dbtemprelease unlock;
-
- ScopedDbConnection conn( to );
- ok = conn->runCommand( "admin" ,
- BSON( "finishCloneCollection" << finishToken ) ,
- res );
- conn.done();
- }
-
- if ( ! ok ){
- // uh oh
- errmsg = "finishCloneCollection failed!";
- result << "finishError" << res;
- return false;
- }
-
- // wait until cursors are clean
- cout << "WARNING: deleting data before ensuring no more cursors TODO" << endl;
-
- {
- BSONObj removeFilter = finishToken.getObjectField( "query" );
- Client::Context ctx(ns);
- long long num = deleteObjects( ns.c_str() , removeFilter , false , true );
- log() << "movechunk.finish deleted: " << num << endl;
- result.appendNumber( "numDeleted" , num );
- }
-
- return true;
- }
-
- } moveShardFinishCmd;
bool haveLocalShardingInfo( const string& ns ){
if ( shardConfigServer.empty() )
@@ -413,7 +255,7 @@ namespace mongo {
if ( version == 0 )
return false;
- NSVersions * versions = clientShardVersions.get();
+ NSVersionMap * versions = clientShardVersions.get();
if ( ! versions )
return false;
@@ -429,11 +271,11 @@ namespace mongo {
return true;
}
- NSVersions::iterator i = globalVersions.find( ns );
+ NSVersionMap::iterator i = globalVersions.find( ns );
if ( i == globalVersions.end() )
return true;
- NSVersions * versions = clientShardVersions.get();
+ NSVersionMap * versions = clientShardVersions.get();
if ( ! versions ){
// this means the client has nothing sharded
// so this allows direct connections to do whatever they want
diff --git a/s/d_logic.h b/s/d_logic.h
index f3174b8bd31..998725ebd1d 100644
--- a/s/d_logic.h
+++ b/s/d_logic.h
@@ -22,9 +22,13 @@
namespace mongo {
- /* queue a write back on a remote server for a failed write */
- void queueWriteBack( const string& remote , const BSONObj& o );
+ // -----------------
+ // --- core ---
+ // -----------------
+ unsigned long long extractVersion( BSONElement e , string& errmsg );
+
+
/**
* @return true if we have any shard info for the ns
*/
@@ -39,4 +43,21 @@ namespace mongo {
* @return true if we took care of the message and nothing else should be done
*/
bool handlePossibleShardedMessage( Message &m, DbResponse &dbresponse );
+
+ // -----------------
+ // --- writeback ---
+ // -----------------
+
+ /* queue a write back on a remote server for a failed write */
+ void queueWriteBack( const string& remote , const BSONObj& o );
+
+
+
+ // TEMP TEMP TEMP
+
+ typedef map<string,unsigned long long> NSVersionMap;
+
+ extern NSVersionMap globalVersions;
+ extern boost::thread_specific_ptr<NSVersionMap> clientShardVersions;
+
}
diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp
new file mode 100644
index 00000000000..15c4bc1a2bb
--- /dev/null
+++ b/s/d_migrate.cpp
@@ -0,0 +1,208 @@
+// d_migrate.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+
+/**
+ these are commands that live in mongod
+ mostly around shard management and checking
+ */
+
+#include "pch.h"
+#include <map>
+#include <string>
+
+#include "../db/commands.h"
+#include "../db/jsobj.h"
+#include "../db/dbmessage.h"
+#include "../db/query.h"
+
+#include "../client/connpool.h"
+
+#include "../util/queue.h"
+
+#include "shard.h"
+#include "d_logic.h"
+
+using namespace std;
+
+namespace mongo {
+
+
+ class MoveShardStartCommand : public Command {
+ public:
+ MoveShardStartCommand() : Command( "movechunk.start" ){}
+ virtual void help( stringstream& help ) const {
+ help << "should not be calling this directly" << endl;
+ }
+
+ virtual bool slaveOk() const { return false; }
+ virtual bool adminOnly() const { return true; }
+ virtual LockType locktype() const { return WRITE; }
+
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ // so i have to start clone, tell caller its ok to make change
+ // at this point the caller locks me, and updates config db
+ // then finish calls finish, and then deletes data when cursors are done
+
+ string ns = cmdObj["movechunk.start"].valuestrsafe();
+ string to = cmdObj["to"].valuestrsafe();
+ string from = cmdObj["from"].valuestrsafe(); // my public address, a tad redundant, but safe
+ BSONObj filter = cmdObj.getObjectField( "filter" );
+
+ if ( ns.size() == 0 ){
+ errmsg = "need to specify namespace in command";
+ return false;
+ }
+
+ if ( to.size() == 0 ){
+ errmsg = "need to specify server to move shard to";
+ return false;
+ }
+ if ( from.size() == 0 ){
+ errmsg = "need to specify server to move shard from (redundat i know)";
+ return false;
+ }
+
+ if ( filter.isEmpty() ){
+ errmsg = "need to specify a filter";
+ return false;
+ }
+
+ log() << "got movechunk.start: " << cmdObj << endl;
+
+
+ BSONObj res;
+ bool ok;
+
+ {
+ dbtemprelease unlock;
+
+ ScopedDbConnection conn( to );
+ ok = conn->runCommand( "admin" ,
+ BSON( "startCloneCollection" << ns <<
+ "from" << from <<
+ "query" << filter
+ ) ,
+ res );
+ conn.done();
+ }
+
+ log() << " movechunk.start res: " << res << endl;
+
+ if ( ok ){
+ result.append( res["finishToken"] );
+ }
+ else {
+ errmsg = "startCloneCollection failed: ";
+ errmsg += res["errmsg"].valuestrsafe();
+ }
+ return ok;
+ }
+
+ } moveShardStartCmd;
+
+ class MoveShardFinishCommand : public Command {
+ public:
+ MoveShardFinishCommand() : Command( "movechunk.finish" ){}
+ virtual void help( stringstream& help ) const {
+ help << "should not be calling this directly" << endl;
+ }
+
+ virtual bool slaveOk() const { return false; }
+ virtual bool adminOnly() const { return true; }
+ virtual LockType locktype() const { return WRITE; }
+
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ // see MoveShardStartCommand::run
+
+ string ns = cmdObj["movechunk.finish"].valuestrsafe();
+ if ( ns.size() == 0 ){
+ errmsg = "need ns as cmd value";
+ return false;
+ }
+
+ string to = cmdObj["to"].valuestrsafe();
+ if ( to.size() == 0 ){
+ errmsg = "need to specify server to move shard to";
+ return false;
+ }
+
+
+ unsigned long long newVersion = extractVersion( cmdObj["newVersion"] , errmsg );
+ if ( newVersion == 0 ){
+ errmsg = "have to specify new version number";
+ return false;
+ }
+
+ BSONObj finishToken = cmdObj.getObjectField( "finishToken" );
+ if ( finishToken.isEmpty() ){
+ errmsg = "need finishToken";
+ return false;
+ }
+
+ if ( ns != finishToken["collection"].valuestrsafe() ){
+ errmsg = "namespaced don't match";
+ return false;
+ }
+
+ // now we're locked
+ globalVersions[ns] = newVersion;
+ NSVersionMap * versions = clientShardVersions.get();
+ if ( ! versions ){
+ versions = new NSVersionMap();
+ clientShardVersions.reset( versions );
+ }
+ (*versions)[ns] = newVersion;
+
+ BSONObj res;
+ bool ok;
+
+ {
+ dbtemprelease unlock;
+
+ ScopedDbConnection conn( to );
+ ok = conn->runCommand( "admin" ,
+ BSON( "finishCloneCollection" << finishToken ) ,
+ res );
+ conn.done();
+ }
+
+ if ( ! ok ){
+ // uh oh
+ errmsg = "finishCloneCollection failed!";
+ result << "finishError" << res;
+ return false;
+ }
+
+ // wait until cursors are clean
+ cout << "WARNING: deleting data before ensuring no more cursors TODO" << endl;
+
+ {
+ BSONObj removeFilter = finishToken.getObjectField( "query" );
+ Client::Context ctx(ns);
+ long long num = deleteObjects( ns.c_str() , removeFilter , false , true );
+ log() << "movechunk.finish deleted: " << num << endl;
+ result.appendNumber( "numDeleted" , num );
+ }
+
+ return true;
+ }
+
+ } moveShardFinishCmd;
+
+}
diff --git a/s/d_writeback.cpp b/s/d_writeback.cpp
index 4c0e134b23d..d49d025b63e 100644
--- a/s/d_writeback.cpp
+++ b/s/d_writeback.cpp
@@ -1,4 +1,4 @@
-// d_logic.cpp
+// d_writeback.cpp
/**
* Copyright (C) 2008 10gen Inc.