diff options
-rw-r--r-- | src/mongo/s/catalog/catalog_manager.h | 38 | ||||
-rw-r--r-- | src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp | 142 | ||||
-rw-r--r-- | src/mongo/s/catalog/legacy/catalog_manager_legacy.h | 9 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_remove_shard_cmd.cpp | 190 |
4 files changed, 232 insertions, 147 deletions
diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h index 1c682d70427..96274c0bcbb 100644 --- a/src/mongo/s/catalog/catalog_manager.h +++ b/src/mongo/s/catalog/catalog_manager.h @@ -30,6 +30,7 @@ #include <boost/shared_ptr.hpp> #include <string> +#include <vector> #include "mongo/base/disallow_copying.h" @@ -38,13 +39,23 @@ namespace mongo { class BatchedCommandRequest; class BatchedCommandResponse; class BSONObj; + class ChunkType; class ConnectionString; class DatabaseType; class OperationContext; class Status; template<typename T> class StatusWith; - + /** + * Used to indicate to the caller of the removeShard method whether draining of chunks for + * a particular shard has started, is ongoing, or has been completed. + */ + enum ShardDrainingStatus { + STARTED, + ONGOING, + COMPLETED, + }; + /** * Abstracts reads and writes of the sharding catalog metadata. * @@ -86,6 +97,17 @@ namespace mongo { const long long maxSize) = 0; /** + * Tries to remove a shard. To completely remove a shard from a sharded cluster, + * the data residing in that shard must be moved to the remaining shards in the + * cluster by "draining" chunks from that shard. + * + * Because of the asynchronous nature of the draining mechanism, this method returns + * the current draining status. See ShardDrainingStatus enum definition for more details. + */ + virtual StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn, + const std::string& name) = 0; + + /** * Updates the metadata for a given database. Currently, if the specified DB entry does * not exist, it will be created. */ @@ -97,6 +119,20 @@ namespace mongo { virtual StatusWith<DatabaseType> getDatabase(const std::string& dbName) = 0; /** + * Retrieves all databases for a shard. + * Returns a !OK status if an error occurs. + */ + virtual void getDatabasesForShard(const std::string& shardName, + std::vector<std::string>* dbs) = 0; + + /** + * Gets all chunks (of type ChunkType) for a shard. + * Returns a !OK status if an error occurs. + */ + virtual Status getChunksForShard(const std::string& shardName, + std::vector<ChunkType>* chunks) = 0; + + /** * Logs a diagnostic event locally and on the config server. * * NOTE: This method is best effort so it should never throw. diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp index 81b92f7824a..48774e3ca0d 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp @@ -39,6 +39,7 @@ #include "mongo/client/connpool.h" #include "mongo/client/replica_set_monitor.h" +#include "mongo/db/audit.h" #include "mongo/db/client.h" #include "mongo/db/operation_context.h" #include "mongo/db/server_options.h" @@ -46,7 +47,10 @@ #include "mongo/s/catalog/legacy/config_coordinator.h" #include "mongo/s/catalog/type_changelog.h" #include "mongo/s/client/dbclient_multi_command.h" +#include "mongo/s/client/shard_connection.h" #include "mongo/s/shard.h" +#include "mongo/s/type_chunk.h" +#include "mongo/s/type_shard.h" #include "mongo/s/type_database.h" #include "mongo/s/type_shard.h" #include "mongo/s/write_ops/batched_command_request.h" @@ -237,6 +241,14 @@ namespace { return dbNames; } + BSONObj buildRemoveLogEntry(const string& shardName, bool isDraining) { + BSONObjBuilder details; + details.append("shard", shardName); + details.append("isDraining", isDraining); + + return details.obj(); + } + // Whether the logChange call should attempt to create the changelog collection AtomicInt32 changeLogCollectionCreated(0); @@ -515,6 +527,100 @@ namespace { return shardName; } + StatusWith<ShardDrainingStatus> CatalogManagerLegacy::removeShard(OperationContext* txn, + const std::string& name) { + ScopedDbConnection conn(_configServerConnectionString, 30); + + if (conn->count(ShardType::ConfigNS, + BSON(ShardType::name() << NE << name + << ShardType::draining(true)))) { + conn.done(); + return Status(ErrorCodes::ConflictingOperationInProgress, + "Can't have more than one draining shard at a time"); + } + + if (conn->count(ShardType::ConfigNS, + BSON(ShardType::name() << NE << name)) == 0) { + conn.done(); + return Status(ErrorCodes::IllegalOperation, + "Can't remove last shard"); + } + + BSONObj searchDoc = BSON(ShardType::name() << name); + + // Case 1: start draining chunks + BSONObj drainingDoc = + BSON(ShardType::name() << name << ShardType::draining(true)); + BSONObj shardDoc = conn->findOne(ShardType::ConfigNS, drainingDoc); + if (shardDoc.isEmpty()) { + log() << "going to start draining shard: " << name; + BSONObj newStatus = BSON("$set" << BSON(ShardType::draining(true))); + + Status status = update(ShardType::ConfigNS, searchDoc, newStatus, false, false, NULL); + if (!status.isOK()) { + log() << "error starting removeShard: " << name + << "; err: " << status.reason(); + return status; + } + + BSONObj primaryLocalDoc = BSON(DatabaseType::name("local") << + DatabaseType::primary(name)); + log() << "primaryLocalDoc: " << primaryLocalDoc; + if (conn->count(DatabaseType::ConfigNS, primaryLocalDoc)) { + log() << "This shard is listed as primary of local db. Removing entry."; + + Status status = remove(DatabaseType::ConfigNS, + BSON(DatabaseType::name("local")), + 0, + NULL); + if (!status.isOK()) { + log() << "error removing local db: " + << status.reason(); + return status; + } + } + + Shard::reloadShardInfo(); + conn.done(); + + // Record start in changelog + logChange(txn, "removeShard.start", "", buildRemoveLogEntry(name, true)); + return ShardDrainingStatus::STARTED; + } + + // Case 2: all chunks drained + BSONObj shardIDDoc = BSON(ChunkType::shard(shardDoc[ShardType::name()].str())); + long long chunkCount = conn->count(ChunkType::ConfigNS, shardIDDoc); + long long dbCount = conn->count(DatabaseType::ConfigNS, + BSON(DatabaseType::name.ne("local") + << DatabaseType::primary(name))); + if (chunkCount == 0 && dbCount == 0) { + log() << "going to remove shard: " << name; + audit::logRemoveShard(ClientBasic::getCurrent(), name); + + Status status = remove(ShardType::ConfigNS, searchDoc, 0, NULL); + if (!status.isOK()) { + log() << "Error concluding removeShard operation on: " << name + << "; err: " << status.reason(); + return status; + } + + Shard::removeShard(name); + shardConnectionPool.removeHost(name); + ReplicaSetMonitor::remove(name, true); + + Shard::reloadShardInfo(); + conn.done(); + + // Record finish in changelog + logChange(txn, "removeShard", "", buildRemoveLogEntry(name, false)); + return ShardDrainingStatus::COMPLETED; + } + + // case 3: draining ongoing + return ShardDrainingStatus::ONGOING; + } + Status CatalogManagerLegacy::updateDatabase(const std::string& dbName, const DatabaseType& db) { fassert(28616, db.validate()); @@ -597,6 +703,42 @@ namespace { } } + void CatalogManagerLegacy::getDatabasesForShard(const string& shardName, + vector<string>* dbs) { + ScopedDbConnection conn(_configServerConnectionString, 30.0); + BSONObj prim = BSON(DatabaseType::primary(shardName)); + boost::scoped_ptr<DBClientCursor> cursor(conn->query(DatabaseType::ConfigNS, prim)); + + while (cursor->more()) { + BSONObj shard = cursor->nextSafe(); + dbs->push_back(shard[DatabaseType::name()].str()); + } + + conn.done(); + } + + Status CatalogManagerLegacy::getChunksForShard(const string& shardName, + vector<ChunkType>* chunks) { + ScopedDbConnection conn(_configServerConnectionString, 30.0); + boost::scoped_ptr<DBClientCursor> cursor(conn->query(ChunkType::ConfigNS, + BSON(ChunkType::shard(shardName)))); + while (cursor->more()) { + BSONObj chunkObj = cursor->nextSafe(); + + StatusWith<ChunkType> chunkRes = ChunkType::fromBSON(chunkObj); + if (!chunkRes.isOK()) { + return Status(ErrorCodes::FailedToParse, + str::stream() << "Failed to parse chunk BSONObj: " + << chunkRes.getStatus().reason()); + } + ChunkType chunk = chunkRes.getValue(); + chunks->push_back(chunk); + } + conn.done(); + + return Status::OK(); + } + void CatalogManagerLegacy::writeConfigServerDirect(const BatchedCommandRequest& request, BatchedCommandResponse* response) { diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h index f017ad60111..016c68c503b 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h @@ -56,10 +56,19 @@ namespace mongo { const ConnectionString& shardConnectionString, const long long maxSize); + virtual StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn, + const std::string& name); + virtual Status updateDatabase(const std::string& dbName, const DatabaseType& db); virtual StatusWith<DatabaseType> getDatabase(const std::string& dbName); + virtual void getDatabasesForShard(const std::string& shardName, + std::vector<std::string>* dbs); + + virtual Status getChunksForShard(const std::string& shardName, + std::vector<ChunkType>* chunks); + virtual void logChange(OperationContext* txn, const std::string& what, const std::string& ns, diff --git a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp index 626aa4d5867..ac75bf7483b 100644 --- a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp +++ b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp @@ -31,16 +31,13 @@ #include "mongo/platform/basic.h" #include <string> +#include <vector> #include "mongo/client/connpool.h" -#include "mongo/client/replica_set_monitor.h" -#include "mongo/db/audit.h" #include "mongo/db/commands.h" #include "mongo/db/operation_context.h" #include "mongo/s/catalog/catalog_manager.h" -#include "mongo/s/client/shard_connection.h" #include "mongo/s/cluster_write.h" -#include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/shard.h" #include "mongo/s/type_chunk.h" @@ -51,18 +48,10 @@ namespace mongo { using std::string; + using std::vector; namespace { - BSONObj buildRemoveLogEntry(Shard s, bool isDraining) { - BSONObjBuilder details; - details.append("shard", s.getName()); - details.append("isDraining", isDraining); - - return details.obj(); - } - - class RemoveShardCmd : public Command { public: RemoveShardCmd() : Command("removeShard", false, "removeshard") { } @@ -112,159 +101,68 @@ namespace { Status(ErrorCodes::ShardNotFound, msg)); } - ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30); - - if (conn->count(ShardType::ConfigNS, - BSON(ShardType::name() << NE << s.getName() - << ShardType::draining(true)))) { - conn.done(); - errmsg = "Can't have more than one draining shard at a time"; - return false; + StatusWith<ShardDrainingStatus> removeShardResult = + grid.catalogManager()->removeShard(txn, s.getName()); + if (!removeShardResult.isOK()) { + return appendCommandStatus(result, removeShardResult.getStatus()); } - if (conn->count(ShardType::ConfigNS, - BSON(ShardType::name() << NE << s.getName())) == 0) { - conn.done(); - errmsg = "Can't remove last shard"; - return false; - } + vector<string> databases; + grid.catalogManager()->getDatabasesForShard(s.getName(), &databases); - BSONObj primaryDoc = - BSON(DatabaseType::name.ne("local") << DatabaseType::primary(s.getName())); - - BSONObj dbInfo; // appended at end of result on success + // Get BSONObj containing: + // 1) note about moving or dropping databases in a shard + // 2) list of databases (excluding 'local' database) that need to be moved + BSONObj dbInfo; { - boost::scoped_ptr<DBClientCursor> cursor(conn->query(DatabaseType::ConfigNS, - primaryDoc)); - if (cursor->more()) { - // Skip block and allocations if empty - BSONObjBuilder dbInfoBuilder; - dbInfoBuilder.append("note", - "you need to drop or movePrimary these databases"); - - BSONArrayBuilder dbs(dbInfoBuilder.subarrayStart("dbsToMove")); - while (cursor->more()){ - BSONObj db = cursor->nextSafe(); - dbs.append(db[DatabaseType::name()]); + BSONObjBuilder dbInfoBuilder; + dbInfoBuilder.append("note", + "you need to drop or movePrimary these databases"); + BSONArrayBuilder dbs(dbInfoBuilder.subarrayStart("dbsToMove")); + for (vector<string>::const_iterator it = databases.begin(); + it != databases.end(); + it++) { + if (*it != "local") { + dbs.append(*it); } - dbs.doneFast(); - - dbInfo = dbInfoBuilder.obj(); } + dbs.doneFast(); + dbInfo = dbInfoBuilder.obj(); } - // If the server is not yet draining chunks, put it in draining mode. - BSONObj searchDoc = BSON(ShardType::name() << s.getName()); - BSONObj drainingDoc = - BSON(ShardType::name() << s.getName() << ShardType::draining(true)); - - BSONObj shardDoc = conn->findOne(ShardType::ConfigNS, drainingDoc); - if (shardDoc.isEmpty()) { - log() << "going to start draining shard: " << s.getName(); - BSONObj newStatus = BSON("$set" << BSON(ShardType::draining(true))); - - Status status = grid.catalogManager()->update(ShardType::ConfigNS, - searchDoc, - newStatus, - false, - false, - NULL); - if (!status.isOK()) { - errmsg = status.reason(); - log() << "error starting remove shard: " << s.getName() - << " err: " << errmsg; - return false; - } - - BSONObj primaryLocalDoc = BSON(DatabaseType::name("local") << - DatabaseType::primary(s.getName())); - PRINT(primaryLocalDoc); - - if (conn->count(DatabaseType::ConfigNS, primaryLocalDoc)) { - log() << "This shard is listed as primary of local db. Removing entry."; - - Status status = grid.catalogManager()->remove( - DatabaseType::ConfigNS, - BSON(DatabaseType::name("local")), - 0, - NULL); - if (!status.isOK()) { - log() << "error removing local db: " - << status.reason(); - return false; - } - } - - Shard::reloadShardInfo(); - + // TODO: Standardize/Seperate how we append to the result object + switch (removeShardResult.getValue()) { + case ShardDrainingStatus::STARTED: result.append("msg", "draining started successfully"); result.append("state", "started"); result.append("shard", s.getName()); result.appendElements(dbInfo); - - conn.done(); - - // Record start in changelog - grid.catalogManager()->logChange(txn, - "removeShard.start", - "", - buildRemoveLogEntry(s, true)); - return true; - } - - // If the server has been completely drained, remove it from the ConfigDB. Check not - // only for chunks but also databases. - BSONObj shardIDDoc = BSON(ChunkType::shard(shardDoc[ShardType::name()].str())); - long long chunkCount = conn->count(ChunkType::ConfigNS, shardIDDoc); - long long dbCount = conn->count(DatabaseType::ConfigNS, primaryDoc); - - if ((chunkCount == 0) && (dbCount == 0)) { - log() << "going to remove shard: " << s.getName(); - audit::logRemoveShard(ClientBasic::getCurrent(), s.getName()); - - Status status = grid.catalogManager()->remove(ShardType::ConfigNS, - searchDoc, - 0, - NULL); + break; + case ShardDrainingStatus::ONGOING: { + vector<ChunkType> chunks; + Status status = grid.catalogManager()->getChunksForShard(s.getName(), &chunks); if (!status.isOK()) { - errmsg = status.reason(); - log() << "error concluding remove shard: " << s.getName() - << " err: " << errmsg; - return false; + return appendCommandStatus(result, status); } - const string shardName = shardDoc[ShardType::name()].str(); - Shard::removeShard(shardName); - shardConnectionPool.removeHost(shardName); - ReplicaSetMonitor::remove(shardName, true); - - Shard::reloadShardInfo(); - + result.append("msg", "draining ongoing"); + result.append("state", "ongoing"); + { + BSONObjBuilder inner; + inner.append("chunks", static_cast<long long>(chunks.size())); + inner.append("dbs", static_cast<long long>(databases.size())); + BSONObj b = inner.obj(); + result.append("remaining", b); + } + result.appendElements(dbInfo); + break; + } + case ShardDrainingStatus::COMPLETED: result.append("msg", "removeshard completed successfully"); result.append("state", "completed"); result.append("shard", s.getName()); - - conn.done(); - - // Record finish in changelog - grid.catalogManager()->logChange(txn, - "removeShard", - "", - buildRemoveLogEntry(s, false)); - return true; } - // If the server is already in draining mode, just report on its progress. - // Report on databases (not just chunks) that are left too. - result.append("msg", "draining ongoing"); - result.append("state", "ongoing"); - BSONObjBuilder inner; - inner.append("chunks", chunkCount); - inner.append("dbs", dbCount); - result.append("remaining", inner.obj()); - result.appendElements(dbInfo); - - conn.done(); return true; } |